Fix some flow control bugs. (#152)
* Release stream capacity back to the connection to avoid capacity leaks. * Actually notify waiting tasks when capacity becomes available.
This commit is contained in:
		| @@ -248,6 +248,8 @@ where | |||||||
|     where |     where | ||||||
|         R: Resolve<B, P>, |         R: Resolve<B, P>, | ||||||
|     { |     { | ||||||
|  |         trace!("assign_connection_capacity; inc={}", inc); | ||||||
|  |  | ||||||
|         self.flow.assign_capacity(inc); |         self.flow.assign_capacity(inc); | ||||||
|  |  | ||||||
|         // Assign newly acquired capacity to streams pending capacity. |         // Assign newly acquired capacity to streams pending capacity. | ||||||
| @@ -315,6 +317,8 @@ where | |||||||
|             // TODO: Should prioritization factor into this? |             // TODO: Should prioritization factor into this? | ||||||
|             let assign = cmp::min(conn_available, additional); |             let assign = cmp::min(conn_available, additional); | ||||||
|  |  | ||||||
|  |             trace!("  assigning; num={}", assign); | ||||||
|  |  | ||||||
|             // Assign the capacity to the stream |             // Assign the capacity to the stream | ||||||
|             stream.assign_capacity(assign); |             stream.assign_capacity(assign); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -214,6 +214,7 @@ where | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         if !stream.send_capacity_inc { |         if !stream.send_capacity_inc { | ||||||
|  |             stream.wait_send(); | ||||||
|             return Ok(Async::NotReady); |             return Ok(Async::NotReady); | ||||||
|         } |         } | ||||||
|  |  | ||||||
| @@ -304,13 +305,16 @@ where | |||||||
|  |  | ||||||
|                 trace!("decrementing all windows; dec={}", dec); |                 trace!("decrementing all windows; dec={}", dec); | ||||||
|  |  | ||||||
|  |                 let mut total_reclaimed = 0; | ||||||
|                 store.for_each(|mut stream| { |                 store.for_each(|mut stream| { | ||||||
|                     let stream = &mut *stream; |                     let stream = &mut *stream; | ||||||
|  |  | ||||||
|                     stream.send_flow.dec_window(dec); |                     stream.send_flow.dec_window(dec); | ||||||
|  |  | ||||||
|                     let available = stream.send_flow.available().as_size(); |                     let available = stream.send_flow.available().as_size(); | ||||||
|                     stream.send_flow.claim_capacity(cmp::min(dec, available)); |                     let reclaim = cmp::min(dec, available); | ||||||
|  |                     stream.send_flow.claim_capacity(reclaim); | ||||||
|  |                     total_reclaimed += reclaim; | ||||||
|  |  | ||||||
|                     trace!( |                     trace!( | ||||||
|                         "decremented stream window; id={:?}; decr={}; flow={:?}", |                         "decremented stream window; id={:?}; decr={}; flow={:?}", | ||||||
| @@ -319,15 +323,15 @@ where | |||||||
|                         stream.send_flow |                         stream.send_flow | ||||||
|                     ); |                     ); | ||||||
|  |  | ||||||
|                     // TODO: Probably try to assign capacity? |                     // TODO: Should this notify the producer when the capacity | ||||||
|  |                     // of a stream is reduced? Maybe it should if the capacity | ||||||
|                     // TODO: Handle reclaiming connection level window |                     // is reduced to zero, allowing the producer to stop work. | ||||||
|                     // capacity. |  | ||||||
|  |  | ||||||
|                     // TODO: Should this notify the producer? |  | ||||||
|  |  | ||||||
|                     Ok::<_, RecvError>(()) |                     Ok::<_, RecvError>(()) | ||||||
|                 })?; |                 })?; | ||||||
|  |  | ||||||
|  |                 self.prioritize | ||||||
|  |                     .assign_connection_capacity(total_reclaimed, store); | ||||||
|             } else if val > old_val { |             } else if val > old_val { | ||||||
|                 let inc = val - old_val; |                 let inc = val - old_val; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -229,8 +229,12 @@ where | |||||||
|         self.send_capacity_inc = true; |         self.send_capacity_inc = true; | ||||||
|         self.send_flow.assign_capacity(capacity); |         self.send_flow.assign_capacity(capacity); | ||||||
|  |  | ||||||
|  |         trace!("  assigned capacity to stream; available={}; buffered={}; id={:?}", | ||||||
|  |                self.send_flow.available(), self.buffered_send_data, self.id); | ||||||
|  |  | ||||||
|         // Only notify if the capacity exceeds the amount of buffered data |         // Only notify if the capacity exceeds the amount of buffered data | ||||||
|         if self.send_flow.available() > self.buffered_send_data { |         if self.send_flow.available() > self.buffered_send_data { | ||||||
|  |             trace!("  notifying task"); | ||||||
|             self.notify_send(); |             self.notify_send(); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -263,6 +267,10 @@ where | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn wait_send(&mut self) { | ||||||
|  |         self.send_task = Some(task::current()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn notify_recv(&mut self) { |     pub fn notify_recv(&mut self) { | ||||||
|         if let Some(task) = self.recv_task.take() { |         if let Some(task) = self.recv_task.take() { | ||||||
|             task.notify(); |             task.notify(); | ||||||
|   | |||||||
| @@ -801,6 +801,170 @@ fn recv_settings_removes_available_capacity() { | |||||||
|         .wait().unwrap(); |         .wait().unwrap(); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn recv_no_init_window_then_receive_some_init_window() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let mut settings = frame::Settings::default(); | ||||||
|  |     settings.set_initial_window_size(Some(0)); | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake_with_settings(settings).unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(1) | ||||||
|  |                 .request("POST", "https://http2.akamai.com/") | ||||||
|  |         ) | ||||||
|  |         .idle_ms(100) | ||||||
|  |         .send_frame(frames::settings().initial_window_size(10)) | ||||||
|  |         .recv_frame(frames::settings_ack()) | ||||||
|  |         .recv_frame(frames::data(1, "hello worl")) | ||||||
|  |         .idle_ms(100) | ||||||
|  |         .send_frame(frames::settings().initial_window_size(11)) | ||||||
|  |         .recv_frame(frames::settings_ack()) | ||||||
|  |         .recv_frame(frames::data(1, "d").eos()) | ||||||
|  |         .send_frame( | ||||||
|  |             frames::headers(1) | ||||||
|  |                 .response(204) | ||||||
|  |                 .eos() | ||||||
|  |         ) | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     let h2 = Client::handshake(io).unwrap() | ||||||
|  |         .and_then(|(mut client, h2)| { | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .method(Method::POST) | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |             let mut stream = client.send_request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |             stream.reserve_capacity(11); | ||||||
|  |  | ||||||
|  |             h2.drive(util::wait_for_capacity(stream, 11)) | ||||||
|  |         }) | ||||||
|  |         .and_then(|(h2, mut stream)| { | ||||||
|  |             assert_eq!(stream.capacity(), 11); | ||||||
|  |  | ||||||
|  |             stream.send_data("hello world".into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |             h2.drive(GetResponse { stream: Some(stream) }) | ||||||
|  |         }) | ||||||
|  |         .and_then(|(h2, (response, _))| { | ||||||
|  |             assert_eq!(response.status(), StatusCode::NO_CONTENT); | ||||||
|  |  | ||||||
|  |             // Wait for the connection to close | ||||||
|  |             h2.unwrap() | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     let _ = h2.join(srv) | ||||||
|  |         .wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn settings_lowered_capacity_returns_capacity_to_connection() { | ||||||
|  |     use std::thread; | ||||||
|  |  | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE as usize; | ||||||
|  |  | ||||||
|  |     // Spawn the server on a thread | ||||||
|  |     let th1 = thread::spawn(move || { | ||||||
|  |         srv.assert_client_handshake().unwrap() | ||||||
|  |             .recv_settings() | ||||||
|  |             .recv_frame( | ||||||
|  |                 frames::headers(1) | ||||||
|  |                     .request("POST", "https://example.com/one") | ||||||
|  |             ) | ||||||
|  |             .recv_frame( | ||||||
|  |                 frames::headers(3) | ||||||
|  |                     .request("POST", "https://example.com/two") | ||||||
|  |             ) | ||||||
|  |             .idle_ms(200) | ||||||
|  |             // Remove all capacity from streams | ||||||
|  |             .send_frame(frames::settings().initial_window_size(0)) | ||||||
|  |             .recv_frame(frames::settings_ack()) | ||||||
|  |  | ||||||
|  |             // Let stream 3 make progress | ||||||
|  |             .send_frame(frames::window_update(3, 11)) | ||||||
|  |             .recv_frame(frames::data(3, "hello world").eos()) | ||||||
|  |  | ||||||
|  |             // Wait a bit | ||||||
|  |             // | ||||||
|  |             // TODO: Receive signal from main thread | ||||||
|  |             .idle_ms(200) | ||||||
|  |  | ||||||
|  |             // Reset initial window size | ||||||
|  |             .send_frame(frames::settings().initial_window_size(window_size as u32)) | ||||||
|  |             .recv_frame(frames::settings_ack()) | ||||||
|  |  | ||||||
|  |             // Get data from first stream | ||||||
|  |             .recv_frame(frames::data(1, "hello world").eos()) | ||||||
|  |  | ||||||
|  |             // Send responses | ||||||
|  |             .send_frame( | ||||||
|  |                 frames::headers(1) | ||||||
|  |                     .response(204) | ||||||
|  |                     .eos() | ||||||
|  |             ) | ||||||
|  |             .send_frame( | ||||||
|  |                 frames::headers(3) | ||||||
|  |                     .response(204) | ||||||
|  |                     .eos() | ||||||
|  |             ) | ||||||
|  |             .close() | ||||||
|  |             .wait().unwrap(); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let (mut client, h2) = Client::handshake(io).unwrap() | ||||||
|  |         .wait().unwrap(); | ||||||
|  |  | ||||||
|  |     // Drive client connection | ||||||
|  |     let th2 = thread::spawn(move || { | ||||||
|  |         h2.wait().unwrap(); | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let request = Request::post("https://example.com/one") | ||||||
|  |         .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |     let mut stream1 = client.send_request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |     let request = Request::post("https://example.com/two") | ||||||
|  |         .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |     let mut stream2 = client.send_request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |     // Reserve capacity for stream one, this will consume all connection level | ||||||
|  |     // capacity | ||||||
|  |     stream1.reserve_capacity(window_size); | ||||||
|  |     let stream1 = util::wait_for_capacity(stream1, window_size).wait().unwrap(); | ||||||
|  |  | ||||||
|  |     // Now, wait for capacity on the other stream | ||||||
|  |     stream2.reserve_capacity(11); | ||||||
|  |     let mut stream2 = util::wait_for_capacity(stream2, 11).wait().unwrap(); | ||||||
|  |  | ||||||
|  |     // Send data on stream 2 | ||||||
|  |     stream2.send_data("hello world".into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |     // Wait for capacity on stream 1 | ||||||
|  |     let mut stream1 = util::wait_for_capacity(stream1, 11).wait().unwrap(); | ||||||
|  |  | ||||||
|  |     stream1.send_data("hello world".into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |     // Wait for responses.. | ||||||
|  |     let resp = stream1.wait().unwrap(); | ||||||
|  |     assert_eq!(resp.status(), StatusCode::NO_CONTENT); | ||||||
|  |  | ||||||
|  |     let resp = stream2.wait().unwrap(); | ||||||
|  |     assert_eq!(resp.status(), StatusCode::NO_CONTENT); | ||||||
|  |  | ||||||
|  |     th1.join().unwrap(); | ||||||
|  |     th2.join().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| fn client_increase_target_window_size() { | fn client_increase_target_window_size() { | ||||||
|     let _ = ::env_logger::init(); |     let _ = ::env_logger::init(); | ||||||
|   | |||||||
| @@ -68,6 +68,10 @@ pub fn settings() -> Mock<frame::Settings> { | |||||||
|     Mock(frame::Settings::default()) |     Mock(frame::Settings::default()) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub fn settings_ack() -> Mock<frame::Settings> { | ||||||
|  |     Mock(frame::Settings::ack()) | ||||||
|  | } | ||||||
|  |  | ||||||
| pub fn ping(payload: [u8; 8]) -> Mock<frame::Ping> { | pub fn ping(payload: [u8; 8]) -> Mock<frame::Ping> { | ||||||
|     Mock(frame::Ping::new(payload)) |     Mock(frame::Ping::new(payload)) | ||||||
| } | } | ||||||
| @@ -269,6 +273,11 @@ impl Mock<frame::Settings> { | |||||||
|         self.0.set_max_concurrent_streams(Some(max)); |         self.0.set_max_concurrent_streams(Some(max)); | ||||||
|         self |         self | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn initial_window_size(mut self, val: u32) -> Self { | ||||||
|  |         self.0.set_initial_window_size(Some(val)); | ||||||
|  |         self | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl From<Mock<frame::Settings>> for frame::Settings { | impl From<Mock<frame::Settings>> for frame::Settings { | ||||||
|   | |||||||
| @@ -35,8 +35,6 @@ impl Future for WaitForCapacity { | |||||||
|  |  | ||||||
|         let act = self.stream().capacity(); |         let act = self.stream().capacity(); | ||||||
|  |  | ||||||
|         println!("CAP={:?}", act); |  | ||||||
|  |  | ||||||
|         if act >= self.target { |         if act >= self.target { | ||||||
|             return Ok(self.stream.take().unwrap().into()); |             return Ok(self.stream.take().unwrap().into()); | ||||||
|         } |         } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user