diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 3282ef8..994da93 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -87,6 +87,7 @@ impl Prioritize { pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option) { // If the stream is waiting to be opened, nothing more to do. if !stream.is_pending_open { + trace!("schedule_send; {:?}", stream.id); // Queue the stream self.pending_send.push(stream); diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index fb67a7a..dd8b138 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -615,6 +615,8 @@ impl Recv { return; } + trace!("enqueue_reset_expiration; {:?}", stream.id); + if !counts.can_inc_num_reset_streams() { // try to evict 1 stream if possible // if max allow is 0, this won't be able to evict, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 29011b8..2a8d678 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -909,16 +909,29 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { let actions = &mut me.actions; - me.counts.transition(stream, |counts, mut stream| { - if stream.is_canceled_interest() { - actions.send.schedule_cancel( - &mut stream, - &mut actions.task); - actions.recv.enqueue_reset_expiration(stream, counts); + me.counts.transition(stream, |counts, stream| { + maybe_cancel(stream, actions, counts); + + if stream.ref_count == 0 { + let mut ppp = stream.pending_push_promises.take(); + while let Some(promise) = ppp.pop(stream.store_mut()) { + counts.transition(promise, |counts, stream| { + maybe_cancel(stream, actions, counts); + }); + } } }); } +fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) { + if stream.is_canceled_interest() { + actions.send.schedule_cancel( + stream, + &mut actions.task); + actions.recv.enqueue_reset_expiration(stream, counts); + } +} + // ===== impl SendBuffer ===== impl SendBuffer { diff --git a/tests/push_promise.rs b/tests/push_promise.rs index 1178167..c1e92ba 100644 --- a/tests/push_promise.rs +++ b/tests/push_promise.rs @@ -94,6 +94,49 @@ fn recv_push_when_push_disabled_is_conn_error() { h2.join(mock).wait().unwrap(); } +#[test] +fn pending_push_promises_reset_when_dropped() { + let _ = ::env_logger::init(); + + let (io, srv) = mock::new(); + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .send_frame( + frames::push_promise(1, 2) + .request("GET", "https://http2.akamai.com/style.css") + ) + .send_frame(frames::headers(1).response(200).eos()) + .recv_frame(frames::reset(2).cancel()) + .close(); + + let client = Client::handshake(io).unwrap().and_then(|(mut client, conn)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + let req = client + .send_request(request, true) + .unwrap() + .0.expect("response") + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + Ok(()) + }); + + conn.drive(req) + .and_then(|(conn, _)| conn.expect("client")) + }); + + client.join(srv).wait().expect("wait"); +} + #[test] #[ignore] fn recv_push_promise_with_unsafe_method_is_stream_error() {