From 5c1bde7d62b94a5c9415169bf87117c3dda2a642 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 13 Oct 2017 11:19:56 -0700 Subject: [PATCH] add set_target_window_size methods to Server and Client (#149) Closes #101 --- src/client.rs | 15 ++++ src/proto/connection.rs | 4 + src/proto/streams/flow_control.rs | 143 +++++++++++++++++++++++++----- src/proto/streams/prioritize.rs | 10 +-- src/proto/streams/recv.rs | 48 ++++++++++ src/proto/streams/send.rs | 12 ++- src/proto/streams/streams.rs | 9 ++ src/server.rs | 8 ++ tests/flow_control.rs | 129 +++++++++++++++++++++++++++ tests/support/mock.rs | 2 +- 10 files changed, 347 insertions(+), 33 deletions(-) diff --git a/src/client.rs b/src/client.rs index f063953..92b6cee 100644 --- a/src/client.rs +++ b/src/client.rs @@ -243,6 +243,21 @@ impl Default for Builder { // ===== impl Connection ===== + +impl Connection +where + T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + /// Sets the target window size for the whole connection. + /// + /// Default in HTTP2 is 65_535. + pub fn set_target_window_size(&mut self, size: u32) { + assert!(size <= proto::MAX_WINDOW_SIZE); + self.inner.set_target_window_size(size); + } +} + impl Future for Connection where T: AsyncRead + AsyncWrite, diff --git a/src/proto/connection.rs b/src/proto/connection.rs index da3e125..985eb7a 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -91,6 +91,10 @@ where } } + pub fn set_target_window_size(&mut self, size: WindowSize) { + self.streams.set_target_connection_window_size(size); + } + /// Returns `Ready` when the connection is ready to receive a frame. /// /// Returns `RecvError` as this may raise errors that are caused by delayed diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 85a2310..9dd0fb1 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -1,6 +1,8 @@ use frame::Reason; use proto::{WindowSize, MAX_WINDOW_SIZE}; +use std::fmt; + // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead // aggregate them when the changes are significant. Many implementations do // this by keeping a "ratio" of the update version the allowed window size. @@ -25,32 +27,41 @@ fn sanity_unclaimed_ratio() { #[derive(Copy, Clone, Debug)] pub struct FlowControl { - /// Window size as indicated by the peer. This can go negative. - window_size: i32, + /// Window the peer knows about. + /// + /// This can go negative if a SETTINGS_INITIAL_WINDOW_SIZE is received. + /// + /// For example, say the peer sends a request and uses 32kb of the window. + /// We send a SETTINGS_INITIAL_WINDOW_SIZE of 16kb. The peer has to adjust + /// its understanding of the capacity of the window, and that would be: + /// + /// ```notrust + /// default (64kb) - used (32kb) - settings_diff (64kb - 16kb): -16kb + /// ``` + window_size: Window, - /// The amount of the window that is currently available to consume. - available: WindowSize, + /// Window that we know about. + /// + /// This can go negative if a user declares a smaller target window than + /// the peer knows about. + available: Window, } impl FlowControl { pub fn new() -> FlowControl { FlowControl { - window_size: 0, - available: 0, + window_size: Window(0), + available: Window(0), } } /// Returns the window size as known by the peer pub fn window_size(&self) -> WindowSize { - if self.window_size < 0 { - 0 - } else { - self.window_size as WindowSize - } + self.window_size.as_size() } /// Returns the window size available to the consumer - pub fn available(&self) -> WindowSize { + pub fn available(&self) -> Window { self.available } @@ -60,11 +71,10 @@ impl FlowControl { return false; } - self.window_size as WindowSize > self.available + self.window_size > self.available } pub fn claim_capacity(&mut self, capacity: WindowSize) { - assert!(self.available >= capacity); self.available -= capacity; } @@ -80,14 +90,14 @@ impl FlowControl { /// /// This represents pending outbound WINDOW_UPDATE frames. pub fn unclaimed_capacity(&self) -> Option { - let available = self.available as i32; + let available = self.available; if self.window_size >= available { return None; } - let unclaimed = available - self.window_size; - let threshold = self.window_size / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; + let unclaimed = available.0 - self.window_size.0; + let threshold = self.window_size.0 / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR; if unclaimed < threshold { None @@ -100,7 +110,7 @@ impl FlowControl { /// /// This is called after receiving a WINDOW_UPDATE frame pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { - let (val, overflow) = self.window_size.overflowing_add(sz as i32); + let (val, overflow) = self.window_size.0.overflowing_add(sz as i32); if overflow { return Err(Reason::FLOW_CONTROL_ERROR); @@ -117,7 +127,7 @@ impl FlowControl { val ); - self.window_size = val; + self.window_size = Window(val); Ok(()) } @@ -133,8 +143,7 @@ impl FlowControl { self.available ); // This should not be able to overflow `window_size` from the bottom. - self.window_size -= sz as i32; - self.available = self.available.saturating_sub(sz); + self.window_size -= sz; } /// Decrements the window reflecting data has actually been sent. The caller @@ -148,10 +157,98 @@ impl FlowControl { ); // Ensure that the argument is correct - assert!(sz <= self.window_size as WindowSize); + assert!(sz <= self.window_size); // Update values - self.window_size -= sz as i32; + self.window_size -= sz; self.available -= sz; } } + +/// The current capacity of a flow-controlled Window. +/// +/// This number can go negative when either side has used a certain amount +/// of capacity when the other side advertises a reduction in size. +/// +/// This type tries to centralize the knowledge of addition and subtraction +/// to this capacity, instead of having integer casts throughout the source. +#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)] +pub struct Window(i32); + +impl Window { + pub fn as_size(&self) -> WindowSize { + if self.0 < 0 { + 0 + } else { + self.0 as WindowSize + } + } + + pub fn checked_size(&self) -> WindowSize { + assert!(self.0 >= 0, "negative Window"); + self.0 as WindowSize + } +} + +impl PartialEq for Window { + fn eq(&self, other: &WindowSize) -> bool { + if self.0 < 0 { + false + } else { + (self.0 as WindowSize).eq(other) + } + } +} + + +impl PartialEq for WindowSize { + fn eq(&self, other: &Window) -> bool { + other.eq(self) + } +} + +impl PartialOrd for Window { + fn partial_cmp(&self, other: &WindowSize) -> Option<::std::cmp::Ordering> { + if self.0 < 0 { + Some(::std::cmp::Ordering::Less) + } else { + (self.0 as WindowSize).partial_cmp(other) + } + } +} + +impl PartialOrd for WindowSize { + fn partial_cmp(&self, other: &Window) -> Option<::std::cmp::Ordering> { + if other.0 < 0 { + Some(::std::cmp::Ordering::Greater) + } else { + self.partial_cmp(&(other.0 as WindowSize)) + } + } +} + + +impl ::std::ops::SubAssign for Window { + fn sub_assign(&mut self, other: WindowSize) { + self.0 -= other as i32; + } +} + +impl ::std::ops::Add for Window { + type Output = Self; + fn add(self, other: WindowSize) -> Self::Output { + Window(self.0 + other as i32) + } +} + +impl ::std::ops::AddAssign for Window { + fn add_assign(&mut self, other: WindowSize) { + self.0 += other as i32; + } +} + +impl fmt::Display for Window { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.0, f) + } +} diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 502d8e6..da587fa 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -187,7 +187,7 @@ where stream.requested_send_capacity = capacity; // Currently available capacity assigned to the stream - let available = stream.send_flow.available(); + let available = stream.send_flow.available().as_size(); // If the stream has more assigned capacity than requested, reclaim // some for the connection @@ -275,9 +275,9 @@ where // The amount of additional capacity that the stream requests. // Don't assign more than the window has available! let additional = cmp::min( - total_requested - stream.send_flow.available(), + total_requested - stream.send_flow.available().as_size(), // Can't assign more than what is available - stream.send_flow.window_size() - stream.send_flow.available(), + stream.send_flow.window_size() - stream.send_flow.available().as_size(), ); trace!( @@ -304,7 +304,7 @@ where ); // The amount of currently available capacity on the connection - let conn_available = self.flow.available(); + let conn_available = self.flow.available().as_size(); // First check if capacity is immediately available if conn_available > 0 { @@ -550,7 +550,7 @@ where let len = cmp::min(sz, max_len); // Only send up to the stream's window capacity - let len = cmp::min(len, stream_capacity as usize) as WindowSize; + let len = cmp::min(len, stream_capacity.as_size() as usize) as WindowSize; // There *must* be be enough connection level // capacity at this point. diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 445fff5..ebb2d88 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -20,6 +20,9 @@ where /// Connection level flow control governing received data flow: FlowControl, + /// Amount of connection window capacity currently used by outstanding streams. + in_flight_data: WindowSize, + /// The lowest stream ID that is still idle next_stream_id: Result, @@ -75,6 +78,7 @@ where Recv { init_window_sz: config.local_init_window_sz, flow: flow, + in_flight_data: 0 as WindowSize, next_stream_id: Ok(next_stream_id.into()), pending_window_updates: store::Queue::new(), last_processed_id: StreamId::zero(), @@ -223,6 +227,7 @@ where // Decrement in-flight data stream.in_flight_recv_data -= capacity; + self.in_flight_data -= capacity; // Assign capacity to connection & stream self.flow.assign_capacity(capacity); @@ -246,6 +251,48 @@ where Ok(()) } + /// Set the "target" connection window size. + /// + /// By default, all new connections start with 64kb of window size. As + /// streams used and release capacity, we will send WINDOW_UPDATEs for the + /// connection to bring it back up to the initial "target". + /// + /// Setting a target means that we will try to tell the peer about + /// WINDOW_UPDATEs so the peer knows it has about `target` window to use + /// for the whole conection. + /// + /// The `task` is an optional parked task for the `Connection` that might + /// be blocked on needing more window capacity. + pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option) { + trace!( + "set_target_connection_window; target={}; available={}, reserved={}", + target, + self.flow.available(), + self.in_flight_data, + ); + + // The current target connection window is our `available` plus any + // in-flight data reserved by streams. + // + // Update the flow controller with the difference between the new + // target and the current target. + let current = (self.flow.available() + self.in_flight_data).checked_size(); + if target > current { + self.flow.assign_capacity(target - current); + } else { + self.flow.claim_capacity(current - target); + } + + // If changing the target capacity means we gained a bunch of capacity, + // enough that we went over the update threshold, then schedule sending + // a connection WINDOW_UPDATE. + if self.flow.unclaimed_capacity().is_some() { + if let Some(task) = task.take() { + task.notify(); + } + } + } + pub fn body_is_empty(&self, stream: &store::Ptr) -> bool { if !stream.state.is_recv_closed() { return false; @@ -298,6 +345,7 @@ where // Track the data as in-flight stream.in_flight_recv_data += sz; + self.in_flight_data += sz; if stream.dec_content_length(frame.payload().len()).is_err() { return Err(RecvError::Stream { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 7c1cafa..f804dc7 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -6,7 +6,7 @@ use proto::*; use bytes::Buf; -use std::io; +use std::{cmp, io}; /// Manages state transitions related to outbound frames. #[derive(Debug)] @@ -144,7 +144,7 @@ where // Reclaim all capacity assigned to the stream and re-assign it to the // connection - let available = stream.send_flow.available(); + let available = stream.send_flow.available().as_size(); stream.send_flow.claim_capacity(available); let frame = frame::Reset::new(stream.id, reason); @@ -224,7 +224,7 @@ where /// Current available stream send capacity pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { - let available = stream.send_flow.available(); + let available = stream.send_flow.available().as_size(); let buffered = stream.buffered_send_data; if available <= buffered { @@ -265,7 +265,7 @@ where // Reclaim all capacity assigned to the stream and re-assign it to the // connection - let available = stream.send_flow.available(); + let available = stream.send_flow.available().as_size(); stream.send_flow.claim_capacity(available); // Re-assign all capacity to the connection self.prioritize @@ -308,6 +308,10 @@ where let stream = &mut *stream; stream.send_flow.dec_window(dec); + + let available = stream.send_flow.available().as_size(); + stream.send_flow.claim_capacity(cmp::min(dec, available)); + trace!( "decremented stream window; id={:?}; decr={}; flow={:?}", stream.id, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 028ca65..31e1697 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -80,6 +80,15 @@ where } } + pub fn set_target_connection_window_size(&mut self, size: WindowSize) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions + .recv + .set_target_connection_window(size, &mut me.actions.task) + } + /// Process inbound headers pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> { let id = frame.stream_id(); diff --git a/src/server.rs b/src/server.rs index 29c3359..db7903b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -121,6 +121,14 @@ where } } + /// Sets the target window size for the whole connection. + /// + /// Default in HTTP2 is 65_535. + pub fn set_target_window_size(&mut self, size: u32) { + assert!(size <= proto::MAX_WINDOW_SIZE); + self.connection.set_target_window_size(size); + } + /// Returns `Ready` when the underlying connection has closed. pub fn poll_close(&mut self) -> Poll<(), ::Error> { self.connection.poll().map_err(Into::into) diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 86901ec..bc92fd4 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -800,3 +800,132 @@ fn recv_settings_removes_available_capacity() { let _ = h2.join(srv) .wait().unwrap(); } + +#[test] +fn client_increase_target_window_size() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame(frames::window_update(0, (2 << 20) - 65_535)) + .close(); + + + let client = Client::handshake(io).unwrap() + .and_then(|(_client, mut conn)| { + conn.set_target_window_size(2 << 20); + + conn.unwrap() + }); + + srv.join(client).wait().unwrap(); +} + +#[test] +fn increase_target_window_size_after_using_some() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame(frames::headers(1).response(200)) + .send_frame(frames::data(1, vec![0; 16_384]).eos()) + .recv_frame(frames::window_update(0, (2 << 20) - 65_535)) + .close(); + + let client = Client::handshake(io).unwrap() + .and_then(|(mut client, conn)| { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let res = client.send_request(request, true).unwrap() + .and_then(|res| { + // "leak" the capacity for now + res.into_parts().1.concat2() + }); + + conn.drive(res) + .and_then(|(mut conn, _bytes)| { + conn.set_target_window_size(2 << 20); + conn.unwrap() + }) + }); + + srv.join(client).wait().unwrap(); +} + +#[test] +fn decrease_target_window_size() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame(frames::headers(1).response(200)) + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 16_383]).eos()) + .recv_frame(frames::window_update(0, 16_384)) + .close(); + + let client = Client::handshake(io).unwrap() + .and_then(|(mut client, mut conn)| { + conn.set_target_window_size(16_384 * 2); + + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + let res = client.send_request(request, true).unwrap(); + conn.drive(res.expect("response")) + }) + .and_then(|(mut conn, res)| { + conn.set_target_window_size(16_384); + let mut body = res.into_parts().1; + let mut cap = body.release_capacity().clone(); + + conn.drive(body.concat2().expect("concat")) + .and_then(move |(conn, bytes)| { + assert_eq!(bytes.len(), 65_535); + cap.release_capacity(bytes.len()).unwrap(); + conn.expect("conn") + }) + }); + + srv.join(client).wait().unwrap(); +} + +#[test] +fn server_target_window_size() { + let _ = ::env_logger::init(); + let (io, client) = mock::new(); + + let client = client.assert_server_handshake() + .unwrap() + .recv_settings() + .recv_frame(frames::window_update(0, (2 << 20) - 65_535)) + .close(); + + let srv = Server::handshake(io).unwrap() + .and_then(|mut conn| { + conn.set_target_window_size(2 << 20); + conn.into_future().unwrap() + }); + + srv.join(client).wait().unwrap(); +} diff --git a/tests/support/mock.rs b/tests/support/mock.rs index 4fc3171..296e18f 100644 --- a/tests/support/mock.rs +++ b/tests/support/mock.rs @@ -538,7 +538,7 @@ impl Future for Idle { match self.handle.as_mut().unwrap().poll() { Ok(Async::NotReady) => Ok(Async::NotReady), res => { - panic!("Received unexpected frame on handle; frame={:?}", res); + panic!("Idle received unexpected frame on handle; frame={:?}", res); }, } }