diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 06eb02c..ae71b69 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -159,12 +159,17 @@ impl Send { return; } - self.recv_err(buffer, stream, counts); + // Clear all pending outbound frames. + // Note that we don't call `self.recv_err` because we want to enqueue + // the reset frame before transitioning the stream inside + // `reclaim_all_capacity`. + self.prioritize.clear_queue(buffer, stream); let frame = frame::Reset::new(stream.id, reason); trace!("send_reset -- queueing; frame={:?}", frame); self.prioritize.queue_frame(frame.into(), buffer, stream, task); + self.prioritize.reclaim_all_capacity(stream, counts); } pub fn schedule_implicit_reset( diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 96060c2..958fd6a 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1061,6 +1061,79 @@ fn drop_pending_open() { client.join(srv).wait().unwrap(); } +#[test] +fn malformed_response_headers_dont_unlink_stream() { + // This test checks that receiving malformed headers frame on a stream with + // no remaining references correctly resets the stream, without prematurely + // unlinking it. + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + let (drop_tx, drop_rx) = futures::sync::oneshot::channel(); + let (queued_tx, queued_rx) = futures::sync::oneshot::channel(); + + let srv = srv + .assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame(frames::headers(1).request("GET", "http://example.com/")) + .recv_frame(frames::headers(3).request("GET", "http://example.com/")) + .recv_frame(frames::headers(5).request("GET", "http://example.com/")) + .map(move |h| { + drop_tx.send(()).unwrap(); + h + }) + .wait_for(queued_rx) + .send_bytes(&[ + // 2 byte frame + 0, 0, 2, + // type: HEADERS + 1, + // flags: END_STREAM | END_HEADERS + 5, + // stream identifier: 3 + 0, 0, 0, 3, + // data - invalid (pseudo not at end of block) + 144, 135 + // Per the spec, this frame should cause a stream error of type + // PROTOCOL_ERROR. + ]) + .close() + ; + + fn request() -> Request<()> { + Request::builder() + .uri("http://example.com/") + .body(()) + .unwrap() + } + + let client = client::Builder::new() + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(move |(mut client, conn)| { + let (_req1, mut send1) = client.send_request( + request(), false).unwrap(); + // Use up most of the connection window. + send1.send_data(vec![0; 65534].into(), true).unwrap(); + let (req2, mut send2) = client.send_request( + request(), false).unwrap(); + let (req3, mut send3) = client.send_request( + request(), false).unwrap(); + conn.expect("h2").join(drop_rx.then(move |_| { + // Use up the remainder of the connection window. + send2.send_data(vec![0; 2].into(), true).unwrap(); + // Queue up for more connection window. + send3.send_data(vec![0; 1].into(), true).unwrap(); + queued_tx.send(()).unwrap(); + Ok((req2, req3)) + })) + }); + + + client.join(srv).wait().unwrap(); +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];