From 1223fc28eecffbe206f3807f574b720625859384 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 28 Feb 2018 13:57:35 -0800 Subject: [PATCH 1/2] wip --- src/client/mod.rs | 21 +++++++++- src/client/pool.rs | 99 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 116 insertions(+), 4 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index b026e5cb..fbf1061e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -90,16 +90,33 @@ impl Client { Exec::Executor(..) => panic!("Client not built with a Handle"), } } +} - /// Create a new client with a specific connector. + +impl Client +where C: Connect, + B: Stream, + B::Item: AsRef<[u8]>, +{ + // Create a new client with a specific connector. #[inline] fn configured(config: Config, exec: Exec) -> Client { - Client { + let client = Client { connector: Rc::new(config.connector), executor: exec, h1_writev: config.h1_writev, pool: Pool::new(config.keep_alive, config.keep_alive_timeout), retry_canceled_requests: config.retry_canceled_requests, + }; + + client.schedule_pool_timer(); + + client + } + + fn schedule_pool_timer(&self) { + if let Exec::Handle(ref h) = self.executor { + self.pool.spawn_expired_interval(h); } } } diff --git a/src/client/pool.rs b/src/client/pool.rs index 095b79ad..12c41587 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -6,7 +6,8 @@ use std::ops::{Deref, DerefMut, BitAndAssign}; use std::rc::{Rc, Weak}; use std::time::{Duration, Instant}; -use futures::{Future, Async, Poll}; +use futures::{Future, Async, Poll, Stream}; +use tokio::reactor::{Handle, Interval}; use relay; use proto::{KeepAlive, KA}; @@ -194,6 +195,59 @@ impl Pool { inner.parked.remove(key); } } + + fn clear_expired(&self) { + let mut inner = self.inner.borrow_mut(); + + let dur = if let Some(dur) = inner.timeout { + dur + } else { + return + }; + + let now = Instant::now(); + //self.last_idle_check_at = now; + + inner.idle.retain(|_key, values| { + + values.retain(|val| { + match val.status.get() { + TimedKA::Idle(idle_at) if now - idle_at < dur => { + true + }, + _ => false, + } + //now - val.idle_at < dur + }); + + // returning false evicts this key/val + !values.is_empty() + }); + } +} + + +impl Pool { + pub(super) fn spawn_expired_interval(&self, handle: &Handle) { + let inner = self.inner.borrow(); + + if !inner.enabled { + return; + } + + let dur = if let Some(dur) = inner.timeout { + dur + } else { + return + }; + + let interval = Interval::new(dur, handle) + .expect("reactor is gone"); + handle.spawn(IdleInterval { + interval: interval, + pool: Rc::downgrade(&self.inner), + }); + } } impl Clone for Pool { @@ -385,6 +439,28 @@ impl Expiration { } } +struct IdleInterval { + interval: Interval, + pool: Weak>>, +} + +impl Future for IdleInterval { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error"))); + + if let Some(inner) = self.pool.upgrade() { + let pool = Pool { inner: inner }; + pool.clear_expired(); + } else { + return Ok(Async::Ready(())); + } + } + } +} #[cfg(test)] mod tests { @@ -428,7 +504,7 @@ mod tests { } #[test] - fn test_pool_removes_expired() { + fn test_pool_checkout_removes_expired() { let pool = Pool::new(true, Some(Duration::from_secs(1))); let key = Rc::new("foo".to_string()); @@ -451,6 +527,25 @@ mod tests { assert!(pool.inner.borrow().idle.get(&key).is_none()); } + #[test] + fn test_pool_timer_removes_expired() { + let pool = Pool::new(true, Some(Duration::from_secs(1))); + let key = Rc::new("foo".to_string()); + + let mut pooled1 = pool.pooled(key.clone(), 41); + pooled1.idle(); + let mut pooled2 = pool.pooled(key.clone(), 5); + pooled2.idle(); + let mut pooled3 = pool.pooled(key.clone(), 99); + pooled3.idle(); + + assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); + ::std::thread::sleep(pool.inner.borrow().timeout.unwrap()); + + pool.clear_expired(); + assert!(pool.inner.borrow().idle.get(&key).is_none()); + } + #[test] fn test_pool_checkout_task_unparked() { let pool = Pool::new(true, Some(Duration::from_secs(10))); From 727b74797e5754af8abba8812a876c3c8fda6d94 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 28 Feb 2018 14:57:06 -0800 Subject: [PATCH 2/2] fix(client): schedule interval to clear expired idle connections Currently only works if Client is built with a `Handle`, and not a custome executor, since a `Handle` is required to create a tokio Interval. --- src/client/mod.rs | 34 +++++++++++++++------------------- src/client/pool.rs | 26 ++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index fbf1061e..7d96de50 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -90,33 +90,15 @@ impl Client { Exec::Executor(..) => panic!("Client not built with a Handle"), } } -} - -impl Client -where C: Connect, - B: Stream, - B::Item: AsRef<[u8]>, -{ - // Create a new client with a specific connector. #[inline] fn configured(config: Config, exec: Exec) -> Client { - let client = Client { + Client { connector: Rc::new(config.connector), executor: exec, h1_writev: config.h1_writev, pool: Pool::new(config.keep_alive, config.keep_alive_timeout), retry_canceled_requests: config.retry_canceled_requests, - }; - - client.schedule_pool_timer(); - - client - } - - fn schedule_pool_timer(&self) { - if let Exec::Handle(ref h) = self.executor { - self.pool.spawn_expired_interval(h); } } } @@ -135,6 +117,14 @@ where C: Connect, /// Send a constructed Request using this Client. #[inline] pub fn request(&self, mut req: Request) -> FutureResponse { + // TODO(0.12): do this at construction time. + // + // It cannot be done in the constructor because the Client::configured + // does not have `B: 'static` bounds, which are required to spawn + // the interval. In 0.12, add a static bounds to the constructor, + // and move this. + self.schedule_pool_timer(); + match req.version() { HttpVersion::Http10 | HttpVersion::Http11 => (), @@ -263,6 +253,12 @@ where C: Connect, Box::new(resp) } + + fn schedule_pool_timer(&self) { + if let Exec::Handle(ref h) = self.executor { + self.pool.spawn_expired_interval(h); + } + } } impl Service for Client diff --git a/src/client/pool.rs b/src/client/pool.rs index 12c41587..e5e12b1a 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -41,6 +41,12 @@ struct PoolInner { // connection. parked: HashMap, VecDeque>>>, timeout: Option, + // Used to prevent multiple intervals from being spawned to clear + // expired connections. + // + // TODO(0.12): Remove the need for this when Client::schedule_pool_timer + // can be done in Client::new. + expired_timer_spawned: bool, } impl Pool { @@ -51,6 +57,7 @@ impl Pool { idle: HashMap::new(), parked: HashMap::new(), timeout: timeout, + expired_timer_spawned: false, })), } } @@ -229,12 +236,17 @@ impl Pool { impl Pool { pub(super) fn spawn_expired_interval(&self, handle: &Handle) { - let inner = self.inner.borrow(); + let mut inner = self.inner.borrow_mut(); if !inner.enabled { return; } + if inner.expired_timer_spawned { + return; + } + inner.expired_timer_spawned = true; + let dur = if let Some(dur) = inner.timeout { dur } else { @@ -529,7 +541,9 @@ mod tests { #[test] fn test_pool_timer_removes_expired() { - let pool = Pool::new(true, Some(Duration::from_secs(1))); + let mut core = ::tokio::reactor::Core::new().unwrap(); + let pool = Pool::new(true, Some(Duration::from_millis(100))); + pool.spawn_expired_interval(&core.handle()); let key = Rc::new("foo".to_string()); let mut pooled1 = pool.pooled(key.clone(), 41); @@ -540,9 +554,13 @@ mod tests { pooled3.idle(); assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); - ::std::thread::sleep(pool.inner.borrow().timeout.unwrap()); - pool.clear_expired(); + let timeout = ::tokio::reactor::Timeout::new( + Duration::from_millis(400), // allow for too-good resolution + &core.handle() + ).unwrap(); + core.run(timeout).unwrap(); + assert!(pool.inner.borrow().idle.get(&key).is_none()); }