fix(client): schedule interval to clear expired idle connections

Currently only works if Client is built with a `Handle`, and not a
custome executor, since a `Handle` is required to create a tokio
Interval.
This commit is contained in:
Sean McArthur
2018-02-28 14:57:06 -08:00
parent 1223fc28ee
commit 727b74797e
2 changed files with 37 additions and 23 deletions

View File

@@ -90,33 +90,15 @@ impl<C, B> Client<C, B> {
Exec::Executor(..) => panic!("Client not built with a Handle"), Exec::Executor(..) => panic!("Client not built with a Handle"),
} }
} }
}
impl<C, B> Client<C, B>
where C: Connect,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
// Create a new client with a specific connector.
#[inline] #[inline]
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> { fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
let client = Client { Client {
connector: Rc::new(config.connector), connector: Rc::new(config.connector),
executor: exec, executor: exec,
h1_writev: config.h1_writev, h1_writev: config.h1_writev,
pool: Pool::new(config.keep_alive, config.keep_alive_timeout), pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
retry_canceled_requests: config.retry_canceled_requests, retry_canceled_requests: config.retry_canceled_requests,
};
client.schedule_pool_timer();
client
}
fn schedule_pool_timer(&self) {
if let Exec::Handle(ref h) = self.executor {
self.pool.spawn_expired_interval(h);
} }
} }
} }
@@ -135,6 +117,14 @@ where C: Connect,
/// Send a constructed Request using this Client. /// Send a constructed Request using this Client.
#[inline] #[inline]
pub fn request(&self, mut req: Request<B>) -> FutureResponse { pub fn request(&self, mut req: Request<B>) -> FutureResponse {
// TODO(0.12): do this at construction time.
//
// It cannot be done in the constructor because the Client::configured
// does not have `B: 'static` bounds, which are required to spawn
// the interval. In 0.12, add a static bounds to the constructor,
// and move this.
self.schedule_pool_timer();
match req.version() { match req.version() {
HttpVersion::Http10 | HttpVersion::Http10 |
HttpVersion::Http11 => (), HttpVersion::Http11 => (),
@@ -263,6 +253,12 @@ where C: Connect,
Box::new(resp) Box::new(resp)
} }
fn schedule_pool_timer(&self) {
if let Exec::Handle(ref h) = self.executor {
self.pool.spawn_expired_interval(h);
}
}
} }
impl<C, B> Service for Client<C, B> impl<C, B> Service for Client<C, B>

View File

@@ -41,6 +41,12 @@ struct PoolInner<T> {
// connection. // connection.
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>, parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
timeout: Option<Duration>, timeout: Option<Duration>,
// Used to prevent multiple intervals from being spawned to clear
// expired connections.
//
// TODO(0.12): Remove the need for this when Client::schedule_pool_timer
// can be done in Client::new.
expired_timer_spawned: bool,
} }
impl<T: Clone + Ready> Pool<T> { impl<T: Clone + Ready> Pool<T> {
@@ -51,6 +57,7 @@ impl<T: Clone + Ready> Pool<T> {
idle: HashMap::new(), idle: HashMap::new(),
parked: HashMap::new(), parked: HashMap::new(),
timeout: timeout, timeout: timeout,
expired_timer_spawned: false,
})), })),
} }
} }
@@ -229,12 +236,17 @@ impl<T> Pool<T> {
impl<T: 'static> Pool<T> { impl<T: 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, handle: &Handle) { pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
let inner = self.inner.borrow(); let mut inner = self.inner.borrow_mut();
if !inner.enabled { if !inner.enabled {
return; return;
} }
if inner.expired_timer_spawned {
return;
}
inner.expired_timer_spawned = true;
let dur = if let Some(dur) = inner.timeout { let dur = if let Some(dur) = inner.timeout {
dur dur
} else { } else {
@@ -529,7 +541,9 @@ mod tests {
#[test] #[test]
fn test_pool_timer_removes_expired() { fn test_pool_timer_removes_expired() {
let pool = Pool::new(true, Some(Duration::from_secs(1))); let mut core = ::tokio::reactor::Core::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)));
pool.spawn_expired_interval(&core.handle());
let key = Rc::new("foo".to_string()); let key = Rc::new("foo".to_string());
let mut pooled1 = pool.pooled(key.clone(), 41); let mut pooled1 = pool.pooled(key.clone(), 41);
@@ -540,9 +554,13 @@ mod tests {
pooled3.idle(); pooled3.idle();
assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3)); assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.borrow().timeout.unwrap());
pool.clear_expired(); let timeout = ::tokio::reactor::Timeout::new(
Duration::from_millis(400), // allow for too-good resolution
&core.handle()
).unwrap();
core.run(timeout).unwrap();
assert!(pool.inner.borrow().idle.get(&key).is_none()); assert!(pool.inner.borrow().idle.get(&key).is_none());
} }