-
Notifications
You must be signed in to change notification settings - Fork 81
chore(examples): add pubsub example with production env #1333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # basic2 | ||
|
|
||
| TODO | ||
|
|
||
| # publisher/subscriber | ||
|
|
||
| Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives. | ||
|
|
||
| **Some notes:** | ||
| * These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5. | ||
| * You only need to provide a reachable bootstrap peer (see our [fleets](https://fleets.status.im/)) | ||
| * The examples are meant to work out of the box. | ||
| * Note that both services wait for some time until a given minimum amount of connections are reached. This is to ensure messages are gossiped. | ||
|
|
||
| **Compile:** | ||
|
|
||
| Make all examples. | ||
| ```console | ||
| make example2 | ||
| ``` | ||
|
|
||
| **Run:** | ||
|
|
||
| Wait until the subscriber is ready. | ||
| ```console | ||
| ./build/subscriber | ||
| ``` | ||
|
|
||
| And run a publisher | ||
| ```console | ||
| ./build/publisher | ||
| ``` | ||
|
|
||
| See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| import | ||
| std/[tables,times,sequtils], | ||
| stew/byteutils, | ||
| stew/shims/net, | ||
| chronicles, | ||
| chronicles/topics_registry, | ||
| chronos, | ||
| confutils, | ||
| libp2p/crypto/crypto, | ||
| eth/keys, | ||
| eth/p2p/discoveryv5/enr | ||
|
|
||
| import | ||
| ../../../waku/v2/node/discv5/waku_discv5, | ||
| ../../../waku/v2/node/peer_manager/peer_manager, | ||
| ../../../waku/v2/node/waku_node, | ||
| ../../../waku/v2/protocol/waku_message, | ||
| ../../../waku/v2/utils/time, | ||
| ../../../waku/v2/utils/wakuenr | ||
|
|
||
| proc now*(): Timestamp = | ||
| getNanosecondTime(getTime().toUnixFloat()) | ||
|
|
||
| # An accesible bootstrap node. See wakuv2.prod fleets.status.im | ||
| const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"] | ||
|
|
||
| # careful if running pub and sub in the same machine | ||
| const wakuPort = 60000 | ||
| const discv5Port = 9000 | ||
|
|
||
| proc setupAndPublish() {.async.} = | ||
| # use notice to filter all waku messaging | ||
| setLogLevel(LogLevel.NOTICE) | ||
| notice "starting publisher", wakuPort=wakuPort, discv5Port=discv5Port | ||
| let | ||
| nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] | ||
| ip = ValidIpAddress.init("0.0.0.0") | ||
| node = WakuNode.new(nodeKey, ip, Port(wakuPort)) | ||
| flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true) | ||
|
|
||
| # assumes behind a firewall, so not care about being discoverable | ||
| node.wakuDiscv5 = WakuDiscoveryV5.new( | ||
| extIp= none(ValidIpAddress), | ||
| extTcpPort = none(Port), | ||
| extUdpPort = none(Port), | ||
| bindIP = ip, | ||
| discv5UdpPort = Port(discv5Port), | ||
| bootstrapNodes = bootstrapNodes, | ||
| privateKey = keys.PrivateKey(nodeKey.skkey), | ||
| flags = flags, | ||
| enrFields = [], | ||
| rng = node.rng) | ||
|
|
||
| await node.start() | ||
| await node.mountRelay() | ||
| if not await node.startDiscv5(): | ||
| error "failed to start discv5" | ||
| quit(1) | ||
|
|
||
| # wait for a minimum of peers to be connected, otherwise messages wont be gossiped | ||
| while true: | ||
| let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected) | ||
| if numConnectedPeers >= 6: | ||
| notice "publisher is ready", connectedPeers=numConnectedPeers, required=6 | ||
| break | ||
| notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6 | ||
| await sleepAsync(5000) | ||
|
|
||
| # Make sure it matches the publisher. Use default value | ||
| # see spec: https://rfc.vac.dev/spec/23/ | ||
| let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") | ||
|
|
||
| # any content topic can be chosen | ||
| let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") | ||
|
|
||
| notice "publisher service started" | ||
| while true: | ||
| let text = "hi there i'm a publisher" | ||
| let message = WakuMessage(payload: toBytes(text), # content of the message | ||
| contentTopic: contentTopic, # content topic to publish to | ||
| ephemeral: true, # tell store nodes to not store it | ||
| timestamp: now()) # current timestamp | ||
| await node.publish(pubSubTopic, message) | ||
| notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic | ||
| await sleepAsync(5000) | ||
|
|
||
| asyncSpawn setupAndPublish() | ||
| runForever() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| import | ||
| std/[tables, sequtils], | ||
| stew/byteutils, | ||
| stew/shims/net, | ||
| chronicles, | ||
| chronicles/topics_registry, | ||
| chronos, | ||
| confutils, | ||
| libp2p/crypto/crypto, | ||
| eth/keys, | ||
| eth/p2p/discoveryv5/enr | ||
|
|
||
| import | ||
| ../../../waku/v2/node/discv5/waku_discv5, | ||
| ../../../waku/v2/node/peer_manager/peer_manager, | ||
| ../../../waku/v2/node/waku_node, | ||
| ../../../waku/v2/protocol/waku_message, | ||
| ../../../waku/v2/utils/wakuenr | ||
|
|
||
| # An accesible bootstrap node. See wakuv2.prod fleets.status.im | ||
| const bootstrapNodes = @["enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"] | ||
|
|
||
| # careful if running pub and sub in the same machine | ||
| const wakuPort = 50000 | ||
| const discv5Port = 8000 | ||
|
|
||
| proc setupAndSubscribe() {.async.} = | ||
| # use notice to filter all waku messaging | ||
| setLogLevel(LogLevel.NOTICE) | ||
| notice "starting subscriber", wakuPort=wakuPort, discv5Port=discv5Port | ||
| let | ||
| nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] | ||
| ip = ValidIpAddress.init("0.0.0.0") | ||
| node = WakuNode.new(nodeKey, ip, Port(wakuPort)) | ||
| flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true) | ||
|
|
||
| # assumes behind a firewall, so not care about being discoverable | ||
| node.wakuDiscv5 = WakuDiscoveryV5.new( | ||
| extIp= none(ValidIpAddress), | ||
| extTcpPort = none(Port), | ||
| extUdpPort = none(Port), | ||
| bindIP = ip, | ||
| discv5UdpPort = Port(discv5Port), | ||
| bootstrapNodes = bootstrapNodes, | ||
| privateKey = keys.PrivateKey(nodeKey.skkey), | ||
| flags = flags, | ||
| enrFields = [], | ||
| rng = node.rng) | ||
|
|
||
| await node.start() | ||
| await node.mountRelay() | ||
| if not await node.startDiscv5(): | ||
| error "failed to start discv5" | ||
| quit(1) | ||
|
|
||
| # wait for a minimum of peers to be connected, otherwise messages wont be gossiped | ||
| while true: | ||
| let numConnectedPeers = node.peerManager.peerStore.connectionBook.book.values().countIt(it == Connected) | ||
| if numConnectedPeers >= 6: | ||
| notice "subscriber is ready", connectedPeers=numConnectedPeers, required=6 | ||
| break | ||
| notice "waiting to be ready", connectedPeers=numConnectedPeers, required=6 | ||
| await sleepAsync(5000) | ||
|
|
||
| # Make sure it matches the publisher. Use default value | ||
| # see spec: https://rfc.vac.dev/spec/23/ | ||
| let pubSubTopic = PubsubTopic("/waku/2/default-waku/proto") | ||
|
|
||
| # any content topic can be chosen. make sure it matches the publisher | ||
| let contentTopic = ContentTopic("/examples/1/pubsub-example/proto") | ||
|
|
||
| proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = | ||
| let message = WakuMessage.init(data).value | ||
| let payloadStr = string.fromBytes(message.payload) | ||
| if message.contentTopic == contentTopic: | ||
| notice "message received", payload=payloadStr, | ||
| pubsubTopic=pubsubTopic, | ||
| contentTopic=message.contentTopic, | ||
| timestamp=message.timestamp | ||
| node.subscribe(pubSubTopic, handler) | ||
|
|
||
| asyncSpawn setupAndSubscribe() | ||
|
|
||
| runForever() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it just me doing something dumb, or does Github not render markdown headers correctly if the first letter is not capitalised (which would be annoying)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Github doesn't render them looks like :0