From d19d95af777ebcb723880d2961366363a1d7298c Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 25 Apr 2018 17:55:45 -0700 Subject: [PATCH] refactor(client): pass internal executor to h2 dispatcher --- src/client/conn.rs | 2 -- src/client/connect.rs | 7 ++-- src/client/mod.rs | 1 + src/client/pool.rs | 79 ++++++++++++++++++++++--------------------- 4 files changed, 45 insertions(+), 44 deletions(-) diff --git a/src/client/conn.rs b/src/client/conn.rs index c63af1a3..0a89f903 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -425,12 +425,10 @@ impl Builder { } } - /* pub(super) fn exec(&mut self, exec: Exec) -> &mut Builder { self.exec = exec; self } - */ pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder { self.h1_writev = enabled; diff --git a/src/client/connect.rs b/src/client/connect.rs index 83dd1b67..e371cca0 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -61,8 +61,8 @@ impl Destination { pub fn scheme(&self) -> &str { self.uri .scheme_part() - .expect("destination uri has scheme") - .as_str() + .map(|s| s.as_str()) + .unwrap_or("") } /// Get the hostname. @@ -70,7 +70,7 @@ impl Destination { pub fn host(&self) -> &str { self.uri .host() - .expect("destination uri has host") + .unwrap_or("") } /// Get the port, if specified. @@ -470,7 +470,6 @@ mod http { #[cfg(test)] mod tests { - #![allow(deprecated)] use std::io; use futures::Future; use super::{Connect, Destination, HttpConnector}; diff --git a/src/client/mod.rs b/src/client/mod.rs index 8f1ff6a6..c1d56a41 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -198,6 +198,7 @@ where C: Connect + Sync + 'static, .map_err(::Error::new_connect) .and_then(move |(io, connected)| { conn::Builder::new() + .exec(executor.clone()) .h1_writev(h1_writev) .h1_title_case_headers(h1_title_case_headers) .http2_only(pool_key.1 == Ver::Http2) diff --git a/src/client/pool.rs b/src/client/pool.rs index fc6ecd62..c9b39ffe 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -37,7 +37,6 @@ pub(super) enum Reservation { /// This connection could be used multiple times, the first one will be /// reinserted into the `idle` pool, and the second will be given to /// the `Checkout`. - #[allow(unused)] Shared(T, T), /// This connection requires unique access. It will be returned after /// use is complete. @@ -65,7 +64,7 @@ struct PoolInner { // this list is checked for any parked Checkouts, and tries to notify // them that the Conn could be used instead of waiting for a brand new // connection. - parked: HashMap>>, + waiters: HashMap>>, timeout: Option, // A oneshot channel is used to allow the interval to be notified when // the Pool completely drops. That way, the interval can cancel immediately. @@ -80,7 +79,7 @@ impl Pool { enabled: enabled, idle: HashMap::new(), idle_interval_ref: None, - parked: HashMap::new(), + waiters: HashMap::new(), timeout: timeout, })), } @@ -94,7 +93,7 @@ impl Pool { Checkout { key, pool: self.clone(), - parked: None, + waiter: None, } } @@ -217,10 +216,10 @@ impl Pool { } } - fn park(&mut self, key: Key, tx: oneshot::Sender) { + fn waiter(&mut self, key: Key, tx: oneshot::Sender) { trace!("checkout waiting for idle connection: {:?}", key); self.inner.lock().unwrap() - .parked.entry(key) + .waiters.entry(key) .or_insert(VecDeque::new()) .push_back(tx); } @@ -285,10 +284,10 @@ impl PoolInner { return; } trace!("put; add idle connection for {:?}", key); - let mut remove_parked = false; + let mut remove_waiters = false; let mut value = Some(value); - if let Some(parked) = self.parked.get_mut(&key) { - while let Some(tx) = parked.pop_front() { + if let Some(waiters) = self.waiters.get_mut(&key) { + while let Some(tx) = waiters.pop_front() { if !tx.is_canceled() { let reserved = value.take().expect("value already sent"); let reserved = match reserved.reserve() { @@ -314,10 +313,10 @@ impl PoolInner { trace!("put; removing canceled waiter for {:?}", key); } - remove_parked = parked.is_empty(); + remove_waiters = waiters.is_empty(); } - if remove_parked { - self.parked.remove(&key); + if remove_waiters { + self.waiters.remove(&key); } match value { @@ -345,7 +344,7 @@ impl PoolInner { // cancel any waiters. if there are any, it's because // this Connecting task didn't complete successfully. // those waiters would never receive a connection. - self.parked.remove(key); + self.waiters.remove(key); } } @@ -354,16 +353,16 @@ impl PoolInner { /// and possibly inserted into the pool that it is waiting for an idle /// connection. If a user ever dropped that future, we need to clean out /// those parked senders. - fn clean_parked(&mut self, key: &Key) { - let mut remove_parked = false; - if let Some(parked) = self.parked.get_mut(key) { - parked.retain(|tx| { + fn clean_waiters(&mut self, key: &Key) { + let mut remove_waiters = false; + if let Some(waiters) = self.waiters.get_mut(key) { + waiters.retain(|tx| { !tx.is_canceled() }); - remove_parked = parked.is_empty(); + remove_waiters = waiters.is_empty(); } - if remove_parked { - self.parked.remove(key); + if remove_waiters { + self.waiters.remove(key); } } } @@ -511,13 +510,13 @@ struct Idle { pub(super) struct Checkout { key: Key, pool: Pool, - parked: Option>, + waiter: Option>, } impl Checkout { - fn poll_parked(&mut self) -> Poll>, ::Error> { + fn poll_waiter(&mut self) -> Poll>, ::Error> { static CANCELED: &str = "pool checkout failed"; - if let Some(ref mut rx) = self.parked { + if let Some(mut rx) = self.waiter.take() { match rx.poll() { Ok(Async::Ready(value)) => { if !value.is_closed() { @@ -526,7 +525,10 @@ impl Checkout { Err(::Error::new_canceled(Some(CANCELED))) } }, - Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::NotReady) => { + self.waiter = Some(rx); + Ok(Async::NotReady) + }, Err(_canceled) => Err(::Error::new_canceled(Some(CANCELED))), } } else { @@ -534,12 +536,12 @@ impl Checkout { } } - fn park(&mut self) { - if self.parked.is_none() { + fn add_waiter(&mut self) { + if self.waiter.is_none() { let (tx, mut rx) = oneshot::channel(); let _ = rx.poll(); // park this task - self.pool.park(self.key.clone(), tx); - self.parked = Some(rx); + self.pool.waiter(self.key.clone(), tx); + self.waiter = Some(rx); } } } @@ -549,7 +551,7 @@ impl Future for Checkout { type Error = ::Error; fn poll(&mut self) -> Poll { - if let Some(pooled) = try_ready!(self.poll_parked()) { + if let Some(pooled) = try_ready!(self.poll_waiter()) { return Ok(Async::Ready(pooled)); } @@ -558,7 +560,7 @@ impl Future for Checkout { if let Some(pooled) = entry { Ok(Async::Ready(pooled)) } else { - self.park(); + self.add_waiter(); Ok(Async::NotReady) } } @@ -566,9 +568,10 @@ impl Future for Checkout { impl Drop for Checkout { fn drop(&mut self) { - self.parked.take(); - if let Ok(mut inner) = self.pool.inner.lock() { - inner.clean_parked(&self.key); + if self.waiter.take().is_some() { + if let Ok(mut inner) = self.pool.inner.lock() { + inner.clean_waiters(&self.key); + } } } } @@ -782,7 +785,7 @@ mod tests { } #[test] - fn test_pool_checkout_drop_cleans_up_parked() { + fn test_pool_checkout_drop_cleans_up_waiters() { future::lazy(|| { let pool = Pool::>::new(true, Some(Duration::from_secs(10))); let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); @@ -792,16 +795,16 @@ mod tests { // first poll needed to get into Pool's parked checkout1.poll().unwrap(); - assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1); + assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1); checkout2.poll().unwrap(); - assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 2); + assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 2); // on drop, clean up Pool drop(checkout1); - assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1); + assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1); drop(checkout2); - assert!(pool.inner.lock().unwrap().parked.get(&key).is_none()); + assert!(pool.inner.lock().unwrap().waiters.get(&key).is_none()); ::futures::future::ok::<(), ()>(()) }).wait().unwrap();