feat(http2): check Error::source() for an HTTP2 error code to send in reset
This commit is contained in:
67
src/error.rs
67
src/error.rs
@@ -5,6 +5,7 @@ use std::io;
|
|||||||
|
|
||||||
use httparse;
|
use httparse;
|
||||||
use http;
|
use http;
|
||||||
|
use h2;
|
||||||
|
|
||||||
/// Result type often returned from methods that can have hyper `Error`s.
|
/// Result type often returned from methods that can have hyper `Error`s.
|
||||||
pub type Result<T> = ::std::result::Result<T, Error>;
|
pub type Result<T> = ::std::result::Result<T, Error>;
|
||||||
@@ -137,10 +138,8 @@ impl Error {
|
|||||||
self.inner.kind == Kind::Connect
|
self.inner.kind == Kind::Connect
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the error's cause.
|
#[doc(hidden)]
|
||||||
///
|
#[cfg_attr(error_source, deprecated(note = "use Error::source instead"))]
|
||||||
/// This is identical to `Error::cause` except that it provides extra
|
|
||||||
/// bounds required to be able to downcast the error.
|
|
||||||
pub fn cause2(&self) -> Option<&(StdError + 'static + Sync + Send)> {
|
pub fn cause2(&self) -> Option<&(StdError + 'static + Sync + Send)> {
|
||||||
self.inner.cause.as_ref().map(|e| &**e)
|
self.inner.cause.as_ref().map(|e| &**e)
|
||||||
}
|
}
|
||||||
@@ -163,6 +162,45 @@ impl Error {
|
|||||||
&self.inner.kind
|
&self.inner.kind
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(error_source))]
|
||||||
|
pub(crate) fn h2_reason(&self) -> h2::Reason {
|
||||||
|
// Since we don't have access to `Error::source`, we can only
|
||||||
|
// look so far...
|
||||||
|
let mut cause = self.cause2();
|
||||||
|
while let Some(err) = cause {
|
||||||
|
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
|
||||||
|
return h2_err
|
||||||
|
.reason()
|
||||||
|
.unwrap_or(h2::Reason::INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
cause = err
|
||||||
|
.downcast_ref::<Error>()
|
||||||
|
.and_then(Error::cause2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// else
|
||||||
|
h2::Reason::INTERNAL_ERROR
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(error_source)]
|
||||||
|
pub(crate) fn h2_reason(&self) -> h2::Reason {
|
||||||
|
// Find an h2::Reason somewhere in the cause stack, if it exists,
|
||||||
|
// otherwise assume an INTERNAL_ERROR.
|
||||||
|
let mut cause = self.source();
|
||||||
|
while let Some(err) = cause {
|
||||||
|
if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
|
||||||
|
return h2_err
|
||||||
|
.reason()
|
||||||
|
.unwrap_or(h2::Reason::INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
cause = err.source();
|
||||||
|
}
|
||||||
|
|
||||||
|
// else
|
||||||
|
h2::Reason::INTERNAL_ERROR
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn new_canceled<E: Into<Cause>>(cause: Option<E>) -> Error {
|
pub(crate) fn new_canceled<E: Into<Cause>>(cause: Option<E>) -> Error {
|
||||||
Error::new(Kind::Canceled, cause.map(Into::into))
|
Error::new(Kind::Canceled, cause.map(Into::into))
|
||||||
}
|
}
|
||||||
@@ -400,4 +438,25 @@ impl AssertSendSync for Error {}
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn h2_reason_unknown() {
|
||||||
|
let closed = Error::new_closed();
|
||||||
|
assert_eq!(closed.h2_reason(), h2::Reason::INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn h2_reason_one_level() {
|
||||||
|
let body_err = Error::new_user_body(h2::Error::from(h2::Reason::ENHANCE_YOUR_CALM));
|
||||||
|
assert_eq!(body_err.h2_reason(), h2::Reason::ENHANCE_YOUR_CALM);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn h2_reason_nested() {
|
||||||
|
let recvd = Error::new_h2(h2::Error::from(h2::Reason::HTTP_1_1_REQUIRED));
|
||||||
|
// Suppose a user were proxying the received error
|
||||||
|
let svc_err = Error::new_user_service(recvd);
|
||||||
|
assert_eq!(svc_err.h2_reason(), h2::Reason::HTTP_1_1_REQUIRED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use bytes::Buf;
|
use bytes::Buf;
|
||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future, Poll};
|
||||||
use h2::{Reason, SendStream};
|
use h2::{SendStream};
|
||||||
use http::header::{
|
use http::header::{
|
||||||
HeaderName, CONNECTION, PROXY_AUTHENTICATE, PROXY_AUTHORIZATION, TE, TRAILER,
|
HeaderName, CONNECTION, PROXY_AUTHENTICATE, PROXY_AUTHORIZATION, TE, TRAILER,
|
||||||
TRANSFER_ENCODING, UPGRADE,
|
TRANSFER_ENCODING, UPGRADE,
|
||||||
@@ -91,10 +91,10 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_err(&mut self, err: S::Error) -> ::Error {
|
fn on_user_err(&mut self, err: S::Error) -> ::Error {
|
||||||
let err = ::Error::new_user_body(err);
|
let err = ::Error::new_user_body(err);
|
||||||
trace!("send body user stream error: {}", err);
|
debug!("send body user stream error: {}", err);
|
||||||
self.body_tx.send_reset(Reason::INTERNAL_ERROR);
|
self.body_tx.send_reset(err.h2_reason());
|
||||||
err
|
err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -138,7 +138,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
match try_ready!(self.stream.poll_data().map_err(|e| self.on_err(e))) {
|
match try_ready!(self.stream.poll_data().map_err(|e| self.on_user_err(e))) {
|
||||||
Some(chunk) => {
|
Some(chunk) => {
|
||||||
let is_eos = self.stream.is_end_stream();
|
let is_eos = self.stream.is_end_stream();
|
||||||
trace!(
|
trace!(
|
||||||
@@ -175,7 +175,7 @@ where
|
|||||||
return Err(::Error::new_body_write(::h2::Error::from(reason)));
|
return Err(::Error::new_body_write(::h2::Error::from(reason)));
|
||||||
}
|
}
|
||||||
|
|
||||||
match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_err(e))) {
|
match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_user_err(e))) {
|
||||||
Some(trailers) => {
|
Some(trailers) => {
|
||||||
self.body_tx
|
self.body_tx
|
||||||
.send_trailers(trailers)
|
.send_trailers(trailers)
|
||||||
|
|||||||
@@ -211,7 +211,7 @@ where
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
let err = ::Error::new_user_service(e);
|
let err = ::Error::new_user_service(e);
|
||||||
warn!("http2 service errored: {}", err);
|
warn!("http2 service errored: {}", err);
|
||||||
self.reply.send_reset(Reason::INTERNAL_ERROR);
|
self.reply.send_reset(err.h2_reason());
|
||||||
return Err(err);
|
return Err(err);
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@@ -232,7 +232,7 @@ where
|
|||||||
match self.reply.send_response(res, $eos) {
|
match self.reply.send_response(res, $eos) {
|
||||||
Ok(tx) => tx,
|
Ok(tx) => tx,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
trace!("send response error: {}", e);
|
debug!("send response error: {}", e);
|
||||||
self.reply.send_reset(Reason::INTERNAL_ERROR);
|
self.reply.send_reset(Reason::INTERNAL_ERROR);
|
||||||
return Err(::Error::new_h2(e));
|
return Err(::Error::new_h2(e));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
#![deny(warnings)]
|
#![deny(warnings)]
|
||||||
extern crate http;
|
extern crate http;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
|
extern crate h2;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate futures_timer;
|
extern crate futures_timer;
|
||||||
@@ -1562,6 +1563,73 @@ fn http1_only() {
|
|||||||
})).unwrap_err();
|
})).unwrap_err();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn http2_service_error_sends_reset_reason() {
|
||||||
|
use std::error::Error;
|
||||||
|
let server = serve();
|
||||||
|
let addr_str = format!("http://{}", server.addr());
|
||||||
|
|
||||||
|
server
|
||||||
|
.reply()
|
||||||
|
.error(h2::Error::from(h2::Reason::INADEQUATE_SECURITY));
|
||||||
|
|
||||||
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
|
let err = rt.block_on(hyper::rt::lazy(move || {
|
||||||
|
let client = Client::builder()
|
||||||
|
.http2_only(true)
|
||||||
|
.build_http::<hyper::Body>();
|
||||||
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
|
||||||
|
client.get(uri)
|
||||||
|
})).unwrap_err();
|
||||||
|
|
||||||
|
let h2_err = err
|
||||||
|
.source()
|
||||||
|
.unwrap()
|
||||||
|
.downcast_ref::<h2::Error>()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn http2_body_user_error_sends_reset_reason() {
|
||||||
|
use std::error::Error;
|
||||||
|
let server = serve();
|
||||||
|
let addr_str = format!("http://{}", server.addr());
|
||||||
|
|
||||||
|
let b = ::futures::stream::once::<String, h2::Error>(Err(
|
||||||
|
h2::Error::from(h2::Reason::INADEQUATE_SECURITY)
|
||||||
|
));
|
||||||
|
let b = hyper::Body::wrap_stream(b);
|
||||||
|
|
||||||
|
server
|
||||||
|
.reply()
|
||||||
|
.body_stream(b);
|
||||||
|
|
||||||
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
|
let err = rt.block_on(hyper::rt::lazy(move || {
|
||||||
|
let client = Client::builder()
|
||||||
|
.http2_only(true)
|
||||||
|
.build_http::<hyper::Body>();
|
||||||
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
|
||||||
|
client
|
||||||
|
.get(uri)
|
||||||
|
.and_then(|res| res.into_body().concat2())
|
||||||
|
})).unwrap_err();
|
||||||
|
|
||||||
|
let h2_err = err
|
||||||
|
.source()
|
||||||
|
.unwrap()
|
||||||
|
.downcast_ref::<h2::Error>()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------
|
// -------------------------------------------------
|
||||||
// the Server that is used to run all the tests with
|
// the Server that is used to run all the tests with
|
||||||
// -------------------------------------------------
|
// -------------------------------------------------
|
||||||
@@ -1609,6 +1677,8 @@ impl Serve {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BoxError = Box<::std::error::Error + Send + Sync>;
|
||||||
|
|
||||||
struct ReplyBuilder<'a> {
|
struct ReplyBuilder<'a> {
|
||||||
tx: &'a spmc::Sender<Reply>,
|
tx: &'a spmc::Sender<Reply>,
|
||||||
}
|
}
|
||||||
@@ -1635,10 +1705,13 @@ impl<'a> ReplyBuilder<'a> {
|
|||||||
self.tx.send(Reply::Body(body.as_ref().to_vec().into())).unwrap();
|
self.tx.send(Reply::Body(body.as_ref().to_vec().into())).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn body_stream(self, body: Body)
|
fn body_stream(self, body: Body) {
|
||||||
{
|
|
||||||
self.tx.send(Reply::Body(body)).unwrap();
|
self.tx.send(Reply::Body(body)).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn error<E: Into<BoxError>>(self, err: E) {
|
||||||
|
self.tx.send(Reply::Error(err.into())).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Drop for ReplyBuilder<'a> {
|
impl<'a> Drop for ReplyBuilder<'a> {
|
||||||
@@ -1666,6 +1739,7 @@ enum Reply {
|
|||||||
Version(hyper::Version),
|
Version(hyper::Version),
|
||||||
Header(HeaderName, HeaderValue),
|
Header(HeaderName, HeaderValue),
|
||||||
Body(hyper::Body),
|
Body(hyper::Body),
|
||||||
|
Error(BoxError),
|
||||||
End,
|
End,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1677,7 +1751,7 @@ enum Msg {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl TestService {
|
impl TestService {
|
||||||
fn call(&self, req: Request<Body>) -> Box<Future<Item=Response<Body>, Error=hyper::Error> + Send> {
|
fn call(&self, req: Request<Body>) -> Box<Future<Item=Response<Body>, Error=BoxError> + Send> {
|
||||||
let tx1 = self.tx.clone();
|
let tx1 = self.tx.clone();
|
||||||
let tx2 = self.tx.clone();
|
let tx2 = self.tx.clone();
|
||||||
let replies = self.reply.clone();
|
let replies = self.reply.clone();
|
||||||
@@ -1691,12 +1765,12 @@ impl TestService {
|
|||||||
};
|
};
|
||||||
tx2.send(msg).unwrap();
|
tx2.send(msg).unwrap();
|
||||||
Ok(())
|
Ok(())
|
||||||
}).map(move |_| {
|
}).and_then(move |_| {
|
||||||
TestService::build_reply(replies)
|
TestService::build_reply(replies)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_reply(replies: spmc::Receiver<Reply>) -> Response<Body> {
|
fn build_reply(replies: spmc::Receiver<Reply>) -> Result<Response<Body>, BoxError> {
|
||||||
let mut res = Response::new(Body::empty());
|
let mut res = Response::new(Body::empty());
|
||||||
while let Ok(reply) = replies.try_recv() {
|
while let Ok(reply) = replies.try_recv() {
|
||||||
match reply {
|
match reply {
|
||||||
@@ -1712,10 +1786,11 @@ impl TestService {
|
|||||||
Reply::Body(body) => {
|
Reply::Body(body) => {
|
||||||
*res.body_mut() = body;
|
*res.body_mut() = body;
|
||||||
},
|
},
|
||||||
|
Reply::Error(err) => return Err(err),
|
||||||
Reply::End => break,
|
Reply::End => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
res
|
Ok(res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user