fix(client): prevent a checkout loop of pooled connections that aren't ready yet

This commit is contained in:
Sean McArthur
2018-04-30 14:06:12 -07:00
parent 11c92ff467
commit 5e3b43af09
4 changed files with 30 additions and 29 deletions

View File

@@ -107,7 +107,7 @@ impl<T, U> Sender<T, U> {
impl<T, U> UnboundedSender<T, U> { impl<T, U> UnboundedSender<T, U> {
pub fn is_ready(&self) -> bool { pub fn is_ready(&self) -> bool {
self.giver.is_wanting() !self.giver.is_canceled()
} }
pub fn is_closed(&self) -> bool { pub fn is_closed(&self) -> bool {

View File

@@ -318,6 +318,15 @@ where C: Connect + Sync + 'static,
Ok(()) Ok(())
}) })
); );
} else {
// There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready
executor.execute(
future::poll_fn(move || {
pooled.poll_ready()
})
.then(|_| Ok(()))
);
} }
Ok(res) Ok(res)
}); });
@@ -441,6 +450,13 @@ impl<B> PoolClient<B> {
PoolTx::Http2(ref tx) => tx.is_ready(), PoolTx::Http2(ref tx) => tx.is_ready(),
} }
} }
fn is_closed(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_closed(),
PoolTx::Http2(ref tx) => tx.is_closed(),
}
}
} }
impl<B: Payload + 'static> PoolClient<B> { impl<B: Payload + 'static> PoolClient<B> {
@@ -460,10 +476,10 @@ impl<B> Poolable for PoolClient<B>
where where
B: 'static, B: 'static,
{ {
fn is_closed(&self) -> bool { fn is_open(&self) -> bool {
match self.tx { match self.tx {
PoolTx::Http1(ref tx) => tx.is_closed(), PoolTx::Http1(ref tx) => tx.is_ready(),
PoolTx::Http2(ref tx) => tx.is_closed(), PoolTx::Http2(ref tx) => tx.is_ready(),
} }
} }

View File

@@ -21,7 +21,7 @@ pub(super) struct Pool<T> {
// //
// See https://github.com/hyperium/hyper/issues/1429 // See https://github.com/hyperium/hyper/issues/1429
pub(super) trait Poolable: Sized { pub(super) trait Poolable: Sized {
fn is_closed(&self) -> bool; fn is_open(&self) -> bool;
/// Reserve this connection. /// Reserve this connection.
/// ///
/// Allows for HTTP/2 to return a shared reservation. /// Allows for HTTP/2 to return a shared reservation.
@@ -236,7 +236,7 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
while let Some(entry) = self.list.pop() { while let Some(entry) = self.list.pop() {
// If the connection has been closed, or is older than our idle // If the connection has been closed, or is older than our idle
// timeout, simply drop it and keep looking... // timeout, simply drop it and keep looking...
if entry.value.is_closed() { if !entry.value.is_open() {
trace!("removing closed connection for {:?}", self.key); trace!("removing closed connection for {:?}", self.key);
continue; continue;
} }
@@ -377,7 +377,7 @@ impl<T: Poolable> PoolInner<T> {
self.idle.retain(|key, values| { self.idle.retain(|key, values| {
values.retain(|entry| { values.retain(|entry| {
if entry.value.is_closed() { if !entry.value.is_open() {
trace!("idle interval evicting closed for {:?}", key); trace!("idle interval evicting closed for {:?}", key);
return false; return false;
} }
@@ -475,7 +475,7 @@ impl<T: Poolable> DerefMut for Pooled<T> {
impl<T: Poolable> Drop for Pooled<T> { impl<T: Poolable> Drop for Pooled<T> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(value) = self.value.take() { if let Some(value) = self.value.take() {
if value.is_closed() { if !value.is_open() {
// If we *already* know the connection is done here, // If we *already* know the connection is done here,
// it shouldn't be re-inserted back into the pool. // it shouldn't be re-inserted back into the pool.
return; return;
@@ -519,7 +519,7 @@ impl<T: Poolable> Checkout<T> {
if let Some(mut rx) = self.waiter.take() { if let Some(mut rx) = self.waiter.take() {
match rx.poll() { match rx.poll() {
Ok(Async::Ready(value)) => { Ok(Async::Ready(value)) => {
if !value.is_closed() { if value.is_open() {
Ok(Async::Ready(Some(self.pool.reuse(&self.key, value)))) Ok(Async::Ready(Some(self.pool.reuse(&self.key, value))))
} else { } else {
Err(::Error::new_canceled(Some(CANCELED))) Err(::Error::new_canceled(Some(CANCELED)))
@@ -662,8 +662,8 @@ mod tests {
struct Uniq<T>(T); struct Uniq<T>(T);
impl<T> Poolable for Uniq<T> { impl<T> Poolable for Uniq<T> {
fn is_closed(&self) -> bool { fn is_open(&self) -> bool {
false true
} }
fn reserve(self) -> Reservation<Self> { fn reserve(self) -> Reservation<Self> {
@@ -671,21 +671,6 @@ mod tests {
} }
} }
/*
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
struct Share<T>(T);
impl<T> Poolable for Share<T> {
fn is_closed(&self) -> bool {
false
}
fn reserve(self) -> Reservation<Self> {
Reservation::Shared(self.clone(), self)
}
}
*/
fn c<T: Poolable>(key: Key) -> Connecting<T> { fn c<T: Poolable>(key: Key) -> Connecting<T> {
Connecting { Connecting {
key, key,
@@ -817,8 +802,8 @@ mod tests {
} }
impl Poolable for CanClose { impl Poolable for CanClose {
fn is_closed(&self) -> bool { fn is_open(&self) -> bool {
self.closed !self.closed
} }
fn reserve(self) -> Reservation<Self> { fn reserve(self) -> Reservation<Self> {

View File

@@ -88,7 +88,7 @@ fn conn_reset_after_write() {
} }
// sleep to allow some time for the connection to return to the pool // sleep to allow some time for the connection to return to the pool
thread::sleep(Duration::from_secs(1)); thread::sleep(Duration::from_millis(50));
let req = Request::builder() let req = Request::builder()
.uri("http://mock.local/a") .uri("http://mock.local/a")