refactor(client): breakout connect_to future into separate function
This commit is contained in:
		| @@ -119,6 +119,8 @@ pub struct Parts<T> { | ||||
| // ========== internal client api | ||||
|  | ||||
| /// A `Future` for when `SendRequest::poll_ready()` is ready. | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| #[must_use = "futures do nothing unless polled"] | ||||
| pub(super) struct WhenReady<B> { | ||||
|     tx: Option<SendRequest<B>>, | ||||
|   | ||||
| @@ -90,10 +90,9 @@ use http::header::{Entry, HeaderValue, HOST}; | ||||
| use http::uri::Scheme; | ||||
|  | ||||
| use body::{Body, Payload}; | ||||
| use common::Exec; | ||||
| use common::lazy as hyper_lazy; | ||||
| use common::{Exec, lazy as hyper_lazy, Lazy}; | ||||
| use self::connect::{Connect, Destination}; | ||||
| use self::pool::{Pool, Poolable, Reservation}; | ||||
| use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; | ||||
|  | ||||
| #[cfg(feature = "runtime")] pub use self::connect::HttpConnector; | ||||
|  | ||||
| @@ -261,62 +260,10 @@ where C: Connect + Sync + 'static, | ||||
|  | ||||
|     //TODO: replace with `impl Future` when stable | ||||
|     fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> { | ||||
|         let url = req.uri().clone(); | ||||
|         let ver = self.ver; | ||||
|         let pool_key = (Arc::new(domain.to_string()), self.ver); | ||||
|         let checkout = self.pool.checkout(pool_key.clone()); | ||||
|         let connect = { | ||||
|             let executor = self.executor.clone(); | ||||
|             let pool = self.pool.clone(); | ||||
|             let h1_writev = self.h1_writev; | ||||
|             let h1_title_case_headers = self.h1_title_case_headers; | ||||
|             let connector = self.connector.clone(); | ||||
|             let dst = Destination { | ||||
|                 uri: url, | ||||
|             }; | ||||
|             hyper_lazy(move || { | ||||
|                 if let Some(connecting) = pool.connecting(&pool_key) { | ||||
|                     Either::A(connector.connect(dst) | ||||
|                         .map_err(::Error::new_connect) | ||||
|                         .and_then(move |(io, connected)| { | ||||
|                             conn::Builder::new() | ||||
|                                 .exec(executor.clone()) | ||||
|                                 .h1_writev(h1_writev) | ||||
|                                 .h1_title_case_headers(h1_title_case_headers) | ||||
|                                 .http2_only(pool_key.1 == Ver::Http2) | ||||
|                                 .handshake(io) | ||||
|                                 .and_then(move |(tx, conn)| { | ||||
|                                     let bg = executor.execute(conn.map_err(|e| { | ||||
|                                         debug!("client connection error: {}", e) | ||||
|                                     })); | ||||
|  | ||||
|                                     // This task is critical, so an execute error | ||||
|                                     // should be returned. | ||||
|                                     if let Err(err) = bg { | ||||
|                                         warn!("error spawning critical client task: {}", err); | ||||
|                                         return Either::A(future::err(err)); | ||||
|                                     } | ||||
|  | ||||
|                                     // Wait for 'conn' to ready up before we | ||||
|                                     // declare this tx as usable | ||||
|                                     Either::B(tx.when_ready()) | ||||
|                                 }) | ||||
|                                 .map(move |tx| { | ||||
|                                     pool.pooled(connecting, PoolClient { | ||||
|                                         is_proxied: connected.is_proxied, | ||||
|                                         tx: match ver { | ||||
|                                             Ver::Http1 => PoolTx::Http1(tx), | ||||
|                                             Ver::Http2 => PoolTx::Http2(tx.into_http2()), | ||||
|                                         }, | ||||
|                                     }) | ||||
|                                 }) | ||||
|                         })) | ||||
|                 } else { | ||||
|                     let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress")); | ||||
|                     Either::B(future::err(canceled)) | ||||
|                 } | ||||
|             }) | ||||
|         }; | ||||
|         let connect = self.connect_to(req.uri().clone(), pool_key); | ||||
|  | ||||
|         let executor = self.executor.clone(); | ||||
|         // The order of the `select` is depended on below... | ||||
| @@ -486,6 +433,69 @@ where C: Connect + Sync + 'static, | ||||
|  | ||||
|         Box::new(resp) | ||||
|     } | ||||
|  | ||||
|     fn connect_to(&self, uri: Uri, pool_key: PoolKey) | ||||
|         -> impl Lazy<Item=Pooled<PoolClient<B>>, Error=::Error> | ||||
|     { | ||||
|         let executor = self.executor.clone(); | ||||
|         let pool = self.pool.clone(); | ||||
|         let h1_writev = self.h1_writev; | ||||
|         let h1_title_case_headers = self.h1_title_case_headers; | ||||
|         let connector = self.connector.clone(); | ||||
|         let ver = pool_key.1; | ||||
|         let dst = Destination { | ||||
|             uri, | ||||
|         }; | ||||
|         hyper_lazy(move || { | ||||
|             // Try to take a "connecting lock". | ||||
|             // | ||||
|             // If the pool_key is for HTTP/2, and there is already a | ||||
|             // connection being estabalished, then this can't take a | ||||
|             // second lock. The "connect_to" future is Canceled. | ||||
|             let connecting = match pool.connecting(&pool_key) { | ||||
|                 Some(lock) => lock, | ||||
|                 None => { | ||||
|                     let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress")); | ||||
|                     return Either::B(future::err(canceled)); | ||||
|                 } | ||||
|             }; | ||||
|             Either::A(connector.connect(dst) | ||||
|                 .map_err(::Error::new_connect) | ||||
|                 .and_then(move |(io, connected)| { | ||||
|                     conn::Builder::new() | ||||
|                         .exec(executor.clone()) | ||||
|                         .h1_writev(h1_writev) | ||||
|                         .h1_title_case_headers(h1_title_case_headers) | ||||
|                         .http2_only(pool_key.1 == Ver::Http2) | ||||
|                         .handshake(io) | ||||
|                         .and_then(move |(tx, conn)| { | ||||
|                             let bg = executor.execute(conn.map_err(|e| { | ||||
|                                 debug!("client connection error: {}", e) | ||||
|                             })); | ||||
|  | ||||
|                             // This task is critical, so an execute error | ||||
|                             // should be returned. | ||||
|                             if let Err(err) = bg { | ||||
|                                 warn!("error spawning critical client task: {}", err); | ||||
|                                 return Either::A(future::err(err)); | ||||
|                             } | ||||
|  | ||||
|                             // Wait for 'conn' to ready up before we | ||||
|                             // declare this tx as usable | ||||
|                             Either::B(tx.when_ready()) | ||||
|                         }) | ||||
|                         .map(move |tx| { | ||||
|                             pool.pooled(connecting, PoolClient { | ||||
|                                 is_proxied: connected.is_proxied, | ||||
|                                 tx: match ver { | ||||
|                                     Ver::Http1 => PoolTx::Http1(tx), | ||||
|                                     Ver::Http2 => PoolTx::Http2(tx.into_http2()), | ||||
|                                 }, | ||||
|                             }) | ||||
|                         }) | ||||
|                 })) | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<C, B> Clone for Client<C, B> { | ||||
| @@ -588,6 +598,8 @@ where | ||||
|     } | ||||
| } | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| struct PoolClient<B> { | ||||
|     is_proxied: bool, | ||||
|     tx: PoolTx<B>, | ||||
|   | ||||
| @@ -12,6 +12,8 @@ use tokio_timer::Interval; | ||||
| use common::Exec; | ||||
| use super::Ver; | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) struct Pool<T> { | ||||
|     inner: Arc<PoolInner<T>>, | ||||
| } | ||||
| @@ -34,6 +36,8 @@ pub(super) trait Poolable: Send + Sized + 'static { | ||||
| /// | ||||
| /// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be | ||||
| /// used for multiple requests. | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) enum Reservation<T> { | ||||
|     /// This connection could be used multiple times, the first one will be | ||||
|     /// reinserted into the `idle` pool, and the second will be given to | ||||
| @@ -45,7 +49,7 @@ pub(super) enum Reservation<T> { | ||||
| } | ||||
|  | ||||
| /// Simple type alias in case the key type needs to be adjusted. | ||||
| type Key = (Arc<String>, Ver); | ||||
| pub(super) type Key = (Arc<String>, Ver); | ||||
|  | ||||
| struct PoolInner<T> { | ||||
|     connections: Mutex<Connections<T>>, | ||||
| @@ -85,8 +89,17 @@ struct Connections<T> { | ||||
| struct WeakOpt<T>(Option<Weak<T>>); | ||||
|  | ||||
| // Newtypes to act as keyword arguments for `Pool::new`... | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) struct Enabled(pub(super) bool); | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) struct IdleTimeout(pub(super) Option<Duration>); | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) struct MaxIdlePerHost(pub(super) usize); | ||||
|  | ||||
| impl<T> Pool<T> { | ||||
| @@ -593,6 +606,8 @@ struct Idle<T> { | ||||
|     value: T, | ||||
| } | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) struct Checkout<T> { | ||||
|     key: Key, | ||||
|     pool: Pool<T>, | ||||
| @@ -662,6 +677,8 @@ impl<T> Drop for Checkout<T> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| // FIXME: allow() required due to `impl Trait` leaking types to this lint | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub(super) struct Connecting<T: Poolable> { | ||||
|     key: Key, | ||||
|     pool: WeakOpt<PoolInner<T>>, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user