Wakeup waiting tasks when transitioning a stream from pending_open (#277)
This commit is contained in:
@@ -801,6 +801,7 @@ impl Prioritize {
|
|||||||
|
|
||||||
counts.inc_num_send_streams(&mut stream);
|
counts.inc_num_send_streams(&mut stream);
|
||||||
self.pending_send.push(&mut stream);
|
self.pending_send.push(&mut stream);
|
||||||
|
stream.notify_send();
|
||||||
} else {
|
} else {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -842,6 +842,90 @@ fn request_options_with_star() {
|
|||||||
client.join(srv).wait().unwrap();
|
client.join(srv).wait().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn notify_on_send_capacity() {
|
||||||
|
// This test ensures that the client gets notified when there is additional
|
||||||
|
// send capacity. In other words, when the server is ready to accept a new
|
||||||
|
// stream, the client is notified.
|
||||||
|
use std::sync::mpsc;
|
||||||
|
|
||||||
|
let _ = ::env_logger::try_init();
|
||||||
|
let (io, srv) = mock::new();
|
||||||
|
let (done_tx, done_rx) = mpsc::channel();
|
||||||
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let mut settings = frame::Settings::default();
|
||||||
|
settings.set_max_concurrent_streams(Some(1));
|
||||||
|
|
||||||
|
let srv = srv
|
||||||
|
.assert_client_handshake_with_settings(settings)
|
||||||
|
.unwrap()
|
||||||
|
// This is the ACK
|
||||||
|
.recv_settings()
|
||||||
|
.map(move |h| {
|
||||||
|
tx.send(()).unwrap();
|
||||||
|
h
|
||||||
|
})
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(1)
|
||||||
|
.request("GET", "https://www.example.com/")
|
||||||
|
.eos(),
|
||||||
|
)
|
||||||
|
.send_frame(frames::headers(1).response(200).eos())
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(3)
|
||||||
|
.request("GET", "https://www.example.com/")
|
||||||
|
.eos(),
|
||||||
|
)
|
||||||
|
.send_frame(frames::headers(3).response(200).eos())
|
||||||
|
.recv_frame(
|
||||||
|
frames::headers(5)
|
||||||
|
.request("GET", "https://www.example.com/")
|
||||||
|
.eos(),
|
||||||
|
)
|
||||||
|
.send_frame(frames::headers(5).response(200).eos())
|
||||||
|
.close()
|
||||||
|
;
|
||||||
|
|
||||||
|
let client = client::handshake(io)
|
||||||
|
.expect("handshake")
|
||||||
|
.and_then(move |(mut client, conn)| {
|
||||||
|
::std::thread::spawn(move || {
|
||||||
|
rx.recv().unwrap();
|
||||||
|
|
||||||
|
let mut responses = vec![];
|
||||||
|
|
||||||
|
for _ in 0..3 {
|
||||||
|
// Wait for capacity. If the client is **not** notified,
|
||||||
|
// this hangs.
|
||||||
|
poll_fn(|| client.poll_ready()).wait().unwrap();
|
||||||
|
|
||||||
|
let request = Request::builder()
|
||||||
|
.uri("https://www.example.com/")
|
||||||
|
.body(())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let response = client.send_request(request, true)
|
||||||
|
.unwrap().0;
|
||||||
|
|
||||||
|
responses.push(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
for response in responses {
|
||||||
|
let response = response.wait().unwrap();
|
||||||
|
assert_eq!(response.status(), StatusCode::OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
done_tx.send(()).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
conn.expect("h2")
|
||||||
|
});
|
||||||
|
|
||||||
|
client.join(srv).wait().unwrap();
|
||||||
|
done_rx.recv().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
|
const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
|
||||||
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
|
const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user