refactor(body): use HttpBody with extra bounds instead of Payload trait

This commit is contained in:
Dirkjan Ochtman
2020-04-19 21:59:52 +02:00
committed by Sean McArthur
parent 203621e3be
commit aac0e2dd57
20 changed files with 142 additions and 243 deletions

View File

@@ -22,17 +22,14 @@ pub use self::aggregate::aggregate;
pub use self::body::{Body, Sender};
pub use self::to_bytes::to_bytes;
pub(crate) use self::payload::Payload;
mod aggregate;
mod body;
mod payload;
mod to_bytes;
/// An optimization to try to take a full body if immediately available.
///
/// This is currently limited to *only* `hyper::Body`s.
pub(crate) fn take_full_data<T: Payload + 'static>(body: &mut T) -> Option<T::Data> {
pub(crate) fn take_full_data<T: HttpBody + 'static>(body: &mut T) -> Option<T::Data> {
use std::any::{Any, TypeId};
// This static type check can be optimized at compile-time.

View File

@@ -1,139 +0,0 @@
use std::error::Error as StdError;
use bytes::Buf;
use http::HeaderMap;
use crate::common::{task, Pin, Poll};
use http_body::{Body as HttpBody, SizeHint};
/// This trait represents a streaming body of a `Request` or `Response`.
///
/// The built-in implementation of this trait is [`Body`](::Body), in case you
/// don't need to customize a send stream for your own application.
pub trait Payload: sealed::Sealed + Send + 'static {
/// A buffer of bytes representing a single chunk of a body.
type Data: Buf + Send;
/// The error type of this stream.
type Error: Into<Box<dyn StdError + Send + Sync>>;
/// Poll for a `Data` buffer.
///
/// Similar to `Stream::poll_next`, this yields `Some(Data)` until
/// the body ends, when it yields `None`.
fn poll_data(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>>;
/// Poll for an optional **single** `HeaderMap` of trailers.
///
/// This should **only** be called after `poll_data` has ended.
///
/// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2.
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut task::Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
/// A hint that the `Body` is complete, and doesn't need to be polled more.
///
/// This can be useful to determine if the there is any body or trailers
/// without having to poll. An empty `Body` could return `true` and hyper
/// would be able to know that only the headers need to be sent. Or, it can
/// also be checked after each `poll_data` call, to allow hyper to try to
/// end the underlying stream with the last chunk, instead of needing to
/// send an extra `DATA` frame just to mark the stream as finished.
///
/// As a hint, it is used to try to optimize, and thus is OK for a default
/// implementation to return `false`.
fn is_end_stream(&self) -> bool {
false
}
/// Returns a `SizeHint` providing an upper and lower bound on the possible size.
///
/// If there is an exact size of bytes known, this would allow hyper to
/// send a `Content-Length` header automatically, not needing to fall back to
/// `TransferEncoding: chunked`.
///
/// This does not need to be kept updated after polls, it will only be
/// called once to create the headers.
fn size_hint(&self) -> SizeHint {
SizeHint::default()
}
}
impl<T> Payload for T
where
T: HttpBody + Send + 'static,
T::Data: Send,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Data = T::Data;
type Error = T::Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
HttpBody::poll_data(self, cx)
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
HttpBody::poll_trailers(self, cx)
}
fn is_end_stream(&self) -> bool {
HttpBody::is_end_stream(self)
}
fn size_hint(&self) -> SizeHint {
HttpBody::size_hint(self)
}
}
impl<T> sealed::Sealed for T
where
T: HttpBody + Send + 'static,
T::Data: Send,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
}
mod sealed {
pub trait Sealed {}
}
/*
impl<E: Payload> Payload for Box<E> {
type Data = E::Data;
type Error = E::Error;
fn poll_data(&mut self) -> Poll<Option<Self::Data>, Self::Error> {
(**self).poll_data()
}
fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, Self::Error> {
(**self).poll_trailers()
}
fn is_end_stream(&self) -> bool {
(**self).is_end_stream()
}
fn content_length(&self) -> Option<u64> {
(**self).content_length()
}
#[doc(hidden)]
fn __hyper_full_data(&mut self, arg: FullDataArg) -> FullDataRet<Self::Data> {
(**self).__hyper_full_data(arg)
}
}
*/

View File

@@ -8,6 +8,7 @@
//! If don't have need to manage connections yourself, consider using the
//! higher-level [Client](super) API.
use std::error::Error as StdError;
use std::fmt;
use std::mem;
use std::sync::Arc;
@@ -21,7 +22,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;
use super::dispatch;
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::{task, BoxSendFuture, Exec, Executor, Future, Pin, Poll};
use crate::proto;
use crate::upgrade::Upgraded;
@@ -32,7 +33,7 @@ type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Cli
#[pin_project]
enum ProtoClient<T, B>
where
B: Payload,
B: HttpBody,
{
H1(#[pin] Http1Dispatcher<T, B, proto::h1::ClientTransaction>),
H2(#[pin] proto::h2::ClientTask<B>),
@@ -63,7 +64,7 @@ pub struct SendRequest<B> {
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
{
inner: Option<ProtoClient<T, B>>,
}
@@ -160,7 +161,7 @@ impl<B> SendRequest<B> {
impl<B> SendRequest<B>
where
B: Payload + 'static,
B: HttpBody + 'static,
{
/// Sends a `Request` on the associated connection.
///
@@ -245,7 +246,7 @@ where
impl<B> Service<Request<B>> for SendRequest<B>
where
B: Payload + 'static,
B: HttpBody + 'static,
{
type Response = Response<Body>;
type Error = crate::Error;
@@ -280,7 +281,7 @@ impl<B> Http2SendRequest<B> {
impl<B> Http2SendRequest<B>
where
B: Payload + 'static,
B: HttpBody + 'static,
{
pub(super) fn send_request_retryable(
&mut self,
@@ -328,7 +329,9 @@ impl<B> Clone for Http2SendRequest<B> {
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + Unpin + 'static,
B: HttpBody + Unpin + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Return the inner IO object, and additional information.
///
@@ -380,7 +383,9 @@ where
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;
@@ -404,7 +409,7 @@ where
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connection").finish()
@@ -580,7 +585,9 @@ impl Builder {
) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
let opts = self.clone();
@@ -652,7 +659,9 @@ impl fmt::Debug for ResponseFuture {
impl<T, B> Future for ProtoClient<T, B>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
B: Payload + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<proto::Dispatched>;
@@ -678,7 +687,8 @@ impl<B: Send> AssertSendSync for SendRequest<B> {}
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
B::Data: Send,
{
}
@@ -686,7 +696,7 @@ where
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
B::Data: Send + Sync + 'static,
{
}

View File

@@ -48,6 +48,7 @@
//! # fn main () {}
//! ```
use std::error::Error as StdError;
use std::fmt;
use std::mem;
use std::time::Duration;
@@ -60,7 +61,7 @@ use http::{Method, Request, Response, Uri, Version};
use self::connect::{sealed::Connect, Alpn, Connected, Connection};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::common::{lazy as hyper_lazy, task, BoxSendFuture, Executor, Future, Lazy, Pin, Poll};
#[cfg(feature = "tcp")]
@@ -150,16 +151,17 @@ impl Client<(), Body> {
impl<C, B> Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: Payload + Send + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
/// Send a `GET` request to the supplied `Uri`.
///
/// # Note
///
/// This requires that the `Payload` type have a `Default` implementation.
/// This requires that the `HttpBody` type have a `Default` implementation.
/// It *should* return an "empty" version of itself, such that
/// `Payload::is_end_stream` is `true`.
/// `HttpBody::is_end_stream` is `true`.
///
/// # Example
///
@@ -180,7 +182,7 @@ where
{
let body = B::default();
if !body.is_end_stream() {
warn!("default Payload used for get() does not return true for is_end_stream");
warn!("default HttpBody used for get() does not return true for is_end_stream");
}
let mut req = Request::new(body);
@@ -543,8 +545,9 @@ where
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: Payload + Send + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Response = Response<Body>;
type Error = crate::Error;
@@ -653,7 +656,7 @@ impl<B> PoolClient<B> {
}
}
impl<B: Payload + 'static> PoolClient<B> {
impl<B: HttpBody + 'static> PoolClient<B> {
fn send_request_retryable(
&mut self,
req: Request<B>,
@@ -1132,7 +1135,7 @@ impl Builder {
#[cfg(feature = "tcp")]
pub fn build_http<B>(&self) -> Client<HttpConnector, B>
where
B: Payload + Send,
B: HttpBody + Send,
B::Data: Send,
{
let mut connector = HttpConnector::new();
@@ -1146,7 +1149,7 @@ impl Builder {
pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
C: Connect + Clone,
B: Payload + Send,
B: HttpBody + Send,
B::Data: Send,
{
Client {

View File

@@ -8,7 +8,7 @@ use std::marker::PhantomData;
use super::conn::{Builder, SendRequest};
use crate::{
body::Payload,
body::HttpBody,
common::{task, Pin, Poll},
service::{MakeConnection, Service},
};
@@ -43,8 +43,9 @@ where
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
B: Payload + Unpin + 'static,
B::Data: Unpin,
B: HttpBody + Unpin + Send + 'static,
B::Data: Send + Unpin,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Response = SendRequest<B>;
type Error = crate::Error;

View File

@@ -3,7 +3,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::proto::h2::server::H2Stream;
use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
use crate::service::HttpService;
@@ -14,7 +14,7 @@ pub trait Executor<Fut> {
fn execute(&self, fut: Fut);
}
pub trait H2Exec<F, B: Payload>: Clone {
pub trait H2Exec<F, B: HttpBody>: Clone {
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
}
@@ -67,7 +67,7 @@ impl fmt::Debug for Exec {
impl<F, B> H2Exec<F, B> for Exec
where
H2Stream<F, B>: Future<Output = ()> + Send + 'static,
B: Payload,
B: HttpBody,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(fut)
@@ -91,7 +91,7 @@ impl<E, F, B> H2Exec<F, B> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Output = ()>,
B: Payload,
B: HttpBody,
{
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) {
self.execute(fut)

View File

@@ -65,7 +65,7 @@ pub(crate) enum Parse {
#[derive(Debug, PartialEq)]
pub(crate) enum User {
/// Error calling user's Payload::poll_data().
/// Error calling user's HttpBody::poll_data().
Body,
/// Error calling user's MakeService.
MakeService,
@@ -316,7 +316,7 @@ impl Error {
Kind::Http2 => "http2 error",
Kind::Io => "connection error",
Kind::User(User::Body) => "error from user's Payload stream",
Kind::User(User::Body) => "error from user's HttpBody stream",
Kind::User(User::MakeService) => "error from user's MakeService",
Kind::User(User::Service) => "error from user's Service",
Kind::User(User::UnexpectedHeader) => "user sent unexpected header",

View File

@@ -5,7 +5,7 @@ use http::{Request, Response, StatusCode};
use tokio::io::{AsyncRead, AsyncWrite};
use super::{Http1Transaction, Wants};
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::common::{task, Future, Never, Pin, Poll, Unpin};
use crate::proto::{
BodyLength, Conn, DecodedLength, Dispatched, MessageHead, RequestHead, RequestLine,
@@ -13,7 +13,7 @@ use crate::proto::{
};
use crate::service::HttpService;
pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
conn: Conn<I, Bs::Data, T>,
dispatch: D,
body_tx: Option<crate::body::Sender>,
@@ -58,7 +58,8 @@ where
D::PollError: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
T: Http1Transaction + Unpin,
Bs: Payload,
Bs: HttpBody + 'static,
Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
{
pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
Dispatcher {
@@ -400,7 +401,8 @@ where
D::PollError: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
T: Http1Transaction + Unpin,
Bs: Payload,
Bs: HttpBody + 'static,
Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<Dispatched>;
@@ -459,7 +461,7 @@ impl<S, Bs> Dispatch for Server<S, Body>
where
S: HttpService<Body, ResBody = Bs>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bs: Payload,
Bs: HttpBody,
{
type PollItem = MessageHead<StatusCode>;
type PollBody = Bs;
@@ -530,7 +532,7 @@ impl<B> Client<B> {
impl<B> Dispatch for Client<B>
where
B: Payload,
B: HttpBody,
{
type PollItem = RequestHead;
type PollBody = B;

View File

@@ -170,7 +170,7 @@ impl Encoder {
/// Encodes the full body, without verifying the remaining length matches.
///
/// This is used in conjunction with Payload::__hyper_full_data(), which
/// This is used in conjunction with HttpBody::__hyper_full_data(), which
/// means we can trust that the buf has the correct size (the buf itself
/// was checked to make the headers).
pub(super) fn danger_full_buf<B>(self, msg: B, dst: &mut WriteBuf<EncodedBuf<B>>)

View File

@@ -359,7 +359,7 @@ impl Http1Transaction for Server {
}
match msg.body {
Some(BodyLength::Known(known_len)) => {
// The Payload claims to know a length, and
// The HttpBody claims to know a length, and
// the headers are already set. For performance
// reasons, we are just going to trust that
// the values match.
@@ -388,7 +388,7 @@ impl Http1Transaction for Server {
continue 'headers;
}
Some(BodyLength::Unknown) => {
// The Payload impl didn't know how long the
// The HttpBody impl didn't know how long the
// body is, but a length header was included.
// We have to parse the value to return our
// Encoder...
@@ -825,7 +825,7 @@ impl Client {
let headers = &mut head.headers;
// If the user already set specific headers, we should respect them, regardless
// of what the Payload knows about itself. They set them for a reason.
// of what the HttpBody knows about itself. They set them for a reason.
// Because of the borrow checker, we can't check the for an existing
// Content-Length header while holding an `Entry` for the Transfer-Encoding

View File

@@ -1,3 +1,4 @@
use std::error::Error as StdError;
#[cfg(feature = "runtime")]
use std::time::Duration;
@@ -8,7 +9,7 @@ use h2::client::{Builder, SendRequest};
use tokio::io::{AsyncRead, AsyncWrite};
use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::{task, Exec, Future, Never, Pin, Poll};
use crate::headers;
use crate::proto::Dispatched;
@@ -67,7 +68,8 @@ pub(crate) async fn handshake<T, B>(
) -> crate::Result<ClientTask<B>>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
B: Payload,
B: HttpBody,
B::Data: Send + 'static,
{
let (h2_tx, mut conn) = Builder::default()
.initial_window_size(config.initial_stream_window_size)
@@ -167,7 +169,7 @@ where
pub(crate) struct ClientTask<B>
where
B: Payload,
B: HttpBody,
{
ping: ping::Recorder,
conn_drop_ref: ConnDropRef,
@@ -179,7 +181,9 @@ where
impl<B> Future for ClientTask<B>
where
B: Payload + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<Dispatched>;

View File

@@ -6,9 +6,10 @@ use http::header::{
};
use http::HeaderMap;
use pin_project::pin_project;
use std::error::Error as StdError;
use super::DecodedLength;
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::{task, Future, Pin, Poll};
use crate::headers::content_length_parse_all;
@@ -91,7 +92,7 @@ fn decode_content_length(headers: &HeaderMap) -> DecodedLength {
#[pin_project]
struct PipeToSendStream<S>
where
S: Payload,
S: HttpBody,
{
body_tx: SendStream<SendBuf<S::Data>>,
data_done: bool,
@@ -101,7 +102,7 @@ where
impl<S> PipeToSendStream<S>
where
S: Payload,
S: HttpBody,
{
fn new(stream: S, tx: SendStream<SendBuf<S::Data>>) -> PipeToSendStream<S> {
PipeToSendStream {
@@ -114,7 +115,8 @@ where
impl<S> Future for PipeToSendStream<S>
where
S: Payload,
S: HttpBody,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = crate::Result<()>;

View File

@@ -9,7 +9,7 @@ use pin_project::{pin_project, project};
use tokio::io::{AsyncRead, AsyncWrite};
use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::exec::H2Exec;
use crate::common::{task, Future, Pin, Poll};
use crate::headers;
@@ -58,7 +58,7 @@ impl Default for Config {
pub(crate) struct Server<T, S, B, E>
where
S: HttpService<Body>,
B: Payload,
B: HttpBody,
{
exec: E,
service: S,
@@ -67,7 +67,7 @@ where
enum State<T, B>
where
B: Payload,
B: HttpBody,
{
Handshaking {
ping_config: ping::Config,
@@ -79,7 +79,7 @@ where
struct Serving<T, B>
where
B: Payload,
B: HttpBody,
{
ping: Option<(ping::Recorder, ping::Ponger)>,
conn: Connection<T, SendBuf<B::Data>>,
@@ -91,7 +91,7 @@ where
T: AsyncRead + AsyncWrite + Unpin,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + 'static,
E: H2Exec<S::Future, B>,
{
pub(crate) fn new(io: T, service: S, config: &Config, exec: E) -> Server<T, S, B, E> {
@@ -157,7 +157,7 @@ where
T: AsyncRead + AsyncWrite + Unpin,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + 'static,
E: H2Exec<S::Future, B>,
{
type Output = crate::Result<Dispatched>;
@@ -201,7 +201,7 @@ where
impl<T, B> Serving<T, B>
where
T: AsyncRead + AsyncWrite + Unpin,
B: Payload,
B: HttpBody + 'static,
{
fn poll_server<S, E>(
&mut self,
@@ -315,7 +315,7 @@ where
#[pin_project]
pub struct H2Stream<F, B>
where
B: Payload,
B: HttpBody,
{
reply: SendResponse<SendBuf<B::Data>>,
#[pin]
@@ -325,7 +325,7 @@ where
#[pin_project]
enum H2StreamState<F, B>
where
B: Payload,
B: HttpBody,
{
Service(#[pin] F),
Body(#[pin] PipeToSendStream<B>),
@@ -333,7 +333,7 @@ where
impl<F, B> H2Stream<F, B>
where
B: Payload,
B: HttpBody,
{
fn new(fut: F, respond: SendResponse<SendBuf<B::Data>>) -> H2Stream<F, B> {
H2Stream {
@@ -359,7 +359,8 @@ macro_rules! reply {
impl<F, B, E> H2Stream<F, B>
where
F: Future<Output = Result<Response<B>, E>>,
B: Payload,
B: HttpBody,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: Into<Box<dyn StdError + Send + Sync>>,
{
#[project]
@@ -424,7 +425,8 @@ where
impl<F, B, E> Future for H2Stream<F, B>
where
F: Future<Output = Result<Response<B>, E>>,
B: Payload,
B: HttpBody,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: Into<Box<dyn StdError + Send + Sync>>,
{
type Output = ();

View File

@@ -21,7 +21,7 @@ use pin_project::{pin_project, project};
use tokio::io::{AsyncRead, AsyncWrite};
use super::Accept;
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::common::io::Rewind;
use crate::common::{task, Future, Pin, Poll, Unpin};
@@ -122,7 +122,7 @@ where
pub(super) enum ProtoServer<T, B, S, E = Exec>
where
S: HttpService<Body>,
B: Payload,
B: HttpBody,
{
H1(
#[pin]
@@ -429,7 +429,8 @@ impl<E> Http<E> {
where
S: HttpService<Body, ResBody = Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
Bd: HttpBody + 'static,
Bd::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
E: H2Exec<S::Future, Bd>,
{
@@ -477,7 +478,7 @@ impl<E> Http<E> {
IO: AsyncRead + AsyncWrite + Unpin,
S: MakeServiceRef<IO, Body, ResBody = Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
Bd: HttpBody,
E: H2Exec<<S::Service as HttpService<Body>>::Future, Bd>,
{
Serve {
@@ -495,7 +496,8 @@ where
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
B: Payload + 'static,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
/// Start a graceful shutdown process for this connection.
@@ -640,7 +642,8 @@ where
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
type Output = crate::Result<()>;
@@ -707,7 +710,7 @@ where
IO: AsyncRead + AsyncWrite + Unpin,
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, Body, ResBody = B>,
B: Payload,
B: HttpBody,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
fn poll_next_(
@@ -744,7 +747,8 @@ where
I: AsyncRead + AsyncWrite + Unpin,
F: Future<Output = Result<S, FE>>,
S: HttpService<Body, ResBody = B>,
B: Payload,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
type Output = Result<Connection<I, S, E>, FE>;
@@ -778,7 +782,7 @@ where
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
B: Payload,
B: HttpBody,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{
pub(super) fn poll_watch<W>(
@@ -814,7 +818,8 @@ where
T: AsyncRead + AsyncWrite + Unpin,
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
type Output = crate::Result<proto::Dispatched>;
@@ -834,7 +839,7 @@ pub(crate) mod spawn_all {
use tokio::io::{AsyncRead, AsyncWrite};
use super::{Connecting, UpgradeableConnection};
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::common::exec::H2Exec;
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::HttpService;
@@ -863,6 +868,8 @@ pub(crate) mod spawn_all {
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>,
E: H2Exec<S::Future, S::ResBody>,
S::ResBody: 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Future = UpgradeableConnection<I, S, E>;
@@ -908,7 +915,8 @@ pub(crate) mod spawn_all {
N: Future<Output = Result<S, NE>>,
NE: Into<Box<dyn StdError + Send + Sync>>,
S: HttpService<Body, ResBody = B>,
B: Payload,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
W: Watcher<I, S, E>,
{
@@ -975,7 +983,8 @@ mod upgrades {
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
B: Payload + 'static,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, B>,
{
/// Start a graceful shutdown process for this connection.
@@ -992,7 +1001,8 @@ mod upgrades {
S: HttpService<Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: super::H2Exec<S::Future, B>,
{
type Output = crate::Result<()>;

View File

@@ -69,7 +69,7 @@ use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use self::accept::Accept;
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::common::exec::{Exec, H2Exec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::{HttpService, MakeServiceRef};
@@ -152,7 +152,8 @@ where
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + Send + Sync + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
@@ -207,7 +208,8 @@ where
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
@@ -430,7 +432,8 @@ impl<I, E> Builder<I, E> {
I::Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<I::Conn, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
{

View File

@@ -5,7 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
use super::Accept;
use crate::body::{Body, Payload};
use crate::body::{Body, HttpBody};
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
use crate::common::exec::{H2Exec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin};
@@ -50,7 +50,8 @@ where
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B: HttpBody + Send + Sync + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
F: Future<Output = ()>,
E: H2Exec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
@@ -98,6 +99,8 @@ where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>,
E: H2Exec<S::Future, S::ResBody>,
S::ResBody: Send + Sync + 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Future =
Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
@@ -112,7 +115,8 @@ where
S: HttpService<Body>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
S::ResBody: Payload + 'static,
S::ResBody: HttpBody + Send + 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
E: H2Exec<S::Future, S::ResBody>,
{
conn.graceful_shutdown()

View File

@@ -1,13 +1,13 @@
use std::error::Error as StdError;
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::{task, Future, Poll};
use crate::{Request, Response};
/// An asynchronous function from `Request` to `Response`.
pub trait HttpService<ReqBody>: sealed::Sealed<ReqBody> {
/// The `Payload` body of the `http::Response`.
type ResBody: Payload;
/// The `HttpBody` body of the `http::Response`.
type ResBody: HttpBody;
/// The error type that can occur within this `Service`.
///
@@ -29,7 +29,7 @@ pub trait HttpService<ReqBody>: sealed::Sealed<ReqBody> {
impl<T, B1, B2> HttpService<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
B2: Payload,
B2: HttpBody,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type ResBody = B2;
@@ -49,7 +49,7 @@ where
impl<T, B1, B2> sealed::Sealed<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
B2: Payload,
B2: HttpBody,
{
}

View File

@@ -4,7 +4,7 @@ use std::fmt;
use tokio::io::{AsyncRead, AsyncWrite};
use super::{HttpService, Service};
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::{task, Future, Poll};
// The same "trait alias" as tower::MakeConnection, but inlined to reduce
@@ -41,7 +41,7 @@ where
// Just a sort-of "trait alias" of `MakeService`, not to be implemented
// by anyone, only used as bounds.
pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<(Target, ReqBody)> {
type ResBody: Payload;
type ResBody: HttpBody;
type Error: Into<Box<dyn StdError + Send + Sync>>;
type Service: HttpService<ReqBody, ResBody = Self::ResBody, Error = Self::Error>;
type MakeError: Into<Box<dyn StdError + Send + Sync>>;
@@ -70,8 +70,8 @@ where
ME: Into<Box<dyn StdError + Send + Sync>>,
S: HttpService<IB, ResBody = OB, Error = E>,
F: Future<Output = Result<S, ME>>,
IB: Payload,
OB: Payload,
IB: HttpBody,
OB: HttpBody,
{
type Error = E;
type Service = S;
@@ -94,8 +94,8 @@ impl<T, Target, S, B1, B2> self::sealed::Sealed<(Target, B1)> for T
where
T: for<'a> Service<&'a Target, Response = S>,
S: HttpService<B1, ResBody = B2>,
B1: Payload,
B2: Payload,
B1: HttpBody,
B2: HttpBody,
{
}

View File

@@ -2,7 +2,7 @@ use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;
use crate::body::Payload;
use crate::body::HttpBody;
use crate::common::{task, Future, Poll};
use crate::{Request, Response};
@@ -45,10 +45,10 @@ impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>
for ServiceFn<F, ReqBody>
where
F: FnMut(Request<ReqBody>) -> Ret,
ReqBody: Payload,
ReqBody: HttpBody,
Ret: Future<Output = Result<Response<ResBody>, E>>,
E: Into<Box<dyn StdError + Send + Sync>>,
ResBody: Payload,
ResBody: HttpBody,
{
type Response = crate::Response<ResBody>;
type Error = E;

View File

@@ -258,7 +258,7 @@ mod response_body_lengths {
fn auto_response_with_unknown_length() {
run_test(TestCase {
version: 1,
// no headers means trying to guess from Payload
// no headers means trying to guess from HttpBody
headers: &[],
body: Bd::Unknown("foo bar baz"),
expects_chunked: true,
@@ -270,7 +270,7 @@ mod response_body_lengths {
fn auto_response_with_known_length() {
run_test(TestCase {
version: 1,
// no headers means trying to guess from Payload
// no headers means trying to guess from HttpBody
headers: &[],
body: Bd::Known("foo bar baz"),
expects_chunked: false,
@@ -282,7 +282,7 @@ mod response_body_lengths {
fn auto_response_known_empty() {
run_test(TestCase {
version: 1,
// no headers means trying to guess from Payload
// no headers means trying to guess from HttpBody
headers: &[],
body: Bd::Known(""),
expects_chunked: false,
@@ -294,7 +294,7 @@ mod response_body_lengths {
fn http10_auto_response_with_unknown_length() {
run_test(TestCase {
version: 0,
// no headers means trying to guess from Payload
// no headers means trying to guess from HttpBody
headers: &[],
body: Bd::Unknown("foo bar baz"),
expects_chunked: false,