Prevent pending_open streams from being released. (#295)
* Prevent `pending_open` streams from being released. This fixes a panic that would otherwise occur in some cases. A test demonstrating said panic is included.
* Clear the pending_open queue together with everything else.
committed by Sean McArthur
parent f3806d5144
commit fdfb873438
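For context, the panic comes down to an ordering invariant between the stream store and the `pending_open` queue: if a stream's state is released while its key is still sitting in the queue, draining the queue later dereferences a slot that no longer exists. The sketch below illustrates that invariant with hypothetical, simplified stand-in types (a HashMap-backed store and a VecDeque of ids); it is not h2's actual internals.

    use std::collections::{HashMap, VecDeque};

    // Hypothetical, simplified stand-ins for a stream store and a pending_open queue.
    struct Store {
        slots: HashMap<u32, String>, // stream id -> stream state
    }

    struct PendingOpen {
        ids: VecDeque<u32>, // streams waiting for a concurrency slot
    }

    impl PendingOpen {
        /// Drain the queue, dropping each queued stream's state.
        /// Panics if a queued stream was already released from the store.
        fn clear(&mut self, store: &mut Store) {
            while let Some(id) = self.ids.pop_front() {
                store
                    .slots
                    .remove(&id)
                    .expect("pending_open stream was released while still queued");
            }
        }
    }

    fn main() {
        let mut store = Store { slots: HashMap::new() };
        let mut pending_open = PendingOpen { ids: VecDeque::new() };

        // Stream 3 exceeds the concurrency limit, so it sits in pending_open.
        store.slots.insert(3, "stream 3 state".to_string());
        pending_open.ids.push_back(3);

        // Broken ordering (what the fix prevents): releasing the stream from the
        // store while its id is still queued would make the later drain panic:
        //
        //     store.slots.remove(&3);          // premature release
        //     pending_open.clear(&mut store);  // panics: slot already gone
        //
        // Fixed ordering: the stream stays in the store until the queue itself is
        // drained alongside the other queues.
        pending_open.clear(&mut store);
        assert!(store.slots.is_empty());
        println!("pending_open drained without panicking");
    }

The diff below applies the same idea to the real types: `clear_pending_open` drains the queue while the streams are still live, `clear_queues` clears it together with the other queues, and the release check now also treats a queued (`is_pending_open`) stream as still in use.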
@@ -622,6 +622,13 @@ impl Prioritize {
         }
     }
 
+    pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
+        while let Some(stream) = self.pending_open.pop(store) {
+            let is_pending_reset = stream.is_pending_reset_expiration();
+            counts.transition_after(stream, is_pending_reset);
+        }
+    }
+
     fn pop_frame<B>(
         &mut self,
         buffer: &mut Buffer<Frame<B>>,
@@ -417,6 +417,7 @@ impl Send {
     pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
         self.prioritize.clear_pending_capacity(store, counts);
         self.prioritize.clear_pending_send(store, counts);
+        self.prioritize.clear_pending_open(store, counts);
     }
 
     pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
@@ -229,7 +229,7 @@ impl Stream {
         // The stream is not in any queue
         !self.is_pending_send && !self.is_pending_send_capacity &&
             !self.is_pending_accept && !self.is_pending_window_update &&
-            !self.reset_at.is_some()
+            !self.is_pending_open && !self.reset_at.is_some()
     }
 
     /// Returns true when the consumer of the stream has dropped all handles
@@ -971,6 +971,96 @@ fn send_stream_poll_reset() {
     client.join(srv).wait().expect("wait");
 }
 
+#[test]
+fn drop_pending_open() {
+    // This test checks that a stream queued for pending open behaves correctly when its
+    // client drops.
+    let _ = ::env_logger::try_init();
+
+    let (io, srv) = mock::new();
+    let (init_tx, init_rx) = futures::sync::oneshot::channel();
+    let (trigger_go_away_tx, trigger_go_away_rx) = futures::sync::oneshot::channel();
+    let (sent_go_away_tx, sent_go_away_rx) = futures::sync::oneshot::channel();
+    let (drop_tx, drop_rx) = futures::sync::oneshot::channel();
+
+    let mut settings = frame::Settings::default();
+    settings.set_max_concurrent_streams(Some(2));
+
+    let srv = srv
+        .assert_client_handshake_with_settings(settings)
+        .unwrap()
+        // This is the ACK
+        .recv_settings()
+        .map(move |h| {
+            init_tx.send(()).unwrap();
+            h
+        })
+        .recv_frame(
+            frames::headers(1)
+                .request("GET", "https://www.example.com/"),
+        )
+        .recv_frame(
+            frames::headers(3)
+                .request("GET", "https://www.example.com/")
+                .eos(),
+        )
+        .wait_for(trigger_go_away_rx)
+        .send_frame(frames::go_away(3))
+        .map(move |h| {
+            sent_go_away_tx.send(()).unwrap();
+            h
+        })
+        .wait_for(drop_rx)
+        .send_frame(frames::headers(3).response(200).eos())
+        .recv_frame(
+            frames::data(1, vec![]).eos(),
+        )
+        .send_frame(frames::headers(1).response(200).eos())
+        .close();
+
+    fn request() -> Request<()> {
+        Request::builder()
+            .uri("https://www.example.com/")
+            .body(())
+            .unwrap()
+    }
+
+    let client = client::Builder::new()
+        .max_concurrent_reset_streams(0)
+        .handshake::<_, Bytes>(io)
+        .expect("handshake")
+        .and_then(move |(mut client, conn)| {
+            conn.expect("h2").join(init_rx.expect("init_rx").and_then(move |()| {
+                // Fill up the concurrent stream limit.
+                assert!(client.poll_ready().unwrap().is_ready());
+                let mut response1 = client.send_request(request(), false).unwrap();
+                assert!(client.poll_ready().unwrap().is_ready());
+                let response2 = client.send_request(request(), true).unwrap();
+                assert!(client.poll_ready().unwrap().is_ready());
+                let response3 = client.send_request(request(), true).unwrap();
+
+                // Trigger a GOAWAY frame to invalidate our third request.
+                trigger_go_away_tx.send(()).unwrap();
+                sent_go_away_rx.expect("sent_go_away_rx").and_then(move |_| {
+                    // Now drop all the references to that stream.
+                    drop(response3);
+                    drop(client);
+                    drop_tx.send(()).unwrap();
+
+                    // Complete the second request, freeing up a stream.
+                    response2.0.expect("resp2")
+                }).and_then(move |_| {
+                    response1.1.send_data(Default::default(), true).unwrap();
+                    response1.0.expect("resp1")
+                })
+            }))
+        });
+
+    client.join(srv).wait().unwrap();
+}
+
 const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
 const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];