diff --git a/src/client/mod.rs b/src/client/mod.rs index 8cdbc039..d3726907 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -91,7 +91,6 @@ impl Client { } } - /// Create a new client with a specific connector. #[inline] fn configured(config: Config, exec: Exec) -> Client { Client { @@ -118,6 +117,14 @@ where C: Connect, /// Send a constructed Request using this Client. #[inline] pub fn request(&self, mut req: Request) -> FutureResponse { + // TODO(0.12): do this at construction time. + // + // It cannot be done in the constructor because the Client::configured + // does not have `B: 'static` bounds, which are required to spawn + // the interval. In 0.12, add a static bounds to the constructor, + // and move this. + self.schedule_pool_timer(); + match req.version() { HttpVersion::Http10 | HttpVersion::Http11 => (), @@ -249,6 +256,12 @@ where C: Connect, Box::new(resp) } + + fn schedule_pool_timer(&self) { + if let Exec::Handle(ref h) = self.executor { + self.pool.spawn_expired_interval(h); + } + } } impl Service for Client diff --git a/src/client/pool.rs b/src/client/pool.rs index 095b79ad..e5e12b1a 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -6,7 +6,8 @@ use std::ops::{Deref, DerefMut, BitAndAssign}; use std::rc::{Rc, Weak}; use std::time::{Duration, Instant}; -use futures::{Future, Async, Poll}; +use futures::{Future, Async, Poll, Stream}; +use tokio::reactor::{Handle, Interval}; use relay; use proto::{KeepAlive, KA}; @@ -40,6 +41,12 @@ struct PoolInner { // connection. parked: HashMap, VecDeque>>>, timeout: Option, + // Used to prevent multiple intervals from being spawned to clear + // expired connections. + // + // TODO(0.12): Remove the need for this when Client::schedule_pool_timer + // can be done in Client::new. + expired_timer_spawned: bool, } impl Pool { @@ -50,6 +57,7 @@ impl Pool { idle: HashMap::new(), parked: HashMap::new(), timeout: timeout, + expired_timer_spawned: false, })), } } @@ -194,6 +202,64 @@ impl Pool { inner.parked.remove(key); } } + + fn clear_expired(&self) { + let mut inner = self.inner.borrow_mut(); + + let dur = if let Some(dur) = inner.timeout { + dur + } else { + return + }; + + let now = Instant::now(); + //self.last_idle_check_at = now; + + inner.idle.retain(|_key, values| { + + values.retain(|val| { + match val.status.get() { + TimedKA::Idle(idle_at) if now - idle_at < dur => { + true + }, + _ => false, + } + //now - val.idle_at < dur + }); + + // returning false evicts this key/val + !values.is_empty() + }); + } +} + + +impl Pool { + pub(super) fn spawn_expired_interval(&self, handle: &Handle) { + let mut inner = self.inner.borrow_mut(); + + if !inner.enabled { + return; + } + + if inner.expired_timer_spawned { + return; + } + inner.expired_timer_spawned = true; + + let dur = if let Some(dur) = inner.timeout { + dur + } else { + return + }; + + let interval = Interval::new(dur, handle) + .expect("reactor is gone"); + handle.spawn(IdleInterval { + interval: interval, + pool: Rc::downgrade(&self.inner), + }); + } } impl Clone for Pool { @@ -385,6 +451,28 @@ impl Expiration { } } +struct IdleInterval { + interval: Interval, + pool: Weak>>, +} + +impl Future for IdleInterval { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error"))); + + if let Some(inner) = self.pool.upgrade() { + let pool = Pool { inner: inner }; + pool.clear_expired(); + } else { + return Ok(Async::Ready(())); + } + } + } +} #[cfg(test)] mod tests { @@ -428,7 +516,7 @@ mod tests { } #[test] - fn test_pool_removes_expired() { + fn test_pool_checkout_removes_expired() { let pool = Pool::new(true, Some(Duration::from_secs(1))); let key = Rc::new("foo".to_string()); @@ -451,6 +539,31 @@ mod tests { assert!(pool.inner.borrow().idle.get(&key).is_none()); } + #[test] + fn test_pool_timer_removes_expired() { + let mut core = ::tokio::reactor::Core::new().unwrap(); + let pool = Pool::new(true, Some(Duration::from_millis(100))); + pool.spawn_expired_interval(&core.handle()); + let key = Rc::new("foo".to_string()); + + let mut pooled1 = pool.pooled(key.clone(), 41); + pooled1.idle(); + let mut pooled2 = pool.pooled(key.clone(), 5); + pooled2.idle(); + let mut pooled3 = pool.pooled(key.clone(), 99); + pooled3.idle(); + + assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); + + let timeout = ::tokio::reactor::Timeout::new( + Duration::from_millis(400), // allow for too-good resolution + &core.handle() + ).unwrap(); + core.run(timeout).unwrap(); + + assert!(pool.inner.borrow().idle.get(&key).is_none()); + } + #[test] fn test_pool_checkout_task_unparked() { let pool = Pool::new(true, Some(Duration::from_secs(10)));