Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/messaging/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,13 @@ func (a *API) ResumeTransport() {
a.core.stack.Transport.Resume()
}
}

// PauseDataSync idles (paused==true) or re-arms (paused==false) the reliability
// layer's data-sync node so its outbound loop performs no work while the host is
// backgrounded.
func (a *API) PauseDataSync(paused bool) error {
if a.core.stack.Reliability == nil {
return nil
}
return a.core.stack.Reliability.SetPaused(paused)
}
24 changes: 20 additions & 4 deletions pkg/messaging/layers/reliability/datasync/datasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package datasync
import (
"crypto/ecdsa"
"errors"
"sync/atomic"

"github.com/golang/protobuf/proto"
datasyncnode "github.com/status-im/mvds/node"
Expand All @@ -17,12 +18,17 @@ type DataSync struct {
*datasyncnode.Node
// NodeTransport is the implementation of the datasync transport interface.
*NodeTransport
logger *zap.Logger
sendingEnabled bool
logger *zap.Logger
// sendingEnabled gates whether Unwrap feeds received messages into the node
// for acknowledgement. Read on the inbound-message goroutine, written by
// reliability.SetPaused on the lifecycle goroutine — hence atomic.
sendingEnabled atomic.Bool
}

func New(node *datasyncnode.Node, transport *NodeTransport, sendingEnabled bool, logger *zap.Logger) *DataSync {
return &DataSync{Node: node, NodeTransport: transport, sendingEnabled: sendingEnabled, logger: logger}
d := &DataSync{Node: node, NodeTransport: transport, logger: logger}
d.sendingEnabled.Store(sendingEnabled)
return d
}

// Unwrap tries to unwrap datasync message and passes back the message to datasync in order to acknowledge any potential message and mark messages as acknowledged
Expand All @@ -38,7 +44,7 @@ func (d *DataSync) Unwrap(sender *ecdsa.PublicKey, payload []byte) (*protobuf.Pa
return nil, errors.New("handling non-datasync message")
} else {
logger.Debug("handling datasync message")
if d.sendingEnabled {
if d.sendingEnabled.Load() {
d.add(sender, &datasyncMessage)
}
}
Expand All @@ -50,6 +56,16 @@ func (d *DataSync) Stop() {
d.Node.Stop()
}

// SetSendingEnabled toggles whether Unwrap feeds received datasync messages into
// the node for acknowledgement. It is flipped to false around a node recreate
// (see reliability.SetPaused) so Unwrap short-circuits instead of pushing
// packets onto a transport whose consumer goroutine has exited — AddPacket is
// non-blocking and buffered, so it wouldn't hang, but those packets would be
// silently dropped during the recreate window.
func (d *DataSync) SetSendingEnabled(v bool) {
d.sendingEnabled.Store(v)
}

func (d *DataSync) add(publicKey *ecdsa.PublicKey, datasyncMessage *protobuf.Payload) {
packet := datasynctransport.Packet{
Sender: datasyncpeer.PublicKeyToPeerID(*publicKey),
Expand Down
42 changes: 25 additions & 17 deletions pkg/messaging/layers/reliability/datasync/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"math"
"math/rand"
"sync"
"time"

"github.com/status-im/mvds/protobuf"
Expand All @@ -17,22 +16,14 @@ import (
const backoffInterval = 60

var errNotInitialized = errors.New("datasync transport not initialized")
var DatasyncTicker = 300 * time.Millisecond
var datasyncTickerMutex sync.RWMutex

func SetPaused(paused bool) {
datasyncTickerMutex.Lock()
defer datasyncTickerMutex.Unlock()
if paused {
DatasyncTicker = 2 * time.Second
} else {
DatasyncTicker = 300 * time.Millisecond
}
}

// DatasyncTicker is the mvds outbound-loop interval while running. The loop is
// idled while the host is backgrounded by reliability.SetPaused, which stops the
// mvds node and recreates it with a much larger tick — see
// pkg/messaging/layers/reliability/reliability.go.
const DatasyncTicker = 300 * time.Millisecond

func currentOffsetToSecond() uint64 {
datasyncTickerMutex.RLock()
defer datasyncTickerMutex.RUnlock()
return uint64(math.Ceil(float64(time.Second) / float64(DatasyncTicker)))
}

Expand All @@ -44,9 +35,15 @@ type NodeTransport struct {

var _ transport.Transport = (*NodeTransport)(nil)

// packetBufferSize gives AddPacket some slack so a non-blocking send succeeds during
// normal operation (the watch-loop consumer isn't always precisely parked on the channel);
// the buffer only fills, and packets only drop, if the consumer is genuinely stalled
// (e.g. between Stop and the next node being started in reliability.SetPaused).
const packetBufferSize = 32

func NewNodeTransport() *NodeTransport {
return &NodeTransport{
packets: make(chan transport.Packet),
packets: make(chan transport.Packet, packetBufferSize),
}
}

Expand All @@ -55,8 +52,19 @@ func (t *NodeTransport) Init(dispatch func(state.PeerID, *protobuf.Payload) erro
t.logger = logger
}

// AddPacket hands an inbound datasync packet to the node's watch loop. The send is
// non-blocking: if the node isn't currently consuming (e.g. mid-Stop during a pause/resume
// node recreate, see reliability.SetPaused), the packet is dropped rather than blocking the
// caller's goroutine forever — the sender re-acks on retransmit, so this is recoverable
// (mirrors NodeTransport.Send, which also swallows errors for the same reason).
func (t *NodeTransport) AddPacket(p transport.Packet) {
t.packets <- p
select {
case t.packets <- p:
default:
if t.logger != nil {
t.logger.Debug("datasync: dropped inbound packet (node not consuming)")
}
}
}

func (t *NodeTransport) Watch(ctx context.Context) (*transport.Packet, bool) {
Expand Down
Loading
Loading