From c1f84af481598bf9c5e03b9c3d3d667326d80ac0 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 2 Apr 2018 10:34:34 -0700 Subject: [PATCH] refactor(client): notify idle interval when pool drops --- src/client/pool.rs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/client/pool.rs b/src/client/pool.rs index f4d6f637..4fb077ce 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -38,12 +38,9 @@ struct PoolInner { // connection. parked: HashMap, VecDeque>>, timeout: Option, - // Used to prevent multiple intervals from being spawned to clear - // expired connections. - // - // TODO(0.12): Remove the need for this when Client::schedule_pool_timer - // can be done in Client::new. - expired_timer_spawned: bool, + // A oneshot channel is used to allow the interval to be notified when + // the Pool completely drops. That way, the interval can cancel immediately. + idle_interval_ref: Option>, } impl Pool { @@ -52,9 +49,9 @@ impl Pool { inner: Arc::new(Mutex::new(PoolInner { enabled: enabled, idle: HashMap::new(), + idle_interval_ref: None, parked: HashMap::new(), timeout: timeout, - expired_timer_spawned: false, })) } } @@ -222,20 +219,21 @@ impl PoolInner { impl Pool { fn spawn_expired_interval(&mut self, cx: &mut task::Context) -> Result<(), ::Error> { - let dur = { + let (dur, rx) = { let mut inner = self.inner.lock().unwrap(); if !inner.enabled { return Ok(()); } - if inner.expired_timer_spawned { + if inner.idle_interval_ref.is_some() { return Ok(()); } - inner.expired_timer_spawned = true; if let Some(dur) = inner.timeout { - dur + let (tx, rx) = oneshot::channel(); + inner.idle_interval_ref = Some(tx); + (dur, rx) } else { return Ok(()); } @@ -245,6 +243,7 @@ impl Pool { super::execute(IdleInterval { interval: interval, pool: Arc::downgrade(&self.inner), + pool_drop_notifier: rx, }, cx) } } @@ -411,6 +410,10 @@ impl Expiration { struct IdleInterval { interval: Interval, pool: Weak>>, + // This allows the IdleInterval to be notified as soon as the entire + // Pool is fully dropped, and shutdown. This channel is never sent on, + // but Err(Canceled) will be received when the Pool is dropped. + pool_drop_notifier: oneshot::Receiver, } impl Future for IdleInterval { @@ -419,6 +422,15 @@ impl Future for IdleInterval { fn poll(&mut self, cx: &mut task::Context) -> Poll { loop { + match self.pool_drop_notifier.poll(cx) { + Ok(Async::Ready(n)) => match n {}, + Ok(Async::Pending) => (), + Err(_canceled) => { + trace!("pool closed, canceling idle interval"); + return Ok(Async::Ready(())); + } + } + try_ready!(self.interval.poll_next(cx).map_err(|_| unreachable!("interval cannot error"))); if let Some(inner) = self.pool.upgrade() {