fix(http2): correctly propagate HTTP2 request cancellation

This commit is contained in:
Sean McArthur
2019-06-03 14:30:30 -07:00
parent 01c03db7ea
commit 50198851a2
2 changed files with 35 additions and 5 deletions

View File

@@ -1,4 +1,4 @@
use futures::{Async, Poll, Stream}; use futures::{future, Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use want; use want;
@@ -202,6 +202,37 @@ impl<T, U> Callback<T, U> {
} }
} }
} }
pub(crate) fn send_when(
self,
mut when: impl Future<Item=U, Error=(::Error, Option<T>)>,
) -> impl Future<Item=(), Error=()> {
let mut cb = Some(self);
// "select" on this callback being canceled, and the future completing
future::poll_fn(move || {
match when.poll() {
Ok(Async::Ready(res)) => {
cb.take()
.expect("polled after complete")
.send(Ok(res));
Ok(().into())
},
Ok(Async::NotReady) => {
// check if the callback is canceled
try_ready!(cb.as_mut().unwrap().poll_cancel());
trace!("send_when canceled");
Ok(().into())
},
Err(err) => {
cb.take()
.expect("polled after complete")
.send(Err(err));
Ok(().into())
}
}
})
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -163,16 +163,15 @@ where
let content_length = content_length_parse_all(res.headers()); let content_length = content_length_parse_all(res.headers());
let res = res.map(|stream| let res = res.map(|stream|
::Body::h2(stream, content_length)); ::Body::h2(stream, content_length));
cb.send(Ok(res)); Ok(res)
}, },
Err(err) => { Err(err) => {
debug!("client response error: {}", err); debug!("client response error: {}", err);
cb.send(Err((::Error::new_h2(err), None))); Err((::Error::new_h2(err), None))
} }
} }
Ok(())
}); });
self.executor.execute(fut)?; self.executor.execute(cb.send_when(fut))?;
continue; continue;
}, },