Release closed streams capacity back to connection (#334)

Previously, any streams that were dropped or closed while not having
consumed the inflight received window capacity would simply leak that
capacity for the connection. This could easily happen if a `RecvStream`
were dropped before fully consuming the data, and therefore a user would
have no idea how much capacity to release in the first place. This
resulted in stalled connections that would never have capacity again.
This commit is contained in:
Sean McArthur
2018-12-05 09:44:20 -08:00
committed by GitHub
parent 545ff1e7dd
commit c7d4182ffe
3 changed files with 110 additions and 19 deletions

View File

@@ -334,7 +334,11 @@ impl Recv {
capacity: WindowSize,
task: &mut Option<Task>,
) {
trace!("release_connection_capacity; size={}", capacity);
trace!(
"release_connection_capacity; size={}, connection in_flight_data={}",
capacity,
self.in_flight_data,
);
// Decrement in-flight data
self.in_flight_data -= capacity;
@@ -383,6 +387,31 @@ impl Recv {
Ok(())
}
/// Release any unclaimed capacity for a closed stream.
pub fn release_closed_capacity(
&mut self,
stream: &mut store::Ptr,
task: &mut Option<Task>,
) {
debug_assert_eq!(stream.ref_count, 0);
if stream.in_flight_recv_data == 0 {
return;
}
trace!(
"auto-release closed stream ({:?}) capacity: {:?}",
stream.id,
stream.in_flight_recv_data,
);
self.release_connection_capacity(
stream.in_flight_recv_data,
task,
);
stream.in_flight_recv_data = 0;
}
/// Set the "target" connection window size.
///
/// By default, all new connections start with 64kb of window size. As
@@ -515,12 +544,6 @@ impl Recv {
});
}
// Update stream level flow control
stream.recv_flow.send_data(sz);
// Track the data as in-flight
stream.in_flight_recv_data += sz;
if stream.dec_content_length(frame.payload().len()).is_err() {
trace!("content-length overflow");
return Err(RecvError::Stream {
@@ -544,6 +567,12 @@ impl Recv {
}
}
// Update stream level flow control
stream.recv_flow.send_data(sz);
// Track the data as in-flight
stream.in_flight_recv_data += sz;
let event = Event::Data(frame.into_payload());
// Push the frame onto the recv buffer

View File

@@ -1118,10 +1118,16 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
}
}
me.counts.transition(stream, |counts, stream| {
maybe_cancel(stream, actions, counts);
if stream.ref_count == 0 {
// Release any recv window back to connection, no one can access
// it anymore.
actions.recv.release_closed_capacity(stream, &mut actions.task);
// We won't be able to reach our push promises anymore
let mut ppp = stream.pending_push_promises.take();
while let Some(promise) = ppp.pop(stream.store_mut()) {

View File

@@ -542,8 +542,52 @@ fn stream_close_by_trailers_frame_releases_capacity() {
}
#[test]
#[ignore]
fn stream_close_by_send_reset_frame_releases_capacity() {}
fn stream_close_by_send_reset_frame_releases_capacity() {
let _ = ::env_logger::try_init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos()
)
.send_frame(frames::headers(1).response(200))
.send_frame(frames::data(1, vec![0; 16_384]))
.send_frame(frames::data(1, vec![0; 16_384]).eos())
.recv_frame(frames::window_update(0, 16_384 * 2))
.recv_frame(
frames::headers(3)
.request("GET", "https://http2.akamai.com/")
.eos()
)
.send_frame(frames::headers(3).response(200).eos())
.close();
let client = client::handshake(io).expect("client handshake")
.and_then(|(mut client, conn)| {
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(()).unwrap();
let (resp, _) = client.send_request(request, true).unwrap();
conn.drive(resp.expect("response")).map(move |c| (c, client))
})
.and_then(|((conn, _res), mut client)| {
// ^-- ignore the response body
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(()).unwrap();
let (resp, _) = client.send_request(request, true).unwrap();
conn.drive(resp.expect("response"))
})
.and_then(|(conn, _res)| {
conn.expect("client conn")
});
srv.join(client).wait().expect("wait");
}
#[test]
#[ignore]
@@ -1130,17 +1174,29 @@ fn increase_target_window_size_after_using_some() {
.uri("https://http2.akamai.com/")
.body(()).unwrap();
let res = client.send_request(request, true).unwrap().0
.and_then(|res| {
// "leak" the capacity for now
res.into_parts().1.concat2()
});
let res = client.send_request(request, true).unwrap().0;
conn.drive(res)
.and_then(|(mut conn, _bytes)| {
conn.set_target_window_size(2 << 20);
conn.unwrap()
}).map(|c| (c, client))
})
.and_then(|(mut conn, res)| {
conn.set_target_window_size(2 << 20);
// drive an empty future to allow the WINDOW_UPDATE
// to go out while the response capacity is still in use.
let mut yielded = false;
conn.drive(futures::future::poll_fn(move || {
if yielded {
Ok::<_, ()>(().into())
} else {
yielded = true;
futures::task::current().notify();
Ok(futures::Async::NotReady)
}
}))
.map(move |(c, _)| (c, res))
})
.and_then(|(conn, res)| {
conn.drive(res.into_body().concat2())
.and_then(|(c, _)| c.expect("client"))
});
srv.join(client).wait().unwrap();
@@ -1214,7 +1270,6 @@ fn server_target_window_size() {
srv.join(client).wait().unwrap();
}
#[test]
fn recv_settings_increase_window_size_after_using_some() {
// See https://github.com/carllerche/h2/issues/208
@@ -1361,6 +1416,7 @@ fn reset_stream_waiting_for_capacity() {
client.join(srv).wait().unwrap();
}
#[test]
fn data_padding() {
let _ = ::env_logger::try_init();