From c7d4182ffec23bc1f508a62bb59a9c286fdc888a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 5 Dec 2018 09:44:20 -0800 Subject: [PATCH] Release closed streams capacity back to connection (#334) Previously, any streams that were dropped or closed while not having consumed the inflight received window capacity would simply leak that capacity for the connection. This could easily happen if a `RecvStream` were dropped before fully consuming the data, and therefore a user would have no idea how much capacity to release in the first place. This resulted in stalled connections that would never have capacity again. --- src/proto/streams/recv.rs | 43 ++++++++++++--- src/proto/streams/streams.rs | 6 +++ tests/h2-tests/tests/flow_control.rs | 80 +++++++++++++++++++++++----- 3 files changed, 110 insertions(+), 19 deletions(-) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index df54a39..cf900cd 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -334,7 +334,11 @@ impl Recv { capacity: WindowSize, task: &mut Option, ) { - trace!("release_connection_capacity; size={}", capacity); + trace!( + "release_connection_capacity; size={}, connection in_flight_data={}", + capacity, + self.in_flight_data, + ); // Decrement in-flight data self.in_flight_data -= capacity; @@ -383,6 +387,31 @@ impl Recv { Ok(()) } + /// Release any unclaimed capacity for a closed stream. + pub fn release_closed_capacity( + &mut self, + stream: &mut store::Ptr, + task: &mut Option, + ) { + debug_assert_eq!(stream.ref_count, 0); + + if stream.in_flight_recv_data == 0 { + return; + } + + trace!( + "auto-release closed stream ({:?}) capacity: {:?}", + stream.id, + stream.in_flight_recv_data, + ); + + self.release_connection_capacity( + stream.in_flight_recv_data, + task, + ); + stream.in_flight_recv_data = 0; + } + /// Set the "target" connection window size. /// /// By default, all new connections start with 64kb of window size. As @@ -515,12 +544,6 @@ impl Recv { }); } - // Update stream level flow control - stream.recv_flow.send_data(sz); - - // Track the data as in-flight - stream.in_flight_recv_data += sz; - if stream.dec_content_length(frame.payload().len()).is_err() { trace!("content-length overflow"); return Err(RecvError::Stream { @@ -544,6 +567,12 @@ impl Recv { } } + // Update stream level flow control + stream.recv_flow.send_data(sz); + + // Track the data as in-flight + stream.in_flight_recv_data += sz; + let event = Event::Data(frame.into_payload()); // Push the frame onto the recv buffer diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index cb02c8b..d91921f 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1118,10 +1118,16 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { } } + me.counts.transition(stream, |counts, stream| { maybe_cancel(stream, actions, counts); if stream.ref_count == 0 { + + // Release any recv window back to connection, no one can access + // it anymore. + actions.recv.release_closed_capacity(stream, &mut actions.task); + // We won't be able to reach our push promises anymore let mut ppp = stream.pending_push_promises.take(); while let Some(promise) = ppp.pop(stream.store_mut()) { diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 63ce56f..1a5c7ef 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -542,8 +542,52 @@ fn stream_close_by_trailers_frame_releases_capacity() { } #[test] -#[ignore] -fn stream_close_by_send_reset_frame_releases_capacity() {} +fn stream_close_by_send_reset_frame_releases_capacity() { + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame(frames::headers(1).response(200)) + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 16_384]).eos()) + .recv_frame(frames::window_update(0, 16_384 * 2)) + .recv_frame( + frames::headers(3) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame(frames::headers(3).response(200).eos()) + .close(); + + let client = client::handshake(io).expect("client handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + let (resp, _) = client.send_request(request, true).unwrap(); + conn.drive(resp.expect("response")).map(move |c| (c, client)) + }) + .and_then(|((conn, _res), mut client)| { + // ^-- ignore the response body + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + let (resp, _) = client.send_request(request, true).unwrap(); + conn.drive(resp.expect("response")) + }) + .and_then(|(conn, _res)| { + conn.expect("client conn") + }); + + srv.join(client).wait().expect("wait"); +} #[test] #[ignore] @@ -1130,17 +1174,29 @@ fn increase_target_window_size_after_using_some() { .uri("https://http2.akamai.com/") .body(()).unwrap(); - let res = client.send_request(request, true).unwrap().0 - .and_then(|res| { - // "leak" the capacity for now - res.into_parts().1.concat2() - }); + let res = client.send_request(request, true).unwrap().0; conn.drive(res) - .and_then(|(mut conn, _bytes)| { - conn.set_target_window_size(2 << 20); - conn.unwrap() - }).map(|c| (c, client)) + }) + .and_then(|(mut conn, res)| { + conn.set_target_window_size(2 << 20); + // drive an empty future to allow the WINDOW_UPDATE + // to go out while the response capacity is still in use. + let mut yielded = false; + conn.drive(futures::future::poll_fn(move || { + if yielded { + Ok::<_, ()>(().into()) + } else { + yielded = true; + futures::task::current().notify(); + Ok(futures::Async::NotReady) + } + })) + .map(move |(c, _)| (c, res)) + }) + .and_then(|(conn, res)| { + conn.drive(res.into_body().concat2()) + .and_then(|(c, _)| c.expect("client")) }); srv.join(client).wait().unwrap(); @@ -1214,7 +1270,6 @@ fn server_target_window_size() { srv.join(client).wait().unwrap(); } - #[test] fn recv_settings_increase_window_size_after_using_some() { // See https://github.com/carllerche/h2/issues/208 @@ -1361,6 +1416,7 @@ fn reset_stream_waiting_for_capacity() { client.join(srv).wait().unwrap(); } + #[test] fn data_padding() { let _ = ::env_logger::try_init();