Merge pull request #66 from carllerche/flow-control-bugs
Fix some flow control bugs
This commit is contained in:
		| @@ -121,6 +121,7 @@ impl<B, P> Prioritize<B, P> | |||||||
|  |  | ||||||
|         if frame.is_end_stream() { |         if frame.is_end_stream() { | ||||||
|             stream.state.send_close(); |             stream.state.send_close(); | ||||||
|  |             self.reserve_capacity(0, stream); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         trace!("send_data (2); available={}; buffered={}", |         trace!("send_data (2); available={}; buffered={}", | ||||||
| @@ -150,8 +151,21 @@ impl<B, P> Prioritize<B, P> | |||||||
|         if capacity == stream.requested_send_capacity { |         if capacity == stream.requested_send_capacity { | ||||||
|             // Nothing to do |             // Nothing to do | ||||||
|         } else if capacity < stream.requested_send_capacity { |         } else if capacity < stream.requested_send_capacity { | ||||||
|             // TODO: release capacity |             // Update the target requested capacity | ||||||
|             unimplemented!(); |             stream.requested_send_capacity = capacity; | ||||||
|  |  | ||||||
|  |             // Currently available capacity assigned to the stream | ||||||
|  |             let available = stream.send_flow.available(); | ||||||
|  |  | ||||||
|  |             // If the stream has more assigned capacity than requested, reclaim | ||||||
|  |             // some for the connection | ||||||
|  |             if available > capacity { | ||||||
|  |                 let diff = available - capacity; | ||||||
|  |  | ||||||
|  |                 stream.send_flow.claim_capacity(diff); | ||||||
|  |  | ||||||
|  |                 self.assign_connection_capacity(diff, stream); | ||||||
|  |             } | ||||||
|         } else { |         } else { | ||||||
|             // Update the target requested capacity |             // Update the target requested capacity | ||||||
|             stream.requested_send_capacity = capacity; |             stream.requested_send_capacity = capacity; | ||||||
| @@ -239,7 +253,8 @@ impl<B, P> Prioritize<B, P> | |||||||
|         // If the stream has requested capacity, then it must be in the |         // If the stream has requested capacity, then it must be in the | ||||||
|         // streaming state (more data could be sent) or there is buffered data |         // streaming state (more data could be sent) or there is buffered data | ||||||
|         // waiting to be sent. |         // waiting to be sent. | ||||||
|         debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0); |         debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0, | ||||||
|  |                       "state={:?}", stream.state); | ||||||
|  |  | ||||||
|         // The amount of currently available capacity on the connection |         // The amount of currently available capacity on the connection | ||||||
|         let conn_available = self.flow.available(); |         let conn_available = self.flow.available(); | ||||||
| @@ -287,7 +302,6 @@ impl<B, P> Prioritize<B, P> | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |  | ||||||
|     pub fn poll_complete<T>(&mut self, |     pub fn poll_complete<T>(&mut self, | ||||||
|                             store: &mut Store<B, P>, |                             store: &mut Store<B, P>, | ||||||
|                             dst: &mut Codec<T, Prioritized<B>>) |                             dst: &mut Codec<T, Prioritized<B>>) | ||||||
|   | |||||||
| @@ -86,8 +86,11 @@ where B: Buf, | |||||||
|         -> Result<(), UserError> |         -> Result<(), UserError> | ||||||
|     { |     { | ||||||
|         trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); |         trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); | ||||||
|  |  | ||||||
|  |         let end_stream = frame.is_end_stream(); | ||||||
|  |  | ||||||
|         // Update the state |         // Update the state | ||||||
|         stream.state.send_open(frame.is_end_stream())?; |         stream.state.send_open(end_stream)?; | ||||||
|  |  | ||||||
|         // Queue the frame for sending |         // Queue the frame for sending | ||||||
|         self.prioritize.queue_frame(frame.into(), stream, task); |         self.prioritize.queue_frame(frame.into(), stream, task); | ||||||
| @@ -156,6 +159,9 @@ where B: Buf, | |||||||
|         trace!("send_trailers -- queuing; frame={:?}", frame); |         trace!("send_trailers -- queuing; frame={:?}", frame); | ||||||
|         self.prioritize.queue_frame(frame.into(), stream, task); |         self.prioritize.queue_frame(frame.into(), stream, task); | ||||||
|  |  | ||||||
|  |         // Release any excess capacity | ||||||
|  |         self.prioritize.reserve_capacity(0, stream); | ||||||
|  |  | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | #[macro_use] | ||||||
| extern crate h2_test_support; | extern crate h2_test_support; | ||||||
| use h2_test_support::prelude::*; | use h2_test_support::prelude::*; | ||||||
|  |  | ||||||
| @@ -198,3 +199,206 @@ fn recv_data_overflows_window() { | |||||||
| fn recv_window_update_causes_overflow() { | fn recv_window_update_causes_overflow() { | ||||||
|     // A received window update causes the window to overflow. |     // A received window update causes the window to overflow. | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn stream_close_by_data_frame_releases_capacity() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (m, mock) = mock::new(); | ||||||
|  |  | ||||||
|  |     let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; | ||||||
|  |  | ||||||
|  |     let h2 = Client::handshake(m).unwrap() | ||||||
|  |         .and_then(|mut h2| { | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .method(Method::POST) | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |             // Send request | ||||||
|  |             let mut s1 = h2.request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |             // This effectively reserves the entire connection window | ||||||
|  |             s1.reserve_capacity(window_size); | ||||||
|  |  | ||||||
|  |             // The capacity should be immediately available as nothing else is | ||||||
|  |             // happening on the stream. | ||||||
|  |             assert_eq!(s1.capacity(), window_size); | ||||||
|  |  | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .method(Method::POST) | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |             // Create a second stream | ||||||
|  |             let mut s2 = h2.request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |             // Request capacity | ||||||
|  |             s2.reserve_capacity(5); | ||||||
|  |  | ||||||
|  |             // There should be no available capacity (as it is being held up by | ||||||
|  |             // the previous stream | ||||||
|  |             assert_eq!(s2.capacity(), 0); | ||||||
|  |  | ||||||
|  |             // Closing the previous stream by sending an empty data frame will | ||||||
|  |             // release the capacity to s2 | ||||||
|  |             s1.send_data("".into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |             // The capacity should be available | ||||||
|  |             assert_eq!(s2.capacity(), 5); | ||||||
|  |  | ||||||
|  |             // Send the frame | ||||||
|  |             s2.send_data("hello".into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |             // Wait for the connection to close | ||||||
|  |             h2.unwrap() | ||||||
|  |         }) | ||||||
|  |         ; | ||||||
|  |  | ||||||
|  |     let mock = mock.assert_client_handshake().unwrap() | ||||||
|  |         // Get the first frame | ||||||
|  |         .and_then(|(_, mock)| mock.into_future().unwrap()) | ||||||
|  |         .and_then(|(frame, mock)| { | ||||||
|  |             let request = assert_headers!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(request.stream_id(), 1); | ||||||
|  |             assert!(!request.is_end_stream()); | ||||||
|  |  | ||||||
|  |             mock.into_future().unwrap() | ||||||
|  |         }) | ||||||
|  |         .and_then(|(frame, mock)| { | ||||||
|  |             let request = assert_headers!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(request.stream_id(), 3); | ||||||
|  |             assert!(!request.is_end_stream()); | ||||||
|  |  | ||||||
|  |             mock.into_future().unwrap() | ||||||
|  |         }) | ||||||
|  |         .and_then(|(frame, mock)| { | ||||||
|  |             let data = assert_data!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(data.stream_id(), 1); | ||||||
|  |             assert_eq!(data.payload().len(), 0); | ||||||
|  |             assert!(data.is_end_stream()); | ||||||
|  |  | ||||||
|  |             mock.into_future().unwrap() | ||||||
|  |         }) | ||||||
|  |         .and_then(|(frame, _)| { | ||||||
|  |             let data = assert_data!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(data.stream_id(), 3); | ||||||
|  |             assert_eq!(data.payload(), "hello"); | ||||||
|  |             assert!(data.is_end_stream()); | ||||||
|  |  | ||||||
|  |             Ok(()) | ||||||
|  |         }) | ||||||
|  |         ; | ||||||
|  |  | ||||||
|  |     let _ = h2.join(mock) | ||||||
|  |         .wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn stream_close_by_trailers_frame_releases_capacity() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (m, mock) = mock::new(); | ||||||
|  |  | ||||||
|  |     let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; | ||||||
|  |  | ||||||
|  |     let h2 = Client::handshake(m).unwrap() | ||||||
|  |         .and_then(|mut h2| { | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .method(Method::POST) | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |             // Send request | ||||||
|  |             let mut s1 = h2.request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |             // This effectively reserves the entire connection window | ||||||
|  |             s1.reserve_capacity(window_size); | ||||||
|  |  | ||||||
|  |             // The capacity should be immediately available as nothing else is | ||||||
|  |             // happening on the stream. | ||||||
|  |             assert_eq!(s1.capacity(), window_size); | ||||||
|  |  | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .method(Method::POST) | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |             // Create a second stream | ||||||
|  |             let mut s2 = h2.request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |             // Request capacity | ||||||
|  |             s2.reserve_capacity(5); | ||||||
|  |  | ||||||
|  |             // There should be no available capacity (as it is being held up by | ||||||
|  |             // the previous stream | ||||||
|  |             assert_eq!(s2.capacity(), 0); | ||||||
|  |  | ||||||
|  |             // Closing the previous stream by sending a trailers frame will | ||||||
|  |             // release the capacity to s2 | ||||||
|  |             s1.send_trailers(Default::default()).unwrap(); | ||||||
|  |  | ||||||
|  |             // The capacity should be available | ||||||
|  |             assert_eq!(s2.capacity(), 5); | ||||||
|  |  | ||||||
|  |             // Send the frame | ||||||
|  |             s2.send_data("hello".into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |             // Wait for the connection to close | ||||||
|  |             h2.unwrap() | ||||||
|  |         }) | ||||||
|  |         ; | ||||||
|  |  | ||||||
|  |     let mock = mock.assert_client_handshake().unwrap() | ||||||
|  |         // Get the first frame | ||||||
|  |         .and_then(|(_, mock)| mock.into_future().unwrap()) | ||||||
|  |         .and_then(|(frame, mock)| { | ||||||
|  |             let request = assert_headers!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(request.stream_id(), 1); | ||||||
|  |             assert!(!request.is_end_stream()); | ||||||
|  |  | ||||||
|  |             mock.into_future().unwrap() | ||||||
|  |         }) | ||||||
|  |         .and_then(|(frame, mock)| { | ||||||
|  |             let request = assert_headers!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(request.stream_id(), 3); | ||||||
|  |             assert!(!request.is_end_stream()); | ||||||
|  |  | ||||||
|  |             mock.into_future().unwrap() | ||||||
|  |         }) | ||||||
|  |         .and_then(|(frame, mock)| { | ||||||
|  |             let trailers = assert_headers!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(trailers.stream_id(), 1); | ||||||
|  |             assert!(trailers.is_end_stream()); | ||||||
|  |  | ||||||
|  |             mock.into_future().unwrap() | ||||||
|  |         }) | ||||||
|  |         .and_then(|(frame, _)| { | ||||||
|  |             let data = assert_data!(frame.unwrap()); | ||||||
|  |  | ||||||
|  |             assert_eq!(data.stream_id(), 3); | ||||||
|  |             assert_eq!(data.payload(), "hello"); | ||||||
|  |             assert!(data.is_end_stream()); | ||||||
|  |  | ||||||
|  |             Ok(()) | ||||||
|  |         }) | ||||||
|  |         ; | ||||||
|  |  | ||||||
|  |     let _ = h2.join(mock) | ||||||
|  |         .wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | #[ignore] | ||||||
|  | fn stream_close_by_send_reset_frame_releases_capacity() { | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | #[ignore] | ||||||
|  | fn stream_close_by_recv_reset_frame_releases_capacity() { | ||||||
|  | } | ||||||
|   | |||||||
| @@ -238,11 +238,10 @@ fn send_data_receive_window_update() { | |||||||
|         }); |         }); | ||||||
|  |  | ||||||
|     let mock = mock.assert_client_handshake().unwrap() |     let mock = mock.assert_client_handshake().unwrap() | ||||||
|         .and_then(|(_, mock)| { |         .and_then(|(_, mock)| mock.into_future().unwrap()) | ||||||
|             mock.into_future().unwrap() |  | ||||||
|         }) |  | ||||||
|         .and_then(|(frame, mock)| { |         .and_then(|(frame, mock)| { | ||||||
|             let _ = assert_headers!(frame.unwrap()); |             let request = assert_headers!(frame.unwrap()); | ||||||
|  |             assert!(!request.is_end_stream()); | ||||||
|             mock.into_future().unwrap() |             mock.into_future().unwrap() | ||||||
|         }) |         }) | ||||||
|         .and_then(|(frame, mut mock)| { |         .and_then(|(frame, mut mock)| { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user