Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/service-client/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ impl HttpClient {
.keep_alive_interval(keep_alive_interval)
.streams_per_connection_limit(options.streams_per_connection_limit)
.keep_alive_timeout(options.http_keep_alive_options.timeout.into())
.idle_authority_timeout(options.idle_pool_timeout.map(Into::into));
.idle_connection_timeout(if options.idle_connection_timeout.is_zero() {
None
} else {
Some(options.idle_connection_timeout.into())
});

let builder = match options.initial_max_send_streams {
Some(value) => builder.initial_max_send_streams(value),
Expand Down
54 changes: 25 additions & 29 deletions crates/service-client/src/pool/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};

use bytes::Bytes;
Expand All @@ -26,9 +25,7 @@ use metrics::histogram;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use tokio::io::{AsyncRead, AsyncWrite};
use tower::Service;
use tracing::trace;

use restate_types::time::MillisSinceEpoch;
use tracing::{debug, trace};

use crate::pool::PoolConfig;
use crate::pool::conn::{ConnectionConfig, ConnectionConfigBuilder};
Expand All @@ -48,7 +45,6 @@ struct AuthorityPoolShared<C> {
config: PoolConfig,
connection_config: ConnectionConfig,
inner: Arc<RwLock<AuthorityPoolInner<C>>>,
last_used: AtomicU64,
}

enum AuthorityPoolState<C> {
Expand Down Expand Up @@ -85,31 +81,25 @@ impl<C: Clone> Clone for AuthorityPool<C> {
}

impl<C> AuthorityPool<C> {
/// Updates the last-used timestamp to the current time.
pub(crate) fn touch(&self) {
self.shared
.last_used
.store(MillisSinceEpoch::now().as_u64(), Ordering::Relaxed);
}

/// Returns the last-used timestamp.
pub(crate) fn last_used(&self) -> MillisSinceEpoch {
MillisSinceEpoch::from(self.shared.last_used.load(Ordering::Relaxed))
}
// Retains the connections for which retain returns true and returns (remaining, evicted).
pub fn retain<F>(&self, mut retain: F) -> (usize, Vec<Connection<C>>)
where
F: FnMut(&Connection<C>) -> bool,
{
let mut drained = Vec::default();
let mut inner = self.shared.inner.write();
let mut i = 0;

while i < inner.connections.len() {
if retain(&inner.connections[i]) {
i += 1;
continue;
}

#[cfg(test)]
pub(crate) fn set_last_used(&self, ts: MillisSinceEpoch) {
self.shared.last_used.store(ts.as_u64(), Ordering::Relaxed);
}
drained.push(inner.connections.swap_remove_back(i).unwrap());
}

/// Returns `true` if any connection has in-flight H2 streams.
pub(crate) fn has_inflight(&self) -> bool {
self.shared
.inner
.read()
.connections
.iter()
.any(|c| c.inflight() > 0)
(inner.connections.len(), drained)
}
}

Expand All @@ -133,7 +123,6 @@ where
connector,
config,
connection_config,
last_used: AtomicU64::new(MillisSinceEpoch::now().as_u64()),
inner: Arc::new(RwLock::new(AuthorityPoolInner {
epoch: 0,
connections: VecDeque::new(),
Expand Down Expand Up @@ -329,6 +318,13 @@ where
});

if scaleup {
debug!(
"Try scaling up pool (connections: {}, available streams: {}, total streams:{})",
inner.connections.len(),
total_available_streams,
total_max_concurrent_streams
);

match self.try_expand_pool(&mut inner) {
Some(mut candidate) => {
drop(inner);
Expand Down
6 changes: 3 additions & 3 deletions crates/service-client/src/pool/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ pub struct PoolConfig {
/// How often to send HTTP/2 PING frames to keep idle connections alive.
/// `None` disables keep-alive pings entirely. Defaults to `None`.
pub(crate) keep_alive_interval: Option<Duration>,
/// How long an authority pool can be idle before it is evicted from the
/// How long a connection can be idle before it is evicted from the
/// pool. `None` disables eviction entirely. Defaults to 5 minutes.
pub(crate) idle_authority_timeout: Option<Duration>,
pub(crate) idle_connection_timeout: Option<Duration>,
Comment on lines +63 to +65
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to set None value via toml in the configuration?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right! While this is the pool config (not restate config) but the same comment applies.

Maybe use 0 instead of None as no idle timeout ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah true. Yeah 0 for disabling eviction makes sense to me.

}

impl Default for PoolConfig {
Expand All @@ -73,7 +73,7 @@ impl Default for PoolConfig {
streams_per_connection_limit: NonZeroUsize::new(128).unwrap(),
keep_alive_interval: None,
keep_alive_timeout: Duration::from_secs(20),
idle_authority_timeout: Some(Duration::from_secs(300)),
idle_connection_timeout: Some(Duration::from_secs(300)),
}
}
}
Expand Down
49 changes: 40 additions & 9 deletions crates/service-client/src/pool/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use std::pin::Pin;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll};
use std::time::Duration;
Expand Down Expand Up @@ -67,6 +67,7 @@ const STATE_CONNECTED: u8 = 2;
const STATE_CLOSED: u8 = 3;

/// The H2 handle obtained after a successful handshake. Set exactly once.
#[derive(Debug)]
struct H2Handle {
send_request: SendRequest<Bytes>,
cancel: CancellationToken,
Expand All @@ -78,12 +79,15 @@ struct H2Handle {
/// The `state` field tracks the discriminant atomically. The `h2` handle is set
/// once via `OnceLock` when transitioning to `Connected`. Only the waiter list
/// requires a brief lock during the `Connecting` phase.
#[derive(Debug)]
struct ConnectionShared {
id: usize,
config: ConnectionConfig,
concurrency: Concurrency,
state: AtomicU8,
h2: OnceLock<H2Handle>,
created_at: MillisSinceEpoch,
last_used_at: AtomicU64,
/// Waiters registered during the Connecting phase. Narrowly-scoped lock.
/// This is an Option<T> to mark waiters list as invalid
/// (not in CONNECTING state anymore) and it's not possible
Expand All @@ -99,13 +103,16 @@ impl ConnectionShared {
.min(config.initial_max_send_streams as usize),
);

let now = MillisSinceEpoch::now();
Self {
id: next_connection_id(),
config,
concurrency,
state: AtomicU8::new(STATE_NEW),
h2: OnceLock::new(),
waiters: Mutex::new(Some(Vec::new())),
created_at: now,
last_used_at: AtomicU64::new(now.as_u64()),
}
}

Expand All @@ -126,7 +133,7 @@ impl Drop for ConnectionShared {
}
}

#[derive(Clone, Copy, derive_builder::Builder)]
#[derive(Clone, Copy, Debug, derive_builder::Builder)]
#[builder(pattern = "owned", default)]
pub struct ConnectionConfig {
initial_max_send_streams: u32,
Expand Down Expand Up @@ -190,6 +197,30 @@ impl<C> Connection<C> {
pub(crate) fn inflight(&self) -> usize {
self.shared.concurrency.acquired()
}

/// Returns a unique connection id.
pub fn id(&self) -> usize {
self.shared.id
}

pub fn created_at(&self) -> MillisSinceEpoch {
self.shared.created_at
}

pub fn last_used_at(&self) -> MillisSinceEpoch {
self.shared.last_used_at.load(Ordering::Relaxed).into()
}

fn touch(&self) {
// Concurrent updates from multiple threads could race and move the
// timestamp slightly backwards, but millisecond resolution makes this
// unlikely and a small skew is harmless for our use case (detecting
// connections that have been idle for a long time).

self.shared
.last_used_at
.store(MillisSinceEpoch::now().as_u64(), Ordering::Relaxed);
}
}

impl<C> Connection<C>
Expand All @@ -208,11 +239,6 @@ where
}
}

/// Returns a unique connection id.
pub fn id(&self) -> usize {
self.shared.id
}

pub async fn ready(&mut self) -> Result<(), Error> {
poll_fn(|cx| self.poll_ready(cx)).await
}
Expand Down Expand Up @@ -316,6 +342,7 @@ where
{
// we should already have a permit.
let permit = self.permit.take().expect("poll_ready() was called before");
self.touch();

let state = match self.shared.state.load(Ordering::Acquire) {
STATE_CLOSED => ResponseFutureState::error(Error::Closed),
Expand Down Expand Up @@ -706,7 +733,9 @@ where

counter!(CONNECTION_POOL_STREAM_OPENED).increment(1);
let permit = this.permit.take().expect("available permit");
let resp = resp.map(|recv| PermittedRecvStream::new(recv, permit));
let resp = resp.map(|recv| {
PermittedRecvStream::new(recv, Arc::clone(&this.shared), permit)
});
return Poll::Ready(Ok(resp));
}
}
Expand Down Expand Up @@ -819,6 +848,7 @@ pub struct PermittedRecvStream {
data_done: bool,
start_time: MillisSinceEpoch,
_permit: Permit,
_connection: Arc<ConnectionShared>,
}

impl Drop for PermittedRecvStream {
Expand All @@ -829,12 +859,13 @@ impl Drop for PermittedRecvStream {
}

impl PermittedRecvStream {
fn new(stream: RecvStream, permit: Permit) -> Self {
fn new(stream: RecvStream, connection: Arc<ConnectionShared>, permit: Permit) -> Self {
Self {
stream,
data_done: false,
start_time: MillisSinceEpoch::now(),
_permit: permit,
_connection: connection,
}
}
}
Expand Down
Loading
Loading