refactor(lib): Switch from pin-project to pin-project-lite
This commit is contained in:
committed by
Sean McArthur
parent
9dff00425d
commit
43412a950f
@@ -34,7 +34,7 @@ httparse = "1.0"
|
|||||||
h2 = { version = "0.3", optional = true }
|
h2 = { version = "0.3", optional = true }
|
||||||
itoa = "0.4.1"
|
itoa = "0.4.1"
|
||||||
tracing = { version = "0.1", default-features = false, features = ["std"] }
|
tracing = { version = "0.1", default-features = false, features = ["std"] }
|
||||||
pin-project = "1.0"
|
pin-project-lite = "0.2.4"
|
||||||
tower-service = "0.3"
|
tower-service = "0.3"
|
||||||
tokio = { version = "1", features = ["sync"] }
|
tokio = { version = "1", features = ["sync"] }
|
||||||
want = "0.3"
|
want = "0.3"
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::future::{self, Either, FutureExt as _};
|
use futures_util::future::{self, Either, FutureExt as _};
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tower_service::Service;
|
use tower_service::Service;
|
||||||
|
|
||||||
@@ -75,15 +75,23 @@ use crate::{Body, Request, Response};
|
|||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;
|
type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, R>;
|
||||||
|
|
||||||
#[pin_project(project = ProtoClientProj)]
|
pin_project! {
|
||||||
enum ProtoClient<T, B>
|
#[project = ProtoClientProj]
|
||||||
where
|
enum ProtoClient<T, B>
|
||||||
B: HttpBody,
|
where
|
||||||
{
|
B: HttpBody,
|
||||||
#[cfg(feature = "http1")]
|
{
|
||||||
H1(#[pin] Http1Dispatcher<T, B, proto::h1::ClientTransaction>),
|
#[cfg(feature = "http1")]
|
||||||
#[cfg(feature = "http2")]
|
H1 {
|
||||||
H2(#[pin] proto::h2::ClientTask<B>, PhantomData<fn(T)>),
|
#[pin]
|
||||||
|
h1: Http1Dispatcher<T, B, proto::h1::ClientTransaction>,
|
||||||
|
},
|
||||||
|
#[cfg(feature = "http2")]
|
||||||
|
H2 {
|
||||||
|
#[pin]
|
||||||
|
h2: proto::h2::ClientTask<B>, _phantom: PhantomData<fn(T)>,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a handshake future over some IO.
|
/// Returns a handshake future over some IO.
|
||||||
@@ -400,7 +408,7 @@ where
|
|||||||
pub fn into_parts(self) -> Parts<T> {
|
pub fn into_parts(self) -> Parts<T> {
|
||||||
match self.inner.expect("already upgraded") {
|
match self.inner.expect("already upgraded") {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
ProtoClient::H1(h1) => {
|
ProtoClient::H1 { h1 } => {
|
||||||
let (io, read_buf, _) = h1.into_inner();
|
let (io, read_buf, _) = h1.into_inner();
|
||||||
Parts {
|
Parts {
|
||||||
io,
|
io,
|
||||||
@@ -409,7 +417,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
ProtoClient::H2(..) => {
|
ProtoClient::H2 { .. } => {
|
||||||
panic!("http2 cannot into_inner");
|
panic!("http2 cannot into_inner");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -429,9 +437,9 @@ where
|
|||||||
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
|
||||||
match *self.inner.as_mut().expect("already upgraded") {
|
match *self.inner.as_mut().expect("already upgraded") {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
ProtoClient::H1(ref mut h1) => h1.poll_without_shutdown(cx),
|
ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
ProtoClient::H2(ref mut h2, _) => Pin::new(h2).poll(cx).map_ok(|_| ()),
|
ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -460,7 +468,7 @@ where
|
|||||||
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
|
proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
|
proto::Dispatched::Upgrade(pending) => match self.inner.take() {
|
||||||
Some(ProtoClient::H1(h1)) => {
|
Some(ProtoClient::H1 { h1 }) => {
|
||||||
let (io, buf, _) = h1.into_inner();
|
let (io, buf, _) = h1.into_inner();
|
||||||
pending.fulfill(Upgraded::new(io, buf));
|
pending.fulfill(Upgraded::new(io, buf));
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
@@ -707,14 +715,17 @@ impl Builder {
|
|||||||
}
|
}
|
||||||
let cd = proto::h1::dispatch::Client::new(rx);
|
let cd = proto::h1::dispatch::Client::new(rx);
|
||||||
let dispatch = proto::h1::Dispatcher::new(cd, conn);
|
let dispatch = proto::h1::Dispatcher::new(cd, conn);
|
||||||
ProtoClient::H1(dispatch)
|
ProtoClient::H1 { h1: dispatch }
|
||||||
}
|
}
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
Proto::Http2 => {
|
Proto::Http2 => {
|
||||||
let h2 =
|
let h2 =
|
||||||
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
|
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
|
||||||
.await?;
|
.await?;
|
||||||
ProtoClient::H2(h2, PhantomData)
|
ProtoClient::H2 {
|
||||||
|
h2,
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -768,9 +779,9 @@ where
|
|||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.project() {
|
match self.project() {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
ProtoClientProj::H1(c) => c.poll(cx),
|
ProtoClientProj::H1 { h1 } => h1.poll(cx),
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
ProtoClientProj::H2(c, _) => c.poll(cx),
|
ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use futures_util::future::Either;
|
use futures_util::future::Either;
|
||||||
use http::uri::{Scheme, Uri};
|
use http::uri::{Scheme, Uri};
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::net::{TcpSocket, TcpStream};
|
use tokio::net::{TcpSocket, TcpStream};
|
||||||
use tokio::time::Sleep;
|
use tokio::time::Sleep;
|
||||||
|
|
||||||
@@ -373,18 +373,19 @@ impl HttpInfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not publicly exported (so missing_docs doesn't trigger).
|
pin_project! {
|
||||||
//
|
// Not publicly exported (so missing_docs doesn't trigger).
|
||||||
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
|
//
|
||||||
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
|
// We return this `Future` instead of the `Pin<Box<dyn Future>>` directly
|
||||||
// (and thus we can change the type in the future).
|
// so that users don't rely on it fitting in a `Pin<Box<dyn Future>>` slot
|
||||||
#[must_use = "futures do nothing unless polled"]
|
// (and thus we can change the type in the future).
|
||||||
#[pin_project]
|
#[must_use = "futures do nothing unless polled"]
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct HttpConnecting<R> {
|
pub struct HttpConnecting<R> {
|
||||||
#[pin]
|
#[pin]
|
||||||
fut: BoxConnecting,
|
fut: BoxConnecting,
|
||||||
_marker: PhantomData<R>,
|
_marker: PhantomData<R>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConnectResult = Result<TcpStream, ConnectError>;
|
type ConnectResult = Result<TcpStream, ConnectError>;
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ use futures_channel::oneshot;
|
|||||||
use tokio::time::{Duration, Instant, Interval};
|
use tokio::time::{Duration, Instant, Interval};
|
||||||
|
|
||||||
use super::client::Ver;
|
use super::client::Ver;
|
||||||
use crate::common::{task, exec::Exec, Future, Pin, Poll, Unpin};
|
use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};
|
||||||
|
|
||||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
@@ -714,16 +714,17 @@ impl Expiration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
#[pin_project::pin_project]
|
pin_project_lite::pin_project! {
|
||||||
struct IdleTask<T> {
|
struct IdleTask<T> {
|
||||||
#[pin]
|
#[pin]
|
||||||
interval: Interval,
|
interval: Interval,
|
||||||
pool: WeakOpt<Mutex<PoolInner<T>>>,
|
pool: WeakOpt<Mutex<PoolInner<T>>>,
|
||||||
// This allows the IdleTask to be notified as soon as the entire
|
// This allows the IdleTask to be notified as soon as the entire
|
||||||
// Pool is fully dropped, and shutdown. This channel is never sent on,
|
// Pool is fully dropped, and shutdown. This channel is never sent on,
|
||||||
// but Err(Canceled) will be received when the Pool is dropped.
|
// but Err(Canceled) will be received when the Pool is dropped.
|
||||||
#[pin]
|
#[pin]
|
||||||
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
|
pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
@@ -776,7 +777,7 @@ mod tests {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
|
use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
|
||||||
use crate::common::{task, exec::Exec, Future, Pin};
|
use crate::common::{exec::Exec, task, Future, Pin};
|
||||||
|
|
||||||
/// Test unique reservations.
|
/// Test unique reservations.
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use super::{task, Future, Pin, Poll};
|
use super::{task, Future, Pin, Poll};
|
||||||
@@ -21,14 +21,15 @@ pub(crate) struct Watch {
|
|||||||
rx: watch::Receiver<()>,
|
rx: watch::Receiver<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
pin_project! {
|
||||||
#[pin_project]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct Watching<F, FN> {
|
pub struct Watching<F, FN> {
|
||||||
#[pin]
|
#[pin]
|
||||||
future: F,
|
future: F,
|
||||||
state: State<FN>,
|
state: State<FN>,
|
||||||
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
|
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
|
||||||
_rx: watch::Receiver<()>,
|
_rx: watch::Receiver<()>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State<F> {
|
enum State<F> {
|
||||||
|
|||||||
@@ -44,10 +44,13 @@ cfg_server! {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cfg_client! {
|
cfg_client! {
|
||||||
pub(crate) struct Client<B> {
|
pin_project_lite::pin_project! {
|
||||||
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
|
pub(crate) struct Client<B> {
|
||||||
rx: ClientRx<B>,
|
callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
|
||||||
rx_closed: bool,
|
#[pin]
|
||||||
|
rx: ClientRx<B>,
|
||||||
|
rx_closed: bool,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
|
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use http::header::{
|
|||||||
TRANSFER_ENCODING, UPGRADE,
|
TRANSFER_ENCODING, UPGRADE,
|
||||||
};
|
};
|
||||||
use http::HeaderMap;
|
use http::HeaderMap;
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
use std::io::IoSlice;
|
use std::io::IoSlice;
|
||||||
|
|
||||||
@@ -94,15 +94,16 @@ fn decode_content_length(headers: &HeaderMap) -> DecodedLength {
|
|||||||
|
|
||||||
// body adapters used by both Client and Server
|
// body adapters used by both Client and Server
|
||||||
|
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
struct PipeToSendStream<S>
|
struct PipeToSendStream<S>
|
||||||
where
|
where
|
||||||
S: HttpBody,
|
S: HttpBody,
|
||||||
{
|
{
|
||||||
body_tx: SendStream<SendBuf<S::Data>>,
|
body_tx: SendStream<SendBuf<S::Data>>,
|
||||||
data_done: bool,
|
data_done: bool,
|
||||||
#[pin]
|
#[pin]
|
||||||
stream: S,
|
stream: S,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> PipeToSendStream<S>
|
impl<S> PipeToSendStream<S>
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use h2::server::{Connection, Handshake, SendResponse};
|
use h2::server::{Connection, Handshake, SendResponse};
|
||||||
use h2::Reason;
|
use h2::Reason;
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
|
use super::{decode_content_length, ping, PipeToSendStream, SendBuf};
|
||||||
@@ -57,15 +57,16 @@ impl Default for Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
pub(crate) struct Server<T, S, B, E>
|
pub(crate) struct Server<T, S, B, E>
|
||||||
where
|
where
|
||||||
S: HttpService<Body>,
|
S: HttpService<Body>,
|
||||||
B: HttpBody,
|
B: HttpBody,
|
||||||
{
|
{
|
||||||
exec: E,
|
exec: E,
|
||||||
service: S,
|
service: S,
|
||||||
state: State<T, B>,
|
state: State<T, B>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State<T, B>
|
enum State<T, B>
|
||||||
@@ -315,24 +316,33 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
pin_project! {
|
||||||
#[pin_project]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct H2Stream<F, B>
|
pub struct H2Stream<F, B>
|
||||||
where
|
where
|
||||||
B: HttpBody,
|
B: HttpBody,
|
||||||
{
|
{
|
||||||
reply: SendResponse<SendBuf<B::Data>>,
|
reply: SendResponse<SendBuf<B::Data>>,
|
||||||
#[pin]
|
#[pin]
|
||||||
state: H2StreamState<F, B>,
|
state: H2StreamState<F, B>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = H2StreamStateProj)]
|
pin_project! {
|
||||||
enum H2StreamState<F, B>
|
#[project = H2StreamStateProj]
|
||||||
where
|
enum H2StreamState<F, B>
|
||||||
B: HttpBody,
|
where
|
||||||
{
|
B: HttpBody,
|
||||||
Service(#[pin] F),
|
{
|
||||||
Body(#[pin] PipeToSendStream<B>),
|
Service {
|
||||||
|
#[pin]
|
||||||
|
fut: F,
|
||||||
|
},
|
||||||
|
Body {
|
||||||
|
#[pin]
|
||||||
|
pipe: PipeToSendStream<B>,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, B> H2Stream<F, B>
|
impl<F, B> H2Stream<F, B>
|
||||||
@@ -342,7 +352,7 @@ where
|
|||||||
fn new(fut: F, respond: SendResponse<SendBuf<B::Data>>) -> H2Stream<F, B> {
|
fn new(fut: F, respond: SendResponse<SendBuf<B::Data>>) -> H2Stream<F, B> {
|
||||||
H2Stream {
|
H2Stream {
|
||||||
reply: respond,
|
reply: respond,
|
||||||
state: H2StreamState::Service(fut),
|
state: H2StreamState::Service { fut },
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -371,7 +381,7 @@ where
|
|||||||
let mut me = self.project();
|
let mut me = self.project();
|
||||||
loop {
|
loop {
|
||||||
let next = match me.state.as_mut().project() {
|
let next = match me.state.as_mut().project() {
|
||||||
H2StreamStateProj::Service(h) => {
|
H2StreamStateProj::Service { fut: h } => {
|
||||||
let res = match h.poll(cx) {
|
let res = match h.poll(cx) {
|
||||||
Poll::Ready(Ok(r)) => r,
|
Poll::Ready(Ok(r)) => r,
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
@@ -409,13 +419,15 @@ where
|
|||||||
|
|
||||||
if !body.is_end_stream() {
|
if !body.is_end_stream() {
|
||||||
let body_tx = reply!(me, res, false);
|
let body_tx = reply!(me, res, false);
|
||||||
H2StreamState::Body(PipeToSendStream::new(body, body_tx))
|
H2StreamState::Body {
|
||||||
|
pipe: PipeToSendStream::new(body, body_tx),
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
reply!(me, res, true);
|
reply!(me, res, true);
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
H2StreamStateProj::Body(pipe) => {
|
H2StreamStateProj::Body { pipe } => {
|
||||||
return pipe.poll(cx);
|
return pipe.poll(cx);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -9,7 +9,7 @@
|
|||||||
#[cfg(feature = "stream")]
|
#[cfg(feature = "stream")]
|
||||||
use futures_core::Stream;
|
use futures_core::Stream;
|
||||||
#[cfg(feature = "stream")]
|
#[cfg(feature = "stream")]
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::common::{
|
use crate::common::{
|
||||||
task::{self, Poll},
|
task::{self, Poll},
|
||||||
@@ -86,8 +86,12 @@ pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
|
|||||||
where
|
where
|
||||||
S: Stream<Item = Result<IO, E>>,
|
S: Stream<Item = Result<IO, E>>,
|
||||||
{
|
{
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
struct FromStream<S>(#[pin] S);
|
struct FromStream<S> {
|
||||||
|
#[pin]
|
||||||
|
stream: S,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<S, IO, E> Accept for FromStream<S>
|
impl<S, IO, E> Accept for FromStream<S>
|
||||||
where
|
where
|
||||||
@@ -99,9 +103,9 @@ where
|
|||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut task::Context<'_>,
|
cx: &mut task::Context<'_>,
|
||||||
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
|
||||||
self.project().0.poll_next(cx)
|
self.project().stream.poll_next(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
FromStream(stream)
|
FromStream { stream }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,7 +45,6 @@
|
|||||||
|
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
#[cfg(feature = "tcp")]
|
#[cfg(feature = "tcp")]
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
@@ -53,7 +52,7 @@ use std::net::SocketAddr;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use super::accept::Accept;
|
use super::accept::Accept;
|
||||||
@@ -109,77 +108,85 @@ enum ConnectionMode {
|
|||||||
Fallback,
|
Fallback,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A stream mapping incoming IOs to new services.
|
pin_project! {
|
||||||
///
|
/// A stream mapping incoming IOs to new services.
|
||||||
/// Yields `Connecting`s that are futures that should be put on a reactor.
|
///
|
||||||
#[must_use = "streams do nothing unless polled"]
|
/// Yields `Connecting`s that are futures that should be put on a reactor.
|
||||||
#[pin_project]
|
#[must_use = "streams do nothing unless polled"]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) struct Serve<I, S, E = Exec> {
|
pub(super) struct Serve<I, S, E = Exec> {
|
||||||
#[pin]
|
|
||||||
incoming: I,
|
|
||||||
make_service: S,
|
|
||||||
protocol: Http<E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A future building a new `Service` to a `Connection`.
|
|
||||||
///
|
|
||||||
/// Wraps the future returned from `MakeService` into one that returns
|
|
||||||
/// a `Connection`.
|
|
||||||
#[must_use = "futures do nothing unless polled"]
|
|
||||||
#[pin_project]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Connecting<I, F, E = Exec> {
|
|
||||||
#[pin]
|
|
||||||
future: F,
|
|
||||||
io: Option<I>,
|
|
||||||
protocol: Http<E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[must_use = "futures do nothing unless polled"]
|
|
||||||
#[pin_project]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(super) struct SpawnAll<I, S, E> {
|
|
||||||
// TODO: re-add `pub(super)` once rustdoc can handle this.
|
|
||||||
//
|
|
||||||
// See https://github.com/rust-lang/rust/issues/64705
|
|
||||||
#[pin]
|
|
||||||
pub(super) serve: Serve<I, S, E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A future binding a connection with a Service.
|
|
||||||
///
|
|
||||||
/// Polling this future will drive HTTP forward.
|
|
||||||
#[must_use = "futures do nothing unless polled"]
|
|
||||||
#[pin_project]
|
|
||||||
pub struct Connection<T, S, E = Exec>
|
|
||||||
where
|
|
||||||
S: HttpService<Body>,
|
|
||||||
{
|
|
||||||
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
|
|
||||||
#[cfg(all(feature = "http1", feature = "http2"))]
|
|
||||||
fallback: Fallback<E>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[pin_project(project = ProtoServerProj)]
|
|
||||||
pub(super) enum ProtoServer<T, B, S, E = Exec>
|
|
||||||
where
|
|
||||||
S: HttpService<Body>,
|
|
||||||
B: HttpBody,
|
|
||||||
{
|
|
||||||
#[cfg(feature = "http1")]
|
|
||||||
H1(
|
|
||||||
#[pin]
|
#[pin]
|
||||||
proto::h1::Dispatcher<
|
incoming: I,
|
||||||
proto::h1::dispatch::Server<S, Body>,
|
make_service: S,
|
||||||
B,
|
protocol: Http<E>,
|
||||||
T,
|
}
|
||||||
proto::ServerTransaction,
|
}
|
||||||
>,
|
|
||||||
PhantomData<E>,
|
pin_project! {
|
||||||
),
|
/// A future building a new `Service` to a `Connection`.
|
||||||
#[cfg(feature = "http2")]
|
///
|
||||||
H2(#[pin] proto::h2::Server<Rewind<T>, S, B, E>),
|
/// Wraps the future returned from `MakeService` into one that returns
|
||||||
|
/// a `Connection`.
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Connecting<I, F, E = Exec> {
|
||||||
|
#[pin]
|
||||||
|
future: F,
|
||||||
|
io: Option<I>,
|
||||||
|
protocol: Http<E>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct SpawnAll<I, S, E> {
|
||||||
|
// TODO: re-add `pub(super)` once rustdoc can handle this.
|
||||||
|
//
|
||||||
|
// See https://github.com/rust-lang/rust/issues/64705
|
||||||
|
#[pin]
|
||||||
|
pub(super) serve: Serve<I, S, E>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
/// A future binding a connection with a Service.
|
||||||
|
///
|
||||||
|
/// Polling this future will drive HTTP forward.
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
|
pub struct Connection<T, S, E = Exec>
|
||||||
|
where
|
||||||
|
S: HttpService<Body>,
|
||||||
|
{
|
||||||
|
pub(super) conn: Option<ProtoServer<T, S::ResBody, S, E>>,
|
||||||
|
fallback: Fallback<E>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
#[project = ProtoServerProj]
|
||||||
|
pub(super) enum ProtoServer<T, B, S, E = Exec>
|
||||||
|
where
|
||||||
|
S: HttpService<Body>,
|
||||||
|
B: HttpBody,
|
||||||
|
{
|
||||||
|
#[cfg(feature = "http1")]
|
||||||
|
H1 {
|
||||||
|
#[pin]
|
||||||
|
h1: proto::h1::Dispatcher<
|
||||||
|
proto::h1::dispatch::Server<S, Body>,
|
||||||
|
B,
|
||||||
|
T,
|
||||||
|
proto::ServerTransaction,
|
||||||
|
>,
|
||||||
|
_phantom: PhantomData<E>,
|
||||||
|
},
|
||||||
|
#[cfg(feature = "http2")]
|
||||||
|
H2 {
|
||||||
|
#[pin]
|
||||||
|
h2: proto::h2::Server<Rewind<T>, S, B, E>,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "http1", feature = "http2"))]
|
#[cfg(all(feature = "http1", feature = "http2"))]
|
||||||
@@ -189,6 +196,10 @@ enum Fallback<E> {
|
|||||||
Http1Only,
|
Http1Only,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(all(feature = "http1", feature = "http2")))]
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct Fallback<E>(PhantomData<E>);
|
||||||
|
|
||||||
#[cfg(all(feature = "http1", feature = "http2"))]
|
#[cfg(all(feature = "http1", feature = "http2"))]
|
||||||
impl<E> Fallback<E> {
|
impl<E> Fallback<E> {
|
||||||
fn to_h2(&self) -> bool {
|
fn to_h2(&self) -> bool {
|
||||||
@@ -519,7 +530,10 @@ impl<E> Http<E> {
|
|||||||
conn.set_max_buf_size(max);
|
conn.set_max_buf_size(max);
|
||||||
}
|
}
|
||||||
let sd = proto::h1::dispatch::Server::new(service);
|
let sd = proto::h1::dispatch::Server::new(service);
|
||||||
ProtoServer::H1(proto::h1::Dispatcher::new(sd, conn), PhantomData)
|
ProtoServer::H1 {
|
||||||
|
h1: proto::h1::Dispatcher::new(sd, conn),
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -535,7 +549,7 @@ impl<E> Http<E> {
|
|||||||
let rewind_io = Rewind::new(io);
|
let rewind_io = Rewind::new(io);
|
||||||
let h2 =
|
let h2 =
|
||||||
proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
|
proto::h2::Server::new(rewind_io, service, &self.h2_builder, self.exec.clone());
|
||||||
ProtoServer::H2(h2)
|
ProtoServer::H2 { h2 }
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -590,14 +604,14 @@ where
|
|||||||
/// This should only be called while the `Connection` future is still
|
/// This should only be called while the `Connection` future is still
|
||||||
/// pending. If called after `Connection::poll` has resolved, this does
|
/// pending. If called after `Connection::poll` has resolved, this does
|
||||||
/// nothing.
|
/// nothing.
|
||||||
pub fn graceful_shutdown(self: Pin<&mut Self>) {
|
pub fn graceful_shutdown(mut self: Pin<&mut Self>) {
|
||||||
match self.project().conn {
|
match self.conn {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
Some(ProtoServer::H1(ref mut h1, _)) => {
|
Some(ProtoServer::H1 { ref mut h1, .. }) => {
|
||||||
h1.disable_keep_alive();
|
h1.disable_keep_alive();
|
||||||
}
|
}
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
Some(ProtoServer::H2(ref mut h2)) => {
|
Some(ProtoServer::H2 { ref mut h2 }) => {
|
||||||
h2.graceful_shutdown();
|
h2.graceful_shutdown();
|
||||||
}
|
}
|
||||||
None => (),
|
None => (),
|
||||||
@@ -624,7 +638,7 @@ where
|
|||||||
pub fn try_into_parts(self) -> Option<Parts<I, S>> {
|
pub fn try_into_parts(self) -> Option<Parts<I, S>> {
|
||||||
match self.conn.unwrap() {
|
match self.conn.unwrap() {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
ProtoServer::H1(h1, _) => {
|
ProtoServer::H1 { h1, .. } => {
|
||||||
let (io, read_buf, dispatch) = h1.into_inner();
|
let (io, read_buf, dispatch) = h1.into_inner();
|
||||||
Some(Parts {
|
Some(Parts {
|
||||||
io,
|
io,
|
||||||
@@ -634,7 +648,7 @@ where
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
ProtoServer::H2(_h2) => None,
|
ProtoServer::H2 { .. } => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -658,7 +672,7 @@ where
|
|||||||
loop {
|
loop {
|
||||||
match *self.conn.as_mut().unwrap() {
|
match *self.conn.as_mut().unwrap() {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
ProtoServer::H1(ref mut h1, _) => match ready!(h1.poll_without_shutdown(cx)) {
|
ProtoServer::H1 { ref mut h1, .. } => match ready!(h1.poll_without_shutdown(cx)) {
|
||||||
Ok(()) => return Poll::Ready(Ok(())),
|
Ok(()) => return Poll::Ready(Ok(())),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
@@ -674,7 +688,7 @@ where
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
ProtoServer::H2(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()),
|
ProtoServer::H2 { ref mut h2 } => return Pin::new(h2).poll(cx).map_ok(|_| ()),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -700,8 +714,8 @@ where
|
|||||||
let conn = self.conn.take();
|
let conn = self.conn.take();
|
||||||
|
|
||||||
let (io, read_buf, dispatch) = match conn.unwrap() {
|
let (io, read_buf, dispatch) = match conn.unwrap() {
|
||||||
ProtoServer::H1(h1, _) => h1.into_inner(),
|
ProtoServer::H1 { h1, .. } => h1.into_inner(),
|
||||||
ProtoServer::H2(_h2) => {
|
ProtoServer::H2 { .. } => {
|
||||||
panic!("h2 cannot into_inner");
|
panic!("h2 cannot into_inner");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -714,7 +728,7 @@ where
|
|||||||
let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
|
let h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), builder, exec.clone());
|
||||||
|
|
||||||
debug_assert!(self.conn.is_none());
|
debug_assert!(self.conn.is_none());
|
||||||
self.conn = Some(ProtoServer::H2(h2));
|
self.conn = Some(ProtoServer::H2 { h2 });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enable this connection to support higher-level HTTP upgrades.
|
/// Enable this connection to support higher-level HTTP upgrades.
|
||||||
@@ -948,9 +962,9 @@ where
|
|||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||||
match self.project() {
|
match self.project() {
|
||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
ProtoServerProj::H1(s, _) => s.poll(cx),
|
ProtoServerProj::H1 { h1, .. } => h1.poll(cx),
|
||||||
#[cfg(feature = "http2")]
|
#[cfg(feature = "http2")]
|
||||||
ProtoServerProj::H2(s) => s.poll(cx),
|
ProtoServerProj::H2 { h2 } => h2.poll(cx),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -964,7 +978,7 @@ pub(crate) mod spawn_all {
|
|||||||
use crate::common::exec::ConnStreamExec;
|
use crate::common::exec::ConnStreamExec;
|
||||||
use crate::common::{task, Future, Pin, Poll, Unpin};
|
use crate::common::{task, Future, Pin, Poll, Unpin};
|
||||||
use crate::service::HttpService;
|
use crate::service::HttpService;
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
// Used by `SpawnAll` to optionally watch a `Connection` future.
|
// Used by `SpawnAll` to optionally watch a `Connection` future.
|
||||||
//
|
//
|
||||||
@@ -1009,23 +1023,36 @@ pub(crate) mod spawn_all {
|
|||||||
// Users cannot import this type, nor the associated `NewSvcExec`. Instead,
|
// Users cannot import this type, nor the associated `NewSvcExec`. Instead,
|
||||||
// a blanket implementation for `Executor<impl Future>` is sufficient.
|
// a blanket implementation for `Executor<impl Future>` is sufficient.
|
||||||
|
|
||||||
#[pin_project]
|
pin_project! {
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
|
pub struct NewSvcTask<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
|
||||||
#[pin]
|
#[pin]
|
||||||
state: State<I, N, S, E, W>,
|
state: State<I, N, S, E, W>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = StateProj)]
|
pin_project! {
|
||||||
pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
|
#[project = StateProj]
|
||||||
Connecting(#[pin] Connecting<I, N, E>, W),
|
pub(super) enum State<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> {
|
||||||
Connected(#[pin] W::Future),
|
Connecting {
|
||||||
|
#[pin]
|
||||||
|
connecting: Connecting<I, N, E>,
|
||||||
|
watcher: W,
|
||||||
|
},
|
||||||
|
Connected {
|
||||||
|
#[pin]
|
||||||
|
future: W::Future,
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
|
impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
|
||||||
pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
|
pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self {
|
||||||
NewSvcTask {
|
NewSvcTask {
|
||||||
state: State::Connecting(connecting, watcher),
|
state: State::Connecting {
|
||||||
|
connecting,
|
||||||
|
watcher,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1052,7 +1079,10 @@ pub(crate) mod spawn_all {
|
|||||||
loop {
|
loop {
|
||||||
let next = {
|
let next = {
|
||||||
match me.state.as_mut().project() {
|
match me.state.as_mut().project() {
|
||||||
StateProj::Connecting(connecting, watcher) => {
|
StateProj::Connecting {
|
||||||
|
connecting,
|
||||||
|
watcher,
|
||||||
|
} => {
|
||||||
let res = ready!(connecting.poll(cx));
|
let res = ready!(connecting.poll(cx));
|
||||||
let conn = match res {
|
let conn = match res {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
@@ -1062,10 +1092,10 @@ pub(crate) mod spawn_all {
|
|||||||
return Poll::Ready(());
|
return Poll::Ready(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let connected = watcher.watch(conn.with_upgrades());
|
let future = watcher.watch(conn.with_upgrades());
|
||||||
State::Connected(connected)
|
State::Connected { future }
|
||||||
}
|
}
|
||||||
StateProj::Connected(future) => {
|
StateProj::Connected { future } => {
|
||||||
return future.poll(cx).map(|res| {
|
return future.poll(cx).map(|res| {
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
debug!("connection error: {}", err);
|
debug!("connection error: {}", err);
|
||||||
@@ -1133,7 +1163,7 @@ mod upgrades {
|
|||||||
#[cfg(feature = "http1")]
|
#[cfg(feature = "http1")]
|
||||||
Ok(proto::Dispatched::Upgrade(pending)) => {
|
Ok(proto::Dispatched::Upgrade(pending)) => {
|
||||||
match self.inner.conn.take() {
|
match self.inner.conn.take() {
|
||||||
Some(ProtoServer::H1(h1, _)) => {
|
Some(ProtoServer::H1 { h1, .. }) => {
|
||||||
let (io, buf, _) = h1.into_inner();
|
let (io, buf, _) = h1.into_inner();
|
||||||
pending.fulfill(Upgraded::new(io, buf));
|
pending.fulfill(Upgraded::new(io, buf));
|
||||||
return Poll::Ready(Ok(()));
|
return Poll::Ready(Ok(()));
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
|||||||
#[cfg(feature = "tcp")]
|
#[cfg(feature = "tcp")]
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use super::accept::Accept;
|
use super::accept::Accept;
|
||||||
@@ -21,16 +21,17 @@ use super::shutdown::{Graceful, GracefulWatcher};
|
|||||||
#[cfg(feature = "tcp")]
|
#[cfg(feature = "tcp")]
|
||||||
use super::tcp::AddrIncoming;
|
use super::tcp::AddrIncoming;
|
||||||
|
|
||||||
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
|
pin_project! {
|
||||||
///
|
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
|
||||||
/// `Server` is a `Future` mapping a bound listener with a set of service
|
///
|
||||||
/// handlers. It is built using the [`Builder`](Builder), and the future
|
/// `Server` is a `Future` mapping a bound listener with a set of service
|
||||||
/// completes when the server has been shutdown. It should be run by an
|
/// handlers. It is built using the [`Builder`](Builder), and the future
|
||||||
/// `Executor`.
|
/// completes when the server has been shutdown. It should be run by an
|
||||||
#[pin_project]
|
/// `Executor`.
|
||||||
pub struct Server<I, S, E = Exec> {
|
pub struct Server<I, S, E = Exec> {
|
||||||
#[pin]
|
#[pin]
|
||||||
spawn_all: SpawnAll<I, S, E>,
|
spawn_all: SpawnAll<I, S, E>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A builder for a [`Server`](Server).
|
/// A builder for a [`Server`](Server).
|
||||||
|
|||||||
@@ -1,33 +1,36 @@
|
|||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
|
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
|
|
||||||
use super::accept::Accept;
|
use super::accept::Accept;
|
||||||
|
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
|
||||||
use crate::body::{Body, HttpBody};
|
use crate::body::{Body, HttpBody};
|
||||||
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
|
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
|
||||||
use crate::common::exec::{ConnStreamExec, NewSvcExec};
|
use crate::common::exec::{ConnStreamExec, NewSvcExec};
|
||||||
use crate::common::{task, Future, Pin, Poll, Unpin};
|
use crate::common::{task, Future, Pin, Poll, Unpin};
|
||||||
use crate::service::{HttpService, MakeServiceRef};
|
use crate::service::{HttpService, MakeServiceRef};
|
||||||
|
|
||||||
#[allow(missing_debug_implementations)]
|
pin_project! {
|
||||||
#[pin_project]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct Graceful<I, S, F, E> {
|
pub struct Graceful<I, S, F, E> {
|
||||||
#[pin]
|
#[pin]
|
||||||
state: State<I, S, F, E>,
|
state: State<I, S, F, E>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = StateProj)]
|
pin_project! {
|
||||||
pub(super) enum State<I, S, F, E> {
|
#[project = StateProj]
|
||||||
Running {
|
pub(super) enum State<I, S, F, E> {
|
||||||
drain: Option<(Signal, Watch)>,
|
Running {
|
||||||
#[pin]
|
drain: Option<(Signal, Watch)>,
|
||||||
spawn_all: SpawnAll<I, S, E>,
|
#[pin]
|
||||||
#[pin]
|
spawn_all: SpawnAll<I, S, E>,
|
||||||
signal: F,
|
#[pin]
|
||||||
},
|
signal: F,
|
||||||
Draining(Draining),
|
},
|
||||||
|
Draining { draining: Draining },
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I, S, F, E> Graceful<I, S, F, E> {
|
impl<I, S, F, E> Graceful<I, S, F, E> {
|
||||||
@@ -71,14 +74,16 @@ where
|
|||||||
Poll::Ready(()) => {
|
Poll::Ready(()) => {
|
||||||
debug!("signal received, starting graceful shutdown");
|
debug!("signal received, starting graceful shutdown");
|
||||||
let sig = drain.take().expect("drain channel").0;
|
let sig = drain.take().expect("drain channel").0;
|
||||||
State::Draining(sig.drain())
|
State::Draining {
|
||||||
|
draining: sig.drain(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
let watch = drain.as_ref().expect("drain channel").1.clone();
|
let watch = drain.as_ref().expect("drain channel").1.clone();
|
||||||
return spawn_all.poll_watch(cx, &GracefulWatcher(watch));
|
return spawn_all.poll_watch(cx, &GracefulWatcher(watch));
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
StateProj::Draining(ref mut draining) => {
|
StateProj::Draining { ref mut draining } => {
|
||||||
return Pin::new(draining).poll(cx).map(Ok);
|
return Pin::new(draining).poll(cx).map(Ok);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -229,13 +229,14 @@ mod addr_stream {
|
|||||||
|
|
||||||
use crate::common::{task, Pin, Poll};
|
use crate::common::{task, Pin, Poll};
|
||||||
|
|
||||||
/// A transport returned yieled by `AddrIncoming`.
|
pin_project_lite::pin_project! {
|
||||||
#[pin_project::pin_project]
|
/// A transport returned yieled by `AddrIncoming`.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AddrStream {
|
pub struct AddrStream {
|
||||||
#[pin]
|
#[pin]
|
||||||
inner: TcpStream,
|
inner: TcpStream,
|
||||||
pub(super) remote_addr: SocketAddr,
|
pub(super) remote_addr: SocketAddr,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AddrStream {
|
impl AddrStream {
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
// TODO: Eventually to be replaced with tower_util::Oneshot.
|
// TODO: Eventually to be replaced with tower_util::Oneshot.
|
||||||
|
|
||||||
use pin_project::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
use tower_service::Service;
|
use tower_service::Service;
|
||||||
|
|
||||||
use crate::common::{task, Future, Pin, Poll};
|
use crate::common::{task, Future, Pin, Poll};
|
||||||
@@ -10,25 +10,35 @@ where
|
|||||||
S: Service<Req>,
|
S: Service<Req>,
|
||||||
{
|
{
|
||||||
Oneshot {
|
Oneshot {
|
||||||
state: State::NotReady(svc, req),
|
state: State::NotReady { svc, req },
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A `Future` consuming a `Service` and request, waiting until the `Service`
|
pin_project! {
|
||||||
// is ready, and then calling `Service::call` with the request, and
|
// A `Future` consuming a `Service` and request, waiting until the `Service`
|
||||||
// waiting for that `Future`.
|
// is ready, and then calling `Service::call` with the request, and
|
||||||
#[allow(missing_debug_implementations)]
|
// waiting for that `Future`.
|
||||||
#[pin_project]
|
#[allow(missing_debug_implementations)]
|
||||||
pub struct Oneshot<S: Service<Req>, Req> {
|
pub struct Oneshot<S: Service<Req>, Req> {
|
||||||
#[pin]
|
#[pin]
|
||||||
state: State<S, Req>,
|
state: State<S, Req>,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(project = StateProj, project_replace = StateProjOwn)]
|
pin_project! {
|
||||||
enum State<S: Service<Req>, Req> {
|
#[project = StateProj]
|
||||||
NotReady(S, Req),
|
#[project_replace = StateProjOwn]
|
||||||
Called(#[pin] S::Future),
|
enum State<S: Service<Req>, Req> {
|
||||||
Tmp,
|
NotReady {
|
||||||
|
svc: S,
|
||||||
|
req: Req,
|
||||||
|
},
|
||||||
|
Called {
|
||||||
|
#[pin]
|
||||||
|
fut: S::Future,
|
||||||
|
},
|
||||||
|
Tmp,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, Req> Future for Oneshot<S, Req>
|
impl<S, Req> Future for Oneshot<S, Req>
|
||||||
@@ -42,19 +52,19 @@ where
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
match me.state.as_mut().project() {
|
match me.state.as_mut().project() {
|
||||||
StateProj::NotReady(ref mut svc, _) => {
|
StateProj::NotReady { ref mut svc, .. } => {
|
||||||
ready!(svc.poll_ready(cx))?;
|
ready!(svc.poll_ready(cx))?;
|
||||||
// fallthrough out of the match's borrow
|
// fallthrough out of the match's borrow
|
||||||
}
|
}
|
||||||
StateProj::Called(fut) => {
|
StateProj::Called { fut } => {
|
||||||
return fut.poll(cx);
|
return fut.poll(cx);
|
||||||
}
|
}
|
||||||
StateProj::Tmp => unreachable!(),
|
StateProj::Tmp => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
match me.state.as_mut().project_replace(State::Tmp) {
|
match me.state.as_mut().project_replace(State::Tmp) {
|
||||||
StateProjOwn::NotReady(mut svc, req) => {
|
StateProjOwn::NotReady { mut svc, req } => {
|
||||||
me.state.set(State::Called(svc.call(req)));
|
me.state.set(State::Called { fut: svc.call(req) });
|
||||||
}
|
}
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user