Merge pull request #1325 from hyperium/pool-clean-parked
fix(client): cleanup dropped pending Checkouts from Pool
This commit is contained in:
@@ -28,6 +28,7 @@ language-tags = "0.2"
|
||||
log = "0.3"
|
||||
mime = "0.3.2"
|
||||
percent-encoding = "1.0"
|
||||
relay = "0.1"
|
||||
time = "0.1"
|
||||
tokio-core = "0.1.6"
|
||||
tokio-proto = "0.1"
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::rc::Rc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::{Future, Async, Poll};
|
||||
use futures::unsync::oneshot;
|
||||
use relay;
|
||||
|
||||
use http::{KeepAlive, KA};
|
||||
|
||||
@@ -17,8 +17,19 @@ pub struct Pool<T> {
|
||||
|
||||
struct PoolInner<T> {
|
||||
enabled: bool,
|
||||
// These are internal Conns sitting in the event loop in the KeepAlive
|
||||
// state, waiting to receive a new Request to send on the socket.
|
||||
idle: HashMap<Rc<String>, Vec<Entry<T>>>,
|
||||
parked: HashMap<Rc<String>, VecDeque<oneshot::Sender<Entry<T>>>>,
|
||||
// These are outstanding Checkouts that are waiting for a socket to be
|
||||
// able to send a Request one. This is used when "racing" for a new
|
||||
// connection.
|
||||
//
|
||||
// The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
|
||||
// for the Pool to receive an idle Conn. When a Conn becomes idle,
|
||||
// this list is checked for any parked Checkouts, and tries to notify
|
||||
// them that the Conn could be used instead of waiting for a brand new
|
||||
// connection.
|
||||
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
|
||||
timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
@@ -50,6 +61,12 @@ impl<T: Clone> Pool<T> {
|
||||
let mut entry = Some(entry);
|
||||
if let Some(parked) = inner.parked.get_mut(&key) {
|
||||
while let Some(tx) = parked.pop_front() {
|
||||
if tx.is_canceled() {
|
||||
trace!("Pool::put removing canceled parked {:?}", key);
|
||||
} else {
|
||||
tx.complete(entry.take().unwrap());
|
||||
}
|
||||
/*
|
||||
match tx.send(entry.take().unwrap()) {
|
||||
Ok(()) => break,
|
||||
Err(e) => {
|
||||
@@ -57,6 +74,7 @@ impl<T: Clone> Pool<T> {
|
||||
entry = Some(e);
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
remove_parked = parked.is_empty();
|
||||
}
|
||||
@@ -74,6 +92,7 @@ impl<T: Clone> Pool<T> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
|
||||
trace!("Pool::pooled {:?}", key);
|
||||
Pooled {
|
||||
@@ -102,7 +121,7 @@ impl<T: Clone> Pool<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn park(&mut self, key: Rc<String>, tx: oneshot::Sender<Entry<T>>) {
|
||||
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
|
||||
trace!("Pool::park {:?}", key);
|
||||
self.inner.borrow_mut()
|
||||
.parked.entry(key)
|
||||
@@ -111,6 +130,24 @@ impl<T: Clone> Pool<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Pool<T> {
|
||||
fn clean_parked(&mut self, key: &Rc<String>) {
|
||||
trace!("Pool::clean_parked {:?}", key);
|
||||
let mut inner = self.inner.borrow_mut();
|
||||
|
||||
let mut remove_parked = false;
|
||||
if let Some(parked) = inner.parked.get_mut(key) {
|
||||
parked.retain(|tx| {
|
||||
!tx.is_canceled()
|
||||
});
|
||||
remove_parked = parked.is_empty();
|
||||
}
|
||||
if remove_parked {
|
||||
inner.parked.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Pool<T> {
|
||||
fn clone(&self) -> Pool<T> {
|
||||
Pool {
|
||||
@@ -204,7 +241,7 @@ enum TimedKA {
|
||||
pub struct Checkout<T> {
|
||||
key: Rc<String>,
|
||||
pool: Pool<T>,
|
||||
parked: Option<oneshot::Receiver<Entry<T>>>,
|
||||
parked: Option<relay::Receiver<Entry<T>>>,
|
||||
}
|
||||
|
||||
impl<T: Clone> Future for Checkout<T> {
|
||||
@@ -260,7 +297,7 @@ impl<T: Clone> Future for Checkout<T> {
|
||||
Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))),
|
||||
None => {
|
||||
if self.parked.is_none() {
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let (tx, mut rx) = relay::channel();
|
||||
let _ = rx.poll(); // park this task
|
||||
self.pool.park(self.key.clone(), tx);
|
||||
self.parked = Some(rx);
|
||||
@@ -271,6 +308,13 @@ impl<T: Clone> Future for Checkout<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Checkout<T> {
|
||||
fn drop(&mut self) {
|
||||
self.parked.take();
|
||||
self.pool.clean_parked(&self.key);
|
||||
}
|
||||
}
|
||||
|
||||
struct Expiration(Option<Duration>);
|
||||
|
||||
impl Expiration {
|
||||
@@ -364,4 +408,30 @@ mod tests {
|
||||
})).map(|(entry, _)| entry);
|
||||
assert_eq!(*checkout.wait().unwrap(), *pooled1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pool_checkout_drop_cleans_up_parked() {
|
||||
future::lazy(|| {
|
||||
let pool = Pool::new(true, Some(Duration::from_secs(10)));
|
||||
let key = Rc::new("localhost:12345".to_string());
|
||||
let _pooled1 = pool.pooled(key.clone(), 41);
|
||||
let mut checkout1 = pool.checkout(&key);
|
||||
let mut checkout2 = pool.checkout(&key);
|
||||
|
||||
// first poll needed to get into Pool's parked
|
||||
checkout1.poll().unwrap();
|
||||
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
|
||||
checkout2.poll().unwrap();
|
||||
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 2);
|
||||
|
||||
// on drop, clean up Pool
|
||||
drop(checkout1);
|
||||
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
|
||||
|
||||
drop(checkout2);
|
||||
assert!(pool.inner.borrow().parked.get(&key).is_none());
|
||||
|
||||
::futures::future::ok::<(), ()>(())
|
||||
}).wait().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ extern crate language_tags;
|
||||
#[macro_use] extern crate log;
|
||||
pub extern crate mime;
|
||||
#[macro_use] extern crate percent_encoding;
|
||||
extern crate relay;
|
||||
extern crate time;
|
||||
extern crate tokio_core as tokio;
|
||||
#[macro_use] extern crate tokio_io;
|
||||
|
||||
Reference in New Issue
Block a user