-
Notifications
You must be signed in to change notification settings - Fork 81
feat: implement Waku API Health spec (WIP) #3689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 28 commits
6c7f7f1
7a38194
a441933
60ee9b1
11c2ac3
e9df811
7e68a66
e8bbf56
1f691ae
d27ce37
36fdba3
4b163ea
5af834b
fc8e9f6
3642646
8b9e7b8
66ebce6
a97de82
31586f5
2b14692
4333f69
a834abb
8eee1ec
8595b1c
f5e2b97
baf68a8
8d2bc13
345616f
53830ae
765cd60
2100e48
8c66356
0cb2c33
2f6c59e
4532b6a
f0c1be7
c28e4aa
f999487
51019a7
84e25e5
e658175
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| {.push raises: [].} | ||
|
|
||
| import system, std/json | ||
| import ./json_base_event | ||
| import ../../waku/api/types | ||
|
|
||
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
  ## JSON event that carries a `ConnectionStatus` value.
  status*: ConnectionStatus

proc new*(
    T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus
): T =
  ## Builds the event for `status`; its `eventType` is always
  ## "node_health_change".
  JsonConnectionStatusChangeEvent(
    eventType: "node_health_change", status: status
  )

method `$`*(event: JsonConnectionStatusChangeEvent): string =
  ## Renders the event as JSON text by converting it with `%*` and
  ## stringifying the resulting node.
  let asJson = %*event
  $asJson
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,324 @@ | ||||||
| {.used.} | ||||||
|
|
||||||
| import | ||||||
| std/[json, options, sequtils, strutils, tables], testutils/unittests, chronos, results | ||||||
|
|
||||||
| import | ||||||
| waku/[ | ||||||
| waku_core, | ||||||
| common/waku_protocol, | ||||||
| node/waku_node, | ||||||
| node/peer_manager, | ||||||
| node/health_monitor/health_status, | ||||||
| node/health_monitor/connection_status, | ||||||
| node/health_monitor/protocol_health, | ||||||
| node/health_monitor/node_health_monitor, | ||||||
| node/kernel_api/relay, | ||||||
| node/kernel_api/store, | ||||||
| node/kernel_api/lightpush, | ||||||
| node/kernel_api/filter, | ||||||
| waku_archive, | ||||||
| ] | ||||||
|
|
||||||
| import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils | ||||||
|
|
||||||
proc p(kind: WakuProtocol, health: HealthStatus): ProtocolHealth =
  ## Test helper producing a mock `ProtocolHealth` for `kind`.
  ## Returns the ready state when `health` is `HealthStatus.READY`; any
  ## other status (including NOT_MOUNTED) yields `notReady("mock")`.
  var mocked = ProtocolHealth.init(kind)
  if health != HealthStatus.READY:
    return mocked.notReady("mock")
  mocked.ready()
|
|
||||||
| suite "Health Monitor - health state calculation": | ||||||
| test "Disconnected, zero peers": | ||||||
| let protocols = | ||||||
| @[ | ||||||
| p(RelayProtocol, HealthStatus.NOT_READY), | ||||||
| p(StoreClientProtocol, HealthStatus.NOT_READY), | ||||||
| p(FilterClientProtocol, HealthStatus.NOT_READY), | ||||||
| p(LightpushClientProtocol, HealthStatus.NOT_READY), | ||||||
| ] | ||||||
| let strength = initTable[WakuProtocol, int]() | ||||||
| let state = | ||||||
| calculateConnectionState(protocols, strength, DefaultRelayFailoverThreshold) | ||||||
|
Collaborator
Shouldn't this be Connected? I'd assume `state == Connected` when `DefaultRelayFailoverThreshold` is used.
Contributor
Author
This test mocks basically everything and only tests the math behind the `ConnectionStatus` calculation. Here […] |
||||||
| check state == ConnectionStatus.Disconnected | ||||||
|
|
||||||
test "PartiallyConnected, weak relay":
  # Relay and store client are both READY, but relay's strength is one
  # below the threshold handed to calculateConnectionState, so the
  # expected outcome is PartiallyConnected rather than Connected.
  let belowThreshold = DefaultRelayFailoverThreshold - 1

  var peerStrength = initTable[WakuProtocol, int]()
  peerStrength[RelayProtocol] = belowThreshold
  peerStrength[StoreClientProtocol] = 1

  let healths =
    @[
      p(RelayProtocol, HealthStatus.READY),
      p(StoreClientProtocol, HealthStatus.READY),
    ]

  check calculateConnectionState(
    healths, peerStrength, DefaultRelayFailoverThreshold
  ) == ConnectionStatus.PartiallyConnected
|
Collaborator
A comment explaining why the expected state is "PartiallyConnected" would help here. |
||||||
|
|
||||||
| test "Connected, robust relay": | ||||||
| let protocols = | ||||||
| @[ | ||||||
| p(RelayProtocol, HealthStatus.READY), p(StoreClientProtocol, HealthStatus.READY) | ||||||
| ] | ||||||
| var strength = initTable[WakuProtocol, int]() | ||||||
| strength[RelayProtocol] = DefaultRelayFailoverThreshold | ||||||
| strength[StoreClientProtocol] = FailoverThreshold | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe :)?
Suggested change
|
||||||
| let state = | ||||||
| calculateConnectionState(protocols, strength, DefaultRelayFailoverThreshold) | ||||||
| check state == ConnectionStatus.Connected | ||||||
|
|
||||||
test "Connected, robust edge":
  # Relay is passed as NOT_MOUNTED (the `p` helper turns that into a
  # not-ready mock); all three edge client protocols are READY and each
  # has FailoverThreshold strength.
  var peerStrength = initTable[WakuProtocol, int]()
  for edgeProtocol in [
    LightpushClientProtocol, FilterClientProtocol, StoreClientProtocol
  ]:
    peerStrength[edgeProtocol] = FailoverThreshold

  let healths =
    @[
      p(RelayProtocol, HealthStatus.NOT_MOUNTED),
      p(LightpushClientProtocol, HealthStatus.READY),
      p(FilterClientProtocol, HealthStatus.READY),
      p(StoreClientProtocol, HealthStatus.READY),
    ]

  check calculateConnectionState(
    healths, peerStrength, DefaultRelayFailoverThreshold
  ) == ConnectionStatus.Connected
|
|
||||||
test "Disconnected, edge missing store":
  # Lightpush and filter clients are READY at FailoverThreshold strength,
  # while the store client is NOT_READY with zero strength; the expected
  # outcome is Disconnected.
  var peerStrength = initTable[WakuProtocol, int]()
  peerStrength[StoreClientProtocol] = 0
  peerStrength[LightpushClientProtocol] = FailoverThreshold
  peerStrength[FilterClientProtocol] = FailoverThreshold

  let healths =
    @[
      p(LightpushClientProtocol, HealthStatus.READY),
      p(FilterClientProtocol, HealthStatus.READY),
      p(StoreClientProtocol, HealthStatus.NOT_READY),
    ]

  check calculateConnectionState(
    healths, peerStrength, DefaultRelayFailoverThreshold
  ) == ConnectionStatus.Disconnected
|
|
||||||
test "PartiallyConnected, edge meets minimum failover requirement":
  # Every edge client protocol is READY, each with the same strength:
  # one below FailoverThreshold, but never less than 1.
  let belowThreshold = max(1, FailoverThreshold - 1)

  var peerStrength = initTable[WakuProtocol, int]()
  for edgeProtocol in [
    LightpushClientProtocol, FilterClientProtocol, StoreClientProtocol
  ]:
    peerStrength[edgeProtocol] = belowThreshold

  let healths =
    @[
      p(LightpushClientProtocol, HealthStatus.READY),
      p(FilterClientProtocol, HealthStatus.READY),
      p(StoreClientProtocol, HealthStatus.READY),
    ]

  check calculateConnectionState(
    healths, peerStrength, DefaultRelayFailoverThreshold
  ) == ConnectionStatus.PartiallyConnected
|
|
||||||
test "Connected, robust relay ignores store server":
  # Relay sits exactly at the relay failover threshold; the store server
  # is READY but has zero strength, and the result is still expected to
  # be Connected.
  var peerStrength = initTable[WakuProtocol, int]()
  peerStrength[StoreProtocol] = 0
  peerStrength[RelayProtocol] = DefaultRelayFailoverThreshold

  let healths =
    @[
      p(RelayProtocol, HealthStatus.READY),
      p(StoreProtocol, HealthStatus.READY),
    ]

  check calculateConnectionState(
    healths, peerStrength, DefaultRelayFailoverThreshold
  ) == ConnectionStatus.Connected
|
|
||||||
test "Connected, robust relay ignores store client":
  # Relay sits exactly at the relay failover threshold; the store server
  # is READY and the store client NOT_READY, both with zero strength.
  # The result is still expected to be Connected.
  var peerStrength = initTable[WakuProtocol, int]()
  peerStrength[StoreClientProtocol] = 0
  peerStrength[StoreProtocol] = 0
  peerStrength[RelayProtocol] = DefaultRelayFailoverThreshold

  let healths =
    @[
      p(RelayProtocol, HealthStatus.READY),
      p(StoreProtocol, HealthStatus.READY),
      p(StoreClientProtocol, HealthStatus.NOT_READY),
    ]

  check calculateConnectionState(
    healths, peerStrength, DefaultRelayFailoverThreshold
  ) == ConnectionStatus.Connected
|
|
||||||
| suite "Health Monitor - events": | ||||||
| asyncTest "Core (relay) health update": | ||||||
| let | ||||||
| nodeAKey = generateSecp256k1Key() | ||||||
| nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) | ||||||
|
|
||||||
| (await nodeA.mountRelay()).expect("Node A failed to mount Relay") | ||||||
|
|
||||||
| nodeA.mountStoreClient() | ||||||
|
Collaborator
This is not needed. That happens in […] |
||||||
|
|
||||||
| await nodeA.start() | ||||||
|
|
||||||
| let monitorA = NodeHealthMonitor.new() | ||||||
| monitorA.setNodeToHealthMonitor(nodeA) | ||||||
|
|
||||||
| var | ||||||
| lastStatus = ConnectionStatus.Disconnected | ||||||
| callbackCount = 0 | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} = | ||||||
| lastStatus = status | ||||||
| callbackCount.inc() | ||||||
| if not healthChangeSignal.finished: | ||||||
| healthChangeSignal.complete() | ||||||
|
|
||||||
| monitorA.startHealthMonitor().expect("Health monitor failed to start") | ||||||
|
|
||||||
| let | ||||||
| nodeBKey = generateSecp256k1Key() | ||||||
| nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) | ||||||
|
|
||||||
| let driver = newSqliteArchiveDriver() | ||||||
| nodeB.mountArchive(driver).expect("Node B failed to mount archive") | ||||||
|
|
||||||
| (await nodeB.mountRelay()).expect("Node B failed to mount relay") | ||||||
| await nodeB.mountStore() | ||||||
|
|
||||||
| await nodeB.start() | ||||||
|
|
||||||
| await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) | ||||||
|
|
||||||
| proc dummyHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} = | ||||||
| discard | ||||||
|
|
||||||
| nodeA.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect( | ||||||
| "Node A failed to subscribe" | ||||||
| ) | ||||||
| nodeB.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect( | ||||||
| "Node B failed to subscribe" | ||||||
| ) | ||||||
|
Comment on lines
+177
to
+182
Collaborator
These might not be needed, as […]
Contributor
Author
Unfortunately, taking these out breaks the test. The `newTestWakuNode` helper just passes this to the "enrBuilder"; it doesn't seem to execute relay subscriptions with it, like […] |
||||||
|
|
||||||
| let connectTimeLimit = Moment.now() + 10.seconds | ||||||
|
Collaborator
We cannot wait that long :)
Suggested change
|
||||||
| var gotConnected = false | ||||||
|
|
||||||
| while Moment.now() < connectTimeLimit: | ||||||
| if lastStatus != ConnectionStatus.Disconnected: | ||||||
| gotConnected = true | ||||||
| break | ||||||
|
|
||||||
| if healthChangeSignal.finished: | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
Collaborator
Maybe this is not needed, as it happens a few lines below.
Contributor
Author
Changed from a void `Future` to an `AsyncEvent` to make it cleaner and clearer. |
||||||
|
|
||||||
| discard await healthChangeSignal.withTimeout(connectTimeLimit - Moment.now()) | ||||||
|
|
||||||
| check: | ||||||
| gotConnected == true | ||||||
| callbackCount >= 1 | ||||||
|
|
||||||
| if healthChangeSignal.finished: | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| await nodeB.stop() | ||||||
| await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo()) | ||||||
|
|
||||||
| let disconnectTimeLimit = Moment.now() + 10.seconds | ||||||
| var gotDisconnected = false | ||||||
|
|
||||||
| while Moment.now() < disconnectTimeLimit: | ||||||
| if lastStatus == ConnectionStatus.Disconnected: | ||||||
| gotDisconnected = true | ||||||
| break | ||||||
|
|
||||||
| if healthChangeSignal.finished: | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| discard await healthChangeSignal.withTimeout(disconnectTimeLimit - Moment.now()) | ||||||
|
|
||||||
| check: | ||||||
| gotDisconnected == true | ||||||
|
|
||||||
| await monitorA.stopHealthMonitor() | ||||||
| await nodeA.stop() | ||||||
|
|
||||||
| asyncTest "Edge (light client) health update": | ||||||
| let | ||||||
| nodeAKey = generateSecp256k1Key() | ||||||
| nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) | ||||||
|
|
||||||
| nodeA.mountLightpushClient() | ||||||
| await nodeA.mountFilterClient() | ||||||
| nodeA.mountStoreClient() | ||||||
|
|
||||||
| await nodeA.start() | ||||||
|
|
||||||
| let monitorA = NodeHealthMonitor.new() | ||||||
| monitorA.setNodeToHealthMonitor(nodeA) | ||||||
|
|
||||||
| var | ||||||
| lastStatus = ConnectionStatus.Disconnected | ||||||
| callbackCount = 0 | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} = | ||||||
| lastStatus = status | ||||||
| callbackCount.inc() | ||||||
| if not healthChangeSignal.finished: | ||||||
| healthChangeSignal.complete() | ||||||
|
|
||||||
| monitorA.startHealthMonitor().expect("Health monitor failed to start") | ||||||
|
|
||||||
| let | ||||||
| nodeBKey = generateSecp256k1Key() | ||||||
| nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) | ||||||
|
|
||||||
| let driver = newSqliteArchiveDriver() | ||||||
| nodeB.mountArchive(driver).expect("Node B failed to mount archive") | ||||||
|
|
||||||
| (await nodeB.mountRelay()).expect("Node B failed to mount relay") | ||||||
|
|
||||||
| (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush") | ||||||
| await nodeB.mountFilter() | ||||||
| await nodeB.mountStore() | ||||||
|
|
||||||
| await nodeB.start() | ||||||
|
|
||||||
| await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) | ||||||
|
|
||||||
| let connectTimeLimit = Moment.now() + 10.seconds | ||||||
| var gotConnected = false | ||||||
|
|
||||||
| while Moment.now() < connectTimeLimit: | ||||||
| if lastStatus == ConnectionStatus.PartiallyConnected: | ||||||
| gotConnected = true | ||||||
| break | ||||||
|
|
||||||
| if healthChangeSignal.finished: | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| discard await healthChangeSignal.withTimeout(connectTimeLimit - Moment.now()) | ||||||
|
|
||||||
| check: | ||||||
| gotConnected == true | ||||||
| callbackCount >= 1 | ||||||
| lastStatus == ConnectionStatus.PartiallyConnected | ||||||
|
|
||||||
| if healthChangeSignal.finished: | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| await nodeB.stop() | ||||||
| await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo()) | ||||||
|
|
||||||
| let disconnectTimeLimit = Moment.now() + 10.seconds | ||||||
| var gotDisconnected = false | ||||||
|
|
||||||
|
Collaborator
I think it would be interesting to confirm that the opposite state holds before entering the loop.
Suggested change
|
||||||
| while Moment.now() < disconnectTimeLimit: | ||||||
| if lastStatus == ConnectionStatus.Disconnected: | ||||||
| gotDisconnected = true | ||||||
| break | ||||||
|
|
||||||
| if healthChangeSignal.finished: | ||||||
| healthChangeSignal = newFuture[void]() | ||||||
|
|
||||||
| discard await healthChangeSignal.withTimeout(disconnectTimeLimit - Moment.now()) | ||||||
|
|
||||||
| check: | ||||||
| gotDisconnected == true | ||||||
| lastStatus == ConnectionStatus.Disconnected | ||||||
|
|
||||||
| await monitorA.stopHealthMonitor() | ||||||
| await nodeA.stop() | ||||||
|
|
||||||
Better to use a longer name so it's easier to look up.