From b8c53af5c6989b987b8d5566edbf6174c3692360 Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Tue, 28 Apr 2026 12:17:55 -0400 Subject: [PATCH 1/3] feat: add recv buffer size option --- src/client.rs | 19 ++++++++++++++++++- src/codec/mod.rs | 32 ++++++++++++++++++++++++-------- src/proto/mod.rs | 1 + src/server.rs | 19 ++++++++++++++++++- 4 files changed, 61 insertions(+), 10 deletions(-) diff --git a/src/client.rs b/src/client.rs index 4393fdc7e..b8199793d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -324,6 +324,9 @@ pub struct Builder { /// Maximum amount of bytes to "buffer" for writing per stream. max_send_buffer_size: usize, + /// Maximum number of bytes to read at a time (for the entire connection). + recv_buffer_size: usize, + /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, @@ -655,6 +658,7 @@ impl Builder { pub fn new() -> Builder { Builder { max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, + recv_buffer_size: proto::DEFAULT_RECV_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, @@ -1074,6 +1078,19 @@ impl Builder { self } + /// Sets the read buffer size for the entire connection. + /// Determines the maximum number of bytes that can be read at a time. + /// The default is currently 8KB, but may change. + /// + /// # Panics + /// + /// This function panics if `n` is larger than `i32::MAX`. + pub fn recv_buffer_size(&mut self, n: usize) -> &mut Self { + assert!(n <= i32::MAX as usize); + self.recv_buffer_size = n; + self + } + /// Enables or disables server push promises. /// /// This value is included in the initial SETTINGS handshake. @@ -1309,7 +1326,7 @@ where bind_connection(&mut io).await?; // Create the codec - let mut codec = Codec::new(io); + let mut codec = Codec::with_recv_buffer_size(io, builder.recv_buffer_size); if let Some(max) = builder.settings.max_frame_size() { codec.set_max_recv_frame_size(max as usize); diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 6cbdc1e18..6bd4c4284 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -8,7 +8,7 @@ use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use crate::frame::{self, Data, Frame}; -use crate::proto::Error; +use crate::proto::{self, Error}; use bytes::Buf; use futures_core::Stream; @@ -17,6 +17,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::length_delimited; +use tokio_util::codec::FramedRead as InnerFramedRead; use std::io; @@ -33,21 +34,36 @@ where /// Returns a new `Codec` with the default max frame size #[inline] pub fn new(io: T) -> Self { - Self::with_max_recv_frame_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize) + Self::with_recv_buffer_size(io, proto::DEFAULT_RECV_BUFFER_SIZE) + } + + /// Returns a new `Codec` with the given read buffer size + pub fn with_recv_buffer_size(io: T, recv_buffer_size: usize) -> Self { + Self::with_max_recv_frame_size_and_recv_buffer_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize, recv_buffer_size) } /// Returns a new `Codec` with the given maximum frame size + #[allow(dead_code)] pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self { + Self::with_max_recv_frame_size_and_recv_buffer_size(io, max_frame_size, proto::DEFAULT_RECV_BUFFER_SIZE) + } + + /// Returns a new `Codec` with the given maximum frame size and read buffer size + pub fn with_max_recv_frame_size_and_recv_buffer_size(io: T, max_frame_size: usize, recv_buffer_size: usize) -> Self { // Wrap with writer let framed_write = FramedWrite::new(io); // Delimit the frames - let delimited = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(9) - .num_skip(0) // Don't skip the header - .new_read(framed_write); + let delimited = InnerFramedRead::with_capacity( + framed_write, + length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + .new_codec(), + recv_buffer_size, + ); let mut inner = FramedRead::new(delimited); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 730fec998..15fd11ecd 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -40,3 +40,4 @@ pub const DEFAULT_RESET_STREAM_MAX: usize = 50; // reasonable guess of the average here. pub const DEFAULT_RESET_STREAM_SECS: u64 = 1; pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; +pub const DEFAULT_RECV_BUFFER_SIZE: usize = 1024 * 8; diff --git a/src/server.rs b/src/server.rs index da6f259b8..2c40b046b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -253,6 +253,9 @@ pub struct Builder { /// Maximum amount of bytes to "buffer" for writing per stream. max_send_buffer_size: usize, + /// Maximum number of bytes to read at a time (for the entire connection). + recv_buffer_size: usize, + /// Maximum number of locally reset streams due to protocol error across /// the lifetime of the connection. /// @@ -381,7 +384,7 @@ where let entered = span.enter(); // Create the codec. - let mut codec = Codec::new(io); + let mut codec = Codec::with_recv_buffer_size(io, builder.recv_buffer_size); if let Some(max) = builder.settings.max_frame_size() { codec.set_max_recv_frame_size(max as usize); @@ -655,6 +658,7 @@ impl Builder { settings: Settings::default(), initial_target_connection_window_size: None, max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, + recv_buffer_size: proto::DEFAULT_RECV_BUFFER_SIZE, local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX), } @@ -985,6 +989,19 @@ impl Builder { self } + /// Sets the read buffer size for the entire connection. + /// Determines the maximum number of bytes that can be read at a time. + /// The default is currently 8KB, but may change. + /// + /// # Panics + /// + /// This function panics if `n` is larger than `i32::MAX`. + pub fn recv_buffer_size(&mut self, n: usize) -> &mut Self { + assert!(n <= i32::MAX as usize); + self.recv_buffer_size = n; + self + } + /// Sets the maximum number of concurrent locally reset streams. /// /// When a stream is explicitly reset by either calling From 9c15df08932b817ac5fc100286f5b1dd1844798a Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 29 Apr 2026 17:15:00 -0400 Subject: [PATCH 2/3] fix: use secondary recv buffer on low capacity --- src/codec/framed_read.rs | 82 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 78 insertions(+), 4 deletions(-) diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index edb8d9548..6ec97abd2 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -11,19 +11,20 @@ use futures_core::Stream; use bytes::{Buf, BytesMut}; use std::io; - +use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::task::{Context, Poll}; + use tokio::io::AsyncRead; use tokio_util::codec::FramedRead as InnerFramedRead; -use tokio_util::codec::{LengthDelimitedCodec, LengthDelimitedCodecError}; +use tokio_util::codec::{Decoder, LengthDelimitedCodec, LengthDelimitedCodecError}; // 16 MB "sane default" taken from golang http2 const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 << 20; #[derive(Debug)] pub struct FramedRead { - inner: InnerFramedRead, + inner: InnerFramedRead>, // hpack decoder state hpack: hpack::Decoder, @@ -35,6 +36,19 @@ pub struct FramedRead { partial: Option, } +/// Enables more efficient frame decoder buffering +#[derive(Debug)] +struct BufferManager { + inner: T, + + /// Secondary buffer which gets used when the primary buffer's capacity falls below min_buf_capacity. + /// + /// This buffer has a higher likelihood of being able to reclaim its original buffer space. + alt_buf: BytesMut, + + min_buf_capacity: usize, +} + /// Partially loaded headers frame #[derive(Debug)] struct Partial { @@ -58,8 +72,9 @@ impl FramedRead { let max_header_list_size = DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE; let max_continuation_frames = calc_max_continuation_frames(max_header_list_size, inner.decoder().max_frame_length()); + let min_buf_capacity = inner.read_buffer().capacity() / 2; FramedRead { - inner, + inner: inner.map_decoder(|d| BufferManager::with_min_buf_capacity(d, min_buf_capacity)), hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), max_header_list_size, max_continuation_frames, @@ -419,6 +434,65 @@ fn map_err(err: io::Error) -> Error { err.into() } +// ===== impl BufferManager ===== + +impl BufferManager { + pub fn with_min_buf_capacity(inner: T, min_buf_capacity: usize) -> Self { + BufferManager { + inner, + alt_buf: BytesMut::new(), + min_buf_capacity, + } + } +} + +impl Deref for BufferManager { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for BufferManager { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl Decoder for BufferManager { + type Item = T::Item; + type Error = T::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let r = self.inner.decode(src); + // If we can't decode any more frames at the moment, + // and buffer capacity has fallen below the desired minimum. + if matches!(r, Ok(None)) && src.capacity() < self.min_buf_capacity { + // Empty the secondary buffer (it hasn't been used in a while anyways). + self.alt_buf.clear(); + + // Ensure that the secondary buffer has at least 2 times the minimum desired capacity. + // It's much more likely for the secondary buffer to no longer be referenced externally at this point, + // increasing the likelihood of this reservation simply reclaiming its original buffer space. + self.alt_buf.reserve(self.min_buf_capacity * 2); + + // Copy the primary buffer to the secondary buffer. + // The primary buffer is highly likely to be empty or almost empty within this block. + self.alt_buf.extend_from_slice(src); + + tracing::trace!( + capacity_before = %src.capacity(), + capacity_after = %self.alt_buf.capacity(), + len = %src.len(), + "replacing read buffer", + ); + + std::mem::swap(src, &mut self.alt_buf); + } + r + } +} + // ===== impl Continuable ===== impl Continuable { From 5a8d254a01def7c392dd3031a9350a6342c91df5 Mon Sep 17 00:00:00 2001 From: Ned Anderson Date: Wed, 6 May 2026 21:29:42 -0400 Subject: [PATCH 3/3] fix formatting --- src/codec/mod.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 6bd4c4284..ed6ca6cfd 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -39,17 +39,29 @@ where /// Returns a new `Codec` with the given read buffer size pub fn with_recv_buffer_size(io: T, recv_buffer_size: usize) -> Self { - Self::with_max_recv_frame_size_and_recv_buffer_size(io, frame::DEFAULT_MAX_FRAME_SIZE as usize, recv_buffer_size) + Self::with_max_recv_frame_size_and_recv_buffer_size( + io, + frame::DEFAULT_MAX_FRAME_SIZE as usize, + recv_buffer_size, + ) } /// Returns a new `Codec` with the given maximum frame size #[allow(dead_code)] pub fn with_max_recv_frame_size(io: T, max_frame_size: usize) -> Self { - Self::with_max_recv_frame_size_and_recv_buffer_size(io, max_frame_size, proto::DEFAULT_RECV_BUFFER_SIZE) + Self::with_max_recv_frame_size_and_recv_buffer_size( + io, + max_frame_size, + proto::DEFAULT_RECV_BUFFER_SIZE, + ) } /// Returns a new `Codec` with the given maximum frame size and read buffer size - pub fn with_max_recv_frame_size_and_recv_buffer_size(io: T, max_frame_size: usize, recv_buffer_size: usize) -> Self { + pub fn with_max_recv_frame_size_and_recv_buffer_size( + io: T, + max_frame_size: usize, + recv_buffer_size: usize, + ) -> Self { // Wrap with writer let framed_write = FramedWrite::new(io);