diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index ae71b69..806f25d 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -325,15 +325,6 @@ impl Send { Ok(()) } - pub fn recv_reset( - &mut self, - buffer: &mut Buffer>, - stream: &mut store::Ptr - ) { - // Clear all pending outbound frames - self.prioritize.clear_queue(buffer, stream); - } - pub fn recv_err( &mut self, buffer: &mut Buffer>, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index eb1ceaa..bc33244 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -284,9 +284,9 @@ where let actions = &mut me.actions; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { actions.recv.recv_reset(frame, stream); - actions.send.recv_reset(send_buffer, stream); + actions.send.recv_err(send_buffer, stream, counts); assert!(stream.state.is_closed()); Ok(()) }) diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index c102994..9244bd7 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1298,3 +1298,66 @@ fn reserve_capacity_after_peer_closes() { srv.join(client).wait().expect("wait"); } +#[test] +fn reset_stream_waiting_for_capacity() { + // This tests that receiving a reset on a stream that has some available + // connection-level window reassigns that window to another stream. + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + + let srv = srv + .assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame(frames::headers(1).request("GET", "http://example.com/")) + .recv_frame(frames::headers(3).request("GET", "http://example.com/")) + .recv_frame(frames::headers(5).request("GET", "http://example.com/")) + .recv_frame(frames::data(1, vec![0; 16384])) + .recv_frame(frames::data(1, vec![0; 16384])) + .recv_frame(frames::data(1, vec![0; 16384])) + .recv_frame(frames::data(1, vec![0; 16383]).eos()) + .send_frame(frames::headers(1).response(200)) + // Assign enough connection window for stream 3... + .send_frame(frames::window_update(0, 1)) + // but then reset it. + .send_frame(frames::reset(3)) + // 5 should use that window instead. + .recv_frame(frames::data(5, vec![0; 1]).eos()) + .send_frame(frames::headers(5).response(200)) + .close() + ; + + fn request() -> Request<()> { + Request::builder() + .uri("http://example.com/") + .body(()) + .unwrap() + } + + let client = client::Builder::new() + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(move |(mut client, conn)| { + let (req1, mut send1) = client.send_request( + request(), false).unwrap(); + let (req2, mut send2) = client.send_request( + request(), false).unwrap(); + let (req3, mut send3) = client.send_request( + request(), false).unwrap(); + // Use up the connection window. + send1.send_data(vec![0; 65535].into(), true).unwrap(); + // Queue up for more connection window. + send2.send_data(vec![0; 1].into(), true).unwrap(); + // .. and even more. + send3.send_data(vec![0; 1].into(), true).unwrap(); + conn.expect("h2") + .join(req1.expect("req1")) + .join(req2.then(|r| Ok(r.unwrap_err()))) + .join(req3.expect("req3")) + }); + + + client.join(srv).wait().unwrap(); +} +