From eafd6bfd98e985a82c62a49c99ff63fcc016c5de Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 18 Dec 2017 15:08:21 -0800 Subject: [PATCH] release connection capacity when recv_data has stream error (#186) --- src/proto/streams/recv.rs | 2 +- src/proto/streams/streams.rs | 9 +++++ tests/flow_control.rs | 72 +++++++++++++++++++++++++++++++++++- 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 9f5cd6b..a2c2977 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -246,7 +246,7 @@ impl Recv { } /// Releases capacity of the connection - fn release_connection_capacity( + pub fn release_connection_capacity( &mut self, capacity: WindowSize, task: &mut Option, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 9acab33..6a431dc 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -188,7 +188,16 @@ where let send_buffer = &mut *send_buffer; me.counts.transition(stream, |_, stream| { + let sz = frame.payload().len(); let res = actions.recv.recv_data(frame, stream); + + // Any stream error after receiving a DATA frame means + // we won't give the data to the user, and so they can't + // release the capacity. We do it automatically. + if let Err(RecvError::Stream { .. }) = res { + actions.recv.release_connection_capacity(sz as WindowSize, &mut None); + } + actions.reset_on_recv_stream_err(send_buffer, stream, res) }) } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 2c4b112..360cd8a 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -320,6 +320,76 @@ fn recv_window_update_causes_overflow() { // A received window update causes the window to overflow. } +#[test] +fn stream_error_release_connection_capacity() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + // we're sending the wrong content-length + .send_frame( + frames::headers(1) + .response(200) + .field("content-length", &*(16_384 * 3).to_string()) + ) + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 10]).eos()) + // mismatched content-length is a protocol error + .recv_frame(frames::reset(1).protocol_error()) + // but then the capacity should be released automatically + .recv_frame(frames::window_update(0, 16_384 * 2 + 10)) + .close(); + + let client = Client::handshake(io).unwrap() + .and_then(|(mut client, conn)| { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let req = client.send_request(request, true) + .unwrap() + .0.expect("response") + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + let mut body = resp.into_parts().1; + let mut cap = body.release_capacity().clone(); + let to_release = 16_384 * 2; + let mut should_recv_bytes = to_release; + let mut should_recv_frames = 2; + body + .for_each(move |bytes| { + should_recv_bytes -= bytes.len(); + should_recv_frames -= 1; + if should_recv_bytes == 0 { + assert_eq!(should_recv_bytes, 0); + } + + Ok(()) + }) + .expect_err("body") + .map(move |err| { + assert_eq!( + err.to_string(), + "protocol error: unspecific protocol error detected" + ); + cap.release_capacity(to_release).expect("release_capacity"); + }) + }); + conn.drive(req.expect("response")) + .and_then(|(conn, _)| conn.expect("client")) + }); + + srv.join(client).wait().unwrap(); +} + #[test] fn stream_close_by_data_frame_releases_capacity() { let _ = ::env_logger::init(); @@ -1016,7 +1086,7 @@ fn increase_target_window_size_after_using_some() { #[test] fn decrease_target_window_size() { - let _ = ::env_logger::init(); + let _ = ::env_logger::init(); let (io, srv) = mock::new(); let srv = srv.assert_client_handshake()