fix(client): check conn is closed in expire interval
This commit is contained in:
@@ -376,13 +376,9 @@ impl<B> Clone for HyperClient<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B> self::pool::Ready for HyperClient<B> {
|
impl<B> self::pool::Closed for HyperClient<B> {
|
||||||
fn poll_ready(&mut self) -> Poll<(), ()> {
|
fn is_closed(&self) -> bool {
|
||||||
if self.tx.is_closed() {
|
self.tx.is_closed()
|
||||||
Err(())
|
|
||||||
} else {
|
|
||||||
Ok(Async::Ready(()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -21,8 +21,8 @@ pub struct Pool<T> {
|
|||||||
// This is a trait to allow the `client::pool::tests` to work for `i32`.
|
// This is a trait to allow the `client::pool::tests` to work for `i32`.
|
||||||
//
|
//
|
||||||
// See https://github.com/hyperium/hyper/issues/1429
|
// See https://github.com/hyperium/hyper/issues/1429
|
||||||
pub trait Ready {
|
pub trait Closed {
|
||||||
fn poll_ready(&mut self) -> Poll<(), ()>;
|
fn is_closed(&self) -> bool;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct PoolInner<T> {
|
struct PoolInner<T> {
|
||||||
@@ -49,7 +49,7 @@ struct PoolInner<T> {
|
|||||||
expired_timer_spawned: bool,
|
expired_timer_spawned: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Clone + Ready> Pool<T> {
|
impl<T: Clone + Closed> Pool<T> {
|
||||||
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
|
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
|
||||||
Pool {
|
Pool {
|
||||||
inner: Rc::new(RefCell::new(PoolInner {
|
inner: Rc::new(RefCell::new(PoolInner {
|
||||||
@@ -117,10 +117,10 @@ impl<T: Clone + Ready> Pool<T> {
|
|||||||
let mut should_remove = false;
|
let mut should_remove = false;
|
||||||
let entry = inner.idle.get_mut(key).and_then(|list| {
|
let entry = inner.idle.get_mut(key).and_then(|list| {
|
||||||
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
|
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
|
||||||
while let Some(mut entry) = list.pop() {
|
while let Some(entry) = list.pop() {
|
||||||
match entry.status.get() {
|
match entry.status.get() {
|
||||||
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
|
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
|
||||||
if let Ok(Async::Ready(())) = entry.value.poll_ready() {
|
if !entry.value.is_closed() {
|
||||||
should_remove = list.is_empty();
|
should_remove = list.is_empty();
|
||||||
return Some(entry);
|
return Some(entry);
|
||||||
}
|
}
|
||||||
@@ -202,7 +202,9 @@ impl<T> Pool<T> {
|
|||||||
inner.parked.remove(key);
|
inner.parked.remove(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Closed> Pool<T> {
|
||||||
fn clear_expired(&self) {
|
fn clear_expired(&self) {
|
||||||
let mut inner = self.inner.borrow_mut();
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
|
||||||
@@ -218,6 +220,9 @@ impl<T> Pool<T> {
|
|||||||
inner.idle.retain(|_key, values| {
|
inner.idle.retain(|_key, values| {
|
||||||
|
|
||||||
values.retain(|val| {
|
values.retain(|val| {
|
||||||
|
if val.value.is_closed() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
match val.status.get() {
|
match val.status.get() {
|
||||||
TimedKA::Idle(idle_at) if now - idle_at < dur => {
|
TimedKA::Idle(idle_at) if now - idle_at < dur => {
|
||||||
true
|
true
|
||||||
@@ -234,7 +239,7 @@ impl<T> Pool<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<T: 'static> Pool<T> {
|
impl<T: Closed + 'static> Pool<T> {
|
||||||
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
|
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
|
||||||
let mut inner = self.inner.borrow_mut();
|
let mut inner = self.inner.borrow_mut();
|
||||||
|
|
||||||
@@ -296,7 +301,7 @@ impl<T> DerefMut for Pooled<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Clone + Ready> KeepAlive for Pooled<T> {
|
impl<T: Clone + Closed> KeepAlive for Pooled<T> {
|
||||||
fn busy(&mut self) {
|
fn busy(&mut self) {
|
||||||
self.entry.status.set(TimedKA::Busy);
|
self.entry.status.set(TimedKA::Busy);
|
||||||
}
|
}
|
||||||
@@ -347,7 +352,7 @@ impl<T> fmt::Debug for Pooled<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Clone + Ready> BitAndAssign<bool> for Pooled<T> {
|
impl<T: Clone + Closed> BitAndAssign<bool> for Pooled<T> {
|
||||||
fn bitand_assign(&mut self, enabled: bool) {
|
fn bitand_assign(&mut self, enabled: bool) {
|
||||||
if !enabled {
|
if !enabled {
|
||||||
self.disable();
|
self.disable();
|
||||||
@@ -377,13 +382,13 @@ pub struct Checkout<T> {
|
|||||||
|
|
||||||
struct NotParked;
|
struct NotParked;
|
||||||
|
|
||||||
impl<T: Clone + Ready> Checkout<T> {
|
impl<T: Clone + Closed> Checkout<T> {
|
||||||
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
|
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
|
||||||
let mut drop_parked = false;
|
let mut drop_parked = false;
|
||||||
if let Some(ref mut rx) = self.parked {
|
if let Some(ref mut rx) = self.parked {
|
||||||
match rx.poll() {
|
match rx.poll() {
|
||||||
Ok(Async::Ready(mut entry)) => {
|
Ok(Async::Ready(mut entry)) => {
|
||||||
if let Ok(Async::Ready(())) = entry.value.poll_ready() {
|
if !entry.value.is_closed() {
|
||||||
return Ok(Async::Ready(self.pool.reuse(&self.key, entry)));
|
return Ok(Async::Ready(self.pool.reuse(&self.key, entry)));
|
||||||
}
|
}
|
||||||
drop_parked = true;
|
drop_parked = true;
|
||||||
@@ -408,7 +413,7 @@ impl<T: Clone + Ready> Checkout<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Clone + Ready> Future for Checkout<T> {
|
impl<T: Clone + Closed> Future for Checkout<T> {
|
||||||
type Item = Pooled<T>;
|
type Item = Pooled<T>;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|
||||||
@@ -456,7 +461,7 @@ struct IdleInterval<T> {
|
|||||||
pool: Weak<RefCell<PoolInner<T>>>,
|
pool: Weak<RefCell<PoolInner<T>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: 'static> Future for IdleInterval<T> {
|
impl<T: Closed + 'static> Future for IdleInterval<T> {
|
||||||
type Item = ();
|
type Item = ();
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
@@ -478,14 +483,14 @@ impl<T: 'static> Future for IdleInterval<T> {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use futures::{Async, Future, Poll};
|
use futures::{Async, Future};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use proto::KeepAlive;
|
use proto::KeepAlive;
|
||||||
use super::{Ready, Pool};
|
use super::{Closed, Pool};
|
||||||
|
|
||||||
impl Ready for i32 {
|
impl Closed for i32 {
|
||||||
fn poll_ready(&mut self) -> Poll<(), ()> {
|
fn is_closed(&self) -> bool {
|
||||||
Ok(Async::Ready(()))
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user