wip
This commit is contained in:
@@ -90,16 +90,33 @@ impl<C, B> Client<C, B> {
|
|||||||
Exec::Executor(..) => panic!("Client not built with a Handle"),
|
Exec::Executor(..) => panic!("Client not built with a Handle"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Create a new client with a specific connector.
|
|
||||||
|
impl<C, B> Client<C, B>
|
||||||
|
where C: Connect,
|
||||||
|
B: Stream<Error=::Error>,
|
||||||
|
B::Item: AsRef<[u8]>,
|
||||||
|
{
|
||||||
|
// Create a new client with a specific connector.
|
||||||
#[inline]
|
#[inline]
|
||||||
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
|
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
|
||||||
Client {
|
let client = Client {
|
||||||
connector: Rc::new(config.connector),
|
connector: Rc::new(config.connector),
|
||||||
executor: exec,
|
executor: exec,
|
||||||
h1_writev: config.h1_writev,
|
h1_writev: config.h1_writev,
|
||||||
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
|
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
|
||||||
retry_canceled_requests: config.retry_canceled_requests,
|
retry_canceled_requests: config.retry_canceled_requests,
|
||||||
|
};
|
||||||
|
|
||||||
|
client.schedule_pool_timer();
|
||||||
|
|
||||||
|
client
|
||||||
|
}
|
||||||
|
|
||||||
|
fn schedule_pool_timer(&self) {
|
||||||
|
if let Exec::Handle(ref h) = self.executor {
|
||||||
|
self.pool.spawn_expired_interval(h);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,8 @@ use std::ops::{Deref, DerefMut, BitAndAssign};
|
|||||||
use std::rc::{Rc, Weak};
|
use std::rc::{Rc, Weak};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{Future, Async, Poll};
|
use futures::{Future, Async, Poll, Stream};
|
||||||
|
use tokio::reactor::{Handle, Interval};
|
||||||
use relay;
|
use relay;
|
||||||
|
|
||||||
use proto::{KeepAlive, KA};
|
use proto::{KeepAlive, KA};
|
||||||
@@ -194,6 +195,59 @@ impl<T> Pool<T> {
|
|||||||
inner.parked.remove(key);
|
inner.parked.remove(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn clear_expired(&self) {
|
||||||
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
|
||||||
|
let dur = if let Some(dur) = inner.timeout {
|
||||||
|
dur
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
};
|
||||||
|
|
||||||
|
let now = Instant::now();
|
||||||
|
//self.last_idle_check_at = now;
|
||||||
|
|
||||||
|
inner.idle.retain(|_key, values| {
|
||||||
|
|
||||||
|
values.retain(|val| {
|
||||||
|
match val.status.get() {
|
||||||
|
TimedKA::Idle(idle_at) if now - idle_at < dur => {
|
||||||
|
true
|
||||||
|
},
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
//now - val.idle_at < dur
|
||||||
|
});
|
||||||
|
|
||||||
|
// returning false evicts this key/val
|
||||||
|
!values.is_empty()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl<T: 'static> Pool<T> {
|
||||||
|
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
|
||||||
|
let inner = self.inner.borrow();
|
||||||
|
|
||||||
|
if !inner.enabled {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let dur = if let Some(dur) = inner.timeout {
|
||||||
|
dur
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
};
|
||||||
|
|
||||||
|
let interval = Interval::new(dur, handle)
|
||||||
|
.expect("reactor is gone");
|
||||||
|
handle.spawn(IdleInterval {
|
||||||
|
interval: interval,
|
||||||
|
pool: Rc::downgrade(&self.inner),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Clone for Pool<T> {
|
impl<T> Clone for Pool<T> {
|
||||||
@@ -385,6 +439,28 @@ impl Expiration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct IdleInterval<T> {
|
||||||
|
interval: Interval,
|
||||||
|
pool: Weak<RefCell<PoolInner<T>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: 'static> Future for IdleInterval<T> {
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
loop {
|
||||||
|
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
|
||||||
|
|
||||||
|
if let Some(inner) = self.pool.upgrade() {
|
||||||
|
let pool = Pool { inner: inner };
|
||||||
|
pool.clear_expired();
|
||||||
|
} else {
|
||||||
|
return Ok(Async::Ready(()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
@@ -428,7 +504,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pool_removes_expired() {
|
fn test_pool_checkout_removes_expired() {
|
||||||
let pool = Pool::new(true, Some(Duration::from_secs(1)));
|
let pool = Pool::new(true, Some(Duration::from_secs(1)));
|
||||||
let key = Rc::new("foo".to_string());
|
let key = Rc::new("foo".to_string());
|
||||||
|
|
||||||
@@ -451,6 +527,25 @@ mod tests {
|
|||||||
assert!(pool.inner.borrow().idle.get(&key).is_none());
|
assert!(pool.inner.borrow().idle.get(&key).is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pool_timer_removes_expired() {
|
||||||
|
let pool = Pool::new(true, Some(Duration::from_secs(1)));
|
||||||
|
let key = Rc::new("foo".to_string());
|
||||||
|
|
||||||
|
let mut pooled1 = pool.pooled(key.clone(), 41);
|
||||||
|
pooled1.idle();
|
||||||
|
let mut pooled2 = pool.pooled(key.clone(), 5);
|
||||||
|
pooled2.idle();
|
||||||
|
let mut pooled3 = pool.pooled(key.clone(), 99);
|
||||||
|
pooled3.idle();
|
||||||
|
|
||||||
|
assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3));
|
||||||
|
::std::thread::sleep(pool.inner.borrow().timeout.unwrap());
|
||||||
|
|
||||||
|
pool.clear_expired();
|
||||||
|
assert!(pool.inner.borrow().idle.get(&key).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_task_unparked() {
|
fn test_pool_checkout_task_unparked() {
|
||||||
let pool = Pool::new(true, Some(Duration::from_secs(10)));
|
let pool = Pool::new(true, Some(Duration::from_secs(10)));
|
||||||
|
|||||||
Reference in New Issue
Block a user