Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"errors"
"sync"
"time"

"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
Expand All @@ -33,6 +33,16 @@ func (fc FilterConfig) String() string {
// filterSubLoopInterval is presumably the tick period handed to
// subscriptionLoop as loopInterval — TODO confirm; the call site is not
// visible in this excerpt.
const filterSubLoopInterval = 5 * time.Second
// filterSubMaxErrCnt bounds retries between ticks: subscriptionLoop calls
// checkAndResubscribe only while errcnt is below this value, and resets
// errcnt to zero on every ticker tick.
const filterSubMaxErrCnt = 3

// filterRateLimitBackoff is how long the apiSub waits before re-issuing a
// subscribe attempt after at least one peer returned HTTP 429
// ("filter request rejected due rate limit exceeded"). The waku server uses
// 429 to ask clients to slow down; the previous implementation flattened the
// typed peer error into a plain *errors.errorString so the apiSub never saw
// the signal and kept retrying aggressively. With the typed *SubscribeError
// (see protocol/filter/subscribe_error.go) the apiSub now honors the signal
// by suppressing retries for filterRateLimitBackoff after a 429.
//
// subscribe() records the end of the window in Sub.rateLimitedUntil as
// time.Now().Add(filterRateLimitBackoff).
const filterRateLimitBackoff = 60 * time.Second

type Sub struct {
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
Expand All @@ -47,6 +57,16 @@ type Sub struct {
resubscribeInProgress bool
id string
errcnt int
// rateLimitedUntil is set when subscribe() observes a *SubscribeError whose
// FailedPeers contain at least one HTTP 429. While time.Now().Before(rateLimitedUntil),
// subscriptionLoop suppresses retry triggers (ticker push and checkAndResubscribe).
// Cleared by a successful subscribe(). Read/written only from the subscriptionLoop
// goroutine; no lock needed.
rateLimitedUntil time.Time
// multiplexWG tracks per-subscription goroutines that forward envelopes from
// subDetails.C to DataCh. cleanup() must wait for them before close(DataCh)
// to avoid "send on closed channel" panics during teardown.
multiplexWG sync.WaitGroup
}

type subscribeParameters struct {
Expand Down Expand Up @@ -119,6 +139,12 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
select {
case <-ticker.C:
apiSub.errcnt = 0 //reset errorCount
if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) {
apiSub.log.Debug("ticker push suppressed by rate-limit backoff",
zap.Time("rate-limited-until", apiSub.rateLimitedUntil),
)
continue
}
if apiSub.onlineChecker.IsOnline() && len(apiSub.subs) < apiSub.Config.MaxPeers &&
!apiSub.resubscribeInProgress && len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
Expand All @@ -128,17 +154,27 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
apiSub.cleanup()
return
case subId := <-apiSub.closing:
if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) {
apiSub.log.Debug("checkAndResubscribe suppressed by rate-limit backoff",
zap.Time("rate-limited-until", apiSub.rateLimitedUntil),
)
continue
}
if apiSub.errcnt < filterSubMaxErrCnt {
apiSub.resubscribeInProgress = true
//trigger resubscribe flow for subscription.
apiSub.checkAndResubscribe(subId)
} else {
apiSub.log.Debug("retry suppressed by errcnt bound",
zap.Int("errcnt", apiSub.errcnt),
zap.Int("filter-sub-max-err-cnt", filterSubMaxErrCnt),
)
}
}
}
}

func (apiSub *Sub) checkAndResubscribe(subId string) {

var failedPeer peer.ID
if subId != "" {
apiSub.log.Debug("subscription close and resubscribe", zap.String("sub-id", subId), zap.Stringer("content-filter", apiSub.ContentFilter))
Expand All @@ -164,6 +200,9 @@ func (apiSub *Sub) cleanup() {
apiSub.log.Info("failed to unsubscribe filter", zap.Error(err))
}
}
// Wait for in-flight multiplex goroutines to exit before closing DataCh,
// otherwise they may panic sending to a closed channel.
apiSub.multiplexWG.Wait()
close(apiSub.DataCh)
}

Expand All @@ -188,8 +227,12 @@ func (apiSub *Sub) resubscribe(failedPeer peer.ID) {
apiSub.multiplex(subs)
}

func possibleRecursiveError(err error) bool {
return errors.Is(err, utils.ErrNoPeersAvailable) || errors.Is(err, swarm.ErrDialBackoff)
// shouldHonourRateLimitBackoff tells the caller whether retry triggers must
// still be skipped because a rate-limit backoff window is open at time now.
//
// The window is open only while rateLimitedUntil lies strictly after now. At
// the exact deadline (now == rateLimitedUntil) the window counts as elapsed
// and the result is false. A zero-value rateLimitedUntil (never set) is not
// after any real clock reading, so it never suppresses a retry.
func shouldHonourRateLimitBackoff(rateLimitedUntil, now time.Time) bool {
	windowStillOpen := rateLimitedUntil.After(now)
	return windowStillOpen
}

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
Expand All @@ -206,9 +249,31 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
subs, err := apiSub.wf.Subscribe(apiSub.ctx, contentFilter, options...)

if err != nil {
if possibleRecursiveError(err) {
apiSub.errcnt++
apiSub.log.Warn("subscribe error",
zap.Error(err),
zap.Int("errcnt-before-inc", apiSub.errcnt),
)

// If any peer responded HTTP 429, enter a rate-limit backoff window.
// subscriptionLoop's gates will suppress retry triggers for
// filterRateLimitBackoff. The typed *SubscribeError comes from
// protocol/filter/client.go.
var subErr *filter.SubscribeError
if errors.As(err, &subErr) && subErr.HasRateLimitError() {
apiSub.rateLimitedUntil = time.Now().Add(filterRateLimitBackoff)
apiSub.log.Warn("rate-limited by peer, backing off",
zap.Duration("backoff", filterRateLimitBackoff),
zap.Time("until", apiSub.rateLimitedUntil),
zap.Int("failed-peers", len(subErr.FailedPeers)),
)
}

apiSub.errcnt++
apiSub.log.Debug("errcnt incremented",
zap.Int("new-errcnt", apiSub.errcnt),
zap.Error(err),
)

//Inform of error, so that resubscribe can be triggered if required
if len(apiSub.closing) < apiSub.Config.MaxPeers {
apiSub.closing <- ""
Expand All @@ -221,19 +286,44 @@ func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int
// TODO: Once filter error handling indicates specific error, this can be handled better.
return nil, err
}
// On full success, clear any prior rate-limit backoff so retries can resume
// normally if a fresh failure occurs later.
apiSub.rateLimitedUntil = time.Time{}
apiSub.log.Debug("subscribe success", zap.Int("subs-count", len(subs)))
return subs, nil
}

func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
// Multiplex onto single channel
// Goroutines will exit once sub channels are closed
// Goroutines exit when subDetails.C is closed or apiSub.ctx is done.
// cleanup() waits on multiplexWG before close(DataCh) to avoid a race.
for _, subDetails := range subs {
apiSub.subs[subDetails.ID] = subDetails
apiSub.multiplexWG.Add(1)
go func(subDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanic()
defer apiSub.multiplexWG.Done()
apiSub.log.Debug("new multiplex", zap.String("sub-id", subDetails.ID))
for env := range subDetails.C {
apiSub.DataCh <- env
// Both the receive and the send must be cancelable via apiSub.ctx:
// during node teardown, UnsubscribeWithSubscription may early-return
// from ErrOnNotRunning() without calling sub.Close(), leaving
// subDetails.C open forever. A bare `for env := range subDetails.C`
// would then block here, multiplexWG.Wait() in cleanup() would block
// on it, and the whole filter shutdown would deadlock.
for {
select {
case env, ok := <-subDetails.C:
if !ok {
return
}
select {
case apiSub.DataCh <- env:
case <-apiSub.ctx.Done():
return
}
case <-apiSub.ctx.Done():
return
}
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
Expand Down
51 changes: 33 additions & 18 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ type FilterManager struct {
filterSubBatchDuration time.Duration
incompleteFilterBatch map[string]filterConfig
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
// waitingToSubQueue holds filter batches that arrived while the node was offline.
// Always accessed under mgr.Lock(); a slice (rather than a bounded channel) avoids
// a deadlock where SubscribeFilter would block on a full channel while still
// holding mgr.Lock(), preventing the only drainer (checkAndProcessQueue, also
// invoked under the same lock) from running.
waitingToSubQueue []filterConfig
envProcessor EnevelopeProcessor
networkConnType byte
}
Expand Down Expand Up @@ -77,7 +82,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
mgr.networkConnType = initNetworkConnType

//parsing the subscribe params only to read the batchInterval passed.
Expand Down Expand Up @@ -142,8 +146,10 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
go mgr.subscribeAndRunLoop(afilter)
} else {
mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(cf.ContentTopics)))
// queue existing batch as node is not online
mgr.waitingToSubQueue <- afilter
// queue existing batch as node is not online.
// Safe: caller holds mgr.Lock() so the append is atomic with respect
// to checkAndProcessQueue and other mutations.
mgr.waitingToSubQueue = append(mgr.waitingToSubQueue, afilter)
}
afilter = filterConfig{uuid.NewString(), cf}
mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", cf.PubsubTopic), zap.Stringer("content-filter", cf))
Expand Down Expand Up @@ -183,23 +189,32 @@ func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}

// checkAndProcessQueue drains the offline-pending filter queue. For each batch
// that matches the given pubsubTopic (or always, when pubsubTopic == ""), a
// subscribe goroutine is spawned; non-matching batches are retained for a
// future call. Caller must hold mgr.Lock().
func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
if len(mgr.waitingToSubQueue) == 0 {
return
}
// Reuse the slice's backing array. Each iteration reads `af` (value copy)
// before any potential overwrite at the same index, so partitioning in place
// is safe.
remaining := mgr.waitingToSubQueue[:0]
for _, af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
remaining = append(remaining, af)
}
}
mgr.waitingToSubQueue = remaining
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
}
}

func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) {
Expand Down
Loading
Loading