diff --git a/src/client/pool.rs b/src/client/pool.rs index c8ee8883..4c7d9022 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -42,7 +42,7 @@ struct PoolInner { timeout: Option, } -impl Pool { +impl Pool { pub fn new(enabled: bool, timeout: Option) -> Pool { Pool { inner: Rc::new(RefCell::new(PoolInner { @@ -65,7 +65,6 @@ impl Pool { fn put(&mut self, key: Rc, entry: Entry) { trace!("Pool::put {:?}", key); let mut inner = self.inner.borrow_mut(); - //let inner = &mut *inner; let mut remove_parked = false; let mut entry = Some(entry); if let Some(parked) = inner.parked.get_mut(&key) { @@ -103,6 +102,42 @@ impl Pool { } } + fn take(&self, key: &Rc) -> Option> { + let entry = { + let mut inner = self.inner.borrow_mut(); + let expiration = Expiration::new(inner.timeout); + let mut should_remove = false; + let entry = inner.idle.get_mut(key).and_then(|list| { + trace!("take; url = {:?}, expiration = {:?}", key, expiration.0); + while let Some(mut entry) = list.pop() { + match entry.status.get() { + TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => { + if let Ok(Async::Ready(())) = entry.value.poll_ready() { + should_remove = list.is_empty(); + return Some(entry); + } + }, + _ => {}, + } + trace!("removing unacceptable pooled {:?}", key); + // every other case the Entry should just be dropped + // 1. Idle but expired + // 2. Busy (something else somehow took it?) + // 3. Disabled don't reuse of course + } + should_remove = true; + None + }); + + if should_remove { + inner.idle.remove(key); + } + entry + }; + + entry.map(|e| self.reuse(key, e)) + } + pub fn pooled(&self, key: Rc, value: T) -> Pooled { Pooled { @@ -120,13 +155,13 @@ impl Pool { self.inner.borrow().enabled } - fn reuse(&self, key: Rc, mut entry: Entry) -> Pooled { - trace!("reuse {:?}", key); + fn reuse(&self, key: &Rc, mut entry: Entry) -> Pooled { + debug!("reuse idle connection for {:?}", key); entry.is_reused = true; entry.status.set(TimedKA::Busy); Pooled { entry: entry, - key: key, + key: key.clone(), pool: Rc::downgrade(&self.inner), } } @@ -141,8 +176,11 @@ impl Pool { } impl Pool { + /// Any `FutureResponse`s that were created will have made a `Checkout`, + /// and possibly inserted into the pool that it is waiting for an idle + /// connection. If a user ever dropped that future, we need to clean out + /// those parked senders. fn clean_parked(&mut self, key: &Rc) { - trace!("clean_parked {:?}", key); let mut inner = self.inner.borrow_mut(); let mut remove_parked = false; @@ -186,7 +224,7 @@ impl DerefMut for Pooled { } } -impl KeepAlive for Pooled { +impl KeepAlive for Pooled { fn busy(&mut self) { self.entry.status.set(TimedKA::Busy); } @@ -237,7 +275,7 @@ impl fmt::Debug for Pooled { } } -impl BitAndAssign for Pooled { +impl BitAndAssign for Pooled { fn bitand_assign(&mut self, enabled: bool) { if !enabled { self.disable(); @@ -265,67 +303,56 @@ pub struct Checkout { parked: Option>>, } -impl Future for Checkout { - type Item = Pooled; - type Error = io::Error; +struct NotParked; - fn poll(&mut self) -> Poll { - trace!("Checkout::poll"); +impl Checkout { + fn poll_parked(&mut self) -> Poll, NotParked> { let mut drop_parked = false; if let Some(ref mut rx) = self.parked { match rx.poll() { - Ok(Async::Ready(entry)) => { - trace!("Checkout::poll found client in relay for {:?}", self.key); - return Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))); + Ok(Async::Ready(mut entry)) => { + if let Ok(Async::Ready(())) = entry.value.poll_ready() { + return Ok(Async::Ready(self.pool.reuse(&self.key, entry))); + } + drop_parked = true; }, - Ok(Async::NotReady) => (), + Ok(Async::NotReady) => return Ok(Async::NotReady), Err(_canceled) => drop_parked = true, } } if drop_parked { self.parked.take(); } - let expiration = Expiration::new(self.pool.inner.borrow().timeout); - let key = &self.key; - trace!("Checkout::poll url = {:?}, expiration = {:?}", key, expiration.0); - let mut should_remove = false; - let entry = self.pool.inner.borrow_mut().idle.get_mut(key).and_then(|list| { - trace!("Checkout::poll key found {:?}", key); - while let Some(mut entry) = list.pop() { - match entry.status.get() { - TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => { - if let Ok(Async::Ready(())) = entry.value.poll_ready() { - debug!("found idle connection for {:?}", key); - should_remove = list.is_empty(); - return Some(entry); - } - }, - _ => {}, - } - trace!("Checkout::poll removing unacceptable pooled {:?}", key); - // every other case the Entry should just be dropped - // 1. Idle but expired - // 2. Busy (something else somehow took it?) - // 3. Disabled don't reuse of course - } - should_remove = true; - None - }); + Err(NotParked) + } - if should_remove { - self.pool.inner.borrow_mut().idle.remove(key); + fn park(&mut self) { + if self.parked.is_none() { + let (tx, mut rx) = relay::channel(); + let _ = rx.poll(); // park this task + self.pool.park(self.key.clone(), tx); + self.parked = Some(rx); } - match entry { - Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))), - None => { - if self.parked.is_none() { - let (tx, mut rx) = relay::channel(); - let _ = rx.poll(); // park this task - self.pool.park(self.key.clone(), tx); - self.parked = Some(rx); - } - Ok(Async::NotReady) - }, + } +} + +impl Future for Checkout { + type Item = Pooled; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + match self.poll_parked() { + Ok(async) => return Ok(async), + Err(_not_parked) => (), + } + + let entry = self.pool.take(&self.key); + + if let Some(pooled) = entry { + Ok(Async::Ready(pooled)) + } else { + self.park(); + Ok(Async::NotReady) } } }