feat: implement Waku API Health spec (WIP)#3689
Conversation
|
You can find the image built from this PR at Built from cb934cf |
8188437 to
6c7f7f1
Compare
|
Probably change the name from "health status" to "connection status" and update the spec |
* Fix protocol strength metric to consider connected peers only * Remove polling loop; event-driven node connection health updates * Add edge-mode node connection health event (callback) test
weboko
left a comment
There was a problem hiding this comment.
did first iteration, looks good but we need to update the spec as mentioned above
Added events and requests for support. Reworked delivery_monitor into a featured devlivery_service, that - supports relay publish and lightpush depending on configuration but with fallback options - if available and configured it utilizes store api to confirm message delivery - emits message delivery events accordingly Notice: There are parts still in WIP and needs review and follow ups. prepare for use in api_example
… but return with result properly
… where appropriate, removed leftover
…auto-subscribe for send api
* Add getSyncProtocolHealthInfo and getSyncNodeHealthReport * Rename RequestNodeHealth -> RequestConnectionStatus * Connect RequestConnectionStatus to new connection status logic * Add ConnectionStatusChangeEvent
* Add RequestHealthReport * Wire up RequestProtocolHealth * Refactor sync/async protocol health queries in the health monitor
* Add EventRelayTopicHealthChange * Propagate brokerCtx from WakuNode to WakuRelay * Remove 10s WakuRelay topic health polling loop; now event-driven
* Add WakuPeerEventKind higher-level peer event * Add Edge support for topics health requests and events * Rename "RelayTopic" -> "Topic"
* Add RequestContentTopicsHealth sync request * Add EventContentTopicHealthChange (just API; not yet emitted) * Rename RequestTopicsHealth -> RequestShardTopicsHealth * Rename health_request.nim -> health_requests.nim
Ivansete-status
left a comment
There was a problem hiding this comment.
Thanks for the PR!
I'm just adding some nitpicks that I hope you find useful.
I'll revisit the PR again once it gets ready :)
|
|
||
| import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils | ||
|
|
||
| proc p(kind: WakuProtocol, health: HealthStatus): ProtocolHealth = |
There was a problem hiding this comment.
Better use a longer name so it's easier to lookup.
| proc p(kind: WakuProtocol, health: HealthStatus): ProtocolHealth = | |
| proc pLongerName(kind: WakuProtocol, health: HealthStatus): ProtocolHealth = |
| strength: Table[WakuProtocol, int], | ||
| relayFailoverThreshold: int, |
There was a problem hiding this comment.
Kindly add comments in this proc so that is clear what information strength and relayFailoverThreshold contains/represent.
| const | ||
| DefaultRelayFailoverThreshold* = 4 | ||
| FailoverThreshold* = 2 |
There was a problem hiding this comment.
We need comments for these
| ] | ||
| let strength = initTable[WakuProtocol, int]() | ||
| let state = | ||
| calculateConnectionState(protocols, strength, DefaultRelayFailoverThreshold) |
There was a problem hiding this comment.
Shouldn't be DefaultRelayFailoverThreshold - 1 in this case ?
I'd assume state == Connected when DefaultRelayFailoverThreshold
There was a problem hiding this comment.
This test is mocking basically everything and is just testing the math behind the ConnectionStatus calculation. Here DefaultRelayFailoverThreshold is just a stand-in for the DLow config parameter for GossipSub. Since GossipSub doesn't exist here, we are just saying it is the default value for when an actual DLow config doesn't exist for some reason. The logic in calculateConnectionState, which is otherwise just a static calculator, depends on what DLow is, which is externally-provided from the node and the network.
| proc getRelayFailoverThreshold(hm: NodeHealthMonitor): int = | ||
| if isNil(hm.node.wakuRelay): | ||
| # Could return an Optional[int] instead, but for simplicity just use a default. | ||
| # This also helps in writing mocks for the health monitor tests. | ||
| return DefaultRelayFailoverThreshold |
There was a problem hiding this comment.
I can't quite see that failover is a correct term in this case. Maybe better without :)
| proc getRelayFailoverThreshold(hm: NodeHealthMonitor): int = | |
| if isNil(hm.node.wakuRelay): | |
| # Could return an Optional[int] instead, but for simplicity just use a default. | |
| # This also helps in writing mocks for the health monitor tests. | |
| return DefaultRelayFailoverThreshold | |
| proc getRelayThreshold(hm: NodeHealthMonitor): int = | |
| if isNil(hm.node.wakuRelay): | |
| # Could return an Optional[int] instead, but for simplicity just use a default. | |
| # This also helps in writing mocks for the health monitor tests. | |
| return DefaultRelayThreshold |
On the other hand, I think we don't need DefaultRelayFailoverThreshold at all. We need to only use the values defined in gossipsub/relay.
If node.isNil() then the getRelayThreshold should return an error.
Another option is to perform such node.isNil() validation in NodeHealthMonitor ctor. We might need to refactor the proc new*(T: type Waku ... to allow that but I think the node_health_monitor.nim will get simpler then.
There was a problem hiding this comment.
Renamed DefaultRelayFailoverThreshold as DLow and made it a proper Option[int] that is none in Edge.
| proc new*( | ||
| T: type PeerManager, | ||
| switch: Switch, | ||
| brokerCtx: BrokerContext = globalBrokerContext(), |
There was a problem hiding this comment.
Shall we add the new parameter brokerCtx at the end? Maybe that way we need less changes somewhere else.
There was a problem hiding this comment.
Sorry, this brokerCtx coloring / DI / extra ctor arg stuff is garbage. That's not how Zoltan built context-aware brokers to be used. I'm fixing this.
| ../common/rate_limit/setting, | ||
| ../common/callbacks, | ||
| ../common/nimchronos, | ||
| ../waku_mix |
There was a problem hiding this comment.
| ../common/rate_limit/setting, | |
| ../common/callbacks, | |
| ../common/nimchronos, | |
| ../waku_mix | |
| waku/common/rate_limit/setting, | |
| waku/common/callbacks, | |
| waku/common/nimchronos, | |
| waku/waku_mix |
| return ok() | ||
|
|
||
| proc startProvidersAndListeners(node: WakuNode) = | ||
| proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth = |
There was a problem hiding this comment.
Shouldn't these calculateEdgeTopicHealth, loopEdgeHealth, and startProvidersAndListeners belong to HealthMonitor instead?
There was a problem hiding this comment.
This is in WakuNode because we don't have a "WakuEdge" virtual protocol or component to reflect all the stuff we centralize already in WakuRelay (which acts as a "WakuCore" component). In any case we can't move anything that depends heavily on data that is in the WakuNode (again, it could be in a "WakuEdge" to offload WakuNode a bit, just like WakuRelay offloads "WakuCore" logic from WakuNode), otherwise it will just add a level of "." to the entire business logic of request fulfillment and event firing (it needs to know about peer management, etc.). We can refactor all this in the future.
Also, startProvidersAndListeners is going to be very common everywhere when we're done refactoring everything to use brokering. All components will have something to say and to listen.
| import results | ||
| import chronicles, json_serialization, json_serialization/std/options | ||
| import ../../../waku_node, ../serdes | ||
| import ../../../api/types |
There was a problem hiding this comment.
| import ../../../api/types | |
| import waku/api/types |
| EventEmitter* = object | ||
| # Placeholder for future event emitter implementation | ||
| observers*: seq[proc (data: EventData): void] | ||
|
|
There was a problem hiding this comment.
This file wasn't parsed by nph for sure :)
find . ! -path "./vendor/*" -name "*.nim" -exec nph {} \;There was a problem hiding this comment.
Fixed it. I'll do the mass-nph in a separate PR after we get the new nph release that is upcoming (or maybe we should do it twice, right now with the current nph and later we redo it).
* Add basic smoke tests to some Health APIs * Remove health check gating from checkApiAvailability * Fix Send API tests broken by checkApiAvailability change
* Force NodeHealthMonitor.new() to set up a WakuNode * Remove all checks for isNil(node) in NodeHealthMonitor * Fix tests to use the new NodeHealthMonitor.new()
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
0d8130f to
0cb2c33
Compare
NagyZoltanPeter
left a comment
There was a problem hiding this comment.
Great work!!!!
Thank you! Left some questions and nitpicks.
| @@ -0,0 +1,13 @@ | |||
| import chronos, results, std/strutils, ../../api/types | |||
|
|
|||
| export ConnectionStatus | |||
There was a problem hiding this comment.
I agree to have a more structured type folder.
And yes everything need to be known by API user shall be under waku/api/types/...
We might think of also waku/api/events/... too.
This would deliver the intent behind those definitions.
There was a problem hiding this comment.
Is it used anywhere? Leftover?
Isn't EventBroker has this functionality?
There was a problem hiding this comment.
I deleted that EventEmitter (dead code).
(I edited this comment later, let me put this back in so the reply makes sense:) Let's follow up with some meeting time and maintenance PRs to move the types to better places. I have some questions, like, I'm not sure if any Request* will be pat of the API? Maybe all of them get wrapped in procs? Or even EventBroker, do we "hide" them in the LMD API behind some callback registration mechanic that's more familiar? The reverse is also possible: we could make the Nim API broker only, which would be very cool as I think you mentioned at some point and I think that's very possible as well.
There was a problem hiding this comment.
Oh, yeah, current Event/Request -Brokers are only internal to a single execution unit/thread. So they cannott cross thread boundaries. As such we need to put interface at FFI level to translate Events to FFI callback events as of current form of FFI. So with requests, they can be issued upon incoming FFI call site.
| proc topicsHealthLoop(w: WakuRelay) {.async.} = | ||
| while true: | ||
| await w.topicHealthUpdateEvent.wait() | ||
| w.topicHealthUpdateEvent.clear() |
There was a problem hiding this comment.
It smells me we might need this loop at all, if we replace topicHealthUpdateEvent with EventBroker and listen to it to make action and emit the output event + call the observers.
(at some point it would be nice to replace those app observers with EventBroker as a common communication channel - not this PR definetely).
There was a problem hiding this comment.
The goal of this AsyncEvent + loop pattern is to make the implementation of the event handler just an AsyncEvent.fire(), so the asyncSpawned task inside the broker does almost nothing and so the task can be collected faster.
We can discuss this pattern in an upcoming meeting and if we don't like it, we can kill this pretty fast with a maintenance PR.
What it has going for itself is that it gives us safety and certainty on how the event system will behave e.g. we get a barrage of events from libp2p and we don't just ripple this out unnecessarily to the app or spend too much in the handlers doing complicated and expensive things as a reaction to every single redundant state transition.
I was even thinking this loop infra could be hidden inside a macro/template that "requestifies and eventifies" middleware components and helps to wire them to each other, so it just becomes the way we use the broker library. Which would help us make the asyncSpawn inside the broker impl optional; it could work like e.g. Boost::Asio, where you can post a task to an IOContext or execute inline "in this task" and it will be fine -- in this case it would be fine because the inline would be just "AsyncEvent.fire()".
There was a problem hiding this comment.
I see it as kind of equal. EventBroker listeners are fired with asyncSpawn by design, so the emitter is not effected by longer running event processors.
There are not much magic behind the wait/fire asyncEvent, due that is also built on future semaphore in the same chronos worker loop.
We can keep this one, it's really just a flavor. For me the EventBroker solution sounds more simple as it avoids while true: loop and other dependencies.
| proc onRelayMsg( | ||
| hm: NodeHealthMonitor, peer: PubSubPeer, msg: var RPCMsg | ||
| ) {.gcsafe, raises: [].} = | ||
| if msg.subscriptions.len == 0: | ||
| if msg.control.isNone(): | ||
| return | ||
| let ctrl = msg.control.get() | ||
| if ctrl.graft.len == 0 and ctrl.prune.len == 0: | ||
| return | ||
|
|
||
| # recomputing node health when peer relay events of interest trigger | ||
| hm.healthUpdateEvent.fire() |
There was a problem hiding this comment.
I would need a bit more detailed comment here explaining why it is actually needed and what is the condition it looks for.
This observer will be triggered a lot IMO.
There was a problem hiding this comment.
Added comments.
Yes we have to assume this will be called "infinity times," so that's why it does almost nothing. It just checks if there's any mesh or subscription change whatsoever, and instead of trying to look into these and make complicated guesses on whether this "changes anything," it just schedules a recompute of health/state which we can do at our leisure and throttle at will as well.
There was a problem hiding this comment.
Yeah, IDK how frequent such a update event will be fired. It would be good to have the topicsHealthLoop processing fall over if within a certain amount of time, I don't think we need to recalculate health status more frequent than a second. WDYT?
There was a problem hiding this comment.
You're right! This is missing a sleep for sure, especially because most health calculations are sync. However 1s is too brutal IMO; we can initially gate this at 50ms I think, which in CPU terms is an eternity but still OK for human user feedback. We can increase this later if needed.
EDIT: Wait, no, this loop has a 100ms sleep already. So I think that's fine?
EDIT 2: The other two health loops are missing this; I'll add this sleep to them as well.
There was a problem hiding this comment.
Just wonder if this can be simplified if using MultiRequestBroker with the intent each protocol should answer to the request if mounted (those NotMounted state can be derived from the list of expected protocol answers vs. actual answers). Such way we could delegate the health logic to protocols decoupling health monitor from the knowledge of them.
I would not block this aswome PR with such rework, just an idea.
There was a problem hiding this comment.
Yes, I saw the API hint you put there to use MultiRequestBroker. I tried doing it that way but hit a few roadblocks, like sync vs. async health metrics. We need first to rework how the protocols provide their health -- each one of the 15 protocols would be changed to provide in their own modules their own health metric instead of centralizing everything in the NodeHealthMonitor. Then the NodeHealthMonitor is deleted or becomes a shim. This is a pure refactor, so I'd rather not roll that into a PR that introduces large behavioral changes.
Ivansete-status
left a comment
There was a problem hiding this comment.
LGTM! Thanks for it! 💯
I'm just adding some minor nitpicks/questions
Great work
| ) | ||
|
|
||
| proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = | ||
| if msgs.control.isSome: |
There was a problem hiding this comment.
Sorry, super nitpick. Parenthesis with verbs, and no-parenthesis with nouns.
| if msgs.control.isSome: | |
| if msgs.control.isSome(): |
|
|
||
| proc waitForConnectionStatus( | ||
| brokerCtx: BrokerContext, expected: ConnectionStatus | ||
| ): Future[void] {.async.} = |
There was a problem hiding this comment.
| ): Future[void] {.async.} = | |
| ) {.async.} = |
| let healthStatus = RequestNodeHealth.request(w.brokerCtx) | ||
|
|
||
| if healthStatus.isErr(): | ||
| warn "Failed to get Waku node health status: ", error = healthStatus.error | ||
| # Let's suppose the node is hesalthy enough, go ahead | ||
| else: | ||
| if healthStatus.get().healthStatus == NodeHealth.Unhealthy: | ||
| return err("Waku node is not healthy, has got no connections.") |
There was a problem hiding this comment.
Why not needed if may I ask?
There was a problem hiding this comment.
It is not "needed" since as we discussed in the last meeting, it can work either way. You can have it be nice and smart and reject a send because it knows the node is super dead right now, or you can have it be dumb and retry to send for 30 or 60 seconds because it doesn't know the difference between a permanent config failure and a temporary connectivity health condition that can be overcome.
However, it is definitely wanted: if we can know the connection is fully shot, then we should tell the app by either rejecting the send, or at least warning it that the node connection appears to be dead (which the app can also inspect via a health API request and via health events, if it can piece together the puzzle itself).
The reason we can't have it now is entirely pragmatic: writing a test that isn't tripped by this currently is super hard. We need a new test library for LMD API level testing that will get this right. There's some things we need to sort out before we can get another crack at this, and in any case this is a back-end improvement -- the API is still the same.
| import waku/common/broker/event_broker | ||
| import libp2p/switch | ||
|
|
||
| type WakuPeerEventKind* = enum |
There was a problem hiding this comment.
| type WakuPeerEventKind* = enum | |
| type WakuPeerEventKind* {.pure.} = enum |
| ok(RequestProtocolHealth(healthStatus: protocolHealthStatus)) | ||
| except CatchableError: | ||
| err("Failed to get protocol health: " & getCurrentExceptionMsg()), |
There was a problem hiding this comment.
| ok(RequestProtocolHealth(healthStatus: protocolHealthStatus)) | |
| except CatchableError: | |
| err("Failed to get protocol health: " & getCurrentExceptionMsg()), | |
| return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus)) | |
| except CatchableError: | |
| return err("Failed to get protocol health: " & getCurrentExceptionMsg()), |
|
The MacOS test failure seems completely unrelated, so I'm just ignoring it: |
Description
This PR:
[Connected, PartiallyConnected, Disconnected]overall node status based on the current state of the internal mounted protocols;Changes
NodeHealthChangeHandlercallback andJsonNodeHealthChangeEventcallback eventnodeStatefield (alongside the oldnodeHealthfield) to the current Rest API /health response (HealthReport), which reports aNodeHealthStatus([Connected, PartiallyConnected, Disconnected]).waku/common/waku_protocol.nimto enumerate all Vac protocols mountable by logos-messaging-nimstrength[]table toNodeHealthMonitorto internally measure connectivity strength of each mounted protocol (when applicable)calculateConnectionStateproc to centralizeNodeHealthStatuscomputing logic.healthLoopthat triggers node connectivity health updates on relevant peer-connectivity (general) and relay (specific) eventsRequestConnectionStatusnim LM API to the new node health monitor logicConnectionStatusChangeEventnim LM APIIssue
Addresses #3646
Notes
The Logos Messaging Health monitoring API spec is itself a work-in-progress, so merging this PR probably shouldn't close the issue yet.
To complete all health monitoring features, some prerequisites need to be implemented, such as: