From efa113bac6252104cd65284091814f8d13cd36dc Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 8 Dec 2021 10:03:15 -0800 Subject: [PATCH] Add max send buffer per stream option (#580) --- src/client.rs | 23 +++++++++++ src/proto/connection.rs | 2 + src/proto/mod.rs | 1 + src/proto/streams/mod.rs | 3 ++ src/proto/streams/send.rs | 12 +++--- src/server.rs | 23 +++++++++++ tests/h2-tests/tests/flow_control.rs | 57 ++++++++++++++++++++++++++++ 7 files changed, 115 insertions(+), 6 deletions(-) diff --git a/src/client.rs b/src/client.rs index 3a818a5..d4ec3b9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -320,6 +320,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, + /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, @@ -628,6 +631,7 @@ impl Builder { /// ``` pub fn new() -> Builder { Builder { + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, initial_target_connection_window_size: None, @@ -962,6 +966,24 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + /// Enables or disables server push promises. /// /// This value is included in the initial SETTINGS handshake. When set, the @@ -1184,6 +1206,7 @@ where proto::Config { next_stream_id: builder.stream_id, initial_max_send_streams: builder.initial_max_send_streams, + max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, settings: builder.settings.clone(), diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d1b8b51..cd011a1 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -77,6 +77,7 @@ struct DynConnection<'a, B: Buf = Bytes> { pub(crate) struct Config { pub next_stream_id: StreamId, pub initial_max_send_streams: usize, + pub max_send_buffer_size: usize, pub reset_stream_duration: Duration, pub reset_stream_max: usize, pub settings: frame::Settings, @@ -108,6 +109,7 @@ where .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), initial_max_send_streams: config.initial_max_send_streams, + local_max_buffer_size: config.max_send_buffer_size, local_next_stream_id: config.next_stream_id, local_push_enabled: config.settings.is_push_enabled().unwrap_or(true), extended_connect_protocol_enabled: config diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d505e77..5ec7bf9 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -33,3 +33,4 @@ pub type WindowSize = u32; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; +pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 0fd61a2..de2a2c8 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -41,6 +41,9 @@ pub struct Config { /// MAX_CONCURRENT_STREAMS specified in the frame. pub initial_max_send_streams: usize, + /// Max amount of DATA bytes to buffer per stream. + pub local_max_buffer_size: usize, + /// The stream ID to start the next local stream with pub local_next_stream_id: StreamId, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index e3fcf6d..b723003 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -28,6 +28,9 @@ pub(super) struct Send { /// > the identified last stream. max_stream_id: StreamId, + /// The maximum amount of bytes a stream should buffer. + max_buffer_size: usize, + /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -52,6 +55,7 @@ impl Send { pub fn new(config: &Config) -> Self { Send { init_window_sz: config.remote_init_window_sz, + max_buffer_size: config.local_max_buffer_size, max_stream_id: StreamId::MAX, next_stream_id: Ok(config.local_next_stream_id), prioritize: Prioritize::new(config), @@ -333,14 +337,10 @@ impl Send { /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available().as_size(); + let available = stream.send_flow.available().as_size() as usize; let buffered = stream.buffered_send_data; - if available as usize <= buffered { - 0 - } else { - available - buffered as WindowSize - } + available.min(self.max_buffer_size).saturating_sub(buffered) as WindowSize } pub fn poll_reset( diff --git a/src/server.rs b/src/server.rs index 1eb4031..87c3000 100644 --- a/src/server.rs +++ b/src/server.rs @@ -245,6 +245,9 @@ pub struct Builder { /// Initial target window size for new connections. initial_target_connection_window_size: Option, + + /// Maximum amount of bytes to "buffer" for writing per stream. + max_send_buffer_size: usize, } /// Send a response back to the client @@ -633,6 +636,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, + max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, } } @@ -870,6 +874,24 @@ impl Builder { self } + /// Sets the maximum send buffer size per stream. + /// + /// Once a stream has buffered up to (or over) the maximum, the stream's + /// flow control will not "poll" additional capacity. Once bytes for the + /// stream have been written to the connection, the send buffer capacity + /// will be freed up again. + /// + /// The default is currently ~400MB, but may change. + /// + /// # Panics + /// + /// This function panics if `max` is larger than `u32::MAX`. + pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { + assert!(max <= std::u32::MAX as usize); + self.max_send_buffer_size = max; + self + } + /// Sets the maximum number of concurrent locally reset streams. /// /// When a stream is explicitly reset by either calling @@ -1290,6 +1312,7 @@ where next_stream_id: 2.into(), // Server does not need to locally initiate any streams initial_max_send_streams: 0, + max_send_buffer_size: self.builder.max_send_buffer_size, reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, settings: self.builder.settings.clone(), diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index e7d6308..1a6018f 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1611,3 +1611,60 @@ async fn poll_capacity_after_send_data_and_reserve() { join(srv, h2).await; } + +#[tokio::test] +async fn max_send_buffer_size_overflow() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/")) + .await; + srv.send_frame(frames::headers(1).response(200).eos()).await; + srv.recv_frame(frames::data(1, &[0; 10][..])).await; + srv.recv_frame(frames::data(1, &[][..]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::Builder::new() + .max_send_buffer_size(5) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let request = Request::builder() + .method(Method::POST) + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + let response = conn.drive(response).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + + assert_eq!(stream.capacity(), 0); + stream.reserve_capacity(10); + assert_eq!( + stream.capacity(), + 5, + "polled capacity not over max buffer size" + ); + + stream.send_data([0; 10][..].into(), false).unwrap(); + + stream.reserve_capacity(15); + assert_eq!( + stream.capacity(), + 0, + "now with buffered over the max, don't overflow" + ); + stream.send_data([0; 0][..].into(), true).unwrap(); + + // Wait for the connection to close + conn.await.unwrap(); + }; + + join(srv, client).await; +}