perf(http1): propagate service user error generic further into dispatcher
In the case of `Never`, this allows the optimizer to eliminate more code inside the dispatcher, winning some more performance in benchmarks.
This commit is contained in:
@@ -5,6 +5,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
|||||||
|
|
||||||
use body::{Body, Payload};
|
use body::{Body, Payload};
|
||||||
use body::internal::FullDataArg;
|
use body::internal::FullDataArg;
|
||||||
|
use common::Never;
|
||||||
use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
|
use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
|
||||||
use super::Http1Transaction;
|
use super::Http1Transaction;
|
||||||
use service::Service;
|
use service::Service;
|
||||||
@@ -20,8 +21,9 @@ pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
|
|||||||
pub(crate) trait Dispatch {
|
pub(crate) trait Dispatch {
|
||||||
type PollItem;
|
type PollItem;
|
||||||
type PollBody;
|
type PollBody;
|
||||||
|
type PollError;
|
||||||
type RecvItem;
|
type RecvItem;
|
||||||
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, ::Error>;
|
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>;
|
||||||
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
|
fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
|
||||||
fn poll_ready(&mut self) -> Poll<(), ()>;
|
fn poll_ready(&mut self) -> Poll<(), ()>;
|
||||||
fn should_poll(&self) -> bool;
|
fn should_poll(&self) -> bool;
|
||||||
@@ -42,6 +44,7 @@ type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
|
|||||||
impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
|
impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
|
||||||
where
|
where
|
||||||
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
|
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
|
||||||
|
D::PollError: Into<Box<::std::error::Error + Send + Sync>>,
|
||||||
I: AsyncRead + AsyncWrite,
|
I: AsyncRead + AsyncWrite,
|
||||||
T: Http1Transaction,
|
T: Http1Transaction,
|
||||||
Bs: Payload,
|
Bs: Payload,
|
||||||
@@ -231,7 +234,7 @@ where
|
|||||||
if self.is_closing {
|
if self.is_closing {
|
||||||
return Ok(Async::Ready(()));
|
return Ok(Async::Ready(()));
|
||||||
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
|
} else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
|
||||||
if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg()) {
|
if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(::Error::new_user_service)) {
|
||||||
// Check if the body knows its full data immediately.
|
// Check if the body knows its full data immediately.
|
||||||
//
|
//
|
||||||
// If so, we can skip a bit of bookkeeping that streaming
|
// If so, we can skip a bit of bookkeeping that streaming
|
||||||
@@ -332,6 +335,7 @@ where
|
|||||||
impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
|
impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
|
||||||
where
|
where
|
||||||
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
|
D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
|
||||||
|
D::PollError: Into<Box<::std::error::Error + Send + Sync>>,
|
||||||
I: AsyncRead + AsyncWrite,
|
I: AsyncRead + AsyncWrite,
|
||||||
T: Http1Transaction,
|
T: Http1Transaction,
|
||||||
Bs: Payload,
|
Bs: Payload,
|
||||||
@@ -367,11 +371,12 @@ where
|
|||||||
{
|
{
|
||||||
type PollItem = MessageHead<StatusCode>;
|
type PollItem = MessageHead<StatusCode>;
|
||||||
type PollBody = Bs;
|
type PollBody = Bs;
|
||||||
|
type PollError = S::Error;
|
||||||
type RecvItem = RequestHead;
|
type RecvItem = RequestHead;
|
||||||
|
|
||||||
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, ::Error> {
|
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError> {
|
||||||
if let Some(mut fut) = self.in_flight.take() {
|
if let Some(mut fut) = self.in_flight.take() {
|
||||||
let resp = match fut.poll().map_err(::Error::new_user_service)? {
|
let resp = match fut.poll()? {
|
||||||
Async::Ready(res) => res,
|
Async::Ready(res) => res,
|
||||||
Async::NotReady => {
|
Async::NotReady => {
|
||||||
self.in_flight = Some(fut);
|
self.in_flight = Some(fut);
|
||||||
@@ -432,9 +437,10 @@ where
|
|||||||
{
|
{
|
||||||
type PollItem = RequestHead;
|
type PollItem = RequestHead;
|
||||||
type PollBody = B;
|
type PollBody = B;
|
||||||
|
type PollError = Never;
|
||||||
type RecvItem = ResponseHead;
|
type RecvItem = ResponseHead;
|
||||||
|
|
||||||
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, ::Error> {
|
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Never> {
|
||||||
match self.rx.poll() {
|
match self.rx.poll() {
|
||||||
Ok(Async::Ready(Some((req, mut cb)))) => {
|
Ok(Async::Ready(Some((req, mut cb)))) => {
|
||||||
// check that future hasn't been canceled already
|
// check that future hasn't been canceled already
|
||||||
|
|||||||
Reference in New Issue
Block a user