From 01574ab941d2ccb82c583a78b297d1a61efe3ed7 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 27 Feb 2026 11:34:48 +0530 Subject: [PATCH] fix(kad): add configurable option to purge stale entries on bucketrefresh interval --- libp2p/protocols/kademlia.nim | 6 +- libp2p/protocols/kademlia/routingtable.nim | 28 +++++++- libp2p/protocols/kademlia/types.nim | 7 ++ tests/libp2p/kademlia/test_routingtable.nim | 71 +++++++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) diff --git a/libp2p/protocols/kademlia.nim b/libp2p/protocols/kademlia.nim index 1f91825f24..a58cd7b4f5 100644 --- a/libp2p/protocols/kademlia.nim +++ b/libp2p/protocols/kademlia.nim @@ -21,6 +21,8 @@ proc bootstrap*( ## Sends a findNode to find itself to keep nearby peers up to date ## Also sends a findNode to find a random key for each non-empty k-bucket + kad.rtable.purgeExpired() + discard await kad.findNode(kad.rtable.selfId) # Snapshot bucket count. findNode() can grow buckets and mutate length. @@ -55,7 +57,9 @@ proc new*( ): T {.raises: [].} = var rtable = RoutingTable.new( switch.peerInfo.peerId.toKey(), - config = RoutingTableConfig.new(replication = config.replication), + config = RoutingTableConfig.new( + replication = config.replication, purgeStaleEntries = config.purgeStaleEntries + ), ) let kad = T( rng: rng, diff --git a/libp2p/protocols/kademlia/routingtable.nim b/libp2p/protocols/kademlia/routingtable.nim index c74a9e81c6..c57723eb3f 100644 --- a/libp2p/protocols/kademlia/routingtable.nim +++ b/libp2p/protocols/kademlia/routingtable.nim @@ -18,8 +18,14 @@ proc new*( replication = DefaultReplication, hasher: Opt[XorDHasher] = NoneHasher, maxBuckets: int = DefaultMaxBuckets, + purgeStaleEntries: bool = false, ): T = - RoutingTableConfig(replication: replication, hasher: hasher, maxBuckets: maxBuckets) + RoutingTableConfig( + replication: replication, + hasher: hasher, + maxBuckets: maxBuckets, + purgeStaleEntries: purgeStaleEntries, + ) proc `$`*(rt: RoutingTable): string = "selfId(" & $rt.selfId & ") buckets(" & $rt.buckets & ")" @@ -134,6 +140,26 @@ proc findClosestPeerIds*(rtable: RoutingTable, targetId: Key, count: int): seq[P .filterIt(it.isOk) .mapIt(it.value()) +proc purgeExpired*(rtable: var RoutingTable) = + ## Remove entries from all buckets that have not been refreshed within + ## DefaultBucketStaleTime. No-op if purgeStaleEntries is false. + if not rtable.config.purgeStaleEntries: + return + + let now = Moment.now() + var totalPurged = 0 + for i in 0 ..< rtable.buckets.len: + let before = rtable.buckets[i].peers.len + rtable.buckets[i].peers.keepItIf(now - it.lastSeen <= DefaultBucketStaleTime) + let purged = before - rtable.buckets[i].peers.len + if purged > 0: + debug "Purged stale routing table entries", bucketIdx = i, count = purged + totalPurged += purged + + if totalPurged > 0: + kad_routing_table_replacements.inc(totalPurged) + updateRoutingTableMetrics(rtable) + proc isStale*(bucket: Bucket): bool = if bucket.peers.len == 0: return true diff --git a/libp2p/protocols/kademlia/types.nim b/libp2p/protocols/kademlia/types.nim index 4dc7713fe5..e8ede95688 100644 --- a/libp2p/protocols/kademlia/types.nim +++ b/libp2p/protocols/kademlia/types.nim @@ -167,6 +167,7 @@ type replication*: int hasher*: Opt[XorDHasher] maxBuckets*: int + purgeStaleEntries*: bool RoutingTable* = ref object selfId*: Key @@ -296,6 +297,10 @@ type KadDHTConfig* = ref object republishProvidedKeysInterval*: chronos.Duration cleanupProvidersInterval*: chronos.Duration providerExpirationInterval*: chronos.Duration + purgeStaleEntries*: bool + ## When true, routing table entries not refreshed within + ## DefaultBucketStaleTime are removed during each bootstrap cycle. + ## Defaults to false to preserve existing behaviour. proc new*( T: typedesc[KadDHTConfig], @@ -312,6 +317,7 @@ proc new*( republishProvidedKeysInterval: chronos.Duration = DefaultRepublishInterval, cleanupProvidersInterval: chronos.Duration = DefaultCleanupProvidersInterval, providerExpirationInterval: chronos.Duration = DefaultProviderExpirationInterval, + purgeStaleEntries: bool = false, ): T {.raises: [].} = KadDHTConfig( validator: validator, @@ -327,6 +333,7 @@ proc new*( republishProvidedKeysInterval: republishProvidedKeysInterval, cleanupProvidersInterval: cleanupProvidersInterval, providerExpirationInterval: providerExpirationInterval, + purgeStaleEntries: purgeStaleEntries, ) type KadDHT* = ref object of LPProtocol diff --git a/tests/libp2p/kademlia/test_routingtable.nim b/tests/libp2p/kademlia/test_routingtable.nim index 0917b8c0f0..ea9ae0d2a1 100644 --- a/tests/libp2p/kademlia/test_routingtable.nim +++ b/tests/libp2p/kademlia/test_routingtable.nim @@ -137,3 +137,74 @@ suite "KadDHT Routing Table": check: idx == TargetBucket rid != selfId + + test "purgeExpired is no-op when purgeStaleEntries is false": + let selfId = testKey(0) + let config = RoutingTableConfig.new(hasher = Opt.some(noOpHasher)) + var rt = RoutingTable.new(selfId, config) + + let kid = randomKeyInBucket(selfId, TargetBucket, rng[]) + discard rt.insert(kid) + rt.buckets[TargetBucket].peers[0].lastSeen = + Moment.now() - DefaultBucketStaleTime - 1.seconds + + rt.purgeExpired() + + check rt.buckets[TargetBucket].peers.len == 1 + + test "purgeExpired removes stale entries when purgeStaleEntries is true": + let selfId = testKey(0) + let config = + RoutingTableConfig.new(hasher = Opt.some(noOpHasher), purgeStaleEntries = true) + var rt = RoutingTable.new(selfId, config) + + let kid = randomKeyInBucket(selfId, TargetBucket, rng[]) + discard rt.insert(kid) + rt.buckets[TargetBucket].peers[0].lastSeen = + Moment.now() - DefaultBucketStaleTime - 1.seconds + + rt.purgeExpired() + + check rt.buckets[TargetBucket].peers.len == 0 + + test "purgeExpired keeps fresh entries": + let selfId = testKey(0) + let config = + RoutingTableConfig.new(hasher = Opt.some(noOpHasher), purgeStaleEntries = true) + var rt = RoutingTable.new(selfId, config) + + let stale = randomKeyInBucket(selfId, TargetBucket, rng[]) + let fresh = randomKeyInBucket(selfId, TargetBucket, rng[]) + discard rt.insert(stale) + discard rt.insert(fresh) + + rt.buckets[TargetBucket].peers[0].lastSeen = + Moment.now() - DefaultBucketStaleTime - 1.seconds + + rt.purgeExpired() + + check: + rt.buckets[TargetBucket].peers.len == 1 + rt.buckets[TargetBucket].peers[0].nodeId == fresh + + test "purgeExpired removes stale entries across multiple buckets": + let selfId = testKey(0) + let config = + RoutingTableConfig.new(hasher = Opt.some(noOpHasher), purgeStaleEntries = true) + var rt = RoutingTable.new(selfId, config) + + let kid1 = randomKeyInBucket(selfId, TargetBucket, rng[]) + let kid2 = randomKeyInBucket(selfId, TargetBucket + 1, rng[]) + discard rt.insert(kid1) + discard rt.insert(kid2) + + rt.buckets[TargetBucket].peers[0].lastSeen = + Moment.now() - DefaultBucketStaleTime - 1.seconds + rt.buckets[TargetBucket + 1].peers[0].lastSeen = + Moment.now() - DefaultBucketStaleTime - 1.seconds + + rt.purgeExpired() + + check: + rt.buckets[TargetBucket].peers.len == 0 + rt.buckets[TargetBucket + 1].peers.len == 0