fix(client): fix a rare connection pool race condition
It's possible for `PoolInner::put` to happen between `Pool::take` and `Pool::waiter`. This merges `take` and `waiter` into using the same lock.
This commit is contained in:
@@ -198,41 +198,6 @@ impl<T: Poolable> Pool<T> {
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
fn take(&self, key: &Key) -> Option<Pooled<T>> {
|
||||
let entry = {
|
||||
let mut inner = self.inner.as_ref()?.lock().unwrap();
|
||||
let expiration = Expiration::new(inner.timeout);
|
||||
let maybe_entry = inner.idle.get_mut(key)
|
||||
.and_then(|list| {
|
||||
trace!("take? {:?}: expiration = {:?}", key, expiration.0);
|
||||
// A block to end the mutable borrow on list,
|
||||
// so the map below can check is_empty()
|
||||
{
|
||||
let popper = IdlePopper {
|
||||
key,
|
||||
list,
|
||||
};
|
||||
popper.pop(&expiration)
|
||||
}
|
||||
.map(|e| (e, list.is_empty()))
|
||||
});
|
||||
|
||||
let (entry, empty) = if let Some((e, empty)) = maybe_entry {
|
||||
(Some(e), empty)
|
||||
} else {
|
||||
// No entry found means nuke the list for sure.
|
||||
(None, true)
|
||||
};
|
||||
if empty {
|
||||
//TODO: This could be done with the HashMap::entry API instead.
|
||||
inner.idle.remove(key);
|
||||
}
|
||||
entry
|
||||
};
|
||||
|
||||
entry.map(|e| self.reuse(key, e.value))
|
||||
}
|
||||
|
||||
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
|
||||
let (value, pool_ref) = if let Some(ref enabled) = self.inner {
|
||||
match value.reserve() {
|
||||
@@ -296,23 +261,6 @@ impl<T: Poolable> Pool<T> {
|
||||
value: Some(value),
|
||||
}
|
||||
}
|
||||
|
||||
fn waiter(&self, key: Key, tx: oneshot::Sender<T>) {
|
||||
debug_assert!(
|
||||
self.is_enabled(),
|
||||
"pool.waiter() should not be called if disabled",
|
||||
);
|
||||
trace!("checkout waiting for idle connection: {:?}", key);
|
||||
self.inner
|
||||
.as_ref()
|
||||
.expect("pool.waiter() expects pool is enabled")
|
||||
.lock()
|
||||
.unwrap()
|
||||
.waiters
|
||||
.entry(key)
|
||||
.or_insert(VecDeque::new())
|
||||
.push_back(tx);
|
||||
}
|
||||
}
|
||||
|
||||
/// Pop off this list, looking for a usable connection that hasn't expired.
|
||||
@@ -643,15 +591,54 @@ impl<T: Poolable> Checkout<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn add_waiter(&mut self) {
|
||||
debug_assert!(self.pool.is_enabled());
|
||||
fn checkout(&mut self) -> Option<Pooled<T>> {
|
||||
let entry = {
|
||||
let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
|
||||
let expiration = Expiration::new(inner.timeout);
|
||||
let maybe_entry = inner.idle.get_mut(&self.key)
|
||||
.and_then(|list| {
|
||||
trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
|
||||
// A block to end the mutable borrow on list,
|
||||
// so the map below can check is_empty()
|
||||
{
|
||||
let popper = IdlePopper {
|
||||
key: &self.key,
|
||||
list,
|
||||
};
|
||||
popper.pop(&expiration)
|
||||
}
|
||||
.map(|e| (e, list.is_empty()))
|
||||
});
|
||||
|
||||
if self.waiter.is_none() {
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let _ = rx.poll(); // park this task
|
||||
self.pool.waiter(self.key.clone(), tx);
|
||||
self.waiter = Some(rx);
|
||||
}
|
||||
let (entry, empty) = if let Some((e, empty)) = maybe_entry {
|
||||
(Some(e), empty)
|
||||
} else {
|
||||
// No entry found means nuke the list for sure.
|
||||
(None, true)
|
||||
};
|
||||
if empty {
|
||||
//TODO: This could be done with the HashMap::entry API instead.
|
||||
inner.idle.remove(&self.key);
|
||||
}
|
||||
|
||||
if entry.is_none() && self.waiter.is_none() {
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
let _ = rx.poll(); // park this task
|
||||
|
||||
trace!("checkout waiting for idle connection: {:?}", self.key);
|
||||
inner
|
||||
.waiters
|
||||
.entry(self.key.clone())
|
||||
.or_insert(VecDeque::new())
|
||||
.push_back(tx);
|
||||
|
||||
self.waiter = Some(rx);
|
||||
}
|
||||
|
||||
entry
|
||||
};
|
||||
|
||||
entry.map(|e| self.pool.reuse(&self.key, e.value))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -664,14 +651,11 @@ impl<T: Poolable> Future for Checkout<T> {
|
||||
return Ok(Async::Ready(pooled));
|
||||
}
|
||||
|
||||
let entry = self.pool.take(&self.key);
|
||||
|
||||
if let Some(pooled) = entry {
|
||||
if let Some(pooled) = self.checkout() {
|
||||
Ok(Async::Ready(pooled))
|
||||
} else if !self.pool.is_enabled() {
|
||||
Err(::Error::new_canceled(Some("pool is disabled")))
|
||||
} else {
|
||||
self.add_waiter();
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user