refactor(client): clean up pool Checkout::poll function

This commit is contained in:
Sean McArthur
2018-02-05 11:53:17 -08:00
parent 265ad67c86
commit b7293ca6ff

View File

@@ -42,7 +42,7 @@ struct PoolInner<T> {
timeout: Option<Duration>, timeout: Option<Duration>,
} }
impl<T: Clone> Pool<T> { impl<T: Clone + Ready> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> { pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool { Pool {
inner: Rc::new(RefCell::new(PoolInner { inner: Rc::new(RefCell::new(PoolInner {
@@ -65,7 +65,6 @@ impl<T: Clone> Pool<T> {
fn put(&mut self, key: Rc<String>, entry: Entry<T>) { fn put(&mut self, key: Rc<String>, entry: Entry<T>) {
trace!("Pool::put {:?}", key); trace!("Pool::put {:?}", key);
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
//let inner = &mut *inner;
let mut remove_parked = false; let mut remove_parked = false;
let mut entry = Some(entry); let mut entry = Some(entry);
if let Some(parked) = inner.parked.get_mut(&key) { if let Some(parked) = inner.parked.get_mut(&key) {
@@ -103,6 +102,42 @@ impl<T: Clone> Pool<T> {
} }
} }
fn take(&self, key: &Rc<String>) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.borrow_mut();
let expiration = Expiration::new(inner.timeout);
let mut should_remove = false;
let entry = inner.idle.get_mut(key).and_then(|list| {
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
while let Some(mut entry) = list.pop() {
match entry.status.get() {
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
if let Ok(Async::Ready(())) = entry.value.poll_ready() {
should_remove = list.is_empty();
return Some(entry);
}
},
_ => {},
}
trace!("removing unacceptable pooled {:?}", key);
// every other case the Entry should just be dropped
// 1. Idle but expired
// 2. Busy (something else somehow took it?)
// 3. Disabled don't reuse of course
}
should_remove = true;
None
});
if should_remove {
inner.idle.remove(key);
}
entry
};
entry.map(|e| self.reuse(key, e))
}
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> { pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
Pooled { Pooled {
@@ -120,13 +155,13 @@ impl<T: Clone> Pool<T> {
self.inner.borrow().enabled self.inner.borrow().enabled
} }
fn reuse(&self, key: Rc<String>, mut entry: Entry<T>) -> Pooled<T> { fn reuse(&self, key: &Rc<String>, mut entry: Entry<T>) -> Pooled<T> {
trace!("reuse {:?}", key); debug!("reuse idle connection for {:?}", key);
entry.is_reused = true; entry.is_reused = true;
entry.status.set(TimedKA::Busy); entry.status.set(TimedKA::Busy);
Pooled { Pooled {
entry: entry, entry: entry,
key: key, key: key.clone(),
pool: Rc::downgrade(&self.inner), pool: Rc::downgrade(&self.inner),
} }
} }
@@ -141,8 +176,11 @@ impl<T: Clone> Pool<T> {
} }
impl<T> Pool<T> { impl<T> Pool<T> {
/// Any `FutureResponse`s that were created will have made a `Checkout`,
/// and possibly inserted into the pool that it is waiting for an idle
/// connection. If a user ever dropped that future, we need to clean out
/// those parked senders.
fn clean_parked(&mut self, key: &Rc<String>) { fn clean_parked(&mut self, key: &Rc<String>) {
trace!("clean_parked {:?}", key);
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
let mut remove_parked = false; let mut remove_parked = false;
@@ -186,7 +224,7 @@ impl<T> DerefMut for Pooled<T> {
} }
} }
impl<T: Clone> KeepAlive for Pooled<T> { impl<T: Clone + Ready> KeepAlive for Pooled<T> {
fn busy(&mut self) { fn busy(&mut self) {
self.entry.status.set(TimedKA::Busy); self.entry.status.set(TimedKA::Busy);
} }
@@ -237,7 +275,7 @@ impl<T> fmt::Debug for Pooled<T> {
} }
} }
impl<T: Clone> BitAndAssign<bool> for Pooled<T> { impl<T: Clone + Ready> BitAndAssign<bool> for Pooled<T> {
fn bitand_assign(&mut self, enabled: bool) { fn bitand_assign(&mut self, enabled: bool) {
if !enabled { if !enabled {
self.disable(); self.disable();
@@ -265,67 +303,56 @@ pub struct Checkout<T> {
parked: Option<relay::Receiver<Entry<T>>>, parked: Option<relay::Receiver<Entry<T>>>,
} }
impl<T: Ready + Clone> Future for Checkout<T> { struct NotParked;
type Item = Pooled<T>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { impl<T: Clone + Ready> Checkout<T> {
trace!("Checkout::poll"); fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false; let mut drop_parked = false;
if let Some(ref mut rx) = self.parked { if let Some(ref mut rx) = self.parked {
match rx.poll() { match rx.poll() {
Ok(Async::Ready(entry)) => { Ok(Async::Ready(mut entry)) => {
trace!("Checkout::poll found client in relay for {:?}", self.key); if let Ok(Async::Ready(())) = entry.value.poll_ready() {
return Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))); return Ok(Async::Ready(self.pool.reuse(&self.key, entry)));
}
drop_parked = true;
}, },
Ok(Async::NotReady) => (), Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_canceled) => drop_parked = true, Err(_canceled) => drop_parked = true,
} }
} }
if drop_parked { if drop_parked {
self.parked.take(); self.parked.take();
} }
let expiration = Expiration::new(self.pool.inner.borrow().timeout); Err(NotParked)
let key = &self.key; }
trace!("Checkout::poll url = {:?}, expiration = {:?}", key, expiration.0);
let mut should_remove = false;
let entry = self.pool.inner.borrow_mut().idle.get_mut(key).and_then(|list| {
trace!("Checkout::poll key found {:?}", key);
while let Some(mut entry) = list.pop() {
match entry.status.get() {
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
if let Ok(Async::Ready(())) = entry.value.poll_ready() {
debug!("found idle connection for {:?}", key);
should_remove = list.is_empty();
return Some(entry);
}
},
_ => {},
}
trace!("Checkout::poll removing unacceptable pooled {:?}", key);
// every other case the Entry should just be dropped
// 1. Idle but expired
// 2. Busy (something else somehow took it?)
// 3. Disabled don't reuse of course
}
should_remove = true;
None
});
if should_remove { fn park(&mut self) {
self.pool.inner.borrow_mut().idle.remove(key); if self.parked.is_none() {
let (tx, mut rx) = relay::channel();
let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
} }
match entry { }
Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))), }
None => {
if self.parked.is_none() { impl<T: Clone + Ready> Future for Checkout<T> {
let (tx, mut rx) = relay::channel(); type Item = Pooled<T>;
let _ = rx.poll(); // park this task type Error = io::Error;
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx); fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
} match self.poll_parked() {
Ok(Async::NotReady) Ok(async) => return Ok(async),
}, Err(_not_parked) => (),
}
let entry = self.pool.take(&self.key);
if let Some(pooled) = entry {
Ok(Async::Ready(pooled))
} else {
self.park();
Ok(Async::NotReady)
} }
} }
} }