refactor(server): move non-conn code out of conn.rs
The actual code for `Server` was previously organized very confusingly: it was thrice layered with `SpawnAll` and `Serve` which both appeared in conn.rs despite not having anything to do with the lower-level conn API. This commit changes that, removing all layering and having the code for the higher-level `Server` appear inside `server.rs` only.
This commit is contained in:
		
				
					committed by
					
						 Sean McArthur
						Sean McArthur
					
				
			
			
				
	
			
			
			
						parent
						
							53f15e5870
						
					
				
				
					commit
					0fec1c8737
				
			| @@ -11,7 +11,7 @@ use crate::body::HttpBody; | |||||||
| use crate::proto::h2::server::H2Stream; | use crate::proto::h2::server::H2Stream; | ||||||
| use crate::rt::Executor; | use crate::rt::Executor; | ||||||
| #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] | #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] | ||||||
| use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; | use crate::server::server::{Watcher, new_svc::NewSvcTask}; | ||||||
| #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] | #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] | ||||||
| use crate::service::HttpService; | use crate::service::HttpService; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -48,8 +48,6 @@ | |||||||
|     not(all(feature = "http1", feature = "http2")) |     not(all(feature = "http1", feature = "http2")) | ||||||
| ))] | ))] | ||||||
| use std::marker::PhantomData; | use std::marker::PhantomData; | ||||||
| #[cfg(feature = "tcp")] |  | ||||||
| use std::net::SocketAddr; |  | ||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
|  |  | ||||||
| #[cfg(feature = "http2")] | #[cfg(feature = "http2")] | ||||||
| @@ -70,17 +68,15 @@ cfg_feature! { | |||||||
|     use tokio::io::{AsyncRead, AsyncWrite}; |     use tokio::io::{AsyncRead, AsyncWrite}; | ||||||
|     use tracing::trace; |     use tracing::trace; | ||||||
|  |  | ||||||
|     use super::accept::Accept; |     pub use super::server::Connecting; | ||||||
|     use crate::body::{Body, HttpBody}; |     use crate::body::{Body, HttpBody}; | ||||||
|     use crate::common::{task, Future, Pin, Poll, Unpin}; |     use crate::common::{task, Future, Pin, Poll, Unpin}; | ||||||
|     #[cfg(not(all(feature = "http1", feature = "http2")))] |     #[cfg(not(all(feature = "http1", feature = "http2")))] | ||||||
|     use crate::common::Never; |     use crate::common::Never; | ||||||
|     use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec}; |     use crate::common::exec::{ConnStreamExec, Exec}; | ||||||
|     use crate::proto; |     use crate::proto; | ||||||
|     use crate::service::{HttpService, MakeServiceRef}; |     use crate::service::HttpService; | ||||||
|     use self::spawn_all::NewSvcTask; |  | ||||||
|  |  | ||||||
|     pub(super) use self::spawn_all::{NoopWatcher, Watcher}; |  | ||||||
|     pub(super) use self::upgrades::UpgradeableConnection; |     pub(super) use self::upgrades::UpgradeableConnection; | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -97,7 +93,7 @@ pub use super::tcp::{AddrIncoming, AddrStream}; | |||||||
| #[cfg(any(feature = "http1", feature = "http2"))] | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
| #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] | #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] | ||||||
| pub struct Http<E = Exec> { | pub struct Http<E = Exec> { | ||||||
|     exec: E, |     pub(crate) exec: E, | ||||||
|     h1_half_close: bool, |     h1_half_close: bool, | ||||||
|     h1_keep_alive: bool, |     h1_keep_alive: bool, | ||||||
|     h1_title_case_headers: bool, |     h1_title_case_headers: bool, | ||||||
| @@ -127,51 +123,6 @@ enum ConnectionMode { | |||||||
|     Fallback, |     Fallback, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| pin_project! { |  | ||||||
|     /// A stream mapping incoming IOs to new services. |  | ||||||
|     /// |  | ||||||
|     /// Yields `Connecting`s that are futures that should be put on a reactor. |  | ||||||
|     #[must_use = "streams do nothing unless polled"] |  | ||||||
|     #[derive(Debug)] |  | ||||||
|     pub(super) struct Serve<I, S, E = Exec> { |  | ||||||
|         #[pin] |  | ||||||
|         incoming: I, |  | ||||||
|         make_service: S, |  | ||||||
|         protocol: Http<E>, |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| pin_project! { |  | ||||||
|     /// A future building a new `Service` to a `Connection`. |  | ||||||
|     /// |  | ||||||
|     /// Wraps the future returned from `MakeService` into one that returns |  | ||||||
|     /// a `Connection`. |  | ||||||
|     #[must_use = "futures do nothing unless polled"] |  | ||||||
|     #[derive(Debug)] |  | ||||||
|     #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] |  | ||||||
|     pub struct Connecting<I, F, E = Exec> { |  | ||||||
|         #[pin] |  | ||||||
|         future: F, |  | ||||||
|         io: Option<I>, |  | ||||||
|         protocol: Http<E>, |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| pin_project! { |  | ||||||
|     #[must_use = "futures do nothing unless polled"] |  | ||||||
|     #[derive(Debug)] |  | ||||||
|     pub(super) struct SpawnAll<I, S, E> { |  | ||||||
|         // TODO: re-add `pub(super)` once rustdoc can handle this. |  | ||||||
|         // |  | ||||||
|         // See https://github.com/rust-lang/rust/issues/64705 |  | ||||||
|         #[pin] |  | ||||||
|         pub(super) serve: Serve<I, S, E>, |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
| pin_project! { | pin_project! { | ||||||
|     /// A future binding a connection with a Service. |     /// A future binding a connection with a Service. | ||||||
| @@ -719,23 +670,6 @@ impl<E> Http<E> { | |||||||
|             fallback: PhantomData, |             fallback: PhantomData, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(super) fn serve<I, IO, IE, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E> |  | ||||||
|     where |  | ||||||
|         I: Accept<Conn = IO, Error = IE>, |  | ||||||
|         IE: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|         IO: AsyncRead + AsyncWrite + Unpin, |  | ||||||
|         S: MakeServiceRef<IO, Body, ResBody = Bd>, |  | ||||||
|         S::Error: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|         Bd: HttpBody, |  | ||||||
|         E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, Bd>, |  | ||||||
|     { |  | ||||||
|         Serve { |  | ||||||
|             incoming, |  | ||||||
|             make_service, |  | ||||||
|             protocol: self.clone(), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // ===== impl Connection ===== | // ===== impl Connection ===== | ||||||
| @@ -987,141 +921,6 @@ impl Default for ConnectionMode { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| // ===== impl Serve ===== |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| impl<I, S, E> Serve<I, S, E> { |  | ||||||
|     /// Get a reference to the incoming stream. |  | ||||||
|     #[inline] |  | ||||||
|     pub(super) fn incoming_ref(&self) -> &I { |  | ||||||
|         &self.incoming |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /* |  | ||||||
|     /// Get a mutable reference to the incoming stream. |  | ||||||
|     #[inline] |  | ||||||
|     pub fn incoming_mut(&mut self) -> &mut I { |  | ||||||
|         &mut self.incoming |  | ||||||
|     } |  | ||||||
|     */ |  | ||||||
|  |  | ||||||
|     /// Spawn all incoming connections onto the executor in `Http`. |  | ||||||
|     pub(super) fn spawn_all(self) -> SpawnAll<I, S, E> { |  | ||||||
|         SpawnAll { serve: self } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| impl<I, IO, IE, S, B, E> Serve<I, S, E> |  | ||||||
| where |  | ||||||
|     I: Accept<Conn = IO, Error = IE>, |  | ||||||
|     IO: AsyncRead + AsyncWrite + Unpin, |  | ||||||
|     IE: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|     S: MakeServiceRef<IO, Body, ResBody = B>, |  | ||||||
|     B: HttpBody, |  | ||||||
|     E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, |  | ||||||
| { |  | ||||||
|     fn poll_next_( |  | ||||||
|         self: Pin<&mut Self>, |  | ||||||
|         cx: &mut task::Context<'_>, |  | ||||||
|     ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> { |  | ||||||
|         let me = self.project(); |  | ||||||
|         match ready!(me.make_service.poll_ready_ref(cx)) { |  | ||||||
|             Ok(()) => (), |  | ||||||
|             Err(e) => { |  | ||||||
|                 trace!("make_service closed"); |  | ||||||
|                 return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if let Some(item) = ready!(me.incoming.poll_accept(cx)) { |  | ||||||
|             let io = item.map_err(crate::Error::new_accept)?; |  | ||||||
|             let new_fut = me.make_service.make_service_ref(&io); |  | ||||||
|             Poll::Ready(Some(Ok(Connecting { |  | ||||||
|                 future: new_fut, |  | ||||||
|                 io: Some(io), |  | ||||||
|                 protocol: me.protocol.clone(), |  | ||||||
|             }))) |  | ||||||
|         } else { |  | ||||||
|             Poll::Ready(None) |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // ===== impl Connecting ===== |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| impl<I, F, S, FE, E, B> Future for Connecting<I, F, E> |  | ||||||
| where |  | ||||||
|     I: AsyncRead + AsyncWrite + Unpin, |  | ||||||
|     F: Future<Output = Result<S, FE>>, |  | ||||||
|     S: HttpService<Body, ResBody = B>, |  | ||||||
|     B: HttpBody + 'static, |  | ||||||
|     B::Error: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|     E: ConnStreamExec<S::Future, B>, |  | ||||||
| { |  | ||||||
|     type Output = Result<Connection<I, S, E>, FE>; |  | ||||||
|  |  | ||||||
|     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |  | ||||||
|         let mut me = self.project(); |  | ||||||
|         let service = ready!(me.future.poll(cx))?; |  | ||||||
|         let io = Option::take(&mut me.io).expect("polled after complete"); |  | ||||||
|         Poll::Ready(Ok(me.protocol.serve_connection(io, service))) |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // ===== impl SpawnAll ===== |  | ||||||
|  |  | ||||||
| #[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] |  | ||||||
| impl<S, E> SpawnAll<AddrIncoming, S, E> { |  | ||||||
|     pub(super) fn local_addr(&self) -> SocketAddr { |  | ||||||
|         self.serve.incoming.local_addr() |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| impl<I, S, E> SpawnAll<I, S, E> { |  | ||||||
|     pub(super) fn incoming_ref(&self) -> &I { |  | ||||||
|         self.serve.incoming_ref() |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| impl<I, IO, IE, S, B, E> SpawnAll<I, S, E> |  | ||||||
| where |  | ||||||
|     I: Accept<Conn = IO, Error = IE>, |  | ||||||
|     IE: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|     IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, |  | ||||||
|     S: MakeServiceRef<IO, Body, ResBody = B>, |  | ||||||
|     B: HttpBody, |  | ||||||
|     E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, |  | ||||||
| { |  | ||||||
|     pub(super) fn poll_watch<W>( |  | ||||||
|         self: Pin<&mut Self>, |  | ||||||
|         cx: &mut task::Context<'_>, |  | ||||||
|         watcher: &W, |  | ||||||
|     ) -> Poll<crate::Result<()>> |  | ||||||
|     where |  | ||||||
|         E: NewSvcExec<IO, S::Future, S::Service, E, W>, |  | ||||||
|         W: Watcher<IO, S::Service, E>, |  | ||||||
|     { |  | ||||||
|         let mut me = self.project(); |  | ||||||
|         loop { |  | ||||||
|             if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { |  | ||||||
|                 let fut = NewSvcTask::new(connecting, watcher.clone()); |  | ||||||
|                 me.serve |  | ||||||
|                     .as_mut() |  | ||||||
|                     .project() |  | ||||||
|                     .protocol |  | ||||||
|                     .exec |  | ||||||
|                     .execute_new_svc(fut); |  | ||||||
|             } else { |  | ||||||
|                 return Poll::Ready(Ok(())); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // ===== impl ProtoServer ===== | // ===== impl ProtoServer ===== | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
| @@ -1151,150 +950,6 @@ where | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] |  | ||||||
| pub(crate) mod spawn_all { |  | ||||||
|     use std::error::Error as StdError; |  | ||||||
|     use tokio::io::{AsyncRead, AsyncWrite}; |  | ||||||
|     use tracing::debug; |  | ||||||
|  |  | ||||||
|     use super::{Connecting, UpgradeableConnection}; |  | ||||||
|     use crate::body::{Body, HttpBody}; |  | ||||||
|     use crate::common::exec::ConnStreamExec; |  | ||||||
|     use crate::common::{task, Future, Pin, Poll, Unpin}; |  | ||||||
|     use crate::service::HttpService; |  | ||||||
|     use pin_project_lite::pin_project; |  | ||||||
|  |  | ||||||
|     // Used by `SpawnAll` to optionally watch a `Connection` future. |  | ||||||
|     // |  | ||||||
|     // The regular `hyper::Server` just uses a `NoopWatcher`, which does |  | ||||||
|     // not need to watch anything, and so returns the `Connection` untouched. |  | ||||||
|     // |  | ||||||
|     // The `Server::with_graceful_shutdown` needs to keep track of all active |  | ||||||
|     // connections, and signal that they start to shutdown when prompted, so |  | ||||||
|     // it has a `GracefulWatcher` implementation to do that. |  | ||||||
|     pub trait Watcher<I, S: HttpService<Body>, E>: Clone { |  | ||||||
|         type Future: Future<Output = crate::Result<()>>; |  | ||||||
|  |  | ||||||
|         fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     #[allow(missing_debug_implementations)] |  | ||||||
|     #[derive(Copy, Clone)] |  | ||||||
|     pub struct NoopWatcher; |  | ||||||
|  |  | ||||||
|     impl<I, S, E> Watcher<I, S, E> for NoopWatcher |  | ||||||
|     where |  | ||||||
|         I: AsyncRead + AsyncWrite + Unpin + Send + 'static, |  | ||||||
|         S: HttpService<Body>, |  | ||||||
|         E: ConnStreamExec<S::Future, S::ResBody>, |  | ||||||
|         S::ResBody: 'static, |  | ||||||
|         <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|     { |  | ||||||
|         type Future = UpgradeableConnection<I, S, E>; |  | ||||||
|  |  | ||||||
|         fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future { |  | ||||||
|             conn |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     // This is a `Future<Item=(), Error=()>` spawned to an `Executor` inside |  | ||||||
|     // the `SpawnAll`. By being a nameable type, we can be generic over the |  | ||||||
|     // user's `Service::Future`, and thus an `Executor` can execute it. |  | ||||||
|     // |  | ||||||
|     // Doing this allows for the server to conditionally require `Send` futures, |  | ||||||
|     // depending on the `Executor` configured. |  | ||||||
|     // |  | ||||||
|     // Users cannot import this type, nor the associated `NewSvcExec`. Instead, |  | ||||||
|     // a blanket implementation for `Executor<impl Future>` is sufficient. |  | ||||||
|  |  | ||||||
|     pin_project! { |  | ||||||
|         #[allow(missing_debug_implementations)] |  | ||||||
|         pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> { |  | ||||||
|             #[pin] |  | ||||||
|             state: State<I, N, S, E, W>, |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pin_project! { |  | ||||||
|         #[project = StateProj] |  | ||||||
|         pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> { |  | ||||||
|             Connecting { |  | ||||||
|                 #[pin] |  | ||||||
|                 connecting: Connecting<I, N, E>, |  | ||||||
|                 watcher: W, |  | ||||||
|             }, |  | ||||||
|             Connected { |  | ||||||
|                 #[pin] |  | ||||||
|                 future: W::Future, |  | ||||||
|             }, |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> { |  | ||||||
|         pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self { |  | ||||||
|             NewSvcTask { |  | ||||||
|                 state: State::Connecting { |  | ||||||
|                     connecting, |  | ||||||
|                     watcher, |  | ||||||
|                 }, |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W> |  | ||||||
|     where |  | ||||||
|         I: AsyncRead + AsyncWrite + Unpin + Send + 'static, |  | ||||||
|         N: Future<Output = Result<S, NE>>, |  | ||||||
|         NE: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|         S: HttpService<Body, ResBody = B>, |  | ||||||
|         B: HttpBody + 'static, |  | ||||||
|         B::Error: Into<Box<dyn StdError + Send + Sync>>, |  | ||||||
|         E: ConnStreamExec<S::Future, B>, |  | ||||||
|         W: Watcher<I, S, E>, |  | ||||||
|     { |  | ||||||
|         type Output = (); |  | ||||||
|  |  | ||||||
|         fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |  | ||||||
|             // If it weren't for needing to name this type so the `Send` bounds |  | ||||||
|             // could be projected to the `Serve` executor, this could just be |  | ||||||
|             // an `async fn`, and much safer. Woe is me. |  | ||||||
|  |  | ||||||
|             let mut me = self.project(); |  | ||||||
|             loop { |  | ||||||
|                 let next = { |  | ||||||
|                     match me.state.as_mut().project() { |  | ||||||
|                         StateProj::Connecting { |  | ||||||
|                             connecting, |  | ||||||
|                             watcher, |  | ||||||
|                         } => { |  | ||||||
|                             let res = ready!(connecting.poll(cx)); |  | ||||||
|                             let conn = match res { |  | ||||||
|                                 Ok(conn) => conn, |  | ||||||
|                                 Err(err) => { |  | ||||||
|                                     let err = crate::Error::new_user_make_service(err); |  | ||||||
|                                     debug!("connecting error: {}", err); |  | ||||||
|                                     return Poll::Ready(()); |  | ||||||
|                                 } |  | ||||||
|                             }; |  | ||||||
|                             let future = watcher.watch(conn.with_upgrades()); |  | ||||||
|                             State::Connected { future } |  | ||||||
|                         } |  | ||||||
|                         StateProj::Connected { future } => { |  | ||||||
|                             return future.poll(cx).map(|res| { |  | ||||||
|                                 if let Err(err) = res { |  | ||||||
|                                     debug!("connection error: {}", err); |  | ||||||
|                                 } |  | ||||||
|                             }); |  | ||||||
|                         } |  | ||||||
|                     } |  | ||||||
|                 }; |  | ||||||
|  |  | ||||||
|                 me.state.set(next); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
| mod upgrades { | mod upgrades { | ||||||
|     use super::*; |     use super::*; | ||||||
|   | |||||||
| @@ -149,7 +149,7 @@ | |||||||
|  |  | ||||||
| pub mod accept; | pub mod accept; | ||||||
| pub mod conn; | pub mod conn; | ||||||
| mod server; | pub(crate) mod server; | ||||||
| #[cfg(feature = "tcp")] | #[cfg(feature = "tcp")] | ||||||
| mod tcp; | mod tcp; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -15,6 +15,7 @@ cfg_feature! { | |||||||
|  |  | ||||||
|     use pin_project_lite::pin_project; |     use pin_project_lite::pin_project; | ||||||
|     use tokio::io::{AsyncRead, AsyncWrite}; |     use tokio::io::{AsyncRead, AsyncWrite}; | ||||||
|  |     use tracing::trace; | ||||||
|  |  | ||||||
|     use super::accept::Accept; |     use super::accept::Accept; | ||||||
|     use crate::body::{Body, HttpBody}; |     use crate::body::{Body, HttpBody}; | ||||||
| @@ -22,9 +23,11 @@ cfg_feature! { | |||||||
|     use crate::common::exec::{ConnStreamExec, NewSvcExec}; |     use crate::common::exec::{ConnStreamExec, NewSvcExec}; | ||||||
|     // Renamed `Http` as `Http_` for now so that people upgrading don't see an |     // Renamed `Http` as `Http_` for now so that people upgrading don't see an | ||||||
|     // error that `hyper::server::Http` is private... |     // error that `hyper::server::Http` is private... | ||||||
|     use super::conn::{Http as Http_, NoopWatcher, SpawnAll}; |     use super::conn::{Connection, Http as Http_, UpgradeableConnection}; | ||||||
|     use super::shutdown::{Graceful, GracefulWatcher}; |     use super::shutdown::{Graceful, GracefulWatcher}; | ||||||
|     use crate::service::{HttpService, MakeServiceRef}; |     use crate::service::{HttpService, MakeServiceRef}; | ||||||
|  |  | ||||||
|  |     use self::new_svc::NewSvcTask; | ||||||
| } | } | ||||||
|  |  | ||||||
| #[cfg(any(feature = "http1", feature = "http2"))] | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
| @@ -37,7 +40,9 @@ pin_project! { | |||||||
|     /// `Executor`. |     /// `Executor`. | ||||||
|     pub struct Server<I, S, E = Exec> { |     pub struct Server<I, S, E = Exec> { | ||||||
|         #[pin] |         #[pin] | ||||||
|         spawn_all: SpawnAll<I, S, E>, |         incoming: I, | ||||||
|  |         make_service: S, | ||||||
|  |         protocol: Http_<E>, | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -107,7 +112,7 @@ cfg_feature! { | |||||||
|     impl<S, E> Server<AddrIncoming, S, E> { |     impl<S, E> Server<AddrIncoming, S, E> { | ||||||
|         /// Returns the local address that this server is bound to. |         /// Returns the local address that this server is bound to. | ||||||
|         pub fn local_addr(&self) -> SocketAddr { |         pub fn local_addr(&self) -> SocketAddr { | ||||||
|             self.spawn_all.local_addr() |             self.incoming.local_addr() | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -124,7 +129,6 @@ where | |||||||
|     B: HttpBody + 'static, |     B: HttpBody + 'static, | ||||||
|     B::Error: Into<Box<dyn StdError + Send + Sync>>, |     B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||||||
|     E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, |     E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, | ||||||
|     E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>, |  | ||||||
| { | { | ||||||
|     /// Prepares a server to handle graceful shutdown when the provided future |     /// Prepares a server to handle graceful shutdown when the provided future | ||||||
|     /// completes. |     /// completes. | ||||||
| @@ -165,8 +169,54 @@ where | |||||||
|     pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E> |     pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E> | ||||||
|     where |     where | ||||||
|         F: Future<Output = ()>, |         F: Future<Output = ()>, | ||||||
|  |         E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>, | ||||||
|     { |     { | ||||||
|         Graceful::new(self.spawn_all, signal) |         Graceful::new(self, signal) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn poll_next_( | ||||||
|  |         self: Pin<&mut Self>, | ||||||
|  |         cx: &mut task::Context<'_>, | ||||||
|  |     ) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> { | ||||||
|  |         let me = self.project(); | ||||||
|  |         match ready!(me.make_service.poll_ready_ref(cx)) { | ||||||
|  |             Ok(()) => (), | ||||||
|  |             Err(e) => { | ||||||
|  |                 trace!("make_service closed"); | ||||||
|  |                 return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         if let Some(item) = ready!(me.incoming.poll_accept(cx)) { | ||||||
|  |             let io = item.map_err(crate::Error::new_accept)?; | ||||||
|  |             let new_fut = me.make_service.make_service_ref(&io); | ||||||
|  |             Poll::Ready(Some(Ok(Connecting { | ||||||
|  |                 future: new_fut, | ||||||
|  |                 io: Some(io), | ||||||
|  |                 protocol: me.protocol.clone(), | ||||||
|  |             }))) | ||||||
|  |         } else { | ||||||
|  |             Poll::Ready(None) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub(super) fn poll_watch<W>( | ||||||
|  |         mut self: Pin<&mut Self>, | ||||||
|  |         cx: &mut task::Context<'_>, | ||||||
|  |         watcher: &W, | ||||||
|  |     ) -> Poll<crate::Result<()>> | ||||||
|  |     where | ||||||
|  |         E: NewSvcExec<IO, S::Future, S::Service, E, W>, | ||||||
|  |         W: Watcher<IO, S::Service, E>, | ||||||
|  |     { | ||||||
|  |         loop { | ||||||
|  |             if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) { | ||||||
|  |                 let fut = NewSvcTask::new(connecting, watcher.clone()); | ||||||
|  |                 self.as_mut().project().protocol.exec.execute_new_svc(fut); | ||||||
|  |             } else { | ||||||
|  |                 return Poll::Ready(Ok(())); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -187,7 +237,7 @@ where | |||||||
|     type Output = crate::Result<()>; |     type Output = crate::Result<()>; | ||||||
|  |  | ||||||
|     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { |     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||||||
|         self.project().spawn_all.poll_watch(cx, &NoopWatcher) |         self.poll_watch(cx, &NoopWatcher) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -195,7 +245,7 @@ impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> { | |||||||
|     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||||||
|         let mut st = f.debug_struct("Server"); |         let mut st = f.debug_struct("Server"); | ||||||
|         #[cfg(any(feature = "http1", feature = "http2"))] |         #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|         st.field("listener", &self.spawn_all.incoming_ref()); |         st.field("listener", &self.incoming); | ||||||
|         st.finish() |         st.finish() | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -502,7 +552,7 @@ impl<I, E> Builder<I, E> { | |||||||
|     /// } |     /// } | ||||||
|     /// # } |     /// # } | ||||||
|     /// ``` |     /// ``` | ||||||
|     pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E> |     pub fn serve<S, B>(self, make_service: S) -> Server<I, S, E> | ||||||
|     where |     where | ||||||
|         I: Accept, |         I: Accept, | ||||||
|         I::Error: Into<Box<dyn StdError + Send + Sync>>, |         I::Error: Into<Box<dyn StdError + Send + Sync>>, | ||||||
| @@ -514,9 +564,11 @@ impl<I, E> Builder<I, E> { | |||||||
|         E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>, |         E: NewSvcExec<I::Conn, S::Future, S::Service, E, NoopWatcher>, | ||||||
|         E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, |         E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>, | ||||||
|     { |     { | ||||||
|         let serve = self.protocol.serve(self.incoming, new_service); |         Server { | ||||||
|         let spawn_all = serve.spawn_all(); |             incoming: self.incoming, | ||||||
|         Server { spawn_all } |             make_service, | ||||||
|  |             protocol: self.protocol.clone(), | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -558,3 +610,188 @@ impl<E> Builder<AddrIncoming, E> { | |||||||
|         self |         self | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Used by `Server` to optionally watch a `Connection` future. | ||||||
|  | // | ||||||
|  | // The regular `hyper::Server` just uses a `NoopWatcher`, which does | ||||||
|  | // not need to watch anything, and so returns the `Connection` untouched. | ||||||
|  | // | ||||||
|  | // The `Server::with_graceful_shutdown` needs to keep track of all active | ||||||
|  | // connections, and signal that they start to shutdown when prompted, so | ||||||
|  | // it has a `GracefulWatcher` implementation to do that. | ||||||
|  | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|  | pub trait Watcher<I, S: HttpService<Body>, E>: Clone { | ||||||
|  |     type Future: Future<Output = crate::Result<()>>; | ||||||
|  |  | ||||||
|  |     fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[allow(missing_debug_implementations)] | ||||||
|  | #[derive(Copy, Clone)] | ||||||
|  | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|  | pub struct NoopWatcher; | ||||||
|  |  | ||||||
|  | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|  | impl<I, S, E> Watcher<I, S, E> for NoopWatcher | ||||||
|  | where | ||||||
|  |     I: AsyncRead + AsyncWrite + Unpin + Send + 'static, | ||||||
|  |     S: HttpService<Body>, | ||||||
|  |     E: ConnStreamExec<S::Future, S::ResBody>, | ||||||
|  |     S::ResBody: 'static, | ||||||
|  |     <S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>, | ||||||
|  | { | ||||||
|  |     type Future = UpgradeableConnection<I, S, E>; | ||||||
|  |  | ||||||
|  |     fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future { | ||||||
|  |         conn | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|  | // used by exec.rs | ||||||
|  | pub(crate) mod new_svc { | ||||||
|  |     use std::error::Error as StdError; | ||||||
|  |     use tokio::io::{AsyncRead, AsyncWrite}; | ||||||
|  |     use tracing::debug; | ||||||
|  |  | ||||||
|  |     use super::{Connecting, Watcher}; | ||||||
|  |     use crate::body::{Body, HttpBody}; | ||||||
|  |     use crate::common::exec::ConnStreamExec; | ||||||
|  |     use crate::common::{task, Future, Pin, Poll, Unpin}; | ||||||
|  |     use crate::service::HttpService; | ||||||
|  |     use pin_project_lite::pin_project; | ||||||
|  |  | ||||||
|  |     // This is a `Future<Item=(), Error=()>` spawned to an `Executor` inside | ||||||
|  |     // the `Server`. By being a nameable type, we can be generic over the | ||||||
|  |     // user's `Service::Future`, and thus an `Executor` can execute it. | ||||||
|  |     // | ||||||
|  |     // Doing this allows for the server to conditionally require `Send` futures, | ||||||
|  |     // depending on the `Executor` configured. | ||||||
|  |     // | ||||||
|  |     // Users cannot import this type, nor the associated `NewSvcExec`. Instead, | ||||||
|  |     // a blanket implementation for `Executor<impl Future>` is sufficient. | ||||||
|  |  | ||||||
|  |     pin_project! { | ||||||
|  |         #[allow(missing_debug_implementations)] | ||||||
|  |         pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> { | ||||||
|  |             #[pin] | ||||||
|  |             state: State<I, N, S, E, W>, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pin_project! { | ||||||
|  |         #[project = StateProj] | ||||||
|  |         pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> { | ||||||
|  |             Connecting { | ||||||
|  |                 #[pin] | ||||||
|  |                 connecting: Connecting<I, N, E>, | ||||||
|  |                 watcher: W, | ||||||
|  |             }, | ||||||
|  |             Connected { | ||||||
|  |                 #[pin] | ||||||
|  |                 future: W::Future, | ||||||
|  |             }, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> { | ||||||
|  |         pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self { | ||||||
|  |             NewSvcTask { | ||||||
|  |                 state: State::Connecting { | ||||||
|  |                     connecting, | ||||||
|  |                     watcher, | ||||||
|  |                 }, | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     impl<I, N, S, NE, B, E, W> Future for NewSvcTask<I, N, S, E, W> | ||||||
|  |     where | ||||||
|  |         I: AsyncRead + AsyncWrite + Unpin + Send + 'static, | ||||||
|  |         N: Future<Output = Result<S, NE>>, | ||||||
|  |         NE: Into<Box<dyn StdError + Send + Sync>>, | ||||||
|  |         S: HttpService<Body, ResBody = B>, | ||||||
|  |         B: HttpBody + 'static, | ||||||
|  |         B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||||||
|  |         E: ConnStreamExec<S::Future, B>, | ||||||
|  |         W: Watcher<I, S, E>, | ||||||
|  |     { | ||||||
|  |         type Output = (); | ||||||
|  |  | ||||||
|  |         fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||||||
|  |             // If it weren't for needing to name this type so the `Send` bounds | ||||||
|  |             // could be projected to the `Serve` executor, this could just be | ||||||
|  |             // an `async fn`, and much safer. Woe is me. | ||||||
|  |  | ||||||
|  |             let mut me = self.project(); | ||||||
|  |             loop { | ||||||
|  |                 let next = { | ||||||
|  |                     match me.state.as_mut().project() { | ||||||
|  |                         StateProj::Connecting { | ||||||
|  |                             connecting, | ||||||
|  |                             watcher, | ||||||
|  |                         } => { | ||||||
|  |                             let res = ready!(connecting.poll(cx)); | ||||||
|  |                             let conn = match res { | ||||||
|  |                                 Ok(conn) => conn, | ||||||
|  |                                 Err(err) => { | ||||||
|  |                                     let err = crate::Error::new_user_make_service(err); | ||||||
|  |                                     debug!("connecting error: {}", err); | ||||||
|  |                                     return Poll::Ready(()); | ||||||
|  |                                 } | ||||||
|  |                             }; | ||||||
|  |                             let future = watcher.watch(conn.with_upgrades()); | ||||||
|  |                             State::Connected { future } | ||||||
|  |                         } | ||||||
|  |                         StateProj::Connected { future } => { | ||||||
|  |                             return future.poll(cx).map(|res| { | ||||||
|  |                                 if let Err(err) = res { | ||||||
|  |                                     debug!("connection error: {}", err); | ||||||
|  |                                 } | ||||||
|  |                             }); | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                 }; | ||||||
|  |  | ||||||
|  |                 me.state.set(next); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|  | pin_project! { | ||||||
|  |     /// A future building a new `Service` to a `Connection`. | ||||||
|  |     /// | ||||||
|  |     /// Wraps the future returned from `MakeService` into one that returns | ||||||
|  |     /// a `Connection`. | ||||||
|  |     #[must_use = "futures do nothing unless polled"] | ||||||
|  |     #[derive(Debug)] | ||||||
|  |     #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] | ||||||
|  |     pub struct Connecting<I, F, E = Exec> { | ||||||
|  |         #[pin] | ||||||
|  |         future: F, | ||||||
|  |         io: Option<I>, | ||||||
|  |         protocol: Http_<E>, | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[cfg(any(feature = "http1", feature = "http2"))] | ||||||
|  | impl<I, F, S, FE, E, B> Future for Connecting<I, F, E> | ||||||
|  | where | ||||||
|  |     I: AsyncRead + AsyncWrite + Unpin, | ||||||
|  |     F: Future<Output = Result<S, FE>>, | ||||||
|  |     S: HttpService<Body, ResBody = B>, | ||||||
|  |     B: HttpBody + 'static, | ||||||
|  |     B::Error: Into<Box<dyn StdError + Send + Sync>>, | ||||||
|  |     E: ConnStreamExec<S::Future, B>, | ||||||
|  | { | ||||||
|  |     type Output = Result<Connection<I, S, E>, FE>; | ||||||
|  |  | ||||||
|  |     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { | ||||||
|  |         let mut me = self.project(); | ||||||
|  |         let service = ready!(me.future.poll(cx))?; | ||||||
|  |         let io = Option::take(&mut me.io).expect("polled after complete"); | ||||||
|  |         Poll::Ready(Ok(me.protocol.serve_connection(io, service))) | ||||||
|  |     } | ||||||
|  | } | ||||||
|   | |||||||
| @@ -5,7 +5,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; | |||||||
| use tracing::debug; | use tracing::debug; | ||||||
|  |  | ||||||
| use super::accept::Accept; | use super::accept::Accept; | ||||||
| use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; | use super::conn::UpgradeableConnection; | ||||||
|  | use super::server::{Server, Watcher}; | ||||||
| use crate::body::{Body, HttpBody}; | use crate::body::{Body, HttpBody}; | ||||||
| use crate::common::drain::{self, Draining, Signal, Watch, Watching}; | use crate::common::drain::{self, Draining, Signal, Watch, Watching}; | ||||||
| use crate::common::exec::{ConnStreamExec, NewSvcExec}; | use crate::common::exec::{ConnStreamExec, NewSvcExec}; | ||||||
| @@ -26,7 +27,7 @@ pin_project! { | |||||||
|         Running { |         Running { | ||||||
|             drain: Option<(Signal, Watch)>, |             drain: Option<(Signal, Watch)>, | ||||||
|             #[pin] |             #[pin] | ||||||
|             spawn_all: SpawnAll<I, S, E>, |             server: Server<I, S, E>, | ||||||
|             #[pin] |             #[pin] | ||||||
|             signal: F, |             signal: F, | ||||||
|         }, |         }, | ||||||
| @@ -35,12 +36,12 @@ pin_project! { | |||||||
| } | } | ||||||
|  |  | ||||||
| impl<I, S, F, E> Graceful<I, S, F, E> { | impl<I, S, F, E> Graceful<I, S, F, E> { | ||||||
|     pub(super) fn new(spawn_all: SpawnAll<I, S, E>, signal: F) -> Self { |     pub(super) fn new(server: Server<I, S, E>, signal: F) -> Self { | ||||||
|         let drain = Some(drain::channel()); |         let drain = Some(drain::channel()); | ||||||
|         Graceful { |         Graceful { | ||||||
|             state: State::Running { |             state: State::Running { | ||||||
|                 drain, |                 drain, | ||||||
|                 spawn_all, |                 server, | ||||||
|                 signal, |                 signal, | ||||||
|             }, |             }, | ||||||
|         } |         } | ||||||
| @@ -69,7 +70,7 @@ where | |||||||
|                 match me.state.as_mut().project() { |                 match me.state.as_mut().project() { | ||||||
|                     StateProj::Running { |                     StateProj::Running { | ||||||
|                         drain, |                         drain, | ||||||
|                         spawn_all, |                         server, | ||||||
|                         signal, |                         signal, | ||||||
|                     } => match signal.poll(cx) { |                     } => match signal.poll(cx) { | ||||||
|                         Poll::Ready(()) => { |                         Poll::Ready(()) => { | ||||||
| @@ -81,7 +82,7 @@ where | |||||||
|                         } |                         } | ||||||
|                         Poll::Pending => { |                         Poll::Pending => { | ||||||
|                             let watch = drain.as_ref().expect("drain channel").1.clone(); |                             let watch = drain.as_ref().expect("drain channel").1.clone(); | ||||||
|                             return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); |                             return server.poll_watch(cx, &GracefulWatcher(watch)); | ||||||
|                         } |                         } | ||||||
|                     }, |                     }, | ||||||
|                     StateProj::Draining { ref mut draining } => { |                     StateProj::Draining { ref mut draining } => { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user