diff --git a/pkg/messaging/api.go b/pkg/messaging/api.go index f6130e77d32..0c8fa6113cc 100644 --- a/pkg/messaging/api.go +++ b/pkg/messaging/api.go @@ -61,3 +61,13 @@ func (a *API) ResumeTransport() { a.core.stack.Transport.Resume() } } + +// PauseDataSync idles (paused==true) or re-arms (paused==false) the reliability +// layer's data-sync node so its outbound loop performs no work while the host is +// backgrounded. +func (a *API) PauseDataSync(paused bool) error { + if a.core.stack.Reliability == nil { + return nil + } + return a.core.stack.Reliability.SetPaused(paused) +} diff --git a/pkg/messaging/layers/reliability/datasync/datasync.go b/pkg/messaging/layers/reliability/datasync/datasync.go index 1caf8cfe19a..4d64c979b6c 100644 --- a/pkg/messaging/layers/reliability/datasync/datasync.go +++ b/pkg/messaging/layers/reliability/datasync/datasync.go @@ -3,6 +3,7 @@ package datasync import ( "crypto/ecdsa" "errors" + "sync/atomic" "github.com/golang/protobuf/proto" datasyncnode "github.com/status-im/mvds/node" @@ -17,12 +18,17 @@ type DataSync struct { *datasyncnode.Node // NodeTransport is the implementation of the datasync transport interface. *NodeTransport - logger *zap.Logger - sendingEnabled bool + logger *zap.Logger + // sendingEnabled gates whether Unwrap feeds received messages into the node + // for acknowledgement. Read on the inbound-message goroutine, written by + // reliability.SetPaused on the lifecycle goroutine — hence atomic. + sendingEnabled atomic.Bool } func New(node *datasyncnode.Node, transport *NodeTransport, sendingEnabled bool, logger *zap.Logger) *DataSync { - return &DataSync{Node: node, NodeTransport: transport, sendingEnabled: sendingEnabled, logger: logger} + d := &DataSync{Node: node, NodeTransport: transport, logger: logger} + d.sendingEnabled.Store(sendingEnabled) + return d } // Unwrap tries to unwrap datasync message and passes back the message to datasync in order to acknowledge any potential message and mark messages as acknowledged @@ -38,7 +44,7 @@ func (d *DataSync) Unwrap(sender *ecdsa.PublicKey, payload []byte) (*protobuf.Pa return nil, errors.New("handling non-datasync message") } else { logger.Debug("handling datasync message") - if d.sendingEnabled { + if d.sendingEnabled.Load() { d.add(sender, &datasyncMessage) } } @@ -50,6 +56,16 @@ func (d *DataSync) Stop() { d.Node.Stop() } +// SetSendingEnabled toggles whether Unwrap feeds received datasync messages into +// the node for acknowledgement. It is flipped to false around a node recreate +// (see reliability.SetPaused) so Unwrap short-circuits instead of pushing +// packets onto a transport whose consumer goroutine has exited — AddPacket is +// non-blocking and buffered, so it wouldn't hang, but those packets would be +// silently dropped during the recreate window. +func (d *DataSync) SetSendingEnabled(v bool) { + d.sendingEnabled.Store(v) +} + func (d *DataSync) add(publicKey *ecdsa.PublicKey, datasyncMessage *protobuf.Payload) { packet := datasynctransport.Packet{ Sender: datasyncpeer.PublicKeyToPeerID(*publicKey), diff --git a/pkg/messaging/layers/reliability/datasync/transport.go b/pkg/messaging/layers/reliability/datasync/transport.go index 0df0b1ec58b..02173ec1a2a 100644 --- a/pkg/messaging/layers/reliability/datasync/transport.go +++ b/pkg/messaging/layers/reliability/datasync/transport.go @@ -5,7 +5,6 @@ import ( "errors" "math" "math/rand" - "sync" "time" "github.com/status-im/mvds/protobuf" @@ -17,22 +16,14 @@ import ( const backoffInterval = 60 var errNotInitialized = errors.New("datasync transport not initialized") -var DatasyncTicker = 300 * time.Millisecond -var datasyncTickerMutex sync.RWMutex - -func SetPaused(paused bool) { - datasyncTickerMutex.Lock() - defer datasyncTickerMutex.Unlock() - if paused { - DatasyncTicker = 2 * time.Second - } else { - DatasyncTicker = 300 * time.Millisecond - } -} + +// DatasyncTicker is the mvds outbound-loop interval while running. The loop is +// idled while the host is backgrounded by reliability.SetPaused, which stops the +// mvds node and recreates it with a much larger tick — see +// pkg/messaging/layers/reliability/reliability.go. +const DatasyncTicker = 300 * time.Millisecond func currentOffsetToSecond() uint64 { - datasyncTickerMutex.RLock() - defer datasyncTickerMutex.RUnlock() return uint64(math.Ceil(float64(time.Second) / float64(DatasyncTicker))) } @@ -44,9 +35,15 @@ type NodeTransport struct { var _ transport.Transport = (*NodeTransport)(nil) +// packetBufferSize gives AddPacket some slack so a non-blocking send succeeds during +// normal operation (the watch-loop consumer isn't always precisely parked on the channel); +// the buffer only fills, and packets only drop, if the consumer is genuinely stalled +// (e.g. between Stop and the next node being started in reliability.SetPaused). +const packetBufferSize = 32 + func NewNodeTransport() *NodeTransport { return &NodeTransport{ - packets: make(chan transport.Packet), + packets: make(chan transport.Packet, packetBufferSize), } } @@ -55,8 +52,19 @@ func (t *NodeTransport) Init(dispatch func(state.PeerID, *protobuf.Payload) erro t.logger = logger } +// AddPacket hands an inbound datasync packet to the node's watch loop. The send is +// non-blocking: if the node isn't currently consuming (e.g. mid-Stop during a pause/resume +// node recreate, see reliability.SetPaused), the packet is dropped rather than blocking the +// caller's goroutine forever — the sender re-acks on retransmit, so this is recoverable +// (mirrors NodeTransport.Send, which also swallows errors for the same reason). func (t *NodeTransport) AddPacket(p transport.Packet) { - t.packets <- p + select { + case t.packets <- p: + default: + if t.logger != nil { + t.logger.Debug("datasync: dropped inbound packet (node not consuming)") + } + } } func (t *NodeTransport) Watch(ctx context.Context) (*transport.Packet, bool) { diff --git a/pkg/messaging/layers/reliability/reliability.go b/pkg/messaging/layers/reliability/reliability.go index 4dff766e24a..d45023883d1 100644 --- a/pkg/messaging/layers/reliability/reliability.go +++ b/pkg/messaging/layers/reliability/reliability.go @@ -2,6 +2,9 @@ package reliability import ( "crypto/ecdsa" + "sync" + "sync/atomic" + "time" "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -17,6 +20,19 @@ import ( datasyncpeer "github.com/status-im/status-go/pkg/messaging/layers/reliability/datasync/peer" ) +// pausedDuration is the tick interval handed to the mvds node while the host is +// backgrounded. We don't modify the mvds package: the interval is the `duration` +// argument to Node.Start, so we stop the node and recreate it with a different +// one. 5 minutes is rare enough that the SoC deep-sleeps essentially the whole +// interval (battery-wise indistinguishable from never ticking) yet still lets +// the outbound loop drain its in-memory ack buffer (mvds `payloads`) every 5 +// minutes — so acks owed for messages received while backgrounded go out within +// ~5 min instead of waiting for the next foreground, and a queued message that +// ages past its SendEpoch dispatches on the first tick after resume. +const pausedDuration = 5 * time.Minute + +var errNotStarted = errors.New("reliability layer not started") + // MessageDispatcher is a function that dispatches messages to a given public key. // // publicKey: the recipient public key @@ -26,11 +42,19 @@ type MessageDispatcher func(publicKey *ecdsa.PublicKey, wrappedPayload []byte, m type Reliability struct { identity *ecdsa.PrivateKey - datasync *datasync2.DataSync mvdsPersistence mvdsnode.Persistence mvdsStatusChangeEvent chan mvdsnode.PeerStatusChangeEvent sdsManager *sds.ReliabilityManager logger *zap.Logger + + // mu guards lifecycle transitions (Start/Stop/SetPaused/buildNode) so they + // don't interleave. The hot path (Unwrap / WrapAndQueue) only does an atomic + // Load of `datasync` and never takes mu. + mu sync.Mutex + datasync atomic.Pointer[datasync2.DataSync] + dispatch MessageDispatcher + paused bool + tick time.Duration // outbound-loop interval the active node was built with } func NewReliability(datasyncPersistence mvdsnode.Persistence, identity *ecdsa.PrivateKey, logger *zap.Logger) *Reliability { @@ -45,10 +69,27 @@ func NewReliability(datasyncPersistence mvdsnode.Persistence, identity *ecdsa.Pr } func (r *Reliability) Start(dispatch MessageDispatcher) error { - dataSyncTransport := datasync2.NewNodeTransport() - dataSyncNode, err := mvdsnode.NewPersistentNode( + r.mu.Lock() + defer r.mu.Unlock() + r.dispatch = dispatch + duration := datasync2.DatasyncTicker + if r.paused { + duration = pausedDuration + } + return r.buildNode(duration) +} + +// buildNode (re)creates the mvds data-sync node with the given outbound-loop +// interval and installs it. Caller must hold r.mu. +// +// NewPersistentNode reloads the last persisted epoch from r.mvdsPersistence +// (SQLite) on construction, so the epoch counter survives a recreate; the +// status-change channel is reused so peer-online events keep flowing. +func (r *Reliability) buildNode(duration time.Duration) error { + transport := datasync2.NewNodeTransport() + node, err := mvdsnode.NewPersistentNode( r.mvdsPersistence, - dataSyncTransport, + transport, datasyncpeer.PublicKeyToPeerID(r.identity.PublicKey), mvdsnode.BATCH, datasync2.CalculateSendTime, @@ -58,43 +99,106 @@ func (r *Reliability) Start(dispatch MessageDispatcher) error { if err != nil { return err } + ds := datasync2.New(node, transport, true, r.logger) + ds.Init(r.mvdsDispatch, r.logger) + ds.Start(duration) + r.tick = duration + r.datasync.Store(ds) + return nil +} - r.datasync = datasync2.New(dataSyncNode, dataSyncTransport, true, r.logger) - - mvdsDispatch := func(receiver mvdsstate.PeerID, payload *mvdsproto.Payload) error { - if !payload.IsValid() { - return errors.New("payload is invalid") - } - - marshalledPayload, err := proto.Marshal(payload) - if err != nil { - return errors.Wrap(err, "failed to marshal payload") - } - - publicKey, err := datasyncpeer.IDToPublicKey(receiver) - if err != nil { - return errors.Wrap(err, "failed to convert id to public key") - } - - messages := make([][]byte, 0, len(payload.Messages)) - for _, msg := range payload.Messages { - messages = append(messages, msg.Body) - } +// currentTick returns the outbound-loop interval the active mvds node was built +// with — DatasyncTicker when running, pausedDuration when paused. Used by tests. +func (r *Reliability) currentTick() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + return r.tick +} - return dispatch(publicKey, marshalledPayload, messages) +// mvdsDispatch adapts mvds's dispatch signature to r.dispatch. +func (r *Reliability) mvdsDispatch(receiver mvdsstate.PeerID, payload *mvdsproto.Payload) error { + if !payload.IsValid() { + return errors.New("payload is invalid") } + marshalledPayload, err := proto.Marshal(payload) + if err != nil { + return errors.Wrap(err, "failed to marshal payload") + } + publicKey, err := datasyncpeer.IDToPublicKey(receiver) + if err != nil { + return errors.Wrap(err, "failed to convert id to public key") + } + messages := make([][]byte, 0, len(payload.Messages)) + for _, msg := range payload.Messages { + messages = append(messages, msg.Body) + } + return r.dispatch(publicKey, marshalledPayload, messages) +} - r.datasync.Init(mvdsDispatch, r.logger) - r.datasync.Start(datasync2.DatasyncTicker) - +// SetPaused idles (paused==true) or re-arms (paused==false) the mvds outbound +// loop by stopping the data-sync node and recreating it with a different tick +// interval (pausedDuration vs DatasyncTicker). While paused the loop's outbound +// work runs only every pausedDuration; inbound processing and peer-status +// handling keep running. Safe to call before Start (the state is recorded and +// applied when Start runs). +// +// Caveat: messages queued via WrapAndQueueMessageForDispatch while paused are +// persisted to SQLite but not transmitted until either the next pausedDuration +// tick or the next Resume (whichever comes first) — callers that must send +// promptly while backgrounded (e.g. the Android notification quick-reply) should +// Resume("messaging") first. Acks owed for messages received while paused live +// in the mvds node's in-memory `payloads` buffer; the pausedDuration tick drains +// them, but the resume-recreate drops whatever accumulated since the last tick +// (senders re-ack on retransmit, so no message loss — only delayed "delivered"). +func (r *Reliability) SetPaused(paused bool) error { + r.mu.Lock() + defer r.mu.Unlock() + if r.paused == paused { + return nil + } + r.paused = paused + old := r.datasync.Load() + if old == nil { + return nil // not started yet; Start will pick the right interval + } + start := time.Now() + // Stop accepting inbound packets into the (about-to-be-stopped) node: with + // AddPacket non-blocking on a buffered channel, an in-flight Unwrap wouldn't + // hang here — but once the node's Watch goroutine exits the packets it pushed + // would be silently dropped. Short-circuit Unwrap so we don't decode and + // then discard packets during the recreate window. + old.SetSendingEnabled(false) + old.Stop() + stopDur := time.Since(start) + duration := datasync2.DatasyncTicker + if paused { + duration = pausedDuration + } + buildStart := time.Now() + if err := r.buildNode(duration); err != nil { + // The old node is already stopped; don't leave the atomic pointer aimed + // at a corpse. Started() now reports false and Unwrap/WrapAndQueue return + // errNotStarted; the next Reliability.Start (e.g. on a connection change) + // will rebuild. + r.datasync.Store(nil) + r.logger.Error("reliability SetPaused: rebuild failed; data-sync node is down until next Start", zap.Error(err)) + return err + } + r.logger.Info("reliability SetPaused", + zap.Bool("paused", paused), + zap.Duration("stop", stopDur), + zap.Duration("build", time.Since(buildStart)), + zap.Duration("total", time.Since(start))) return nil } func (r *Reliability) Stop() { - if r.Started() { - r.datasync.Stop() + r.mu.Lock() + defer r.mu.Unlock() + if old := r.datasync.Swap(nil); old != nil { + old.SetSendingEnabled(false) + old.Stop() } - r.datasync = nil if r.sdsManager != nil { err := r.sdsManager.Cleanup() if err != nil { @@ -105,30 +209,38 @@ func (r *Reliability) Stop() { } func (r *Reliability) Started() bool { - return r.datasync != nil + return r.datasync.Load() != nil } // WrapAndQueueMessageForDispatch wraps the message in the reliability layer, // then queues it for delivery to the target public key using configured MVDSDispatcher. func (r *Reliability) WrapAndQueueMessageForDispatch(publicKey *ecdsa.PublicKey, message []byte) (mvdsstate.MessageID, error) { + ds := r.datasync.Load() + if ds == nil { + return mvdsstate.MessageID{}, errNotStarted + } groupID := datasync2.ToOneToOneGroupID(&r.identity.PublicKey, publicKey) peerID := datasyncpeer.PublicKeyToPeerID(*publicKey) - exist, err := r.datasync.IsPeerInGroup(groupID, peerID) + exist, err := ds.IsPeerInGroup(groupID, peerID) if err != nil { return mvdsstate.MessageID{}, errors.Wrap(err, "failed to check if peer is in group") } if !exist { - if err := r.datasync.AddPeer(groupID, peerID); err != nil { + if err := ds.AddPeer(groupID, peerID); err != nil { return mvdsstate.MessageID{}, errors.Wrap(err, "failed to add peer") } } - return r.datasync.AppendMessage(groupID, message) + return ds.AppendMessage(groupID, message) } // UnwrapAndAcknowledge tries to unwrap received datasync message, // and potentially acknowledges it. func (r *Reliability) UnwrapAndAcknowledgeMessage(publicKey *ecdsa.PublicKey, message []byte) (*mvdsproto.Payload, error) { - return r.datasync.Unwrap(publicKey, message) + ds := r.datasync.Load() + if ds == nil { + return nil, errNotStarted + } + return ds.Unwrap(publicKey, message) } // ReportPeerOnline reports to MVDS that a peer is online at a given event time. diff --git a/pkg/messaging/layers/reliability/reliability_test.go b/pkg/messaging/layers/reliability/reliability_test.go new file mode 100644 index 00000000000..61951ab86d5 --- /dev/null +++ b/pkg/messaging/layers/reliability/reliability_test.go @@ -0,0 +1,153 @@ +package reliability + +import ( + "crypto/ecdsa" + "errors" + "testing" + "time" + + mvdsnode "github.com/status-im/mvds/node" + mvdsmigrations "github.com/status-im/mvds/persistenceutil" + mvdsstate "github.com/status-im/mvds/state" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/status-im/status-go/internal/crypto" + "github.com/status-im/status-go/internal/testutils" +) + +func noopDispatch(*ecdsa.PublicKey, []byte, [][]byte) error { return nil } + +func newTestPersistence(t *testing.T) mvdsnode.Persistence { + t.Helper() + db, err := testutils.SetupTestMemorySQLDB(testutils.NewTestDBInitializer(nil)) + require.NoError(t, err) + t.Cleanup(func() { _ = db.Close() }) + require.NoError(t, mvdsmigrations.Migrate(db)) + return mvdsnode.NewSQLitePersistence(db) +} + +func newTestReliability(t *testing.T) *Reliability { + t.Helper() + key, err := crypto.GenerateKey() + require.NoError(t, err) + return NewReliability(newTestPersistence(t), key, zap.NewNop()) +} + +func TestReliabilityPauseResume(t *testing.T) { + r := newTestReliability(t) + require.NoError(t, r.Start(noopDispatch)) + t.Cleanup(r.Stop) + + require.True(t, r.Started()) + require.Less(t, r.currentTick(), time.Second, "running node should tick frequently") + + // Pause: the mvds node is recreated with the long (pausedDuration) interval + // so its outbound loop idles. The node stays started. + require.NoError(t, r.SetPaused(true)) + require.True(t, r.Started()) + require.Equal(t, pausedDuration, r.currentTick()) + + // Idempotent: a second SetPaused(true) does not recreate the node again. + require.NoError(t, r.SetPaused(true)) + require.Equal(t, pausedDuration, r.currentTick()) + + // Resume: recreated with the normal interval. + require.NoError(t, r.SetPaused(false)) + require.True(t, r.Started()) + require.Less(t, r.currentTick(), time.Second) + + r.Stop() + require.False(t, r.Started()) +} + +func TestReliabilitySetPausedBeforeStart(t *testing.T) { + r := newTestReliability(t) + + // Before Start: SetPaused only records the state; there is no node yet. + require.NoError(t, r.SetPaused(true)) + require.False(t, r.Started()) + + // Start while paused → the node is built with the paused interval. + require.NoError(t, r.Start(noopDispatch)) + t.Cleanup(r.Stop) + require.True(t, r.Started()) + require.Equal(t, pausedDuration, r.currentTick()) + + // Resume. + require.NoError(t, r.SetPaused(false)) + require.Less(t, r.currentTick(), time.Second) +} + +// failingEpochStore makes mvdsnode.NewPersistentNode fail (it reads the epoch +// during construction). +type failingEpochStore struct{} + +func (failingEpochStore) Get(mvdsstate.PeerID) (int64, error) { + return 0, errors.New("epoch read failed") +} +func (failingEpochStore) Set(mvdsstate.PeerID, int64) error { return nil } + +// epochFailsOnNthBuild wraps a real Persistence but hands out a failing +// EpochStore on the n-th call (1-indexed) — used to make the recreate inside +// SetPaused fail while the initial Start succeeds. +type epochFailsOnNthBuild struct { + mvdsnode.Persistence + n int + calls int +} + +func (p *epochFailsOnNthBuild) EpochStore() mvdsnode.EpochPersistence { + p.calls++ + if p.calls == p.n { + return failingEpochStore{} + } + return p.Persistence.EpochStore() +} + +func TestReliabilityBuildNodeFailureLeavesNoZombie(t *testing.T) { + key, err := crypto.GenerateKey() + require.NoError(t, err) + // Fail the 2nd NewPersistentNode (the recreate in SetPaused); the 1st (Start) + // succeeds. + persistence := &epochFailsOnNthBuild{Persistence: newTestPersistence(t), n: 2} + r := NewReliability(persistence, key, zap.NewNop()) + t.Cleanup(r.Stop) + + require.NoError(t, r.Start(noopDispatch)) + require.True(t, r.Started()) + + // The recreate fails: SetPaused must report the error and must NOT leave the + // atomic pointer aimed at the stopped old node. + err = r.SetPaused(true) + require.Error(t, err) + require.False(t, r.Started(), "a failed rebuild must not leave a zombie node") + + // A subsequent Start (e.g. on a connection change) recovers — the 3rd + // NewPersistentNode succeeds. + require.NoError(t, r.Start(noopDispatch)) + require.True(t, r.Started()) +} + +// TestReliabilityConcurrent stresses the lifecycle ops against concurrent +// readers (Started / currentTick); run the package with -race to exercise the +// atomic.Pointer swap and the mutex. +func TestReliabilityConcurrent(t *testing.T) { + r := newTestReliability(t) + require.NoError(t, r.Start(noopDispatch)) + t.Cleanup(r.Stop) + + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < 20; i++ { + _ = r.SetPaused(i%2 == 0) + } + }() + for i := 0; i < 300; i++ { + _ = r.Started() + _ = r.currentTick() + } + <-done + require.True(t, r.Started()) +} diff --git a/pkg/messaging/layers/transport/transport.go b/pkg/messaging/layers/transport/transport.go index 1cbf8e0136f..7bce50d734d 100644 --- a/pkg/messaging/layers/transport/transport.go +++ b/pkg/messaging/layers/transport/transport.go @@ -85,20 +85,32 @@ type Transport struct { quit chan struct{} } -// Pause signals Transport's internal goroutines to idle and cascades to EnvelopesMonitor. +// Pause signals Transport's internal goroutines to idle and cascades to +// EnvelopesMonitor and the underlying waku transport. func (t *Transport) Pause() { t.MarkPaused() if t.envelopesMonitor != nil { t.envelopesMonitor.MarkPaused() } + if t.waku != nil { + if err := t.waku.Pause(); err != nil { + t.logger.Warn("waku Pause failed", zap.Error(err)) + } + } } -// Resume signals Transport's internal goroutines to resume and cascades to EnvelopesMonitor. +// Resume signals Transport's internal goroutines to resume and cascades to +// EnvelopesMonitor and the underlying waku transport. func (t *Transport) Resume() { t.MarkResumed() if t.envelopesMonitor != nil { t.envelopesMonitor.MarkResumed() } + if t.waku != nil { + if err := t.waku.Resume(); err != nil { + t.logger.Warn("waku Resume failed", zap.Error(err)) + } + } } var cleanFiltersLoopInterval = 5 * time.Minute diff --git a/pkg/messaging/waku/types/waku.go b/pkg/messaging/waku/types/waku.go index a11853db2c3..7bc9bad5869 100644 --- a/pkg/messaging/waku/types/waku.go +++ b/pkg/messaging/waku/types/waku.go @@ -102,6 +102,13 @@ type Waku interface { Start() error Stop() error + // Pause signals the waku transport to idle its goroutines (e.g. when the + // hosting app is backgrounded). Idempotent. Cascaded from the messenger + // transport so the protocol layer can suppress background work. + Pause() error + // Resume re-arms goroutines suspended by Pause. Idempotent. + Resume() error + // Waku protocol version Version() uint diff --git a/protocol/messenger.go b/protocol/messenger.go index 0fb546bc1b7..f0f15f8e2da 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -39,7 +39,6 @@ import ( "github.com/status-im/status-go/internal/images" "github.com/status-im/status-go/internal/instrumentation/trace" messaging2 "github.com/status-im/status-go/pkg/messaging" - datasync "github.com/status-im/status-go/pkg/messaging/layers/reliability/datasync" types2 "github.com/status-im/status-go/pkg/messaging/types" "github.com/status-im/status-go/protocol/contacts" @@ -566,7 +565,6 @@ func (m *Messenger) ToBackground() { func (m *Messenger) SetPaused(paused bool) { m.paused.Store(paused) - datasync.SetPaused(paused) if m.pushNotificationClient != nil { if paused { m.pushNotificationClient.Offline() @@ -580,6 +578,9 @@ func (m *Messenger) SetPaused(paused bool) { } else { m.messaging.ResumeTransport() } + if err := m.messaging.PauseDataSync(paused); err != nil { + m.logger.Warn("failed to pause data sync", zap.Error(err)) + } } // ToDo: the current ArchiveManager does not provide SetPaused method yet // if m.archiveManager != nil { diff --git a/services/ext/service.go b/services/ext/service.go index d87f5d4dd7b..2f863ab80c9 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -252,18 +252,13 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) { func (s *Service) retrieveStats(tick time.Duration, cancel <-chan struct{}) { defer gocommon.LogOnPanic() - ticker := time.NewTicker(tick) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - response := s.messenger.GetStats() - PublisherSignalHandler{}.Stats(response) - case <-cancel: - return - } - } + sub := s.PauseBroadcaster.Subscribe() + defer sub.Unsubscribe() + pt := gocommon.NewPausableTicker(gocommon.PausableTickerConfig{ + Interval: tick, + OnTick: func() { PublisherSignalHandler{}.Stats(s.messenger.GetStats()) }, + }, sub.C()) + pt.Run(cancel) } func (s *Service) EnableInstallation(installationID string) error {