reset pending push promises if user drops all refs (#199)

This commit is contained in:
Sean McArthur
2017-12-20 16:50:20 -08:00
committed by GitHub
parent 10d8ed7429
commit a89401dd91
4 changed files with 65 additions and 6 deletions

View File

@@ -87,6 +87,7 @@ impl Prioritize {
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Task>) {
// If the stream is waiting to be opened, nothing more to do.
if !stream.is_pending_open {
trace!("schedule_send; {:?}", stream.id);
// Queue the stream
self.pending_send.push(stream);

View File

@@ -615,6 +615,8 @@ impl Recv {
return;
}
trace!("enqueue_reset_expiration; {:?}", stream.id);
if !counts.can_inc_num_reset_streams() {
// try to evict 1 stream if possible
// if max allow is 0, this won't be able to evict,

View File

@@ -909,16 +909,29 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
let actions = &mut me.actions;
me.counts.transition(stream, |counts, mut stream| {
if stream.is_canceled_interest() {
actions.send.schedule_cancel(
&mut stream,
&mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts);
me.counts.transition(stream, |counts, stream| {
maybe_cancel(stream, actions, counts);
if stream.ref_count == 0 {
let mut ppp = stream.pending_push_promises.take();
while let Some(promise) = ppp.pop(stream.store_mut()) {
counts.transition(promise, |counts, stream| {
maybe_cancel(stream, actions, counts);
});
}
}
});
}
fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) {
if stream.is_canceled_interest() {
actions.send.schedule_cancel(
stream,
&mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts);
}
}
// ===== impl SendBuffer =====
impl<B> SendBuffer<B> {

View File

@@ -94,6 +94,49 @@ fn recv_push_when_push_disabled_is_conn_error() {
h2.join(mock).wait().unwrap();
}
#[test]
fn pending_push_promises_reset_when_dropped() {
let _ = ::env_logger::init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(
frames::push_promise(1, 2)
.request("GET", "https://http2.akamai.com/style.css")
)
.send_frame(frames::headers(1).response(200).eos())
.recv_frame(frames::reset(2).cancel())
.close();
let client = Client::handshake(io).unwrap().and_then(|(mut client, conn)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.unwrap()
.0.expect("response")
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});
conn.drive(req)
.and_then(|(conn, _)| conn.expect("client"))
});
client.join(srv).wait().expect("wait");
}
#[test]
#[ignore]
fn recv_push_promise_with_unsafe_method_is_stream_error() {