From 5e159a58b4d95c122886befe6e1a9dcd45baae47 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 27 Sep 2018 19:06:16 -0700 Subject: [PATCH] refactor(client): breakout connect_to future into separate function --- src/client/conn.rs | 2 + src/client/mod.rs | 124 +++++++++++++++++++++++++-------------------- src/client/pool.rs | 19 ++++++- src/common/lazy.rs | 13 +++-- src/common/mod.rs | 2 +- 5 files changed, 99 insertions(+), 61 deletions(-) diff --git a/src/client/conn.rs b/src/client/conn.rs index 9aea409d..63ccab40 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -119,6 +119,8 @@ pub struct Parts { // ========== internal client api /// A `Future` for when `SendRequest::poll_ready()` is ready. +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] #[must_use = "futures do nothing unless polled"] pub(super) struct WhenReady { tx: Option>, diff --git a/src/client/mod.rs b/src/client/mod.rs index b8c56ed0..ef790bc3 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -90,10 +90,9 @@ use http::header::{Entry, HeaderValue, HOST}; use http::uri::Scheme; use body::{Body, Payload}; -use common::Exec; -use common::lazy as hyper_lazy; +use common::{Exec, lazy as hyper_lazy, Lazy}; use self::connect::{Connect, Destination}; -use self::pool::{Pool, Poolable, Reservation}; +use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; #[cfg(feature = "runtime")] pub use self::connect::HttpConnector; @@ -261,62 +260,10 @@ where C: Connect + Sync + 'static, //TODO: replace with `impl Future` when stable fn send_request(&self, mut req: Request, domain: &str) -> Box, Error=ClientError> + Send> { - let url = req.uri().clone(); let ver = self.ver; let pool_key = (Arc::new(domain.to_string()), self.ver); let checkout = self.pool.checkout(pool_key.clone()); - let connect = { - let executor = self.executor.clone(); - let pool = self.pool.clone(); - let h1_writev = self.h1_writev; - let h1_title_case_headers = self.h1_title_case_headers; - let connector = self.connector.clone(); - let dst = Destination { - uri: url, - }; - hyper_lazy(move || { - if let Some(connecting) = pool.connecting(&pool_key) { - Either::A(connector.connect(dst) - .map_err(::Error::new_connect) - .and_then(move |(io, connected)| { - conn::Builder::new() - .exec(executor.clone()) - .h1_writev(h1_writev) - .h1_title_case_headers(h1_title_case_headers) - .http2_only(pool_key.1 == Ver::Http2) - .handshake(io) - .and_then(move |(tx, conn)| { - let bg = executor.execute(conn.map_err(|e| { - debug!("client connection error: {}", e) - })); - - // This task is critical, so an execute error - // should be returned. - if let Err(err) = bg { - warn!("error spawning critical client task: {}", err); - return Either::A(future::err(err)); - } - - // Wait for 'conn' to ready up before we - // declare this tx as usable - Either::B(tx.when_ready()) - }) - .map(move |tx| { - pool.pooled(connecting, PoolClient { - is_proxied: connected.is_proxied, - tx: match ver { - Ver::Http1 => PoolTx::Http1(tx), - Ver::Http2 => PoolTx::Http2(tx.into_http2()), - }, - }) - }) - })) - } else { - let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress")); - Either::B(future::err(canceled)) - } - }) - }; + let connect = self.connect_to(req.uri().clone(), pool_key); let executor = self.executor.clone(); // The order of the `select` is depended on below... @@ -486,6 +433,69 @@ where C: Connect + Sync + 'static, Box::new(resp) } + + fn connect_to(&self, uri: Uri, pool_key: PoolKey) + -> impl Lazy>, Error=::Error> + { + let executor = self.executor.clone(); + let pool = self.pool.clone(); + let h1_writev = self.h1_writev; + let h1_title_case_headers = self.h1_title_case_headers; + let connector = self.connector.clone(); + let ver = pool_key.1; + let dst = Destination { + uri, + }; + hyper_lazy(move || { + // Try to take a "connecting lock". + // + // If the pool_key is for HTTP/2, and there is already a + // connection being estabalished, then this can't take a + // second lock. The "connect_to" future is Canceled. + let connecting = match pool.connecting(&pool_key) { + Some(lock) => lock, + None => { + let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress")); + return Either::B(future::err(canceled)); + } + }; + Either::A(connector.connect(dst) + .map_err(::Error::new_connect) + .and_then(move |(io, connected)| { + conn::Builder::new() + .exec(executor.clone()) + .h1_writev(h1_writev) + .h1_title_case_headers(h1_title_case_headers) + .http2_only(pool_key.1 == Ver::Http2) + .handshake(io) + .and_then(move |(tx, conn)| { + let bg = executor.execute(conn.map_err(|e| { + debug!("client connection error: {}", e) + })); + + // This task is critical, so an execute error + // should be returned. + if let Err(err) = bg { + warn!("error spawning critical client task: {}", err); + return Either::A(future::err(err)); + } + + // Wait for 'conn' to ready up before we + // declare this tx as usable + Either::B(tx.when_ready()) + }) + .map(move |tx| { + pool.pooled(connecting, PoolClient { + is_proxied: connected.is_proxied, + tx: match ver { + Ver::Http1 => PoolTx::Http1(tx), + Ver::Http2 => PoolTx::Http2(tx.into_http2()), + }, + }) + }) + })) + }) + } } impl Clone for Client { @@ -588,6 +598,8 @@ where } } +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] struct PoolClient { is_proxied: bool, tx: PoolTx, diff --git a/src/client/pool.rs b/src/client/pool.rs index 1cccc177..c220cbd4 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -12,6 +12,8 @@ use tokio_timer::Interval; use common::Exec; use super::Ver; +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) struct Pool { inner: Arc>, } @@ -34,6 +36,8 @@ pub(super) trait Poolable: Send + Sized + 'static { /// /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be /// used for multiple requests. +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) enum Reservation { /// This connection could be used multiple times, the first one will be /// reinserted into the `idle` pool, and the second will be given to @@ -45,7 +49,7 @@ pub(super) enum Reservation { } /// Simple type alias in case the key type needs to be adjusted. -type Key = (Arc, Ver); +pub(super) type Key = (Arc, Ver); struct PoolInner { connections: Mutex>, @@ -85,8 +89,17 @@ struct Connections { struct WeakOpt(Option>); // Newtypes to act as keyword arguments for `Pool::new`... + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) struct Enabled(pub(super) bool); + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) struct IdleTimeout(pub(super) Option); + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) struct MaxIdlePerHost(pub(super) usize); impl Pool { @@ -593,6 +606,8 @@ struct Idle { value: T, } +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) struct Checkout { key: Key, pool: Pool, @@ -662,6 +677,8 @@ impl Drop for Checkout { } } +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] pub(super) struct Connecting { key: Key, pool: WeakOpt>, diff --git a/src/common/lazy.rs b/src/common/lazy.rs index 10dfe179..9d0f238b 100644 --- a/src/common/lazy.rs +++ b/src/common/lazy.rs @@ -2,6 +2,10 @@ use std::mem; use futures::{Future, IntoFuture, Poll}; +pub(crate) trait Started: Future { + fn started(&self) -> bool; +} + pub(crate) fn lazy(func: F) -> Lazy where F: FnOnce() -> R, @@ -12,7 +16,9 @@ where } } -pub struct Lazy { +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +pub(crate) struct Lazy { inner: Inner } @@ -22,12 +28,12 @@ enum Inner { Empty, } -impl Lazy +impl Started for Lazy where F: FnOnce() -> R, R: IntoFuture, { - pub fn started(&self) -> bool { + fn started(&self) -> bool { match self.inner { Inner::Init(_) => false, Inner::Fut(_) | @@ -61,3 +67,4 @@ where } } } + diff --git a/src/common/mod.rs b/src/common/mod.rs index c3411569..c8ced418 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -9,5 +9,5 @@ mod never; pub(crate) use self::buf::StaticBuf; pub(crate) use self::exec::Exec; -pub(crate) use self::lazy::lazy; +pub(crate) use self::lazy::{lazy, Started as Lazy}; pub use self::never::Never;