From 90a0c78d68297ff2085a50cde1bc62a04214b7a6 Mon Sep 17 00:00:00 2001 From: Robert Ream <154010+robertream@users.noreply.github.com> Date: Tue, 7 Apr 2026 17:24:41 -0700 Subject: [PATCH 1/4] feat(invoker): add deployment performance metrics and label enrichment * Deployment-level observability to the invoker subsystem, combining issues #4553 and #4454 into a single change. Lets operators distinguish slow user services from internal Restate bottlenecks. New metrics: - restate.invoker.http_request_duration.seconds (TTFB) - restate.invoker.http_total_duration.seconds (total duration) - restate.invoker.queue_duration.seconds (internal wait time) - restate.invoker.active_invocations (concurrency gauge) - restate.invoker.http_status_code.total (non-200 responses) - restate.invoker.throttle_balance (token bucket debt) * Label enrichment on invocation_tasks, task_duration, eager_state_truncated, enqueue, and concurrency metrics with partition_id, service_name, and deployment_id where applicable. * ServiceMetrics struct with gauge/counter/histogram helpers.All service metric recording goes through the struct methods including TaskMetrics struct handles INVOCATION_TASKS lifecycle counters with status/transient labels. * ResponseStream::instrument(metric) builder for HTTP timing, enqueued_at timestamp on InvokeCommand for queue duration, and event-driven throttle balance recording at slot acquire/release. * LazyIntern for generic string interpolation including STATUS_CODE_LOOKUP for zero-allocation HTTP status code interning. --- crates/invoker-impl/src/input_command.rs | 7 + .../src/invocation_state_machine.rs | 7 + .../invoker-impl/src/invocation_task/mod.rs | 188 +++++++++++---- .../service_protocol_runner.rs | 18 +- .../service_protocol_runner_v4.rs | 6 +- crates/invoker-impl/src/lib.rs | 135 ++++++++--- crates/invoker-impl/src/metric_definitions.rs | 214 +++++++++++++++++- crates/invoker-impl/src/quota.rs | 10 +- 8 files changed, 501 insertions(+), 84 deletions(-) diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index a193cc9bd5..773beada84 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::ops::RangeInclusive; +use std::time::Instant; use tokio::sync::mpsc; @@ -30,6 +31,11 @@ pub(crate) struct InvokeCommand { // removed in v1.6 // pub(super) invocation_epoch: InvocationEpoch, pub(super) invocation_target: InvocationTarget, + /// Timestamp when this command was enqueued. Used to compute queue duration. + /// Skipped in serde — deserialized commands get a fresh Instant, so queue + /// duration may be slightly under-reported for spilled entries. Acceptable. + #[serde(skip, default = "Instant::now")] + pub(super) enqueued_at: Instant, } #[derive(derive_more::Debug)] @@ -118,6 +124,7 @@ impl restate_invoker_api::InvokerHandle for InvokerHandle { partition, invocation_id, invocation_target, + enqueued_at: Instant::now(), }))) .map_err(|_| NotRunningError) } diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index be12dd64b3..6c4f3e5a65 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -21,6 +21,7 @@ use restate_types::retries; use restate_types::schema::invocation_target::OnMaxAttempts; use restate_types::vqueue::VQueueId; +use crate::metric_definitions::ServiceMetrics; use crate::quota::ConcurrencySlot; use super::*; @@ -57,6 +58,9 @@ pub(super) struct InvocationStateMachine, + + /// Cached interned metric labels. `deployment_id` is empty until `PinnedDeployment`. + pub(super) metric: ServiceMetrics, } /// This struct tracks which commands the invocation task generates, @@ -205,6 +209,7 @@ impl InvocationStateMachine { qid: Option, permit: Permit, invocation_target: InvocationTarget, + metric: ServiceMetrics, retry_iter: retries::RetryIter<'static>, on_max_attempts: OnMaxAttempts, concurrency_slot: ConcurrencySlot, @@ -224,6 +229,7 @@ impl InvocationStateMachine { requested_pause: false, _concurrency_slot: concurrency_slot, budget: None, + metric, } } @@ -625,6 +631,7 @@ mod tests { None, Permit::new_empty(), InvocationTarget::mock_virtual_object(), + ServiceMetrics::EMPTY, RetryPolicy::fixed_delay(Duration::from_secs(1), Some(10)).into_iter(), OnMaxAttempts::Kill, ConcurrencySlot::empty(), diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 4cdad7f7af..3992098787 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -28,7 +28,6 @@ use http::response::Parts as ResponseParts; use http::{HeaderName, HeaderValue, Response}; use http_body::{Body, Frame}; use http_body_util::StreamBody; -use metrics::{counter, histogram}; use restate_memory::{LocalMemoryLease, LocalMemoryPool}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -57,7 +56,10 @@ use restate_types::service_protocol::ServiceProtocolVersion; use crate::TokenBucket; use crate::error::{InvocationMemoryExhausted, InvokerError}; use crate::invocation_task::service_protocol_runner::ServiceProtocolRunner; -use crate::metric_definitions::{ID_LOOKUP, INVOKER_EAGER_STATE_TRUNCATED, INVOKER_TASK_DURATION}; +use crate::metric_definitions::{ + INVOKER_EAGER_STATE_TRUNCATED, INVOKER_HTTP_REQUEST_DURATION, INVOKER_HTTP_TOTAL_DURATION, + INVOKER_TASK_DURATION, ServiceMetrics, UUID_LOOKUP, +}; // Clippy false positive, might be caused by Bytes contained within HeaderValue. // https://github.com/rust-lang/rust/issues/40543#issuecomment-1212981256 @@ -99,6 +101,7 @@ const X_RESTATE_SERVER: HeaderName = HeaderName::from_static("x-restate-server") async fn collect_eager_state( state: Option>, size_limit: usize, + labels: ServiceMetrics, mut mapper: impl FnMut((Bytes, Bytes)) -> T, ) -> Result<(bool, Vec, Option), InvokerError> where @@ -128,7 +131,7 @@ where ByteCount::from(size_limit), entries.len() ); - counter!(INVOKER_EAGER_STATE_TRUNCATED).increment(1); + labels.counter(INVOKER_EAGER_STATE_TRUNCATED).increment(1); is_partial = true; break; } @@ -258,6 +261,13 @@ pub(super) struct InvocationTask { // throttling action_token_bucket: Option, + + /// Resolved deployment id for the current attempt. Set once the deployment is pinned. + /// Used for metric labels on task duration and eager state truncation. + pinned_deployment_id: Option, + + /// Cached interned metric labels for zero-allocation metric emissions. + metric: ServiceMetrics, } /// This is needed to split the run_internal in multiple loop functions and have shortcircuiting. @@ -333,6 +343,7 @@ where invoker_tx: mpsc::UnboundedSender, invoker_rx: mpsc::UnboundedReceiver, action_token_bucket: Option, + labels: ServiceMetrics, ) -> Self { Self { client, @@ -350,6 +361,8 @@ where message_size_warning, retry_count_since_last_stored_entry, action_token_bucket, + pinned_deployment_id: None, + metric: labels, } } @@ -409,7 +422,8 @@ where }; self.send_invoker_tx(inner); - histogram!(INVOKER_TASK_DURATION, "partition_id" => ID_LOOKUP.get(self.partition.0)) + self.metric + .histogram(INVOKER_TASK_DURATION) .record(start.elapsed()); } @@ -537,6 +551,9 @@ where None }; + // Store deployment id for metric labels (INVOKER_TASK_DURATION, INVOKER_EAGER_STATE_TRUNCATED) + self.pinned_deployment_id = Some(deployment.id); + self.metric.deployment_id = UUID_LOOKUP.get(&deployment.id); self.send_invoker_tx(InvocationTaskOutputInner::PinnedDeployment( PinnedDeployment::new(deployment.id, chosen_service_protocol_version), deployment_changed, @@ -610,14 +627,14 @@ fn invocation_id_to_header_value(invocation_id: &InvocationId) -> HeaderValue { .unwrap_or_else(|_| unreachable!("invocation id should be always valid")) } -enum ResponseChunk { +pub(super) enum ResponseChunk { Parts(ResponseParts), Data(Bytes), } pin_project_lite::pin_project! { #[project = ResponseStreamProj] - enum ResponseStream { + pub(super) enum ResponseStream { WaitingHeaders { join_handle: AbortOnDropHandle, ServiceClientError>>, }, @@ -643,6 +660,69 @@ impl ResponseStream { } } +/// Wraps a [`ResponseStream`] to record HTTP timing and status metrics. +/// +/// Records: +/// - TTFB (`INVOKER_HTTP_REQUEST_DURATION`) when the first response headers arrive +/// - Total duration (`INVOKER_HTTP_TOTAL_DURATION`) when the stream terminates +/// - Non-200 status codes (`INVOKER_HTTP_STATUS_CODE`) at header receipt +pub(super) struct InstrumentedResponseStream { + inner: ResponseStream, + started_at: Instant, + metric: ServiceMetrics, + ttfb_recorded: bool, +} + +impl ResponseStream { + pub(super) fn instrument(self, metric: ServiceMetrics) -> InstrumentedResponseStream { + InstrumentedResponseStream { + inner: self, + started_at: Instant::now(), + metric, + ttfb_recorded: false, + } + } +} + +impl Stream for InstrumentedResponseStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // SAFETY: InstrumentedResponseStream is Unpin (all fields are Unpin) + let this = self.get_mut(); + let result = this.inner.poll_next_unpin(cx); + + if let Poll::Ready(Some(Ok(ResponseChunk::Parts(parts)))) = &result { + // WaitingHeaders -> ReadingBody transition: record TTFB and status code + if !this.ttfb_recorded { + this.ttfb_recorded = true; + this.metric + .histogram(INVOKER_HTTP_REQUEST_DURATION) + .record(this.started_at.elapsed()); + + let status = parts.status; + if !status.is_success() { + this.metric + .http_status_counter(status.as_u16()) + .increment(1); + } + } + } + + result + } +} + +impl Drop for InstrumentedResponseStream { + fn drop(&mut self) { + // Note: on the v4 protocol path, this includes the response stream drain period + // (up to 5s for deployments that don't cleanly close the stream). + self.metric + .histogram(INVOKER_HTTP_TOTAL_DURATION) + .record(self.started_at.elapsed()); + } +} + impl Stream for ResponseStream { type Item = Result; @@ -698,6 +778,7 @@ mod tests { use std::sync::LazyLock; use super::collect_eager_state; + use crate::metric_definitions::ServiceMetrics; use restate_invoker_api::InvocationReaderError; use restate_invoker_api::invocation_reader::EagerState; use restate_memory::{LocalMemoryLease, LocalMemoryPool}; @@ -727,13 +808,14 @@ mod tests { #[tokio::test] async fn collect_eager_state_no_state_returns_partial() { - let (is_partial, entries, _memory_lease) = collect_eager_state::< - stream::Empty>, - _, - _, - >(None, 1024, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = + collect_eager_state::< + stream::Empty>, + _, + _, + >(None, 1024, ServiceMetrics::EMPTY, std::convert::identity) + .await + .unwrap(); assert!(is_partial, "no state should be reported as partial"); assert!(entries.is_empty()); @@ -744,10 +826,14 @@ mod tests { let items = vec![entry(10, 20), entry(5, 15)]; let state = EagerState::new_complete(stream::iter(items)); - let (is_partial, entries, _memory_lease) = - collect_eager_state(Some(state), 1024, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = collect_eager_state( + Some(state), + 1024, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await + .unwrap(); assert!(!is_partial, "all entries fit within limit"); assert_eq!(entries.len(), 2); @@ -759,10 +845,14 @@ mod tests { let items = vec![entry(10, 10)]; let state = EagerState::new_partial(stream::iter(items)); - let (is_partial, entries, _memory_lease) = - collect_eager_state(Some(state), 1024, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = collect_eager_state( + Some(state), + 1024, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await + .unwrap(); assert!(is_partial, "partial flag should be preserved from source"); assert_eq!(entries.len(), 1); @@ -774,10 +864,14 @@ mod tests { let items = vec![entry(25, 25), entry(25, 25), entry(25, 25)]; let state = EagerState::new_complete(stream::iter(items)); - let (is_partial, entries, _memory_lease) = - collect_eager_state(Some(state), 120, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = collect_eager_state( + Some(state), + 120, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await + .unwrap(); assert!(is_partial, "should be partial after truncation"); assert_eq!(entries.len(), 2, "only 2 entries should fit (100 bytes)"); @@ -789,10 +883,14 @@ mod tests { let items = vec![entry(100, 101)]; let state = EagerState::new_complete(stream::iter(items)); - let (is_partial, entries, _memory_lease) = - collect_eager_state(Some(state), 200, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = collect_eager_state( + Some(state), + 200, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await + .unwrap(); assert!(is_partial, "first entry exceeded limit so partial state"); assert!( @@ -806,7 +904,13 @@ mod tests { let items: Vec = vec![Err(TestError)]; let state = EagerState::new_complete(stream::iter(items)); - let result = collect_eager_state(Some(state), 1024, std::convert::identity).await; + let result = collect_eager_state( + Some(state), + 1024, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await; assert!(result.is_err(), "stream error should be propagated"); } @@ -816,10 +920,14 @@ mod tests { let items = vec![entry(25, 25), entry(25, 25)]; let state = EagerState::new_complete(stream::iter(items)); - let (is_partial, entries, _memory_lease) = - collect_eager_state(Some(state), 100, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = collect_eager_state( + Some(state), + 100, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await + .unwrap(); assert!(!is_partial, "entries exactly at limit should fit"); assert_eq!(entries.len(), 2); @@ -831,10 +939,14 @@ mod tests { let items = vec![entry(25, 25), entry(25, 25)]; let state = EagerState::new_complete(stream::iter(items)); - let (is_partial, entries, _memory_lease) = - collect_eager_state(Some(state), 99, std::convert::identity) - .await - .unwrap(); + let (is_partial, entries, _memory_lease) = collect_eager_state( + Some(state), + 99, + ServiceMetrics::EMPTY, + std::convert::identity, + ) + .await + .unwrap(); assert!(is_partial, "should be partial when 1 byte over"); assert_eq!(entries.len(), 1, "only first entry should fit"); diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs index ea6f79efa1..e0b5475df4 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs @@ -47,9 +47,9 @@ use restate_types::service_protocol::ServiceProtocolVersion; use crate::Notification; use crate::error::{InvocationErrorRelatedEntry, InvokerError, SdkInvocationError}; use crate::invocation_task::{ - InvocationTask, InvocationTaskOutputInner, InvokerBodySender, InvokerBodyType, ResponseChunk, - ResponseStream, TerminalLoopState, X_RESTATE_SERVER, collect_eager_state, - invocation_id_to_header_value, leased_frame, new_invoker_body, + InstrumentedResponseStream, InvocationTask, InvocationTaskOutputInner, InvokerBodySender, + InvokerBodyType, ResponseChunk, ResponseStream, TerminalLoopState, X_RESTATE_SERVER, + collect_eager_state, invocation_id_to_header_value, leased_frame, new_invoker_body, service_protocol_version_to_header_value, }; @@ -165,8 +165,9 @@ where &service_invocation_span_context, ); - // Initialize the response stream state - let mut http_stream_rx = ResponseStream::initialize(&self.invocation_task.client, request); + // Initialize the response stream state, wrapped to record HTTP timing metrics + let mut http_stream_rx = ResponseStream::initialize(&self.invocation_task.client, request) + .instrument(self.invocation_task.metric); // === Replay phase (transaction alive) === { @@ -342,7 +343,7 @@ where async fn replay_loop( &mut self, http_stream_tx: &mut InvokerBodySender, - http_stream_rx: &mut ResponseStream, + http_stream_rx: &mut InstrumentedResponseStream, journal_stream: JournalStream, ) -> TerminalLoopState<()> where @@ -419,7 +420,7 @@ where &mut self, parent_span_context: &ServiceInvocationSpanContext, mut http_stream_tx: InvokerBodySender, - http_stream_rx: &mut ResponseStream, + http_stream_rx: &mut InstrumentedResponseStream, mut invocation_reader: IR, outbound_budget: &mut LocalMemoryPool, ) -> TerminalLoopState<()> @@ -485,7 +486,7 @@ where async fn response_stream_loop( &mut self, parent_span_context: &ServiceInvocationSpanContext, - http_stream_rx: &mut ResponseStream, + http_stream_rx: &mut InstrumentedResponseStream, ) -> TerminalLoopState<()> { loop { tokio::select! { @@ -524,6 +525,7 @@ where let (partial_state, state_map, state_lease) = collect_eager_state( state, self.invocation_task.eager_state_size_limit, + self.invocation_task.metric, |(key, value)| StateEntry { key, value }, ) .await?; diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 1287b8b0d2..f486ea5166 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -173,8 +173,9 @@ where &journal_metadata.span_context, ); - // Initialize the response stream state - let http_stream_rx = ResponseStream::initialize(&self.invocation_task.client, request); + // Initialize the response stream, wrapped to record HTTP timing metrics + let http_stream_rx = ResponseStream::initialize(&self.invocation_task.client, request) + .instrument(self.invocation_task.metric); let mut decoder_stream = std::pin::pin!( DecoderStream::new( @@ -651,6 +652,7 @@ where let (partial_state, state_map, state_lease) = collect_eager_state( state, self.invocation_task.eager_state_size_limit, + self.invocation_task.metric, |(key, value)| StateEntry { key, value }, ) .await?; diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 1127b41775..beab760fe1 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -74,8 +74,9 @@ use crate::invocation_state_machine::OnTaskError; use crate::invocation_task::InvocationTask; use crate::invocation_task::{InvocationTaskOutput, InvocationTaskOutputInner}; use crate::metric_definitions::{ - ID_LOOKUP, INVOKER_ENQUEUE, INVOKER_INVOCATION_TASKS, TASK_OP_COMPLETED, TASK_OP_FAILED, - TASK_OP_STARTED, TASK_OP_SUSPENDED, + ID_LOOKUP, INVOKER_ACTIVE_INVOCATIONS, INVOKER_ENQUEUE, INVOKER_QUEUE_DURATION, + INVOKER_THROTTLE_BALANCE, STR_LOOKUP, ServiceMetrics, TASK_OP_COMPLETED, TASK_OP_FAILED, + TASK_OP_STARTED, TASK_OP_SUSPENDED, UUID_LOOKUP, }; use crate::status_store::InvocationStatusStore; @@ -110,6 +111,7 @@ trait InvocationTaskRunner { invoker_rx: mpsc::UnboundedReceiver, task_pool: &mut JoinSet<()>, budget: LocalMemoryPool, + metric: ServiceMetrics, ) -> AbortHandle; } @@ -138,6 +140,7 @@ where invoker_rx: mpsc::UnboundedReceiver, task_pool: &mut JoinSet<()>, budget: LocalMemoryPool, + metric: ServiceMetrics, ) -> AbortHandle { task_pool .build_task() @@ -159,6 +162,7 @@ where invoker_tx, invoker_rx, self.action_token_bucket.clone(), + metric, ) .run(storage_reader, budget), ) @@ -265,6 +269,7 @@ impl Service { /// Acquired at the top of `step()` when the queue is non-empty, and consumed /// when the segment queue arm fires. pending_memory_lease: Option, + + /// Clone of the invocation token bucket for reading throttle balance. + /// `None` if throttling is not configured. + invocation_token_bucket: Option, } impl ServiceInner @@ -464,11 +473,21 @@ where match input_message { // --- Spillable queue loading/offloading InputCommand::Invoke(invoke_command) => { - counter!(INVOKER_ENQUEUE, "partition_id" => invoke_command.partition.0.to_string()).increment(1); + ServiceMetrics::new( + ID_LOOKUP.get(invoke_command.partition.0), + STR_LOOKUP.get(invoke_command.invocation_target.service_name()), + ).counter(INVOKER_ENQUEUE).increment(1); segmented_input_queue.inner_pin_mut().enqueue(invoke_command).await; }, InputCommand::VQInvoke(command) => { - counter!(INVOKER_ENQUEUE, "status" => TASK_OP_COMPLETED, "partition_id" => ID_LOOKUP.get(command.partition.0)).increment(1); + // Note: VQInvoke path has a legacy "status" label that Invoke path lacks. + // Kept for backwards compatibility — clean up in a future PR. + counter!( + INVOKER_ENQUEUE, + "status" => TASK_OP_COMPLETED, + "partition_id" => ID_LOOKUP.get(command.partition.0), + "service_name" => STR_LOOKUP.get(command.invocation_target.service_name()), + ).increment(1); self.handle_vqueue_invoke(options, *command); }, // --- Other commands (they don't go through the segment queue) @@ -502,7 +521,7 @@ where Some(invoke_input_command) = segmented_input_queue.next(), if !segmented_input_queue.inner().is_empty() && self.quota.is_slot_available() && self.pending_memory_lease.is_some() => { let initial_memory_lease = self.pending_memory_lease.take().unwrap(); let budget = self.create_outbound_budget(options, initial_memory_lease); - self.handle_invoke(options, invoke_input_command.partition, invoke_input_command.invocation_id, invoke_input_command.invocation_target, budget); + self.handle_invoke(options, invoke_input_command.partition, invoke_input_command.invocation_id, invoke_input_command.invocation_target, invoke_input_command.enqueued_at, budget); }, memory_lease = self.memory_pool.reserve(initial_invocation_memory), if !segmented_input_queue.inner().is_empty() && self.pending_memory_lease.is_none() => { self.pending_memory_lease = Some(memory_lease); @@ -647,6 +666,11 @@ where .partition_storage_reader(command.partition) .expect("partition is registered"); let concurrency_slot = self.quota.acquire_slot(); + let labels = ServiceMetrics::new( + ID_LOOKUP.get(command.partition.0), + STR_LOOKUP.get(command.invocation_target.service_name()), + ); + self.record_throttle_balance(&labels); // VQueue path: the vqueue scheduler supplies a pre-acquired MemoryLease // used as the initial memory for the outbound budget. @@ -660,6 +684,7 @@ where Some(command.qid), command.permit, command.invocation_target, + labels, retry_iter, on_max_attempts, concurrency_slot, @@ -691,8 +716,19 @@ where partition: PartitionLeaderEpoch, invocation_id: InvocationId, invocation_target: InvocationTarget, + enqueued_at: Instant, budget: LocalMemoryPool, ) { + let labels = ServiceMetrics::new( + ID_LOOKUP.get(partition.0), + STR_LOOKUP.get(invocation_target.service_name()), + ); + + // Record time spent waiting in the segment queue + labels + .histogram(INVOKER_QUEUE_DURATION) + .record(enqueued_at.elapsed()); + if self .invocation_state_machine_manager .has_partition(partition) @@ -709,6 +745,7 @@ where .partition_storage_reader(partition) .expect("partition is registered"); let concurrency_slot = self.quota.acquire_slot(); + self.record_throttle_balance(&labels); self.start_invocation_task( options, partition, @@ -718,6 +755,7 @@ where None, Permit::new_empty(), invocation_target, + labels, retry_iter, on_max_attempts, concurrency_slot, @@ -813,6 +851,10 @@ where self.schemas.live_load(), ); + // Set deployment_id now that it's known, and track active invocations + ism.metric.deployment_id = UUID_LOOKUP.get(&pinned_deployment.deployment_id); + ism.metric.gauge(INVOKER_ACTIVE_INVOCATIONS).increment(1.0); + ism.notify_pinned_deployment(pinned_deployment, has_changed); }, ); @@ -1072,7 +1114,8 @@ where .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { - counter!(INVOKER_INVOCATION_TASKS, "status" => TASK_OP_COMPLETED, "partition_id" => ID_LOOKUP.get(partition.0)).increment(1); + decrement_active_invocations(&ism); + ism.metric.task(TASK_OP_COMPLETED).counter().increment(1); trace!( restate.invocation.target = %ism.invocation_target, "Invocation task closed correctly"); @@ -1108,7 +1151,8 @@ where .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { - counter!(INVOKER_INVOCATION_TASKS, "status" => TASK_OP_SUSPENDED, "partition_id" => ID_LOOKUP.get(partition.0)).increment(1); + decrement_active_invocations(&ism); + ism.metric.task(TASK_OP_SUSPENDED).counter().increment(1); self.status_store.on_end(&partition, &invocation_id); if ism.requested_pause { @@ -1167,8 +1211,8 @@ where .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { - counter!(INVOKER_INVOCATION_TASKS, "status" => TASK_OP_SUSPENDED, "partition_id" => ID_LOOKUP.get(partition.0)) - .increment(1); + decrement_active_invocations(&ism); + ism.metric.task(TASK_OP_SUSPENDED).counter().increment(1); self.status_store.on_end(&partition, &invocation_id); if ism.requested_pause { @@ -1287,6 +1331,7 @@ where "Invocation yielding due to global memory pool exhaustion while {}", oom.context, ); + decrement_active_invocations(&ism); ism.abort(); self.status_store.on_end(&partition, &invocation_id); let _ = sender @@ -1343,6 +1388,7 @@ where restate.invocation.target = %ism.invocation_target, "Aborting invocation" ); + decrement_active_invocations(&ism); ism.abort(); self.status_store.on_end(&partition, &invocation_id); } else { @@ -1405,6 +1451,7 @@ where if ism.notify_pause() { // If returns true, we need to pause now + decrement_active_invocations(&ism); let _ = sender .send(Box::new(Effect { invocation_id, @@ -1452,6 +1499,7 @@ where restate.invocation.target = %ism.invocation_target, "Aborting invocation" ); + decrement_active_invocations(&ism); ism.abort(); self.status_store.on_end(&partition, &fid); } @@ -1479,6 +1527,8 @@ where error: InvokerError, mut ism: InvocationStateMachine, ) { + let metric = ism.metric; + decrement_active_invocations(&ism); let attempt_deployment_id = ism.attempt_deployment_id(); // Call handle_task_error with a closure that registers the timer. @@ -1495,12 +1545,10 @@ where match result { OnTaskError::Retrying(next_retry_timer_duration) => { - counter!(INVOKER_INVOCATION_TASKS, - "status" => TASK_OP_FAILED, - "transient" => "true", - "partition_id" => ID_LOOKUP.get(partition.0) - ) - .increment(1); + metric + .task(TASK_OP_FAILED) + .failed_counter(true) + .increment(1); if let Some(error_stacktrace) = error.error_stacktrace() { // The error details is treated differently from the pretty printer, // makes sure it prints at the end of the log the spammy exception @@ -1586,12 +1634,10 @@ where ); } OnTaskError::Pause => { - counter!(INVOKER_INVOCATION_TASKS, - "status" => TASK_OP_FAILED, - "transient" => "false", - "partition_id" => ID_LOOKUP.get(partition.0) - ) - .increment(1); + metric + .task(TASK_OP_FAILED) + .failed_counter(false) + .increment(1); warn_it!( error, restate.invocation.id = %invocation_id, @@ -1647,12 +1693,10 @@ where .await; } OnTaskError::Kill => { - counter!(INVOKER_INVOCATION_TASKS, - "status" => TASK_OP_FAILED, - "transient" => "false", - "partition_id" => ID_LOOKUP.get(partition.0) - ) - .increment(1); + metric + .task(TASK_OP_FAILED) + .failed_counter(false) + .increment(1); warn_it!( error, restate.invocation.id = %invocation_id, @@ -1696,6 +1740,14 @@ where ) } + /// Record the invocation token bucket balance as a throttle gauge. + /// No-op if throttling is not configured. + fn record_throttle_balance(&self, labels: &ServiceMetrics) { + if let Some(bucket) = &self.invocation_token_bucket { + labels.gauge(INVOKER_THROTTLE_BALANCE).set(bucket.balance()); + } + } + fn start_invocation_task( &mut self, options: &InvokerOptions, @@ -1718,6 +1770,7 @@ where completions_rx, &mut self.invocation_tasks, budget, + ism.metric, ); // Transition the state machine, and store it @@ -1728,7 +1781,7 @@ where "Invocation task started state. Invocation state: {:?}", ism.invocation_state_debug() ); - counter!(INVOKER_INVOCATION_TASKS, "status" => TASK_OP_STARTED, "partition_id" => ID_LOOKUP.get(partition.0)).increment(1); + ism.metric.task(TASK_OP_STARTED).counter().increment(1); self.invocation_state_machine_manager .register_invocation(partition, invocation_id, ism); } @@ -1785,6 +1838,13 @@ where } } +/// Decrement the active invocations gauge if deployment has been pinned. +fn decrement_active_invocations(ism: &InvocationStateMachine) { + if !ism.metric.deployment_id.is_empty() { + ism.metric.gauge(INVOKER_ACTIVE_INVOCATIONS).decrement(1.0); + } +} + #[cfg(test)] mod tests { use super::*; @@ -1876,6 +1936,7 @@ mod tests { invocation_state_machine_manager: Default::default(), memory_pool: MemoryPool::unlimited(), pending_memory_lease: None, + invocation_token_bucket: None, }; (input_tx, status_tx, service_inner) } @@ -1958,6 +2019,7 @@ mod tests { invoker_rx: mpsc::UnboundedReceiver, task_pool: &mut JoinSet<()>, _budget: LocalMemoryPool, + _metric: ServiceMetrics, ) -> AbortHandle { task_pool .build_task() @@ -1991,6 +2053,7 @@ mod tests { _invoker_rx: mpsc::UnboundedReceiver, task_pool: &mut JoinSet<()>, _budget: LocalMemoryPool, + _metric: ServiceMetrics, ) -> AbortHandle { task_pool.spawn(pending()) } @@ -2012,6 +2075,7 @@ mod tests { _invoker_rx: mpsc::UnboundedReceiver, task_pool: &mut JoinSet<()>, _budget: LocalMemoryPool, + _metric: ServiceMetrics, ) -> AbortHandle { self.fetch_add(1, Ordering::SeqCst); task_pool.spawn(pending()) @@ -2192,6 +2256,7 @@ mod tests { partition: MOCK_PARTITION, invocation_id: invocation_id_1, invocation_target: InvocationTarget::mock_virtual_object(), + enqueued_at: Instant::now(), })) .await; segment_queue @@ -2201,6 +2266,7 @@ mod tests { partition: MOCK_PARTITION, invocation_id: invocation_id_2, invocation_target: InvocationTarget::mock_virtual_object(), + enqueued_at: Instant::now(), })) .await; @@ -2308,6 +2374,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2392,6 +2459,7 @@ mod tests { )), Permit::new_empty(), invocation_target.clone(), + ServiceMetrics::EMPTY, RetryPolicy::fixed_delay(Duration::from_millis(100), None).into_iter(), OnMaxAttempts::Kill, ConcurrencySlot::empty(), @@ -2449,6 +2517,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2536,6 +2605,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2642,6 +2712,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2696,6 +2767,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2802,6 +2874,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2885,6 +2958,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -2953,6 +3027,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -3014,6 +3089,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -3081,6 +3157,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -3154,6 +3231,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); @@ -3225,6 +3303,7 @@ mod tests { MOCK_PARTITION, invocation_id, InvocationTarget::mock_virtual_object(), + Instant::now(), budget, ); diff --git a/crates/invoker-impl/src/metric_definitions.rs b/crates/invoker-impl/src/metric_definitions.rs index 24339920dc..7dc5ed9c85 100644 --- a/crates/invoker-impl/src/metric_definitions.rs +++ b/crates/invoker-impl/src/metric_definitions.rs @@ -8,16 +8,24 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::fmt; +use std::hash::Hash; use std::sync::LazyLock; +use bytestring::ByteString; use dashmap::{DashMap, Entry}; -/// Optional to have but adds description/help message to the metrics emitted to -/// the metrics' sink. -use metrics::{Unit, describe_counter, describe_gauge, describe_histogram}; +use metrics::{ + Unit, counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, +}; + +use restate_types::identifiers::DeploymentId; /// Lazily initialized cache that maps "partition" ids to their string representation for metric labels, /// avoiding fresh string allocations whenever a partition id is used as a metric dimension. pub(crate) static ID_LOOKUP: LazyLock = LazyLock::new(IdLookup::new); +pub(crate) static STR_LOOKUP: LazyIntern = LazyIntern::new(); +pub(crate) static UUID_LOOKUP: LazyIntern = LazyIntern::new(); +static STATUS_CODE_LOOKUP: LazyIntern = LazyIntern::new(); pub(crate) struct IdLookup { index: Vec<&'static str>, @@ -63,6 +71,164 @@ impl IdLookup { } } +/// Lazily initialized intern cache that maps values to their `Display` string representation +/// as `&'static str`. Cache hit = hash comparison only, no formatting. Cache miss = format +/// once via `Display`, leak, store. Bounded by the number of distinct values seen at runtime. +pub(crate) struct LazyIntern { + cache: LazyLock>, +} + +impl LazyIntern { + pub const fn new() -> Self { + Self { + cache: LazyLock::new(DashMap::new), + } + } + + #[inline] + pub fn get(&self, key: &K) -> &'static str { + if let Some(entry) = self.cache.get(key) { + return entry.value(); + } + + let entry = self.cache.entry(key.clone()); + match entry { + Entry::Occupied(entry) => entry.get(), + Entry::Vacant(entry) => { + let s: &'static str = entry.key().to_string().leak(); + entry.insert(s).value() + } + } + } +} + +/// Cached interned label values for zero-allocation metric emissions. +/// Built incrementally: `partition_id` and `service_name` set at task start, +/// `deployment_id` set when `PinnedDeployment` is received (empty until then). +/// When `deployment_id` is empty, it is omitted from emitted labels to avoid +/// cardinality splits in Prometheus. +#[derive(Debug, Clone, Copy)] +pub(crate) struct ServiceMetrics { + pub partition_id: &'static str, + pub service_name: &'static str, + pub deployment_id: &'static str, +} + +impl ServiceMetrics { + pub fn new(partition_id: &'static str, service_name: &'static str) -> Self { + Self { + partition_id, + service_name, + deployment_id: "", + } + } + + pub fn gauge(&self, name: &'static str) -> metrics::Gauge { + if self.deployment_id.is_empty() { + gauge!(name, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + ) + } else { + gauge!(name, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + "deployment_id" => self.deployment_id, + ) + } + } + + pub fn counter(&self, name: &'static str) -> metrics::Counter { + if self.deployment_id.is_empty() { + counter!(name, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + ) + } else { + counter!(name, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + "deployment_id" => self.deployment_id, + ) + } + } + + pub fn histogram(&self, name: &'static str) -> metrics::Histogram { + if self.deployment_id.is_empty() { + histogram!(name, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + ) + } else { + histogram!(name, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + "deployment_id" => self.deployment_id, + ) + } + } + + pub fn http_status_counter(&self, status_code: u16) -> metrics::Counter { + let code = STATUS_CODE_LOOKUP.get(&status_code); + if self.deployment_id.is_empty() { + counter!(INVOKER_HTTP_STATUS_CODE, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + "status_code" => code, + ) + } else { + counter!(INVOKER_HTTP_STATUS_CODE, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + "deployment_id" => self.deployment_id, + "status_code" => code, + ) + } + } + + pub fn task(&self, status: &'static str) -> TaskMetrics { + TaskMetrics { + partition_id: self.partition_id, + service_name: self.service_name, + status, + } + } + + #[cfg(test)] + pub const EMPTY: Self = Self { + partition_id: "", + service_name: "", + deployment_id: "", + }; +} + +/// Task lifecycle metrics: partition + service + status (no deployment_id). +#[derive(Debug, Clone, Copy)] +pub(crate) struct TaskMetrics { + partition_id: &'static str, + service_name: &'static str, + status: &'static str, +} + +impl TaskMetrics { + pub fn counter(&self) -> metrics::Counter { + counter!(INVOKER_INVOCATION_TASKS, + "status" => self.status, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + ) + } + + pub fn failed_counter(&self, transient: bool) -> metrics::Counter { + counter!(INVOKER_INVOCATION_TASKS, + "status" => self.status, + "transient" => if transient { "true" } else { "false" }, + "partition_id" => self.partition_id, + "service_name" => self.service_name, + ) + } +} + pub const INVOKER_ENQUEUE: &str = "restate.invoker.enqueue.total"; pub const INVOKER_INVOCATION_TASKS: &str = "restate.invoker.invocation_tasks.total"; pub const INVOKER_CONCURRENCY_SLOTS_ACQUIRED: &str = "restate.invoker.concurrency_slots.acquired"; @@ -70,6 +236,12 @@ pub const INVOKER_CONCURRENCY_SLOTS_RELEASED: &str = "restate.invoker.concurrenc pub const INVOKER_CONCURRENCY_LIMIT: &str = "restate.invoker.concurrency_limit"; pub const INVOKER_TASK_DURATION: &str = "restate.invoker.task_duration.seconds"; pub const INVOKER_EAGER_STATE_TRUNCATED: &str = "restate.invoker.eager_state_truncated.total"; +pub const INVOKER_THROTTLE_BALANCE: &str = "restate.invoker.throttle_balance"; +pub const INVOKER_QUEUE_DURATION: &str = "restate.invoker.queue_duration.seconds"; +pub const INVOKER_ACTIVE_INVOCATIONS: &str = "restate.invoker.active_invocations"; +pub const INVOKER_HTTP_REQUEST_DURATION: &str = "restate.invoker.http_request_duration.seconds"; +pub const INVOKER_HTTP_TOTAL_DURATION: &str = "restate.invoker.http_total_duration.seconds"; +pub const INVOKER_HTTP_STATUS_CODE: &str = "restate.invoker.http_status_code.total"; pub const TASK_OP_STARTED: &str = "started"; pub const TASK_OP_SUSPENDED: &str = "suspended"; @@ -118,4 +290,40 @@ pub(crate) fn describe_metrics() { Unit::Count, "Number of invocations where eager state was truncated due to size limit" ); + + describe_gauge!( + INVOKER_THROTTLE_BALANCE, + Unit::Count, + "Invocation token bucket balance, recorded on acquire/release. Negative = throttled." + ); + + describe_histogram!( + INVOKER_QUEUE_DURATION, + Unit::Seconds, + "Wall-clock time from invocation enqueue to task start" + ); + + describe_gauge!( + INVOKER_ACTIVE_INVOCATIONS, + Unit::Count, + "Current in-flight invocations per service/deployment" + ); + + describe_histogram!( + INVOKER_HTTP_REQUEST_DURATION, + Unit::Seconds, + "Time-to-first-byte (TTFB): HTTP request send to response headers received" + ); + + describe_histogram!( + INVOKER_HTTP_TOTAL_DURATION, + Unit::Seconds, + "Full HTTP request duration including response body streaming" + ); + + describe_counter!( + INVOKER_HTTP_STATUS_CODE, + Unit::Count, + "Count of non-200 HTTP status codes returned by deployments" + ); } diff --git a/crates/invoker-impl/src/quota.rs b/crates/invoker-impl/src/quota.rs index 7f7d4983ae..b8547f09f3 100644 --- a/crates/invoker-impl/src/quota.rs +++ b/crates/invoker-impl/src/quota.rs @@ -50,15 +50,15 @@ pub(super) struct InvokerConcurrencyQuota { impl InvokerConcurrencyQuota { pub(super) fn new(invoker_id: impl Into, quota: Option) -> Self { - let invoker_id = invoker_id.into(); + let partition_id_str = ID_LOOKUP.get(invoker_id.into()); let inner = match quota { Some(available_slots) => { - gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => ID_LOOKUP.get(invoker_id)) + gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => partition_id_str, "partition_id" => partition_id_str) .set(available_slots.get() as f64); - let acquired_counter = counter!(INVOKER_CONCURRENCY_SLOTS_ACQUIRED, "invoker_id" => ID_LOOKUP.get(invoker_id)); - let released_counter = counter!(INVOKER_CONCURRENCY_SLOTS_RELEASED, "invoker_id" => ID_LOOKUP.get(invoker_id)); + let acquired_counter = counter!(INVOKER_CONCURRENCY_SLOTS_ACQUIRED, "invoker_id" => partition_id_str, "partition_id" => partition_id_str); + let released_counter = counter!(INVOKER_CONCURRENCY_SLOTS_RELEASED, "invoker_id" => partition_id_str, "partition_id" => partition_id_str); InvokerConcurrencyQuotaInner::Limited { slots: Arc::new(LimitedSlots { @@ -69,7 +69,7 @@ impl InvokerConcurrencyQuota { } } None => { - gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => ID_LOOKUP.get(invoker_id)) + gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => partition_id_str, "partition_id" => partition_id_str) .set(f64::INFINITY); InvokerConcurrencyQuotaInner::Unlimited From 44b7ca2c1ffef078297fba971590ddd7626d3cbc Mon Sep 17 00:00:00 2001 From: Robert Ream <154010+robertream@users.noreply.github.com> Date: Wed, 15 Apr 2026 16:50:31 -0700 Subject: [PATCH 2/4] feat(network): add connection pool instrumentation metrics (#4559) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add comprehensive metrics to the networking layer's connection pool via two scoped structs: NetworkMetrics (swimlane-scoped, pre-cached per variant): - pool.connections.active gauge - connection.pending gauge - connection.acquisition.duration histogram (cache=hit/miss, result) - connection.handshake.duration histogram (result=success/error) - rpc.duration histogram (result=success/error) ConnectionMetrics (swimlane + peer_name + direction, stored on Connection): - connection.opened/closed counters (+ legacy connection_created/dropped) - permit_acquisition.duration histogram - connection.duration (lifetime) histogram Key design decisions: - NetworkMetrics instances pre-built in a static array indexed by Swimlane - ConnectionMetrics created via NetworkMetrics::connection(peer, direction) - All labels are &'static str via LazyIntern for peer IDs — zero allocation - Legacy metrics (connection_created/dropped) preserved with enriched labels - Pool gauge decremented in deregister() to avoid double-count on drain+drop - LazyIntern extracted to restate_core::metric_definitions for reuse Also renames invoker metric constants to follow OTel naming conventions. --- crates/core/src/metric_definitions.rs | 45 ++++ crates/core/src/network/connection.rs | 29 ++- crates/core/src/network/connection_manager.rs | 57 +++-- crates/core/src/network/metric_definitions.rs | 204 +++++++++++++++++- crates/core/src/network/networking.rs | 16 +- .../core/src/network/transport_connector.rs | 10 +- .../invoker-impl/src/invocation_task/mod.rs | 12 +- crates/invoker-impl/src/lib.rs | 16 +- crates/invoker-impl/src/metric_definitions.rs | 94 +++----- crates/invoker-impl/src/quota.rs | 6 +- 10 files changed, 380 insertions(+), 109 deletions(-) diff --git a/crates/core/src/metric_definitions.rs b/crates/core/src/metric_definitions.rs index c5e3f8e250..675f2a0cea 100644 --- a/crates/core/src/metric_definitions.rs +++ b/crates/core/src/metric_definitions.rs @@ -10,8 +10,53 @@ #![allow(unused)] +use std::fmt; +use std::hash::Hash; +use std::sync::LazyLock; + +use dashmap::{DashMap, Entry}; use metrics::{Unit, describe_counter}; +// --------------------------------------------------------------------------- +// Metric label interning utilities +// +// These caches convert runtime values into `&'static str` for use as metric +// labels, avoiding per-emission allocations. Values are leaked and never freed. +// +// **Only use for low-cardinality dimensions** (node IDs, partition IDs, +// service names, deployment IDs, HTTP status codes). High-cardinality keys +// (request IDs, trace IDs, user-supplied strings) will leak unbounded memory. +// --------------------------------------------------------------------------- + +/// Intern cache that maps values to their `Display` representation as leaked +/// `&'static str`. Cache hit = hash lookup, no formatting. Cache miss = format +/// once, leak, store. +pub struct LazyIntern { + cache: LazyLock>, +} + +impl LazyIntern { + pub const fn new() -> Self { + Self { + cache: LazyLock::new(DashMap::new), + } + } + + #[inline] + pub fn get(&self, key: &K) -> &'static str { + if let Some(entry) = self.cache.get(key) { + return entry.value(); + } + match self.cache.entry(key.clone()) { + Entry::Occupied(entry) => entry.get(), + Entry::Vacant(entry) => { + let s: &'static str = entry.key().to_string().leak(); + entry.insert(s).value() + } + } + } +} + // value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`]. pub const TC_SPAWN: &str = "restate.task_center.spawned.total"; pub const TC_FINISHED: &str = "restate.task_center.finished.total"; diff --git a/crates/core/src/network/connection.rs b/crates/core/src/network/connection.rs index 1a770f434f..020ca027a5 100644 --- a/crates/core/src/network/connection.rs +++ b/crates/core/src/network/connection.rs @@ -16,7 +16,6 @@ pub use throttle::ConnectThrottle; use std::sync::Arc; -use metrics::counter; use tokio::sync::mpsc; use tokio::time::Instant; use tracing::debug; @@ -31,7 +30,7 @@ use crate::Metadata; use crate::TaskId; use crate::TaskKind; use crate::network::PeerMetadataVersion; -use crate::network::metric_definitions::NETWORK_CONNECTION_CREATED; +use crate::network::metric_definitions::{ConnectionMetrics, NetworkMetrics}; use super::ConnectError; use super::ConnectionClosed; @@ -160,6 +159,7 @@ pub struct Connection { pub(crate) sender: EgressSender, pub(crate) swimlane: Swimlane, pub(crate) created: Instant, + pub(crate) metrics: ConnectionMetrics, } impl Connection { @@ -168,6 +168,7 @@ impl Connection { protocol_version: ProtocolVersion, swimlane: Swimlane, sender: EgressSender, + metrics: ConnectionMetrics, ) -> Self { Self { peer, @@ -175,6 +176,7 @@ impl Connection { sender, swimlane, created: Instant::now(), + metrics, } } @@ -216,6 +218,7 @@ impl Connection { conn_tracker: impl ConnectionTracking + Send + Sync + 'static, is_dedicated: bool, ) -> Result<(Self, TaskId), ConnectError> { + let start = Instant::now(); let result = Self::connect_inner( destination.clone(), swimlane, @@ -227,6 +230,10 @@ impl Connection { is_dedicated, ) .await; + let elapsed = start.elapsed(); + NetworkMetrics::new(swimlane) + .handshake_duration(result.is_ok()) + .record(elapsed); ConnectThrottle::note_connect_status(&destination, result.is_ok()); match result { @@ -345,7 +352,13 @@ impl Connection { .into()); } - let connection = Connection::new(peer_node_id, protocol_version, swimlane, tx); + let connection = Connection::new( + peer_node_id, + protocol_version, + swimlane, + tx, + NetworkMetrics::new(swimlane).connection(peer_node_id, "outgoing"), + ); // if peer cannot respect our hello intent of direction, we are okay with registering let is_bidi = matches!( @@ -361,7 +374,7 @@ impl Connection { let task_id = reactor.start(task_kind, conn_tracker, is_dedicated, incoming)?; - counter!(NETWORK_CONNECTION_CREATED, "direction" => "outgoing", "swimlane" => swimlane.as_str_name()).increment(1); + connection.metrics.record_opened(); Ok((connection, task_id)) } @@ -397,7 +410,11 @@ impl Connection { /// returns None. #[must_use] pub async fn reserve(&self) -> Option> { + let start = Instant::now(); let permit = self.sender.reserve().await.ok()?; + self.metrics + .permit_acquisition_duration() + .record(start.elapsed()); Some(SendPermit { permit, protocol_version: self.protocol_version, @@ -406,7 +423,11 @@ impl Connection { #[must_use] pub async fn reserve_owned(&self) -> Option { + let start = Instant::now(); let permit = self.sender.clone().reserve_owned().await.ok()?; + self.metrics + .permit_acquisition_duration() + .record(start.elapsed()); Some(OwnedSendPermit { permit, protocol_version: self.protocol_version, diff --git a/crates/core/src/network/connection_manager.rs b/crates/core/src/network/connection_manager.rs index 01a86be31f..7682567096 100644 --- a/crates/core/src/network/connection_manager.rs +++ b/crates/core/src/network/connection_manager.rs @@ -14,8 +14,8 @@ use std::sync::Arc; use ahash::HashMap; use futures::future::{BoxFuture, Shared}; use futures::{FutureExt, Stream}; -use metrics::counter; use parking_lot::Mutex; +use tokio::time::Instant; use tracing::{debug, info, trace}; use restate_types::config::Configuration; @@ -35,7 +35,7 @@ use super::{ use crate::network::PeerMetadataVersion; use crate::network::connection::ConnectThrottle; use crate::network::handshake::{negotiate_protocol_version, wait_for_hello}; -use crate::network::metric_definitions::{NETWORK_CONNECTION_CREATED, NETWORK_CONNECTION_DROPPED}; +use crate::network::metric_definitions::NetworkMetrics; use crate::{Metadata, TaskId, TaskKind, my_node_id}; #[derive(Copy, Clone, PartialOrd, PartialEq, Default)] @@ -102,6 +102,7 @@ impl ConnectionManagerInner { restate_types::net::CURRENT_PROTOCOL_VERSION, swimlane, tx, + NetworkMetrics::new(swimlane).connection(my_node_id(), "loopback"), ); let reactor = ConnectionReactor::new(connection.clone(), shared, None, self.router.clone()); @@ -155,6 +156,9 @@ impl ConnectionManagerInner { match self.connection_by_gen_id.entry((peer, swimlane)) { hash_map::Entry::Occupied(c) if c.get() == connection => { c.remove(); + NetworkMetrics::new(connection.swimlane) + .pool_connections_active() + .decrement(1.0); } _ => {} } @@ -366,6 +370,7 @@ impl ConnectionManager { selected_protocol_version, hello.swimlane(), sender, + NetworkMetrics::new(hello.swimlane()).connection(peer_node_id, "incoming"), ); // Register the connection. @@ -405,7 +410,7 @@ impl ConnectionManager { "Incoming connection accepted from node {}", peer_node_id ); - counter!(NETWORK_CONNECTION_CREATED, "direction" => "incoming", "swimlane" => hello.swimlane().as_str_name()).increment(1); + connection.metrics.record_opened(); // Our output stream, i.e. responses. Ok(egress) @@ -462,6 +467,7 @@ impl ConnectionManager { where C: TransportConnect, { + let start = Instant::now(); let my_node_id_opt = Metadata::with_current(|m| m.my_node_id_opt()); let node_id = node_id.into(); // find latest generation if this is not generational node id @@ -481,17 +487,26 @@ impl ConnectionManager { return Err(DiscoveryError::NodeIsGone(node_id.into()).into()); } + let metrics = NetworkMetrics::new(swimlane); + let router = { // -- Lock held let mut guard = self.inner.lock(); // find a connection by node_id if let Some(connection) = guard.get_connection(node_id, swimlane) { + metrics + .connection_acquisition_duration("hit", true) + .record(start.elapsed()); return Ok(connection); } if my_node_id_opt.is_some_and(|my_node| my_node == node_id) { - return guard.create_loopback_connection(swimlane, self.clone()); + let result = guard.create_loopback_connection(swimlane, self.clone()); + metrics + .connection_acquisition_duration("hit", result.is_ok()) + .record(start.elapsed()); + return result; } // fail if the node is seen as gone before @@ -501,13 +516,19 @@ impl ConnectionManager { // We have no connection. We attempt to create a new connection or latch onto an // existing attempt. - self.create_shared_connection( - Destination::Node(node_id), - swimlane, - router, - transport_connector, - ) - .await + let result = self + .create_shared_connection( + Destination::Node(node_id), + swimlane, + router, + transport_connector, + &metrics, + ) + .await; + metrics + .connection_acquisition_duration("miss", result.is_ok()) + .record(start.elapsed()); + result } async fn create_shared_connection( @@ -516,6 +537,7 @@ impl ConnectionManager { swimlane: Swimlane, router: Arc, transport_connector: &C, + metrics: &NetworkMetrics, ) -> Result where C: TransportConnect, @@ -568,6 +590,7 @@ impl ConnectionManager { // Put it in the map so other concurrent callers share the same future in_flight.insert((dest.clone(), swimlane), fut.clone()); + metrics.connections_pending().increment(1.0); fut } }; @@ -577,7 +600,9 @@ impl ConnectionManager { // 5) Remove the completed future so subsequent calls can attempt a fresh connect let mut in_flight = self.in_flight_connects.lock(); - in_flight.remove(&(dest, swimlane)); + if in_flight.remove(&(dest, swimlane)).is_some() { + metrics.connections_pending().decrement(1.0); + } Ok(maybe_connection?.0) } @@ -594,6 +619,9 @@ impl ConnectionTracking for ConnectionManager { fn connection_created(&self, conn: &Connection, is_dedicated: bool) { if !is_dedicated { self.inner.lock().register(conn.clone()); + NetworkMetrics::new(conn.swimlane) + .pool_connections_active() + .increment(1.0); } trace!( swimlane = %conn.swimlane, @@ -606,8 +634,11 @@ impl ConnectionTracking for ConnectionManager { "Connection terminated, connection lived for {:?}", conn.created.elapsed() ); + conn.metrics + .connection_lifetime() + .record(conn.created.elapsed()); self.inner.lock().deregister(conn); - counter!(NETWORK_CONNECTION_DROPPED).increment(1); + conn.metrics.record_closed(); } fn notify_peer_shutdown(&self, node_id: GenerationalNodeId) { diff --git a/crates/core/src/network/metric_definitions.rs b/crates/core/src/network/metric_definitions.rs index 977b2cfd67..7810e4370e 100644 --- a/crates/core/src/network/metric_definitions.rs +++ b/crates/core/src/network/metric_definitions.rs @@ -8,18 +8,39 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use metrics::{Unit, describe_counter, describe_histogram}; +use std::sync::LazyLock; +use metrics::{ + Unit, counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, +}; + +use super::Swimlane; +use crate::metric_definitions::LazyIntern; + +// -- Pre-existing metrics (names unchanged for dashboard compatibility) -- pub const NETWORK_CONNECTION_CREATED: &str = "restate.network.connection_created.total"; pub const NETWORK_CONNECTION_DROPPED: &str = "restate.network.connection_dropped.total"; pub const NETWORK_SERVICE_ACCEPTED_REQUEST_BYTES: &str = "restate.network.service.accepted_request_bytes.total"; pub const NETWORK_SERVICE_REJECTED_REQUEST_BYTES: &str = "restate.network.service.rejected_request_bytes.total"; - pub const NETWORK_MESSAGE_PROCESSING_DURATION: &str = "restate.network.message_processing_duration.seconds"; +// -- New metrics -- +const NETWORK_CONNECTION_OPENED: &str = "restate.network.connection.opened.total"; +const NETWORK_CONNECTION_CLOSED: &str = "restate.network.connection.closed.total"; +const NETWORK_POOL_CONNECTIONS_ACTIVE: &str = "restate.network.pool.connections.active"; +const NETWORK_CONNECTION_HANDSHAKE_DURATION: &str = + "restate.network.connection.handshake.duration.seconds"; +const NETWORK_PERMIT_ACQUISITION_DURATION: &str = + "restate.network.permit_acquisition.duration.seconds"; +const NETWORK_CONNECTIONS_PENDING: &str = "restate.network.connection.pending"; +const NETWORK_CONNECTION_LIFETIME: &str = "restate.network.connection.duration.seconds"; +const NETWORK_CONNECTION_ACQUISITION_DURATION: &str = + "restate.network.connection.acquisition.duration.seconds"; +const NETWORK_RPC_DURATION: &str = "restate.network.rpc.duration.seconds"; + pub fn describe_metrics() { describe_counter!( NETWORK_CONNECTION_CREATED, @@ -36,16 +57,191 @@ pub fn describe_metrics() { Unit::Bytes, "Number of bytes accepted by service name" ); - describe_counter!( NETWORK_SERVICE_REJECTED_REQUEST_BYTES, Unit::Bytes, "Number of bytes received and dropped/rejected by service name" ); - describe_histogram!( NETWORK_MESSAGE_PROCESSING_DURATION, Unit::Seconds, "Latency of deserializing and processing incoming messages" ); + describe_counter!( + NETWORK_CONNECTION_OPENED, + Unit::Count, + "Number of connections opened (with direction and peer labels)" + ); + describe_counter!( + NETWORK_CONNECTION_CLOSED, + Unit::Count, + "Number of connections closed (with direction and peer labels)" + ); + describe_gauge!( + NETWORK_POOL_CONNECTIONS_ACTIVE, + Unit::Count, + "Current number of active connections in the pool" + ); + describe_histogram!( + NETWORK_CONNECTION_HANDSHAKE_DURATION, + Unit::Seconds, + "Time to establish a new connection including transport and handshake" + ); + describe_histogram!( + NETWORK_PERMIT_ACQUISITION_DURATION, + Unit::Seconds, + "Time waiting for a send permit on a connection" + ); + describe_gauge!( + NETWORK_CONNECTIONS_PENDING, + Unit::Count, + "Current number of connections pending (in-flight handshakes)" + ); + describe_histogram!( + NETWORK_CONNECTION_LIFETIME, + Unit::Seconds, + "Duration a connection was alive before being closed" + ); + describe_histogram!( + NETWORK_CONNECTION_ACQUISITION_DURATION, + Unit::Seconds, + "Time to acquire or establish a connection from the pool" + ); + describe_histogram!( + NETWORK_RPC_DURATION, + Unit::Seconds, + "End-to-end RPC call duration including connection, permit, send, and reply" + ); +} + +static PEER_LOOKUP: LazyIntern = LazyIntern::new(); + +/// Pool-level metrics scoped to a swimlane. One instance per swimlane variant, +/// accessed via [`NetworkMetrics::from`]. Creates [`ConnectionMetrics`] via +/// [`Self::connection`] when a peer and direction are known. +#[derive(Clone, Copy, Debug)] +pub struct NetworkMetrics { + swimlane: &'static str, +} + +impl NetworkMetrics { + pub fn new(swimlane: Swimlane) -> Self { + Self { + swimlane: swimlane.as_str_name(), + } + } + + /// Narrow the scope to a specific connection. + pub fn connection( + &self, + peer: restate_types::GenerationalNodeId, + direction: &'static str, + ) -> ConnectionMetrics { + ConnectionMetrics { + swimlane: self.swimlane, + peer: PEER_LOOKUP.get(&peer), + direction, + } + } + + pub fn pool_connections_active(&self) -> metrics::Gauge { + gauge!(NETWORK_POOL_CONNECTIONS_ACTIVE, "swimlane" => self.swimlane) + } + + pub fn connections_pending(&self) -> metrics::Gauge { + gauge!(NETWORK_CONNECTIONS_PENDING, "swimlane" => self.swimlane) + } + + pub fn connection_acquisition_duration( + &self, + cache: &'static str, + success: bool, + ) -> metrics::Histogram { + let result = if success { "success" } else { "error" }; + histogram!(NETWORK_CONNECTION_ACQUISITION_DURATION, "swimlane" => self.swimlane, "cache" => cache, "result" => result) + } + + pub fn handshake_duration(&self, success: bool) -> metrics::Histogram { + let result = if success { "success" } else { "error" }; + histogram!(NETWORK_CONNECTION_HANDSHAKE_DURATION, "swimlane" => self.swimlane, "result" => result) + } + + pub fn rpc_duration(&self, success: bool) -> metrics::Histogram { + let result = if success { "success" } else { "error" }; + histogram!(NETWORK_RPC_DURATION, "swimlane" => self.swimlane, "result" => result) + } +} + +/// Per-connection metrics. Created from [`NetworkMetrics::connection`], inheriting +/// the swimlane + peer labels and adding direction. +/// +/// All fields are `&'static str` so the struct is `Copy` and metric emission +/// never allocates. +#[derive(Clone, Copy, Debug)] +pub struct ConnectionMetrics { + swimlane: &'static str, + peer: &'static str, + direction: &'static str, +} + +impl ConnectionMetrics { + /// Increments both the legacy `connection_created` counter and the new + /// `connection.opened` counter. + pub fn record_opened(&self) { + // Legacy metric (original labels + new peer_name) + counter!( + NETWORK_CONNECTION_CREATED, + "direction" => self.direction, + "swimlane" => self.swimlane, + "peer_name" => self.peer, + ) + .increment(1); + // New metric + counter!( + NETWORK_CONNECTION_OPENED, + "direction" => self.direction, + "swimlane" => self.swimlane, + "peer_name" => self.peer, + ) + .increment(1); + } + + /// Increments both the legacy `connection_dropped` counter and the new + /// `connection.closed` counter. + pub fn record_closed(&self) { + // Legacy metric (originally had no labels, now enriched) + counter!( + NETWORK_CONNECTION_DROPPED, + "direction" => self.direction, + "swimlane" => self.swimlane, + "peer_name" => self.peer, + ) + .increment(1); + // New metric + counter!( + NETWORK_CONNECTION_CLOSED, + "direction" => self.direction, + "swimlane" => self.swimlane, + "peer_name" => self.peer, + ) + .increment(1); + } + + pub fn permit_acquisition_duration(&self) -> metrics::Histogram { + histogram!( + NETWORK_PERMIT_ACQUISITION_DURATION, + "direction" => self.direction, + "swimlane" => self.swimlane, + "peer_name" => self.peer, + ) + } + + pub fn connection_lifetime(&self) -> metrics::Histogram { + histogram!( + NETWORK_CONNECTION_LIFETIME, + "direction" => self.direction, + "swimlane" => self.swimlane, + "peer_name" => self.peer, + ) + } } diff --git a/crates/core/src/network/networking.rs b/crates/core/src/network/networking.rs index da4e70012f..41a2b21225 100644 --- a/crates/core/src/network/networking.rs +++ b/crates/core/src/network/networking.rs @@ -15,6 +15,8 @@ use restate_types::{GenerationalNodeId, NodeId}; use tokio::time::Instant; +use crate::network::metric_definitions::NetworkMetrics; + use super::connection::OwnedSendPermit; use super::{ ConnectError, Connection, ConnectionClosed, ConnectionManager, LazyConnection, @@ -119,6 +121,7 @@ impl NetworkSender for Networking { M: RpcRequest, N: Into + Send, { + let net_metrics = NetworkMetrics::new(swimlane); let start = Instant::now(); let op = async { let connection = self @@ -136,11 +139,12 @@ impl NetworkSender for Networking { Ok(reply.await?) }; - match timeout { - Some(timeout) => tokio::time::timeout(timeout, op) - .await - .map_err(|_| RpcError::Timeout(start.elapsed()))?, - None => op.await, - } + let result = match timeout { + Some(timeout) => tokio::time::timeout(timeout, op).await, + None => Ok(op.await), + }; + let elapsed = start.elapsed(); + net_metrics.rpc_duration(result.is_ok()).record(elapsed); + result.map_err(|_| RpcError::Timeout(elapsed))? } } diff --git a/crates/core/src/network/transport_connector.rs b/crates/core/src/network/transport_connector.rs index 70467097cc..1d58e5a04c 100644 --- a/crates/core/src/network/transport_connector.rs +++ b/crates/core/src/network/transport_connector.rs @@ -190,8 +190,14 @@ pub mod test_util { .unbounded_send(EgressMessage::Message(welcome.into())) .unwrap(); - let connection = - Connection::new(my_node_id, selected_protocol_version, hello.swimlane(), tx); + let connection = Connection::new( + my_node_id, + selected_protocol_version, + hello.swimlane(), + tx, + crate::network::metric_definitions::NetworkMetrics::new(hello.swimlane()) + .connection(my_node_id, "incoming"), + ); let reactor = ConnectionReactor::new(connection.clone(), shared, Some(peer_metadata), router) .start( diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 3992098787..0094e6e5b9 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -57,7 +57,7 @@ use crate::TokenBucket; use crate::error::{InvocationMemoryExhausted, InvokerError}; use crate::invocation_task::service_protocol_runner::ServiceProtocolRunner; use crate::metric_definitions::{ - INVOKER_EAGER_STATE_TRUNCATED, INVOKER_HTTP_REQUEST_DURATION, INVOKER_HTTP_TOTAL_DURATION, + INVOKER_HTTP_REQUEST_DURATION, INVOKER_HTTP_ROUNDTRIP_DURATION, INVOKER_STATE_TRUNCATIONS, INVOKER_TASK_DURATION, ServiceMetrics, UUID_LOOKUP, }; @@ -131,7 +131,7 @@ where ByteCount::from(size_limit), entries.len() ); - labels.counter(INVOKER_EAGER_STATE_TRUNCATED).increment(1); + labels.counter(INVOKER_STATE_TRUNCATIONS).increment(1); is_partial = true; break; } @@ -551,7 +551,7 @@ where None }; - // Store deployment id for metric labels (INVOKER_TASK_DURATION, INVOKER_EAGER_STATE_TRUNCATED) + // Store deployment id for metric labels (INVOKER_TASK_DURATION, INVOKER_STATE_TRUNCATIONS) self.pinned_deployment_id = Some(deployment.id); self.metric.deployment_id = UUID_LOOKUP.get(&deployment.id); self.send_invoker_tx(InvocationTaskOutputInner::PinnedDeployment( @@ -664,8 +664,8 @@ impl ResponseStream { /// /// Records: /// - TTFB (`INVOKER_HTTP_REQUEST_DURATION`) when the first response headers arrive -/// - Total duration (`INVOKER_HTTP_TOTAL_DURATION`) when the stream terminates -/// - Non-200 status codes (`INVOKER_HTTP_STATUS_CODE`) at header receipt +/// - Roundtrip duration (`INVOKER_HTTP_ROUNDTRIP_DURATION`) when the stream terminates +/// - Response status codes (`INVOKER_HTTP_RESPONSES`) at header receipt pub(super) struct InstrumentedResponseStream { inner: ResponseStream, started_at: Instant, @@ -718,7 +718,7 @@ impl Drop for InstrumentedResponseStream { // Note: on the v4 protocol path, this includes the response stream drain period // (up to 5s for deployments that don't cleanly close the stream). self.metric - .histogram(INVOKER_HTTP_TOTAL_DURATION) + .histogram(INVOKER_HTTP_ROUNDTRIP_DURATION) .record(self.started_at.elapsed()); } } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index beab760fe1..c2a5584ae2 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -74,8 +74,8 @@ use crate::invocation_state_machine::OnTaskError; use crate::invocation_task::InvocationTask; use crate::invocation_task::{InvocationTaskOutput, InvocationTaskOutputInner}; use crate::metric_definitions::{ - ID_LOOKUP, INVOKER_ACTIVE_INVOCATIONS, INVOKER_ENQUEUE, INVOKER_QUEUE_DURATION, - INVOKER_THROTTLE_BALANCE, STR_LOOKUP, ServiceMetrics, TASK_OP_COMPLETED, TASK_OP_FAILED, + ID_LOOKUP, INVOKER_INVOCATIONS_ACTIVE, INVOKER_INVOCATIONS_QUEUED, INVOKER_QUEUE_DURATION, + INVOKER_THROTTLING_BALANCE, STR_LOOKUP, ServiceMetrics, TASK_OP_COMPLETED, TASK_OP_FAILED, TASK_OP_STARTED, TASK_OP_SUSPENDED, UUID_LOOKUP, }; use crate::status_store::InvocationStatusStore; @@ -476,14 +476,14 @@ where ServiceMetrics::new( ID_LOOKUP.get(invoke_command.partition.0), STR_LOOKUP.get(invoke_command.invocation_target.service_name()), - ).counter(INVOKER_ENQUEUE).increment(1); + ).counter(INVOKER_INVOCATIONS_QUEUED).increment(1); segmented_input_queue.inner_pin_mut().enqueue(invoke_command).await; }, InputCommand::VQInvoke(command) => { // Note: VQInvoke path has a legacy "status" label that Invoke path lacks. // Kept for backwards compatibility — clean up in a future PR. counter!( - INVOKER_ENQUEUE, + INVOKER_INVOCATIONS_QUEUED, "status" => TASK_OP_COMPLETED, "partition_id" => ID_LOOKUP.get(command.partition.0), "service_name" => STR_LOOKUP.get(command.invocation_target.service_name()), @@ -853,7 +853,7 @@ where // Set deployment_id now that it's known, and track active invocations ism.metric.deployment_id = UUID_LOOKUP.get(&pinned_deployment.deployment_id); - ism.metric.gauge(INVOKER_ACTIVE_INVOCATIONS).increment(1.0); + ism.metric.gauge(INVOKER_INVOCATIONS_ACTIVE).increment(1.0); ism.notify_pinned_deployment(pinned_deployment, has_changed); }, @@ -1744,7 +1744,9 @@ where /// No-op if throttling is not configured. fn record_throttle_balance(&self, labels: &ServiceMetrics) { if let Some(bucket) = &self.invocation_token_bucket { - labels.gauge(INVOKER_THROTTLE_BALANCE).set(bucket.balance()); + labels + .gauge(INVOKER_THROTTLING_BALANCE) + .set(bucket.balance()); } } @@ -1841,7 +1843,7 @@ where /// Decrement the active invocations gauge if deployment has been pinned. fn decrement_active_invocations(ism: &InvocationStateMachine) { if !ism.metric.deployment_id.is_empty() { - ism.metric.gauge(INVOKER_ACTIVE_INVOCATIONS).decrement(1.0); + ism.metric.gauge(INVOKER_INVOCATIONS_ACTIVE).decrement(1.0); } } diff --git a/crates/invoker-impl/src/metric_definitions.rs b/crates/invoker-impl/src/metric_definitions.rs index 7dc5ed9c85..38e0888035 100644 --- a/crates/invoker-impl/src/metric_definitions.rs +++ b/crates/invoker-impl/src/metric_definitions.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::fmt; -use std::hash::Hash; use std::sync::LazyLock; use bytestring::ByteString; @@ -18,6 +16,7 @@ use metrics::{ Unit, counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, }; +use restate_core::metric_definitions::LazyIntern; use restate_types::identifiers::DeploymentId; /// Lazily initialized cache that maps "partition" ids to their string representation for metric labels, @@ -71,37 +70,6 @@ impl IdLookup { } } -/// Lazily initialized intern cache that maps values to their `Display` string representation -/// as `&'static str`. Cache hit = hash comparison only, no formatting. Cache miss = format -/// once via `Display`, leak, store. Bounded by the number of distinct values seen at runtime. -pub(crate) struct LazyIntern { - cache: LazyLock>, -} - -impl LazyIntern { - pub const fn new() -> Self { - Self { - cache: LazyLock::new(DashMap::new), - } - } - - #[inline] - pub fn get(&self, key: &K) -> &'static str { - if let Some(entry) = self.cache.get(key) { - return entry.value(); - } - - let entry = self.cache.entry(key.clone()); - match entry { - Entry::Occupied(entry) => entry.get(), - Entry::Vacant(entry) => { - let s: &'static str = entry.key().to_string().leak(); - entry.insert(s).value() - } - } - } -} - /// Cached interned label values for zero-allocation metric emissions. /// Built incrementally: `partition_id` and `service_name` set at task start, /// `deployment_id` set when `PinnedDeployment` is received (empty until then). @@ -171,13 +139,13 @@ impl ServiceMetrics { pub fn http_status_counter(&self, status_code: u16) -> metrics::Counter { let code = STATUS_CODE_LOOKUP.get(&status_code); if self.deployment_id.is_empty() { - counter!(INVOKER_HTTP_STATUS_CODE, + counter!(INVOKER_HTTP_RESPONSES, "partition_id" => self.partition_id, "service_name" => self.service_name, "status_code" => code, ) } else { - counter!(INVOKER_HTTP_STATUS_CODE, + counter!(INVOKER_HTTP_RESPONSES, "partition_id" => self.partition_id, "service_name" => self.service_name, "deployment_id" => self.deployment_id, @@ -212,7 +180,7 @@ pub(crate) struct TaskMetrics { impl TaskMetrics { pub fn counter(&self) -> metrics::Counter { - counter!(INVOKER_INVOCATION_TASKS, + counter!(INVOKER_TASKS, "status" => self.status, "partition_id" => self.partition_id, "service_name" => self.service_name, @@ -220,7 +188,7 @@ impl TaskMetrics { } pub fn failed_counter(&self, transient: bool) -> metrics::Counter { - counter!(INVOKER_INVOCATION_TASKS, + counter!(INVOKER_TASKS, "status" => self.status, "transient" => if transient { "true" } else { "false" }, "partition_id" => self.partition_id, @@ -229,19 +197,21 @@ impl TaskMetrics { } } -pub const INVOKER_ENQUEUE: &str = "restate.invoker.enqueue.total"; -pub const INVOKER_INVOCATION_TASKS: &str = "restate.invoker.invocation_tasks.total"; -pub const INVOKER_CONCURRENCY_SLOTS_ACQUIRED: &str = "restate.invoker.concurrency_slots.acquired"; -pub const INVOKER_CONCURRENCY_SLOTS_RELEASED: &str = "restate.invoker.concurrency_slots.released"; -pub const INVOKER_CONCURRENCY_LIMIT: &str = "restate.invoker.concurrency_limit"; -pub const INVOKER_TASK_DURATION: &str = "restate.invoker.task_duration.seconds"; -pub const INVOKER_EAGER_STATE_TRUNCATED: &str = "restate.invoker.eager_state_truncated.total"; -pub const INVOKER_THROTTLE_BALANCE: &str = "restate.invoker.throttle_balance"; -pub const INVOKER_QUEUE_DURATION: &str = "restate.invoker.queue_duration.seconds"; -pub const INVOKER_ACTIVE_INVOCATIONS: &str = "restate.invoker.active_invocations"; -pub const INVOKER_HTTP_REQUEST_DURATION: &str = "restate.invoker.http_request_duration.seconds"; -pub const INVOKER_HTTP_TOTAL_DURATION: &str = "restate.invoker.http_total_duration.seconds"; -pub const INVOKER_HTTP_STATUS_CODE: &str = "restate.invoker.http_status_code.total"; +pub const INVOKER_INVOCATIONS_QUEUED: &str = "restate.invoker.invocations.queued.total"; +pub const INVOKER_TASKS: &str = "restate.invoker.tasks.total"; +pub const INVOKER_CONCURRENCY_SLOTS_LIMIT: &str = "restate.invoker.concurrency.slots.limit"; +pub const INVOKER_CONCURRENCY_SLOTS_ACQUIRED: &str = + "restate.invoker.concurrency.slots.acquired.total"; +pub const INVOKER_CONCURRENCY_SLOTS_RELEASED: &str = + "restate.invoker.concurrency.slots.released.total"; +pub const INVOKER_TASK_DURATION: &str = "restate.invoker.task.duration.seconds"; +pub const INVOKER_STATE_TRUNCATIONS: &str = "restate.invoker.state.truncations.total"; +pub const INVOKER_THROTTLING_BALANCE: &str = "restate.invoker.throttling.balance"; +pub const INVOKER_QUEUE_DURATION: &str = "restate.invoker.queue.duration.seconds"; +pub const INVOKER_INVOCATIONS_ACTIVE: &str = "restate.invoker.invocations.active"; +pub const INVOKER_HTTP_REQUEST_DURATION: &str = "restate.invoker.http.request.duration.seconds"; +pub const INVOKER_HTTP_ROUNDTRIP_DURATION: &str = "restate.invoker.http.roundtrip.duration.seconds"; +pub const INVOKER_HTTP_RESPONSES: &str = "restate.invoker.http.responses.total"; pub const TASK_OP_STARTED: &str = "started"; pub const TASK_OP_SUSPENDED: &str = "suspended"; @@ -250,22 +220,18 @@ pub const TASK_OP_COMPLETED: &str = "completed"; pub(crate) fn describe_metrics() { describe_counter!( - INVOKER_ENQUEUE, + INVOKER_INVOCATIONS_QUEUED, Unit::Count, - "Number of invocations that were added to the queue" + "Number of invocations enqueued" ); describe_gauge!( - INVOKER_CONCURRENCY_LIMIT, + INVOKER_CONCURRENCY_SLOTS_LIMIT, Unit::Count, "Concurrency limit (slots) for invoker tasks" ); - describe_counter!( - INVOKER_INVOCATION_TASKS, - Unit::Count, - "Invocation task operation" - ); + describe_counter!(INVOKER_TASKS, Unit::Count, "Invocation task operation"); describe_counter!( INVOKER_CONCURRENCY_SLOTS_ACQUIRED, @@ -286,13 +252,13 @@ pub(crate) fn describe_metrics() { ); describe_counter!( - INVOKER_EAGER_STATE_TRUNCATED, + INVOKER_STATE_TRUNCATIONS, Unit::Count, "Number of invocations where eager state was truncated due to size limit" ); describe_gauge!( - INVOKER_THROTTLE_BALANCE, + INVOKER_THROTTLING_BALANCE, Unit::Count, "Invocation token bucket balance, recorded on acquire/release. Negative = throttled." ); @@ -304,7 +270,7 @@ pub(crate) fn describe_metrics() { ); describe_gauge!( - INVOKER_ACTIVE_INVOCATIONS, + INVOKER_INVOCATIONS_ACTIVE, Unit::Count, "Current in-flight invocations per service/deployment" ); @@ -316,14 +282,14 @@ pub(crate) fn describe_metrics() { ); describe_histogram!( - INVOKER_HTTP_TOTAL_DURATION, + INVOKER_HTTP_ROUNDTRIP_DURATION, Unit::Seconds, "Full HTTP request duration including response body streaming" ); describe_counter!( - INVOKER_HTTP_STATUS_CODE, + INVOKER_HTTP_RESPONSES, Unit::Count, - "Count of non-200 HTTP status codes returned by deployments" + "HTTP response status codes by deployment" ); } diff --git a/crates/invoker-impl/src/quota.rs b/crates/invoker-impl/src/quota.rs index b8547f09f3..3b175e4d84 100644 --- a/crates/invoker-impl/src/quota.rs +++ b/crates/invoker-impl/src/quota.rs @@ -21,7 +21,7 @@ use metrics::{Counter, counter, gauge}; use crate::{ InvokerId, metric_definitions::{ - ID_LOOKUP, INVOKER_CONCURRENCY_LIMIT, INVOKER_CONCURRENCY_SLOTS_ACQUIRED, + ID_LOOKUP, INVOKER_CONCURRENCY_SLOTS_ACQUIRED, INVOKER_CONCURRENCY_SLOTS_LIMIT, INVOKER_CONCURRENCY_SLOTS_RELEASED, }, }; @@ -54,7 +54,7 @@ impl InvokerConcurrencyQuota { let inner = match quota { Some(available_slots) => { - gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => partition_id_str, "partition_id" => partition_id_str) + gauge!(INVOKER_CONCURRENCY_SLOTS_LIMIT, "invoker_id" => partition_id_str, "partition_id" => partition_id_str) .set(available_slots.get() as f64); let acquired_counter = counter!(INVOKER_CONCURRENCY_SLOTS_ACQUIRED, "invoker_id" => partition_id_str, "partition_id" => partition_id_str); @@ -69,7 +69,7 @@ impl InvokerConcurrencyQuota { } } None => { - gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => partition_id_str, "partition_id" => partition_id_str) + gauge!(INVOKER_CONCURRENCY_SLOTS_LIMIT, "invoker_id" => partition_id_str, "partition_id" => partition_id_str) .set(f64::INFINITY); InvokerConcurrencyQuotaInner::Unlimited From ea0433a4e8a735c6edf4e1e0d1be46713cf9fefc Mon Sep 17 00:00:00 2001 From: Robert Ream <154010+robertream@users.noreply.github.com> Date: Mon, 20 Apr 2026 15:14:23 -0700 Subject: [PATCH 3/4] fix(core): add Default impl for LazyIntern and remove unused LazyLock import Clippy flags new_without_default on LazyIntern::new(). Add the trivial Default impl. Also remove an unused std::sync::LazyLock import in the network metric_definitions introduced by the connection pool instrumentation commit. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/core/src/metric_definitions.rs | 6 ++++++ crates/core/src/network/metric_definitions.rs | 2 -- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/core/src/metric_definitions.rs b/crates/core/src/metric_definitions.rs index 675f2a0cea..911d0b80f7 100644 --- a/crates/core/src/metric_definitions.rs +++ b/crates/core/src/metric_definitions.rs @@ -35,6 +35,12 @@ pub struct LazyIntern { cache: LazyLock>, } +impl Default for LazyIntern { + fn default() -> Self { + Self::new() + } +} + impl LazyIntern { pub const fn new() -> Self { Self { diff --git a/crates/core/src/network/metric_definitions.rs b/crates/core/src/network/metric_definitions.rs index 7810e4370e..ab8fb3d23f 100644 --- a/crates/core/src/network/metric_definitions.rs +++ b/crates/core/src/network/metric_definitions.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::LazyLock; - use metrics::{ Unit, counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram, }; From 33e7440918f539bc39611d48b985c322bc29e4d3 Mon Sep 17 00:00:00 2001 From: Robert Ream <154010+robertream@users.noreply.github.com> Date: Mon, 20 Apr 2026 15:14:40 -0700 Subject: [PATCH 4/4] feat(discovery): add deployment discovery handshake metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instrument ServiceDiscovery with counters and histograms for deployment endpoint reachability — currently only visible in logs. Three metrics: - restate.deployment.discovery.attempts.total{outcome} (success | retryable_exhausted | permanent_failure) - restate.deployment.discovery.retries.total{reason} (server_error | rate_limited | connection | body_error | other) - restate.deployment.discovery.duration.seconds{outcome} describe_metrics() called from ServiceDiscovery::new(), following the crate-internal pattern used across the codebase. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 1 + crates/service-protocol/Cargo.toml | 3 +- crates/service-protocol/src/discovery.rs | 41 ++++++++++++++++++- crates/service-protocol/src/lib.rs | 2 + .../src/metric_definitions.rs | 35 ++++++++++++++++ 5 files changed, 79 insertions(+), 3 deletions(-) create mode 100644 crates/service-protocol/src/metric_definitions.rs diff --git a/Cargo.lock b/Cargo.lock index 003266d465..6725627374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7993,6 +7993,7 @@ dependencies = [ "http 1.4.0", "http-body-util", "itertools 0.14.0", + "metrics", "paste", "prost", "restate-errors", diff --git a/crates/service-protocol/Cargo.toml b/crates/service-protocol/Cargo.toml index 28f38040cc..7be11ba05a 100644 --- a/crates/service-protocol/Cargo.toml +++ b/crates/service-protocol/Cargo.toml @@ -10,7 +10,7 @@ publish = false [features] default = [] codec = ["dep:restate-types", "dep:paste"] -discovery = ["dep:serde_json", "dep:tracing", "dep:codederror", "dep:restate-errors", "dep:http", "dep:http-body-util", "dep:restate-service-client", "dep:restate-types", "dep:tokio"] +discovery = ["dep:serde_json", "dep:tracing", "dep:codederror", "dep:restate-errors", "dep:http", "dep:http-body-util", "dep:metrics", "dep:restate-service-client", "dep:restate-types", "dep:tokio"] message = ["dep:restate-types", "dep:bytes-utils", "dep:codederror", "dep:restate-errors", "dep:tracing"] test-util = ["restate-types/test-util"] @@ -27,6 +27,7 @@ bytes-utils = { workspace = true, optional = true } codederror = { workspace = true, optional = true } http = { workspace = true, optional = true } http-body-util = { workspace = true, optional = true } +metrics = { workspace = true, optional = true } itertools = { workspace = true } paste = { workspace = true, optional = true } prost = { workspace = true } diff --git a/crates/service-protocol/src/discovery.rs b/crates/service-protocol/src/discovery.rs index 1e29becec6..db64cf3010 100644 --- a/crates/service-protocol/src/discovery.rs +++ b/crates/service-protocol/src/discovery.rs @@ -12,6 +12,7 @@ use std::borrow::Cow; use std::fmt::Display; use std::ops::Deref; use std::sync::LazyLock; +use std::time::Instant; use bytes::Bytes; use codederror::CodedError; @@ -22,6 +23,7 @@ use http::{HeaderMap, HeaderName, HeaderValue, StatusCode, Version}; use http_body_util::BodyExt; use http_body_util::Empty; use itertools::Itertools; +use metrics::{counter, histogram}; use strum::IntoEnumIterator; use tracing::{debug, warn}; @@ -44,6 +46,10 @@ use restate_types::service_protocol::{ MIN_DISCOVERABLE_SERVICE_PROTOCOL_VERSION, ServiceProtocolVersion, }; +use crate::metric_definitions::{ + DEPLOYMENT_DISCOVERY_ATTEMPTS, DEPLOYMENT_DISCOVERY_DURATION, DEPLOYMENT_DISCOVERY_RETRIES, +}; + // TODO(slinkydeveloper) move this code somewhere else! #[allow(clippy::declare_interior_mutable_const)] @@ -181,6 +187,7 @@ pub struct ServiceDiscovery { impl ServiceDiscovery { pub fn new(retry_policy: RetryPolicy, client: ServiceClient) -> Self { + crate::metric_definitions::describe_metrics(); Self { retry_policy, client, @@ -237,13 +244,25 @@ impl DiscoveryClient for ServiceDiscovery { }; let retry_policy = self.retry_policy.iter(); - let (mut parts, body) = Self::invoke_discovery_endpoint( + let started = Instant::now(); + let result = Self::invoke_discovery_endpoint( &self.client, endpoint.clone(), build_request, retry_policy, ) - .await?; + .await; + + let elapsed = started.elapsed().as_secs_f64(); + let outcome = match &result { + Ok(_) => "success", + Err(e) if e.is_retryable() => "retryable_exhausted", + Err(_) => "permanent_failure", + }; + counter!(DEPLOYMENT_DISCOVERY_ATTEMPTS, "outcome" => outcome).increment(1); + histogram!(DEPLOYMENT_DISCOVERY_DURATION, "outcome" => outcome).record(elapsed); + + let (mut parts, body) = result?; // Retrieve chosen service discovery protocol version. // No need to retry these: if the validation fails, they're sdk bugs. @@ -488,6 +507,8 @@ impl ServiceDiscovery { if e.is_retryable() && let Some(next_retry_interval) = retry_iter.next() { + let reason = discovery_error_category(&e); + counter!(DEPLOYMENT_DISCOVERY_RETRIES, "reason" => reason).increment(1); warn!( "Error when discovering deployment at address '{}'. Retrying in {} seconds: {}", address, @@ -503,6 +524,22 @@ impl ServiceDiscovery { } } +fn discovery_error_category(e: &DiscoveryError) -> &'static str { + match e { + DiscoveryError::BadStatusCode(status, _, _) if status.is_server_error() => "server_error", + DiscoveryError::BadStatusCode(status, _, _) + if *status == StatusCode::REQUEST_TIMEOUT + || *status == StatusCode::TOO_MANY_REQUESTS => + { + "rate_limited" + } + DiscoveryError::BadStatusCode(_, _, _) => "bad_status", + DiscoveryError::Client(_) => "connection", + DiscoveryError::BodyError(_) => "body_error", + _ => "other", + } +} + #[cfg(test)] mod tests { use crate::discovery::endpoint_manifest::ProtocolMode; diff --git a/crates/service-protocol/src/lib.rs b/crates/service-protocol/src/lib.rs index 9d9cd3e77f..d910c83cef 100644 --- a/crates/service-protocol/src/lib.rs +++ b/crates/service-protocol/src/lib.rs @@ -19,3 +19,5 @@ pub mod codec; pub mod discovery; #[cfg(feature = "message")] pub mod message; +#[cfg(feature = "discovery")] +mod metric_definitions; diff --git a/crates/service-protocol/src/metric_definitions.rs b/crates/service-protocol/src/metric_definitions.rs new file mode 100644 index 0000000000..97677544e1 --- /dev/null +++ b/crates/service-protocol/src/metric_definitions.rs @@ -0,0 +1,35 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use metrics::{Unit, describe_counter, describe_histogram}; + +pub(crate) const DEPLOYMENT_DISCOVERY_ATTEMPTS: &str = + "restate.deployment.discovery.attempts.total"; +pub(crate) const DEPLOYMENT_DISCOVERY_RETRIES: &str = "restate.deployment.discovery.retries.total"; +pub(crate) const DEPLOYMENT_DISCOVERY_DURATION: &str = + "restate.deployment.discovery.duration.seconds"; + +pub(crate) fn describe_metrics() { + describe_counter!( + DEPLOYMENT_DISCOVERY_ATTEMPTS, + Unit::Count, + "Deployment discovery attempts by outcome" + ); + describe_counter!( + DEPLOYMENT_DISCOVERY_RETRIES, + Unit::Count, + "Deployment discovery retries by error category" + ); + describe_histogram!( + DEPLOYMENT_DISCOVERY_DURATION, + Unit::Seconds, + "Total time for a discovery handshake including retries" + ); +}