refactor(lib): Switch from pin-project to pin-project-lite (#2566)

Note the practical affects of this change:

- Dependency count with --features full dropped from 65 to 55.
- Time to compile after a clean dropped from 48s to 35s (on a pretty underpowered VM).

Closes #2388
This commit is contained in:
Jonas Platte
2021-06-04 23:57:27 +02:00
committed by GitHub
parent 0d82405a7b
commit 6a6a24030e
15 changed files with 423 additions and 288 deletions

View File

@@ -34,7 +34,7 @@ httparse = "1.4"
h2 = { version = "0.3.3", optional = true } h2 = { version = "0.3.3", optional = true }
itoa = "0.4.1" itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["std"] } tracing = { version = "0.1", default-features = false, features = ["std"] }
pin-project = "1.0" pin-project-lite = "0.2.4"
tower-service = "0.3" tower-service = "0.3"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
want = "0.3" want = "0.3"

View File

@@ -48,7 +48,7 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use std::fmt; use std::fmt;
#[cfg(feature = "http2")] #[cfg(not(all(feature = "http1", feature = "http2")))]
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
#[cfg(all(feature = "runtime", feature = "http2"))] #[cfg(all(feature = "runtime", feature = "http2"))]
@@ -57,12 +57,14 @@ use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _}; use futures_util::future::{self, Either, FutureExt as _};
use httparse::ParserConfig; use httparse::ParserConfig;
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service; use tower_service::Service;
use super::dispatch; use super::dispatch;
use crate::body::HttpBody; use crate::body::HttpBody;
#[cfg(not(all(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{ use crate::common::{
exec::{BoxSendFuture, Exec}, exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll, task, Future, Pin, Poll,
@@ -74,17 +76,33 @@ use crate::upgrade::Upgraded;
use crate::{Body, Request, Response}; use crate::{Body, Request, Response};
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>; type Http1Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
#[pin_project(project = ProtoClientProj)] #[cfg(not(feature = "http1"))]
enum ProtoClient<T, B> type Http1Dispatcher<T, B> = (Never, PhantomData<(T, Pin<Box<B>>)>);
where
B: HttpBody, #[cfg(feature = "http2")]
{ type Http2ClientTask<B> = proto::h2::ClientTask<B>;
#[cfg(feature = "http1")]
H1(#[pin] Http1Dispatcher<T, B, proto::h1::ClientTransaction>), #[cfg(not(feature = "http2"))]
#[cfg(feature = "http2")] type Http2ClientTask<B> = (Never, PhantomData<Pin<Box<B>>>);
H2(#[pin] proto::h2::ClientTask<B>, PhantomData<fn(T)>),
pin_project! {
#[project = ProtoClientProj]
enum ProtoClient<T, B>
where
B: HttpBody,
{
H1 {
#[pin]
h1: Http1Dispatcher<T, B>,
},
H2 {
#[pin]
h2: Http2ClientTask<B>,
},
}
} }
/// Returns a handshake future over some IO. /// Returns a handshake future over some IO.
@@ -405,7 +423,7 @@ where
pub fn into_parts(self) -> Parts<T> { pub fn into_parts(self) -> Parts<T> {
match self.inner.expect("already upgraded") { match self.inner.expect("already upgraded") {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
ProtoClient::H1(h1) => { ProtoClient::H1 { h1 } => {
let (io, read_buf, _) = h1.into_inner(); let (io, read_buf, _) = h1.into_inner();
Parts { Parts {
io, io,
@@ -413,10 +431,12 @@ where
_inner: (), _inner: (),
} }
} }
#[cfg(feature = "http2")] ProtoClient::H2 { .. } => {
ProtoClient::H2(..) => {
panic!("http2 cannot into_inner"); panic!("http2 cannot into_inner");
} }
#[cfg(not(feature = "http1"))]
ProtoClient::H1 { h1 } => match h1.0 {},
} }
} }
@@ -434,9 +454,14 @@ where
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match *self.inner.as_mut().expect("already upgraded") { match *self.inner.as_mut().expect("already upgraded") {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx), ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()), ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
#[cfg(not(feature = "http1"))]
ProtoClient::H1 { ref mut h1 } => match h1.0 {},
#[cfg(not(feature = "http2"))]
ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
} }
} }
@@ -465,7 +490,7 @@ where
proto::Dispatched::Shutdown => Poll::Ready(Ok(())), proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
proto::Dispatched::Upgrade(pending) => match self.inner.take() { proto::Dispatched::Upgrade(pending) => match self.inner.take() {
Some(ProtoClient::H1(h1)) => { Some(ProtoClient::H1 { h1 }) => {
let (io, buf, _) = h1.into_inner(); let (io, buf, _) = h1.into_inner();
pending.fulfill(Upgraded::new(io, buf)); pending.fulfill(Upgraded::new(io, buf));
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@@ -756,14 +781,14 @@ impl Builder {
} }
let cd = proto::h1::dispatch::Client::new(rx); let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn); let dispatch = proto::h1::Dispatcher::new(cd, conn);
ProtoClient::H1(dispatch) ProtoClient::H1 { h1: dispatch }
} }
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
Proto::Http2 => { Proto::Http2 => {
let h2 = let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone()) proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
.await?; .await?;
ProtoClient::H2(h2, PhantomData) ProtoClient::H2 { h2 }
} }
}; };
@@ -817,9 +842,14 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() { match self.project() {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
ProtoClientProj::H1(c) => c.poll(cx), ProtoClientProj::H1 { h1 } => h1.poll(cx),
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
ProtoClientProj::H2(c, _) => c.poll(cx), ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
#[cfg(not(feature = "http1"))]
ProtoClientProj::H1 { h1 } => match h1.0 {},
#[cfg(not(feature = "http2"))]
ProtoClientProj::H2 { h2, .. } => match h2.0 {},
} }
} }
} }

View File

@@ -11,7 +11,7 @@ use std::time::Duration;
use futures_util::future::Either; use futures_util::future::Either;
use http::uri::{Scheme, Uri}; use http::uri::{Scheme, Uri};
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::net::{TcpSocket, TcpStream}; use tokio::net::{TcpSocket, TcpStream};
use tokio::time::Sleep; use tokio::time::Sleep;
@@ -373,18 +373,19 @@ impl HttpInfo {
} }
} }
// Not publicly exported (so missing_docs doesn't trigger). pin_project! {
// // Not publicly exported (so missing_docs doesn't trigger).
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly //
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot // We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
// (and thus we can change the type in the future). // so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
#[must_use = "futures do nothing unless polled"] // (and thus we can change the type in the future).
#[pin_project] #[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct HttpConnecting<R> { pub struct HttpConnecting<R> {
#[pin] #[pin]
fut: BoxConnecting, fut: BoxConnecting,
_marker: PhantomData<R>, _marker: PhantomData<R>,
}
} }
type ConnectResult = Result<TcpStream, ConnectError>; type ConnectResult = Result<TcpStream, ConnectError>;

View File

@@ -11,7 +11,7 @@ use futures_channel::oneshot;
use tokio::time::{Duration, Instant, Interval}; use tokio::time::{Duration, Instant, Interval};
use super::client::Ver; use super::client::Ver;
use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin}; use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};
// FIXME: allow() required due to `impl Trait` leaking types to this lint // FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@@ -714,16 +714,17 @@ impl Expiration {
} }
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
#[pin_project::pin_project] pin_project_lite::pin_project! {
struct IdleTask<T> { struct IdleTask<T> {
#[pin] #[pin]
interval: Interval, interval: Interval,
pool: WeakOpt<Mutex<PoolInner<T>>>, pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleTask to be notified as soon as the entire // This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on, // Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped. // but Err(Canceled) will be received when the Pool is dropped.
#[pin] #[pin]
pool_drop_notifier: oneshot::Receiver<crate::common::Never>, pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
}
} }
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
@@ -776,7 +777,7 @@ mod tests {
use std::time::Duration; use std::time::Duration;
use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::common::{task, exec::Exec, Future, Pin}; use crate::common::{exec::Exec, task, Future, Pin};
/// Test unique reservations. /// Test unique reservations.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]

View File

@@ -1,6 +1,6 @@
use std::mem; use std::mem;
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::sync::watch; use tokio::sync::watch;
use super::{task, Future, Pin, Poll}; use super::{task, Future, Pin, Poll};
@@ -21,14 +21,15 @@ pub(crate) struct Watch {
rx: watch::Receiver<()>, rx: watch::Receiver<()>,
} }
#[allow(missing_debug_implementations)] pin_project! {
#[pin_project] #[allow(missing_debug_implementations)]
pub struct Watching<F, FN> { pub struct Watching<F, FN> {
#[pin] #[pin]
future: F, future: F,
state: State<FN>, state: State<FN>,
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>, watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
_rx: watch::Receiver<()>, _rx: watch::Receiver<()>,
}
} }
enum State<F> { enum State<F> {

View File

@@ -1,4 +1,4 @@
use pin_project::pin_project; use pin_project_lite::pin_project;
use super::{task, Future, Pin, Poll}; use super::{task, Future, Pin, Poll};
@@ -12,23 +12,27 @@ where
R: Future + Unpin, R: Future + Unpin,
{ {
Lazy { Lazy {
inner: Inner::Init(func), inner: Inner::Init { func },
} }
} }
// FIXME: allow() required due to `impl Trait` leaking types to this lint // FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)] pin_project! {
#[pin_project] #[allow(missing_debug_implementations)]
pub(crate) struct Lazy<F, R> { pub(crate) struct Lazy<F, R> {
#[pin] #[pin]
inner: Inner<F, R>, inner: Inner<F, R>,
}
} }
#[pin_project(project = InnerProj, project_replace = InnerProjReplace)] pin_project! {
enum Inner<F, R> { #[project = InnerProj]
Init(F), #[project_replace = InnerProjReplace]
Fut(#[pin] R), enum Inner<F, R> {
Empty, Init { func: F },
Fut { #[pin] fut: R },
Empty,
}
} }
impl<F, R> Started for Lazy<F, R> impl<F, R> Started for Lazy<F, R>
@@ -38,8 +42,8 @@ where
{ {
fn started(&self) -> bool { fn started(&self) -> bool {
match self.inner { match self.inner {
Inner::Init(_) => false, Inner::Init { .. } => false,
Inner::Fut(_) | Inner::Empty => true, Inner::Fut { .. } | Inner::Empty => true,
} }
} }
} }
@@ -54,15 +58,15 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project(); let mut this = self.project();
if let InnerProj::Fut(f) = this.inner.as_mut().project() { if let InnerProj::Fut { fut } = this.inner.as_mut().project() {
return f.poll(cx); return fut.poll(cx);
} }
match this.inner.as_mut().project_replace(Inner::Empty) { match this.inner.as_mut().project_replace(Inner::Empty) {
InnerProjReplace::Init(func) => { InnerProjReplace::Init { func } => {
this.inner.set(Inner::Fut(func())); this.inner.set(Inner::Fut { fut: func() });
if let InnerProj::Fut(f) = this.inner.project() { if let InnerProj::Fut { fut } = this.inner.project() {
return f.poll(cx); return fut.poll(cx);
} }
unreachable!() unreachable!()
} }

View File

@@ -44,10 +44,13 @@ cfg_server! {
} }
cfg_client! { cfg_client! {
pub(crate) struct Client<B> { pin_project_lite::pin_project! {
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>, pub(crate) struct Client<B> {
rx: ClientRx<B>, callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
rx_closed: bool, #[pin]
rx: ClientRx<B>,
rx_closed: bool,
}
} }
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>; type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;

View File

@@ -5,7 +5,7 @@ use http::header::{
TRANSFER_ENCODING, UPGRADE, TRANSFER_ENCODING, UPGRADE,
}; };
use http::HeaderMap; use http::HeaderMap;
use pin_project::pin_project; use pin_project_lite::pin_project;
use std::error::Error as StdError; use std::error::Error as StdError;
use std::io::{self, Cursor, IoSlice}; use std::io::{self, Cursor, IoSlice};
use std::mem; use std::mem;
@@ -88,15 +88,16 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
// body adapters used by both Client and Server // body adapters used by both Client and Server
#[pin_project] pin_project! {
struct PipeToSendStream<S> struct PipeToSendStream<S>
where where
S: HttpBody, S: HttpBody,
{ {
body_tx: SendStream<SendBuf<S::Data>>, body_tx: SendStream<SendBuf<S::Data>>,
data_done: bool, data_done: bool,
#[pin] #[pin]
stream: S, stream: S,
}
} }
impl<S> PipeToSendStream<S> impl<S> PipeToSendStream<S>

View File

@@ -7,7 +7,7 @@ use bytes::Bytes;
use h2::server::{Connection, Handshake, SendResponse}; use h2::server::{Connection, Handshake, SendResponse};
use h2::{Reason, RecvStream}; use h2::{Reason, RecvStream};
use http::{Method, Request}; use http::{Method, Request};
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::{ping, PipeToSendStream, SendBuf}; use super::{ping, PipeToSendStream, SendBuf};
@@ -62,15 +62,16 @@ impl Default for Config {
} }
} }
#[pin_project] pin_project! {
pub(crate) struct Server<T, S, B, E> pub(crate) struct Server<T, S, B, E>
where where
S: HttpService<Body>, S: HttpService<Body>,
B: HttpBody, B: HttpBody,
{ {
exec: E, exec: E,
service: S, service: S,
state: State<T, B>, state: State<T, B>,
}
} }
enum State<T, B> enum State<T, B>
@@ -348,24 +349,34 @@ where
} }
} }
#[allow(missing_debug_implementations)] pin_project! {
#[pin_project] #[allow(missing_debug_implementations)]
pub struct H2Stream<F, B> pub struct H2Stream<F, B>
where where
B: HttpBody, B: HttpBody,
{ {
reply: SendResponse<SendBuf<B::Data>>, reply: SendResponse<SendBuf<B::Data>>,
#[pin] #[pin]
state: H2StreamState<F, B>, state: H2StreamState<F, B>,
}
} }
#[pin_project(project = H2StreamStateProj)] pin_project! {
enum H2StreamState<F, B> #[project = H2StreamStateProj]
where enum H2StreamState<F, B>
B: HttpBody, where
{ B: HttpBody,
Service(#[pin] F, Option<ConnectParts>), {
Body(#[pin] PipeToSendStream<B>), Service {
#[pin]
fut: F,
connect_parts: Option<ConnectParts>,
},
Body {
#[pin]
pipe: PipeToSendStream<B>,
},
}
} }
struct ConnectParts { struct ConnectParts {
@@ -385,7 +396,7 @@ where
) -> H2Stream<F, B> { ) -> H2Stream<F, B> {
H2Stream { H2Stream {
reply: respond, reply: respond,
state: H2StreamState::Service(fut, connect_parts), state: H2StreamState::Service { fut, connect_parts },
} }
} }
} }
@@ -415,7 +426,10 @@ where
let mut me = self.project(); let mut me = self.project();
loop { loop {
let next = match me.state.as_mut().project() { let next = match me.state.as_mut().project() {
H2StreamStateProj::Service(h, connect_parts) => { H2StreamStateProj::Service {
fut: h,
connect_parts,
} => {
let res = match h.poll(cx) { let res = match h.poll(cx) {
Poll::Ready(Ok(r)) => r, Poll::Ready(Ok(r)) => r,
Poll::Pending => { Poll::Pending => {
@@ -476,13 +490,15 @@ where
if !body.is_end_stream() { if !body.is_end_stream() {
let body_tx = reply!(me, res, false); let body_tx = reply!(me, res, false);
H2StreamState::Body(PipeToSendStream::new(body, body_tx)) H2StreamState::Body {
pipe: PipeToSendStream::new(body, body_tx),
}
} else { } else {
reply!(me, res, true); reply!(me, res, true);
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
} }
H2StreamStateProj::Body(pipe) => { H2StreamStateProj::Body { pipe } => {
return pipe.poll(cx); return pipe.poll(cx);
} }
}; };

View File

@@ -9,7 +9,7 @@
#[cfg(feature = "stream")] #[cfg(feature = "stream")]
use futures_core::Stream; use futures_core::Stream;
#[cfg(feature = "stream")] #[cfg(feature = "stream")]
use pin_project::pin_project; use pin_project_lite::pin_project;
use crate::common::{ use crate::common::{
task::{self, Poll}, task::{self, Poll},
@@ -86,8 +86,12 @@ pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
where where
S: Stream<Item = Result<IO, E>>, S: Stream<Item = Result<IO, E>>,
{ {
#[pin_project] pin_project! {
struct FromStream<S>(#[pin] S); struct FromStream<S> {
#[pin]
stream: S,
}
}
impl<S, IO, E> Accept for FromStream<S> impl<S, IO, E> Accept for FromStream<S>
where where
@@ -99,9 +103,9 @@ where
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut task::Context<'_>, cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> { ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
self.project().0.poll_next(cx) self.project().stream.poll_next(cx)
} }
} }
FromStream(stream) FromStream { stream }
} }

View File

@@ -45,7 +45,7 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use std::fmt; use std::fmt;
#[cfg(feature = "http1")] #[cfg(not(all(feature = "http1", feature = "http2")))]
use std::marker::PhantomData; use std::marker::PhantomData;
#[cfg(feature = "tcp")] #[cfg(feature = "tcp")]
use std::net::SocketAddr; use std::net::SocketAddr;
@@ -53,7 +53,7 @@ use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::accept::Accept; use super::accept::Accept;
@@ -61,6 +61,8 @@ use crate::body::{Body, HttpBody};
use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec}; use crate::common::exec::{ConnStreamExec, Exec, NewSvcExec};
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
use crate::common::io::Rewind; use crate::common::io::Rewind;
#[cfg(not(all(feature = "http1", feature = "http2")))]
use crate::common::Never;
use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::common::{task, Future, Pin, Poll, Unpin};
#[cfg(all(feature = "http1", feature = "http2"))] #[cfg(all(feature = "http1", feature = "http2"))]
use crate::error::{Kind, Parse}; use crate::error::{Kind, Parse};
@@ -111,77 +113,93 @@ enum ConnectionMode {
Fallback, Fallback,
} }
/// A stream mapping incoming IOs to new services. pin_project! {
/// /// A stream mapping incoming IOs to new services.
/// Yields `Connecting`s that are futures that should be put on a reactor. ///
#[must_use = "streams do nothing unless polled"] /// Yields `Connecting`s that are futures that should be put on a reactor.
#[pin_project] #[must_use = "streams do nothing unless polled"]
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Serve<I, S, E = Exec> { pub(super) struct Serve<I, S, E = Exec> {
#[pin]
incoming: I,
make_service: S,
protocol: Http<E>,
}
/// A future building a new `Service` to a `Connection`.
///
/// Wraps the future returned from `MakeService` into one that returns
/// a `Connection`.
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[derive(Debug)]
pub struct Connecting<I, F, E = Exec> {
#[pin]
future: F,
io: Option<I>,
protocol: Http<E>,
}
#[must_use = "futures do nothing unless polled"]
#[pin_project]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S, E> {
// TODO: re-add `pub(super)` once rustdoc can handle this.
//
// See https://github.com/rust-lang/rust/issues/64705
#[pin]
pub(super) serve: Serve<I, S, E>,
}
/// A future binding a connection with a Service.
///
/// Polling this future will drive HTTP forward.
#[must_use = "futures do nothing unless polled"]
#[pin_project]
pub struct Connection<T, S, E = Exec>
where
S: HttpService<Body>,
{
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
#[cfg(all(feature = "http1", feature = "http2"))]
fallback: Fallback<E>,
}
#[pin_project(project = ProtoServerProj)]
pub(super) enum ProtoServer<T, B, S, E = Exec>
where
S: HttpService<Body>,
B: HttpBody,
{
#[cfg(feature = "http1")]
H1(
#[pin] #[pin]
proto::h1::Dispatcher< incoming: I,
proto::h1::dispatch::Server<S, Body>, make_service: S,
B, protocol: Http<E>,
T, }
proto::ServerTransaction, }
>,
PhantomData<E>, pin_project! {
), /// A future building a new `Service` to a `Connection`.
#[cfg(feature = "http2")] ///
H2(#[pin] proto::h2::Server<Rewind<T>, S, B, E>), /// Wraps the future returned from `MakeService` into one that returns
/// a `Connection`.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Connecting<I, F, E = Exec> {
#[pin]
future: F,
io: Option<I>,
protocol: Http<E>,
}
}
pin_project! {
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S, E> {
// TODO: re-add `pub(super)` once rustdoc can handle this.
//
// See https://github.com/rust-lang/rust/issues/64705
#[pin]
pub(super) serve: Serve<I, S, E>,
}
}
pin_project! {
/// A future binding a connection with a Service.
///
/// Polling this future will drive HTTP forward.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, S, E = Exec>
where
S: HttpService<Body>,
{
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
fallback: Fallback<E>,
}
}
#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, S> =
proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;
#[cfg(not(feature = "http1"))]
type Http1Dispatcher<T, B, S> = (Never, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);
#[cfg(feature = "http2")]
type Http2Server<T, B, S, E> = proto::h2::Server<Rewind<T>, S, B, E>;
#[cfg(not(feature = "http2"))]
type Http2Server<T, B, S, E> = (
Never,
PhantomData<(T, Box<Pin<S>>, Box<Pin<B>>, Box<Pin<E>>)>,
);
pin_project! {
#[project = ProtoServerProj]
pub(super) enum ProtoServer<T, B, S, E = Exec>
where
S: HttpService<Body>,
B: HttpBody,
{
H1 {
#[pin]
h1: Http1Dispatcher<T, B, S>,
},
H2 {
#[pin]
h2: Http2Server<T, B, S, E>,
},
}
} }
#[cfg(all(feature = "http1", feature = "http2"))] #[cfg(all(feature = "http1", feature = "http2"))]
@@ -191,6 +209,9 @@ enum Fallback<E> {
Http1Only, Http1Only,
} }
#[cfg(not(all(feature = "http1", feature = "http2")))]
type Fallback<E> = PhantomData<E>;
#[cfg(all(feature = "http1", feature = "http2"))] #[cfg(all(feature = "http1", feature = "http2"))]
impl<E> Fallback<E> { impl<E> Fallback<E> {
fn to_h2(&self) -> bool { fn to_h2(&self) -> bool {
@@ -557,7 +578,9 @@ impl<E> Http<E> {
conn.set_max_buf_size(max); conn.set_max_buf_size(max);
} }
let sd = proto::h1::dispatch::Server::new(service); let sd = proto::h1::dispatch::Server::new(service);
ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn), PhantomData) ProtoServer::H1 {
h1: proto::h1::Dispatcher::new(sd, conn),
}
}}; }};
} }
@@ -573,19 +596,20 @@ impl<E> Http<E> {
let rewind_io = Rewind::new(io); let rewind_io = Rewind::new(io);
let h2 = let h2 =
proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone()); proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
ProtoServer::H2(h2) ProtoServer::H2 { h2 }
} }
}; };
Connection { Connection {
conn: Some(proto), conn: Some(proto),
#[cfg(feature = "http1")] #[cfg(all(feature = "http1", feature = "http2"))]
#[cfg(feature = "http2")]
fallback: if self.mode == ConnectionMode::Fallback { fallback: if self.mode == ConnectionMode::Fallback {
Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone()) Fallback::ToHttp2(self.h2_builder.clone(), self.exec.clone())
} else { } else {
Fallback::Http1Only Fallback::Http1Only
}, },
#[cfg(not(all(feature = "http1", feature = "http2")))]
fallback: PhantomData,
} }
} }
@@ -628,17 +652,22 @@ where
/// This should only be called while the `Connection` future is still /// This should only be called while the `Connection` future is still
/// pending. If called after `Connection::poll` has resolved, this does /// pending. If called after `Connection::poll` has resolved, this does
/// nothing. /// nothing.
pub fn graceful_shutdown(self: Pin<&mut Self>) { pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
match self.project().conn { match self.conn {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
Some(ProtoServer::H1(ref mut h1, _)) => { Some(ProtoServer::H1 { ref mut h1, .. }) => {
h1.disable_keep_alive(); h1.disable_keep_alive();
} }
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
Some(ProtoServer::H2(ref mut h2)) => { Some(ProtoServer::H2 { ref mut h2 }) => {
h2.graceful_shutdown(); h2.graceful_shutdown();
} }
None => (), None => (),
#[cfg(not(feature = "http1"))]
Some(ProtoServer::H1 { ref mut h1, .. }) => match h1.0 {},
#[cfg(not(feature = "http2"))]
Some(ProtoServer::H2 { ref mut h2 }) => match h2.0 {},
} }
} }
@@ -662,7 +691,7 @@ where
pub fn try_into_parts(self) -> Option<Parts<I, S>> { pub fn try_into_parts(self) -> Option<Parts<I, S>> {
match self.conn.unwrap() { match self.conn.unwrap() {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
ProtoServer::H1(h1, _) => { ProtoServer::H1 { h1, .. } => {
let (io, read_buf, dispatch) = h1.into_inner(); let (io, read_buf, dispatch) = h1.into_inner();
Some(Parts { Some(Parts {
io, io,
@@ -671,8 +700,10 @@ where
_inner: (), _inner: (),
}) })
} }
#[cfg(feature = "http2")] ProtoServer::H2 { .. } => None,
ProtoServer::H2(_h2) => None,
#[cfg(not(feature = "http1"))]
ProtoServer::H1 { h1, .. } => match h1.0 {},
} }
} }
@@ -696,7 +727,7 @@ where
loop { loop {
match *self.conn.as_mut().unwrap() { match *self.conn.as_mut().unwrap() {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
ProtoServer::H1(ref mut h1, _) => match ready!(h1.poll_without_shutdown(cx)) { ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
Ok(()) => return Poll::Ready(Ok(())), Ok(()) => return Poll::Ready(Ok(())),
Err(e) => { Err(e) => {
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
@@ -712,7 +743,12 @@ where
} }
}, },
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()), ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),
#[cfg(not(feature = "http1"))]
ProtoServer::H1 { ref mut h1, .. } => match h1.0 {},
#[cfg(not(feature = "http2"))]
ProtoServer::H2 { ref mut h2 } => match h2.0 {},
}; };
} }
} }
@@ -738,8 +774,8 @@ where
let conn = self.conn.take(); let conn = self.conn.take();
let (io, read_buf, dispatch) = match conn.unwrap() { let (io, read_buf, dispatch) = match conn.unwrap() {
ProtoServer::H1(h1, _) => h1.into_inner(), ProtoServer::H1 { h1, .. } => h1.into_inner(),
ProtoServer::H2(_h2) => { ProtoServer::H2 { .. } => {
panic!("h2 cannot into_inner"); panic!("h2 cannot into_inner");
} }
}; };
@@ -752,7 +788,7 @@ where
let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone()); let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
debug_assert!(self.conn.is_none()); debug_assert!(self.conn.is_none());
self.conn = Some(ProtoServer::H2(h2)); self.conn = Some(ProtoServer::H2 { h2 });
} }
/// Enable this connection to support higher-level HTTP upgrades. /// Enable this connection to support higher-level HTTP upgrades.
@@ -986,9 +1022,14 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.project() { match self.project() {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
ProtoServerProj::H1(s, _) => s.poll(cx), ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
ProtoServerProj::H2(s) => s.poll(cx), ProtoServerProj::H2 { h2 } => h2.poll(cx),
#[cfg(not(feature = "http1"))]
ProtoServerProj::H1 { h1, .. } => match h1.0 {},
#[cfg(not(feature = "http2"))]
ProtoServerProj::H2 { h2 } => match h2.0 {},
} }
} }
} }
@@ -1002,7 +1043,7 @@ pub(crate) mod spawn_all {
use crate::common::exec::ConnStreamExec; use crate::common::exec::ConnStreamExec;
use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::HttpService; use crate::service::HttpService;
use pin_project::pin_project; use pin_project_lite::pin_project;
// Used by `SpawnAll` to optionally watch a `Connection` future. // Used by `SpawnAll` to optionally watch a `Connection` future.
// //
@@ -1047,23 +1088,36 @@ pub(crate) mod spawn_all {
// Users cannot import this type, nor the associated `NewSvcExec`. Instead, // Users cannot import this type, nor the associated `NewSvcExec`. Instead,
// a blanket implementation for `Executor<impl Future>` is sufficient. // a blanket implementation for `Executor<impl Future>` is sufficient.
#[pin_project] pin_project! {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> { pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
#[pin] #[pin]
state: State<I, N, S, E, W>, state: State<I, N, S, E, W>,
}
} }
#[pin_project(project = StateProj)] pin_project! {
pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> { #[project = StateProj]
Connecting(#[pin] Connecting<I, N, E>, W), pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
Connected(#[pin] W::Future), Connecting {
#[pin]
connecting: Connecting<I, N, E>,
watcher: W,
},
Connected {
#[pin]
future: W::Future,
},
}
} }
impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> { impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self { pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
NewSvcTask { NewSvcTask {
state: State::Connecting(connecting, watcher), state: State::Connecting {
connecting,
watcher,
},
} }
} }
} }
@@ -1090,7 +1144,10 @@ pub(crate) mod spawn_all {
loop { loop {
let next = { let next = {
match me.state.as_mut().project() { match me.state.as_mut().project() {
StateProj::Connecting(connecting, watcher) => { StateProj::Connecting {
connecting,
watcher,
} => {
let res = ready!(connecting.poll(cx)); let res = ready!(connecting.poll(cx));
let conn = match res { let conn = match res {
Ok(conn) => conn, Ok(conn) => conn,
@@ -1100,10 +1157,10 @@ pub(crate) mod spawn_all {
return Poll::Ready(()); return Poll::Ready(());
} }
}; };
let connected = watcher.watch(conn.with_upgrades()); let future = watcher.watch(conn.with_upgrades());
State::Connected(connected) State::Connected { future }
} }
StateProj::Connected(future) => { StateProj::Connected { future } => {
return future.poll(cx).map(|res| { return future.poll(cx).map(|res| {
if let Err(err) = res { if let Err(err) = res {
debug!("connection error: {}", err); debug!("connection error: {}", err);
@@ -1171,7 +1228,7 @@ mod upgrades {
#[cfg(feature = "http1")] #[cfg(feature = "http1")]
Ok(proto::Dispatched::Upgrade(pending)) => { Ok(proto::Dispatched::Upgrade(pending)) => {
match self.inner.conn.take() { match self.inner.conn.take() {
Some(ProtoServer::H1(h1, _)) => { Some(ProtoServer::H1 { h1, .. }) => {
let (io, buf, _) = h1.into_inner(); let (io, buf, _) = h1.into_inner();
pending.fulfill(Upgraded::new(io, buf)); pending.fulfill(Upgraded::new(io, buf));
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));

View File

@@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener};
#[cfg(feature = "tcp")] #[cfg(feature = "tcp")]
use std::time::Duration; use std::time::Duration;
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::accept::Accept; use super::accept::Accept;
@@ -21,16 +21,17 @@ use super::shutdown::{Graceful, GracefulWatcher};
#[cfg(feature = "tcp")] #[cfg(feature = "tcp")]
use super::tcp::AddrIncoming; use super::tcp::AddrIncoming;
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default. pin_project! {
/// /// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
/// `Server` is a `Future` mapping a bound listener with a set of service ///
/// handlers. It is built using the [`Builder`](Builder), and the future /// `Server` is a `Future` mapping a bound listener with a set of service
/// completes when the server has been shutdown. It should be run by an /// handlers. It is built using the [`Builder`](Builder), and the future
/// `Executor`. /// completes when the server has been shutdown. It should be run by an
#[pin_project] /// `Executor`.
pub struct Server<I, S, E = Exec> { pub struct Server<I, S, E = Exec> {
#[pin] #[pin]
spawn_all: SpawnAll<I, S, E>, spawn_all: SpawnAll<I, S, E>,
}
} }
/// A builder for a [`Server`](Server). /// A builder for a [`Server`](Server).

View File

@@ -1,33 +1,36 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use pin_project::pin_project; use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
use super::accept::Accept; use super::accept::Accept;
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
use crate::body::{Body, HttpBody}; use crate::body::{Body, HttpBody};
use crate::common::drain::{self, Draining, Signal, Watch, Watching}; use crate::common::drain::{self, Draining, Signal, Watch, Watching};
use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::exec::{ConnStreamExec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::{HttpService, MakeServiceRef}; use crate::service::{HttpService, MakeServiceRef};
#[allow(missing_debug_implementations)] pin_project! {
#[pin_project] #[allow(missing_debug_implementations)]
pub struct Graceful<I, S, F, E> { pub struct Graceful<I, S, F, E> {
#[pin] #[pin]
state: State<I, S, F, E>, state: State<I, S, F, E>,
}
} }
#[pin_project(project = StateProj)] pin_project! {
pub(super) enum State<I, S, F, E> { #[project = StateProj]
Running { pub(super) enum State<I, S, F, E> {
drain: Option<(Signal, Watch)>, Running {
#[pin] drain: Option<(Signal, Watch)>,
spawn_all: SpawnAll<I, S, E>, #[pin]
#[pin] spawn_all: SpawnAll<I, S, E>,
signal: F, #[pin]
}, signal: F,
Draining(Draining), },
Draining { draining: Draining },
}
} }
impl<I, S, F, E> Graceful<I, S, F, E> { impl<I, S, F, E> Graceful<I, S, F, E> {
@@ -71,14 +74,16 @@ where
Poll::Ready(()) => { Poll::Ready(()) => {
debug!("signal received, starting graceful shutdown"); debug!("signal received, starting graceful shutdown");
let sig = drain.take().expect("drain channel").0; let sig = drain.take().expect("drain channel").0;
State::Draining(sig.drain()) State::Draining {
draining: sig.drain(),
}
} }
Poll::Pending => { Poll::Pending => {
let watch = drain.as_ref().expect("drain channel").1.clone(); let watch = drain.as_ref().expect("drain channel").1.clone();
return spawn_all.poll_watch(cx, &GracefulWatcher(watch)); return spawn_all.poll_watch(cx, &GracefulWatcher(watch));
} }
}, },
StateProj::Draining(ref mut draining) => { StateProj::Draining { ref mut draining } => {
return Pin::new(draining).poll(cx).map(Ok); return Pin::new(draining).poll(cx).map(Ok);
} }
} }

View File

@@ -199,13 +199,14 @@ mod addr_stream {
use crate::common::{task, Pin, Poll}; use crate::common::{task, Pin, Poll};
/// A transport returned yieled by `AddrIncoming`. pin_project_lite::pin_project! {
#[pin_project::pin_project] /// A transport returned yieled by `AddrIncoming`.
#[derive(Debug)] #[derive(Debug)]
pub struct AddrStream { pub struct AddrStream {
#[pin] #[pin]
inner: TcpStream, inner: TcpStream,
pub(super) remote_addr: SocketAddr, pub(super) remote_addr: SocketAddr,
}
} }
impl AddrStream { impl AddrStream {

View File

@@ -1,6 +1,6 @@
// TODO: Eventually to be replaced with tower_util::Oneshot. // TODO: Eventually to be replaced with tower_util::Oneshot.
use pin_project::pin_project; use pin_project_lite::pin_project;
use tower_service::Service; use tower_service::Service;
use crate::common::{task, Future, Pin, Poll}; use crate::common::{task, Future, Pin, Poll};
@@ -10,25 +10,35 @@ where
S: Service<Req>, S: Service<Req>,
{ {
Oneshot { Oneshot {
state: State::NotReady(svc, req), state: State::NotReady { svc, req },
} }
} }
// A `Future` consuming a `Service` and request, waiting until the `Service` pin_project! {
// is ready, and then calling `Service::call` with the request, and // A `Future` consuming a `Service` and request, waiting until the `Service`
// waiting for that `Future`. // is ready, and then calling `Service::call` with the request, and
#[allow(missing_debug_implementations)] // waiting for that `Future`.
#[pin_project] #[allow(missing_debug_implementations)]
pub struct Oneshot<S: Service<Req>, Req> { pub struct Oneshot<S: Service<Req>, Req> {
#[pin] #[pin]
state: State<S, Req>, state: State<S, Req>,
}
} }
#[pin_project(project = StateProj, project_replace = StateProjOwn)] pin_project! {
enum State<S: Service<Req>, Req> { #[project = StateProj]
NotReady(S, Req), #[project_replace = StateProjOwn]
Called(#[pin] S::Future), enum State<S: Service<Req>, Req> {
Tmp, NotReady {
svc: S,
req: Req,
},
Called {
#[pin]
fut: S::Future,
},
Tmp,
}
} }
impl<S, Req> Future for Oneshot<S, Req> impl<S, Req> Future for Oneshot<S, Req>
@@ -42,19 +52,19 @@ where
loop { loop {
match me.state.as_mut().project() { match me.state.as_mut().project() {
StateProj::NotReady(ref mut svc, _) => { StateProj::NotReady { ref mut svc, .. } => {
ready!(svc.poll_ready(cx))?; ready!(svc.poll_ready(cx))?;
// fallthrough out of the match's borrow // fallthrough out of the match's borrow
} }
StateProj::Called(fut) => { StateProj::Called { fut } => {
return fut.poll(cx); return fut.poll(cx);
} }
StateProj::Tmp => unreachable!(), StateProj::Tmp => unreachable!(),
} }
match me.state.as_mut().project_replace(State::Tmp) { match me.state.as_mut().project_replace(State::Tmp) {
StateProjOwn::NotReady(mut svc, req) => { StateProjOwn::NotReady { mut svc, req } => {
me.state.set(State::Called(svc.call(req))); me.state.set(State::Called { fut: svc.call(req) });
} }
_ => unreachable!(), _ => unreachable!(),
} }