Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 126 additions & 16 deletions tests/waku_core/topics/test_sharding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ suite "Autosharding":
suite "getGenZeroShard":
test "Generate Gen0 Shard":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)

# Given two valid topics
let
Expand Down Expand Up @@ -68,7 +68,7 @@ suite "Autosharding":
suite "getShard from NsContentTopic":
test "Generate Gen0 Shard with topic.generation==none":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)

# When we get a shard from a topic without generation
let shard = sharding.getShard(contentTopicShort)
Expand All @@ -79,7 +79,7 @@ suite "Autosharding":

test "Generate Gen0 Shard with topic.generation==0":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When we get a shard from a gen0 topic
let shard = sharding.getShard(contentTopicFull)

Expand All @@ -89,7 +89,7 @@ suite "Autosharding":

test "Generate Gen0 Shard with topic.generation==other":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When we get a shard from ain invalid content topic
let shard = sharding.getShard(contentTopicInvalid)

Expand All @@ -100,7 +100,7 @@ suite "Autosharding":
suite "getShard from ContentTopic":
test "Generate Gen0 Shard with topic.generation==none":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When we get a shard from it
let shard = sharding.getShard(contentTopicShort)

Expand All @@ -110,7 +110,7 @@ suite "Autosharding":

test "Generate Gen0 Shard with topic.generation==0":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When we get a shard from it
let shard = sharding.getShard(contentTopicFull)

Expand All @@ -120,7 +120,7 @@ suite "Autosharding":

test "Generate Gen0 Shard with topic.generation==other":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When we get a shard from it
let shard = sharding.getShard(contentTopicInvalid)

Expand All @@ -130,18 +130,18 @@ suite "Autosharding":

test "Generate Gen0 Shard invalid topic":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When we get a shard from it
let shard = sharding.getShard("invalid")

# Then the generated shard is valid
check:
shard.error() == "invalid format: topic must start with slash"
shard.error() == "invalid format: content-topic 'invalid' must start with slash"

suite "parseSharding":
xsuite "parseSharding":
test "contentTopics is ContentTopic":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When calling with contentTopic as string
let topicMap = sharding.parseSharding(some(pubsubTopic04), contentTopicShort)

Expand All @@ -151,7 +151,7 @@ suite "Autosharding":

test "contentTopics is seq[ContentTopic]":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When calling with contentTopic as string seq
let topicMap = sharding.parseSharding(
some(pubsubTopic04), @[contentTopicShort, "/0/foo/1/bar/proto"]
Expand All @@ -163,7 +163,7 @@ suite "Autosharding":

test "pubsubTopic is none":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When calling with pubsubTopic as none
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicShort)

Expand All @@ -173,7 +173,7 @@ suite "Autosharding":

test "content parse error":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When calling with pubsubTopic as none with invalid content
let topicMap = sharding.parseSharding(PubsubTopic.none(), "invalid")

Expand All @@ -184,7 +184,7 @@ suite "Autosharding":

test "pubsubTopic parse error":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When calling with pubsubTopic as none with invalid content
let topicMap = sharding.parseSharding(some("invalid"), contentTopicShort)

Expand All @@ -195,7 +195,7 @@ suite "Autosharding":

test "pubsubTopic getShard error":
let sharding =
Sharding(clusterId: ClusterId, shardCountGenZero: GenerationZeroShardsCount)
Sharding.init(ClusterId, GenerationZeroShardsCount)
# When calling with pubsubTopic as none with invalid content
let topicMap = sharding.parseSharding(PubsubTopic.none(), contentTopicInvalid)

Expand All @@ -207,3 +207,113 @@ suite "Autosharding":
xtest "catchable error on add to topicMap":
# TODO: Trigger a CatchableError or mock
discard

suite "Arbitrary sharder network, auto shard selection":
const arbitraryShards = @[2'u16, 4, 8, 16, 32, 64, 128, 256]

test "Initialize with arbitrary shard list":
# When we initialize sharding with a custom shard list
let sharding = Sharding.init(ClusterId, arbitraryShards)

# Given valid content topics
let
nsContentTopic1 = NsContentTopic.parse(contentTopicShort).value()
nsContentTopic2 = NsContentTopic.parse(contentTopicFull).value()
nsContentTopic3 = NsContentTopic.parse(contentTopicShort2).value()
nsContentTopic4 = NsContentTopic.parse(contentTopicFull2).value()
nsContentTopic5 = NsContentTopic.parse(contentTopicShort3).value()
nsContentTopic6 = NsContentTopic.parse(contentTopicFull4).value()

# When we generate shards from them
let
shard1 = sharding.getGenZeroShard(nsContentTopic1, arbitraryShards.len)
shard2 = sharding.getGenZeroShard(nsContentTopic2, arbitraryShards.len)
shard3 = sharding.getGenZeroShard(nsContentTopic3, arbitraryShards.len)
shard4 = sharding.getGenZeroShard(nsContentTopic4, arbitraryShards.len)
shard5 = sharding.getGenZeroShard(nsContentTopic5, arbitraryShards.len)
shard6 = sharding.getGenZeroShard(nsContentTopic6, arbitraryShards.len)

# Then the generated shards use IDs from the arbitrary list
check:
shard1 == RelayShard(clusterId: ClusterId, shardId: 16)
shard2 == RelayShard(clusterId: ClusterId, shardId: 16)
shard3 == RelayShard(clusterId: ClusterId, shardId: 128)
shard4 == RelayShard(clusterId: ClusterId, shardId: 128)
shard5 == RelayShard(clusterId: ClusterId, shardId: 16)
shard6 == RelayShard(clusterId: ClusterId, shardId: 256)

test "getShard with arbitrary shard list - generation none":
# When we initialize sharding with a custom shard list
let sharding = Sharding.init(ClusterId, arbitraryShards)

# When we get a shard from a topic without generation
let shard = sharding.getShard(contentTopicShort)

# Then the generated shard uses an ID from the arbitrary list
check:
shard.value() == RelayShard(clusterId: ClusterId, shardId: 16)

test "getShard with arbitrary shard list - generation zero":
# When we initialize sharding with a custom shard list
let sharding = Sharding.init(ClusterId, arbitraryShards)

# When we get a shard from a gen0 topic
let shard = sharding.getShard(contentTopicFull)

# Then the generated shard uses an ID from the arbitrary list
check:
shard.value() == RelayShard(clusterId: ClusterId, shardId: 16)

test "Multiple topics map to shards from arbitrary list":
# When we initialize sharding with a custom shard list
let sharding = Sharding.init(ClusterId, arbitraryShards)

# Given multiple content topics
let contentTopics = @[
contentTopicShort,
contentTopicFull,
contentTopicShort2,
contentTopicFull2,
contentTopicShort3,
contentTopicFull3,
]

# When we get shards for all topics
let shards = @[
sharding.getShard(contentTopicShort).value(),
sharding.getShard(contentTopicFull).value(),
sharding.getShard(contentTopicShort2).value(),
sharding.getShard(contentTopicFull2).value(),
sharding.getShard(contentTopicShort3).value(),
sharding.getShard(contentTopicFull3).value(),
]

# Then all shard IDs match expected values from the arbitrary list
check:
shards[0] == RelayShard(clusterId: ClusterId, shardId: 16)
shards[1] == RelayShard(clusterId: ClusterId, shardId: 16)
shards[2] == RelayShard(clusterId: ClusterId, shardId: 128)
shards[3] == RelayShard(clusterId: ClusterId, shardId: 128)
shards[4] == RelayShard(clusterId: ClusterId, shardId: 16)
shards[5] == RelayShard(clusterId: ClusterId, shardId: 16)

test "Consistent shard mapping with arbitrary list":
# When we initialize sharding with a custom shard list
let sharding = Sharding.init(ClusterId, arbitraryShards)

# Given a content topic
let topic = contentTopicShort

# When we get the shard multiple times
let
shard1 = sharding.getShard(topic)
shard2 = sharding.getShard(topic)
shard3 = sharding.getShard(topic)

# Then the shard is consistent
check:
shard1.isOk()
shard2.isOk()
shard3.isOk()
shard1.value() == shard2.value()
shard2.value() == shard3.value()
2 changes: 1 addition & 1 deletion tests/waku_lightpush/lightpush_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ proc newTestWakuLightpushNode*(
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
wakuAutoSharding = Sharding(clusterId: 1, shardCountGenZero: 8)
wakuAutoSharding = Sharding.init(clusterId = 1, shardCount = 8)
proto = WakuLightPush.new(
peerManager, rng, handler, some(wakuAutoSharding), rateLimitSetting
)
Expand Down
3 changes: 2 additions & 1 deletion waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ proc setupProtocols(
node.mountAutoSharding(conf.clusterId, conf.shardingConf.numShardsInCluster).isOkOr:
return err("failed to mount waku auto sharding: " & error)
else:
warn("Auto sharding is disabled")
node.mountAutoSharding(conf.clusterId, conf.subscribeShards).isOkOr:
return err("failed to mount waku auto sharding: " & error)

# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
Expand Down
11 changes: 9 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,15 @@ proc mountAutoSharding*(
node: WakuNode, clusterId: uint16, shardCount: uint32
): Result[void, string] =
info "mounting auto sharding", clusterId = clusterId, shardCount = shardCount
node.wakuAutoSharding =
some(Sharding(clusterId: clusterId, shardCountGenZero: shardCount))
node.wakuAutoSharding = some(Sharding.init(clusterId, shardCount))

return ok()

proc mountAutoSharding*(
node: WakuNode, clusterId: uint16, shards: seq[uint16]
): Result[void, string] =
info "mounting auto sharding", clusterId = clusterId, shards = shards
node.wakuAutoSharding = some(Sharding.init(clusterId, shards))

return ok()

Expand Down
26 changes: 19 additions & 7 deletions waku/waku_core/topics/sharding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@

{.push raises: [].}

import nimcrypto, std/options, std/tables, stew/endians2, results, stew/byteutils
import nimcrypto, std/[options, tables, sequtils], stew/[endians2, byteutils], results

import ./content_topic, ./pubsub_topic

# TODO: this is autosharding, not just "sharding"
type Sharding* = object
clusterId*: uint16
clusterId: uint16
# TODO: generations could be stored in a table here
shardCountGenZero*: uint32

proc new*(T: type Sharding, clusterId: uint16, shardCount: uint32): T =
return Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
shardCountGenZero: uint32
supportedShards: seq[uint16]

proc init*(T: typedesc[Sharding], clusterId: uint16, shardCount: uint32): T =
return Sharding(
clusterId: clusterId,
shardCountGenZero: shardCount,
supportedShards: toSeq(0'u16 ..< uint16(shardCount)),
)

proc init*(T: typedesc[Sharding], clusterId: uint16, supportedShards: seq[uint16]): T =
return Sharding(
clusterId: clusterId,
shardCountGenZero: uint32(supportedShards.len),
supportedShards: supportedShards,
)

proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShard =
let bytes = toBytes(topic.application) & toBytes(topic.version)
Expand All @@ -27,7 +39,7 @@ proc getGenZeroShard*(s: Sharding, topic: NsContentTopic, count: int): RelayShar

let shard = hashValue mod uint64(count)

RelayShard(clusterId: s.clusterId, shardId: uint16(shard))
RelayShard(clusterId: s.clusterId, shardId: s.supportedShards[shard])

proc getShard*(s: Sharding, topic: NsContentTopic): Result[RelayShard, string] =
## Compute the (pubsub topic) shard to use for this content topic.
Expand Down
Loading