diff --git a/crates/service-client/src/http.rs b/crates/service-client/src/http.rs index e4b3ca3964..ff3379d461 100644 --- a/crates/service-client/src/http.rs +++ b/crates/service-client/src/http.rs @@ -134,7 +134,11 @@ impl HttpClient { .keep_alive_interval(keep_alive_interval) .streams_per_connection_limit(options.streams_per_connection_limit) .keep_alive_timeout(options.http_keep_alive_options.timeout.into()) - .idle_authority_timeout(options.idle_pool_timeout.map(Into::into)); + .idle_connection_timeout(if options.idle_connection_timeout.is_zero() { + None + } else { + Some(options.idle_connection_timeout.into()) + }); let builder = match options.initial_max_send_streams { Some(value) => builder.initial_max_send_streams(value), diff --git a/crates/service-client/src/pool/authority.rs b/crates/service-client/src/pool/authority.rs index e085014a46..a2ffab601e 100644 --- a/crates/service-client/src/pool/authority.rs +++ b/crates/service-client/src/pool/authority.rs @@ -16,7 +16,6 @@ use std::collections::{HashSet, VecDeque}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; use bytes::Bytes; @@ -26,9 +25,7 @@ use metrics::histogram; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use tokio::io::{AsyncRead, AsyncWrite}; use tower::Service; -use tracing::trace; - -use restate_types::time::MillisSinceEpoch; +use tracing::{debug, trace}; use crate::pool::PoolConfig; use crate::pool::conn::{ConnectionConfig, ConnectionConfigBuilder}; @@ -48,7 +45,6 @@ struct AuthorityPoolShared { config: PoolConfig, connection_config: ConnectionConfig, inner: Arc>>, - last_used: AtomicU64, } enum AuthorityPoolState { @@ -85,31 +81,25 @@ impl Clone for AuthorityPool { } impl AuthorityPool { - /// Updates the last-used timestamp to the current time. - pub(crate) fn touch(&self) { - self.shared - .last_used - .store(MillisSinceEpoch::now().as_u64(), Ordering::Relaxed); - } - - /// Returns the last-used timestamp. - pub(crate) fn last_used(&self) -> MillisSinceEpoch { - MillisSinceEpoch::from(self.shared.last_used.load(Ordering::Relaxed)) - } + // Retains the connections for which retain returns true and returns (remaining, evicted). + pub fn retain(&self, mut retain: F) -> (usize, Vec>) + where + F: FnMut(&Connection) -> bool, + { + let mut drained = Vec::default(); + let mut inner = self.shared.inner.write(); + let mut i = 0; + + while i < inner.connections.len() { + if retain(&inner.connections[i]) { + i += 1; + continue; + } - #[cfg(test)] - pub(crate) fn set_last_used(&self, ts: MillisSinceEpoch) { - self.shared.last_used.store(ts.as_u64(), Ordering::Relaxed); - } + drained.push(inner.connections.swap_remove_back(i).unwrap()); + } - /// Returns `true` if any connection has in-flight H2 streams. - pub(crate) fn has_inflight(&self) -> bool { - self.shared - .inner - .read() - .connections - .iter() - .any(|c| c.inflight() > 0) + (inner.connections.len(), drained) } } @@ -133,7 +123,6 @@ where connector, config, connection_config, - last_used: AtomicU64::new(MillisSinceEpoch::now().as_u64()), inner: Arc::new(RwLock::new(AuthorityPoolInner { epoch: 0, connections: VecDeque::new(), @@ -329,6 +318,13 @@ where }); if scaleup { + debug!( + "Try scaling up pool (connections: {}, available streams: {}, total streams:{})", + inner.connections.len(), + total_available_streams, + total_max_concurrent_streams + ); + match self.try_expand_pool(&mut inner) { Some(mut candidate) => { drop(inner); diff --git a/crates/service-client/src/pool/config.rs b/crates/service-client/src/pool/config.rs index 5b0f2080f6..8f44b5eeeb 100644 --- a/crates/service-client/src/pool/config.rs +++ b/crates/service-client/src/pool/config.rs @@ -60,9 +60,9 @@ pub struct PoolConfig { /// How often to send HTTP/2 PING frames to keep idle connections alive. /// `None` disables keep-alive pings entirely. Defaults to `None`. pub(crate) keep_alive_interval: Option, - /// How long an authority pool can be idle before it is evicted from the + /// How long a connection can be idle before it is evicted from the /// pool. `None` disables eviction entirely. Defaults to 5 minutes. - pub(crate) idle_authority_timeout: Option, + pub(crate) idle_connection_timeout: Option, } impl Default for PoolConfig { @@ -73,7 +73,7 @@ impl Default for PoolConfig { streams_per_connection_limit: NonZeroUsize::new(128).unwrap(), keep_alive_interval: None, keep_alive_timeout: Duration::from_secs(20), - idle_authority_timeout: Some(Duration::from_secs(300)), + idle_connection_timeout: Some(Duration::from_secs(300)), } } } diff --git a/crates/service-client/src/pool/conn.rs b/crates/service-client/src/pool/conn.rs index f20a18dc5c..9edfd71948 100644 --- a/crates/service-client/src/pool/conn.rs +++ b/crates/service-client/src/pool/conn.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. use std::pin::Pin; -use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock}; use std::task::{Context, Poll}; use std::time::Duration; @@ -67,6 +67,7 @@ const STATE_CONNECTED: u8 = 2; const STATE_CLOSED: u8 = 3; /// The H2 handle obtained after a successful handshake. Set exactly once. +#[derive(Debug)] struct H2Handle { send_request: SendRequest, cancel: CancellationToken, @@ -78,12 +79,15 @@ struct H2Handle { /// The `state` field tracks the discriminant atomically. The `h2` handle is set /// once via `OnceLock` when transitioning to `Connected`. Only the waiter list /// requires a brief lock during the `Connecting` phase. +#[derive(Debug)] struct ConnectionShared { id: usize, config: ConnectionConfig, concurrency: Concurrency, state: AtomicU8, h2: OnceLock, + created_at: MillisSinceEpoch, + last_used_at: AtomicU64, /// Waiters registered during the Connecting phase. Narrowly-scoped lock. /// This is an Option to mark waiters list as invalid /// (not in CONNECTING state anymore) and it's not possible @@ -99,6 +103,7 @@ impl ConnectionShared { .min(config.initial_max_send_streams as usize), ); + let now = MillisSinceEpoch::now(); Self { id: next_connection_id(), config, @@ -106,6 +111,8 @@ impl ConnectionShared { state: AtomicU8::new(STATE_NEW), h2: OnceLock::new(), waiters: Mutex::new(Some(Vec::new())), + created_at: now, + last_used_at: AtomicU64::new(now.as_u64()), } } @@ -126,7 +133,7 @@ impl Drop for ConnectionShared { } } -#[derive(Clone, Copy, derive_builder::Builder)] +#[derive(Clone, Copy, Debug, derive_builder::Builder)] #[builder(pattern = "owned", default)] pub struct ConnectionConfig { initial_max_send_streams: u32, @@ -190,6 +197,30 @@ impl Connection { pub(crate) fn inflight(&self) -> usize { self.shared.concurrency.acquired() } + + /// Returns a unique connection id. + pub fn id(&self) -> usize { + self.shared.id + } + + pub fn created_at(&self) -> MillisSinceEpoch { + self.shared.created_at + } + + pub fn last_used_at(&self) -> MillisSinceEpoch { + self.shared.last_used_at.load(Ordering::Relaxed).into() + } + + fn touch(&self) { + // Concurrent updates from multiple threads could race and move the + // timestamp slightly backwards, but millisecond resolution makes this + // unlikely and a small skew is harmless for our use case (detecting + // connections that have been idle for a long time). + + self.shared + .last_used_at + .store(MillisSinceEpoch::now().as_u64(), Ordering::Relaxed); + } } impl Connection @@ -208,11 +239,6 @@ where } } - /// Returns a unique connection id. - pub fn id(&self) -> usize { - self.shared.id - } - pub async fn ready(&mut self) -> Result<(), Error> { poll_fn(|cx| self.poll_ready(cx)).await } @@ -316,6 +342,7 @@ where { // we should already have a permit. let permit = self.permit.take().expect("poll_ready() was called before"); + self.touch(); let state = match self.shared.state.load(Ordering::Acquire) { STATE_CLOSED => ResponseFutureState::error(Error::Closed), @@ -706,7 +733,9 @@ where counter!(CONNECTION_POOL_STREAM_OPENED).increment(1); let permit = this.permit.take().expect("available permit"); - let resp = resp.map(|recv| PermittedRecvStream::new(recv, permit)); + let resp = resp.map(|recv| { + PermittedRecvStream::new(recv, Arc::clone(&this.shared), permit) + }); return Poll::Ready(Ok(resp)); } } @@ -819,6 +848,7 @@ pub struct PermittedRecvStream { data_done: bool, start_time: MillisSinceEpoch, _permit: Permit, + _connection: Arc, } impl Drop for PermittedRecvStream { @@ -829,12 +859,13 @@ impl Drop for PermittedRecvStream { } impl PermittedRecvStream { - fn new(stream: RecvStream, permit: Permit) -> Self { + fn new(stream: RecvStream, connection: Arc, permit: Permit) -> Self { Self { stream, data_done: false, start_time: MillisSinceEpoch::now(), _permit: permit, + _connection: connection, } } } diff --git a/crates/service-client/src/pool/mod.rs b/crates/service-client/src/pool/mod.rs index 1562d753a8..126045f658 100644 --- a/crates/service-client/src/pool/mod.rs +++ b/crates/service-client/src/pool/mod.rs @@ -7,7 +7,7 @@ // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -pub mod authority; +mod authority; mod config; pub mod conn; mod metric_definitions; @@ -76,7 +76,7 @@ impl Pool { let authorities = Arc::new(DashMap::default()); - if let Some(idle_timeout) = config.idle_authority_timeout { + if let Some(idle_timeout) = config.idle_connection_timeout { tokio::task::Builder::new() .name("h2:eviction-task") .spawn(eviction_task(Arc::downgrade(&authorities), idle_timeout)) @@ -115,7 +115,6 @@ where .or_insert_with(|| AuthorityPool::new(self.connector.clone(), self.config)) .value() .clone(); - authority_pool.touch(); async move { let mut request = request; @@ -136,56 +135,61 @@ where } } -/// Background task that periodically evicts idle authority pools. +/// Background task that periodically evicts idle connections. /// -/// Eviction uses two phases because simply removing a pool from the map and -/// dropping it would close the underlying H2 connections immediately — aborting -/// any requests that are still in flight. By splitting the process we decouple -/// "stop routing new requests here" from "shut down the connections": +/// On each tick the task walks every authority pool and removes connections +/// that are idle: `inflight() == 0` **and** `last_used_at` is older than +/// `idle_timeout`. Connections with in-flight streams are left in place so +/// that ongoing requests — including long-lived streams whose `last_used_at` +/// has gone stale — continue to be reused by new callers. Authority pools +/// whose connection list becomes empty are removed from the [`DashMap`]. /// -/// 1. **Evict** — Remove idle pools from the active [`DashMap`] into a -/// task-local drain list. New requests to the same authority will create a -/// fresh pool, while existing in-flight requests continue on the evicted -/// connection until all permits are dropped (has_inflight() is false). -/// 2. **Drain** — On each subsequent tick, check whether every connection in a -/// draining pool has zero in-flight streams. Once that is true the pool is -/// dropped, which triggers [`AuthorityPoolInner::Drop`] and gracefully -/// closes all connections. +/// Evicted connections do not need to be tracked here: each in-flight +/// [`PermittedRecvStream`] holds an `Arc`, so the +/// underlying H2 handle stays alive until the last outstanding stream +/// completes. This also covers the race in [`AuthorityPool::poll_ready`] +/// where a caller clones a `Connection` under the read lock and acquires a +/// stream permit on it after the lock is released — even if the eviction +/// task removes that connection in between, the late-arriving request still +/// has a live H2 handle to use, and graceful close via +/// [`conn::Connection`]'s `Drop` fires once the last `Arc` is released. /// -/// The task exits when all [`Pool`] clones are dropped (weak upgrade fails) -/// **and** the drain list is empty. +/// The task exits as soon as all [`Pool`] clones are dropped (weak upgrade +/// fails). async fn eviction_task( authorities: Weak>>, idle_timeout: Duration, ) { let interval = (idle_timeout / 4).max(Duration::from_secs(10)); - let mut draining: Vec> = Vec::new(); loop { tokio::time::sleep(interval).await; - // Phase 2: drop drained pools that have no in-flight streams. - draining.retain(|pool| pool.has_inflight()); - - // Phase 1: evict idle pools from the active map. + // Evict idle connections from each authority pool. match authorities.upgrade() { Some(map) => { let now = MillisSinceEpoch::now(); map.retain(|key, pool| { - if now.duration_since(pool.last_used()) > idle_timeout { - trace!("evicting idle authority pool ({})", key); - draining.push(pool.clone()); - false - } else { - true + let (size, evicted) = pool.retain(|con| { + con.inflight() > 0 || now.duration_since(con.last_used_at()) < idle_timeout + }); + + if !evicted.is_empty() { + debug!( + "evicting {} idle connections from pool ({})", + evicted.len(), + key + ); + } + + if size == 0 { + debug!("evicting empty authority pool ({})", key); } + size != 0 }); } None => { - // All Pool clones dropped. Keep draining until empty, then exit. - if draining.is_empty() { - return; - } + return; } } } @@ -362,7 +366,6 @@ mod test { use bytes::Bytes; use http::{Request, Uri}; use http_body_util::BodyExt; - use restate_types::time::MillisSinceEpoch; use crate::pool::PoolBuilder; use crate::pool::test_util::TestConnector; @@ -379,7 +382,7 @@ mod test { ) -> super::Pool { PoolBuilder::default() .initial_max_send_streams(std::num::NonZeroU32::new(max_concurrent_streams).unwrap()) - .idle_authority_timeout(Some(idle_timeout)) + .idle_connection_timeout(Some(idle_timeout)) .build(TestConnector::new(max_concurrent_streams)) } @@ -461,71 +464,13 @@ mod test { assert_eq!(pool.authorities.len(), 3); } - /// Idle authority pools are evicted by the background task. - /// Active pools (recently touched) are retained. - #[tokio::test] - async fn evicts_idle_authority_pools() { - let idle_timeout = Duration::from_secs(10); - let pool = make_pool_with_eviction(10, idle_timeout); - - // Send requests to two authorities to create their pools. - for host in ["host-a", "host-b"] { - pool.request( - Request::builder() - .uri(format!("http://{}:80", host)) - .body(http_body_util::Empty::::new()) - .unwrap(), - ) - .await - .unwrap(); - } - assert_eq!(pool.authorities.len(), 2); - - // Backdate host-a's last_used to simulate it being idle longer than the timeout. - let stale = MillisSinceEpoch::from(MillisSinceEpoch::now().as_u64().saturating_sub(20_000)); - pool.authorities - .iter() - .find(|entry| { - entry - .key() - .authority - .as_ref() - .is_some_and(|a| a.as_str() == "host-a:80") - }) - .unwrap() - .value() - .set_last_used(stale); - - // Run eviction directly to avoid timing dependencies. - let now = MillisSinceEpoch::now(); - pool.authorities - .retain(|_key, ap| now.duration_since(ap.last_used()) <= idle_timeout); - - // host-a should be evicted, host-b should remain. - assert_eq!(pool.authorities.len(), 1); - assert!(pool.authorities.iter().any(|e| { - e.key() - .authority - .as_ref() - .is_some_and(|a| a.as_str() == "host-b:80") - })); - - // A new request to host-a creates a fresh pool. - pool.request( - Request::builder() - .uri("http://host-a:80") - .body(http_body_util::Empty::::new()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(pool.authorities.len(), 2); - } - - /// Evicted authority pools with in-flight streams are kept in the drain - /// list until streams complete, then dropped (closing connections via Drop). + /// A connection removed from the authority pool keeps reporting its + /// in-flight count for as long as the response body is held, and the + /// count drops to zero once the body is released — the permit (and the + /// `Arc` inside [`PermittedRecvStream`]) is what + /// keeps the stream slot occupied past eviction. #[tokio::test] - async fn draining_waits_for_inflight_streams() { + async fn inflight_reflects_body_after_eviction() { let pool = make_pool_with_eviction(10, Duration::from_secs(10)); // Send a request and hold the response body to keep the stream alive. @@ -542,20 +487,27 @@ mod test { assert_eq!(pool.authorities.len(), 1); - // Clone the authority pool before removing it (simulates what eviction_task does). - let draining_pool = pool.authorities.iter().next().unwrap().value().clone(); + // Drain all connections + let (remaining, draining_pool) = pool + .authorities + .iter() + .next() + .unwrap() + .value() + .retain(|_| false); + assert_eq!(remaining, 0); // Remove from DashMap. pool.authorities.clear(); assert_eq!(pool.authorities.len(), 0); // The draining pool should report in-flight streams. - assert!(draining_pool.has_inflight()); + assert_ne!(draining_pool.iter().fold(0, |acc, v| acc + v.inflight()), 0); // Drop the response body to complete the stream. drop(body); // Now the draining pool should have no in-flight streams. - assert!(!draining_pool.has_inflight()); + assert_eq!(draining_pool.iter().fold(0, |acc, v| acc + v.inflight()), 0); } } diff --git a/crates/types/src/config/http.rs b/crates/types/src/config/http.rs index 458fdb8935..c0813b6433 100644 --- a/crates/types/src/config/http.rs +++ b/crates/types/src/config/http.rs @@ -77,13 +77,15 @@ pub struct HttpOptions { /// Default: 128 pub streams_per_connection_limit: NonZeroUsize, - /// # Idle Pool Timeout + /// # Idle Connection Timeout /// - /// How long a per-host connection pool can be idle before it is evicted - /// and its connections are closed. Set to `None` to disable eviction. + /// How long a connection can be idle before it is evicted + /// and closed. Set to `0` to disable eviction. + /// + /// Since: v1.7.0 /// /// Default: 5 minutes - pub idle_pool_timeout: Option, + pub idle_connection_timeout: FriendlyDuration, } impl Default for HttpOptions { @@ -95,7 +97,7 @@ impl Default for HttpOptions { connect_timeout: NonZeroFriendlyDuration::from_secs_unchecked(10), initial_max_send_streams: None, streams_per_connection_limit: NonZeroUsize::new(128).unwrap(), - idle_pool_timeout: Some(NonZeroFriendlyDuration::from_secs_unchecked(300)), + idle_connection_timeout: FriendlyDuration::from_secs(300), } } }