From cf43bc1d2849cb3b04517946e9697670b692e67f Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Mon, 18 May 2026 19:59:14 +0200 Subject: [PATCH] connection-pool: evict idle connections instead of entire authority pools Summary: Previously, the eviction task dropped an authority pool once it had been idle for longer than the configured timeout, taking all of its connections with it. This commit switches to a per-connection policy: connections that have been idle for longer than the timeout are evicted individually, and the authority pool entry is removed only once it has no remaining connections. Evicted connections are not drained immediately but only after the last stream that is holding an Arc to the underlying connection is dropped. Only then is the connection closed. This is to avoid racing with AuthorityPool::poll_ready. As a result, a connection may stay alive longer than the configured idle_connection_timeout if it still has in-flight work. The corresponding config knob is renamed from `idle_pool_timeout` to `idle_connection_timeout` to reflect the new semantics; the internal `idle_authority_timeout` field on `PoolConfig` is renamed to match. The knob's type also changes from an optional duration to a plain duration: eviction is now disabled by setting it to `0` rather than to `None`. 
--- crates/service-client/src/http.rs | 6 +- crates/service-client/src/pool/authority.rs | 54 ++++--- crates/service-client/src/pool/config.rs | 6 +- crates/service-client/src/pool/conn.rs | 49 ++++-- crates/service-client/src/pool/mod.rs | 158 +++++++------------- crates/types/src/config/http.rs | 12 +- 6 files changed, 135 insertions(+), 150 deletions(-) diff --git a/crates/service-client/src/http.rs b/crates/service-client/src/http.rs index e4b3ca3964..ff3379d461 100644 --- a/crates/service-client/src/http.rs +++ b/crates/service-client/src/http.rs @@ -134,7 +134,11 @@ impl HttpClient { .keep_alive_interval(keep_alive_interval) .streams_per_connection_limit(options.streams_per_connection_limit) .keep_alive_timeout(options.http_keep_alive_options.timeout.into()) - .idle_authority_timeout(options.idle_pool_timeout.map(Into::into)); + .idle_connection_timeout(if options.idle_connection_timeout.is_zero() { + None + } else { + Some(options.idle_connection_timeout.into()) + }); let builder = match options.initial_max_send_streams { Some(value) => builder.initial_max_send_streams(value), diff --git a/crates/service-client/src/pool/authority.rs b/crates/service-client/src/pool/authority.rs index e085014a46..a2ffab601e 100644 --- a/crates/service-client/src/pool/authority.rs +++ b/crates/service-client/src/pool/authority.rs @@ -16,7 +16,6 @@ use std::collections::{HashSet, VecDeque}; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; use bytes::Bytes; @@ -26,9 +25,7 @@ use metrics::histogram; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use tokio::io::{AsyncRead, AsyncWrite}; use tower::Service; -use tracing::trace; - -use restate_types::time::MillisSinceEpoch; +use tracing::{debug, trace}; use crate::pool::PoolConfig; use crate::pool::conn::{ConnectionConfig, ConnectionConfigBuilder}; @@ -48,7 +45,6 @@ struct AuthorityPoolShared { config: PoolConfig, connection_config: ConnectionConfig, inner: Arc>>, - last_used: 
AtomicU64, } enum AuthorityPoolState { @@ -85,31 +81,25 @@ impl Clone for AuthorityPool { } impl AuthorityPool { - /// Updates the last-used timestamp to the current time. - pub(crate) fn touch(&self) { - self.shared - .last_used - .store(MillisSinceEpoch::now().as_u64(), Ordering::Relaxed); - } - - /// Returns the last-used timestamp. - pub(crate) fn last_used(&self) -> MillisSinceEpoch { - MillisSinceEpoch::from(self.shared.last_used.load(Ordering::Relaxed)) - } + // Retains the connections for which retain returns true and returns (remaining, evicted). + pub fn retain(&self, mut retain: F) -> (usize, Vec>) + where + F: FnMut(&Connection) -> bool, + { + let mut drained = Vec::default(); + let mut inner = self.shared.inner.write(); + let mut i = 0; + + while i < inner.connections.len() { + if retain(&inner.connections[i]) { + i += 1; + continue; + } - #[cfg(test)] - pub(crate) fn set_last_used(&self, ts: MillisSinceEpoch) { - self.shared.last_used.store(ts.as_u64(), Ordering::Relaxed); - } + drained.push(inner.connections.swap_remove_back(i).unwrap()); + } - /// Returns `true` if any connection has in-flight H2 streams. 
- pub(crate) fn has_inflight(&self) -> bool { - self.shared - .inner - .read() - .connections - .iter() - .any(|c| c.inflight() > 0) + (inner.connections.len(), drained) } } @@ -133,7 +123,6 @@ where connector, config, connection_config, - last_used: AtomicU64::new(MillisSinceEpoch::now().as_u64()), inner: Arc::new(RwLock::new(AuthorityPoolInner { epoch: 0, connections: VecDeque::new(), @@ -329,6 +318,13 @@ where }); if scaleup { + debug!( + "Try scaling up pool (connections: {}, available streams: {}, total streams:{})", + inner.connections.len(), + total_available_streams, + total_max_concurrent_streams + ); + match self.try_expand_pool(&mut inner) { Some(mut candidate) => { drop(inner); diff --git a/crates/service-client/src/pool/config.rs b/crates/service-client/src/pool/config.rs index 5b0f2080f6..8f44b5eeeb 100644 --- a/crates/service-client/src/pool/config.rs +++ b/crates/service-client/src/pool/config.rs @@ -60,9 +60,9 @@ pub struct PoolConfig { /// How often to send HTTP/2 PING frames to keep idle connections alive. /// `None` disables keep-alive pings entirely. Defaults to `None`. pub(crate) keep_alive_interval: Option, - /// How long an authority pool can be idle before it is evicted from the + /// How long a connection can be idle before it is evicted from the /// pool. `None` disables eviction entirely. Defaults to 5 minutes. 
- pub(crate) idle_authority_timeout: Option, + pub(crate) idle_connection_timeout: Option, } impl Default for PoolConfig { @@ -73,7 +73,7 @@ impl Default for PoolConfig { streams_per_connection_limit: NonZeroUsize::new(128).unwrap(), keep_alive_interval: None, keep_alive_timeout: Duration::from_secs(20), - idle_authority_timeout: Some(Duration::from_secs(300)), + idle_connection_timeout: Some(Duration::from_secs(300)), } } } diff --git a/crates/service-client/src/pool/conn.rs b/crates/service-client/src/pool/conn.rs index f20a18dc5c..9edfd71948 100644 --- a/crates/service-client/src/pool/conn.rs +++ b/crates/service-client/src/pool/conn.rs @@ -9,7 +9,7 @@ // by the Apache License, Version 2.0. use std::pin::Pin; -use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, OnceLock}; use std::task::{Context, Poll}; use std::time::Duration; @@ -67,6 +67,7 @@ const STATE_CONNECTED: u8 = 2; const STATE_CLOSED: u8 = 3; /// The H2 handle obtained after a successful handshake. Set exactly once. +#[derive(Debug)] struct H2Handle { send_request: SendRequest, cancel: CancellationToken, @@ -78,12 +79,15 @@ struct H2Handle { /// The `state` field tracks the discriminant atomically. The `h2` handle is set /// once via `OnceLock` when transitioning to `Connected`. Only the waiter list /// requires a brief lock during the `Connecting` phase. +#[derive(Debug)] struct ConnectionShared { id: usize, config: ConnectionConfig, concurrency: Concurrency, state: AtomicU8, h2: OnceLock, + created_at: MillisSinceEpoch, + last_used_at: AtomicU64, /// Waiters registered during the Connecting phase. Narrowly-scoped lock. 
/// This is an Option to mark waiters list as invalid /// (not in CONNECTING state anymore) and it's not possible @@ -99,6 +103,7 @@ impl ConnectionShared { .min(config.initial_max_send_streams as usize), ); + let now = MillisSinceEpoch::now(); Self { id: next_connection_id(), config, @@ -106,6 +111,8 @@ impl ConnectionShared { state: AtomicU8::new(STATE_NEW), h2: OnceLock::new(), waiters: Mutex::new(Some(Vec::new())), + created_at: now, + last_used_at: AtomicU64::new(now.as_u64()), } } @@ -126,7 +133,7 @@ impl Drop for ConnectionShared { } } -#[derive(Clone, Copy, derive_builder::Builder)] +#[derive(Clone, Copy, Debug, derive_builder::Builder)] #[builder(pattern = "owned", default)] pub struct ConnectionConfig { initial_max_send_streams: u32, @@ -190,6 +197,30 @@ impl Connection { pub(crate) fn inflight(&self) -> usize { self.shared.concurrency.acquired() } + + /// Returns a unique connection id. + pub fn id(&self) -> usize { + self.shared.id + } + + pub fn created_at(&self) -> MillisSinceEpoch { + self.shared.created_at + } + + pub fn last_used_at(&self) -> MillisSinceEpoch { + self.shared.last_used_at.load(Ordering::Relaxed).into() + } + + fn touch(&self) { + // Concurrent updates from multiple threads could race and move the + // timestamp slightly backwards, but millisecond resolution makes this + // unlikely and a small skew is harmless for our use case (detecting + // connections that have been idle for a long time). + + self.shared + .last_used_at + .store(MillisSinceEpoch::now().as_u64(), Ordering::Relaxed); + } } impl Connection @@ -208,11 +239,6 @@ where } } - /// Returns a unique connection id. - pub fn id(&self) -> usize { - self.shared.id - } - pub async fn ready(&mut self) -> Result<(), Error> { poll_fn(|cx| self.poll_ready(cx)).await } @@ -316,6 +342,7 @@ where { // we should already have a permit. 
let permit = self.permit.take().expect("poll_ready() was called before"); + self.touch(); let state = match self.shared.state.load(Ordering::Acquire) { STATE_CLOSED => ResponseFutureState::error(Error::Closed), @@ -706,7 +733,9 @@ where counter!(CONNECTION_POOL_STREAM_OPENED).increment(1); let permit = this.permit.take().expect("available permit"); - let resp = resp.map(|recv| PermittedRecvStream::new(recv, permit)); + let resp = resp.map(|recv| { + PermittedRecvStream::new(recv, Arc::clone(&this.shared), permit) + }); return Poll::Ready(Ok(resp)); } } @@ -819,6 +848,7 @@ pub struct PermittedRecvStream { data_done: bool, start_time: MillisSinceEpoch, _permit: Permit, + _connection: Arc, } impl Drop for PermittedRecvStream { @@ -829,12 +859,13 @@ impl Drop for PermittedRecvStream { } impl PermittedRecvStream { - fn new(stream: RecvStream, permit: Permit) -> Self { + fn new(stream: RecvStream, connection: Arc, permit: Permit) -> Self { Self { stream, data_done: false, start_time: MillisSinceEpoch::now(), _permit: permit, + _connection: connection, } } } diff --git a/crates/service-client/src/pool/mod.rs b/crates/service-client/src/pool/mod.rs index 1562d753a8..126045f658 100644 --- a/crates/service-client/src/pool/mod.rs +++ b/crates/service-client/src/pool/mod.rs @@ -7,7 +7,7 @@ // As of the Change Date specified in that file, in accordance with // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
-pub mod authority; +mod authority; mod config; pub mod conn; mod metric_definitions; @@ -76,7 +76,7 @@ impl Pool { let authorities = Arc::new(DashMap::default()); - if let Some(idle_timeout) = config.idle_authority_timeout { + if let Some(idle_timeout) = config.idle_connection_timeout { tokio::task::Builder::new() .name("h2:eviction-task") .spawn(eviction_task(Arc::downgrade(&authorities), idle_timeout)) @@ -115,7 +115,6 @@ where .or_insert_with(|| AuthorityPool::new(self.connector.clone(), self.config)) .value() .clone(); - authority_pool.touch(); async move { let mut request = request; @@ -136,56 +135,61 @@ where } } -/// Background task that periodically evicts idle authority pools. +/// Background task that periodically evicts idle connections. /// -/// Eviction uses two phases because simply removing a pool from the map and -/// dropping it would close the underlying H2 connections immediately — aborting -/// any requests that are still in flight. By splitting the process we decouple -/// "stop routing new requests here" from "shut down the connections": +/// On each tick the task walks every authority pool and removes connections +/// that are idle: `inflight() == 0` **and** `last_used_at` is older than +/// `idle_timeout`. Connections with in-flight streams are left in place so +/// that ongoing requests — including long-lived streams whose `last_used_at` +/// has gone stale — continue to be reused by new callers. Authority pools +/// whose connection list becomes empty are removed from the [`DashMap`]. /// -/// 1. **Evict** — Remove idle pools from the active [`DashMap`] into a -/// task-local drain list. New requests to the same authority will create a -/// fresh pool, while existing in-flight requests continue on the evicted -/// connection until all permits are dropped (has_inflight() is false). -/// 2. **Drain** — On each subsequent tick, check whether every connection in a -/// draining pool has zero in-flight streams. 
Once that is true the pool is -/// dropped, which triggers [`AuthorityPoolInner::Drop`] and gracefully -/// closes all connections. +/// Evicted connections do not need to be tracked here: each in-flight +/// [`PermittedRecvStream`] holds an `Arc`, so the +/// underlying H2 handle stays alive until the last outstanding stream +/// completes. This also covers the race in [`AuthorityPool::poll_ready`] +/// where a caller clones a `Connection` under the read lock and acquires a +/// stream permit on it after the lock is released — even if the eviction +/// task removes that connection in between, the late-arriving request still +/// has a live H2 handle to use, and graceful close via +/// [`conn::Connection`]'s `Drop` fires once the last `Arc` is released. /// -/// The task exits when all [`Pool`] clones are dropped (weak upgrade fails) -/// **and** the drain list is empty. +/// The task exits as soon as all [`Pool`] clones are dropped (weak upgrade +/// fails). async fn eviction_task( authorities: Weak>>, idle_timeout: Duration, ) { let interval = (idle_timeout / 4).max(Duration::from_secs(10)); - let mut draining: Vec> = Vec::new(); loop { tokio::time::sleep(interval).await; - // Phase 2: drop drained pools that have no in-flight streams. - draining.retain(|pool| pool.has_inflight()); - - // Phase 1: evict idle pools from the active map. + // Evict idle connections from each authority pool. 
match authorities.upgrade() { Some(map) => { let now = MillisSinceEpoch::now(); map.retain(|key, pool| { - if now.duration_since(pool.last_used()) > idle_timeout { - trace!("evicting idle authority pool ({})", key); - draining.push(pool.clone()); - false - } else { - true + let (size, evicted) = pool.retain(|con| { + con.inflight() > 0 || now.duration_since(con.last_used_at()) < idle_timeout + }); + + if !evicted.is_empty() { + debug!( + "evicting {} idle connections from pool ({})", + evicted.len(), + key + ); + } + + if size == 0 { + debug!("evicting empty authority pool ({})", key); } + size != 0 }); } None => { - // All Pool clones dropped. Keep draining until empty, then exit. - if draining.is_empty() { - return; - } + return; } } } @@ -362,7 +366,6 @@ mod test { use bytes::Bytes; use http::{Request, Uri}; use http_body_util::BodyExt; - use restate_types::time::MillisSinceEpoch; use crate::pool::PoolBuilder; use crate::pool::test_util::TestConnector; @@ -379,7 +382,7 @@ mod test { ) -> super::Pool { PoolBuilder::default() .initial_max_send_streams(std::num::NonZeroU32::new(max_concurrent_streams).unwrap()) - .idle_authority_timeout(Some(idle_timeout)) + .idle_connection_timeout(Some(idle_timeout)) .build(TestConnector::new(max_concurrent_streams)) } @@ -461,71 +464,13 @@ mod test { assert_eq!(pool.authorities.len(), 3); } - /// Idle authority pools are evicted by the background task. - /// Active pools (recently touched) are retained. - #[tokio::test] - async fn evicts_idle_authority_pools() { - let idle_timeout = Duration::from_secs(10); - let pool = make_pool_with_eviction(10, idle_timeout); - - // Send requests to two authorities to create their pools. 
- for host in ["host-a", "host-b"] { - pool.request( - Request::builder() - .uri(format!("http://{}:80", host)) - .body(http_body_util::Empty::::new()) - .unwrap(), - ) - .await - .unwrap(); - } - assert_eq!(pool.authorities.len(), 2); - - // Backdate host-a's last_used to simulate it being idle longer than the timeout. - let stale = MillisSinceEpoch::from(MillisSinceEpoch::now().as_u64().saturating_sub(20_000)); - pool.authorities - .iter() - .find(|entry| { - entry - .key() - .authority - .as_ref() - .is_some_and(|a| a.as_str() == "host-a:80") - }) - .unwrap() - .value() - .set_last_used(stale); - - // Run eviction directly to avoid timing dependencies. - let now = MillisSinceEpoch::now(); - pool.authorities - .retain(|_key, ap| now.duration_since(ap.last_used()) <= idle_timeout); - - // host-a should be evicted, host-b should remain. - assert_eq!(pool.authorities.len(), 1); - assert!(pool.authorities.iter().any(|e| { - e.key() - .authority - .as_ref() - .is_some_and(|a| a.as_str() == "host-b:80") - })); - - // A new request to host-a creates a fresh pool. - pool.request( - Request::builder() - .uri("http://host-a:80") - .body(http_body_util::Empty::::new()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(pool.authorities.len(), 2); - } - - /// Evicted authority pools with in-flight streams are kept in the drain - /// list until streams complete, then dropped (closing connections via Drop). + /// A connection removed from the authority pool keeps reporting its + /// in-flight count for as long as the response body is held, and the + /// count drops to zero once the body is released — the permit (and the + /// `Arc` inside [`PermittedRecvStream`]) is what + /// keeps the stream slot occupied past eviction. #[tokio::test] - async fn draining_waits_for_inflight_streams() { + async fn inflight_reflects_body_after_eviction() { let pool = make_pool_with_eviction(10, Duration::from_secs(10)); // Send a request and hold the response body to keep the stream alive. 
@@ -542,20 +487,27 @@ mod test { assert_eq!(pool.authorities.len(), 1); - // Clone the authority pool before removing it (simulates what eviction_task does). - let draining_pool = pool.authorities.iter().next().unwrap().value().clone(); + // Drain all connections + let (remaining, draining_pool) = pool + .authorities + .iter() + .next() + .unwrap() + .value() + .retain(|_| false); + assert_eq!(remaining, 0); // Remove from DashMap. pool.authorities.clear(); assert_eq!(pool.authorities.len(), 0); // The draining pool should report in-flight streams. - assert!(draining_pool.has_inflight()); + assert_ne!(draining_pool.iter().fold(0, |acc, v| acc + v.inflight()), 0); // Drop the response body to complete the stream. drop(body); // Now the draining pool should have no in-flight streams. - assert!(!draining_pool.has_inflight()); + assert_eq!(draining_pool.iter().fold(0, |acc, v| acc + v.inflight()), 0); } } diff --git a/crates/types/src/config/http.rs b/crates/types/src/config/http.rs index 458fdb8935..c0813b6433 100644 --- a/crates/types/src/config/http.rs +++ b/crates/types/src/config/http.rs @@ -77,13 +77,15 @@ pub struct HttpOptions { /// Default: 128 pub streams_per_connection_limit: NonZeroUsize, - /// # Idle Pool Timeout + /// # Idle Connection Timeout /// - /// How long a per-host connection pool can be idle before it is evicted - /// and its connections are closed. Set to `None` to disable eviction. + /// How long a connection can be idle before it is evicted + /// and closed. Set to `0` to disable eviction. 
+ /// + /// Since: v1.7.0 /// /// Default: 5 minutes - pub idle_pool_timeout: Option, + pub idle_connection_timeout: FriendlyDuration, } impl Default for HttpOptions { @@ -95,7 +97,7 @@ impl Default for HttpOptions { connect_timeout: NonZeroFriendlyDuration::from_secs_unchecked(10), initial_max_send_streams: None, streams_per_connection_limit: NonZeroUsize::new(128).unwrap(), - idle_pool_timeout: Some(NonZeroFriendlyDuration::from_secs_unchecked(300)), + idle_connection_timeout: FriendlyDuration::from_secs(300), } } }