From c47717204ca75681f0e505601a493617f7efa3ac Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 8 Sep 2017 12:15:46 -0700 Subject: [PATCH] Flow control bug fix (#70) The requested capacity was not decreased as data is written. --- src/proto/streams/prioritize.rs | 45 +++++++++++++---- tests/flow_control.rs | 89 +++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 10 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 1d8b35d..c92d19c 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -144,6 +144,12 @@ impl Prioritize /// Request capacity to send data pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { + trace!("reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", + stream.id, + capacity, + capacity + stream.buffered_send_data, + stream.requested_send_capacity); + // Actual capacity is `capacity` + the current amount of buffered data. // It it were less, then we could never send out the buffered data. let capacity = capacity + stream.buffered_send_data; @@ -242,8 +248,12 @@ impl Prioritize total_requested - stream.send_flow.available(), stream.send_flow.window_size()); - trace!("try_assign_capacity; requested={}; additional={}; window={}; conn={}", - total_requested, additional, stream.send_flow.window_size(), self.flow.available()); + trace!("try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}", + total_requested, + additional, + stream.buffered_send_data, + stream.send_flow.window_size(), + self.flow.available()); if additional == 0 { // Nothing more to do @@ -296,7 +306,18 @@ impl Prioritize // If data is buffered, then schedule the stream for execution if stream.buffered_send_data > 0 { debug_assert!(stream.send_flow.available() > 0); - debug_assert!(!stream.pending_send.is_empty()); + + // TODO: This assertion isn't *exactly* correct. There can still be + // buffered send data while the stream's pending send queue is + // empty. This can happen when a large data frame is in the process + // of being **partially** sent. Once the window has been sent, the + // data frame will be returned to the prioritization layer to be + // re-scheduled. + // + // That said, it would be nice to figure out how to make this + // assertion correctly. + // + // debug_assert!(!stream.pending_send.is_empty()); self.pending_send.push(stream); } @@ -464,32 +485,36 @@ impl Prioritize let len = cmp::min(sz, max_len); // Only send up to the stream's window capacity - let len = cmp::min(len, stream_capacity as usize); + let len = cmp::min(len, stream_capacity as usize) as WindowSize; // There *must* be be enough connection level // capacity at this point. - debug_assert!(len <= self.flow.window_size() as usize); + debug_assert!(len <= self.flow.window_size()); + + trace!(" --> sending data frame; len={}", len); // Update the flow control trace!(" -- updating stream flow --"); - stream.send_flow.send_data(len as WindowSize); + stream.send_flow.send_data(len); // Decrement the stream's buffered data counter - debug_assert!(stream.buffered_send_data >= len as u32); - stream.buffered_send_data -= len as u32; + debug_assert!(stream.buffered_send_data >= len); + stream.buffered_send_data -= len; + stream.requested_send_capacity -= len; // Assign the capacity back to the connection that // was just consumed from the stream in the previous // line. - self.flow.assign_capacity(len as WindowSize); + self.flow.assign_capacity(len); trace!(" -- updating connection flow --"); - self.flow.send_data(len as WindowSize); + self.flow.send_data(len); // Wrap the frame's data payload to ensure that the // correct amount of data gets written. let eos = frame.is_end_stream(); + let len = len as usize; if frame.payload().remaining() > len { frame.set_end_stream(false); diff --git a/tests/flow_control.rs b/tests/flow_control.rs index b2a3e68..c73e0c2 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -402,3 +402,92 @@ fn stream_close_by_send_reset_frame_releases_capacity() { #[ignore] fn stream_close_by_recv_reset_frame_releases_capacity() { } + +use futures::{Async, Poll}; + +struct GetResponse { + stream: Option>, +} + +impl Future for GetResponse { + type Item = (Response>, client::Stream); + type Error = (); + + fn poll(&mut self) -> Poll { + let response = match self.stream.as_mut().unwrap().poll_response() { + Ok(Async::Ready(v)) => v, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => panic!("unexpected error; {:?}", e), + }; + + Ok(Async::Ready((response, self.stream.take().unwrap()))) + } +} + +#[test] +fn recv_window_update_on_stream_closed_by_data_frame() { + let _ = ::env_logger::init(); + let (m, mock) = mock::new(); + + let h2 = Client::handshake(m).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let stream = h2.request(request, false).unwrap(); + + // Wait for the response + h2.drive(GetResponse { + stream: Some(stream), + }) + }) + .and_then(|(h2, (response, mut stream))| { + assert_eq!(response.status(), StatusCode::OK); + + // Send a data frame, this will also close the connection + stream.send_data("hello".into(), true).unwrap(); + + // Wait for the connection to close + h2.unwrap() + }) + ; + + let mock = mock.assert_client_handshake().unwrap() + // Get the first frame + .and_then(|(_, mock)| mock.into_future().unwrap()) + .and_then(|(frame, mut mock)| { + let request = assert_headers!(frame.unwrap()); + + assert_eq!(request.stream_id(), 1); + assert!(!request.is_end_stream()); + + // Send the response which also closes the stream + let mut f = frame::Headers::new( + request.stream_id(), + frame::Pseudo::response(StatusCode::OK), + HeaderMap::new()); + f.set_end_stream(); + + mock.send(f.into()).unwrap(); + + mock.into_future().unwrap() + }) + .and_then(|(frame, mut mock)| { + let data = assert_data!(frame.unwrap()); + assert_eq!(data.payload(), "hello"); + + // Send a window update just for fun + let f = frame::WindowUpdate::new( + data.stream_id(), data.payload().len() as u32); + + mock.send(f.into()).unwrap(); + + Ok(()) + }) + ; + + let _ = h2.join(mock) + .wait().unwrap(); +}