-
Notifications
You must be signed in to change notification settings - Fork 81
feat: implement Waku API Health spec (WIP) #3689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 40 commits
Commits
Show all changes
41 commits
Select commit
Hold shift + click to select a range
6c7f7f1
feat: implement Waku API Health spec (WIP)
fcecin 7a38194
Fix failing libwaku build
fcecin a441933
Improvements
fcecin 60ee9b1
Introduce api/send
NagyZoltanPeter 11c2ac3
Fix edge mode config and test added
NagyZoltanPeter e9df811
Fix some import issues, start and stop waku shall not throw exception…
NagyZoltanPeter 7e68a66
Utlize sync RequestBroker, adapt to non-async broker usage and gcsafe…
NagyZoltanPeter e8bbf56
add api_example app to examples2
NagyZoltanPeter 1f691ae
Adapt after merge from master
NagyZoltanPeter d27ce37
Adapt code for using broker context
NagyZoltanPeter 36fdba3
Fix brokerCtx settings for all usedbrokers, cover locked node init
NagyZoltanPeter 4b163ea
Various fixes upon test failures. Added initial of subscribe API and …
NagyZoltanPeter 5af834b
More test added
NagyZoltanPeter fc8e9f6
Fix multi propagate event emit, fix fail send test case
NagyZoltanPeter 3642646
Fix rebase
NagyZoltanPeter 8b9e7b8
Rename
fcecin 66ebce6
Fix PushMessageHandlers in tests
NagyZoltanPeter a97de82
adapt libwaku to api changes
NagyZoltanPeter 31586f5
Merge branch 'feat-waku-api-send' into feat/lmn-health-api
fcecin 2b14692
Merge branch 'master' into feat/lmn-health-api
fcecin 4333f69
Connect with Lmn Health API (WIP)
fcecin a834abb
Implement Waku Health API (WIP)
fcecin 8eee1ec
Improve topic health API
fcecin 8595b1c
stricter API availability check proc
fcecin f5e2b97
Add EventWakuPeer emitted by PeerManager
fcecin baf68a8
Health brokering improvements
fcecin 8d2bc13
Merge branch 'master' into feat/lmn-health-api
fcecin 345616f
Add missing API elements and improvements
fcecin 53830ae
Health API testing and fixes
fcecin 765cd60
Simplify NodeHealthMonitor creation
fcecin 2100e48
Fix var relay -> let relay
fcecin 8c66356
Merge branch 'chore/fix-health-monitor-ctor' into feat/lmn-health-api
fcecin 0cb2c33
Fixes from Ivan's review (partial, round 1)
fcecin 2f6c59e
Fixes from Ivan's review (round 2/2)
fcecin 4532b6a
Merge branch 'master' into feat/lmn-health-api
fcecin f0c1be7
fix lint
fcecin c28e4aa
Roll back passing brokerCtx in PeerManager and WakuRelay ctors
fcecin f999487
Add tests & wire RequestHealthReport
fcecin 51019a7
Fix RequestContentTopicsHealth provider and tests
fcecin 84e25e5
add RequestProtocolHealth edge mode smoke test
fcecin e658175
last review fixes
fcecin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| {.push raises: [].} | ||
|
|
||
| import system, std/json | ||
| import ./json_base_event | ||
| import ../../waku/api/types | ||
|
|
||
| type JsonConnectionStatusChangeEvent* = ref object of JsonEvent | ||
| status*: ConnectionStatus | ||
|
|
||
| proc new*( | ||
| T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus | ||
| ): T = | ||
| return JsonConnectionStatusChangeEvent( | ||
| eventType: "node_health_change", | ||
| status: status | ||
| ) | ||
|
|
||
| method `$`*(event: JsonConnectionStatusChangeEvent): string = | ||
| $(%*event) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,3 @@ | ||
| {.used.} | ||
|
|
||
| import ./test_entry_nodes, ./test_node_conf | ||
| import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,296 @@ | ||
| {.used.} | ||
|
|
||
| import std/[options, sequtils, times] | ||
| import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo] | ||
| import ../testlib/[common, wakucore, wakunode, testasync] | ||
|
|
||
| import | ||
| waku, | ||
| waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context], | ||
| waku/node/health_monitor/[topic_health, health_status, protocol_health, health_report], | ||
| waku/requests/health_requests, | ||
| waku/requests/node_requests, | ||
| waku/events/health_events, | ||
| waku/common/waku_protocol, | ||
| waku/factory/waku_conf | ||
|
|
||
| const TestTimeout = chronos.seconds(10) | ||
| const DefaultShard = PubsubTopic("/waku/2/rs/1/0") | ||
| const TestContentTopic = ContentTopic("/waku/2/default-content/proto") | ||
|
|
||
| proc dummyHandler( | ||
| topic: PubsubTopic, msg: WakuMessage | ||
| ): Future[void] {.async, gcsafe.} = | ||
| discard | ||
|
|
||
| proc waitForConnectionStatus( | ||
| brokerCtx: BrokerContext, expected: ConnectionStatus | ||
| ): Future[void] {.async.} = | ||
| var future = newFuture[void]("waitForConnectionStatus") | ||
|
|
||
| let handler: EventConnectionStatusChangeListenerProc = proc( | ||
| e: EventConnectionStatusChange | ||
| ) {.async: (raises: []), gcsafe.} = | ||
| if not future.finished: | ||
| if e.connectionStatus == expected: | ||
| future.complete() | ||
|
|
||
| let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr: | ||
| raiseAssert error | ||
|
|
||
| try: | ||
| if not await future.withTimeout(TestTimeout): | ||
| raiseAssert "Timeout waiting for status: " & $expected | ||
| finally: | ||
| EventConnectionStatusChange.dropListener(brokerCtx, handle) | ||
|
|
||
| proc waitForShardHealthy( | ||
| brokerCtx: BrokerContext | ||
| ): Future[EventShardTopicHealthChange] {.async.} = | ||
| var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy") | ||
|
|
||
| let handler: EventShardTopicHealthChangeListenerProc = proc( | ||
| e: EventShardTopicHealthChange | ||
| ) {.async: (raises: []), gcsafe.} = | ||
| if not future.finished: | ||
| if e.health == TopicHealth.MINIMALLY_HEALTHY or | ||
| e.health == TopicHealth.SUFFICIENTLY_HEALTHY: | ||
| future.complete(e) | ||
|
|
||
| let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr: | ||
| raiseAssert error | ||
|
|
||
| try: | ||
| if await future.withTimeout(TestTimeout): | ||
| return future.read() | ||
| else: | ||
| raiseAssert "Timeout waiting for shard health event" | ||
| finally: | ||
| EventShardTopicHealthChange.dropListener(brokerCtx, handle) | ||
|
|
||
| suite "LM API health checking": | ||
| var | ||
| serviceNode {.threadvar.}: WakuNode | ||
| client {.threadvar.}: Waku | ||
| servicePeerInfo {.threadvar.}: RemotePeerInfo | ||
|
|
||
| asyncSetup: | ||
| lockNewGlobalBrokerContext: | ||
| serviceNode = | ||
| newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) | ||
| (await serviceNode.mountRelay()).isOkOr: | ||
| raiseAssert error | ||
| serviceNode.mountMetadata(1, @[0'u16]).isOkOr: | ||
| raiseAssert error | ||
| await serviceNode.mountLibp2pPing() | ||
| await serviceNode.start() | ||
|
|
||
| servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo() | ||
| serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler) | ||
|
|
||
| lockNewGlobalBrokerContext: | ||
| let conf = NodeConfig.init( | ||
| mode = WakuMode.Core, | ||
| networkingConfig = | ||
| NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0), | ||
| protocolsConfig = ProtocolsConfig.init( | ||
| entryNodes = @[], | ||
| clusterId = 1'u16, | ||
| autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), | ||
| ), | ||
| ) | ||
|
|
||
| client = (await createNode(conf)).valueOr: | ||
| raiseAssert error | ||
| (await startWaku(addr client)).isOkOr: | ||
| raiseAssert error | ||
|
|
||
| asyncTeardown: | ||
| discard await client.stop() | ||
| await serviceNode.stop() | ||
|
|
||
| asyncTest "RequestShardTopicsHealth, check PubsubTopic health": | ||
| client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) | ||
| await client.node.connectToNodes(@[servicePeerInfo]) | ||
|
|
||
| var isHealthy = false | ||
| let start = Moment.now() | ||
| while Moment.now() - start < TestTimeout: | ||
| let req = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr: | ||
| raiseAssert "RequestShardTopicsHealth failed" | ||
|
|
||
| if req.topicHealth.len > 0: | ||
| let h = req.topicHealth[0].health | ||
| if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY: | ||
| isHealthy = true | ||
| break | ||
| await sleepAsync(chronos.milliseconds(100)) | ||
|
|
||
| check isHealthy == true | ||
|
|
||
| asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic": | ||
| const GhostShard = PubsubTopic("/waku/2/rs/1/666") | ||
| client.node.wakuRelay.subscribe(GhostShard, dummyHandler) | ||
|
|
||
| let req = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr: | ||
| raiseAssert "Request failed" | ||
|
|
||
| check req.topicHealth.len > 0 | ||
| check req.topicHealth[0].health == TopicHealth.UNHEALTHY | ||
|
|
||
| asyncTest "RequestProtocolHealth, check relay status": | ||
| await client.node.connectToNodes(@[servicePeerInfo]) | ||
|
|
||
| var isReady = false | ||
| let start = Moment.now() | ||
| while Moment.now() - start < TestTimeout: | ||
| let relayReq = await RequestProtocolHealth.request( | ||
| client.brokerCtx, WakuProtocol.RelayProtocol | ||
| ) | ||
| if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY: | ||
| isReady = true | ||
| break | ||
| await sleepAsync(chronos.milliseconds(100)) | ||
|
|
||
| check isReady == true | ||
|
|
||
| let storeReq = | ||
| await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol) | ||
| if storeReq.isOk(): | ||
| check storeReq.get().healthStatus.health != HealthStatus.READY | ||
|
|
||
| asyncTest "RequestProtocolHealth, check unmounted protocol": | ||
| let req = | ||
| await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol) | ||
| check req.isOk() | ||
|
|
||
| let status = req.get().healthStatus | ||
| check status.health == HealthStatus.NOT_MOUNTED | ||
| check status.desc.isNone() | ||
|
|
||
| asyncTest "RequestConnectionStatus, check connectivity state": | ||
| let initialReq = RequestConnectionStatus.request(client.brokerCtx).valueOr: | ||
| raiseAssert "RequestConnectionStatus failed" | ||
| check initialReq.connectionStatus == ConnectionStatus.Disconnected | ||
|
|
||
| await client.node.connectToNodes(@[servicePeerInfo]) | ||
|
|
||
| var isConnected = false | ||
| let start = Moment.now() | ||
| while Moment.now() - start < TestTimeout: | ||
| let req = RequestConnectionStatus.request(client.brokerCtx).valueOr: | ||
| raiseAssert "RequestConnectionStatus failed" | ||
|
|
||
| if req.connectionStatus == ConnectionStatus.PartiallyConnected or | ||
| req.connectionStatus == ConnectionStatus.Connected: | ||
| isConnected = true | ||
| break | ||
| await sleepAsync(chronos.milliseconds(100)) | ||
|
|
||
| check isConnected == true | ||
|
|
||
| asyncTest "EventConnectionStatusChange, detect connect and disconnect": | ||
| let connectFuture = | ||
| waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected) | ||
|
|
||
| await client.node.connectToNodes(@[servicePeerInfo]) | ||
| await connectFuture | ||
|
|
||
| let disconnectFuture = | ||
| waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected) | ||
| await client.node.disconnectNode(servicePeerInfo) | ||
| await disconnectFuture | ||
|
|
||
| asyncTest "EventShardTopicHealthChange, detect health improvement": | ||
| client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) | ||
|
|
||
| let healthEventFuture = waitForShardHealthy(client.brokerCtx) | ||
|
|
||
| await client.node.connectToNodes(@[servicePeerInfo]) | ||
|
|
||
| let event = await healthEventFuture | ||
| check event.topic == DefaultShard | ||
|
|
||
| asyncTest "RequestHealthReport, check aggregate report": | ||
| let req = await RequestHealthReport.request(client.brokerCtx) | ||
|
|
||
| check req.isOk() | ||
|
|
||
| let report = req.get().healthReport | ||
| check report.nodeHealth == HealthStatus.READY | ||
| check report.protocolsHealth.len > 0 | ||
| check report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol) | ||
|
|
||
| asyncTest "RequestContentTopicsHealth, smoke test": | ||
| let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto") | ||
|
|
||
| let req = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic]) | ||
|
|
||
| check req.isOk() | ||
|
|
||
| let res = req.get() | ||
| check res.contentTopicHealth.len == 1 | ||
| check res.contentTopicHealth[0].topic == fictionalTopic | ||
| check res.contentTopicHealth[0].health == TopicHealth.NOT_SUBSCRIBED | ||
|
|
||
| asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding": | ||
| let cTopic = ContentTopic("/waku/2/my-content-topic/proto") | ||
|
|
||
| let shardReq = | ||
| RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic) | ||
|
|
||
| check shardReq.isOk() | ||
| let targetShard = $shardReq.get().relayShard | ||
|
|
||
| client.node.wakuRelay.subscribe(targetShard, dummyHandler) | ||
| serviceNode.wakuRelay.subscribe(targetShard, dummyHandler) | ||
|
|
||
| await client.node.connectToNodes(@[servicePeerInfo]) | ||
|
|
||
| var isHealthy = false | ||
| let start = Moment.now() | ||
| while Moment.now() - start < TestTimeout: | ||
| let req = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr: | ||
| raiseAssert "Request failed" | ||
|
|
||
| if req.contentTopicHealth.len > 0: | ||
| let h = req.contentTopicHealth[0].health | ||
| if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY: | ||
| isHealthy = true | ||
| break | ||
|
|
||
| await sleepAsync(chronos.milliseconds(100)) | ||
|
|
||
| check isHealthy == true | ||
|
|
||
| asyncTest "RequestProtocolHealth, edge mode smoke test": | ||
| var edgeWaku: Waku | ||
|
|
||
| lockNewGlobalBrokerContext: | ||
| let edgeConf = NodeConfig.init( | ||
| mode = WakuMode.Edge, | ||
| networkingConfig = | ||
| NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0), | ||
| protocolsConfig = ProtocolsConfig.init( | ||
| entryNodes = @[], | ||
| clusterId = 1'u16, | ||
| messageValidation = | ||
| MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)), | ||
| ), | ||
| ) | ||
|
|
||
| edgeWaku = (await createNode(edgeConf)).valueOr: | ||
| raiseAssert "Failed to create edge node: " & error | ||
|
|
||
| (await startWaku(addr edgeWaku)).isOkOr: | ||
| raiseAssert "Failed to start edge waku: " & error | ||
|
|
||
| let relayReq = await RequestProtocolHealth.request( | ||
| edgeWaku.brokerCtx, WakuProtocol.RelayProtocol | ||
| ) | ||
| check relayReq.isOk() | ||
| check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED | ||
|
|
||
| check not edgeWaku.node.wakuFilterClient.isNil() | ||
|
|
||
| discard await edgeWaku.stop() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.