From 6e63d7bae2a753d5a950eeeb64dc1bd2fc2c50ee Mon Sep 17 00:00:00 2001 From: johnklai1 Date: Tue, 22 May 2018 15:42:41 -0700 Subject: [PATCH] Wakeup waiting tasks when transitioning a stream from pending_open (#277) --- src/proto/streams/prioritize.rs | 1 + tests/h2-tests/tests/client_request.rs | 84 ++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 2c01ce4..d4109b3 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -801,6 +801,7 @@ impl Prioritize { counts.inc_num_send_streams(&mut stream); self.pending_send.push(&mut stream); + stream.notify_send(); } else { return; } diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index e3bca1f..48b9764 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -842,6 +842,90 @@ fn request_options_with_star() { client.join(srv).wait().unwrap(); } +#[test] +fn notify_on_send_capacity() { + // This test ensures that the client gets notified when there is additional + // send capacity. In other words, when the server is ready to accept a new + // stream, the client is notified. + use std::sync::mpsc; + + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + let (done_tx, done_rx) = mpsc::channel(); + let (tx, rx) = mpsc::channel(); + + let mut settings = frame::Settings::default(); + settings.set_max_concurrent_streams(Some(1)); + + let srv = srv + .assert_client_handshake_with_settings(settings) + .unwrap() + // This is the ACK + .recv_settings() + .map(move |h| { + tx.send(()).unwrap(); + h + }) + .recv_frame( + frames::headers(1) + .request("GET", "https://www.example.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200).eos()) + .recv_frame( + frames::headers(3) + .request("GET", "https://www.example.com/") + .eos(), + ) + .send_frame(frames::headers(3).response(200).eos()) + .recv_frame( + frames::headers(5) + .request("GET", "https://www.example.com/") + .eos(), + ) + .send_frame(frames::headers(5).response(200).eos()) + .close() + ; + + let client = client::handshake(io) + .expect("handshake") + .and_then(move |(mut client, conn)| { + ::std::thread::spawn(move || { + rx.recv().unwrap(); + + let mut responses = vec![]; + + for _ in 0..3 { + // Wait for capacity. If the client is **not** notified, + // this hangs. + poll_fn(|| client.poll_ready()).wait().unwrap(); + + let request = Request::builder() + .uri("https://www.example.com/") + .body(()) + .unwrap(); + + let response = client.send_request(request, true) + .unwrap().0; + + responses.push(response); + } + + for response in responses { + let response = response.wait().unwrap(); + assert_eq!(response.status(), StatusCode::OK); + } + + done_tx.send(()).unwrap(); + }); + + conn.expect("h2") + }); + + client.join(srv).wait().unwrap(); + done_rx.recv().unwrap(); +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];