From 1223fc28eecffbe206f3807f574b720625859384 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 28 Feb 2018 13:57:35 -0800 Subject: [PATCH] wip --- src/client/mod.rs | 21 +++++++++- src/client/pool.rs | 99 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 116 insertions(+), 4 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index b026e5cb..fbf1061e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -90,16 +90,33 @@ impl Client { Exec::Executor(..) => panic!("Client not built with a Handle"), } } +} - /// Create a new client with a specific connector. + +impl Client +where C: Connect, + B: Stream, + B::Item: AsRef<[u8]>, +{ + // Create a new client with a specific connector. #[inline] fn configured(config: Config, exec: Exec) -> Client { - Client { + let client = Client { connector: Rc::new(config.connector), executor: exec, h1_writev: config.h1_writev, pool: Pool::new(config.keep_alive, config.keep_alive_timeout), retry_canceled_requests: config.retry_canceled_requests, + }; + + client.schedule_pool_timer(); + + client + } + + fn schedule_pool_timer(&self) { + if let Exec::Handle(ref h) = self.executor { + self.pool.spawn_expired_interval(h); } } } diff --git a/src/client/pool.rs b/src/client/pool.rs index 095b79ad..12c41587 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -6,7 +6,8 @@ use std::ops::{Deref, DerefMut, BitAndAssign}; use std::rc::{Rc, Weak}; use std::time::{Duration, Instant}; -use futures::{Future, Async, Poll}; +use futures::{Future, Async, Poll, Stream}; +use tokio::reactor::{Handle, Interval}; use relay; use proto::{KeepAlive, KA}; @@ -194,6 +195,59 @@ impl Pool { inner.parked.remove(key); } } + + fn clear_expired(&self) { + let mut inner = self.inner.borrow_mut(); + + let dur = if let Some(dur) = inner.timeout { + dur + } else { + return + }; + + let now = Instant::now(); + //self.last_idle_check_at = now; + + inner.idle.retain(|_key, values| { + + values.retain(|val| { + match val.status.get() { + TimedKA::Idle(idle_at) if now - idle_at < dur => { + true + }, + _ => false, + } + //now - val.idle_at < dur + }); + + // returning false evicts this key/val + !values.is_empty() + }); + } +} + + +impl Pool { + pub(super) fn spawn_expired_interval(&self, handle: &Handle) { + let inner = self.inner.borrow(); + + if !inner.enabled { + return; + } + + let dur = if let Some(dur) = inner.timeout { + dur + } else { + return + }; + + let interval = Interval::new(dur, handle) + .expect("reactor is gone"); + handle.spawn(IdleInterval { + interval: interval, + pool: Rc::downgrade(&self.inner), + }); + } } impl Clone for Pool { @@ -385,6 +439,28 @@ impl Expiration { } } +struct IdleInterval { + interval: Interval, + pool: Weak>>, +} + +impl Future for IdleInterval { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + loop { + try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error"))); + + if let Some(inner) = self.pool.upgrade() { + let pool = Pool { inner: inner }; + pool.clear_expired(); + } else { + return Ok(Async::Ready(())); + } + } + } +} #[cfg(test)] mod tests { @@ -428,7 +504,7 @@ mod tests { } #[test] - fn test_pool_removes_expired() { + fn test_pool_checkout_removes_expired() { let pool = Pool::new(true, Some(Duration::from_secs(1))); let key = Rc::new("foo".to_string()); @@ -451,6 +527,25 @@ mod tests { assert!(pool.inner.borrow().idle.get(&key).is_none()); } + #[test] + fn test_pool_timer_removes_expired() { + let pool = Pool::new(true, Some(Duration::from_secs(1))); + let key = Rc::new("foo".to_string()); + + let mut pooled1 = pool.pooled(key.clone(), 41); + pooled1.idle(); + let mut pooled2 = pool.pooled(key.clone(), 5); + pooled2.idle(); + let mut pooled3 = pool.pooled(key.clone(), 99); + pooled3.idle(); + + assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); + ::std::thread::sleep(pool.inner.borrow().timeout.unwrap()); + + pool.clear_expired(); + assert!(pool.inner.borrow().idle.get(&key).is_none()); + } + #[test] fn test_pool_checkout_task_unparked() { let pool = Pool::new(true, Some(Duration::from_secs(10)));