From 7c287af0d0d5dbe1baec264d3589916730ee5c24 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 13 Oct 2017 14:15:20 -0700 Subject: [PATCH] Fix some flow control bugs. (#152) * Release stream capacity back to the connection to avoid capacity leaks. * Actually notify waiting tasks when capacity becomes available. --- src/proto/streams/prioritize.rs | 4 + src/proto/streams/send.rs | 18 ++-- src/proto/streams/stream.rs | 8 ++ tests/flow_control.rs | 164 ++++++++++++++++++++++++++++++++ tests/support/frames.rs | 9 ++ tests/support/util.rs | 2 - 6 files changed, 196 insertions(+), 9 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index da587fa..c1d42e1 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -248,6 +248,8 @@ where where R: Resolve, { + trace!("assign_connection_capacity; inc={}", inc); + self.flow.assign_capacity(inc); // Assign newly acquired capacity to streams pending capacity. @@ -315,6 +317,8 @@ where // TODO: Should prioritization factor into this? let assign = cmp::min(conn_available, additional); + trace!(" assigning; num={}", assign); + // Assign the capacity to the stream stream.assign_capacity(assign); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index f804dc7..9ed6b40 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -214,6 +214,7 @@ where } if !stream.send_capacity_inc { + stream.wait_send(); return Ok(Async::NotReady); } @@ -304,13 +305,16 @@ where trace!("decrementing all windows; dec={}", dec); + let mut total_reclaimed = 0; store.for_each(|mut stream| { let stream = &mut *stream; stream.send_flow.dec_window(dec); let available = stream.send_flow.available().as_size(); - stream.send_flow.claim_capacity(cmp::min(dec, available)); + let reclaim = cmp::min(dec, available); + stream.send_flow.claim_capacity(reclaim); + total_reclaimed += reclaim; trace!( "decremented stream window; id={:?}; decr={}; flow={:?}", @@ -319,15 +323,15 @@ where stream.send_flow ); - // TODO: Probably try to assign capacity? - - // TODO: Handle reclaiming connection level window - // capacity. - - // TODO: Should this notify the producer? + // TODO: Should this notify the producer when the capacity + // of a stream is reduced? Maybe it should if the capacity + // is reduced to zero, allowing the producer to stop work. Ok::<_, RecvError>(()) })?; + + self.prioritize + .assign_connection_capacity(total_reclaimed, store); } else if val > old_val { let inc = val - old_val; diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index b2988ef..9e4b304 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -229,8 +229,12 @@ where self.send_capacity_inc = true; self.send_flow.assign_capacity(capacity); + trace!(" assigned capacity to stream; available={}; buffered={}; id={:?}", + self.send_flow.available(), self.buffered_send_data, self.id); + // Only notify if the capacity exceeds the amount of buffered data if self.send_flow.available() > self.buffered_send_data { + trace!(" notifying task"); self.notify_send(); } } @@ -263,6 +267,10 @@ where } } + pub fn wait_send(&mut self) { + self.send_task = Some(task::current()); + } + pub fn notify_recv(&mut self) { if let Some(task) = self.recv_task.take() { task.notify(); diff --git a/tests/flow_control.rs b/tests/flow_control.rs index bc92fd4..0ee5a14 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -801,6 +801,170 @@ fn recv_settings_removes_available_capacity() { .wait().unwrap(); } +#[test] +fn recv_no_init_window_then_receive_some_init_window() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let mut settings = frame::Settings::default(); + settings.set_initial_window_size(Some(0)); + + let srv = srv.assert_client_handshake_with_settings(settings).unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://http2.akamai.com/") + ) + .idle_ms(100) + .send_frame(frames::settings().initial_window_size(10)) + .recv_frame(frames::settings_ack()) + .recv_frame(frames::data(1, "hello worl")) + .idle_ms(100) + .send_frame(frames::settings().initial_window_size(11)) + .recv_frame(frames::settings_ack()) + .recv_frame(frames::data(1, "d").eos()) + .send_frame( + frames::headers(1) + .response(204) + .eos() + ) + .close(); + + + let h2 = Client::handshake(io).unwrap() + .and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let mut stream = client.send_request(request, false).unwrap(); + + stream.reserve_capacity(11); + + h2.drive(util::wait_for_capacity(stream, 11)) + }) + .and_then(|(h2, mut stream)| { + assert_eq!(stream.capacity(), 11); + + stream.send_data("hello world".into(), true).unwrap(); + + h2.drive(GetResponse { stream: Some(stream) }) + }) + .and_then(|(h2, (response, _))| { + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // Wait for the connection to close + h2.unwrap() + }); + + let _ = h2.join(srv) + .wait().unwrap(); +} + +#[test] +fn settings_lowered_capacity_returns_capacity_to_connection() { + use std::thread; + + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; + + // Spawn the server on a thread + let th1 = thread::spawn(move || { + srv.assert_client_handshake().unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://example.com/one") + ) + .recv_frame( + frames::headers(3) + .request("POST", "https://example.com/two") + ) + .idle_ms(200) + // Remove all capacity from streams + .send_frame(frames::settings().initial_window_size(0)) + .recv_frame(frames::settings_ack()) + + // Let stream 3 make progress + .send_frame(frames::window_update(3, 11)) + .recv_frame(frames::data(3, "hello world").eos()) + + // Wait a bit + // + // TODO: Receive signal from main thread + .idle_ms(200) + + // Reset initial window size + .send_frame(frames::settings().initial_window_size(window_size as u32)) + .recv_frame(frames::settings_ack()) + + // Get data from first stream + .recv_frame(frames::data(1, "hello world").eos()) + + // Send responses + .send_frame( + frames::headers(1) + .response(204) + .eos() + ) + .send_frame( + frames::headers(3) + .response(204) + .eos() + ) + .close() + .wait().unwrap(); + }); + + let (mut client, h2) = Client::handshake(io).unwrap() + .wait().unwrap(); + + // Drive client connection + let th2 = thread::spawn(move || { + h2.wait().unwrap(); + }); + + let request = Request::post("https://example.com/one") + .body(()).unwrap(); + + let mut stream1 = client.send_request(request, false).unwrap(); + + let request = Request::post("https://example.com/two") + .body(()).unwrap(); + + let mut stream2 = client.send_request(request, false).unwrap(); + + // Reserve capacity for stream one, this will consume all connection level + // capacity + stream1.reserve_capacity(window_size); + let stream1 = util::wait_for_capacity(stream1, window_size).wait().unwrap(); + + // Now, wait for capacity on the other stream + stream2.reserve_capacity(11); + let mut stream2 = util::wait_for_capacity(stream2, 11).wait().unwrap(); + + // Send data on stream 2 + stream2.send_data("hello world".into(), true).unwrap(); + + // Wait for capacity on stream 1 + let mut stream1 = util::wait_for_capacity(stream1, 11).wait().unwrap(); + + stream1.send_data("hello world".into(), true).unwrap(); + + // Wait for responses.. + let resp = stream1.wait().unwrap(); + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + let resp = stream2.wait().unwrap(); + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + th1.join().unwrap(); + th2.join().unwrap(); +} + #[test] fn client_increase_target_window_size() { let _ = ::env_logger::init(); diff --git a/tests/support/frames.rs b/tests/support/frames.rs index 4c2ae8b..aa2775a 100644 --- a/tests/support/frames.rs +++ b/tests/support/frames.rs @@ -68,6 +68,10 @@ pub fn settings() -> Mock { Mock(frame::Settings::default()) } +pub fn settings_ack() -> Mock { + Mock(frame::Settings::ack()) +} + pub fn ping(payload: [u8; 8]) -> Mock { Mock(frame::Ping::new(payload)) } @@ -269,6 +273,11 @@ impl Mock { self.0.set_max_concurrent_streams(Some(max)); self } + + pub fn initial_window_size(mut self, val: u32) -> Self { + self.0.set_initial_window_size(Some(val)); + self + } } impl From> for frame::Settings { diff --git a/tests/support/util.rs b/tests/support/util.rs index 2e7cb49..ddf656c 100644 --- a/tests/support/util.rs +++ b/tests/support/util.rs @@ -35,8 +35,6 @@ impl Future for WaitForCapacity { let act = self.stream().capacity(); - println!("CAP={:?}", act); - if act >= self.target { return Ok(self.stream.take().unwrap().into()); }