refactor(client): pass internal executor to h2 dispatcher

This commit is contained in:
Sean McArthur
2018-04-25 17:55:45 -07:00
parent a4f4553487
commit d19d95af77
4 changed files with 45 additions and 44 deletions

View File

@@ -425,12 +425,10 @@ impl Builder {
} }
} }
/*
pub(super) fn exec(&mut self, exec: Exec) -> &mut Builder { pub(super) fn exec(&mut self, exec: Exec) -> &mut Builder {
self.exec = exec; self.exec = exec;
self self
} }
*/
pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder { pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder {
self.h1_writev = enabled; self.h1_writev = enabled;

View File

@@ -61,8 +61,8 @@ impl Destination {
pub fn scheme(&self) -> &str { pub fn scheme(&self) -> &str {
self.uri self.uri
.scheme_part() .scheme_part()
.expect("destination uri has scheme") .map(|s| s.as_str())
.as_str() .unwrap_or("")
} }
/// Get the hostname. /// Get the hostname.
@@ -70,7 +70,7 @@ impl Destination {
pub fn host(&self) -> &str { pub fn host(&self) -> &str {
self.uri self.uri
.host() .host()
.expect("destination uri has host") .unwrap_or("")
} }
/// Get the port, if specified. /// Get the port, if specified.
@@ -470,7 +470,6 @@ mod http {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#![allow(deprecated)]
use std::io; use std::io;
use futures::Future; use futures::Future;
use super::{Connect, Destination, HttpConnector}; use super::{Connect, Destination, HttpConnector};

View File

@@ -198,6 +198,7 @@ where C: Connect + Sync + 'static,
.map_err(::Error::new_connect) .map_err(::Error::new_connect)
.and_then(move |(io, connected)| { .and_then(move |(io, connected)| {
conn::Builder::new() conn::Builder::new()
.exec(executor.clone())
.h1_writev(h1_writev) .h1_writev(h1_writev)
.h1_title_case_headers(h1_title_case_headers) .h1_title_case_headers(h1_title_case_headers)
.http2_only(pool_key.1 == Ver::Http2) .http2_only(pool_key.1 == Ver::Http2)

View File

@@ -37,7 +37,6 @@ pub(super) enum Reservation<T> {
/// This connection could be used multiple times, the first one will be /// This connection could be used multiple times, the first one will be
/// reinserted into the `idle` pool, and the second will be given to /// reinserted into the `idle` pool, and the second will be given to
/// the `Checkout`. /// the `Checkout`.
#[allow(unused)]
Shared(T, T), Shared(T, T),
/// This connection requires unique access. It will be returned after /// This connection requires unique access. It will be returned after
/// use is complete. /// use is complete.
@@ -65,7 +64,7 @@ struct PoolInner<T> {
// this list is checked for any parked Checkouts, and tries to notify // this list is checked for any parked Checkouts, and tries to notify
// them that the Conn could be used instead of waiting for a brand new // them that the Conn could be used instead of waiting for a brand new
// connection. // connection.
parked: HashMap<Key, VecDeque<oneshot::Sender<T>>>, waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
timeout: Option<Duration>, timeout: Option<Duration>,
// A oneshot channel is used to allow the interval to be notified when // A oneshot channel is used to allow the interval to be notified when
// the Pool completely drops. That way, the interval can cancel immediately. // the Pool completely drops. That way, the interval can cancel immediately.
@@ -80,7 +79,7 @@ impl<T> Pool<T> {
enabled: enabled, enabled: enabled,
idle: HashMap::new(), idle: HashMap::new(),
idle_interval_ref: None, idle_interval_ref: None,
parked: HashMap::new(), waiters: HashMap::new(),
timeout: timeout, timeout: timeout,
})), })),
} }
@@ -94,7 +93,7 @@ impl<T: Poolable> Pool<T> {
Checkout { Checkout {
key, key,
pool: self.clone(), pool: self.clone(),
parked: None, waiter: None,
} }
} }
@@ -217,10 +216,10 @@ impl<T: Poolable> Pool<T> {
} }
} }
fn park(&mut self, key: Key, tx: oneshot::Sender<T>) { fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) {
trace!("checkout waiting for idle connection: {:?}", key); trace!("checkout waiting for idle connection: {:?}", key);
self.inner.lock().unwrap() self.inner.lock().unwrap()
.parked.entry(key) .waiters.entry(key)
.or_insert(VecDeque::new()) .or_insert(VecDeque::new())
.push_back(tx); .push_back(tx);
} }
@@ -285,10 +284,10 @@ impl<T: Poolable> PoolInner<T> {
return; return;
} }
trace!("put; add idle connection for {:?}", key); trace!("put; add idle connection for {:?}", key);
let mut remove_parked = false; let mut remove_waiters = false;
let mut value = Some(value); let mut value = Some(value);
if let Some(parked) = self.parked.get_mut(&key) { if let Some(waiters) = self.waiters.get_mut(&key) {
while let Some(tx) = parked.pop_front() { while let Some(tx) = waiters.pop_front() {
if !tx.is_canceled() { if !tx.is_canceled() {
let reserved = value.take().expect("value already sent"); let reserved = value.take().expect("value already sent");
let reserved = match reserved.reserve() { let reserved = match reserved.reserve() {
@@ -314,10 +313,10 @@ impl<T: Poolable> PoolInner<T> {
trace!("put; removing canceled waiter for {:?}", key); trace!("put; removing canceled waiter for {:?}", key);
} }
remove_parked = parked.is_empty(); remove_waiters = waiters.is_empty();
} }
if remove_parked { if remove_waiters {
self.parked.remove(&key); self.waiters.remove(&key);
} }
match value { match value {
@@ -345,7 +344,7 @@ impl<T: Poolable> PoolInner<T> {
// cancel any waiters. if there are any, it's because // cancel any waiters. if there are any, it's because
// this Connecting task didn't complete successfully. // this Connecting task didn't complete successfully.
// those waiters would never receive a connection. // those waiters would never receive a connection.
self.parked.remove(key); self.waiters.remove(key);
} }
} }
@@ -354,16 +353,16 @@ impl<T> PoolInner<T> {
/// and possibly inserted into the pool that it is waiting for an idle /// and possibly inserted into the pool that it is waiting for an idle
/// connection. If a user ever dropped that future, we need to clean out /// connection. If a user ever dropped that future, we need to clean out
/// those parked senders. /// those parked senders.
fn clean_parked(&mut self, key: &Key) { fn clean_waiters(&mut self, key: &Key) {
let mut remove_parked = false; let mut remove_waiters = false;
if let Some(parked) = self.parked.get_mut(key) { if let Some(waiters) = self.waiters.get_mut(key) {
parked.retain(|tx| { waiters.retain(|tx| {
!tx.is_canceled() !tx.is_canceled()
}); });
remove_parked = parked.is_empty(); remove_waiters = waiters.is_empty();
} }
if remove_parked { if remove_waiters {
self.parked.remove(key); self.waiters.remove(key);
} }
} }
} }
@@ -511,13 +510,13 @@ struct Idle<T> {
pub(super) struct Checkout<T> { pub(super) struct Checkout<T> {
key: Key, key: Key,
pool: Pool<T>, pool: Pool<T>,
parked: Option<oneshot::Receiver<T>>, waiter: Option<oneshot::Receiver<T>>,
} }
impl<T: Poolable> Checkout<T> { impl<T: Poolable> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Option<Pooled<T>>, ::Error> { fn poll_waiter(&mut self) -> Poll<Option<Pooled<T>>, ::Error> {
static CANCELED: &str = "pool checkout failed"; static CANCELED: &str = "pool checkout failed";
if let Some(ref mut rx) = self.parked { if let Some(mut rx) = self.waiter.take() {
match rx.poll() { match rx.poll() {
Ok(Async::Ready(value)) => { Ok(Async::Ready(value)) => {
if !value.is_closed() { if !value.is_closed() {
@@ -526,7 +525,10 @@ impl<T: Poolable> Checkout<T> {
Err(::Error::new_canceled(Some(CANCELED))) Err(::Error::new_canceled(Some(CANCELED)))
} }
}, },
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => {
self.waiter = Some(rx);
Ok(Async::NotReady)
},
Err(_canceled) => Err(::Error::new_canceled(Some(CANCELED))), Err(_canceled) => Err(::Error::new_canceled(Some(CANCELED))),
} }
} else { } else {
@@ -534,12 +536,12 @@ impl<T: Poolable> Checkout<T> {
} }
} }
fn park(&mut self) { fn add_waiter(&mut self) {
if self.parked.is_none() { if self.waiter.is_none() {
let (tx, mut rx) = oneshot::channel(); let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx); self.pool.waiter(self.key.clone(), tx);
self.parked = Some(rx); self.waiter = Some(rx);
} }
} }
} }
@@ -549,7 +551,7 @@ impl<T: Poolable> Future for Checkout<T> {
type Error = ::Error; type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(pooled) = try_ready!(self.poll_parked()) { if let Some(pooled) = try_ready!(self.poll_waiter()) {
return Ok(Async::Ready(pooled)); return Ok(Async::Ready(pooled));
} }
@@ -558,7 +560,7 @@ impl<T: Poolable> Future for Checkout<T> {
if let Some(pooled) = entry { if let Some(pooled) = entry {
Ok(Async::Ready(pooled)) Ok(Async::Ready(pooled))
} else { } else {
self.park(); self.add_waiter();
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
@@ -566,9 +568,10 @@ impl<T: Poolable> Future for Checkout<T> {
impl<T> Drop for Checkout<T> { impl<T> Drop for Checkout<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.parked.take(); if self.waiter.take().is_some() {
if let Ok(mut inner) = self.pool.inner.lock() { if let Ok(mut inner) = self.pool.inner.lock() {
inner.clean_parked(&self.key); inner.clean_waiters(&self.key);
}
} }
} }
} }
@@ -782,7 +785,7 @@ mod tests {
} }
#[test] #[test]
fn test_pool_checkout_drop_cleans_up_parked() { fn test_pool_checkout_drop_cleans_up_waiters() {
future::lazy(|| { future::lazy(|| {
let pool = Pool::<Uniq<i32>>::new(true, Some(Duration::from_secs(10))); let pool = Pool::<Uniq<i32>>::new(true, Some(Duration::from_secs(10)));
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
@@ -792,16 +795,16 @@ mod tests {
// first poll needed to get into Pool's parked // first poll needed to get into Pool's parked
checkout1.poll().unwrap(); checkout1.poll().unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1); assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap(); checkout2.poll().unwrap();
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 2); assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 2);
// on drop, clean up Pool // on drop, clean up Pool
drop(checkout1); drop(checkout1);
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1); assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
drop(checkout2); drop(checkout2);
assert!(pool.inner.lock().unwrap().parked.get(&key).is_none()); assert!(pool.inner.lock().unwrap().waiters.get(&key).is_none());
::futures::future::ok::<(), ()>(()) ::futures::future::ok::<(), ()>(())
}).wait().unwrap(); }).wait().unwrap();