refactor(lib): change from futures-timer to tokio-timer
This commit is contained in:
@@ -18,7 +18,7 @@ cache:
|
|||||||
script:
|
script:
|
||||||
- ./.travis/readme.py
|
- ./.travis/readme.py
|
||||||
- cargo build $FEATURES
|
- cargo build $FEATURES
|
||||||
- 'if [ "$BUILD_ONLY" != "1" ]; then RUST_LOG=hyper cargo test $FEATURES; fi'
|
- 'if [ "$BUILD_ONLY" != "1" ]; then RUST_LOG=hyper cargo test $FEATURES -- --test-threads=1; fi'
|
||||||
- 'if [ $TRAVIS_RUST_VERSION = nightly ]; then for f in ./benches/*.rs; do cargo test --bench $(basename $f .rs) $FEATURES; done; fi'
|
- 'if [ $TRAVIS_RUST_VERSION = nightly ]; then for f in ./benches/*.rs; do cargo test --bench $(basename $f .rs) $FEATURES; done; fi'
|
||||||
|
|
||||||
addons:
|
addons:
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ include = [
|
|||||||
bytes = "0.4.4"
|
bytes = "0.4.4"
|
||||||
futures = "0.1.21"
|
futures = "0.1.21"
|
||||||
futures-cpupool = { version = "0.1.6", optional = true }
|
futures-cpupool = { version = "0.1.6", optional = true }
|
||||||
futures-timer = "0.1.0"
|
|
||||||
http = "0.1.5"
|
http = "0.1.5"
|
||||||
httparse = "1.0"
|
httparse = "1.0"
|
||||||
h2 = "0.1.5"
|
h2 = "0.1.5"
|
||||||
@@ -37,9 +36,11 @@ tokio-executor = { version = "0.1.0", optional = true }
|
|||||||
tokio-io = "0.1"
|
tokio-io = "0.1"
|
||||||
tokio-reactor = { version = "0.1", optional = true }
|
tokio-reactor = { version = "0.1", optional = true }
|
||||||
tokio-tcp = { version = "0.1", optional = true }
|
tokio-tcp = { version = "0.1", optional = true }
|
||||||
|
tokio-timer = { version = "0.2", optional = true }
|
||||||
want = "0.0.4"
|
want = "0.0.4"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
futures-timer = "0.1"
|
||||||
num_cpus = "1.0"
|
num_cpus = "1.0"
|
||||||
pretty_env_logger = "0.2.0"
|
pretty_env_logger = "0.2.0"
|
||||||
spmc = "0.2"
|
spmc = "0.2"
|
||||||
@@ -55,6 +56,7 @@ runtime = [
|
|||||||
"tokio-executor",
|
"tokio-executor",
|
||||||
"tokio-reactor",
|
"tokio-reactor",
|
||||||
"tokio-tcp",
|
"tokio-tcp",
|
||||||
|
"tokio-timer",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[example]]
|
[[example]]
|
||||||
|
|||||||
@@ -113,14 +113,6 @@ where C: Connect + Sync + 'static,
|
|||||||
|
|
||||||
/// Send a constructed Request using this Client.
|
/// Send a constructed Request using this Client.
|
||||||
pub fn request(&self, mut req: Request<B>) -> FutureResponse {
|
pub fn request(&self, mut req: Request<B>) -> FutureResponse {
|
||||||
// TODO(0.12): do this at construction time.
|
|
||||||
//
|
|
||||||
// It cannot be done in the constructor because the Client::configured
|
|
||||||
// does not have `B: 'static` bounds, which are required to spawn
|
|
||||||
// the interval. In 0.12, add a static bounds to the constructor,
|
|
||||||
// and move this.
|
|
||||||
self.schedule_pool_timer();
|
|
||||||
|
|
||||||
match req.version() {
|
match req.version() {
|
||||||
Version::HTTP_10 |
|
Version::HTTP_10 |
|
||||||
Version::HTTP_11 => (),
|
Version::HTTP_11 => (),
|
||||||
@@ -302,7 +294,7 @@ where C: Connect + Sync + 'static,
|
|||||||
// for a new request to start.
|
// for a new request to start.
|
||||||
//
|
//
|
||||||
// It won't be ready if there is a body to stream.
|
// It won't be ready if there is a body to stream.
|
||||||
if ver == Ver::Http2 || pooled.is_ready() {
|
if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
|
||||||
drop(pooled);
|
drop(pooled);
|
||||||
} else if !res.body().is_empty() {
|
} else if !res.body().is_empty() {
|
||||||
let (delayed_tx, delayed_rx) = oneshot::channel();
|
let (delayed_tx, delayed_rx) = oneshot::channel();
|
||||||
@@ -336,10 +328,6 @@ where C: Connect + Sync + 'static,
|
|||||||
|
|
||||||
Box::new(resp)
|
Box::new(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn schedule_pool_timer(&self) {
|
|
||||||
self.pool.spawn_expired_interval(&self.executor);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C, B> Clone for Client<C, B> {
|
impl<C, B> Clone for Client<C, B> {
|
||||||
@@ -474,7 +462,7 @@ impl<B: Payload + 'static> PoolClient<B> {
|
|||||||
|
|
||||||
impl<B> Poolable for PoolClient<B>
|
impl<B> Poolable for PoolClient<B>
|
||||||
where
|
where
|
||||||
B: 'static,
|
B: Send + 'static,
|
||||||
{
|
{
|
||||||
fn is_open(&self) -> bool {
|
fn is_open(&self) -> bool {
|
||||||
match self.tx {
|
match self.tx {
|
||||||
@@ -700,7 +688,7 @@ impl Builder {
|
|||||||
executor: self.exec.clone(),
|
executor: self.exec.clone(),
|
||||||
h1_writev: self.h1_writev,
|
h1_writev: self.h1_writev,
|
||||||
h1_title_case_headers: self.h1_title_case_headers,
|
h1_title_case_headers: self.h1_title_case_headers,
|
||||||
pool: Pool::new(self.keep_alive, self.keep_alive_timeout),
|
pool: Pool::new(self.keep_alive, self.keep_alive_timeout, &self.exec),
|
||||||
retry_canceled_requests: self.retry_canceled_requests,
|
retry_canceled_requests: self.retry_canceled_requests,
|
||||||
set_host: self.set_host,
|
set_host: self.set_host,
|
||||||
ver: self.ver,
|
ver: self.ver,
|
||||||
|
|||||||
@@ -4,15 +4,16 @@ use std::ops::{Deref, DerefMut};
|
|||||||
use std::sync::{Arc, Mutex, Weak};
|
use std::sync::{Arc, Mutex, Weak};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{Future, Async, Poll, Stream};
|
use futures::{Future, Async, Poll};
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
use futures_timer::Interval;
|
#[cfg(feature = "runtime")]
|
||||||
|
use tokio_timer::Interval;
|
||||||
|
|
||||||
use common::{Exec, Never};
|
use common::Exec;
|
||||||
use super::Ver;
|
use super::Ver;
|
||||||
|
|
||||||
pub(super) struct Pool<T> {
|
pub(super) struct Pool<T> {
|
||||||
inner: Arc<Mutex<PoolInner<T>>>,
|
inner: Arc<PoolInner<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before using a pooled connection, make sure the sender is not dead.
|
// Before using a pooled connection, make sure the sender is not dead.
|
||||||
@@ -20,7 +21,7 @@ pub(super) struct Pool<T> {
|
|||||||
// This is a trait to allow the `client::pool::tests` to work for `i32`.
|
// This is a trait to allow the `client::pool::tests` to work for `i32`.
|
||||||
//
|
//
|
||||||
// See https://github.com/hyperium/hyper/issues/1429
|
// See https://github.com/hyperium/hyper/issues/1429
|
||||||
pub(super) trait Poolable: Sized {
|
pub(super) trait Poolable: Send + Sized + 'static {
|
||||||
fn is_open(&self) -> bool;
|
fn is_open(&self) -> bool;
|
||||||
/// Reserve this connection.
|
/// Reserve this connection.
|
||||||
///
|
///
|
||||||
@@ -47,11 +48,20 @@ pub(super) enum Reservation<T> {
|
|||||||
type Key = (Arc<String>, Ver);
|
type Key = (Arc<String>, Ver);
|
||||||
|
|
||||||
struct PoolInner<T> {
|
struct PoolInner<T> {
|
||||||
|
connections: Mutex<Connections<T>>,
|
||||||
|
enabled: bool,
|
||||||
|
/// A single Weak pointer used every time a proper weak reference
|
||||||
|
/// is not needed. This prevents allocating space in the heap to hold
|
||||||
|
/// a PoolInner<T> *every single time*, and instead we just allocate
|
||||||
|
/// this one extra per pool.
|
||||||
|
weak: Weak<PoolInner<T>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Connections<T> {
|
||||||
// A flag that a connection is being estabilished, and the connection
|
// A flag that a connection is being estabilished, and the connection
|
||||||
// should be shared. This prevents making multiple HTTP/2 connections
|
// should be shared. This prevents making multiple HTTP/2 connections
|
||||||
// to the same host.
|
// to the same host.
|
||||||
connecting: HashSet<Key>,
|
connecting: HashSet<Key>,
|
||||||
enabled: bool,
|
|
||||||
// These are internal Conns sitting in the event loop in the KeepAlive
|
// These are internal Conns sitting in the event loop in the KeepAlive
|
||||||
// state, waiting to receive a new Request to send on the socket.
|
// state, waiting to receive a new Request to send on the socket.
|
||||||
idle: HashMap<Key, Vec<Idle<T>>>,
|
idle: HashMap<Key, Vec<Idle<T>>>,
|
||||||
@@ -65,23 +75,44 @@ struct PoolInner<T> {
|
|||||||
// them that the Conn could be used instead of waiting for a brand new
|
// them that the Conn could be used instead of waiting for a brand new
|
||||||
// connection.
|
// connection.
|
||||||
waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
|
waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
|
||||||
timeout: Option<Duration>,
|
|
||||||
// A oneshot channel is used to allow the interval to be notified when
|
// A oneshot channel is used to allow the interval to be notified when
|
||||||
// the Pool completely drops. That way, the interval can cancel immediately.
|
// the Pool completely drops. That way, the interval can cancel immediately.
|
||||||
idle_interval_ref: Option<oneshot::Sender<Never>>,
|
#[cfg(feature = "runtime")]
|
||||||
|
idle_interval_ref: Option<oneshot::Sender<::common::Never>>,
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
|
exec: Exec,
|
||||||
|
timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Pool<T> {
|
impl<T> Pool<T> {
|
||||||
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
|
pub fn new(enabled: bool, timeout: Option<Duration>, __exec: &Exec) -> Pool<T> {
|
||||||
Pool {
|
Pool {
|
||||||
inner: Arc::new(Mutex::new(PoolInner {
|
inner: Arc::new(PoolInner {
|
||||||
|
connections: Mutex::new(Connections {
|
||||||
connecting: HashSet::new(),
|
connecting: HashSet::new(),
|
||||||
enabled: enabled,
|
|
||||||
idle: HashMap::new(),
|
idle: HashMap::new(),
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
idle_interval_ref: None,
|
idle_interval_ref: None,
|
||||||
waiters: HashMap::new(),
|
waiters: HashMap::new(),
|
||||||
timeout: timeout,
|
#[cfg(feature = "runtime")]
|
||||||
})),
|
exec: __exec.clone(),
|
||||||
|
timeout,
|
||||||
|
}),
|
||||||
|
enabled,
|
||||||
|
weak: Weak::new(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub(super) fn no_timer(&self) {
|
||||||
|
// Prevent an actual interval from being created for this pool...
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
|
{
|
||||||
|
let mut inner = self.inner.connections.lock().unwrap();
|
||||||
|
assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
|
||||||
|
let (tx, _) = oneshot::channel();
|
||||||
|
inner.idle_interval_ref = Some(tx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -100,8 +131,8 @@ impl<T: Poolable> Pool<T> {
|
|||||||
/// Ensure that there is only ever 1 connecting task for HTTP/2
|
/// Ensure that there is only ever 1 connecting task for HTTP/2
|
||||||
/// connections. This does nothing for HTTP/1.
|
/// connections. This does nothing for HTTP/1.
|
||||||
pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> {
|
pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> {
|
||||||
if key.1 == Ver::Http2 {
|
if key.1 == Ver::Http2 && self.inner.enabled {
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.connections.lock().unwrap();
|
||||||
if inner.connecting.insert(key.clone()) {
|
if inner.connecting.insert(key.clone()) {
|
||||||
let connecting = Connecting {
|
let connecting = Connecting {
|
||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
@@ -117,14 +148,14 @@ impl<T: Poolable> Pool<T> {
|
|||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
// in HTTP/1's case, there is never a lock, so we don't
|
// in HTTP/1's case, there is never a lock, so we don't
|
||||||
// need to do anything in Drop.
|
// need to do anything in Drop.
|
||||||
pool: Weak::new(),
|
pool: self.inner.weak.clone(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn take(&self, key: &Key) -> Option<Pooled<T>> {
|
fn take(&self, key: &Key) -> Option<Pooled<T>> {
|
||||||
let entry = {
|
let entry = {
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.connections.lock().unwrap();
|
||||||
let expiration = Expiration::new(inner.timeout);
|
let expiration = Expiration::new(inner.timeout);
|
||||||
let maybe_entry = inner.idle.get_mut(key)
|
let maybe_entry = inner.idle.get_mut(key)
|
||||||
.and_then(|list| {
|
.and_then(|list| {
|
||||||
@@ -158,35 +189,45 @@ impl<T: Poolable> Pool<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
|
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
|
||||||
let (value, pool_ref) = match value.reserve() {
|
let (value, pool_ref, has_pool) = if self.inner.enabled {
|
||||||
|
match value.reserve() {
|
||||||
Reservation::Shared(to_insert, to_return) => {
|
Reservation::Shared(to_insert, to_return) => {
|
||||||
debug_assert_eq!(
|
debug_assert_eq!(
|
||||||
connecting.key.1,
|
connecting.key.1,
|
||||||
Ver::Http2,
|
Ver::Http2,
|
||||||
"shared reservation without Http2"
|
"shared reservation without Http2"
|
||||||
);
|
);
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.connections.lock().unwrap();
|
||||||
inner.put(connecting.key.clone(), to_insert);
|
inner.put(connecting.key.clone(), to_insert, &self.inner);
|
||||||
// Do this here instead of Drop for Connecting because we
|
// Do this here instead of Drop for Connecting because we
|
||||||
// already have a lock, no need to lock the mutex twice.
|
// already have a lock, no need to lock the mutex twice.
|
||||||
inner.connected(&connecting.key);
|
inner.connected(&connecting.key);
|
||||||
// prevent the Drop of Connecting from repeating inner.connected()
|
// prevent the Drop of Connecting from repeating inner.connected()
|
||||||
connecting.pool = Weak::new();
|
connecting.pool = self.inner.weak.clone();
|
||||||
|
|
||||||
// Shared reservations don't need a reference to the pool,
|
// Shared reservations don't need a reference to the pool,
|
||||||
// since the pool always keeps a copy.
|
// since the pool always keeps a copy.
|
||||||
(to_return, Weak::new())
|
(to_return, self.inner.weak.clone(), false)
|
||||||
},
|
},
|
||||||
Reservation::Unique(value) => {
|
Reservation::Unique(value) => {
|
||||||
// Unique reservations must take a reference to the pool
|
// Unique reservations must take a reference to the pool
|
||||||
// since they hope to reinsert once the reservation is
|
// since they hope to reinsert once the reservation is
|
||||||
// completed
|
// completed
|
||||||
(value, Arc::downgrade(&self.inner))
|
(value, Arc::downgrade(&self.inner), true)
|
||||||
},
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If pool is not enabled, skip all the things...
|
||||||
|
|
||||||
|
// The Connecting should have had no pool ref
|
||||||
|
debug_assert!(connecting.pool.upgrade().is_none());
|
||||||
|
|
||||||
|
(value, self.inner.weak.clone(), false)
|
||||||
};
|
};
|
||||||
Pooled {
|
Pooled {
|
||||||
is_reused: false,
|
|
||||||
key: connecting.key.clone(),
|
key: connecting.key.clone(),
|
||||||
|
has_pool,
|
||||||
|
is_reused: false,
|
||||||
pool: pool_ref,
|
pool: pool_ref,
|
||||||
value: Some(value)
|
value: Some(value)
|
||||||
}
|
}
|
||||||
@@ -202,13 +243,14 @@ impl<T: Poolable> Pool<T> {
|
|||||||
// we just have the final value, without knowledge of if this is
|
// we just have the final value, without knowledge of if this is
|
||||||
// unique or shared. So, the hack is to just assume Ver::Http2 means
|
// unique or shared. So, the hack is to just assume Ver::Http2 means
|
||||||
// shared... :(
|
// shared... :(
|
||||||
let pool_ref = if key.1 == Ver::Http2 {
|
let (pool_ref, has_pool) = if key.1 == Ver::Http2 {
|
||||||
Weak::new()
|
(self.inner.weak.clone(), false)
|
||||||
} else {
|
} else {
|
||||||
Arc::downgrade(&self.inner)
|
(Arc::downgrade(&self.inner), true)
|
||||||
};
|
};
|
||||||
|
|
||||||
Pooled {
|
Pooled {
|
||||||
|
has_pool,
|
||||||
is_reused: true,
|
is_reused: true,
|
||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
pool: pool_ref,
|
pool: pool_ref,
|
||||||
@@ -218,7 +260,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
|
|
||||||
fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) {
|
fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) {
|
||||||
trace!("checkout waiting for idle connection: {:?}", key);
|
trace!("checkout waiting for idle connection: {:?}", key);
|
||||||
self.inner.lock().unwrap()
|
self.inner.connections.lock().unwrap()
|
||||||
.waiters.entry(key)
|
.waiters.entry(key)
|
||||||
.or_insert(VecDeque::new())
|
.or_insert(VecDeque::new())
|
||||||
.push_back(tx);
|
.push_back(tx);
|
||||||
@@ -274,11 +316,8 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Poolable> PoolInner<T> {
|
impl<T: Poolable> Connections<T> {
|
||||||
fn put(&mut self, key: Key, value: T) {
|
fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<PoolInner<T>>) {
|
||||||
if !self.enabled {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if key.1 == Ver::Http2 && self.idle.contains_key(&key) {
|
if key.1 == Ver::Http2 && self.idle.contains_key(&key) {
|
||||||
trace!("put; existing idle HTTP/2 connection for {:?}", key);
|
trace!("put; existing idle HTTP/2 connection for {:?}", key);
|
||||||
return;
|
return;
|
||||||
@@ -328,6 +367,11 @@ impl<T: Poolable> PoolInner<T> {
|
|||||||
value: value,
|
value: value,
|
||||||
idle_at: Instant::now(),
|
idle_at: Instant::now(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
|
{
|
||||||
|
self.spawn_idle_interval(__pool_ref);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None => trace!("put; found waiter for {:?}", key),
|
None => trace!("put; found waiter for {:?}", key),
|
||||||
}
|
}
|
||||||
@@ -346,9 +390,37 @@ impl<T: Poolable> PoolInner<T> {
|
|||||||
// those waiters would never receive a connection.
|
// those waiters would never receive a connection.
|
||||||
self.waiters.remove(key);
|
self.waiters.remove(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
|
fn spawn_idle_interval(&mut self, pool_ref: &Arc<PoolInner<T>>) {
|
||||||
|
let (dur, rx) = {
|
||||||
|
debug_assert!(pool_ref.enabled);
|
||||||
|
|
||||||
|
if self.idle_interval_ref.is_some() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(dur) = self.timeout {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.idle_interval_ref = Some(tx);
|
||||||
|
(dur, rx)
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let start = Instant::now() + dur;
|
||||||
|
|
||||||
|
let interval = Interval::new(start, dur);
|
||||||
|
self.exec.execute(IdleInterval {
|
||||||
|
interval: interval,
|
||||||
|
pool: Arc::downgrade(pool_ref),
|
||||||
|
pool_drop_notifier: rx,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> PoolInner<T> {
|
impl<T> Connections<T> {
|
||||||
/// Any `FutureResponse`s that were created will have made a `Checkout`,
|
/// Any `FutureResponse`s that were created will have made a `Checkout`,
|
||||||
/// and possibly inserted into the pool that it is waiting for an idle
|
/// and possibly inserted into the pool that it is waiting for an idle
|
||||||
/// connection. If a user ever dropped that future, we need to clean out
|
/// connection. If a user ever dropped that future, we need to clean out
|
||||||
@@ -367,7 +439,8 @@ impl<T> PoolInner<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Poolable> PoolInner<T> {
|
#[cfg(feature = "runtime")]
|
||||||
|
impl<T: Poolable> Connections<T> {
|
||||||
/// This should *only* be called by the IdleInterval.
|
/// This should *only* be called by the IdleInterval.
|
||||||
fn clear_expired(&mut self) {
|
fn clear_expired(&mut self) {
|
||||||
let dur = self.timeout.expect("interval assumes timeout");
|
let dur = self.timeout.expect("interval assumes timeout");
|
||||||
@@ -396,38 +469,6 @@ impl<T: Poolable> PoolInner<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<T: Poolable + Send + 'static> Pool<T> {
|
|
||||||
pub(super) fn spawn_expired_interval(&self, exec: &Exec) {
|
|
||||||
let (dur, rx) = {
|
|
||||||
let mut inner = self.inner.lock().unwrap();
|
|
||||||
|
|
||||||
if !inner.enabled {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if inner.idle_interval_ref.is_some() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(dur) = inner.timeout {
|
|
||||||
let (tx, rx) = oneshot::channel();
|
|
||||||
inner.idle_interval_ref = Some(tx);
|
|
||||||
(dur, rx)
|
|
||||||
} else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let interval = Interval::new(dur);
|
|
||||||
exec.execute(IdleInterval {
|
|
||||||
interval: interval,
|
|
||||||
pool: Arc::downgrade(&self.inner),
|
|
||||||
pool_drop_notifier: rx,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> Clone for Pool<T> {
|
impl<T> Clone for Pool<T> {
|
||||||
fn clone(&self) -> Pool<T> {
|
fn clone(&self) -> Pool<T> {
|
||||||
Pool {
|
Pool {
|
||||||
@@ -440,9 +481,10 @@ impl<T> Clone for Pool<T> {
|
|||||||
// Note: The bounds `T: Poolable` is needed for the Drop impl.
|
// Note: The bounds `T: Poolable` is needed for the Drop impl.
|
||||||
pub(super) struct Pooled<T: Poolable> {
|
pub(super) struct Pooled<T: Poolable> {
|
||||||
value: Option<T>,
|
value: Option<T>,
|
||||||
|
has_pool: bool,
|
||||||
is_reused: bool,
|
is_reused: bool,
|
||||||
key: Key,
|
key: Key,
|
||||||
pool: Weak<Mutex<PoolInner<T>>>,
|
pool: Weak<PoolInner<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Poolable> Pooled<T> {
|
impl<T: Poolable> Pooled<T> {
|
||||||
@@ -450,6 +492,10 @@ impl<T: Poolable> Pooled<T> {
|
|||||||
self.is_reused
|
self.is_reused
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_pool_enabled(&self) -> bool {
|
||||||
|
self.has_pool
|
||||||
|
}
|
||||||
|
|
||||||
fn as_ref(&self) -> &T {
|
fn as_ref(&self) -> &T {
|
||||||
self.value.as_ref().expect("not dropped")
|
self.value.as_ref().expect("not dropped")
|
||||||
}
|
}
|
||||||
@@ -481,9 +527,13 @@ impl<T: Poolable> Drop for Pooled<T> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(inner) = self.pool.upgrade() {
|
if let Some(pool) = self.pool.upgrade() {
|
||||||
if let Ok(mut inner) = inner.lock() {
|
// Pooled should not have had a real reference if pool is
|
||||||
inner.put(self.key.clone(), value);
|
// not enabled!
|
||||||
|
debug_assert!(pool.enabled);
|
||||||
|
|
||||||
|
if let Ok(mut inner) = pool.connections.lock() {
|
||||||
|
inner.put(self.key.clone(), value, &pool);
|
||||||
}
|
}
|
||||||
} else if self.key.1 == Ver::Http1 {
|
} else if self.key.1 == Ver::Http1 {
|
||||||
trace!("pool dropped, dropping pooled ({:?})", self.key);
|
trace!("pool dropped, dropping pooled ({:?})", self.key);
|
||||||
@@ -569,7 +619,7 @@ impl<T: Poolable> Future for Checkout<T> {
|
|||||||
impl<T> Drop for Checkout<T> {
|
impl<T> Drop for Checkout<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.waiter.take().is_some() {
|
if self.waiter.take().is_some() {
|
||||||
if let Ok(mut inner) = self.pool.inner.lock() {
|
if let Ok(mut inner) = self.pool.inner.connections.lock() {
|
||||||
inner.clean_waiters(&self.key);
|
inner.clean_waiters(&self.key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -578,14 +628,14 @@ impl<T> Drop for Checkout<T> {
|
|||||||
|
|
||||||
pub(super) struct Connecting<T: Poolable> {
|
pub(super) struct Connecting<T: Poolable> {
|
||||||
key: Key,
|
key: Key,
|
||||||
pool: Weak<Mutex<PoolInner<T>>>,
|
pool: Weak<PoolInner<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Poolable> Drop for Connecting<T> {
|
impl<T: Poolable> Drop for Connecting<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(pool) = self.pool.upgrade() {
|
if let Some(pool) = self.pool.upgrade() {
|
||||||
// No need to panic on drop, that could abort!
|
// No need to panic on drop, that could abort!
|
||||||
if let Ok(mut inner) = pool.lock() {
|
if let Ok(mut inner) = pool.connections.lock() {
|
||||||
debug_assert_eq!(
|
debug_assert_eq!(
|
||||||
self.key.1,
|
self.key.1,
|
||||||
Ver::Http2,
|
Ver::Http2,
|
||||||
@@ -612,20 +662,25 @@ impl Expiration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
struct IdleInterval<T> {
|
struct IdleInterval<T> {
|
||||||
interval: Interval,
|
interval: Interval,
|
||||||
pool: Weak<Mutex<PoolInner<T>>>,
|
pool: Weak<PoolInner<T>>,
|
||||||
// This allows the IdleInterval to be notified as soon as the entire
|
// This allows the IdleInterval to be notified as soon as the entire
|
||||||
// Pool is fully dropped, and shutdown. This channel is never sent on,
|
// Pool is fully dropped, and shutdown. This channel is never sent on,
|
||||||
// but Err(Canceled) will be received when the Pool is dropped.
|
// but Err(Canceled) will be received when the Pool is dropped.
|
||||||
pool_drop_notifier: oneshot::Receiver<Never>,
|
pool_drop_notifier: oneshot::Receiver<::common::Never>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl<T: Poolable + 'static> Future for IdleInterval<T> {
|
impl<T: Poolable + 'static> Future for IdleInterval<T> {
|
||||||
type Item = ();
|
type Item = ();
|
||||||
type Error = ();
|
type Error = ();
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
// Interval is a Stream
|
||||||
|
use futures::Stream;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.pool_drop_notifier.poll() {
|
match self.pool_drop_notifier.poll() {
|
||||||
Ok(Async::Ready(n)) => match n {},
|
Ok(Async::Ready(n)) => match n {},
|
||||||
@@ -636,10 +691,13 @@ impl<T: Poolable + 'static> Future for IdleInterval<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
|
try_ready!(self.interval.poll().map_err(|err| {
|
||||||
|
error!("idle interval timer error: {}", err);
|
||||||
|
}));
|
||||||
|
|
||||||
if let Some(inner) = self.pool.upgrade() {
|
if let Some(inner) = self.pool.upgrade() {
|
||||||
if let Ok(mut inner) = inner.lock() {
|
if let Ok(mut inner) = inner.connections.lock() {
|
||||||
|
trace!("idle interval checking for expired");
|
||||||
inner.clear_expired();
|
inner.clear_expired();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -655,13 +713,14 @@ mod tests {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use futures::{Async, Future};
|
use futures::{Async, Future};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
|
use common::Exec;
|
||||||
use super::{Connecting, Key, Poolable, Pool, Reservation, Ver};
|
use super::{Connecting, Key, Poolable, Pool, Reservation, Ver};
|
||||||
|
|
||||||
/// Test unique reservations.
|
/// Test unique reservations.
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
struct Uniq<T>(T);
|
struct Uniq<T>(T);
|
||||||
|
|
||||||
impl<T> Poolable for Uniq<T> {
|
impl<T: Send + 'static> Poolable for Uniq<T> {
|
||||||
fn is_open(&self) -> bool {
|
fn is_open(&self) -> bool {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
@@ -678,9 +737,15 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pool_no_timer<T>() -> Pool<T> {
|
||||||
|
let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default);
|
||||||
|
pool.no_timer();
|
||||||
|
pool
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_smoke() {
|
fn test_pool_checkout_smoke() {
|
||||||
let pool = Pool::new(true, Some(Duration::from_secs(5)));
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
||||||
|
|
||||||
@@ -695,11 +760,11 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_returns_none_if_expired() {
|
fn test_pool_checkout_returns_none_if_expired() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = Pool::new(true, Some(Duration::from_millis(100)));
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
||||||
drop(pooled);
|
drop(pooled);
|
||||||
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
|
::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap());
|
||||||
assert!(pool.checkout(key).poll().unwrap().is_not_ready());
|
assert!(pool.checkout(key).poll().unwrap().is_not_ready());
|
||||||
::futures::future::ok::<(), ()>(())
|
::futures::future::ok::<(), ()>(())
|
||||||
}).wait().unwrap();
|
}).wait().unwrap();
|
||||||
@@ -708,19 +773,19 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_removes_expired() {
|
fn test_pool_checkout_removes_expired() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = Pool::new(true, Some(Duration::from_millis(100)));
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
|
|
||||||
pool.pooled(c(key.clone()), Uniq(41));
|
pool.pooled(c(key.clone()), Uniq(41));
|
||||||
pool.pooled(c(key.clone()), Uniq(5));
|
pool.pooled(c(key.clone()), Uniq(5));
|
||||||
pool.pooled(c(key.clone()), Uniq(99));
|
pool.pooled(c(key.clone()), Uniq(99));
|
||||||
|
|
||||||
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
|
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
|
||||||
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
|
::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap());
|
||||||
|
|
||||||
// checkout.poll() should clean out the expired
|
// checkout.poll() should clean out the expired
|
||||||
pool.checkout(key.clone()).poll().unwrap();
|
pool.checkout(key.clone()).poll().unwrap();
|
||||||
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
|
assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none());
|
||||||
|
|
||||||
Ok::<(), ()>(())
|
Ok::<(), ()>(())
|
||||||
}).wait().unwrap();
|
}).wait().unwrap();
|
||||||
@@ -730,30 +795,26 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_pool_timer_removes_expired() {
|
fn test_pool_timer_removes_expired() {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use common::Exec;
|
|
||||||
let runtime = ::tokio::runtime::Runtime::new().unwrap();
|
let runtime = ::tokio::runtime::Runtime::new().unwrap();
|
||||||
let pool = Pool::new(true, Some(Duration::from_millis(100)));
|
|
||||||
|
|
||||||
let executor = runtime.executor();
|
let executor = runtime.executor();
|
||||||
pool.spawn_expired_interval(&Exec::Executor(Arc::new(executor)));
|
let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Executor(Arc::new(executor)));
|
||||||
|
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
|
|
||||||
pool.pooled(c(key.clone()), Uniq(41));
|
pool.pooled(c(key.clone()), Uniq(41));
|
||||||
pool.pooled(c(key.clone()), Uniq(5));
|
pool.pooled(c(key.clone()), Uniq(5));
|
||||||
pool.pooled(c(key.clone()), Uniq(99));
|
pool.pooled(c(key.clone()), Uniq(99));
|
||||||
|
|
||||||
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
|
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
|
||||||
|
|
||||||
::futures_timer::Delay::new(
|
::std::thread::sleep(Duration::from_millis(400)); // allow for too-good resolution
|
||||||
Duration::from_millis(400) // allow for too-good resolution
|
|
||||||
).wait().unwrap();
|
|
||||||
|
|
||||||
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
|
assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_task_unparked() {
|
fn test_pool_checkout_task_unparked() {
|
||||||
let pool = Pool::new(true, Some(Duration::from_secs(10)));
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
let pooled = pool.pooled(c(key.clone()), Uniq(41));
|
||||||
|
|
||||||
@@ -772,7 +833,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_pool_checkout_drop_cleans_up_waiters() {
|
fn test_pool_checkout_drop_cleans_up_waiters() {
|
||||||
future::lazy(|| {
|
future::lazy(|| {
|
||||||
let pool = Pool::<Uniq<i32>>::new(true, Some(Duration::from_secs(10)));
|
let pool = pool_no_timer::<Uniq<i32>>();
|
||||||
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
|
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
|
||||||
|
|
||||||
let mut checkout1 = pool.checkout(key.clone());
|
let mut checkout1 = pool.checkout(key.clone());
|
||||||
@@ -780,16 +841,16 @@ mod tests {
|
|||||||
|
|
||||||
// first poll needed to get into Pool's parked
|
// first poll needed to get into Pool's parked
|
||||||
checkout1.poll().unwrap();
|
checkout1.poll().unwrap();
|
||||||
assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
|
assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
|
||||||
checkout2.poll().unwrap();
|
checkout2.poll().unwrap();
|
||||||
assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 2);
|
assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 2);
|
||||||
|
|
||||||
// on drop, clean up Pool
|
// on drop, clean up Pool
|
||||||
drop(checkout1);
|
drop(checkout1);
|
||||||
assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
|
assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
|
||||||
|
|
||||||
drop(checkout2);
|
drop(checkout2);
|
||||||
assert!(pool.inner.lock().unwrap().waiters.get(&key).is_none());
|
assert!(pool.inner.connections.lock().unwrap().waiters.get(&key).is_none());
|
||||||
|
|
||||||
::futures::future::ok::<(), ()>(())
|
::futures::future::ok::<(), ()>(())
|
||||||
}).wait().unwrap();
|
}).wait().unwrap();
|
||||||
@@ -813,13 +874,13 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn pooled_drop_if_closed_doesnt_reinsert() {
|
fn pooled_drop_if_closed_doesnt_reinsert() {
|
||||||
let pool = Pool::new(true, Some(Duration::from_secs(10)));
|
let pool = pool_no_timer();
|
||||||
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
|
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
|
||||||
pool.pooled(c(key.clone()), CanClose {
|
pool.pooled(c(key.clone()), CanClose {
|
||||||
val: 57,
|
val: 57,
|
||||||
closed: true,
|
closed: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
assert!(!pool.inner.lock().unwrap().idle.contains_key(&key));
|
assert!(!pool.inner.connections.lock().unwrap().idle.contains_key(&key));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ fn retryable_request() {
|
|||||||
.executor(executor.sender().clone())
|
.executor(executor.sender().clone())
|
||||||
.build::<_, ::Body>(connector);
|
.build::<_, ::Body>(connector);
|
||||||
|
|
||||||
|
client.pool.no_timer();
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
@@ -71,6 +73,8 @@ fn conn_reset_after_write() {
|
|||||||
.executor(executor.sender().clone())
|
.executor(executor.sender().clone())
|
||||||
.build::<_, ::Body>(connector);
|
.build::<_, ::Body>(connector);
|
||||||
|
|
||||||
|
client.pool.no_timer();
|
||||||
|
|
||||||
{
|
{
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
.uri("http://mock.local/a")
|
.uri("http://mock.local/a")
|
||||||
@@ -88,7 +92,7 @@ fn conn_reset_after_write() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// sleep to allow some time for the connection to return to the pool
|
// sleep to allow some time for the connection to return to the pool
|
||||||
thread::sleep(Duration::from_millis(50));
|
thread::sleep(Duration::from_millis(10));
|
||||||
|
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
.uri("http://mock.local/a")
|
.uri("http://mock.local/a")
|
||||||
|
|||||||
@@ -19,7 +19,6 @@
|
|||||||
extern crate bytes;
|
extern crate bytes;
|
||||||
#[macro_use] extern crate futures;
|
#[macro_use] extern crate futures;
|
||||||
#[cfg(feature = "runtime")] extern crate futures_cpupool;
|
#[cfg(feature = "runtime")] extern crate futures_cpupool;
|
||||||
extern crate futures_timer;
|
|
||||||
extern crate h2;
|
extern crate h2;
|
||||||
extern crate http;
|
extern crate http;
|
||||||
extern crate httparse;
|
extern crate httparse;
|
||||||
@@ -32,6 +31,7 @@ extern crate time;
|
|||||||
#[macro_use] extern crate tokio_io;
|
#[macro_use] extern crate tokio_io;
|
||||||
#[cfg(feature = "runtime")] extern crate tokio_reactor;
|
#[cfg(feature = "runtime")] extern crate tokio_reactor;
|
||||||
#[cfg(feature = "runtime")] extern crate tokio_tcp;
|
#[cfg(feature = "runtime")] extern crate tokio_tcp;
|
||||||
|
#[cfg(feature = "runtime")] extern crate tokio_timer;
|
||||||
extern crate want;
|
extern crate want;
|
||||||
|
|
||||||
#[cfg(all(test, feature = "nightly"))]
|
#[cfg(all(test, feature = "nightly"))]
|
||||||
|
|||||||
@@ -423,7 +423,7 @@ where
|
|||||||
Ok(Async::Ready(None))
|
Ok(Async::Ready(None))
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(_) => unreachable!("receiver cannot error"),
|
Err(never) => match never {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
use std::net::{SocketAddr, TcpListener as StdTcpListener};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{Async, Future, Poll, Stream};
|
use futures::{Async, Future, Poll, Stream};
|
||||||
use futures_timer::Delay;
|
|
||||||
use tokio_tcp::TcpListener;
|
|
||||||
use tokio_reactor::Handle;
|
use tokio_reactor::Handle;
|
||||||
|
use tokio_tcp::TcpListener;
|
||||||
|
use tokio_timer::Delay;
|
||||||
|
|
||||||
use self::addr_stream::AddrStream;
|
use self::addr_stream::AddrStream;
|
||||||
|
|
||||||
@@ -93,9 +93,12 @@ impl Stream for AddrIncoming {
|
|||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
// Check if a previous timeout is active that was set by IO errors.
|
// Check if a previous timeout is active that was set by IO errors.
|
||||||
if let Some(ref mut to) = self.timeout {
|
if let Some(ref mut to) = self.timeout {
|
||||||
match to.poll().expect("timeout never fails") {
|
match to.poll() {
|
||||||
Async::Ready(_) => {}
|
Ok(Async::Ready(())) => {}
|
||||||
Async::NotReady => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
|
Err(err) => {
|
||||||
|
error!("sleep timer error: {}", err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.timeout = None;
|
self.timeout = None;
|
||||||
@@ -113,28 +116,38 @@ impl Stream for AddrIncoming {
|
|||||||
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
|
return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
|
||||||
},
|
},
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
Err(ref e) if self.sleep_on_errors => {
|
Err(e) => {
|
||||||
|
if self.sleep_on_errors {
|
||||||
// Connection errors can be ignored directly, continue by
|
// Connection errors can be ignored directly, continue by
|
||||||
// accepting the next request.
|
// accepting the next request.
|
||||||
if is_connection_error(e) {
|
if is_connection_error(&e) {
|
||||||
debug!("accepted connection already errored: {}", e);
|
debug!("accepted connection already errored: {}", e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Sleep 1s.
|
// Sleep 1s.
|
||||||
let delay = Duration::from_secs(1);
|
let delay = Instant::now() + Duration::from_secs(1);
|
||||||
error!("accept error: {}", e);
|
|
||||||
let mut timeout = Delay::new(delay);
|
let mut timeout = Delay::new(delay);
|
||||||
let result = timeout.poll()
|
|
||||||
.expect("timeout never fails");
|
match timeout.poll() {
|
||||||
match result {
|
Ok(Async::Ready(())) => {
|
||||||
Async::Ready(()) => continue,
|
// Wow, it's been a second already? Ok then...
|
||||||
Async::NotReady => {
|
error!("accept error: {}", e);
|
||||||
|
continue
|
||||||
|
},
|
||||||
|
Ok(Async::NotReady) => {
|
||||||
|
error!("accept error: {}", e);
|
||||||
self.timeout = Some(timeout);
|
self.timeout = Some(timeout);
|
||||||
return Ok(Async::NotReady);
|
return Ok(Async::NotReady);
|
||||||
|
},
|
||||||
|
Err(timer_err) => {
|
||||||
|
error!("couldn't sleep on error, timer error: {}", timer_err);
|
||||||
|
return Err(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Err(e) => return Err(e),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -160,6 +160,8 @@ macro_rules! test {
|
|||||||
let expected_res_body = Option::<&[u8]>::from($response_body)
|
let expected_res_body = Option::<&[u8]>::from($response_body)
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
assert_eq!(body.as_ref(), expected_res_body);
|
assert_eq!(body.as_ref(), expected_res_body);
|
||||||
|
|
||||||
|
runtime.shutdown_on_idle().wait().expect("rt shutdown");
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
(
|
(
|
||||||
@@ -202,8 +204,10 @@ macro_rules! test {
|
|||||||
|
|
||||||
let closure = infer_closure($err);
|
let closure = infer_closure($err);
|
||||||
if !closure(&err) {
|
if !closure(&err) {
|
||||||
panic!("expected error, unexpected variant: {:?}", err)
|
panic!("expected error, unexpected variant: {:?}", err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
runtime.shutdown_on_idle().wait().expect("rt shutdown");
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -227,11 +231,6 @@ macro_rules! test {
|
|||||||
let addr = server.local_addr().expect("local_addr");
|
let addr = server.local_addr().expect("local_addr");
|
||||||
let runtime = $runtime;
|
let runtime = $runtime;
|
||||||
|
|
||||||
let mut config = Client::builder();
|
|
||||||
config.http1_title_case_headers($title_case_headers);
|
|
||||||
if !$set_host {
|
|
||||||
config.set_host(false);
|
|
||||||
}
|
|
||||||
let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone());
|
let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone());
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.set_host($set_host)
|
.set_host($set_host)
|
||||||
@@ -923,7 +922,6 @@ mod dispatch_impl {
|
|||||||
client.request(req)
|
client.request(req)
|
||||||
};
|
};
|
||||||
|
|
||||||
//let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
|
|
||||||
res.select2(rx1).wait().unwrap();
|
res.select2(rx1).wait().unwrap();
|
||||||
// res now dropped
|
// res now dropped
|
||||||
let t = Delay::new(Duration::from_millis(100))
|
let t = Delay::new(Duration::from_millis(100))
|
||||||
@@ -1088,54 +1086,6 @@ mod dispatch_impl {
|
|||||||
let _ = t.select(close).wait();
|
let _ = t.select(close).wait();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn client_custom_executor() {
|
|
||||||
let server = TcpListener::bind("127.0.0.1:0").unwrap();
|
|
||||||
let addr = server.local_addr().unwrap();
|
|
||||||
let runtime = Runtime::new().unwrap();
|
|
||||||
let handle = runtime.reactor();
|
|
||||||
let (closes_tx, closes) = mpsc::channel(10);
|
|
||||||
|
|
||||||
let (tx1, rx1) = oneshot::channel();
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
let mut sock = server.accept().unwrap().0;
|
|
||||||
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
|
|
||||||
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
|
|
||||||
let mut buf = [0; 4096];
|
|
||||||
sock.read(&mut buf).expect("read 1");
|
|
||||||
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
|
|
||||||
let _ = tx1.send(());
|
|
||||||
});
|
|
||||||
|
|
||||||
let client = Client::builder()
|
|
||||||
.executor(runtime.executor())
|
|
||||||
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
|
|
||||||
|
|
||||||
let req = Request::builder()
|
|
||||||
.uri(&*format!("http://{}/a", addr))
|
|
||||||
.body(Body::empty())
|
|
||||||
.unwrap();
|
|
||||||
let res = client.request(req).and_then(move |res| {
|
|
||||||
assert_eq!(res.status(), hyper::StatusCode::OK);
|
|
||||||
res.into_body().concat2()
|
|
||||||
});
|
|
||||||
let rx = rx1.expect("thread panicked");
|
|
||||||
|
|
||||||
let timeout = Delay::new(Duration::from_millis(200));
|
|
||||||
let rx = rx.and_then(move |_| timeout.expect("timeout"));
|
|
||||||
res.join(rx).map(|r| r.0).wait().unwrap();
|
|
||||||
|
|
||||||
let t = Delay::new(Duration::from_millis(100))
|
|
||||||
.map(|_| panic!("time out"));
|
|
||||||
let close = closes.into_future()
|
|
||||||
.map(|(opt, _)| {
|
|
||||||
opt.expect("closes");
|
|
||||||
})
|
|
||||||
.map_err(|_| panic!("closes dropped"));
|
|
||||||
let _ = t.select(close).wait();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn connect_call_is_lazy() {
|
fn connect_call_is_lazy() {
|
||||||
// We especially don't want connects() triggered if there's
|
// We especially don't want connects() triggered if there's
|
||||||
@@ -1168,8 +1118,7 @@ mod dispatch_impl {
|
|||||||
let server = TcpListener::bind("127.0.0.1:0").unwrap();
|
let server = TcpListener::bind("127.0.0.1:0").unwrap();
|
||||||
let addr = server.local_addr().unwrap();
|
let addr = server.local_addr().unwrap();
|
||||||
let runtime = Runtime::new().unwrap();
|
let runtime = Runtime::new().unwrap();
|
||||||
let handle = runtime.reactor();
|
let connector = DebugConnector::new(runtime.reactor());
|
||||||
let connector = DebugConnector::new(&handle);
|
|
||||||
let connects = connector.connects.clone();
|
let connects = connector.connects.clone();
|
||||||
|
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
@@ -1222,6 +1171,9 @@ mod dispatch_impl {
|
|||||||
res.join(rx).map(|r| r.0).wait().unwrap();
|
res.join(rx).map(|r| r.0).wait().unwrap();
|
||||||
|
|
||||||
assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect");
|
assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect");
|
||||||
|
drop(client);
|
||||||
|
|
||||||
|
runtime.shutdown_on_idle().wait().expect("rt shutdown");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
Reference in New Issue
Block a user