diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index a2b2311..1d8b35d 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -121,6 +121,7 @@ impl Prioritize if frame.is_end_stream() { stream.state.send_close(); + self.reserve_capacity(0, stream); } trace!("send_data (2); available={}; buffered={}", @@ -150,8 +151,21 @@ impl Prioritize if capacity == stream.requested_send_capacity { // Nothing to do } else if capacity < stream.requested_send_capacity { - // TODO: release capacity - unimplemented!(); + // Update the target requested capacity + stream.requested_send_capacity = capacity; + + // Currently available capacity assigned to the stream + let available = stream.send_flow.available(); + + // If the stream has more assigned capacity than requested, reclaim + // some for the connection + if available > capacity { + let diff = available - capacity; + + stream.send_flow.claim_capacity(diff); + + self.assign_connection_capacity(diff, stream); + } } else { // Update the target requested capacity stream.requested_send_capacity = capacity; @@ -239,7 +253,8 @@ impl Prioritize // If the stream has requested capacity, then it must be in the // streaming state (more data could be sent) or there is buffered data // waiting to be sent. - debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0); + debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0, + "state={:?}", stream.state); // The amount of currently available capacity on the connection let conn_available = self.flow.available(); @@ -287,7 +302,6 @@ impl Prioritize } } - pub fn poll_complete(&mut self, store: &mut Store, dst: &mut Codec>) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index b49d09e..431b946 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -86,8 +86,11 @@ where B: Buf, -> Result<(), UserError> { trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); + + let end_stream = frame.is_end_stream(); + // Update the state - stream.state.send_open(frame.is_end_stream())?; + stream.state.send_open(end_stream)?; // Queue the frame for sending self.prioritize.queue_frame(frame.into(), stream, task); @@ -156,6 +159,9 @@ where B: Buf, trace!("send_trailers -- queuing; frame={:?}", frame); self.prioritize.queue_frame(frame.into(), stream, task); + // Release any excess capacity + self.prioritize.reserve_capacity(0, stream); + Ok(()) } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 9ff672a..d23b5bc 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -1,3 +1,4 @@ +#[macro_use] extern crate h2_test_support; use h2_test_support::prelude::*; @@ -224,3 +225,206 @@ fn recv_data_overflows_window() { fn recv_window_update_causes_overflow() { // A received window update causes the window to overflow. } + +#[test] +fn stream_close_by_data_frame_releases_capacity() { + let _ = ::env_logger::init(); + let (m, mock) = mock::new(); + + let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; + + let h2 = Client::handshake(m).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + // Send request + let mut s1 = h2.request(request, false).unwrap(); + + // This effectively reserves the entire connection window + s1.reserve_capacity(window_size); + + // The capacity should be immediately available as nothing else is + // happening on the stream. + assert_eq!(s1.capacity(), window_size); + + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + // Create a second stream + let mut s2 = h2.request(request, false).unwrap(); + + // Request capacity + s2.reserve_capacity(5); + + // There should be no available capacity (as it is being held up by + // the previous stream + assert_eq!(s2.capacity(), 0); + + // Closing the previous stream by sending an empty data frame will + // release the capacity to s2 + s1.send_data("".into(), true).unwrap(); + + // The capacity should be available + assert_eq!(s2.capacity(), 5); + + // Send the frame + s2.send_data("hello".into(), true).unwrap(); + + // Wait for the connection to close + h2.unwrap() + }) + ; + + let mock = mock.assert_client_handshake().unwrap() + // Get the first frame + .and_then(|(_, mock)| mock.into_future().unwrap()) + .and_then(|(frame, mock)| { + let request = assert_headers!(frame.unwrap()); + + assert_eq!(request.stream_id(), 1); + assert!(!request.is_end_stream()); + + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let request = assert_headers!(frame.unwrap()); + + assert_eq!(request.stream_id(), 3); + assert!(!request.is_end_stream()); + + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let data = assert_data!(frame.unwrap()); + + assert_eq!(data.stream_id(), 1); + assert_eq!(data.payload().len(), 0); + assert!(data.is_end_stream()); + + mock.into_future().unwrap() + }) + .and_then(|(frame, _)| { + let data = assert_data!(frame.unwrap()); + + assert_eq!(data.stream_id(), 3); + assert_eq!(data.payload(), "hello"); + assert!(data.is_end_stream()); + + Ok(()) + }) + ; + + let _ = h2.join(mock) + .wait().unwrap(); +} + +#[test] +fn stream_close_by_trailers_frame_releases_capacity() { + let _ = ::env_logger::init(); + let (m, mock) = mock::new(); + + let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; + + let h2 = Client::handshake(m).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + // Send request + let mut s1 = h2.request(request, false).unwrap(); + + // This effectively reserves the entire connection window + s1.reserve_capacity(window_size); + + // The capacity should be immediately available as nothing else is + // happening on the stream. + assert_eq!(s1.capacity(), window_size); + + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + // Create a second stream + let mut s2 = h2.request(request, false).unwrap(); + + // Request capacity + s2.reserve_capacity(5); + + // There should be no available capacity (as it is being held up by + // the previous stream + assert_eq!(s2.capacity(), 0); + + // Closing the previous stream by sending a trailers frame will + // release the capacity to s2 + s1.send_trailers(Default::default()).unwrap(); + + // The capacity should be available + assert_eq!(s2.capacity(), 5); + + // Send the frame + s2.send_data("hello".into(), true).unwrap(); + + // Wait for the connection to close + h2.unwrap() + }) + ; + + let mock = mock.assert_client_handshake().unwrap() + // Get the first frame + .and_then(|(_, mock)| mock.into_future().unwrap()) + .and_then(|(frame, mock)| { + let request = assert_headers!(frame.unwrap()); + + assert_eq!(request.stream_id(), 1); + assert!(!request.is_end_stream()); + + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let request = assert_headers!(frame.unwrap()); + + assert_eq!(request.stream_id(), 3); + assert!(!request.is_end_stream()); + + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let trailers = assert_headers!(frame.unwrap()); + + assert_eq!(trailers.stream_id(), 1); + assert!(trailers.is_end_stream()); + + mock.into_future().unwrap() + }) + .and_then(|(frame, _)| { + let data = assert_data!(frame.unwrap()); + + assert_eq!(data.stream_id(), 3); + assert_eq!(data.payload(), "hello"); + assert!(data.is_end_stream()); + + Ok(()) + }) + ; + + let _ = h2.join(mock) + .wait().unwrap(); +} + +#[test] +#[ignore] +fn stream_close_by_send_reset_frame_releases_capacity() { +} + +#[test] +#[ignore] +fn stream_close_by_recv_reset_frame_releases_capacity() { +} diff --git a/tests/prioritization.rs b/tests/prioritization.rs index ff408af..76560ac 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -238,11 +238,10 @@ fn send_data_receive_window_update() { }); let mock = mock.assert_client_handshake().unwrap() - .and_then(|(_, mock)| { - mock.into_future().unwrap() - }) + .and_then(|(_, mock)| mock.into_future().unwrap()) .and_then(|(frame, mock)| { - let _ = assert_headers!(frame.unwrap()); + let request = assert_headers!(frame.unwrap()); + assert!(!request.is_end_stream()); mock.into_future().unwrap() }) .and_then(|(frame, mut mock)| {