feat(server): change NewService to MakeService with connection context
				
					
				
			This adjusts the way `Service`s are created for a `hyper::Server`. The `MakeService` trait allows receiving an argument when creating a `Service`. The implementation for `hyper::Server` expects to pass a reference to the accepted transport (so, `&Incoming::Item`). The user can inspect the transport before making a `Service`. In practice, this allows for things like getting the remote socket address, or the TLS certification, or similar. To prevent a breaking change, there is a blanket implementation of `MakeService` for any `NewService`. Besides implementing `MakeService` directly, there is also added `hyper::service::make_service_fn`. Closes #1650
This commit is contained in:
		| @@ -25,15 +25,16 @@ use common::exec::{Exec, H2Exec, NewSvcExec}; | ||||
| use common::io::Rewind; | ||||
| use error::{Kind, Parse}; | ||||
| use proto; | ||||
| use service::{NewService, Service}; | ||||
| use service::Service; | ||||
| use upgrade::Upgraded; | ||||
|  | ||||
| pub(super) use self::make_service::MakeServiceRef; | ||||
| pub(super) use self::spawn_all::NoopWatcher; | ||||
| use self::spawn_all::NewSvcTask; | ||||
| pub(super) use self::spawn_all::Watcher; | ||||
| pub(super) use self::upgrades::UpgradeableConnection; | ||||
|  | ||||
| #[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming; | ||||
| #[cfg(feature = "runtime")] pub use super::tcp::{AddrIncoming, AddrStream}; | ||||
|  | ||||
| /// A lower-level configuration of the HTTP protocol. | ||||
| /// | ||||
| @@ -69,13 +70,13 @@ enum ConnectionMode { | ||||
| #[derive(Debug)] | ||||
| pub struct Serve<I, S, E = Exec> { | ||||
|     incoming: I, | ||||
|     new_service: S, | ||||
|     make_service: S, | ||||
|     protocol: Http<E>, | ||||
| } | ||||
|  | ||||
| /// A future building a new `Service` to a `Connection`. | ||||
| /// | ||||
| /// Wraps the future returned from `NewService` into one that returns | ||||
| /// Wraps the future returned from `MakeService` into one that returns | ||||
| /// a `Connection`. | ||||
| #[must_use = "futures do nothing unless polled"] | ||||
| #[derive(Debug)] | ||||
| @@ -349,12 +350,16 @@ impl<E> Http<E> { | ||||
|     /// | ||||
|     /// This method will bind the `addr` provided with a new TCP listener ready | ||||
|     /// to accept connections. Each connection will be processed with the | ||||
|     /// `new_service` object provided, creating a new service per | ||||
|     /// `make_service` object provided, creating a new service per | ||||
|     /// connection. | ||||
|     #[cfg(feature = "runtime")] | ||||
|     pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S, E>> | ||||
|     pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>> | ||||
|     where | ||||
|         S: NewService<ReqBody=Body, ResBody=Bd>, | ||||
|         S: MakeServiceRef< | ||||
|             AddrStream, | ||||
|             ReqBody=Body, | ||||
|             ResBody=Bd, | ||||
|         >, | ||||
|         S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|         Bd: Payload, | ||||
|         E: H2Exec<<S::Service as Service>::Future, Bd>, | ||||
| @@ -363,19 +368,23 @@ impl<E> Http<E> { | ||||
|         if self.keep_alive { | ||||
|             incoming.set_keepalive(Some(Duration::from_secs(90))); | ||||
|         } | ||||
|         Ok(self.serve_incoming(incoming, new_service)) | ||||
|         Ok(self.serve_incoming(incoming, make_service)) | ||||
|     } | ||||
|  | ||||
|     /// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve) | ||||
|     /// | ||||
|     /// This method will bind the `addr` provided with a new TCP listener ready | ||||
|     /// to accept connections. Each connection will be processed with the | ||||
|     /// `new_service` object provided, creating a new service per | ||||
|     /// `make_service` object provided, creating a new service per | ||||
|     /// connection. | ||||
|     #[cfg(feature = "runtime")] | ||||
|     pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S, E>> | ||||
|     pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>> | ||||
|     where | ||||
|         S: NewService<ReqBody=Body, ResBody=Bd>, | ||||
|         S: MakeServiceRef< | ||||
|             AddrStream, | ||||
|             ReqBody=Body, | ||||
|             ResBody=Bd, | ||||
|         >, | ||||
|         S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|         Bd: Payload, | ||||
|         E: H2Exec<<S::Service as Service>::Future, Bd>, | ||||
| @@ -384,23 +393,27 @@ impl<E> Http<E> { | ||||
|         if self.keep_alive { | ||||
|             incoming.set_keepalive(Some(Duration::from_secs(90))); | ||||
|         } | ||||
|         Ok(self.serve_incoming(incoming, new_service)) | ||||
|         Ok(self.serve_incoming(incoming, make_service)) | ||||
|     } | ||||
|  | ||||
|     /// Bind the provided stream of incoming IO objects with a `NewService`. | ||||
|     pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S, E> | ||||
|     /// Bind the provided stream of incoming IO objects with a `MakeService`. | ||||
|     pub fn serve_incoming<I, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E> | ||||
|     where | ||||
|         I: Stream, | ||||
|         I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|         I::Item: AsyncRead + AsyncWrite, | ||||
|         S: NewService<ReqBody=Body, ResBody=Bd>, | ||||
|         S: MakeServiceRef< | ||||
|             I::Item, | ||||
|             ReqBody=Body, | ||||
|             ResBody=Bd, | ||||
|         >, | ||||
|         S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|         Bd: Payload, | ||||
|         E: H2Exec<<S::Service as Service>::Future, Bd>, | ||||
|     { | ||||
|         Serve { | ||||
|             incoming: incoming, | ||||
|             new_service: new_service, | ||||
|             incoming, | ||||
|             make_service, | ||||
|             protocol: self.clone(), | ||||
|         } | ||||
|     } | ||||
| @@ -604,8 +617,9 @@ where | ||||
|     I: Stream, | ||||
|     I::Item: AsyncRead + AsyncWrite, | ||||
|     I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     S: NewService<ReqBody=Body, ResBody=B>, | ||||
|     S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>, | ||||
|     //S::Error2: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     //SME: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     B: Payload, | ||||
|     E: H2Exec<<S::Service as Service>::Future, B>, | ||||
| { | ||||
| @@ -614,7 +628,7 @@ where | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||
|         if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { | ||||
|             let new_fut = self.new_service.new_service(); | ||||
|             let new_fut = self.make_service.make_service_ref(&io); | ||||
|             Ok(Async::Ready(Some(Connecting { | ||||
|                 future: new_fut, | ||||
|                 io: Some(io), | ||||
| @@ -666,8 +680,11 @@ where | ||||
|     I: Stream, | ||||
|     I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     I::Item: AsyncRead + AsyncWrite + Send + 'static, | ||||
|     S: NewService<ReqBody=Body, ResBody=B>, | ||||
|     S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     S: MakeServiceRef< | ||||
|         I::Item, | ||||
|         ReqBody=Body, | ||||
|         ResBody=B, | ||||
|     >, | ||||
|     B: Payload, | ||||
|     E: H2Exec<<S::Service as Service>::Future, B>, | ||||
| { | ||||
| @@ -873,3 +890,37 @@ mod upgrades { | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub(crate) mod make_service { | ||||
|     use std::error::Error as StdError; | ||||
|  | ||||
|     pub trait MakeServiceRef<Ctx> { | ||||
|         type Error: Into<Box<StdError + Send + Sync>>; | ||||
|         type ReqBody: ::body::Payload; | ||||
|         type ResBody: ::body::Payload; | ||||
|         type Service: ::service::Service<ReqBody=Self::ReqBody, ResBody=Self::ResBody, Error=Self::Error>; | ||||
|         type Future: ::futures::Future<Item=Self::Service>; | ||||
|  | ||||
|         fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future; | ||||
|     } | ||||
|  | ||||
|     impl<T, Ctx, E, ME, S, F, IB, OB> MakeServiceRef<Ctx> for T | ||||
|     where | ||||
|         T: for<'a> ::service::MakeService<&'a Ctx, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>, | ||||
|         E: Into<Box<StdError + Send + Sync>>, | ||||
|         ME: Into<Box<StdError + Send + Sync>>, | ||||
|         S: ::service::Service<ReqBody=IB, ResBody=OB, Error=E>, | ||||
|         F: ::futures::Future<Item=S, Error=ME>, | ||||
|         IB: ::body::Payload, | ||||
|         OB: ::body::Payload, | ||||
|     { | ||||
|         type Error = E; | ||||
|         type Service = S; | ||||
|         type ReqBody = IB; | ||||
|         type ResBody = OB; | ||||
|         type Future = F; | ||||
|  | ||||
|         fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future { | ||||
|             self.make_service(ctx) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -11,7 +11,7 @@ | ||||
| //! # Server | ||||
| //! | ||||
| //! The [`Server`](Server) is main way to start listening for HTTP requests. | ||||
| //! It wraps a listener with a [`NewService`](::service), and then should | ||||
| //! It wraps a listener with a [`MakeService`](::service), and then should | ||||
| //! be executed to start serving requests. | ||||
| //! | ||||
| //! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default. | ||||
| @@ -30,8 +30,8 @@ | ||||
| //!     // Construct our SocketAddr to listen on... | ||||
| //!     let addr = ([127, 0, 0, 1], 3000).into(); | ||||
| //! | ||||
| //!     // And a NewService to handle each connection... | ||||
| //!     let new_service = || { | ||||
| //!     // And a MakeService to handle each connection... | ||||
| //!     let make_service = || { | ||||
| //!         service_fn_ok(|_req| { | ||||
| //!             Response::new(Body::from("Hello World")) | ||||
| //!         }) | ||||
| @@ -39,7 +39,7 @@ | ||||
| //! | ||||
| //!     // Then bind and serve... | ||||
| //!     let server = Server::bind(&addr) | ||||
| //!         .serve(new_service); | ||||
| //!         .serve(make_service); | ||||
| //! | ||||
| //!     // Finally, spawn `server` onto an Executor... | ||||
| //!     hyper::rt::run(server.map_err(|e| { | ||||
| @@ -65,10 +65,10 @@ use tokio_io::{AsyncRead, AsyncWrite}; | ||||
|  | ||||
| use body::{Body, Payload}; | ||||
| use common::exec::{Exec, H2Exec, NewSvcExec}; | ||||
| use service::{NewService, Service}; | ||||
| use service::Service; | ||||
| // Renamed `Http` as `Http_` for now so that people upgrading don't see an | ||||
| // error that `hyper::server::Http` is private... | ||||
| use self::conn::{Http as Http_, NoopWatcher, SpawnAll}; | ||||
| use self::conn::{Http as Http_, MakeServiceRef, NoopWatcher, SpawnAll}; | ||||
| use self::shutdown::{Graceful, GracefulWatcher}; | ||||
| #[cfg(feature = "runtime")] use self::tcp::AddrIncoming; | ||||
|  | ||||
| @@ -144,7 +144,7 @@ where | ||||
|     I: Stream, | ||||
|     I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     I::Item: AsyncRead + AsyncWrite + Send + 'static, | ||||
|     S: NewService<ReqBody=Body, ResBody=B>, | ||||
|     S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>, | ||||
|     S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     S::Service: 'static, | ||||
|     B: Payload, | ||||
| @@ -203,7 +203,7 @@ where | ||||
|     I: Stream, | ||||
|     I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     I::Item: AsyncRead + AsyncWrite + Send + 'static, | ||||
|     S: NewService<ReqBody=Body, ResBody=B>, | ||||
|     S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>, | ||||
|     S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     S::Service: 'static, | ||||
|     B: Payload, | ||||
| @@ -332,7 +332,7 @@ impl<I, E> Builder<I, E> { | ||||
|         I: Stream, | ||||
|         I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|         I::Item: AsyncRead + AsyncWrite + Send + 'static, | ||||
|         S: NewService<ReqBody=Body, ResBody=B>, | ||||
|         S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>, | ||||
|         S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|         S::Service: 'static, | ||||
|         B: Payload, | ||||
|   | ||||
| @@ -4,8 +4,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| use body::{Body, Payload}; | ||||
| use common::drain::{self, Draining, Signal, Watch, Watching}; | ||||
| use common::exec::{H2Exec, NewSvcExec}; | ||||
| use service::{Service, NewService}; | ||||
| use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; | ||||
| use service::Service; | ||||
| use super::conn::{MakeServiceRef, SpawnAll, UpgradeableConnection, Watcher}; | ||||
|  | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct Graceful<I, S, F, E> { | ||||
| @@ -40,7 +40,7 @@ where | ||||
|     I: Stream, | ||||
|     I::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     I::Item: AsyncRead + AsyncWrite + Send + 'static, | ||||
|     S: NewService<ReqBody=Body, ResBody=B>, | ||||
|     S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>, | ||||
|     S::Service: 'static, | ||||
|     S::Error: Into<Box<::std::error::Error + Send + Sync>>, | ||||
|     B: Payload, | ||||
|   | ||||
| @@ -8,7 +8,7 @@ use tokio_reactor::Handle; | ||||
| use tokio_tcp::TcpListener; | ||||
| use tokio_timer::Delay; | ||||
|  | ||||
| use self::addr_stream::AddrStream; | ||||
| pub use self::addr_stream::AddrStream; | ||||
|  | ||||
| /// A stream of connections from binding to an address. | ||||
| #[must_use = "streams do nothing unless polled"] | ||||
| @@ -194,6 +194,7 @@ mod addr_stream { | ||||
|     use tokio_io::{AsyncRead, AsyncWrite}; | ||||
|  | ||||
|  | ||||
|     /// A transport returned yieled by `AddrIncoming`. | ||||
|     #[derive(Debug)] | ||||
|     pub struct AddrStream { | ||||
|         inner: TcpStream, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user