From fdfb873438d7a7c53b8e478ab1098fbbb3d2e691 Mon Sep 17 00:00:00 2001 From: Geoffry Song Date: Mon, 23 Jul 2018 15:41:54 -0700 Subject: [PATCH] Prevent `pending_open` streams from being released. (#295) * Prevent `pending_open` streams from being released. This fixes a panic that would otherwise occur in some cases. A test demonstrating said panic is included. * Clear the pending_open queue together with everything else. --- src/proto/streams/prioritize.rs | 7 ++ src/proto/streams/send.rs | 1 + src/proto/streams/stream.rs | 2 +- tests/h2-tests/tests/client_request.rs | 90 ++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 1 deletion(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index d4109b3..b1d59fc 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -622,6 +622,13 @@ impl Prioritize { } } + pub fn clear_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_open.pop(store) { + let is_pending_reset = stream.is_pending_reset_expiration(); + counts.transition_after(stream, is_pending_reset); + } + } + fn pop_frame( &mut self, buffer: &mut Buffer>, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 0d1642c..89c243e 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -417,6 +417,7 @@ impl Send { pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { self.prioritize.clear_pending_capacity(store, counts); self.prioritize.clear_pending_send(store, counts); + self.prioritize.clear_pending_open(store, counts); } pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 39c69a9..b4594ef 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -229,7 +229,7 @@ impl Stream { // The stream is not in any queue !self.is_pending_send && !self.is_pending_send_capacity && !self.is_pending_accept && !self.is_pending_window_update && - !self.reset_at.is_some() + !self.is_pending_open && !self.reset_at.is_some() } /// Returns true when the consumer of the stream has dropped all handles diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 2c7d8e7..96060c2 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -971,6 +971,96 @@ fn send_stream_poll_reset() { client.join(srv).wait().expect("wait"); } +#[test] +fn drop_pending_open() { + // This test checks that a stream queued for pending open behaves correctly when its + // client drops. + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + let (init_tx, init_rx) = futures::sync::oneshot::channel(); + let (trigger_go_away_tx, trigger_go_away_rx) = futures::sync::oneshot::channel(); + let (sent_go_away_tx, sent_go_away_rx) = futures::sync::oneshot::channel(); + let (drop_tx, drop_rx) = futures::sync::oneshot::channel(); + + let mut settings = frame::Settings::default(); + settings.set_max_concurrent_streams(Some(2)); + + let srv = srv + .assert_client_handshake_with_settings(settings) + .unwrap() + // This is the ACK + .recv_settings() + .map(move |h| { + init_tx.send(()).unwrap(); + h + }) + .recv_frame( + frames::headers(1) + .request("GET", "https://www.example.com/"), + ) + .recv_frame( + frames::headers(3) + .request("GET", "https://www.example.com/") + .eos(), + ) + .wait_for(trigger_go_away_rx) + .send_frame(frames::go_away(3)) + .map(move |h| { + sent_go_away_tx.send(()).unwrap(); + h + }) + .wait_for(drop_rx) + .send_frame(frames::headers(3).response(200).eos()) + .recv_frame( + frames::data(1, vec![]).eos(), + ) + .send_frame(frames::headers(1).response(200).eos()) + .close() + ; + + fn request() -> Request<()> { + Request::builder() + .uri("https://www.example.com/") + .body(()) + .unwrap() + } + + let client = client::Builder::new() + .max_concurrent_reset_streams(0) + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(move |(mut client, conn)| { + conn.expect("h2").join(init_rx.expect("init_rx").and_then(move |()| { + // Fill up the concurrent stream limit. + assert!(client.poll_ready().unwrap().is_ready()); + let mut response1 = client.send_request(request(), false).unwrap(); + assert!(client.poll_ready().unwrap().is_ready()); + let response2 = client.send_request(request(), true).unwrap(); + assert!(client.poll_ready().unwrap().is_ready()); + let response3 = client.send_request(request(), true).unwrap(); + + // Trigger a GOAWAY frame to invalidate our third request. + trigger_go_away_tx.send(()).unwrap(); + sent_go_away_rx.expect("sent_go_away_rx").and_then(move |_| { + // Now drop all the references to that stream. + drop(response3); + drop(client); + drop_tx.send(()).unwrap(); + + // Complete the second request, freeing up a stream. + response2.0.expect("resp2") + }).and_then(move |_| { + response1.1.send_data(Default::default(), true).unwrap(); + response1.0.expect("resp1") + }) + })) + }); + + + client.join(srv).wait().unwrap(); +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];