feat(http2): quickly cancel when receiving RST_STREAM
Update Http2 proto to cancel quick when the stream is reset, on an `RST_STREAM` frame. Closes: #1549
This commit is contained in:
committed by
Sean McArthur
parent
2a3844acc3
commit
ffdb478831
@@ -116,6 +116,13 @@ where
|
|||||||
None => return Err(::Error::new_canceled(None::<::Error>)),
|
None => return Err(::Error::new_canceled(None::<::Error>)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if let Async::Ready(reason) =
|
||||||
|
self.body_tx.poll_reset().map_err(|e| ::Error::new_h2(e))?
|
||||||
|
{
|
||||||
|
debug!("stream received RST_STREAM: {:?}", reason);
|
||||||
|
return Err(::Error::new_h2(reason.into()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match try_ready!(self.stream.poll_data().map_err(|e| self.on_err(e))) {
|
match try_ready!(self.stream.poll_data().map_err(|e| self.on_err(e))) {
|
||||||
@@ -148,6 +155,13 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
if let Async::Ready(reason) =
|
||||||
|
self.body_tx.poll_reset().map_err(|e| ::Error::new_h2(e))?
|
||||||
|
{
|
||||||
|
debug!("stream received RST_STREAM: {:?}", reason);
|
||||||
|
return Err(::Error::new_h2(reason.into()));
|
||||||
|
}
|
||||||
|
|
||||||
match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_err(e))) {
|
match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_err(e))) {
|
||||||
Some(trailers) => {
|
Some(trailers) => {
|
||||||
self.body_tx
|
self.body_tx
|
||||||
|
|||||||
@@ -174,7 +174,22 @@ where
|
|||||||
loop {
|
loop {
|
||||||
let next = match self.state {
|
let next = match self.state {
|
||||||
H2StreamState::Service(ref mut h) => {
|
H2StreamState::Service(ref mut h) => {
|
||||||
let res = try_ready!(h.poll().map_err(::Error::new_user_service));
|
let res = match h.poll() {
|
||||||
|
Ok(Async::Ready(r)) => r,
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
// Body is not yet ready, so we want to check if the client has sent a
|
||||||
|
// RST_STREAM frame which would cancel the current request.
|
||||||
|
if let Async::Ready(reason) =
|
||||||
|
self.reply.poll_reset().map_err(|e| ::Error::new_h2(e))?
|
||||||
|
{
|
||||||
|
debug!("stream received RST_STREAM: {:?}", reason);
|
||||||
|
return Err(::Error::new_h2(reason.into()));
|
||||||
|
}
|
||||||
|
return Ok(Async::NotReady);
|
||||||
|
}
|
||||||
|
Err(e) => return Err(::Error::new_user_service(e)),
|
||||||
|
};
|
||||||
|
|
||||||
let (head, body) = res.into_parts();
|
let (head, body) = res.into_parts();
|
||||||
let mut res = ::http::Response::from_parts(head, ());
|
let mut res = ::http::Response::from_parts(head, ());
|
||||||
super::strip_connection_headers(res.headers_mut());
|
super::strip_connection_headers(res.headers_mut());
|
||||||
|
|||||||
Reference in New Issue
Block a user