refactor(client): Use async/await more (#2437)

* refactor: Use async/await in client.rs

* refactor: Simplify client.rs a bit more

* refactor: Allow !Unpin in Lazy

* Remove some impl Future

* Remove some combinator use
This commit is contained in:
Markus Westerlind
2021-02-18 19:35:43 +01:00
committed by GitHub
parent 42587059e6
commit f01de8e503
4 changed files with 174 additions and 178 deletions

View File

@@ -162,7 +162,7 @@ where
Version::HTTP_10 => { Version::HTTP_10 => {
if is_http_connect { if is_http_connect {
warn!("CONNECT is not allowed for HTTP/1.0"); warn!("CONNECT is not allowed for HTTP/1.0");
return ResponseFuture::new(Box::new(future::err( return ResponseFuture::new(Box::pin(future::err(
crate::Error::new_user_unsupported_request_method(), crate::Error::new_user_unsupported_request_method(),
))); )));
} }
@@ -179,35 +179,33 @@ where
let pool_key = match extract_domain(req.uri_mut(), is_http_connect) { let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
Ok(s) => s, Ok(s) => s,
Err(err) => { Err(err) => {
return ResponseFuture::new(Box::new(future::err(err))); return ResponseFuture::new(Box::pin(future::err(err)));
} }
}; };
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key))) ResponseFuture::new(Box::pin(self.clone().retryably_send_request(req, pool_key)))
} }
fn retryably_send_request( async fn retryably_send_request(
&self, self,
req: Request<B>, mut req: Request<B>,
pool_key: PoolKey, pool_key: PoolKey,
) -> impl Future<Output = crate::Result<Response<Body>>> { ) -> crate::Result<Response<Body>> {
let client = self.clone();
let uri = req.uri().clone(); let uri = req.uri().clone();
let mut send_fut = client.send_request(req, pool_key.clone()); loop {
future::poll_fn(move |cx| loop { req = match self.send_request(req, pool_key.clone()).await {
match ready!(Pin::new(&mut send_fut).poll(cx)) { Ok(resp) => return Ok(resp),
Ok(resp) => return Poll::Ready(Ok(resp)), Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Normal(err)) => return Poll::Ready(Err(err)),
Err(ClientError::Canceled { Err(ClientError::Canceled {
connection_reused, connection_reused,
mut req, mut req,
reason, reason,
}) => { }) => {
if !client.config.retry_canceled_requests || !connection_reused { if !self.config.retry_canceled_requests || !connection_reused {
// if client disabled, don't retry // if client disabled, don't retry
// a fresh connection means we definitely can't retry // a fresh connection means we definitely can't retry
return Poll::Ready(Err(reason)); return Err(reason);
} }
trace!( trace!(
@@ -215,115 +213,112 @@ where
reason reason
); );
*req.uri_mut() = uri.clone(); *req.uri_mut() = uri.clone();
send_fut = client.send_request(req, pool_key.clone()); req
} }
} }
}) }
} }
fn send_request( async fn send_request(
&self, &self,
mut req: Request<B>, mut req: Request<B>,
pool_key: PoolKey, pool_key: PoolKey,
) -> impl Future<Output = Result<Response<Body>, ClientError<B>>> + Unpin { ) -> Result<Response<Body>, ClientError<B>> {
let conn = self.connection_for(pool_key); let mut pooled = self.connection_for(pool_key).await?;
let set_host = self.config.set_host; if pooled.is_http1() {
let executor = self.conn_builder.exec.clone(); if self.config.set_host {
conn.and_then(move |mut pooled| { let uri = req.uri().clone();
if pooled.is_http1() { req.headers_mut().entry(HOST).or_insert_with(|| {
if set_host { let hostname = uri.host().expect("authority implies host");
let uri = req.uri().clone(); if let Some(port) = uri.port() {
req.headers_mut().entry(HOST).or_insert_with(|| { let s = format!("{}:{}", hostname, port);
let hostname = uri.host().expect("authority implies host"); HeaderValue::from_str(&s)
if let Some(port) = uri.port() { } else {
let s = format!("{}:{}", hostname, port); HeaderValue::from_str(hostname)
HeaderValue::from_str(&s) }
} else { .expect("uri host is valid header value")
HeaderValue::from_str(hostname) });
}
.expect("uri host is valid header value")
});
}
// CONNECT always sends authority-form, so check it first...
if req.method() == Method::CONNECT {
authority_form(req.uri_mut());
} else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
};
} else if req.method() == Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2");
return Either::Left(future::err(ClientError::Normal(
crate::Error::new_user_unsupported_request_method(),
)));
} }
let fut = pooled // CONNECT always sends authority-form, so check it first...
.send_request_retryable(req) if req.method() == Method::CONNECT {
.map_err(ClientError::map_with_reused(pooled.is_reused())); authority_form(req.uri_mut());
} else if pooled.conn_info.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
};
} else if req.method() == Method::CONNECT {
debug!("client does not support CONNECT requests over HTTP2");
return Err(ClientError::Normal(
crate::Error::new_user_unsupported_request_method(),
));
}
// If the Connector included 'extra' info, add to Response... let fut = pooled
let extra_info = pooled.conn_info.extra.clone(); .send_request_retryable(req)
let fut = fut.map_ok(move |mut res| { .map_err(ClientError::map_with_reused(pooled.is_reused()));
if let Some(extra) = extra_info {
extra.set(res.extensions_mut()); // If the Connector included 'extra' info, add to Response...
} let extra_info = pooled.conn_info.extra.clone();
res let fut = fut.map_ok(move |mut res| {
if let Some(extra) = extra_info {
extra.set(res.extensions_mut());
}
res
});
// As of futures@0.1.21, there is a race condition in the mpsc
// channel, such that sending when the receiver is closing can
// result in the message being stuck inside the queue. It won't
// ever notify until the Sender side is dropped.
//
// To counteract this, we must check if our senders 'want' channel
// has been closed after having tried to send. If so, error out...
if pooled.is_closed() {
return fut.await;
}
let mut res = fut.await?;
// If pooled is HTTP/2, we can toss this reference immediately.
//
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
drop(pooled);
} else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
}); });
// As of futures@0.1.21, there is a race condition in the mpsc self.conn_builder.exec.execute(on_idle);
// channel, such that sending when the receiver is closing can } else {
// result in the message being stuck inside the queue. It won't // There's no body to delay, but the connection isn't
// ever notify until the Sender side is dropped. // ready yet. Only re-insert when it's ready
// let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
// To counteract this, we must check if our senders 'want' channel
// has been closed after having tried to send. If so, error out...
if pooled.is_closed() {
return Either::Right(Either::Left(fut));
}
Either::Right(Either::Right(fut.map_ok(move |mut res| { self.conn_builder.exec.execute(on_idle);
// If pooled is HTTP/2, we can toss this reference immediately. }
//
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
drop(pooled);
} else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
// At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle)
drop(delayed_tx);
});
executor.execute(on_idle); Ok(res)
} else {
// There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready
let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
executor.execute(on_idle);
}
res
})))
})
} }
fn connection_for( async fn connection_for(
&self, &self,
pool_key: PoolKey, pool_key: PoolKey,
) -> impl Future<Output = Result<Pooled<PoolClient<B>>, ClientError<B>>> { ) -> Result<Pooled<PoolClient<B>>, ClientError<B>> {
// This actually races 2 different futures to try to get a ready // This actually races 2 different futures to try to get a ready
// connection the fastest, and to reduce connection churn. // connection the fastest, and to reduce connection churn.
// //
@@ -340,9 +335,9 @@ where
let checkout = self.pool.checkout(pool_key.clone()); let checkout = self.pool.checkout(pool_key.clone());
let connect = self.connect_to(pool_key); let connect = self.connect_to(pool_key);
let executor = self.conn_builder.exec.clone();
// The order of the `select` is depended on below... // The order of the `select` is depended on below...
future::select(checkout, connect).then(move |either| match either {
match future::select(checkout, connect).await {
// Checkout won, connect future may have been started or not. // Checkout won, connect future may have been started or not.
// //
// If it has, let it finish and insert back into the pool, // If it has, let it finish and insert back into the pool,
@@ -366,12 +361,12 @@ where
}); });
// An execute error here isn't important, we're just trying // An execute error here isn't important, we're just trying
// to prevent a waste of a socket... // to prevent a waste of a socket...
executor.execute(bg); self.conn_builder.exec.execute(bg);
} }
Either::Left(future::ok(checked_out)) Ok(checked_out)
} }
// Connect won, checkout can just be dropped. // Connect won, checkout can just be dropped.
Either::Right((Ok(connected), _checkout)) => Either::Left(future::ok(connected)), Either::Right((Ok(connected), _checkout)) => Ok(connected),
// Either checkout or connect could get canceled: // Either checkout or connect could get canceled:
// //
// 1. Connect is canceled if this is HTTP/2 and there is // 1. Connect is canceled if this is HTTP/2 and there is
@@ -380,21 +375,21 @@ where
// idle connection reliably. // idle connection reliably.
// //
// In both cases, we should just wait for the other future. // In both cases, we should just wait for the other future.
Either::Left((Err(err), connecting)) => Either::Right(Either::Left({ Either::Left((Err(err), connecting)) => {
if err.is_canceled() { if err.is_canceled() {
Either::Left(connecting.map_err(ClientError::Normal)) connecting.await.map_err(ClientError::Normal)
} else { } else {
Either::Right(future::err(ClientError::Normal(err))) Err(ClientError::Normal(err))
} }
})), }
Either::Right((Err(err), checkout)) => Either::Right(Either::Right({ Either::Right((Err(err), checkout)) => {
if err.is_canceled() { if err.is_canceled() {
Either::Left(checkout.map_err(ClientError::Normal)) checkout.await.map_err(ClientError::Normal)
} else { } else {
Either::Right(future::err(ClientError::Normal(err))) Err(ClientError::Normal(err))
} }
})), }
}) }
} }
fn connect_to( fn connect_to(
@@ -459,44 +454,40 @@ where
conn_builder.http2_only(is_h2); conn_builder.http2_only(is_h2);
} }
Either::Left(Box::pin( Either::Left(Box::pin(async move {
conn_builder let (tx, conn) = conn_builder.handshake(io).await?;
.handshake(io)
.and_then(move |(tx, conn)| {
trace!(
"handshake complete, spawning background dispatcher task"
);
executor.execute(
conn.map_err(|e| debug!("client connection error: {}", e))
.map(|_| ()),
);
// Wait for 'conn' to ready up before we trace!("handshake complete, spawning background dispatcher task");
// declare this tx as usable executor.execute(
tx.when_ready() conn.map_err(|e| debug!("client connection error: {}", e))
}) .map(|_| ()),
.map_ok(move |tx| { );
let tx = {
#[cfg(feature = "http2")] // Wait for 'conn' to ready up before we
{ // declare this tx as usable
if is_h2 { let tx = tx.when_ready().await?;
PoolTx::Http2(tx.into_http2())
} else { let tx = {
PoolTx::Http1(tx) #[cfg(feature = "http2")]
} {
} if is_h2 {
#[cfg(not(feature = "http2"))] PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx) PoolTx::Http1(tx)
}; }
pool.pooled( }
connecting, #[cfg(not(feature = "http2"))]
PoolClient { PoolTx::Http1(tx)
conn_info: connected, };
tx,
}, Ok(pool.pooled(
) connecting,
}), PoolClient {
)) conn_info: connected,
tx,
},
))
}))
}), }),
) )
}) })
@@ -563,13 +554,13 @@ impl<C, B> fmt::Debug for Client<C, B> {
// ===== impl ResponseFuture ===== // ===== impl ResponseFuture =====
impl ResponseFuture { impl ResponseFuture {
fn new(fut: Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>) -> Self { fn new(fut: Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>) -> Self {
Self { inner: fut.into() } Self { inner: fut }
} }
fn error_version(ver: Version) -> Self { fn error_version(ver: Version) -> Self {
warn!("Request has unsupported version \"{:?}\"", ver); warn!("Request has unsupported version \"{:?}\"", ver);
ResponseFuture::new(Box::new(future::err( ResponseFuture::new(Box::pin(future::err(
crate::Error::new_user_unsupported_version(), crate::Error::new_user_unsupported_version(),
))) )))
} }

View File

@@ -192,12 +192,13 @@ impl<B> SendRequest<B> {
self.dispatch.poll_ready(cx) self.dispatch.poll_ready(cx)
} }
pub(super) fn when_ready(self) -> impl Future<Output = crate::Result<Self>> { pub(super) async fn when_ready(self) -> crate::Result<Self> {
let mut me = Some(self); let mut me = Some(self);
future::poll_fn(move |cx| { future::poll_fn(move |cx| {
ready!(me.as_mut().unwrap().poll_ready(cx))?; ready!(me.as_mut().unwrap().poll_ready(cx))?;
Poll::Ready(Ok(me.take().unwrap())) Poll::Ready(Ok(me.take().unwrap()))
}) })
.await
} }
pub(super) fn is_ready(&self) -> bool { pub(super) fn is_ready(&self) -> bool {

View File

@@ -4,9 +4,9 @@ use std::future::Future;
use futures_util::FutureExt; use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::common::{task, Poll};
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
use crate::common::Pin; use crate::common::Pin;
use crate::common::{task, Poll};
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>; pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>; pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;
@@ -230,10 +230,10 @@ impl<T, U> Callback<T, U> {
} }
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
pub(crate) fn send_when( pub(crate) async fn send_when(
self, self,
mut when: impl Future<Output = Result<U, (crate::Error, Option<T>)>> + Unpin, mut when: impl Future<Output = Result<U, (crate::Error, Option<T>)>> + Unpin,
) -> impl Future<Output = ()> { ) {
use futures_util::future; use futures_util::future;
let mut cb = Some(self); let mut cb = Some(self);
@@ -257,6 +257,7 @@ impl<T, U> Callback<T, U> {
} }
} }
}) })
.await
} }
} }

View File

@@ -1,4 +1,4 @@
use std::mem; use pin_project::pin_project;
use super::{task, Future, Pin, Poll}; use super::{task, Future, Pin, Poll};
@@ -18,20 +18,23 @@ where
// FIXME: allow() required due to `impl Trait` leaking types to this lint // FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
#[pin_project]
pub(crate) struct Lazy<F, R> { pub(crate) struct Lazy<F, R> {
#[pin]
inner: Inner<F, R>, inner: Inner<F, R>,
} }
#[pin_project(project = InnerProj, project_replace = InnerProjReplace)]
enum Inner<F, R> { enum Inner<F, R> {
Init(F), Init(F),
Fut(R), Fut(#[pin] R),
Empty, Empty,
} }
impl<F, R> Started for Lazy<F, R> impl<F, R> Started for Lazy<F, R>
where where
F: FnOnce() -> R, F: FnOnce() -> R,
R: Future + Unpin, R: Future,
{ {
fn started(&self) -> bool { fn started(&self) -> bool {
match self.inner { match self.inner {
@@ -44,26 +47,26 @@ where
impl<F, R> Future for Lazy<F, R> impl<F, R> Future for Lazy<F, R>
where where
F: FnOnce() -> R, F: FnOnce() -> R,
R: Future + Unpin, R: Future,
{ {
type Output = R::Output; type Output = R::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
if let Inner::Fut(ref mut f) = self.inner { let mut this = self.project();
return Pin::new(f).poll(cx);
if let InnerProj::Fut(f) = this.inner.as_mut().project() {
return f.poll(cx);
} }
match mem::replace(&mut self.inner, Inner::Empty) { match this.inner.as_mut().project_replace(Inner::Empty) {
Inner::Init(func) => { InnerProjReplace::Init(func) => {
let mut fut = func(); this.inner.set(Inner::Fut(func()));
let ret = Pin::new(&mut fut).poll(cx); if let InnerProj::Fut(f) = this.inner.project() {
self.inner = Inner::Fut(fut); return f.poll(cx);
ret }
unreachable!()
} }
_ => unreachable!("lazy state wrong"), _ => unreachable!("lazy state wrong"),
} }
} }
} }
// The closure `F` is never pinned
impl<F, R: Unpin> Unpin for Lazy<F, R> {}