fix(client): cleanup dropped pending Checkouts from Pool

Closes #1315
This commit is contained in:
Sean McArthur
2017-09-18 13:29:25 -07:00
parent 971864c424
commit 3b91fc65b2
3 changed files with 77 additions and 5 deletions

View File

@@ -28,6 +28,7 @@ language-tags = "0.2"
log = "0.3"
mime = "0.3.2"
percent-encoding = "1.0"
relay = "0.1"
time = "0.1"
tokio-core = "0.1.6"
tokio-proto = "0.1"

View File

@@ -7,7 +7,7 @@ use std::rc::Rc;
use std::time::{Duration, Instant};
use futures::{Future, Async, Poll};
use futures::unsync::oneshot;
use relay;
use http::{KeepAlive, KA};
@@ -17,8 +17,19 @@ pub struct Pool<T> {
struct PoolInner<T> {
enabled: bool,
// These are internal Conns sitting in the event loop in the KeepAlive
// state, waiting to receive a new Request to send on the socket.
idle: HashMap<Rc<String>, Vec<Entry<T>>>,
parked: HashMap<Rc<String>, VecDeque<oneshot::Sender<Entry<T>>>>,
// These are outstanding Checkouts that are waiting for a socket to be
// able to send a Request one. This is used when "racing" for a new
// connection.
//
// The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
// for the Pool to receive an idle Conn. When a Conn becomes idle,
// this list is checked for any parked Checkouts, and tries to notify
// them that the Conn could be used instead of waiting for a brand new
// connection.
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
timeout: Option<Duration>,
}
@@ -50,6 +61,12 @@ impl<T: Clone> Pool<T> {
let mut entry = Some(entry);
if let Some(parked) = inner.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if tx.is_canceled() {
trace!("Pool::put removing canceled parked {:?}", key);
} else {
tx.complete(entry.take().unwrap());
}
/*
match tx.send(entry.take().unwrap()) {
Ok(()) => break,
Err(e) => {
@@ -57,6 +74,7 @@ impl<T: Clone> Pool<T> {
entry = Some(e);
}
}
*/
}
remove_parked = parked.is_empty();
}
@@ -74,6 +92,7 @@ impl<T: Clone> Pool<T> {
}
}
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
trace!("Pool::pooled {:?}", key);
Pooled {
@@ -102,7 +121,7 @@ impl<T: Clone> Pool<T> {
}
}
fn park(&mut self, key: Rc<String>, tx: oneshot::Sender<Entry<T>>) {
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
trace!("Pool::park {:?}", key);
self.inner.borrow_mut()
.parked.entry(key)
@@ -111,6 +130,24 @@ impl<T: Clone> Pool<T> {
}
}
impl<T> Pool<T> {
fn clean_parked(&mut self, key: &Rc<String>) {
trace!("Pool::clean_parked {:?}", key);
let mut inner = self.inner.borrow_mut();
let mut remove_parked = false;
if let Some(parked) = inner.parked.get_mut(key) {
parked.retain(|tx| {
!tx.is_canceled()
});
remove_parked = parked.is_empty();
}
if remove_parked {
inner.parked.remove(key);
}
}
}
impl<T> Clone for Pool<T> {
fn clone(&self) -> Pool<T> {
Pool {
@@ -204,7 +241,7 @@ enum TimedKA {
pub struct Checkout<T> {
key: Rc<String>,
pool: Pool<T>,
parked: Option<oneshot::Receiver<Entry<T>>>,
parked: Option<relay::Receiver<Entry<T>>>,
}
impl<T: Clone> Future for Checkout<T> {
@@ -260,7 +297,7 @@ impl<T: Clone> Future for Checkout<T> {
Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))),
None => {
if self.parked.is_none() {
let (tx, mut rx) = oneshot::channel();
let (tx, mut rx) = relay::channel();
let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
@@ -271,6 +308,13 @@ impl<T: Clone> Future for Checkout<T> {
}
}
impl<T> Drop for Checkout<T> {
fn drop(&mut self) {
self.parked.take();
self.pool.clean_parked(&self.key);
}
}
struct Expiration(Option<Duration>);
impl Expiration {
@@ -364,4 +408,30 @@ mod tests {
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), *pooled1);
}
#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = Rc::new("localhost:12345".to_string());
let _pooled1 = pool.pooled(key.clone(), 41);
let mut checkout1 = pool.checkout(&key);
let mut checkout2 = pool.checkout(&key);
// first poll needed to get into Pool's parked
checkout1.poll().unwrap();
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap();
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 2);
// on drop, clean up Pool
drop(checkout1);
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
drop(checkout2);
assert!(pool.inner.borrow().parked.get(&key).is_none());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
}

View File

@@ -26,6 +26,7 @@ extern crate language_tags;
#[macro_use] extern crate log;
pub extern crate mime;
#[macro_use] extern crate percent_encoding;
extern crate relay;
extern crate time;
extern crate tokio_core as tokio;
#[macro_use] extern crate tokio_io;