refactor(client): don't allocate PoolInner if pool is disabled

This commit is contained in:
Sean McArthur
2018-09-28 11:13:22 -07:00
parent bddc2d5040
commit 192348d7ef

View File

@@ -15,7 +15,8 @@ use super::Ver;
// FIXME: allow() required due to `impl Trait` leaking types to this lint // FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub(super) struct Pool<T> { pub(super) struct Pool<T> {
inner: Arc<PoolInner<T>>, // If the pool is disabled, this is None.
inner: Option<Arc<Mutex<PoolInner<T>>>>,
} }
// Before using a pooled connection, make sure the sender is not dead. // Before using a pooled connection, make sure the sender is not dead.
@@ -52,11 +53,6 @@ pub(super) enum Reservation<T> {
pub(super) type Key = (Arc<String>, Ver); pub(super) type Key = (Arc<String>, Ver);
struct PoolInner<T> { struct PoolInner<T> {
connections: Mutex<Connections<T>>,
enabled: bool,
}
struct Connections<T> {
// A flag that a connection is being estabilished, and the connection // A flag that a connection is being estabilished, and the connection
// should be shared. This prevents making multiple HTTP/2 connections // should be shared. This prevents making multiple HTTP/2 connections
// to the same host. // to the same host.
@@ -104,30 +100,37 @@ pub(super) struct MaxIdlePerHost(pub(super) usize);
impl<T> Pool<T> { impl<T> Pool<T> {
pub fn new(enabled: Enabled, timeout: IdleTimeout, max_idle: MaxIdlePerHost, __exec: &Exec) -> Pool<T> { pub fn new(enabled: Enabled, timeout: IdleTimeout, max_idle: MaxIdlePerHost, __exec: &Exec) -> Pool<T> {
let inner = if enabled.0 {
Some(Arc::new(Mutex::new(PoolInner {
connecting: HashSet::new(),
idle: HashMap::new(),
#[cfg(feature = "runtime")]
idle_interval_ref: None,
max_idle_per_host: max_idle.0,
waiters: HashMap::new(),
#[cfg(feature = "runtime")]
exec: __exec.clone(),
timeout: timeout.0,
})))
} else {
None
};
Pool { Pool {
inner: Arc::new(PoolInner { inner,
connections: Mutex::new(Connections {
connecting: HashSet::new(),
idle: HashMap::new(),
#[cfg(feature = "runtime")]
idle_interval_ref: None,
max_idle_per_host: max_idle.0,
waiters: HashMap::new(),
#[cfg(feature = "runtime")]
exec: __exec.clone(),
timeout: timeout.0,
}),
enabled: enabled.0,
}),
} }
} }
fn is_enabled(&self) -> bool {
self.inner.is_some()
}
#[cfg(test)] #[cfg(test)]
pub(super) fn no_timer(&self) { pub(super) fn no_timer(&self) {
// Prevent an actual interval from being created for this pool... // Prevent an actual interval from being created for this pool...
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
{ {
let mut inner = self.inner.connections.lock().unwrap(); let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
let (tx, _) = oneshot::channel(); let (tx, _) = oneshot::channel();
inner.idle_interval_ref = Some(tx); inner.idle_interval_ref = Some(tx);
@@ -149,26 +152,39 @@ impl<T: Poolable> Pool<T> {
/// Ensure that there is only ever 1 connecting task for HTTP/2 /// Ensure that there is only ever 1 connecting task for HTTP/2
/// connections. This does nothing for HTTP/1. /// connections. This does nothing for HTTP/1.
pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> { pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> {
if key.1 == Ver::Http2 && self.inner.enabled { if key.1 == Ver::Http2 {
let mut inner = self.inner.connections.lock().unwrap(); if let Some(ref enabled) = self.inner {
if inner.connecting.insert(key.clone()) { let mut inner = enabled.lock().unwrap();
let connecting = Connecting { return if inner.connecting.insert(key.clone()) {
key: key.clone(), let connecting = Connecting {
pool: WeakOpt::downgrade(&self.inner), key: key.clone(),
pool: WeakOpt::downgrade(enabled),
};
Some(connecting)
} else {
trace!("HTTP/2 connecting already in progress for {:?}", key.0);
None
}; };
Some(connecting)
} else {
trace!("HTTP/2 connecting already in progress for {:?}", key.0);
None
} }
} else {
Some(Connecting {
key: key.clone(),
// in HTTP/1's case, there is never a lock, so we don't
// need to do anything in Drop.
pool: WeakOpt::none(),
})
} }
// else
Some(Connecting {
key: key.clone(),
// in HTTP/1's case, there is never a lock, so we don't
// need to do anything in Drop.
pool: WeakOpt::none(),
})
}
#[cfg(test)]
fn locked(&self) -> ::std::sync::MutexGuard<PoolInner<T>> {
self
.inner
.as_ref()
.expect("enabled")
.lock()
.expect("lock")
} }
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
@@ -181,10 +197,7 @@ impl<T: Poolable> Pool<T> {
#[cfg(test)] #[cfg(test)]
pub(super) fn idle_count(&self, key: &Key) -> usize { pub(super) fn idle_count(&self, key: &Key) -> usize {
self self
.inner .locked()
.connections
.lock()
.unwrap()
.idle .idle
.get(key) .get(key)
.map(|list| list.len()) .map(|list| list.len())
@@ -193,7 +206,7 @@ impl<T: Poolable> Pool<T> {
fn take(&self, key: &Key) -> Option<Pooled<T>> { fn take(&self, key: &Key) -> Option<Pooled<T>> {
let entry = { let entry = {
let mut inner = self.inner.connections.lock().unwrap(); let mut inner = self.inner.as_ref()?.lock().unwrap();
let expiration = Expiration::new(inner.timeout); let expiration = Expiration::new(inner.timeout);
let maybe_entry = inner.idle.get_mut(key) let maybe_entry = inner.idle.get_mut(key)
.and_then(|list| { .and_then(|list| {
@@ -227,7 +240,7 @@ impl<T: Poolable> Pool<T> {
} }
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> { pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
let (value, pool_ref) = if self.inner.enabled { let (value, pool_ref) = if let Some(ref enabled) = self.inner {
match value.reserve() { match value.reserve() {
Reservation::Shared(to_insert, to_return) => { Reservation::Shared(to_insert, to_return) => {
debug_assert_eq!( debug_assert_eq!(
@@ -235,8 +248,8 @@ impl<T: Poolable> Pool<T> {
Ver::Http2, Ver::Http2,
"shared reservation without Http2" "shared reservation without Http2"
); );
let mut inner = self.inner.connections.lock().unwrap(); let mut inner = enabled.lock().unwrap();
inner.put(connecting.key.clone(), to_insert, &self.inner); inner.put(connecting.key.clone(), to_insert, enabled);
// Do this here instead of Drop for Connecting because we // Do this here instead of Drop for Connecting because we
// already have a lock, no need to lock the mutex twice. // already have a lock, no need to lock the mutex twice.
inner.connected(&connecting.key); inner.connected(&connecting.key);
@@ -251,7 +264,7 @@ impl<T: Poolable> Pool<T> {
// Unique reservations must take a reference to the pool // Unique reservations must take a reference to the pool
// since they hope to reinsert once the reservation is // since they hope to reinsert once the reservation is
// completed // completed
(value, WeakOpt::downgrade(&self.inner)) (value, WeakOpt::downgrade(enabled))
}, },
} }
} else { } else {
@@ -280,11 +293,12 @@ impl<T: Poolable> Pool<T> {
// we just have the final value, without knowledge of if this is // we just have the final value, without knowledge of if this is
// unique or shared. So, the hack is to just assume Ver::Http2 means // unique or shared. So, the hack is to just assume Ver::Http2 means
// shared... :( // shared... :(
let pool_ref = if key.1 == Ver::Http2 { let mut pool_ref = WeakOpt::none();
WeakOpt::none() if key.1 == Ver::Http1 {
} else { if let Some(ref enabled) = self.inner {
WeakOpt::downgrade(&self.inner) pool_ref = WeakOpt::downgrade(enabled);
}; }
}
Pooled { Pooled {
is_reused: true, is_reused: true,
@@ -294,10 +308,19 @@ impl<T: Poolable> Pool<T> {
} }
} }
fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) { fn waiter(&self, key: Key, tx: oneshot::Sender<T>) {
debug_assert!(
self.is_enabled(),
"pool.waiter() should not be called if disabled",
);
trace!("checkout waiting for idle connection: {:?}", key); trace!("checkout waiting for idle connection: {:?}", key);
self.inner.connections.lock().unwrap() self.inner
.waiters.entry(key) .as_ref()
.expect("pool.waiter() expects pool is enabled")
.lock()
.unwrap()
.waiters
.entry(key)
.or_insert(VecDeque::new()) .or_insert(VecDeque::new())
.push_back(tx); .push_back(tx);
} }
@@ -352,8 +375,8 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
} }
} }
impl<T: Poolable> Connections<T> { impl<T: Poolable> PoolInner<T> {
fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<PoolInner<T>>) { fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
if key.1 == Ver::Http2 && self.idle.contains_key(&key) { if key.1 == Ver::Http2 && self.idle.contains_key(&key) {
trace!("put; existing idle HTTP/2 connection for {:?}", key); trace!("put; existing idle HTTP/2 connection for {:?}", key);
return; return;
@@ -438,10 +461,8 @@ impl<T: Poolable> Connections<T> {
} }
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
fn spawn_idle_interval(&mut self, pool_ref: &Arc<PoolInner<T>>) { fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
let (dur, rx) = { let (dur, rx) = {
debug_assert!(pool_ref.enabled);
if self.idle_interval_ref.is_some() { if self.idle_interval_ref.is_some() {
return; return;
} }
@@ -470,7 +491,7 @@ impl<T: Poolable> Connections<T> {
} }
} }
impl<T> Connections<T> { impl<T> PoolInner<T> {
/// Any `FutureResponse`s that were created will have made a `Checkout`, /// Any `FutureResponse`s that were created will have made a `Checkout`,
/// and possibly inserted into the pool that it is waiting for an idle /// and possibly inserted into the pool that it is waiting for an idle
/// connection. If a user ever dropped that future, we need to clean out /// connection. If a user ever dropped that future, we need to clean out
@@ -490,7 +511,7 @@ impl<T> Connections<T> {
} }
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
impl<T: Poolable> Connections<T> { impl<T: Poolable> PoolInner<T> {
/// This should *only* be called by the IdleInterval. /// This should *only* be called by the IdleInterval.
fn clear_expired(&mut self) { fn clear_expired(&mut self) {
let dur = self.timeout.expect("interval assumes timeout"); let dur = self.timeout.expect("interval assumes timeout");
@@ -533,7 +554,7 @@ pub(super) struct Pooled<T: Poolable> {
value: Option<T>, value: Option<T>,
is_reused: bool, is_reused: bool,
key: Key, key: Key,
pool: WeakOpt<PoolInner<T>>, pool: WeakOpt<Mutex<PoolInner<T>>>,
} }
impl<T: Poolable> Pooled<T> { impl<T: Poolable> Pooled<T> {
@@ -577,11 +598,7 @@ impl<T: Poolable> Drop for Pooled<T> {
} }
if let Some(pool) = self.pool.upgrade() { if let Some(pool) = self.pool.upgrade() {
// Pooled should not have had a real reference if pool is if let Ok(mut inner) = pool.lock() {
// not enabled!
debug_assert!(pool.enabled);
if let Ok(mut inner) = pool.connections.lock() {
inner.put(self.key.clone(), value, &pool); inner.put(self.key.clone(), value, &pool);
} }
} else if self.key.1 == Ver::Http1 { } else if self.key.1 == Ver::Http1 {
@@ -638,6 +655,8 @@ impl<T: Poolable> Checkout<T> {
} }
fn add_waiter(&mut self) { fn add_waiter(&mut self) {
debug_assert!(self.pool.is_enabled());
if self.waiter.is_none() { if self.waiter.is_none() {
let (tx, mut rx) = oneshot::channel(); let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task let _ = rx.poll(); // park this task
@@ -660,6 +679,8 @@ impl<T: Poolable> Future for Checkout<T> {
if let Some(pooled) = entry { if let Some(pooled) = entry {
Ok(Async::Ready(pooled)) Ok(Async::Ready(pooled))
} else if !self.pool.is_enabled() {
Err(::Error::new_canceled(Some("pool is disabled")))
} else { } else {
self.add_waiter(); self.add_waiter();
Ok(Async::NotReady) Ok(Async::NotReady)
@@ -670,7 +691,7 @@ impl<T: Poolable> Future for Checkout<T> {
impl<T> Drop for Checkout<T> { impl<T> Drop for Checkout<T> {
fn drop(&mut self) { fn drop(&mut self) {
if self.waiter.take().is_some() { if self.waiter.take().is_some() {
if let Ok(mut inner) = self.pool.inner.connections.lock() { if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
inner.clean_waiters(&self.key); inner.clean_waiters(&self.key);
} }
} }
@@ -681,14 +702,14 @@ impl<T> Drop for Checkout<T> {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub(super) struct Connecting<T: Poolable> { pub(super) struct Connecting<T: Poolable> {
key: Key, key: Key,
pool: WeakOpt<PoolInner<T>>, pool: WeakOpt<Mutex<PoolInner<T>>>,
} }
impl<T: Poolable> Drop for Connecting<T> { impl<T: Poolable> Drop for Connecting<T> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() { if let Some(pool) = self.pool.upgrade() {
// No need to panic on drop, that could abort! // No need to panic on drop, that could abort!
if let Ok(mut inner) = pool.connections.lock() { if let Ok(mut inner) = pool.lock() {
debug_assert_eq!( debug_assert_eq!(
self.key.1, self.key.1,
Ver::Http2, Ver::Http2,
@@ -718,7 +739,7 @@ impl Expiration {
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
struct IdleInterval<T> { struct IdleInterval<T> {
interval: Interval, interval: Interval,
pool: WeakOpt<PoolInner<T>>, pool: WeakOpt<Mutex<PoolInner<T>>>,
// This allows the IdleInterval to be notified as soon as the entire // This allows the IdleInterval to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on, // Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped. // but Err(Canceled) will be received when the Pool is dropped.
@@ -749,7 +770,7 @@ impl<T: Poolable + 'static> Future for IdleInterval<T> {
})); }));
if let Some(inner) = self.pool.upgrade() { if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.connections.lock() { if let Ok(mut inner) = inner.lock() {
trace!("idle interval checking for expired"); trace!("idle interval checking for expired");
inner.clear_expired(); inner.clear_expired();
continue; continue;
@@ -842,7 +863,7 @@ mod tests {
let key = (Arc::new("foo".to_string()), Ver::Http1); let key = (Arc::new("foo".to_string()), Ver::Http1);
let pooled = pool.pooled(c(key.clone()), Uniq(41)); let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled); drop(pooled);
::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap()); ::std::thread::sleep(pool.locked().timeout.unwrap());
assert!(pool.checkout(key).poll().unwrap().is_not_ready()); assert!(pool.checkout(key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(()) ::futures::future::ok::<(), ()>(())
}).wait().unwrap(); }).wait().unwrap();
@@ -858,12 +879,12 @@ mod tests {
pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99)); pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap()); ::std::thread::sleep(pool.locked().timeout.unwrap());
// checkout.poll() should clean out the expired // checkout.poll() should clean out the expired
pool.checkout(key.clone()).poll().unwrap(); pool.checkout(key.clone()).poll().unwrap();
assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none()); assert!(pool.locked().idle.get(&key).is_none());
Ok::<(), ()>(()) Ok::<(), ()>(())
}).wait().unwrap(); }).wait().unwrap();
@@ -880,7 +901,7 @@ mod tests {
pool.pooled(c(key.clone()), Uniq(99)); pool.pooled(c(key.clone()), Uniq(99));
// pooled and dropped 3, max_idle should only allow 2 // pooled and dropped 3, max_idle should only allow 2
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(2)); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2));
Ok::<(), ()>(()) Ok::<(), ()>(())
}).wait().unwrap(); }).wait().unwrap();
@@ -911,14 +932,14 @@ mod tests {
Ok::<_, ()>(()) Ok::<_, ()>(())
})).unwrap(); })).unwrap();
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
// Let the timer tick passed the expiration... // Let the timer tick passed the expiration...
rt rt
.block_on(Delay::new(Instant::now() + Duration::from_millis(200))) .block_on(Delay::new(Instant::now() + Duration::from_millis(200)))
.expect("rt block_on 200ms"); .expect("rt block_on 200ms");
assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none()); assert!(pool.locked().idle.get(&key).is_none());
} }
#[test] #[test]
@@ -950,16 +971,16 @@ mod tests {
// first poll needed to get into Pool's parked // first poll needed to get into Pool's parked
checkout1.poll().unwrap(); checkout1.poll().unwrap();
assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap(); checkout2.poll().unwrap();
assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 2); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
// on drop, clean up Pool // on drop, clean up Pool
drop(checkout1); drop(checkout1);
assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
drop(checkout2); drop(checkout2);
assert!(pool.inner.connections.lock().unwrap().waiters.get(&key).is_none()); assert!(pool.locked().waiters.get(&key).is_none());
::futures::future::ok::<(), ()>(()) ::futures::future::ok::<(), ()>(())
}).wait().unwrap(); }).wait().unwrap();
@@ -990,6 +1011,6 @@ mod tests {
closed: true, closed: true,
}); });
assert!(!pool.inner.connections.lock().unwrap().idle.contains_key(&key)); assert!(!pool.locked().idle.contains_key(&key));
} }
} }