feat(mix): cover traffic with constant rate#2243
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2243 +/- ##
==========================================
- Coverage 72.66% 72.60% -0.06%
==========================================
Files 166 167 +1
Lines 21435 21844 +409
Branches 20 20
==========================================
+ Hits 15575 15860 +285
- Misses 5860 5984 +124
🚀 New features to boost your workflow:
|
f05d549 to
03573c4
Compare
2f63b32 to
d1960b2
Compare
d1960b2 to
f7ebceb
Compare
There was a problem hiding this comment.
Pull request overview
Implements opt-in Mix Protocol cover traffic as a pluggable strategy (constant-rate per spec), including shared per-epoch slot budgeting and integration hooks in MixProtocol and SpamProtection, plus accompanying unit/integration tests and new metrics.
Changes:
- Add
CoverTrafficbase type,ConstantRateCoverTrafficstrategy, andSlotPoolbudget management. - Integrate cover traffic into MixProtocol (cover emission/build/send hooks, slot claiming on send/forward, epoch-change propagation, exit handling).
- Add cover-traffic-focused tests and Mix metrics; remove GossipSub interop test/binary code.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
libp2p/protocols/mix/cover_traffic.nim |
New cover traffic abstractions + constant-rate implementation with optional precomputation. |
libp2p/protocols/mix/mix_protocol.nim |
Wires cover traffic into MixProtocol lifecycle, slot claiming paths, and cover packet build/send helpers. |
libp2p/protocols/mix/spam_protection.nim |
Extends SpamProtection with epoch-change callbacks + epoch/budget query stubs. |
libp2p/protocols/mix/mix_metrics.nim |
Adds counters/gauges for cover emission/reception and slot exhaustion. |
tests/libp2p/mix/test_cover_traffic.nim |
Unit tests for SlotPool, ConstantRateCoverTraffic emission, precompute behavior, and lifecycle. |
tests/libp2p/mix/component/test_cover_traffic.nim |
Component/integration tests validating cover packet validity, traversal, and slot exhaustion behavior. |
tests/interop/test_gossipsub.nim |
Removed GossipSub interop tests. |
interop/gossipsub/src/runner.nim |
Removed GossipSub interop script runner implementation. |
interop/gossipsub/src/node.nim |
Removed GossipSub interop node utilities (keys/peer IDs/msg IDs). |
interop/gossipsub/src/logger.nim |
Removed structured JSON logger for GossipSub interop. |
interop/gossipsub/src/instructions.nim |
Removed GossipSub interop instruction parsing/types. |
interop/gossipsub/peer.nim |
Removed GossipSub interop “peer” binary entrypoint. |
interop/gossipsub/nim_peer.nimble |
Removed nimble manifest for the GossipSub interop binary. |
interop/gossipsub/config.nims |
Removed build config for the GossipSub interop binary. |
f7ebceb to
26a9fd5
Compare
26a9fd5 to
e0cbd9d
Compare
gmelodie
left a comment
There was a problem hiding this comment.
This PR is a PoC, and thus it should not be merged. In the future please wait until the spec is more mature to open a PR. Mature here doesn't necessarily mean merged in v1, but at it should at least have had some discussions and implemented feedback. Also, if you want feedback on a PoC, consider making the PR a draft, letting us know it's a PoC and then requesting the reviews.
Good work though!
| type CoverPacket* = object | ||
| packet*: seq[byte] | ||
| firstHopPeerId*: PeerId | ||
| firstHopAddr*: MultiAddress |
There was a problem hiding this comment.
| type CoverPacket* = object | |
| packet*: seq[byte] | |
| firstHopPeerId*: PeerId | |
| firstHopAddr*: MultiAddress | |
| type CoverPacket* = object | |
| packet*: seq[byte] | |
| firstHop*: (PeerId, seq[MultiAddress]) |
There was a problem hiding this comment.
keeping separate fields for now. writeLp and getConn do use seq[MultiAddress] but we always resolve to a single supported address per mix node, so wrapping in @[multiAddr] at the boundary is fine. changing the type would ripple across callback types and call sites for no functional benefit.
2d903d0 to
2730495
Compare
41f9110 to
260e7de
Compare
| let built = buildRes.get() | ||
| let sendRes = | ||
| await ct.sendPacket(built.firstHopPeerId, built.firstHopAddr, built.packet) | ||
| if sendRes.isErr: | ||
| debug "Failed to send cover packet", err = sendRes.error | ||
| mix_cover_error.inc(labelValues = ["SEND_FAILED"]) | ||
| else: | ||
| mix_cover_emitted.inc(labelValues = ["on_demand"]) | ||
|
|
||
| proc emitCoverPacket*( | ||
| ct: ConstantRateCoverTraffic | ||
| ) {.async: (raises: [CancelledError]).} = | ||
| if ct.enablePrecomputation and ct.slotPool.queuedCount > 0: | ||
| if not ct.slotPool.claimSlotForCover(): | ||
| mix_slot_claim_rejected.inc(labelValues = ["cover"]) | ||
| return | ||
| ct.slotPool.dequeue().withValue(pkt): | ||
| # Check if the prebuilt proof is still valid (e.g., Merkle root not stale) | ||
| if ct.validateProofToken != nil and pkt.proofToken.len > 0 and | ||
| not ct.validateProofToken(pkt.proofToken): | ||
| trace "Prebuilt cover packet has stale proof, rebuilding on-demand" | ||
| mix_cover_error.inc(labelValues = ["STALE_PROOF"]) | ||
| # Reclaim the stale proof's messageId so it can be reused | ||
| if ct.reclaimProofToken != nil: | ||
| ct.reclaimProofToken(pkt.proofToken) | ||
| await ct.buildAndSendOnDemand() | ||
| return | ||
| else: | ||
| let sendRes = | ||
| await ct.sendPacket(pkt.firstHopPeerId, pkt.firstHopAddr, pkt.packet) | ||
| if sendRes.isErr: | ||
| debug "Failed to send pre-built cover packet", err = sendRes.error | ||
| mix_cover_error.inc(labelValues = ["SEND_FAILED"]) | ||
| else: | ||
| mix_cover_emitted.inc(labelValues = ["prebuilt"]) | ||
| return |
There was a problem hiding this comment.
When sendPacket returns an error, the cover packet was effectively discarded but its proofToken is not reclaimed. For spam-protection implementations where token represents a reserved/consumed slot (eg precomputed messageId), this can leak budget and cause premature slot exhaustion after transient send failures. Consider invoking reclaimProofToken (when set and token non-empty) on SEND_FAILED for both on-demand builds (built.proofToken) and prebuilt packets (pkt.proofToken).
| func new*(T: typedesc[SlotPool], totalSlots: int): T = | ||
| T(epoch: 0, totalSlots: totalSlots, coverQueue: initDeque[CoverPacket]()) | ||
|
|
||
| func beginEpoch*(pool: SlotPool, epoch: uint64) = | ||
| ## Clear stale cover packets and refill slots for the new epoch. | ||
| ## Precomputed packets from the previous epoch are discarded because | ||
| ## their proofs belong to the old epoch. | ||
| pool.epoch = epoch | ||
| pool.coverQueue = initDeque[CoverPacket]() | ||
| pool.coverClaimed = 0 | ||
| pool.nonCoverClaimed = 0 |
There was a problem hiding this comment.
SlotPool exposes several state-mutating routines as func (beginEpoch, claimSlotForCover, claimSlot, updateTotalSlots, addPacket, dequeue). In Nim, func implies noSideEffect, so assignments like pool.coverClaimed += 1 / pool.coverQueue.popFirst() will fail to compile under the effect system. These should be proc (or otherwise explicitly allow side effects) to match mutable APIs elsewhere in the codebase.
260e7de to
586fce9
Compare
586fce9 to
6ba3e3a
Compare
| method registerOnEpochChange*( | ||
| self: SpamProtection, cb: EpochChangeCallback | ||
| ) {.base, gcsafe, raises: [].} = | ||
| self.epochChangeCallbacks.add(cb) | ||
|
|
||
| proc notifyEpochChange*(self: SpamProtection, epoch: uint64) {.raises: [].} = | ||
| ## Fire all registered epoch change callbacks. | ||
| for cb in self.epochChangeCallbacks: | ||
| cb(epoch) |
There was a problem hiding this comment.
registerOnEpochChange is a virtual method, but notifyEpochChange is a non-virtual proc that iterates epochChangeCallbacks directly. If a concrete SpamProtection overrides registerOnEpochChange (e.g., to forward registration elsewhere), notifyEpochChange won’t fire those callbacks. Consider making registerOnEpochChange a non-virtual proc, or make notifyEpochChange a method whose base implementation dispatches through the same override point.
6ba3e3a to
8f85adc
Compare
8f85adc to
cf3d7a7
Compare
| ## for proof slot tracking. | ||
| let spamProtection = mixProto.spamProtection.valueOr: | ||
| return ok(packet) | ||
| return ok((packet, newSeq[byte]())) |
There was a problem hiding this comment.
When spamProtection is disabled, generateAndAppendProof returns ok((packet, newSeq[byte]())), which allocates a new empty seq on every call. This is on the hot path for sending/forwarding; consider returning a shared empty seq literal (e.g. @[] / default(seq[byte])) to avoid per-packet allocations.
| return ok((packet, newSeq[byte]())) | |
| return ok((packet, default(seq[byte]))) |
| method stop*(ct: CoverTraffic) {.base, async: (raises: []).} = | ||
| raiseAssert "stop must be implemented by concrete cover traffic types" | ||
|
|
||
| method onEpochChange*(ct: CoverTraffic, epoch: uint64) {.base, gcsafe, raises: [].} = |
There was a problem hiding this comment.
CoverTraffic.onEpochChange calls slotPool.beginEpoch(epoch), which clears coverQueue and drops any queued precomputed packets. If those packets carry proofTokens from spam protection, they are discarded without calling reclaimProofToken, so implementations that track/lease proof slots via tokens may leak capacity/resources. Consider draining the existing queue and invoking ct.reclaimProofToken(token) (when set and non-empty) for each packet before clearing/refilling for the new epoch.
| method onEpochChange*(ct: CoverTraffic, epoch: uint64) {.base, gcsafe, raises: [].} = | |
| method onEpochChange*(ct: CoverTraffic, epoch: uint64) {.base, gcsafe, raises: [].} = | |
| if not ct.reclaimProofToken.isNil: | |
| while ct.slotPool.coverQueue.len > 0: | |
| let discarded = ct.slotPool.coverQueue.popFirst() | |
| if discarded.proofToken.len > 0: | |
| ct.reclaimProofToken(discarded.proofToken) |
cf3d7a7 to
3c05e25
Compare
3c05e25 to
84437aa
Compare
| proc(): Future[Result[tuple[packet: seq[byte], proofToken: seq[byte]], string]] {. | ||
| async | ||
| .} = |
There was a problem hiding this comment.
The proc pragma formatting here ({. async .} split across lines) is inconsistent with the surrounding code and is likely to be rewritten by the repo formatter (nph), causing CI formatting failures. Please run nimble format or adjust this to the standard single-line pragma formatting used elsewhere in the file.
| proc(): Future[Result[tuple[packet: seq[byte], proofToken: seq[byte]], string]] {. | |
| async | |
| .} = | |
| proc(): Future[Result[tuple[packet: seq[byte], proofToken: seq[byte]], string]] {.async.} = |
| coverClaimed*: int | ||
| nonCoverClaimed*: int | ||
|
|
||
| proc new*(T: typedesc[SlotPool], totalSlots: int): T = |
There was a problem hiding this comment.
SlotPool.new is exported and currently accepts any totalSlots value without validation. Negative or zero values would make availableSlots incorrect (and claimSlotForCover / claimSlot behavior confusing). Consider asserting totalSlots > 0 here (consistent with ConstantRateCoverTraffic.new), or clamp/return an error if you want to allow SlotPool to be used directly as a public API.
| proc new*(T: typedesc[SlotPool], totalSlots: int): T = | |
| proc new*(T: typedesc[SlotPool], totalSlots: int): T {.raises: [AssertionDefect].} = | |
| doAssert totalSlots > 0, "SlotPool totalSlots must be greater than zero" |
| ## Builds cover packets in batches and adds them to the coverQueue for the | ||
| ## current epoch (same-epoch precomputation). | ||
| ## | ||
| ## Packets are built with proofs for the current epoch so their proof tokens | ||
| ## can be reclaimed if the packet is discarded before sending. |
There was a problem hiding this comment.
The PR description mentions pre-computation "staged for the next epoch", but the implementation and comment here explicitly describe same-epoch precomputation and beginEpoch clears the queue on epoch change. Please either update the PR description to match the current behavior, or adjust the implementation if next-epoch staging is required by the spec/requirements.
| proc unclaimCoverSlot(ct: ConstantRateCoverTraffic) = | ||
| ## Return a cover slot on build/send failure so it can be retried. | ||
| if ct.slotPool.coverClaimed > 0: | ||
| ct.slotPool.coverClaimed -= 1 | ||
|
|
There was a problem hiding this comment.
unclaimCoverSlot claims to return a slot on "build/send failure", but it is only invoked on build failures (send failures still consume a slot). Either update the comment to match the behavior, or call unclaimCoverSlot() on send failures as well if the intended behavior is to retry without consuming the epoch budget.
| asyncSpawn ct.start() | ||
| checkUntilTimeout: | ||
| ct.isRunning == true |
There was a problem hiding this comment.
In this test, asyncSpawn ct.start() launches an untracked future. The project guidelines discourage asyncSpawn unless the spawned Future is tracked, and here it's also unnecessary because ConstantRateCoverTraffic.start() doesn't block (it just sets up internal loops and returns). Prefer await ct.start() (or store the returned Future and await/cancel it in teardown) to avoid untracked tasks.
| asyncSpawn ct.start() | |
| checkUntilTimeout: | |
| ct.isRunning == true | |
| await ct.start() | |
| check ct.isRunning == true |
84437aa to
8e06dc0
Compare
Adds constant-rate cover traffic emission and per-hop spam protection proof token reuse for the Mix Protocol per spec sections 3-7 and 9.6. Key changes: - ConstantRateCoverTraffic: fixed-interval emission with slot pool budget - Same-epoch precomputation with stale proof detection and rebuild - Opaque ProofResult token for messageId reclaim on packet discard - SlotPool: unified R-slot budget across cover/forward/origination - SpamProtection: reclaimProofToken + isProofTokenValid methods - Metrics: mix_cover_emitted, mix_cover_error, mix_slot_claim_rejected Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
8e06dc0 to
fe79825
Compare
| ## Builds cover packets in batches and adds them to the coverQueue for the | ||
| ## current epoch (same-epoch precomputation). | ||
| ## | ||
| ## Packets are built with proofs for the current epoch so their proof tokens | ||
| ## can be reclaimed if the packet is discarded before sending. | ||
| while ct.running: |
There was a problem hiding this comment.
PR description says pre-computation is "staged for the next epoch", but this loop explicitly precomputes for the current epoch (and discards packets on epoch change). Either adjust implementation to build into a next-epoch queue, or update the PR description to match the current-epoch behavior.
| doAssert not switch.rng.isNil, "Switch must have RNG initialized" | ||
|
|
||
| mixProto.mixNodeInfo = mixNodeInfo | ||
| mixProto.switch = switch | ||
| mixProto.rng = rng | ||
| mixProto.rng = switch.rng |
There was a problem hiding this comment.
MixProtocol.init now hard-asserts switch.rng is non-nil, but newSwitch defaults rng to nil (so callers constructing a Switch directly can still end up with a nil RNG). This is a behavior/API change from the previous fallback and can lead to assertions (or nil derefs if asserts are disabled). Consider either (a) restoring a safe fallback initialization here, (b) turning this into a clear error return/exception in an API that can signal failure, or (c) documenting/enforcing the non-nil RNG requirement at Switch construction time.
Summary
Implements the cover traffic mechanism for the Mix Protocol as a pluggable component, following the same pattern as
DelayStrategy.Cover traffic ensures sender unobservability by emitting dummy Sphinx packets at a constant rate, making it impossible for an observer to distinguish cover traffic from locally originated messages.
This PR:
CoverTrafficabstract base type andConstantRateCoverTrafficstrategy (spec §7.1 RECOMMENDED)SlotPoolto manage the per-epoch R-slot budget shared across cover, local origination, and forwarding trafficMixProtocolat four spec-defined touch points: cover packet transmission, non-cover slot claiming, epoch boundary handling, and cover packet reception at exitOnEpochChangecallback support toSpamProtectionfor epoch boundary notifications (spec: mix-dos-protection §8.2.3)The spec this implements: vacp2p/rfc-index#311
Affected Areas
cover_traffic.nimmodule, integration intomix_protocol.nim(exit handler codec check, slot claiming in send/forward paths, init/stopwiring), epoch change support in
spam_protection.nimCompatibility & Downstream Validation
Cover traffic is opt-in. When not configured existing behavior is fully preserved.
The
SpamProtectioninterface gains new methods (registerOnEpochChange,epochDurationSeconds,rateLimitBudget) with base implementations that are no-ops/returndefaults, so existing concrete implementations (e.g. RLN plugin) continue to compile without changes.
CoverTrafficintoWakuMix.initand update the RLN plugin to callnotifyEpochChangeImpact on Library Users
MixProtocol.newandMixProtocol.initacceptcoverTraffic: Opt[CoverTraffic](defaultOpt.none) — no migration requiredCoverTraffic,ConstantRateCoverTraffic,SlotPool,PrebuiltCoverPacket,BuildCoverPacketProc,SendCoverPacketProc— all inlibp2p/protocols/mix/cover_trafficRisk Assessment
Additional Notes
epochDurationSeconds/rateLimitBudgetand callnotifyEpochChangeon epoch transitions.