Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 287 additions & 1 deletion tests/test_peer_store_extended.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[sequtils, times],
std/[sequtils, times, random],
chronos,
libp2p/crypto/crypto,
libp2p/peerid,
Expand Down Expand Up @@ -353,3 +353,289 @@ suite "Extended nim-libp2p Peer Store":
peerStore[DisconnectBook][p1] == 0
peerStore[SourceBook][p1] == default(PeerOrigin)
peerStore[DirectionBook][p1] == default(PeerDirection)
peerStore[GriefBook][p1] == default(GriefData)

suite "Extended nim-libp2p Peer Store: grief scores":
  # Exercises griefPeer / getGriefScore with an explicit `Moment` passed in as
  # the clock, so every expectation below is deterministic.
  # These tests mock the clock and work better as a separate suite.
  var peerStore: PeerStore
  var p1, p2, p3: PeerId

  setup:
    # Fresh store and three distinct peer ids for every test
    peerStore = PeerStore.new(nil, capacity = 50)
    require p1.init(basePeerId & "1")
    require p2.init(basePeerId & "2")
    require p3.init(basePeerId & "3")

  # Shorthand: one cooldown interval
  let interval = GriefCooldownInterval
  # Arbitrary fixed reference instant used as "now" by every test
  let t0 = Moment.init(1000, Minute)

  test "new peer has grief score 0":
    check peerStore.getGriefScore(p1) == 0

  test "griefPeer increases score":
    peerStore.griefPeer(p1, 5, t0)
    check peerStore.getGriefScore(p1, t0) == 5

  test "griefPeer accumulates":
    # Two griefs at the same instant add up: 3 + 2
    for amount in [3, 2]:
      peerStore.griefPeer(p1, amount, t0)
    check peerStore.getGriefScore(p1, t0) == 5

  test "grief cools down by 1 point per interval":
    peerStore.griefPeer(p1, 5, t0)

    # After `elapsed` whole intervals the score is expected to be 5 - elapsed
    for elapsed in 0 .. 5:
      check peerStore.getGriefScore(p1, t0 + interval * elapsed) == 5 - elapsed

  test "grief floors at 0":
    peerStore.griefPeer(p1, 3, t0)

    # Well past full cooldown, should be 0
    let wellPast = t0 + interval * 10
    check peerStore.getGriefScore(p1, wellPast) == 0

  test "cooldown preserves remainder":
    peerStore.griefPeer(p1, 5, t0)

    # Half an interval past 2 full intervals: only 2 points consumed, score 3
    let twoAndHalf = t0 + interval * 2 + interval div 2
    check peerStore.getGriefScore(p1, twoAndHalf) == 3

    # Completing the 3rd interval consumes one more point, score 2
    let threeFull = t0 + interval * 3
    check peerStore.getGriefScore(p1, threeFull) == 2

  test "grief after full cooldown restarts cooldown time":
    # NOTE(review): t1 lies exactly on an interval boundary counted from t0, so
    # these expectations hold whether or not the cooldown anchor is restarted;
    # an unaligned t1 would be needed to tell the two apart.
    let t1 = t0 + interval * 5

    peerStore.griefPeer(p1, 2, t0)

    # Fully cool down
    check peerStore.getGriefScore(p1, t1) == 0

    # Grief again
    peerStore.griefPeer(p1, 3, t1)
    check peerStore.getGriefScore(p1, t1) == 3

    # 1 interval after second grief
    check peerStore.getGriefScore(p1, t1 + interval) == 2

  test "independent grief scores per peer":
    let later = t0 + interval * 2

    peerStore.griefPeer(p1, 10, t0)
    peerStore.griefPeer(p2, 3, t0)

    # Each peer loses 2 points; p3 was never griefed
    check peerStore.getGriefScore(p1, later) == 8
    check peerStore.getGriefScore(p2, later) == 1
    check peerStore.getGriefScore(p3, later) == 0

  test "grief with default amount is 1":
    peerStore.griefPeer(p1, now = t0)
    check peerStore.getGriefScore(p1, t0) == 1

  test "griefPeer with zero or negative amount is ignored":
    # Only the first call (amount 5) is expected to count
    for amount in [5, 0, -3]:
      peerStore.griefPeer(p1, amount, t0)
    check peerStore.getGriefScore(p1, t0) == 5

  test "grief added during partial cooldown does not reset cooldown time":
    let twoAndHalf = t0 + interval * 2 + interval div 2
    let threeFull = t0 + interval * 3
    let fourFull = t0 + interval * 4

    peerStore.griefPeer(p1, 5, t0)

    # At 2.5 intervals: 2 consumed, score 3, half-interval remainder
    check peerStore.getGriefScore(p1, twoAndHalf) == 3

    # Add more grief — cooldown time should NOT reset, remainder preserved
    peerStore.griefPeer(p1, 4, twoAndHalf)
    check peerStore.getGriefScore(p1, twoAndHalf) == 7

    # Remainder completes another interval
    check peerStore.getGriefScore(p1, threeFull) == 6

    # And one more full interval
    check peerStore.getGriefScore(p1, fourFull) == 5

  test "multiple reads without time change are idempotent":
    let readAt = t0 + interval * 3

    peerStore.griefPeer(p1, 10, t0)

    # Reading three times at the same instant must give the same answer
    for _ in 1 .. 3:
      check peerStore.getGriefScore(p1, readAt) == 7

  test "interleaved grief and cooldown across multiple peers":
    let at3 = t0 + interval * 3
    let at5 = t0 + interval * 5

    # Stagger grief: p1 at t0, p2 at t0+1interval, p3 at t0+2interval
    peerStore.griefPeer(p1, 6, t0)
    peerStore.griefPeer(p2, 4, t0 + interval)
    peerStore.griefPeer(p3, 2, t0 + interval * 2)

    # At t0+3*interval: p1 lost 3, p2 lost 2, p3 lost 1
    check peerStore.getGriefScore(p1, at3) == 3
    check peerStore.getGriefScore(p2, at3) == 2
    check peerStore.getGriefScore(p3, at3) == 1

    # Grief p2 again at t0+3*interval (2 remaining + 10 = 12)
    peerStore.griefPeer(p2, 10, at3)

    # At t0+5*interval: p1 lost 5 total, p2 lost 2 more since re-grief, p3 floored at 0
    check peerStore.getGriefScore(p1, at5) == 1
    check peerStore.getGriefScore(p2, at5) == 10
    check peerStore.getGriefScore(p3, at5) == 0

suite "Extended nim-libp2p Peer Store: grief-based peer selection":
  # Tests for sortByGriefScore via selectPeers.
  #
  # NOTE(review): griefPeer is called here without an explicit `now`, so these
  # tests presumably run against the real clock and assume that no cooldown
  # interval elapses while a test is running — confirm.
  #
  # The bucket numbers in the comments below assume a bucket size of 5, as the
  # "both div 5 == 0" remark in the original tests implies.
  #
  # Length expectations use `require` rather than `check`: the assertions that
  # follow index into `selected`, and `check` would let the test continue into
  # an out-of-bounds access if the length were wrong.
  const testProto = "/test/grief/1.0.0"

  proc makePeer(port: int): RemotePeerInfo =
    ## Builds a peer with a freshly generated secp256k1 identity, a loopback
    ## TCP address on `port`, and `testProto` as its only protocol.
    let key = generateSecp256k1Key()
    RemotePeerInfo.init(
      peerId = PeerId.init(key.getPublicKey().tryGet()).tryGet(),
      addrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/" & $port).tryGet()],
      protocols = @[testProto],
    )

  test "all peers at grief 0 returns all peers (shuffled)":
    let switch = newTestSwitch()
    let pm = PeerManager.new(switch)
    let peerStore = switch.peerStore
    let peers = (1..5).mapIt(makePeer(it + 10000))
    for p in peers:
      peerStore.addPeer(p)

    let selected = pm.selectPeers(testProto)
    check selected.len == 5

    # Order is random, but every added peer must be present
    let selectedIds = selected.mapIt(it.peerId)
    for p in peers:
      check p.peerId in selectedIds

  test "lower grief peers come before higher grief peers":
    let switch = newTestSwitch()
    let pm = PeerManager.new(switch)
    let peerStore = switch.peerStore
    let pA = makePeer(20001)
    let pB = makePeer(20002)
    let pC = makePeer(20003)
    peerStore.addPeer(pA)
    peerStore.addPeer(pB)
    peerStore.addPeer(pC)

    # pA: grief 0 (bucket 0), pB: grief 5 (bucket 1), pC: grief 15 (bucket 3)
    peerStore.griefPeer(pB.peerId, 5)
    peerStore.griefPeer(pC.peerId, 15)

    # Run multiple times to account for shuffle within buckets
    for i in 0 ..< 20:
      let selected = pm.selectPeers(testProto)
      require selected.len == 3
      # pA (bucket 0) must always be first
      check selected[0].peerId == pA.peerId
      # pB (bucket 1) must always come before pC (bucket 3)
      check selected[1].peerId == pB.peerId
      check selected[2].peerId == pC.peerId

  test "peers within same bucket are interchangeable":
    let switch = newTestSwitch()
    let pm = PeerManager.new(switch)
    let peerStore = switch.peerStore
    let pA = makePeer(30001)
    let pB = makePeer(30002)
    peerStore.addPeer(pA)
    peerStore.addPeer(pB)

    # Both within bucket 0 (scores 1 and 4, both div 5 == 0)
    peerStore.griefPeer(pA.peerId, 1)
    peerStore.griefPeer(pB.peerId, 4)

    var sawAFirst = false
    var sawBFirst = false
    # 50 draws: the chance of never seeing one of the two orderings from a
    # fair shuffle is negligible (2 * 0.5^50)
    for i in 0 ..< 50:
      let selected = pm.selectPeers(testProto)
      require selected.len == 2
      if selected[0].peerId == pA.peerId:
        sawAFirst = true
      else:
        sawBFirst = true

    # Both orderings should appear since they're in the same bucket
    check sawAFirst
    check sawBFirst

  test "peers in different buckets never swap order":
    let switch = newTestSwitch()
    let pm = PeerManager.new(switch)
    let peerStore = switch.peerStore
    let pLow = makePeer(40001)
    let pHigh = makePeer(40002)
    peerStore.addPeer(pLow)
    peerStore.addPeer(pHigh)

    # pLow in bucket 0 (score 1), pHigh in bucket 1 (score 5)
    peerStore.griefPeer(pLow.peerId, 1)
    peerStore.griefPeer(pHigh.peerId, 5)

    for i in 0 ..< 30:
      let selected = pm.selectPeers(testProto)
      require selected.len == 2
      check selected[0].peerId == pLow.peerId
      check selected[1].peerId == pHigh.peerId

  test "zero-grief peers always come before grieved peers":
    let switch = newTestSwitch()
    let pm = PeerManager.new(switch)
    let peerStore = switch.peerStore
    let pClean1 = makePeer(50001)
    let pClean2 = makePeer(50002)
    let pGrieved = makePeer(50003)
    peerStore.addPeer(pClean1)
    peerStore.addPeer(pClean2)
    peerStore.addPeer(pGrieved)

    peerStore.griefPeer(pGrieved.peerId, 6)

    for i in 0 ..< 20:
      let selected = pm.selectPeers(testProto)
      require selected.len == 3
      # Grieved peer (bucket 1) must be last; clean peers (bucket 0) first
      check selected[2].peerId == pGrieved.peerId

  test "peers beyond MaxGriefBucket are excluded from selection":
    let switch = newTestSwitch()
    let pm = PeerManager.new(switch)
    let peerStore = switch.peerStore
    let pGood = makePeer(60001)
    let pBad = makePeer(60002)
    peerStore.addPeer(pGood)
    peerStore.addPeer(pBad)

    # pBad in bucket 4 (score 20, 20 div 5 = 4 > MaxGriefBucket)
    peerStore.griefPeer(pBad.peerId, 20)

    let selected = pm.selectPeers(testProto)
    require selected.len == 1
    check selected[0].peerId == pGood.peerId
1 change: 1 addition & 0 deletions tests/waku_peer_exchange/test_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ suite "Waku Peer Exchange":
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchangeClient()])

# Create connection
let connOpt = await node2.peerManager.dialPeer(
Expand Down
51 changes: 44 additions & 7 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -215,27 +215,64 @@ proc loadFromStorage(pm: PeerManager) {.gcsafe.} =

trace "recovered peers from storage", amount = amount

proc griefPeer*(pm: PeerManager, peerId: PeerId, amount: int = 1) =
  ## Forwards a grief report for `peerId` to the peer store of `pm`'s switch,
  ## adding `amount` (default 1). Does nothing when `pm` is nil.
  if pm.isNil:
    return
  pm.switch.peerStore.griefPeer(peerId, amount)

proc sortByGriefScore(pm: PeerManager, peers: var seq[RemotePeerInfo]) =
  ## Reorders `peers` in place so that peers with lower grief are preferred.
  ##
  ## Each peer's grief score is mapped to a bucket (`score div GriefBucketSize`).
  ## Buckets are ordered ascending and the order of peers inside one bucket is
  ## random. Peers whose bucket is greater than `MaxGriefBucket` are REMOVED
  ## from `peers`, so the seq may come back shorter than it went in.
  ##
  ## If no peer has a grief score above 0, `peers` is only shuffled.
  ##
  ## NOTE: shuffling defaultPeerStoreCapacity (750 currently) on demand is
  ## negligible, but if that increases, might be worth exploring different
  ## data structures.
  let peerStore = pm.switch.peerStore

  # Resolve each peer's bucket once, keeping only peers that may be selected
  var anyGrief = false
  var eligible = newSeqOfCap[(int, RemotePeerInfo)](peers.len)
  for p in peers:
    let score = peerStore.getGriefScore(p.peerId)
    if score > 0:
      anyGrief = true
    let bucket = score div GriefBucketSize
    if bucket <= MaxGriefBucket:
      eligible.add((bucket, p))

  # Fast path: if all peers are at 0, just shuffle
  if not anyGrief:
    shuffle(peers)
    return

  # Shuffle first so that within-bucket order is random
  shuffle(eligible)

  # Stable sort by bucket preserves the random order within each bucket
  eligible.sort(
    proc(a, b: (int, RemotePeerInfo)): int =
      cmp(a[0], b[0]),
    order = SortOrder.Ascending)

  peers = eligible.mapIt(it[1])

proc selectPeers*(
    pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
): seq[RemotePeerInfo] =
  ## Returns all peers that support the given protocol (and optionally shard),
  ## sorted by grief score ascending (shuffled within each score tier).
  ##
  ## When `shard` is given, only peers whose ENR contains that shard, or whose
  ## `shards` list contains the parsed shard id, are kept. If the topic cannot
  ## be parsed as a relay shard, an empty seq is returned.
  ##
  ## The final ordering is delegated to `sortByGriefScore`, which also drops
  ## peers beyond `MaxGriefBucket`.
  result = pm.switch.peerStore.getPeersByProtocol(proto)
  trace "Selecting peers from peerstore",
    protocol = proto, num_peers = result.len, address = cast[uint](pm.switch.peerStore)

  if shard.isSome():
    let shardInfo = RelayShard.parse(shard.get()).valueOr:
      trace "Failed to parse shard from pubsub topic", topic = shard.get()
      return @[]

    result.keepItIf(
      (it.enr.isSome() and it.enr.get().containsShard(shard.get())) or
        (it.shards.len > 0 and it.shards.contains(shardInfo.shardId))
    )

  pm.sortByGriefScore(result)

proc selectPeer*(
pm: PeerManager, proto: string, shard: Option[PubsubTopic] = none(PubsubTopic)
Expand Down Expand Up @@ -640,7 +677,7 @@ proc connectToRelayPeers*(pm: PeerManager) {.async.} =

var outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))

shuffle(outsideBackoffPeers)
pm.sortByGriefScore(outsideBackoffPeers)

var index = 0
var numPendingConnReqs =
Expand Down
Loading
Loading