feat(service): use tower_service::Service for hyper::service

This commit is contained in:
Lucio Franco
2019-08-20 15:01:06 -04:00
committed by Sean McArthur
parent 53a437c382
commit ec520d5602
11 changed files with 230 additions and 111 deletions

View File

@@ -38,6 +38,7 @@ pin-utils = "0.1.0-alpha.4"
time = "0.1"
tokio = { version = "0.2.0-alpha.2", optional = true, default-features = false, features = ["rt-full"] }
#tokio-buf = "0.2.0-alpha.1"
tower-service = "=0.3.0-alpha.1"
tokio-executor = "0.2.0-alpha.2"
tokio-io = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"

70
examples/tower_server.rs Normal file
View File

@@ -0,0 +1,70 @@
#![feature(async_await)]
#![deny(warnings)]
use hyper::{Body, Request, Response, Server};
use tower_service::Service;
use futures_util::future;
use std::task::{Context, Poll};
const ROOT: &'static str = "/";
#[derive(Debug)]
pub struct Svc;
impl Service<Request<Body>> for Svc {
type Response = Response<Body>;
type Error = hyper::Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let mut rsp = Response::builder();
let uri = req.uri();
if uri.path() != ROOT {
let body = Body::from(Vec::new());
let rsp = rsp.status(404).body(body).unwrap();
return future::ok(rsp);
}
let body = Body::from(Vec::from(&b"heyo!"[..]));
let rsp = rsp.status(200).body(body).unwrap();
future::ok(rsp)
}
}
pub struct MakeSvc;
impl<T> Service<T> for MakeSvc {
type Response = Svc;
type Error = std::io::Error;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, _: T) -> Self::Future {
future::ok(Svc)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
let server = Server::bind(&addr)
.serve(MakeSvc);
println!("Listening on http://{}", addr);
server.await?;
Ok(())
}

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use tokio_executor::{SpawnError, TypedExecutor};
use crate::body::Payload;
use crate::body::{Payload, Body};
use crate::proto::h2::server::H2Stream;
use crate::server::conn::spawn_all::{NewSvcTask, Watcher};
use crate::service::Service;
@@ -14,7 +14,7 @@ pub trait H2Exec<F, B: Payload>: Clone {
fn execute_h2stream(&mut self, fut: H2Stream<F, B>) -> crate::Result<()>;
}
pub trait NewSvcExec<I, N, S: Service, E, W: Watcher<I, S, E>>: Clone {
pub trait NewSvcExec<I, N, S: Service<Body>, E, W: Watcher<I, S, E>>: Clone {
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()>;
}
@@ -119,7 +119,7 @@ where
impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
where
NewSvcTask<I, N, S, E, W>: Future<Output=()> + Send + 'static,
S: Service,
S: Service<Body>,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {
@@ -148,7 +148,7 @@ impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
where
E: TypedExecutor<NewSvcTask<I, N, S, E, W>> + Clone,
NewSvcTask<I, N, S, E, W>: Future<Output=()>,
S: Service,
S: Service<Body>,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&mut self, fut: NewSvcTask<I, N, S, E, W>) -> crate::Result<()> {

View File

@@ -30,7 +30,7 @@ pub(crate) trait Dispatch {
fn should_poll(&self) -> bool;
}
pub struct Server<S: Service> {
pub struct Server<S: Service<B>, B> {
in_flight: Pin<Box<Option<S::Future>>>,
pub(crate) service: S,
}
@@ -412,11 +412,11 @@ impl<'a, T> Drop for OptGuard<'a, T> {
// ===== impl Server =====
impl<S> Server<S>
impl<S, B> Server<S, B>
where
S: Service,
S: Service<B>,
{
pub fn new(service: S) -> Server<S> {
pub fn new(service: S) -> Server<S, B> {
Server {
in_flight: Box::pin(None),
service: service,
@@ -429,11 +429,11 @@ where
}
// Service is never pinned
impl<S: Service> Unpin for Server<S> {}
impl<S: Service<B>, B> Unpin for Server<S, B> {}
impl<S, Bs> Dispatch for Server<S>
impl<S, Bs> Dispatch for Server<S, Body>
where
S: Service<ReqBody=Body, ResBody=Bs>,
S: Service<Body, ResBody=Bs>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bs: Payload,
{

View File

@@ -20,7 +20,7 @@ use crate::{Body, Response};
pub(crate) struct Server<T, S, B, E>
where
S: Service,
S: Service<Body>,
B: Payload,
{
exec: E,
@@ -29,7 +29,7 @@ where
}
// TODO: fix me
impl<T, S: Service, B: Payload, E> Unpin for Server<T, S, B, E> {}
impl<T, S: Service<Body>, B: Payload, E> Unpin for Server<T, S, B, E> {}
enum State<T, B>
where
@@ -52,7 +52,7 @@ where
impl<T, S, B, E> Server<T, S, B, E>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<ReqBody=Body, ResBody=B>,
S: Service<Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B::Data: Unpin,
@@ -90,7 +90,7 @@ where
impl<T, S, B, E> Future for Server<T, S, B, E>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<ReqBody=Body, ResBody=B>,
S: Service<Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B::Data: Unpin,
@@ -133,7 +133,7 @@ where
fn poll_server<S, E>(&mut self, cx: &mut task::Context<'_>, service: &mut S, exec: &mut E) -> Poll<crate::Result<()>>
where
S: Service<
ReqBody=Body,
Body,
ResBody=B,
>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,

View File

@@ -102,11 +102,11 @@ pub(super) struct SpawnAll<I, S, E> {
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, S, E = Exec>
where
S: Service,
S: Service<Body>,
{
pub(super) conn: Option<Either<
proto::h1::Dispatcher<
proto::h1::dispatch::Server<S>,
proto::h1::dispatch::Server<S, Body>,
S::ResBody,
T,
proto::ServerTransaction,
@@ -341,7 +341,7 @@ impl<E> Http<E> {
/// # async fn run<I, S>(some_io: I, some_service: S)
/// # where
/// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
/// # S: Service<ReqBody=Body, ResBody=Body> + Send + 'static,
/// # S: Service<Body, ResBody=Body> + Send + 'static,
/// # S::Future: Send
/// # {
/// let http = Http::new();
@@ -355,7 +355,7 @@ impl<E> Http<E> {
/// ```
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S, E>
where
S: Service<ReqBody=Body, ResBody=Bd>,
S: Service<Body, ResBody=Bd>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
Bd::Data: Unpin,
@@ -409,12 +409,12 @@ impl<E> Http<E> {
where
S: MakeServiceRef<
AddrStream,
ReqBody=Body,
Body,
ResBody=Bd,
>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as Service>::Future, Bd>,
E: H2Exec<<S::Service as Service<Body>>::Future, Bd>,
{
let mut incoming = AddrIncoming::new(addr, None)?;
if self.keep_alive {
@@ -434,12 +434,12 @@ impl<E> Http<E> {
where
S: MakeServiceRef<
AddrStream,
ReqBody=Body,
Body,
ResBody=Bd,
>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as Service>::Future, Bd>,
E: H2Exec<<S::Service as Service<Body>>::Future, Bd>,
{
let mut incoming = AddrIncoming::new(addr, Some(handle))?;
if self.keep_alive {
@@ -456,12 +456,12 @@ impl<E> Http<E> {
IO: AsyncRead + AsyncWrite + Unpin,
S: MakeServiceRef<
IO,
ReqBody=Body,
Body,
ResBody=Bd,
>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as Service>::Future, Bd>,
E: H2Exec<<S::Service as Service<Body>>::Future, Bd>,
{
Serve {
incoming,
@@ -476,7 +476,7 @@ impl<E> Http<E> {
impl<I, B, S, E> Connection<I, S, E>
where
S: Service<ReqBody=Body, ResBody=B>,
S: Service<Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
B: Payload + 'static,
@@ -627,7 +627,7 @@ where
impl<I, B, S, E> Future for Connection<I, S, E>
where
S: Service<ReqBody=Body, ResBody=B> + 'static,
S: Service<Body, ResBody=B> + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + 'static,
B: Payload + 'static,
@@ -665,7 +665,7 @@ where
impl<I, S> fmt::Debug for Connection<I, S>
where
S: Service,
S: Service<Body>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
@@ -703,11 +703,11 @@ where
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Unpin,
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, ReqBody=Body, ResBody=B>,
S: MakeServiceRef<IO, Body, ResBody=B>,
//S::Error2: Into<Box<StdError + Send + Sync>>,
//SME: Into<Box<StdError + Send + Sync>>,
B: Payload,
E: H2Exec<<S::Service as Service>::Future, B>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
{
type Item = crate::Result<Connecting<IO, S::Future, E>>;
@@ -745,7 +745,7 @@ impl<I, F, S, FE, E, B> Future for Connecting<I, F, E>
where
I: AsyncRead + AsyncWrite + Unpin,
F: Future<Output=Result<S, FE>>,
S: Service<ReqBody=Body, ResBody=B>,
S: Service<Body, ResBody=B>,
B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>,
@@ -781,11 +781,11 @@ where
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<
IO,
ReqBody=Body,
Body,
ResBody=B,
>,
B: Payload,
E: H2Exec<<S::Service as Service>::Future, B>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
{
pub(super) fn poll_watch<W>(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, watcher: &W) -> Poll<crate::Result<()>>
where
@@ -842,7 +842,7 @@ pub(crate) mod spawn_all {
// The `Server::with_graceful_shutdown` needs to keep track of all active
// connections, and signal that they start to shutdown when prompted, so
// it has a `GracefulWatcher` implementation to do that.
pub trait Watcher<I, S: Service, E>: Clone {
pub trait Watcher<I, S: Service<Body>, E>: Clone {
type Future: Future<Output = crate::Result<()>>;
fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future;
@@ -855,7 +855,7 @@ pub(crate) mod spawn_all {
impl<I, S, E> Watcher<I, S, E> for NoopWatcher
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: Service<ReqBody=Body> + 'static,
S: Service<Body> + 'static,
<S::ResBody as Payload>::Data: Unpin,
E: H2Exec<S::Future, S::ResBody>,
{
@@ -876,16 +876,16 @@ pub(crate) mod spawn_all {
// Users cannot import this type, nor the associated `NewSvcExec`. Instead,
// a blanket implementation for `Executor<impl Future>` is sufficient.
#[allow(missing_debug_implementations)]
pub struct NewSvcTask<I, N, S: Service, E, W: Watcher<I, S, E>> {
pub struct NewSvcTask<I, N, S: Service<Body>, E, W: Watcher<I, S, E>> {
state: State<I, N, S, E, W>,
}
enum State<I, N, S: Service, E, W: Watcher<I, S, E>> {
enum State<I, N, S: Service<Body>, E, W: Watcher<I, S, E>> {
Connecting(Connecting<I, N, E>, W),
Connected(W::Future),
}
impl<I, N, S: Service, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
impl<I, N, S: Service<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
NewSvcTask {
state: State::Connecting(connecting, watcher),
@@ -898,7 +898,7 @@ pub(crate) mod spawn_all {
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
N: Future<Output=Result<S, NE>>,
NE: Into<Box<dyn StdError + Send + Sync>>,
S: Service<ReqBody=Body, ResBody=B>,
S: Service<Body, ResBody=B>,
B: Payload,
B::Data: Unpin,
E: H2Exec<S::Future, B>,
@@ -955,14 +955,14 @@ mod upgrades {
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S, E>
where
S: Service,
S: Service<Body>,
{
pub(super) inner: Connection<T, S, E>,
}
impl<I, B, S, E> UpgradeableConnection<I, S, E>
where
S: Service<ReqBody=Body, ResBody=B>,// + 'static,
S: Service<Body, ResBody=B>,// + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
B: Payload + 'static,
@@ -980,7 +980,7 @@ mod upgrades {
impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
where
S: Service<ReqBody=Body, ResBody=B> + 'static,
S: Service<Body, ResBody=B> + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,

View File

@@ -150,12 +150,12 @@ where
I: Stream<Item=Result<IO, IE>>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, ReqBody=Body, ResBody=B>,
S: MakeServiceRef<IO, Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Service: 'static,
B: Payload,
B::Data: Unpin,
E: H2Exec<<S::Service as Service>::Future, B>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
/// Prepares a server to handle graceful shutdown when the provided future
@@ -208,12 +208,12 @@ where
I: Stream<Item=Result<IO, IE>>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, ReqBody=Body, ResBody=B>,
S: MakeServiceRef<IO, Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Service: 'static,
B: Payload,
B::Data: Unpin,
E: H2Exec<<S::Service as Service>::Future, B>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
type Output = crate::Result<()>;
@@ -407,13 +407,13 @@ impl<I, E> Builder<I, E> {
I: Stream<Item=Result<IO, IE>>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, ReqBody=Body, ResBody=B>,
S: MakeServiceRef<IO, Body, ResBody=B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Service: 'static,
B: Payload,
B::Data: Unpin,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
E: H2Exec<<S::Service as Service>::Future, B>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
{
let serve = self.protocol.serve_incoming(self.incoming, new_service);
let spawn_all = serve.spawn_all();

View File

@@ -43,13 +43,13 @@ where
I: Stream<Item=Result<IO, IE>>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, ReqBody=Body, ResBody=B>,
S: MakeServiceRef<IO, Body, ResBody=B>,
S::Service: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Payload,
B::Data: Unpin,
F: Future<Output=()>,
E: H2Exec<<S::Service as Service>::Future, B>,
E: H2Exec<<S::Service as Service<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
type Output = crate::Result<()>;
@@ -98,7 +98,7 @@ pub struct GracefulWatcher(Watch);
impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: Service<ReqBody=Body> + 'static,
S: Service<Body> + 'static,
<S::ResBody as Payload>::Data: Unpin,
E: H2Exec<S::Future, S::ResBody>,
{
@@ -114,7 +114,7 @@ where
fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
where
S: Service<ReqBody=Body>,
S: Service<Body>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: AsyncRead + AsyncWrite + Unpin,
S::ResBody: Payload + 'static,

View File

@@ -6,10 +6,7 @@ use crate::common::{Future, Poll, task};
use super::Service;
/// An asynchronous constructor of `Service`s.
pub trait MakeService<Target> {
/// The `Payload` body of the `http::Request`.
type ReqBody: Payload;
pub trait MakeService<Target, ReqBody>: sealed::Sealed<Target, ReqBody> {
/// The `Payload` body of the `http::Response`.
type ResBody: Payload;
@@ -18,7 +15,7 @@ pub trait MakeService<Target> {
/// The resolved `Service` from `new_service()`.
type Service: Service<
ReqBody=Self::ReqBody,
ReqBody,
ResBody=Self::ResBody,
Error=Self::Error,
>;
@@ -42,15 +39,46 @@ pub trait MakeService<Target> {
fn make_service(&mut self, target: Target) -> Self::Future;
}
impl<T, Target, S, B1, B2, E, F> MakeService<Target, B1> for T
where
T: for<'a> tower_service::Service<&'a Target, Response = S, Error = E, Future = F>,
S: tower_service::Service<crate::Request<B1>, Response = crate::Response<B2>>,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
B1: Payload,
B2: Payload,
F: Future<Output = Result<S, E>>,
{
type ResBody = B2;
type Error = S::Error;
type Service = S;
type Future = F;
type MakeError = E;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>> {
tower_service::Service::poll_ready(self, cx)
}
fn make_service(&mut self, req: Target) -> Self::Future {
tower_service::Service::call(self, &req)
}
}
impl<T, Target, S, B1, B2> sealed::Sealed<Target, B1> for T
where
T: for<'a> tower_service::Service<&'a Target, Response = S>,
S: tower_service::Service<crate::Request<B1>, Response = crate::Response<B2>>
{
}
// Just a sort-of "trait alias" of `MakeService`, not to be implemented
// by anyone, only used as bounds.
#[doc(hidden)]
pub trait MakeServiceRef<Target>: self::sealed::Sealed<Target> {
type ReqBody: Payload;
pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<Target, ReqBody> {
type ResBody: Payload;
type Error: Into<Box<dyn StdError + Send + Sync>>;
type Service: Service<
ReqBody=Self::ReqBody,
ReqBody,
ResBody=Self::ResBody,
Error=Self::Error,
>;
@@ -73,19 +101,18 @@ pub trait MakeServiceRef<Target>: self::sealed::Sealed<Target> {
fn make_service_ref(&mut self, target: &Target) -> Self::Future;
}
impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target> for T
impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target, IB> for T
where
T: for<'a> MakeService<&'a Target, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>,
T: for<'a> tower_service::Service<&'a Target, Error=ME, Response=S, Future=F>,
E: Into<Box<dyn StdError + Send + Sync>>,
ME: Into<Box<dyn StdError + Send + Sync>>,
S: Service<ReqBody=IB, ResBody=OB, Error=E>,
S: tower_service::Service<crate::Request<IB>, Response=crate::Response<OB>, Error=E>,
F: Future<Output=Result<S, ME>>,
IB: Payload,
OB: Payload,
{
type Error = E;
type Service = S;
type ReqBody = IB;
type ResBody = OB;
type MakeError = ME;
type Future = F;
@@ -97,22 +124,10 @@ where
}
fn make_service_ref(&mut self, target: &Target) -> Self::Future {
self.make_service(target)
self.call(target)
}
}
impl<T, Target, E, ME, S, F, IB, OB> self::sealed::Sealed<Target> for T
where
T: for<'a> MakeService<&'a Target, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>,
E: Into<Box<dyn StdError + Send + Sync>>,
ME: Into<Box<dyn StdError + Send + Sync>>,
S: Service<ReqBody=IB, ResBody=OB, Error=E>,
F: Future<Output=Result<S, ME>>,
IB: Payload,
OB: Payload,
{}
/// Create a `MakeService` from a function.
///
/// # Example
@@ -167,23 +182,21 @@ pub struct MakeServiceFn<F> {
f: F,
}
impl<'t, F, Target, Ret, ReqBody, ResBody, Svc, MkErr> MakeService<&'t Target> for MakeServiceFn<F>
impl<'t, F, Ret, Target, Svc, MkErr> tower_service::Service<&'t Target> for MakeServiceFn<F>
where
F: FnMut(&Target) -> Ret,
Ret: Future<Output=Result<Svc, MkErr>>,
Svc: Service<ReqBody=ReqBody, ResBody=ResBody>,
MkErr: Into<Box<dyn StdError + Send + Sync>>,
ReqBody: Payload,
ResBody: Payload,
{
type ReqBody = ReqBody;
type ResBody = ResBody;
type Error = Svc::Error;
type Service = Svc;
type Error = MkErr;
type Response = Svc;
type Future = Ret;
type MakeError = MkErr;
fn make_service(&mut self, target: &'t Target) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, target: &'t Target) -> Self::Future {
(self.f)(target)
}
}
@@ -196,7 +209,7 @@ impl<F> fmt::Debug for MakeServiceFn<F> {
}
mod sealed {
pub trait Sealed<T> {}
pub trait Sealed<T, B> {}
pub trait CantImpl {}

View File

@@ -7,10 +7,7 @@ use crate::common::{Future, Never, Poll, task};
use crate::{Request, Response};
/// An asynchronous function from `Request` to `Response`.
pub trait Service {
/// The `Payload` body of the `http::Request`.
type ReqBody: Payload;
pub trait Service<ReqBody>: sealed::Sealed<ReqBody> {
/// The `Payload` body of the `http::Response`.
type ResBody: Payload;
@@ -34,7 +31,37 @@ pub trait Service {
}
/// Calls this `Service` with a request, returning a `Future` of the response.
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future;
fn call(&mut self, req: Request<ReqBody>) -> Self::Future;
}
impl<T, B1, B2> Service<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
B2: Payload,
T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type ResBody = B2;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
tower_service::Service::poll_ready(self, cx)
}
fn call(&mut self, req: Request<B1>) -> Self::Future {
tower_service::Service::call(self, req)
}
}
impl<T, B1, B2> sealed::Sealed<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
B2: Payload,
{}
mod sealed {
pub trait Sealed<T> {}
}
@@ -74,7 +101,7 @@ pub struct ServiceFn<F, R> {
_req: PhantomData<fn(R)>,
}
impl<F, ReqBody, Ret, ResBody, E> Service for ServiceFn<F, ReqBody>
impl<F, ReqBody, Ret, ResBody, E> tower_service::Service<crate::Request<ReqBody>> for ServiceFn<F, ReqBody>
where
F: FnMut(Request<ReqBody>) -> Ret,
ReqBody: Payload,
@@ -82,12 +109,15 @@ where
E: Into<Box<dyn StdError + Send + Sync>>,
ResBody: Payload,
{
type ReqBody = ReqBody;
type ResBody = ResBody;
type Response = crate::Response<ResBody>;
type Error = E;
type Future = Ret;
fn call(&mut self, req: Request<Self::ReqBody>) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
(self.f)(req)
}
}

View File

@@ -39,7 +39,7 @@ use hyper::{Body, Request, Response, StatusCode, Version};
use hyper::client::Client;
use hyper::server::conn::Http;
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn, Service};
use hyper::service::{make_service_fn, service_fn};
#[test]
@@ -1628,19 +1628,18 @@ fn http2_body_user_error_sends_reset_reason() {
struct Svc;
impl hyper::service::Service for Svc {
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
impl tower_service::Service<Request<Body>> for Svc {
type Response = Response<Body>;
type Error = h2::Error;
type Future = Box<dyn futures_core::Future<
Output = Result<hyper::Response<Self::ResBody>, Self::Error>
Output = Result<Self::Response, Self::Error>
> + Send + Sync + Unpin>;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err::<(), _>(h2::Error::from(h2::Reason::INADEQUATE_SECURITY)))
}
fn call(&mut self, _: hyper::Request<Self::ResBody>) -> Self::Future {
fn call(&mut self, _: hyper::Request<Body>) -> Self::Future {
unreachable!("poll_ready error should have shutdown conn");
}
}
@@ -1853,12 +1852,15 @@ enum Msg {
End,
}
impl Service for TestService {
type ReqBody = Body;
type ResBody = Body;
impl tower_service::Service<Request<Body>> for TestService {
type Response = Response<Body>;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Response<Body>, BoxError>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let tx1 = self.tx.clone();
let tx2 = self.tx.clone();
@@ -1913,11 +1915,14 @@ const HELLO: &'static str = "hello";
struct HelloWorld;
impl Service for HelloWorld {
type ReqBody = Body;
type ResBody = Body;
impl tower_service::Service<Request<Body>> for HelloWorld {
type Response = Response<Body>;
type Error = hyper::Error;
type Future = BoxFuture<'static, Result<hyper::Response<Self::ResBody>, Self::Error>>;
type Future = BoxFuture<'static, Result<Response<Body>, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, _req: Request<Body>) -> Self::Future {
let response = Response::new(HELLO.into());