diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 290edfe2..174a8cb9 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -5,6 +5,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use body::{Body, Payload}; use body::internal::FullDataArg; +use common::Never; use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead}; use super::Http1Transaction; use service::Service; @@ -20,8 +21,9 @@ pub(crate) struct Dispatcher { pub(crate) trait Dispatch { type PollItem; type PollBody; + type PollError; type RecvItem; - fn poll_msg(&mut self) -> Poll, ::Error>; + fn poll_msg(&mut self) -> Poll, Self::PollError>; fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>; fn poll_ready(&mut self) -> Poll<(), ()>; fn should_poll(&self) -> bool; @@ -42,6 +44,7 @@ type ClientRx = ::client::dispatch::Receiver, Response>; impl Dispatcher where D: Dispatch, PollBody=Bs, RecvItem=MessageHead>, + D::PollError: Into>, I: AsyncRead + AsyncWrite, T: Http1Transaction, Bs: Payload, @@ -231,7 +234,7 @@ where if self.is_closing { return Ok(Async::Ready(())); } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() { - if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg()) { + if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(::Error::new_user_service)) { // Check if the body knows its full data immediately. // // If so, we can skip a bit of bookkeeping that streaming @@ -332,6 +335,7 @@ where impl Future for Dispatcher where D: Dispatch, PollBody=Bs, RecvItem=MessageHead>, + D::PollError: Into>, I: AsyncRead + AsyncWrite, T: Http1Transaction, Bs: Payload, @@ -367,11 +371,12 @@ where { type PollItem = MessageHead; type PollBody = Bs; + type PollError = S::Error; type RecvItem = RequestHead; - fn poll_msg(&mut self) -> Poll, ::Error> { + fn poll_msg(&mut self) -> Poll, Self::PollError> { if let Some(mut fut) = self.in_flight.take() { - let resp = match fut.poll().map_err(::Error::new_user_service)? { + let resp = match fut.poll()? { Async::Ready(res) => res, Async::NotReady => { self.in_flight = Some(fut); @@ -432,9 +437,10 @@ where { type PollItem = RequestHead; type PollBody = B; + type PollError = Never; type RecvItem = ResponseHead; - fn poll_msg(&mut self) -> Poll, ::Error> { + fn poll_msg(&mut self) -> Poll, Never> { match self.rx.poll() { Ok(Async::Ready(Some((req, mut cb)))) => { // check that future hasn't been canceled already