From 0fec1c8737e84bfccf0cd6495564ba465442cd1e Mon Sep 17 00:00:00 2001 From: SabrinaJewson Date: Sat, 12 Mar 2022 12:59:58 +0000 Subject: [PATCH] refactor(server): move non-conn code out of conn.rs The actual code for `Server` was previously organized very confusingly: it was thrice layered with `SpawnAll` and `Serve` which both appeared in conn.rs despite not having anything to do with the lower-level conn API. This commit changes that, removing all layering and having the code for the higher-level `Server` appear inside `server.rs` only. --- src/common/exec.rs | 2 +- src/server/conn.rs | 353 +---------------------------------------- src/server/mod.rs | 2 +- src/server/server.rs | 261 ++++++++++++++++++++++++++++-- src/server/shutdown.rs | 13 +- 5 files changed, 262 insertions(+), 369 deletions(-) diff --git a/src/common/exec.rs b/src/common/exec.rs index b6da9a27..f890e6aa 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -11,7 +11,7 @@ use crate::body::HttpBody; use crate::proto::h2::server::H2Stream; use crate::rt::Executor; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] -use crate::server::conn::spawn_all::{NewSvcTask, Watcher}; +use crate::server::server::{Watcher, new_svc::NewSvcTask}; #[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))] use crate::service::HttpService; diff --git a/src/server/conn.rs b/src/server/conn.rs index de765b3a..1e6cb469 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -48,8 +48,6 @@ not(all(feature = "http1", feature = "http2")) ))] use std::marker::PhantomData; -#[cfg(feature = "tcp")] -use std::net::SocketAddr; use std::time::Duration; #[cfg(feature = "http2")] @@ -70,17 +68,15 @@ cfg_feature! { use tokio::io::{AsyncRead, AsyncWrite}; use tracing::trace; - use super::accept::Accept; + pub use super::server::Connecting; use crate::body::{Body, HttpBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; #[cfg(not(all(feature = "http1", feature = "http2")))] use crate::common::Never; - use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec}; + use crate::common::exec::{ConnStreamExec, Exec}; use crate::proto; - use crate::service::{HttpService, MakeServiceRef}; - use self::spawn_all::NewSvcTask; + use crate::service::HttpService; - pub(super) use self::spawn_all::{NoopWatcher, Watcher}; pub(super) use self::upgrades::UpgradeableConnection; } @@ -97,7 +93,7 @@ pub use super::tcp::{AddrIncoming, AddrStream}; #[cfg(any(feature = "http1", feature = "http2"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct Http { - exec: E, + pub(crate) exec: E, h1_half_close: bool, h1_keep_alive: bool, h1_title_case_headers: bool, @@ -127,51 +123,6 @@ enum ConnectionMode { Fallback, } -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A stream mapping incoming IOs to new services. - /// - /// Yields `Connecting`s that are futures that should be put on a reactor. - #[must_use = "streams do nothing unless polled"] - #[derive(Debug)] - pub(super) struct Serve { - #[pin] - incoming: I, - make_service: S, - protocol: Http, - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - /// A future building a new `Service` to a `Connection`. - /// - /// Wraps the future returned from `MakeService` into one that returns - /// a `Connection`. - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] - pub struct Connecting { - #[pin] - future: F, - io: Option, - protocol: Http, - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -pin_project! { - #[must_use = "futures do nothing unless polled"] - #[derive(Debug)] - pub(super) struct SpawnAll { - // TODO: re-add `pub(super)` once rustdoc can handle this. - // - // See https://github.com/rust-lang/rust/issues/64705 - #[pin] - pub(super) serve: Serve, - } -} - #[cfg(any(feature = "http1", feature = "http2"))] pin_project! { /// A future binding a connection with a Service. @@ -719,23 +670,6 @@ impl Http { fallback: PhantomData, } } - - pub(super) fn serve(&self, incoming: I, make_service: S) -> Serve - where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin, - S: MakeServiceRef, - S::Error: Into>, - Bd: HttpBody, - E: ConnStreamExec<>::Future, Bd>, - { - Serve { - incoming, - make_service, - protocol: self.clone(), - } - } } // ===== impl Connection ===== @@ -987,141 +921,6 @@ impl Default for ConnectionMode { } } -// ===== impl Serve ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Serve { - /// Get a reference to the incoming stream. - #[inline] - pub(super) fn incoming_ref(&self) -> &I { - &self.incoming - } - - /* - /// Get a mutable reference to the incoming stream. - #[inline] - pub fn incoming_mut(&mut self) -> &mut I { - &mut self.incoming - } - */ - - /// Spawn all incoming connections onto the executor in `Http`. - pub(super) fn spawn_all(self) -> SpawnAll { - SpawnAll { serve: self } - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Serve -where - I: Accept, - IO: AsyncRead + AsyncWrite + Unpin, - IE: Into>, - S: MakeServiceRef, - B: HttpBody, - E: ConnStreamExec<>::Future, B>, -{ - fn poll_next_( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - ) -> Poll>>> { - let me = self.project(); - match ready!(me.make_service.poll_ready_ref(cx)) { - Ok(()) => (), - Err(e) => { - trace!("make_service closed"); - return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); - } - } - - if let Some(item) = ready!(me.incoming.poll_accept(cx)) { - let io = item.map_err(crate::Error::new_accept)?; - let new_fut = me.make_service.make_service_ref(&io); - Poll::Ready(Some(Ok(Connecting { - future: new_fut, - io: Some(io), - protocol: me.protocol.clone(), - }))) - } else { - Poll::Ready(None) - } - } -} - -// ===== impl Connecting ===== - -#[cfg(any(feature = "http1", feature = "http2"))] -impl Future for Connecting -where - I: AsyncRead + AsyncWrite + Unpin, - F: Future>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, -{ - type Output = Result, FE>; - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let mut me = self.project(); - let service = ready!(me.future.poll(cx))?; - let io = Option::take(&mut me.io).expect("polled after complete"); - Poll::Ready(Ok(me.protocol.serve_connection(io, service))) - } -} - -// ===== impl SpawnAll ===== - -#[cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))] -impl SpawnAll { - pub(super) fn local_addr(&self) -> SocketAddr { - self.serve.incoming.local_addr() - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl SpawnAll { - pub(super) fn incoming_ref(&self) -> &I { - self.serve.incoming_ref() - } -} - -#[cfg(any(feature = "http1", feature = "http2"))] -impl SpawnAll -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - B: HttpBody, - E: ConnStreamExec<>::Future, B>, -{ - pub(super) fn poll_watch( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - watcher: &W, - ) -> Poll> - where - E: NewSvcExec, - W: Watcher, - { - let mut me = self.project(); - loop { - if let Some(connecting) = ready!(me.serve.as_mut().poll_next_(cx)?) { - let fut = NewSvcTask::new(connecting, watcher.clone()); - me.serve - .as_mut() - .project() - .protocol - .exec - .execute_new_svc(fut); - } else { - return Poll::Ready(Ok(())); - } - } - } -} - // ===== impl ProtoServer ===== #[cfg(any(feature = "http1", feature = "http2"))] @@ -1151,150 +950,6 @@ where } } -#[cfg(any(feature = "http1", feature = "http2"))] -pub(crate) mod spawn_all { - use std::error::Error as StdError; - use tokio::io::{AsyncRead, AsyncWrite}; - use tracing::debug; - - use super::{Connecting, UpgradeableConnection}; - use crate::body::{Body, HttpBody}; - use crate::common::exec::ConnStreamExec; - use crate::common::{task, Future, Pin, Poll, Unpin}; - use crate::service::HttpService; - use pin_project_lite::pin_project; - - // Used by `SpawnAll` to optionally watch a `Connection` future. - // - // The regular `hyper::Server` just uses a `NoopWatcher`, which does - // not need to watch anything, and so returns the `Connection` untouched. - // - // The `Server::with_graceful_shutdown` needs to keep track of all active - // connections, and signal that they start to shutdown when prompted, so - // it has a `GracefulWatcher` implementation to do that. - pub trait Watcher, E>: Clone { - type Future: Future>; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future; - } - - #[allow(missing_debug_implementations)] - #[derive(Copy, Clone)] - pub struct NoopWatcher; - - impl Watcher for NoopWatcher - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, - { - type Future = UpgradeableConnection; - - fn watch(&self, conn: UpgradeableConnection) -> Self::Future { - conn - } - } - - // This is a `Future` spawned to an `Executor` inside - // the `SpawnAll`. By being a nameable type, we can be generic over the - // user's `Service::Future`, and thus an `Executor` can execute it. - // - // Doing this allows for the server to conditionally require `Send` futures, - // depending on the `Executor` configured. - // - // Users cannot import this type, nor the associated `NewSvcExec`. Instead, - // a blanket implementation for `Executor` is sufficient. - - pin_project! { - #[allow(missing_debug_implementations)] - pub struct NewSvcTask, E, W: Watcher> { - #[pin] - state: State, - } - } - - pin_project! { - #[project = StateProj] - pub(super) enum State, E, W: Watcher> { - Connecting { - #[pin] - connecting: Connecting, - watcher: W, - }, - Connected { - #[pin] - future: W::Future, - }, - } - } - - impl, E, W: Watcher> NewSvcTask { - pub(super) fn new(connecting: Connecting, watcher: W) -> Self { - NewSvcTask { - state: State::Connecting { - connecting, - watcher, - }, - } - } - } - - impl Future for NewSvcTask - where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - N: Future>, - NE: Into>, - S: HttpService, - B: HttpBody + 'static, - B::Error: Into>, - E: ConnStreamExec, - W: Watcher, - { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // If it weren't for needing to name this type so the `Send` bounds - // could be projected to the `Serve` executor, this could just be - // an `async fn`, and much safer. Woe is me. - - let mut me = self.project(); - loop { - let next = { - match me.state.as_mut().project() { - StateProj::Connecting { - connecting, - watcher, - } => { - let res = ready!(connecting.poll(cx)); - let conn = match res { - Ok(conn) => conn, - Err(err) => { - let err = crate::Error::new_user_make_service(err); - debug!("connecting error: {}", err); - return Poll::Ready(()); - } - }; - let future = watcher.watch(conn.with_upgrades()); - State::Connected { future } - } - StateProj::Connected { future } => { - return future.poll(cx).map(|res| { - if let Err(err) = res { - debug!("connection error: {}", err); - } - }); - } - } - }; - - me.state.set(next); - } - } - } -} - #[cfg(any(feature = "http1", feature = "http2"))] mod upgrades { use super::*; diff --git a/src/server/mod.rs b/src/server/mod.rs index a97944f5..d658f421 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -149,7 +149,7 @@ pub mod accept; pub mod conn; -mod server; +pub(crate) mod server; #[cfg(feature = "tcp")] mod tcp; diff --git a/src/server/server.rs b/src/server/server.rs index c48582c7..f2fb8bd8 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -15,6 +15,7 @@ cfg_feature! { use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; + use tracing::trace; use super::accept::Accept; use crate::body::{Body, HttpBody}; @@ -22,9 +23,11 @@ cfg_feature! { use crate::common::exec::{ConnStreamExec, NewSvcExec}; // Renamed `Http` as `Http_` for now so that people upgrading don't see an // error that `hyper::server::Http` is private... - use super::conn::{Http as Http_, NoopWatcher, SpawnAll}; + use super::conn::{Connection, Http as Http_, UpgradeableConnection}; use super::shutdown::{Graceful, GracefulWatcher}; use crate::service::{HttpService, MakeServiceRef}; + + use self::new_svc::NewSvcTask; } #[cfg(any(feature = "http1", feature = "http2"))] @@ -37,7 +40,9 @@ pin_project! { /// `Executor`. pub struct Server { #[pin] - spawn_all: SpawnAll, + incoming: I, + make_service: S, + protocol: Http_, } } @@ -107,7 +112,7 @@ cfg_feature! { impl Server { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> SocketAddr { - self.spawn_all.local_addr() + self.incoming.local_addr() } } } @@ -124,7 +129,6 @@ where B: HttpBody + 'static, B::Error: Into>, E: ConnStreamExec<>::Future, B>, - E: NewSvcExec, { /// Prepares a server to handle graceful shutdown when the provided future /// completes. @@ -165,8 +169,54 @@ where pub fn with_graceful_shutdown(self, signal: F) -> Graceful where F: Future, + E: NewSvcExec, { - Graceful::new(self.spawn_all, signal) + Graceful::new(self, signal) + } + + fn poll_next_( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll>>> { + let me = self.project(); + match ready!(me.make_service.poll_ready_ref(cx)) { + Ok(()) => (), + Err(e) => { + trace!("make_service closed"); + return Poll::Ready(Some(Err(crate::Error::new_user_make_service(e)))); + } + } + + if let Some(item) = ready!(me.incoming.poll_accept(cx)) { + let io = item.map_err(crate::Error::new_accept)?; + let new_fut = me.make_service.make_service_ref(&io); + Poll::Ready(Some(Ok(Connecting { + future: new_fut, + io: Some(io), + protocol: me.protocol.clone(), + }))) + } else { + Poll::Ready(None) + } + } + + pub(super) fn poll_watch( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + watcher: &W, + ) -> Poll> + where + E: NewSvcExec, + W: Watcher, + { + loop { + if let Some(connecting) = ready!(self.as_mut().poll_next_(cx)?) { + let fut = NewSvcTask::new(connecting, watcher.clone()); + self.as_mut().project().protocol.exec.execute_new_svc(fut); + } else { + return Poll::Ready(Ok(())); + } + } } } @@ -187,7 +237,7 @@ where type Output = crate::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - self.project().spawn_all.poll_watch(cx, &NoopWatcher) + self.poll_watch(cx, &NoopWatcher) } } @@ -195,7 +245,7 @@ impl fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut st = f.debug_struct("Server"); #[cfg(any(feature = "http1", feature = "http2"))] - st.field("listener", &self.spawn_all.incoming_ref()); + st.field("listener", &self.incoming); st.finish() } } @@ -309,7 +359,7 @@ impl Builder { self } - /// Set a timeout for reading client request headers. If a client does not + /// Set a timeout for reading client request headers. If a client does not /// transmit the entire header within this time, the connection is closed. /// /// Default is None. @@ -502,7 +552,7 @@ impl Builder { /// } /// # } /// ``` - pub fn serve(self, new_service: S) -> Server + pub fn serve(self, make_service: S) -> Server where I: Accept, I::Error: Into>, @@ -514,9 +564,11 @@ impl Builder { E: NewSvcExec, E: ConnStreamExec<>::Future, B>, { - let serve = self.protocol.serve(self.incoming, new_service); - let spawn_all = serve.spawn_all(); - Server { spawn_all } + Server { + incoming: self.incoming, + make_service, + protocol: self.protocol.clone(), + } } } @@ -558,3 +610,188 @@ impl Builder { self } } + +// Used by `Server` to optionally watch a `Connection` future. +// +// The regular `hyper::Server` just uses a `NoopWatcher`, which does +// not need to watch anything, and so returns the `Connection` untouched. +// +// The `Server::with_graceful_shutdown` needs to keep track of all active +// connections, and signal that they start to shutdown when prompted, so +// it has a `GracefulWatcher` implementation to do that. +#[cfg(any(feature = "http1", feature = "http2"))] +pub trait Watcher, E>: Clone { + type Future: Future>; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future; +} + +#[allow(missing_debug_implementations)] +#[derive(Copy, Clone)] +#[cfg(any(feature = "http1", feature = "http2"))] +pub struct NoopWatcher; + +#[cfg(any(feature = "http1", feature = "http2"))] +impl Watcher for NoopWatcher +where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + S: HttpService, + E: ConnStreamExec, + S::ResBody: 'static, + ::Error: Into>, +{ + type Future = UpgradeableConnection; + + fn watch(&self, conn: UpgradeableConnection) -> Self::Future { + conn + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +// used by exec.rs +pub(crate) mod new_svc { + use std::error::Error as StdError; + use tokio::io::{AsyncRead, AsyncWrite}; + use tracing::debug; + + use super::{Connecting, Watcher}; + use crate::body::{Body, HttpBody}; + use crate::common::exec::ConnStreamExec; + use crate::common::{task, Future, Pin, Poll, Unpin}; + use crate::service::HttpService; + use pin_project_lite::pin_project; + + // This is a `Future` spawned to an `Executor` inside + // the `Server`. By being a nameable type, we can be generic over the + // user's `Service::Future`, and thus an `Executor` can execute it. + // + // Doing this allows for the server to conditionally require `Send` futures, + // depending on the `Executor` configured. + // + // Users cannot import this type, nor the associated `NewSvcExec`. Instead, + // a blanket implementation for `Executor` is sufficient. + + pin_project! { + #[allow(missing_debug_implementations)] + pub struct NewSvcTask, E, W: Watcher> { + #[pin] + state: State, + } + } + + pin_project! { + #[project = StateProj] + pub(super) enum State, E, W: Watcher> { + Connecting { + #[pin] + connecting: Connecting, + watcher: W, + }, + Connected { + #[pin] + future: W::Future, + }, + } + } + + impl, E, W: Watcher> NewSvcTask { + pub(super) fn new(connecting: Connecting, watcher: W) -> Self { + NewSvcTask { + state: State::Connecting { + connecting, + watcher, + }, + } + } + } + + impl Future for NewSvcTask + where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + N: Future>, + NE: Into>, + S: HttpService, + B: HttpBody + 'static, + B::Error: Into>, + E: ConnStreamExec, + W: Watcher, + { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // If it weren't for needing to name this type so the `Send` bounds + // could be projected to the `Serve` executor, this could just be + // an `async fn`, and much safer. Woe is me. + + let mut me = self.project(); + loop { + let next = { + match me.state.as_mut().project() { + StateProj::Connecting { + connecting, + watcher, + } => { + let res = ready!(connecting.poll(cx)); + let conn = match res { + Ok(conn) => conn, + Err(err) => { + let err = crate::Error::new_user_make_service(err); + debug!("connecting error: {}", err); + return Poll::Ready(()); + } + }; + let future = watcher.watch(conn.with_upgrades()); + State::Connected { future } + } + StateProj::Connected { future } => { + return future.poll(cx).map(|res| { + if let Err(err) = res { + debug!("connection error: {}", err); + } + }); + } + } + }; + + me.state.set(next); + } + } + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +pin_project! { + /// A future building a new `Service` to a `Connection`. + /// + /// Wraps the future returned from `MakeService` into one that returns + /// a `Connection`. + #[must_use = "futures do nothing unless polled"] + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] + pub struct Connecting { + #[pin] + future: F, + io: Option, + protocol: Http_, + } +} + +#[cfg(any(feature = "http1", feature = "http2"))] +impl Future for Connecting +where + I: AsyncRead + AsyncWrite + Unpin, + F: Future>, + S: HttpService, + B: HttpBody + 'static, + B::Error: Into>, + E: ConnStreamExec, +{ + type Output = Result, FE>; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut me = self.project(); + let service = ready!(me.future.poll(cx))?; + let io = Option::take(&mut me.io).expect("polled after complete"); + Poll::Ready(Ok(me.protocol.serve_connection(io, service))) + } +} diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index 2277a409..96937d08 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -5,7 +5,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::debug; use super::accept::Accept; -use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; +use super::conn::UpgradeableConnection; +use super::server::{Server, Watcher}; use crate::body::{Body, HttpBody}; use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::exec::{ConnStreamExec, NewSvcExec}; @@ -26,7 +27,7 @@ pin_project! { Running { drain: Option<(Signal, Watch)>, #[pin] - spawn_all: SpawnAll, + server: Server, #[pin] signal: F, }, @@ -35,12 +36,12 @@ pin_project! { } impl Graceful { - pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { + pub(super) fn new(server: Server, signal: F) -> Self { let drain = Some(drain::channel()); Graceful { state: State::Running { drain, - spawn_all, + server, signal, }, } @@ -69,7 +70,7 @@ where match me.state.as_mut().project() { StateProj::Running { drain, - spawn_all, + server, signal, } => match signal.poll(cx) { Poll::Ready(()) => { @@ -81,7 +82,7 @@ where } Poll::Pending => { let watch = drain.as_ref().expect("drain channel").1.clone(); - return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); + return server.poll_watch(cx, &GracefulWatcher(watch)); } }, StateProj::Draining { ref mut draining } => {