refactor(client): notify idle interval when pool drops

This commit is contained in:
Sean McArthur
2018-04-02 10:34:34 -07:00
parent a12f7beed9
commit c1f84af481

View File

@@ -38,12 +38,9 @@ struct PoolInner<T> {
// connection. // connection.
parked: HashMap<Arc<String>, VecDeque<oneshot::Sender<T>>>, parked: HashMap<Arc<String>, VecDeque<oneshot::Sender<T>>>,
timeout: Option<Duration>, timeout: Option<Duration>,
// Used to prevent multiple intervals from being spawned to clear // A oneshot channel is used to allow the interval to be notified when
// expired connections. // the Pool completely drops. That way, the interval can cancel immediately.
// idle_interval_ref: Option<oneshot::Sender<Never>>,
// TODO(0.12): Remove the need for this when Client::schedule_pool_timer
// can be done in Client::new.
expired_timer_spawned: bool,
} }
impl<T> Pool<T> { impl<T> Pool<T> {
@@ -52,9 +49,9 @@ impl<T> Pool<T> {
inner: Arc::new(Mutex::new(PoolInner { inner: Arc::new(Mutex::new(PoolInner {
enabled: enabled, enabled: enabled,
idle: HashMap::new(), idle: HashMap::new(),
idle_interval_ref: None,
parked: HashMap::new(), parked: HashMap::new(),
timeout: timeout, timeout: timeout,
expired_timer_spawned: false,
})) }))
} }
} }
@@ -222,20 +219,21 @@ impl<T: Closed> PoolInner<T> {
impl<T: Closed + Send + 'static> Pool<T> { impl<T: Closed + Send + 'static> Pool<T> {
fn spawn_expired_interval(&mut self, cx: &mut task::Context) -> Result<(), ::Error> { fn spawn_expired_interval(&mut self, cx: &mut task::Context) -> Result<(), ::Error> {
let dur = { let (dur, rx) = {
let mut inner = self.inner.lock().unwrap(); let mut inner = self.inner.lock().unwrap();
if !inner.enabled { if !inner.enabled {
return Ok(()); return Ok(());
} }
if inner.expired_timer_spawned { if inner.idle_interval_ref.is_some() {
return Ok(()); return Ok(());
} }
inner.expired_timer_spawned = true;
if let Some(dur) = inner.timeout { if let Some(dur) = inner.timeout {
dur let (tx, rx) = oneshot::channel();
inner.idle_interval_ref = Some(tx);
(dur, rx)
} else { } else {
return Ok(()); return Ok(());
} }
@@ -245,6 +243,7 @@ impl<T: Closed + Send + 'static> Pool<T> {
super::execute(IdleInterval { super::execute(IdleInterval {
interval: interval, interval: interval,
pool: Arc::downgrade(&self.inner), pool: Arc::downgrade(&self.inner),
pool_drop_notifier: rx,
}, cx) }, cx)
} }
} }
@@ -411,6 +410,10 @@ impl Expiration {
struct IdleInterval<T> { struct IdleInterval<T> {
interval: Interval, interval: Interval,
pool: Weak<Mutex<PoolInner<T>>>, pool: Weak<Mutex<PoolInner<T>>>,
// This allows the IdleInterval to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
// but Err(Canceled) will be received when the Pool is dropped.
pool_drop_notifier: oneshot::Receiver<Never>,
} }
impl<T: Closed + 'static> Future for IdleInterval<T> { impl<T: Closed + 'static> Future for IdleInterval<T> {
@@ -419,6 +422,15 @@ impl<T: Closed + 'static> Future for IdleInterval<T> {
fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> { fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
loop { loop {
match self.pool_drop_notifier.poll(cx) {
Ok(Async::Ready(n)) => match n {},
Ok(Async::Pending) => (),
Err(_canceled) => {
trace!("pool closed, canceling idle interval");
return Ok(Async::Ready(()));
}
}
try_ready!(self.interval.poll_next(cx).map_err(|_| unreachable!("interval cannot error"))); try_ready!(self.interval.poll_next(cx).map_err(|_| unreachable!("interval cannot error")));
if let Some(inner) = self.pool.upgrade() { if let Some(inner) = self.pool.upgrade() {