From 727b74797e5754af8abba8812a876c3c8fda6d94 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 28 Feb 2018 14:57:06 -0800 Subject: [PATCH] fix(client): schedule interval to clear expired idle connections Currently only works if Client is built with a `Handle`, and not a custome executor, since a `Handle` is required to create a tokio Interval. --- src/client/mod.rs | 34 +++++++++++++++------------------- src/client/pool.rs | 26 ++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index fbf1061e..7d96de50 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -90,33 +90,15 @@ impl Client { Exec::Executor(..) => panic!("Client not built with a Handle"), } } -} - -impl Client -where C: Connect, - B: Stream, - B::Item: AsRef<[u8]>, -{ - // Create a new client with a specific connector. #[inline] fn configured(config: Config, exec: Exec) -> Client { - let client = Client { + Client { connector: Rc::new(config.connector), executor: exec, h1_writev: config.h1_writev, pool: Pool::new(config.keep_alive, config.keep_alive_timeout), retry_canceled_requests: config.retry_canceled_requests, - }; - - client.schedule_pool_timer(); - - client - } - - fn schedule_pool_timer(&self) { - if let Exec::Handle(ref h) = self.executor { - self.pool.spawn_expired_interval(h); } } } @@ -135,6 +117,14 @@ where C: Connect, /// Send a constructed Request using this Client. #[inline] pub fn request(&self, mut req: Request) -> FutureResponse { + // TODO(0.12): do this at construction time. + // + // It cannot be done in the constructor because the Client::configured + // does not have `B: 'static` bounds, which are required to spawn + // the interval. In 0.12, add a static bounds to the constructor, + // and move this. + self.schedule_pool_timer(); + match req.version() { HttpVersion::Http10 | HttpVersion::Http11 => (), @@ -263,6 +253,12 @@ where C: Connect, Box::new(resp) } + + fn schedule_pool_timer(&self) { + if let Exec::Handle(ref h) = self.executor { + self.pool.spawn_expired_interval(h); + } + } } impl Service for Client diff --git a/src/client/pool.rs b/src/client/pool.rs index 12c41587..e5e12b1a 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -41,6 +41,12 @@ struct PoolInner { // connection. parked: HashMap, VecDeque>>>, timeout: Option, + // Used to prevent multiple intervals from being spawned to clear + // expired connections. + // + // TODO(0.12): Remove the need for this when Client::schedule_pool_timer + // can be done in Client::new. + expired_timer_spawned: bool, } impl Pool { @@ -51,6 +57,7 @@ impl Pool { idle: HashMap::new(), parked: HashMap::new(), timeout: timeout, + expired_timer_spawned: false, })), } } @@ -229,12 +236,17 @@ impl Pool { impl Pool { pub(super) fn spawn_expired_interval(&self, handle: &Handle) { - let inner = self.inner.borrow(); + let mut inner = self.inner.borrow_mut(); if !inner.enabled { return; } + if inner.expired_timer_spawned { + return; + } + inner.expired_timer_spawned = true; + let dur = if let Some(dur) = inner.timeout { dur } else { @@ -529,7 +541,9 @@ mod tests { #[test] fn test_pool_timer_removes_expired() { - let pool = Pool::new(true, Some(Duration::from_secs(1))); + let mut core = ::tokio::reactor::Core::new().unwrap(); + let pool = Pool::new(true, Some(Duration::from_millis(100))); + pool.spawn_expired_interval(&core.handle()); let key = Rc::new("foo".to_string()); let mut pooled1 = pool.pooled(key.clone(), 41); @@ -540,9 +554,13 @@ mod tests { pooled3.idle(); assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); - ::std::thread::sleep(pool.inner.borrow().timeout.unwrap()); - pool.clear_expired(); + let timeout = ::tokio::reactor::Timeout::new( + Duration::from_millis(400), // allow for too-good resolution + &core.handle() + ).unwrap(); + core.run(timeout).unwrap(); + assert!(pool.inner.borrow().idle.get(&key).is_none()); }