diff --git a/Cargo.toml b/Cargo.toml index b4e72716..0ed78a14 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ pin-utils = "0.1.0-alpha.4" time = "0.1" tokio = { version = "0.2.0-alpha.2", optional = true, default-features = false, features = ["rt-full"] } #tokio-buf = "0.2.0-alpha.1" +tower-service = "=0.3.0-alpha.1" tokio-executor = "0.2.0-alpha.2" tokio-io = "0.2.0-alpha.2" tokio-sync = "0.2.0-alpha.2" diff --git a/examples/tower_server.rs b/examples/tower_server.rs new file mode 100644 index 00000000..a48fc7f4 --- /dev/null +++ b/examples/tower_server.rs @@ -0,0 +1,70 @@ +#![feature(async_await)] +#![deny(warnings)] + +use hyper::{Body, Request, Response, Server}; +use tower_service::Service; +use futures_util::future; +use std::task::{Context, Poll}; + +const ROOT: &'static str = "/"; + +#[derive(Debug)] +pub struct Svc; + +impl Service> for Svc { + type Response = Response; + type Error = hyper::Error; + type Future = future::Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn call(&mut self, req: Request) -> Self::Future { + let mut rsp = Response::builder(); + + let uri = req.uri(); + if uri.path() != ROOT { + let body = Body::from(Vec::new()); + let rsp = rsp.status(404).body(body).unwrap(); + return future::ok(rsp); + } + + let body = Body::from(Vec::from(&b"heyo!"[..])); + let rsp = rsp.status(200).body(body).unwrap(); + future::ok(rsp) + } +} + +pub struct MakeSvc; + +impl Service for MakeSvc { + type Response = Svc; + type Error = std::io::Error; + type Future = future::Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn call(&mut self, _: T) -> Self::Future { + future::ok(Svc) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + + let addr = "127.0.0.1:1337".parse().unwrap(); + + + let server = Server::bind(&addr) + .serve(MakeSvc); + + println!("Listening on http://{}", addr); + + server.await?; + + Ok(()) +} diff --git a/src/common/exec.rs b/src/common/exec.rs index a91333f4..baf36505 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use tokio_executor::{SpawnError, TypedExecutor}; -use crate::body::Payload; +use crate::body::{Payload, Body}; use crate::proto::h2::server::H2Stream; use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; use crate::service::Service; @@ -14,7 +14,7 @@ pub trait H2Exec: Clone { fn execute_h2stream(&mut self, fut: H2Stream) -> crate::Result<()>; } -pub trait NewSvcExec>: Clone { +pub trait NewSvcExec, E, W: Watcher>: Clone { fn execute_new_svc(&mut self, fut: NewSvcTask) -> crate::Result<()>; } @@ -119,7 +119,7 @@ where impl NewSvcExec for Exec where NewSvcTask: Future + Send + 'static, - S: Service, + S: Service, W: Watcher, { fn execute_new_svc(&mut self, fut: NewSvcTask) -> crate::Result<()> { @@ -148,7 +148,7 @@ impl NewSvcExec for E where E: TypedExecutor> + Clone, NewSvcTask: Future, - S: Service, + S: Service, W: Watcher, { fn execute_new_svc(&mut self, fut: NewSvcTask) -> crate::Result<()> { diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 201372d8..db08e495 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -30,7 +30,7 @@ pub(crate) trait Dispatch { fn should_poll(&self) -> bool; } -pub struct Server { +pub struct Server, B> { in_flight: Pin>>, pub(crate) service: S, } @@ -412,11 +412,11 @@ impl<'a, T> Drop for OptGuard<'a, T> { // ===== impl Server ===== -impl Server +impl Server where - S: Service, + S: Service, { - pub fn new(service: S) -> Server { + pub fn new(service: S) -> Server { Server { in_flight: Box::pin(None), service: service, @@ -429,11 +429,11 @@ where } // Service is never pinned -impl Unpin for Server {} +impl, B> Unpin for Server {} -impl Dispatch for Server +impl Dispatch for Server where - S: Service, + S: Service, S::Error: Into>, Bs: Payload, { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 13678454..23600cc3 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -20,7 +20,7 @@ use crate::{Body, Response}; pub(crate) struct Server where - S: Service, + S: Service, B: Payload, { exec: E, @@ -29,7 +29,7 @@ where } // TODO: fix me -impl Unpin for Server {} +impl, B: Payload, E> Unpin for Server {} enum State where @@ -52,7 +52,7 @@ where impl Server where T: AsyncRead + AsyncWrite + Unpin, - S: Service, + S: Service, S::Error: Into>, B: Payload, B::Data: Unpin, @@ -90,7 +90,7 @@ where impl Future for Server where T: AsyncRead + AsyncWrite + Unpin, - S: Service, + S: Service, S::Error: Into>, B: Payload, B::Data: Unpin, @@ -133,7 +133,7 @@ where fn poll_server(&mut self, cx: &mut task::Context<'_>, service: &mut S, exec: &mut E) -> Poll> where S: Service< - ReqBody=Body, + Body, ResBody=B, >, S::Error: Into>, diff --git a/src/server/conn.rs b/src/server/conn.rs index 11005d9c..5b01a32b 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -102,11 +102,11 @@ pub(super) struct SpawnAll { #[must_use = "futures do nothing unless polled"] pub struct Connection where - S: Service, + S: Service, { pub(super) conn: Option, + proto::h1::dispatch::Server, S::ResBody, T, proto::ServerTransaction, @@ -341,7 +341,7 @@ impl Http { /// # async fn run(some_io: I, some_service: S) /// # where /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - /// # S: Service + Send + 'static, + /// # S: Service + Send + 'static, /// # S::Future: Send /// # { /// let http = Http::new(); @@ -355,7 +355,7 @@ impl Http { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: Service, + S: Service, S::Error: Into>, Bd: Payload, Bd::Data: Unpin, @@ -409,12 +409,12 @@ impl Http { where S: MakeServiceRef< AddrStream, - ReqBody=Body, + Body, ResBody=Bd, >, S::Error: Into>, Bd: Payload, - E: H2Exec<::Future, Bd>, + E: H2Exec<>::Future, Bd>, { let mut incoming = AddrIncoming::new(addr, None)?; if self.keep_alive { @@ -434,12 +434,12 @@ impl Http { where S: MakeServiceRef< AddrStream, - ReqBody=Body, + Body, ResBody=Bd, >, S::Error: Into>, Bd: Payload, - E: H2Exec<::Future, Bd>, + E: H2Exec<>::Future, Bd>, { let mut incoming = AddrIncoming::new(addr, Some(handle))?; if self.keep_alive { @@ -456,12 +456,12 @@ impl Http { IO: AsyncRead + AsyncWrite + Unpin, S: MakeServiceRef< IO, - ReqBody=Body, + Body, ResBody=Bd, >, S::Error: Into>, Bd: Payload, - E: H2Exec<::Future, Bd>, + E: H2Exec<>::Future, Bd>, { Serve { incoming, @@ -476,7 +476,7 @@ impl Http { impl Connection where - S: Service, + S: Service, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Payload + 'static, @@ -627,7 +627,7 @@ where impl Future for Connection where - S: Service + 'static, + S: Service + 'static, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Payload + 'static, @@ -665,7 +665,7 @@ where impl fmt::Debug for Connection where - S: Service, + S: Service, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") @@ -703,11 +703,11 @@ where I: Stream>, IO: AsyncRead + AsyncWrite + Unpin, IE: Into>, - S: MakeServiceRef, + S: MakeServiceRef, //S::Error2: Into>, //SME: Into>, B: Payload, - E: H2Exec<::Future, B>, + E: H2Exec<>::Future, B>, { type Item = crate::Result>; @@ -745,7 +745,7 @@ impl Future for Connecting where I: AsyncRead + AsyncWrite + Unpin, F: Future>, - S: Service, + S: Service, B: Payload, B::Data: Unpin, E: H2Exec, @@ -781,11 +781,11 @@ where IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: MakeServiceRef< IO, - ReqBody=Body, + Body, ResBody=B, >, B: Payload, - E: H2Exec<::Future, B>, + E: H2Exec<>::Future, B>, { pub(super) fn poll_watch(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, watcher: &W) -> Poll> where @@ -842,7 +842,7 @@ pub(crate) mod spawn_all { // The `Server::with_graceful_shutdown` needs to keep track of all active // connections, and signal that they start to shutdown when prompted, so // it has a `GracefulWatcher` implementation to do that. - pub trait Watcher: Clone { + pub trait Watcher, E>: Clone { type Future: Future>; fn watch(&self, conn: UpgradeableConnection) -> Self::Future; @@ -855,7 +855,7 @@ pub(crate) mod spawn_all { impl Watcher for NoopWatcher where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: Service + 'static, + S: Service + 'static, ::Data: Unpin, E: H2Exec, { @@ -876,16 +876,16 @@ pub(crate) mod spawn_all { // Users cannot import this type, nor the associated `NewSvcExec`. Instead, // a blanket implementation for `Executor` is sufficient. #[allow(missing_debug_implementations)] - pub struct NewSvcTask> { + pub struct NewSvcTask, E, W: Watcher> { state: State, } - enum State> { + enum State, E, W: Watcher> { Connecting(Connecting, W), Connected(W::Future), } - impl> NewSvcTask { + impl, E, W: Watcher> NewSvcTask { pub(super) fn new(connecting: Connecting, watcher: W) -> Self { NewSvcTask { state: State::Connecting(connecting, watcher), @@ -898,7 +898,7 @@ pub(crate) mod spawn_all { I: AsyncRead + AsyncWrite + Unpin + Send + 'static, N: Future>, NE: Into>, - S: Service, + S: Service, B: Payload, B::Data: Unpin, E: H2Exec, @@ -955,14 +955,14 @@ mod upgrades { #[allow(missing_debug_implementations)] pub struct UpgradeableConnection where - S: Service, + S: Service, { pub(super) inner: Connection, } impl UpgradeableConnection where - S: Service,// + 'static, + S: Service,// + 'static, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Payload + 'static, @@ -980,7 +980,7 @@ mod upgrades { impl Future for UpgradeableConnection where - S: Service + 'static, + S: Service + 'static, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Payload + 'static, diff --git a/src/server/mod.rs b/src/server/mod.rs index af341809..5005376b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -150,12 +150,12 @@ where I: Stream>, IE: Into>, IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, + S: MakeServiceRef, S::Error: Into>, S::Service: 'static, B: Payload, B::Data: Unpin, - E: H2Exec<::Future, B>, + E: H2Exec<>::Future, B>, E: NewSvcExec, { /// Prepares a server to handle graceful shutdown when the provided future @@ -208,12 +208,12 @@ where I: Stream>, IE: Into>, IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, + S: MakeServiceRef, S::Error: Into>, S::Service: 'static, B: Payload, B::Data: Unpin, - E: H2Exec<::Future, B>, + E: H2Exec<>::Future, B>, E: NewSvcExec, { type Output = crate::Result<()>; @@ -407,13 +407,13 @@ impl Builder { I: Stream>, IE: Into>, IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, + S: MakeServiceRef, S::Error: Into>, S::Service: 'static, B: Payload, B::Data: Unpin, E: NewSvcExec, - E: H2Exec<::Future, B>, + E: H2Exec<>::Future, B>, { let serve = self.protocol.serve_incoming(self.incoming, new_service); let spawn_all = serve.spawn_all(); diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index f47a7042..17b69fe3 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -43,13 +43,13 @@ where I: Stream>, IE: Into>, IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, + S: MakeServiceRef, S::Service: 'static, S::Error: Into>, B: Payload, B::Data: Unpin, F: Future, - E: H2Exec<::Future, B>, + E: H2Exec<>::Future, B>, E: NewSvcExec, { type Output = crate::Result<()>; @@ -98,7 +98,7 @@ pub struct GracefulWatcher(Watch); impl Watcher for GracefulWatcher where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: Service + 'static, + S: Service + 'static, ::Data: Unpin, E: H2Exec, { @@ -114,7 +114,7 @@ where fn on_drain(conn: Pin<&mut UpgradeableConnection>) where - S: Service, + S: Service, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, S::ResBody: Payload + 'static, diff --git a/src/service/make_service.rs b/src/service/make_service.rs index 09524833..c19ccadb 100644 --- a/src/service/make_service.rs +++ b/src/service/make_service.rs @@ -6,10 +6,7 @@ use crate::common::{Future, Poll, task}; use super::Service; /// An asynchronous constructor of `Service`s. -pub trait MakeService { - /// The `Payload` body of the `http::Request`. - type ReqBody: Payload; - +pub trait MakeService: sealed::Sealed { /// The `Payload` body of the `http::Response`. type ResBody: Payload; @@ -18,7 +15,7 @@ pub trait MakeService { /// The resolved `Service` from `new_service()`. type Service: Service< - ReqBody=Self::ReqBody, + ReqBody, ResBody=Self::ResBody, Error=Self::Error, >; @@ -42,15 +39,46 @@ pub trait MakeService { fn make_service(&mut self, target: Target) -> Self::Future; } +impl MakeService for T +where + T: for<'a> tower_service::Service<&'a Target, Response = S, Error = E, Future = F>, + S: tower_service::Service, Response = crate::Response>, + E: Into>, + S::Error: Into>, + B1: Payload, + B2: Payload, + F: Future>, +{ + type ResBody = B2; + type Error = S::Error; + type Service = S; + type Future = F; + type MakeError = E; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + tower_service::Service::poll_ready(self, cx) + } + + fn make_service(&mut self, req: Target) -> Self::Future { + tower_service::Service::call(self, &req) + } +} + +impl sealed::Sealed for T +where + T: for<'a> tower_service::Service<&'a Target, Response = S>, + S: tower_service::Service, Response = crate::Response> +{ +} + // Just a sort-of "trait alias" of `MakeService`, not to be implemented // by anyone, only used as bounds. #[doc(hidden)] -pub trait MakeServiceRef: self::sealed::Sealed { - type ReqBody: Payload; +pub trait MakeServiceRef: self::sealed::Sealed { type ResBody: Payload; type Error: Into>; type Service: Service< - ReqBody=Self::ReqBody, + ReqBody, ResBody=Self::ResBody, Error=Self::Error, >; @@ -73,19 +101,18 @@ pub trait MakeServiceRef: self::sealed::Sealed { fn make_service_ref(&mut self, target: &Target) -> Self::Future; } -impl MakeServiceRef for T +impl MakeServiceRef for T where - T: for<'a> MakeService<&'a Target, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>, + T: for<'a> tower_service::Service<&'a Target, Error=ME, Response=S, Future=F>, E: Into>, ME: Into>, - S: Service, + S: tower_service::Service, Response=crate::Response, Error=E>, F: Future>, IB: Payload, OB: Payload, { type Error = E; type Service = S; - type ReqBody = IB; type ResBody = OB; type MakeError = ME; type Future = F; @@ -97,22 +124,10 @@ where } fn make_service_ref(&mut self, target: &Target) -> Self::Future { - self.make_service(target) + self.call(target) } } -impl self::sealed::Sealed for T -where - T: for<'a> MakeService<&'a Target, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>, - E: Into>, - ME: Into>, - S: Service, - F: Future>, - IB: Payload, - OB: Payload, -{} - - /// Create a `MakeService` from a function. /// /// # Example @@ -167,23 +182,21 @@ pub struct MakeServiceFn { f: F, } -impl<'t, F, Target, Ret, ReqBody, ResBody, Svc, MkErr> MakeService<&'t Target> for MakeServiceFn +impl<'t, F, Ret, Target, Svc, MkErr> tower_service::Service<&'t Target> for MakeServiceFn where F: FnMut(&Target) -> Ret, Ret: Future>, - Svc: Service, MkErr: Into>, - ReqBody: Payload, - ResBody: Payload, { - type ReqBody = ReqBody; - type ResBody = ResBody; - type Error = Svc::Error; - type Service = Svc; + type Error = MkErr; + type Response = Svc; type Future = Ret; - type MakeError = MkErr; - fn make_service(&mut self, target: &'t Target) -> Self::Future { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, target: &'t Target) -> Self::Future { (self.f)(target) } } @@ -196,7 +209,7 @@ impl fmt::Debug for MakeServiceFn { } mod sealed { - pub trait Sealed {} + pub trait Sealed {} pub trait CantImpl {} diff --git a/src/service/service.rs b/src/service/service.rs index e0230149..616d4f70 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -7,10 +7,7 @@ use crate::common::{Future, Never, Poll, task}; use crate::{Request, Response}; /// An asynchronous function from `Request` to `Response`. -pub trait Service { - /// The `Payload` body of the `http::Request`. - type ReqBody: Payload; - +pub trait Service: sealed::Sealed { /// The `Payload` body of the `http::Response`. type ResBody: Payload; @@ -34,7 +31,37 @@ pub trait Service { } /// Calls this `Service` with a request, returning a `Future` of the response. - fn call(&mut self, req: Request) -> Self::Future; + fn call(&mut self, req: Request) -> Self::Future; +} + +impl Service for T +where + T: tower_service::Service, Response = Response>, + B2: Payload, + T::Error: Into>, +{ + type ResBody = B2; + + type Error = T::Error; + type Future = T::Future; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + tower_service::Service::poll_ready(self, cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + tower_service::Service::call(self, req) + } +} + +impl sealed::Sealed for T +where + T: tower_service::Service, Response = Response>, + B2: Payload, +{} + +mod sealed { + pub trait Sealed {} } @@ -74,7 +101,7 @@ pub struct ServiceFn { _req: PhantomData, } -impl Service for ServiceFn +impl tower_service::Service> for ServiceFn where F: FnMut(Request) -> Ret, ReqBody: Payload, @@ -82,12 +109,15 @@ where E: Into>, ResBody: Payload, { - type ReqBody = ReqBody; - type ResBody = ResBody; + type Response = crate::Response; type Error = E; type Future = Ret; - fn call(&mut self, req: Request) -> Self::Future { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { (self.f)(req) } } diff --git a/tests/server.rs b/tests/server.rs index d246cac7..53ccb119 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -39,7 +39,7 @@ use hyper::{Body, Request, Response, StatusCode, Version}; use hyper::client::Client; use hyper::server::conn::Http; use hyper::server::Server; -use hyper::service::{make_service_fn, service_fn, Service}; +use hyper::service::{make_service_fn, service_fn}; #[test] @@ -1628,19 +1628,18 @@ fn http2_body_user_error_sends_reset_reason() { struct Svc; -impl hyper::service::Service for Svc { - type ReqBody = hyper::Body; - type ResBody = hyper::Body; +impl tower_service::Service> for Svc { + type Response = Response; type Error = h2::Error; type Future = Box, Self::Error> + Output = Result > + Send + Sync + Unpin>; fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { Poll::Ready(Err::<(), _>(h2::Error::from(h2::Reason::INADEQUATE_SECURITY))) } - fn call(&mut self, _: hyper::Request) -> Self::Future { + fn call(&mut self, _: hyper::Request) -> Self::Future { unreachable!("poll_ready error should have shutdown conn"); } } @@ -1853,12 +1852,15 @@ enum Msg { End, } -impl Service for TestService { - type ReqBody = Body; - type ResBody = Body; +impl tower_service::Service> for TestService { + type Response = Response; type Error = BoxError; type Future = BoxFuture<'static, Result, BoxError>>; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + fn call(&mut self, req: Request) -> Self::Future { let tx1 = self.tx.clone(); let tx2 = self.tx.clone(); @@ -1913,11 +1915,14 @@ const HELLO: &'static str = "hello"; struct HelloWorld; -impl Service for HelloWorld { - type ReqBody = Body; - type ResBody = Body; +impl tower_service::Service> for HelloWorld { + type Response = Response; type Error = hyper::Error; - type Future = BoxFuture<'static, Result, Self::Error>>; + type Future = BoxFuture<'static, Result, Self::Error>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } fn call(&mut self, _req: Request) -> Self::Future { let response = Response::new(HELLO.into());