Merge pull request #1502 from hyperium/tokio-timer
refactor(lib): change from futures-timer to tokio-timer
This commit is contained in:
		| @@ -18,7 +18,7 @@ cache: | |||||||
| script: | script: | ||||||
|   - ./.travis/readme.py |   - ./.travis/readme.py | ||||||
|   - cargo build $FEATURES |   - cargo build $FEATURES | ||||||
|   - 'if [ "$BUILD_ONLY" != "1" ]; then RUST_LOG=hyper cargo test $FEATURES; fi' |   - 'if [ "$BUILD_ONLY" != "1" ]; then RUST_LOG=hyper cargo test $FEATURES -- --test-threads=1; fi' | ||||||
|   - 'if [ $TRAVIS_RUST_VERSION = nightly ]; then for f in ./benches/*.rs; do cargo test --bench $(basename $f .rs) $FEATURES; done; fi' |   - 'if [ $TRAVIS_RUST_VERSION = nightly ]; then for f in ./benches/*.rs; do cargo test --bench $(basename $f .rs) $FEATURES; done; fi' | ||||||
|  |  | ||||||
| addons: | addons: | ||||||
|   | |||||||
| @@ -24,7 +24,6 @@ include = [ | |||||||
| bytes = "0.4.4" | bytes = "0.4.4" | ||||||
| futures = "0.1.21" | futures = "0.1.21" | ||||||
| futures-cpupool = { version = "0.1.6", optional = true } | futures-cpupool = { version = "0.1.6", optional = true } | ||||||
| futures-timer = "0.1.0" |  | ||||||
| http = "0.1.5" | http = "0.1.5" | ||||||
| httparse = "1.0" | httparse = "1.0" | ||||||
| h2 = "0.1.5" | h2 = "0.1.5" | ||||||
| @@ -37,9 +36,11 @@ tokio-executor = { version = "0.1.0", optional = true } | |||||||
| tokio-io = "0.1" | tokio-io = "0.1" | ||||||
| tokio-reactor = { version = "0.1", optional = true } | tokio-reactor = { version = "0.1", optional = true } | ||||||
| tokio-tcp = { version = "0.1", optional = true } | tokio-tcp = { version = "0.1", optional = true } | ||||||
|  | tokio-timer = { version = "0.2", optional = true } | ||||||
| want = "0.0.4" | want = "0.0.4" | ||||||
|  |  | ||||||
| [dev-dependencies] | [dev-dependencies] | ||||||
|  | futures-timer = "0.1" | ||||||
| num_cpus = "1.0" | num_cpus = "1.0" | ||||||
| pretty_env_logger = "0.2.0" | pretty_env_logger = "0.2.0" | ||||||
| spmc = "0.2" | spmc = "0.2" | ||||||
| @@ -55,6 +56,7 @@ runtime = [ | |||||||
|     "tokio-executor", |     "tokio-executor", | ||||||
|     "tokio-reactor", |     "tokio-reactor", | ||||||
|     "tokio-tcp", |     "tokio-tcp", | ||||||
|  |     "tokio-timer", | ||||||
| ] | ] | ||||||
|  |  | ||||||
| [[example]] | [[example]] | ||||||
|   | |||||||
| @@ -113,14 +113,6 @@ where C: Connect + Sync + 'static, | |||||||
|  |  | ||||||
|     /// Send a constructed Request using this Client. |     /// Send a constructed Request using this Client. | ||||||
|     pub fn request(&self, mut req: Request<B>) -> FutureResponse { |     pub fn request(&self, mut req: Request<B>) -> FutureResponse { | ||||||
|         // TODO(0.12): do this at construction time. |  | ||||||
|         // |  | ||||||
|         // It cannot be done in the constructor because the Client::configured |  | ||||||
|         // does not have `B: 'static` bounds, which are required to spawn |  | ||||||
|         // the interval. In 0.12, add a static bounds to the constructor, |  | ||||||
|         // and move this. |  | ||||||
|         self.schedule_pool_timer(); |  | ||||||
|  |  | ||||||
|         match req.version() { |         match req.version() { | ||||||
|             Version::HTTP_10 | |             Version::HTTP_10 | | ||||||
|             Version::HTTP_11 => (), |             Version::HTTP_11 => (), | ||||||
| @@ -302,7 +294,7 @@ where C: Connect + Sync + 'static, | |||||||
|                         // for a new request to start. |                         // for a new request to start. | ||||||
|                         // |                         // | ||||||
|                         // It won't be ready if there is a body to stream. |                         // It won't be ready if there is a body to stream. | ||||||
|                         if ver == Ver::Http2 || pooled.is_ready() { |                         if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() { | ||||||
|                             drop(pooled); |                             drop(pooled); | ||||||
|                         } else if !res.body().is_empty() { |                         } else if !res.body().is_empty() { | ||||||
|                             let (delayed_tx, delayed_rx) = oneshot::channel(); |                             let (delayed_tx, delayed_rx) = oneshot::channel(); | ||||||
| @@ -336,10 +328,6 @@ where C: Connect + Sync + 'static, | |||||||
|  |  | ||||||
|         Box::new(resp) |         Box::new(resp) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn schedule_pool_timer(&self) { |  | ||||||
|         self.pool.spawn_expired_interval(&self.executor); |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<C, B> Clone for Client<C, B> { | impl<C, B> Clone for Client<C, B> { | ||||||
| @@ -474,7 +462,7 @@ impl<B: Payload + 'static> PoolClient<B> { | |||||||
|  |  | ||||||
| impl<B> Poolable for PoolClient<B> | impl<B> Poolable for PoolClient<B> | ||||||
| where | where | ||||||
|     B: 'static, |     B: Send + 'static, | ||||||
| { | { | ||||||
|     fn is_open(&self) -> bool { |     fn is_open(&self) -> bool { | ||||||
|         match self.tx { |         match self.tx { | ||||||
| @@ -700,7 +688,7 @@ impl Builder { | |||||||
|             executor: self.exec.clone(), |             executor: self.exec.clone(), | ||||||
|             h1_writev: self.h1_writev, |             h1_writev: self.h1_writev, | ||||||
|             h1_title_case_headers: self.h1_title_case_headers, |             h1_title_case_headers: self.h1_title_case_headers, | ||||||
|             pool: Pool::new(self.keep_alive, self.keep_alive_timeout), |             pool: Pool::new(self.keep_alive, self.keep_alive_timeout, &self.exec), | ||||||
|             retry_canceled_requests: self.retry_canceled_requests, |             retry_canceled_requests: self.retry_canceled_requests, | ||||||
|             set_host: self.set_host, |             set_host: self.set_host, | ||||||
|             ver: self.ver, |             ver: self.ver, | ||||||
|   | |||||||
| @@ -4,15 +4,16 @@ use std::ops::{Deref, DerefMut}; | |||||||
| use std::sync::{Arc, Mutex, Weak}; | use std::sync::{Arc, Mutex, Weak}; | ||||||
| use std::time::{Duration, Instant}; | use std::time::{Duration, Instant}; | ||||||
|  |  | ||||||
| use futures::{Future, Async, Poll, Stream}; | use futures::{Future, Async, Poll}; | ||||||
| use futures::sync::oneshot; | use futures::sync::oneshot; | ||||||
| use futures_timer::Interval; | #[cfg(feature = "runtime")] | ||||||
|  | use tokio_timer::Interval; | ||||||
|  |  | ||||||
| use common::{Exec, Never}; | use common::Exec; | ||||||
| use super::Ver; | use super::Ver; | ||||||
|  |  | ||||||
| pub(super) struct Pool<T> { | pub(super) struct Pool<T> { | ||||||
|     inner: Arc<Mutex<PoolInner<T>>>, |     inner: Arc<PoolInner<T>>, | ||||||
| } | } | ||||||
|  |  | ||||||
| // Before using a pooled connection, make sure the sender is not dead. | // Before using a pooled connection, make sure the sender is not dead. | ||||||
| @@ -20,7 +21,7 @@ pub(super) struct Pool<T> { | |||||||
| // This is a trait to allow the `client::pool::tests` to work for `i32`. | // This is a trait to allow the `client::pool::tests` to work for `i32`. | ||||||
| // | // | ||||||
| // See https://github.com/hyperium/hyper/issues/1429 | // See https://github.com/hyperium/hyper/issues/1429 | ||||||
| pub(super) trait Poolable: Sized { | pub(super) trait Poolable: Send + Sized + 'static { | ||||||
|     fn is_open(&self) -> bool; |     fn is_open(&self) -> bool; | ||||||
|     /// Reserve this connection. |     /// Reserve this connection. | ||||||
|     /// |     /// | ||||||
| @@ -47,11 +48,20 @@ pub(super) enum Reservation<T> { | |||||||
| type Key = (Arc<String>, Ver); | type Key = (Arc<String>, Ver); | ||||||
|  |  | ||||||
| struct PoolInner<T> { | struct PoolInner<T> { | ||||||
|  |     connections: Mutex<Connections<T>>, | ||||||
|  |     enabled: bool, | ||||||
|  |     /// A single Weak pointer used every time a proper weak reference | ||||||
|  |     /// is not needed. This prevents allocating space in the heap to hold | ||||||
|  |     /// a PoolInner<T> *every single time*, and instead we just allocate | ||||||
|  |     /// this one extra per pool. | ||||||
|  |     weak: Weak<PoolInner<T>>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | struct Connections<T> { | ||||||
|     // A flag that a connection is being estabilished, and the connection |     // A flag that a connection is being estabilished, and the connection | ||||||
|     // should be shared. This prevents making multiple HTTP/2 connections |     // should be shared. This prevents making multiple HTTP/2 connections | ||||||
|     // to the same host. |     // to the same host. | ||||||
|     connecting: HashSet<Key>, |     connecting: HashSet<Key>, | ||||||
|     enabled: bool, |  | ||||||
|     // These are internal Conns sitting in the event loop in the KeepAlive |     // These are internal Conns sitting in the event loop in the KeepAlive | ||||||
|     // state, waiting to receive a new Request to send on the socket. |     // state, waiting to receive a new Request to send on the socket. | ||||||
|     idle: HashMap<Key, Vec<Idle<T>>>, |     idle: HashMap<Key, Vec<Idle<T>>>, | ||||||
| @@ -65,23 +75,44 @@ struct PoolInner<T> { | |||||||
|     // them that the Conn could be used instead of waiting for a brand new |     // them that the Conn could be used instead of waiting for a brand new | ||||||
|     // connection. |     // connection. | ||||||
|     waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, |     waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>, | ||||||
|     timeout: Option<Duration>, |  | ||||||
|     // A oneshot channel is used to allow the interval to be notified when |     // A oneshot channel is used to allow the interval to be notified when | ||||||
|     // the Pool completely drops. That way, the interval can cancel immediately. |     // the Pool completely drops. That way, the interval can cancel immediately. | ||||||
|     idle_interval_ref: Option<oneshot::Sender<Never>>, |     #[cfg(feature = "runtime")] | ||||||
|  |     idle_interval_ref: Option<oneshot::Sender<::common::Never>>, | ||||||
|  |     #[cfg(feature = "runtime")] | ||||||
|  |     exec: Exec, | ||||||
|  |     timeout: Option<Duration>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T> Pool<T> { | impl<T> Pool<T> { | ||||||
|     pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> { |     pub fn new(enabled: bool, timeout: Option<Duration>, __exec: &Exec) -> Pool<T> { | ||||||
|         Pool { |         Pool { | ||||||
|             inner: Arc::new(Mutex::new(PoolInner { |             inner: Arc::new(PoolInner { | ||||||
|                 connecting: HashSet::new(), |                 connections: Mutex::new(Connections { | ||||||
|                 enabled: enabled, |                     connecting: HashSet::new(), | ||||||
|                 idle: HashMap::new(), |                     idle: HashMap::new(), | ||||||
|                 idle_interval_ref: None, |                     #[cfg(feature = "runtime")] | ||||||
|                 waiters: HashMap::new(), |                     idle_interval_ref: None, | ||||||
|                 timeout: timeout, |                     waiters: HashMap::new(), | ||||||
|             })), |                     #[cfg(feature = "runtime")] | ||||||
|  |                     exec: __exec.clone(), | ||||||
|  |                     timeout, | ||||||
|  |                 }), | ||||||
|  |                 enabled, | ||||||
|  |                 weak: Weak::new(), | ||||||
|  |             }), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[cfg(test)] | ||||||
|  |     pub(super) fn no_timer(&self) { | ||||||
|  |         // Prevent an actual interval from being created for this pool... | ||||||
|  |         #[cfg(feature = "runtime")] | ||||||
|  |         { | ||||||
|  |             let mut inner = self.inner.connections.lock().unwrap(); | ||||||
|  |             assert!(inner.idle_interval_ref.is_none(), "timer already spawned"); | ||||||
|  |             let (tx, _) = oneshot::channel(); | ||||||
|  |             inner.idle_interval_ref = Some(tx); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -100,8 +131,8 @@ impl<T: Poolable> Pool<T> { | |||||||
|     /// Ensure that there is only ever 1 connecting task for HTTP/2 |     /// Ensure that there is only ever 1 connecting task for HTTP/2 | ||||||
|     /// connections. This does nothing for HTTP/1. |     /// connections. This does nothing for HTTP/1. | ||||||
|     pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> { |     pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> { | ||||||
|         if key.1 == Ver::Http2 { |         if key.1 == Ver::Http2 && self.inner.enabled { | ||||||
|             let mut inner = self.inner.lock().unwrap(); |             let mut inner = self.inner.connections.lock().unwrap(); | ||||||
|             if inner.connecting.insert(key.clone()) { |             if inner.connecting.insert(key.clone()) { | ||||||
|                 let connecting = Connecting { |                 let connecting = Connecting { | ||||||
|                     key: key.clone(), |                     key: key.clone(), | ||||||
| @@ -117,14 +148,14 @@ impl<T: Poolable> Pool<T> { | |||||||
|                 key: key.clone(), |                 key: key.clone(), | ||||||
|                 // in HTTP/1's case, there is never a lock, so we don't |                 // in HTTP/1's case, there is never a lock, so we don't | ||||||
|                 // need to do anything in Drop. |                 // need to do anything in Drop. | ||||||
|                 pool: Weak::new(), |                 pool: self.inner.weak.clone(), | ||||||
|             }) |             }) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn take(&self, key: &Key) -> Option<Pooled<T>> { |     fn take(&self, key: &Key) -> Option<Pooled<T>> { | ||||||
|         let entry = { |         let entry = { | ||||||
|             let mut inner = self.inner.lock().unwrap(); |             let mut inner = self.inner.connections.lock().unwrap(); | ||||||
|             let expiration = Expiration::new(inner.timeout); |             let expiration = Expiration::new(inner.timeout); | ||||||
|             let maybe_entry = inner.idle.get_mut(key) |             let maybe_entry = inner.idle.get_mut(key) | ||||||
|                 .and_then(|list| { |                 .and_then(|list| { | ||||||
| @@ -158,35 +189,45 @@ impl<T: Poolable> Pool<T> { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> { |     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> { | ||||||
|         let (value, pool_ref) = match value.reserve() { |         let (value, pool_ref, has_pool) = if self.inner.enabled { | ||||||
|             Reservation::Shared(to_insert, to_return) => { |             match value.reserve() { | ||||||
|                 debug_assert_eq!( |                 Reservation::Shared(to_insert, to_return) => { | ||||||
|                     connecting.key.1, |                     debug_assert_eq!( | ||||||
|                     Ver::Http2, |                         connecting.key.1, | ||||||
|                     "shared reservation without Http2" |                         Ver::Http2, | ||||||
|                 ); |                         "shared reservation without Http2" | ||||||
|                 let mut inner = self.inner.lock().unwrap(); |                     ); | ||||||
|                 inner.put(connecting.key.clone(), to_insert); |                     let mut inner = self.inner.connections.lock().unwrap(); | ||||||
|                 // Do this here instead of Drop for Connecting because we |                     inner.put(connecting.key.clone(), to_insert, &self.inner); | ||||||
|                 // already have a lock, no need to lock the mutex twice. |                     // Do this here instead of Drop for Connecting because we | ||||||
|                 inner.connected(&connecting.key); |                     // already have a lock, no need to lock the mutex twice. | ||||||
|                 // prevent the Drop of Connecting from repeating inner.connected() |                     inner.connected(&connecting.key); | ||||||
|                 connecting.pool = Weak::new(); |                     // prevent the Drop of Connecting from repeating inner.connected() | ||||||
|  |                     connecting.pool = self.inner.weak.clone(); | ||||||
|  |  | ||||||
|                 // Shared reservations don't need a reference to the pool, |                     // Shared reservations don't need a reference to the pool, | ||||||
|                 // since the pool always keeps a copy. |                     // since the pool always keeps a copy. | ||||||
|                 (to_return, Weak::new()) |                     (to_return, self.inner.weak.clone(), false) | ||||||
|             }, |                 }, | ||||||
|             Reservation::Unique(value) => { |                 Reservation::Unique(value) => { | ||||||
|                 // Unique reservations must take a reference to the pool |                     // Unique reservations must take a reference to the pool | ||||||
|                 // since they hope to reinsert once the reservation is |                     // since they hope to reinsert once the reservation is | ||||||
|                 // completed |                     // completed | ||||||
|                 (value, Arc::downgrade(&self.inner)) |                     (value, Arc::downgrade(&self.inner), true) | ||||||
|             }, |                 }, | ||||||
|  |             } | ||||||
|  |         } else { | ||||||
|  |             // If pool is not enabled, skip all the things... | ||||||
|  |  | ||||||
|  |             // The Connecting should have had no pool ref | ||||||
|  |             debug_assert!(connecting.pool.upgrade().is_none()); | ||||||
|  |  | ||||||
|  |             (value, self.inner.weak.clone(), false) | ||||||
|         }; |         }; | ||||||
|         Pooled { |         Pooled { | ||||||
|             is_reused: false, |  | ||||||
|             key: connecting.key.clone(), |             key: connecting.key.clone(), | ||||||
|  |             has_pool, | ||||||
|  |             is_reused: false, | ||||||
|             pool: pool_ref, |             pool: pool_ref, | ||||||
|             value: Some(value) |             value: Some(value) | ||||||
|         } |         } | ||||||
| @@ -202,13 +243,14 @@ impl<T: Poolable> Pool<T> { | |||||||
|         // we just have the final value, without knowledge of if this is |         // we just have the final value, without knowledge of if this is | ||||||
|         // unique or shared. So, the hack is to just assume Ver::Http2 means |         // unique or shared. So, the hack is to just assume Ver::Http2 means | ||||||
|         // shared... :( |         // shared... :( | ||||||
|         let pool_ref = if key.1 == Ver::Http2 { |         let (pool_ref, has_pool) = if key.1 == Ver::Http2 { | ||||||
|             Weak::new() |             (self.inner.weak.clone(), false) | ||||||
|         } else { |         } else { | ||||||
|             Arc::downgrade(&self.inner) |             (Arc::downgrade(&self.inner), true) | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         Pooled { |         Pooled { | ||||||
|  |             has_pool, | ||||||
|             is_reused: true, |             is_reused: true, | ||||||
|             key: key.clone(), |             key: key.clone(), | ||||||
|             pool: pool_ref, |             pool: pool_ref, | ||||||
| @@ -218,7 +260,7 @@ impl<T: Poolable> Pool<T> { | |||||||
|  |  | ||||||
|     fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) { |     fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) { | ||||||
|         trace!("checkout waiting for idle connection: {:?}", key); |         trace!("checkout waiting for idle connection: {:?}", key); | ||||||
|         self.inner.lock().unwrap() |         self.inner.connections.lock().unwrap() | ||||||
|             .waiters.entry(key) |             .waiters.entry(key) | ||||||
|             .or_insert(VecDeque::new()) |             .or_insert(VecDeque::new()) | ||||||
|             .push_back(tx); |             .push_back(tx); | ||||||
| @@ -274,11 +316,8 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Poolable> PoolInner<T> { | impl<T: Poolable> Connections<T> { | ||||||
|     fn put(&mut self, key: Key, value: T) { |     fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<PoolInner<T>>) { | ||||||
|         if !self.enabled { |  | ||||||
|             return; |  | ||||||
|         } |  | ||||||
|         if key.1 == Ver::Http2 && self.idle.contains_key(&key) { |         if key.1 == Ver::Http2 && self.idle.contains_key(&key) { | ||||||
|             trace!("put; existing idle HTTP/2 connection for {:?}", key); |             trace!("put; existing idle HTTP/2 connection for {:?}", key); | ||||||
|             return; |             return; | ||||||
| @@ -328,6 +367,11 @@ impl<T: Poolable> PoolInner<T> { | |||||||
|                          value: value, |                          value: value, | ||||||
|                          idle_at: Instant::now(), |                          idle_at: Instant::now(), | ||||||
|                      }); |                      }); | ||||||
|  |  | ||||||
|  |                 #[cfg(feature = "runtime")] | ||||||
|  |                 { | ||||||
|  |                     self.spawn_idle_interval(__pool_ref); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|             None => trace!("put; found waiter for {:?}", key), |             None => trace!("put; found waiter for {:?}", key), | ||||||
|         } |         } | ||||||
| @@ -346,9 +390,37 @@ impl<T: Poolable> PoolInner<T> { | |||||||
|         // those waiters would never receive a connection. |         // those waiters would never receive a connection. | ||||||
|         self.waiters.remove(key); |         self.waiters.remove(key); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[cfg(feature = "runtime")] | ||||||
|  |     fn spawn_idle_interval(&mut self, pool_ref: &Arc<PoolInner<T>>) { | ||||||
|  |         let (dur, rx) = { | ||||||
|  |             debug_assert!(pool_ref.enabled); | ||||||
|  |  | ||||||
|  |             if self.idle_interval_ref.is_some() { | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             if let Some(dur) = self.timeout { | ||||||
|  |                 let (tx, rx) = oneshot::channel(); | ||||||
|  |                 self.idle_interval_ref = Some(tx); | ||||||
|  |                 (dur, rx) | ||||||
|  |             } else { | ||||||
|  |                 return | ||||||
|  |             } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         let start = Instant::now() + dur; | ||||||
|  |  | ||||||
|  |         let interval = Interval::new(start, dur); | ||||||
|  |         self.exec.execute(IdleInterval { | ||||||
|  |             interval: interval, | ||||||
|  |             pool: Arc::downgrade(pool_ref), | ||||||
|  |             pool_drop_notifier: rx, | ||||||
|  |         }); | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T> PoolInner<T> { | impl<T> Connections<T> { | ||||||
|     /// Any `FutureResponse`s that were created will have made a `Checkout`, |     /// Any `FutureResponse`s that were created will have made a `Checkout`, | ||||||
|     /// and possibly inserted into the pool that it is waiting for an idle |     /// and possibly inserted into the pool that it is waiting for an idle | ||||||
|     /// connection. If a user ever dropped that future, we need to clean out |     /// connection. If a user ever dropped that future, we need to clean out | ||||||
| @@ -367,7 +439,8 @@ impl<T> PoolInner<T> { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Poolable> PoolInner<T> { | #[cfg(feature = "runtime")] | ||||||
|  | impl<T: Poolable> Connections<T> { | ||||||
|     /// This should *only* be called by the IdleInterval. |     /// This should *only* be called by the IdleInterval. | ||||||
|     fn clear_expired(&mut self) { |     fn clear_expired(&mut self) { | ||||||
|         let dur = self.timeout.expect("interval assumes timeout"); |         let dur = self.timeout.expect("interval assumes timeout"); | ||||||
| @@ -396,38 +469,6 @@ impl<T: Poolable> PoolInner<T> { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| impl<T: Poolable + Send + 'static> Pool<T> { |  | ||||||
|     pub(super) fn spawn_expired_interval(&self, exec: &Exec) { |  | ||||||
|         let (dur, rx) = { |  | ||||||
|             let mut inner = self.inner.lock().unwrap(); |  | ||||||
|  |  | ||||||
|             if !inner.enabled { |  | ||||||
|                 return; |  | ||||||
|             } |  | ||||||
|  |  | ||||||
|             if inner.idle_interval_ref.is_some() { |  | ||||||
|                 return; |  | ||||||
|             } |  | ||||||
|  |  | ||||||
|             if let Some(dur) = inner.timeout { |  | ||||||
|                 let (tx, rx) = oneshot::channel(); |  | ||||||
|                 inner.idle_interval_ref = Some(tx); |  | ||||||
|                 (dur, rx) |  | ||||||
|             } else { |  | ||||||
|                 return |  | ||||||
|             } |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         let interval = Interval::new(dur); |  | ||||||
|         exec.execute(IdleInterval { |  | ||||||
|             interval: interval, |  | ||||||
|             pool: Arc::downgrade(&self.inner), |  | ||||||
|             pool_drop_notifier: rx, |  | ||||||
|         }); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl<T> Clone for Pool<T> { | impl<T> Clone for Pool<T> { | ||||||
|     fn clone(&self) -> Pool<T> { |     fn clone(&self) -> Pool<T> { | ||||||
|         Pool { |         Pool { | ||||||
| @@ -440,9 +481,10 @@ impl<T> Clone for Pool<T> { | |||||||
| // Note: The bounds `T: Poolable` is needed for the Drop impl. | // Note: The bounds `T: Poolable` is needed for the Drop impl. | ||||||
| pub(super) struct Pooled<T: Poolable> { | pub(super) struct Pooled<T: Poolable> { | ||||||
|     value: Option<T>, |     value: Option<T>, | ||||||
|  |     has_pool: bool, | ||||||
|     is_reused: bool, |     is_reused: bool, | ||||||
|     key: Key, |     key: Key, | ||||||
|     pool: Weak<Mutex<PoolInner<T>>>, |     pool: Weak<PoolInner<T>>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Poolable> Pooled<T> { | impl<T: Poolable> Pooled<T> { | ||||||
| @@ -450,6 +492,10 @@ impl<T: Poolable> Pooled<T> { | |||||||
|         self.is_reused |         self.is_reused | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn is_pool_enabled(&self) -> bool { | ||||||
|  |         self.has_pool | ||||||
|  |     } | ||||||
|  |  | ||||||
|     fn as_ref(&self) -> &T { |     fn as_ref(&self) -> &T { | ||||||
|         self.value.as_ref().expect("not dropped") |         self.value.as_ref().expect("not dropped") | ||||||
|     } |     } | ||||||
| @@ -481,9 +527,13 @@ impl<T: Poolable> Drop for Pooled<T> { | |||||||
|                 return; |                 return; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             if let Some(inner) = self.pool.upgrade() { |             if let Some(pool) = self.pool.upgrade() { | ||||||
|                 if let Ok(mut inner) = inner.lock() { |                 // Pooled should not have had a real reference if pool is | ||||||
|                     inner.put(self.key.clone(), value); |                 // not enabled! | ||||||
|  |                 debug_assert!(pool.enabled); | ||||||
|  |  | ||||||
|  |                 if let Ok(mut inner) = pool.connections.lock() { | ||||||
|  |                     inner.put(self.key.clone(), value, &pool); | ||||||
|                 } |                 } | ||||||
|             } else if self.key.1 == Ver::Http1 { |             } else if self.key.1 == Ver::Http1 { | ||||||
|                 trace!("pool dropped, dropping pooled ({:?})", self.key); |                 trace!("pool dropped, dropping pooled ({:?})", self.key); | ||||||
| @@ -569,7 +619,7 @@ impl<T: Poolable> Future for Checkout<T> { | |||||||
| impl<T> Drop for Checkout<T> { | impl<T> Drop for Checkout<T> { | ||||||
|     fn drop(&mut self) { |     fn drop(&mut self) { | ||||||
|         if self.waiter.take().is_some() { |         if self.waiter.take().is_some() { | ||||||
|             if let Ok(mut inner) = self.pool.inner.lock() { |             if let Ok(mut inner) = self.pool.inner.connections.lock() { | ||||||
|                 inner.clean_waiters(&self.key); |                 inner.clean_waiters(&self.key); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| @@ -578,14 +628,14 @@ impl<T> Drop for Checkout<T> { | |||||||
|  |  | ||||||
| pub(super) struct Connecting<T: Poolable> { | pub(super) struct Connecting<T: Poolable> { | ||||||
|     key: Key, |     key: Key, | ||||||
|     pool: Weak<Mutex<PoolInner<T>>>, |     pool: Weak<PoolInner<T>>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Poolable> Drop for Connecting<T> { | impl<T: Poolable> Drop for Connecting<T> { | ||||||
|     fn drop(&mut self) { |     fn drop(&mut self) { | ||||||
|         if let Some(pool) = self.pool.upgrade() { |         if let Some(pool) = self.pool.upgrade() { | ||||||
|             // No need to panic on drop, that could abort! |             // No need to panic on drop, that could abort! | ||||||
|             if let Ok(mut inner) = pool.lock() { |             if let Ok(mut inner) = pool.connections.lock() { | ||||||
|                 debug_assert_eq!( |                 debug_assert_eq!( | ||||||
|                     self.key.1, |                     self.key.1, | ||||||
|                     Ver::Http2, |                     Ver::Http2, | ||||||
| @@ -612,20 +662,25 @@ impl Expiration { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[cfg(feature = "runtime")] | ||||||
| struct IdleInterval<T> { | struct IdleInterval<T> { | ||||||
|     interval: Interval, |     interval: Interval, | ||||||
|     pool: Weak<Mutex<PoolInner<T>>>, |     pool: Weak<PoolInner<T>>, | ||||||
|     // This allows the IdleInterval to be notified as soon as the entire |     // This allows the IdleInterval to be notified as soon as the entire | ||||||
|     // Pool is fully dropped, and shutdown. This channel is never sent on, |     // Pool is fully dropped, and shutdown. This channel is never sent on, | ||||||
|     // but Err(Canceled) will be received when the Pool is dropped. |     // but Err(Canceled) will be received when the Pool is dropped. | ||||||
|     pool_drop_notifier: oneshot::Receiver<Never>, |     pool_drop_notifier: oneshot::Receiver<::common::Never>, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[cfg(feature = "runtime")] | ||||||
| impl<T: Poolable + 'static> Future for IdleInterval<T> { | impl<T: Poolable + 'static> Future for IdleInterval<T> { | ||||||
|     type Item = (); |     type Item = (); | ||||||
|     type Error = (); |     type Error = (); | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
|  |         // Interval is a Stream | ||||||
|  |         use futures::Stream; | ||||||
|  |  | ||||||
|         loop { |         loop { | ||||||
|             match self.pool_drop_notifier.poll() { |             match self.pool_drop_notifier.poll() { | ||||||
|                 Ok(Async::Ready(n)) => match n {}, |                 Ok(Async::Ready(n)) => match n {}, | ||||||
| @@ -636,10 +691,13 @@ impl<T: Poolable + 'static> Future for IdleInterval<T> { | |||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error"))); |             try_ready!(self.interval.poll().map_err(|err| { | ||||||
|  |                 error!("idle interval timer error: {}", err); | ||||||
|  |             })); | ||||||
|  |  | ||||||
|             if let Some(inner) = self.pool.upgrade() { |             if let Some(inner) = self.pool.upgrade() { | ||||||
|                 if let Ok(mut inner) = inner.lock() { |                 if let Ok(mut inner) = inner.connections.lock() { | ||||||
|  |                     trace!("idle interval checking for expired"); | ||||||
|                     inner.clear_expired(); |                     inner.clear_expired(); | ||||||
|                     continue; |                     continue; | ||||||
|                 } |                 } | ||||||
| @@ -655,13 +713,14 @@ mod tests { | |||||||
|     use std::time::Duration; |     use std::time::Duration; | ||||||
|     use futures::{Async, Future}; |     use futures::{Async, Future}; | ||||||
|     use futures::future; |     use futures::future; | ||||||
|  |     use common::Exec; | ||||||
|     use super::{Connecting, Key, Poolable, Pool, Reservation, Ver}; |     use super::{Connecting, Key, Poolable, Pool, Reservation, Ver}; | ||||||
|  |  | ||||||
|     /// Test unique reservations. |     /// Test unique reservations. | ||||||
|     #[derive(Debug, PartialEq, Eq)] |     #[derive(Debug, PartialEq, Eq)] | ||||||
|     struct Uniq<T>(T); |     struct Uniq<T>(T); | ||||||
|  |  | ||||||
|     impl<T> Poolable for Uniq<T> { |     impl<T: Send + 'static> Poolable for Uniq<T> { | ||||||
|         fn is_open(&self) -> bool { |         fn is_open(&self) -> bool { | ||||||
|             true |             true | ||||||
|         } |         } | ||||||
| @@ -678,9 +737,15 @@ mod tests { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     fn pool_no_timer<T>() -> Pool<T> { | ||||||
|  |         let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default); | ||||||
|  |         pool.no_timer(); | ||||||
|  |         pool | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_pool_checkout_smoke() { |     fn test_pool_checkout_smoke() { | ||||||
|         let pool = Pool::new(true, Some(Duration::from_secs(5))); |         let pool = pool_no_timer(); | ||||||
|         let key = (Arc::new("foo".to_string()), Ver::Http1); |         let key = (Arc::new("foo".to_string()), Ver::Http1); | ||||||
|         let pooled = pool.pooled(c(key.clone()), Uniq(41)); |         let pooled = pool.pooled(c(key.clone()), Uniq(41)); | ||||||
|  |  | ||||||
| @@ -695,11 +760,11 @@ mod tests { | |||||||
|     #[test] |     #[test] | ||||||
|     fn test_pool_checkout_returns_none_if_expired() { |     fn test_pool_checkout_returns_none_if_expired() { | ||||||
|         future::lazy(|| { |         future::lazy(|| { | ||||||
|             let pool = Pool::new(true, Some(Duration::from_millis(100))); |             let pool = pool_no_timer(); | ||||||
|             let key = (Arc::new("foo".to_string()), Ver::Http1); |             let key = (Arc::new("foo".to_string()), Ver::Http1); | ||||||
|             let pooled = pool.pooled(c(key.clone()), Uniq(41)); |             let pooled = pool.pooled(c(key.clone()), Uniq(41)); | ||||||
|             drop(pooled); |             drop(pooled); | ||||||
|             ::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap()); |             ::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap()); | ||||||
|             assert!(pool.checkout(key).poll().unwrap().is_not_ready()); |             assert!(pool.checkout(key).poll().unwrap().is_not_ready()); | ||||||
|             ::futures::future::ok::<(), ()>(()) |             ::futures::future::ok::<(), ()>(()) | ||||||
|         }).wait().unwrap(); |         }).wait().unwrap(); | ||||||
| @@ -708,19 +773,19 @@ mod tests { | |||||||
|     #[test] |     #[test] | ||||||
|     fn test_pool_checkout_removes_expired() { |     fn test_pool_checkout_removes_expired() { | ||||||
|         future::lazy(|| { |         future::lazy(|| { | ||||||
|             let pool = Pool::new(true, Some(Duration::from_millis(100))); |             let pool = pool_no_timer(); | ||||||
|             let key = (Arc::new("foo".to_string()), Ver::Http1); |             let key = (Arc::new("foo".to_string()), Ver::Http1); | ||||||
|  |  | ||||||
|             pool.pooled(c(key.clone()), Uniq(41)); |             pool.pooled(c(key.clone()), Uniq(41)); | ||||||
|             pool.pooled(c(key.clone()), Uniq(5)); |             pool.pooled(c(key.clone()), Uniq(5)); | ||||||
|             pool.pooled(c(key.clone()), Uniq(99)); |             pool.pooled(c(key.clone()), Uniq(99)); | ||||||
|  |  | ||||||
|             assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); |             assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); | ||||||
|             ::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap()); |             ::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap()); | ||||||
|  |  | ||||||
|             // checkout.poll() should clean out the expired |             // checkout.poll() should clean out the expired | ||||||
|             pool.checkout(key.clone()).poll().unwrap(); |             pool.checkout(key.clone()).poll().unwrap(); | ||||||
|             assert!(pool.inner.lock().unwrap().idle.get(&key).is_none()); |             assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none()); | ||||||
|  |  | ||||||
|             Ok::<(), ()>(()) |             Ok::<(), ()>(()) | ||||||
|         }).wait().unwrap(); |         }).wait().unwrap(); | ||||||
| @@ -730,30 +795,26 @@ mod tests { | |||||||
|     #[test] |     #[test] | ||||||
|     fn test_pool_timer_removes_expired() { |     fn test_pool_timer_removes_expired() { | ||||||
|         use std::sync::Arc; |         use std::sync::Arc; | ||||||
|         use common::Exec; |  | ||||||
|         let runtime = ::tokio::runtime::Runtime::new().unwrap(); |         let runtime = ::tokio::runtime::Runtime::new().unwrap(); | ||||||
|         let pool = Pool::new(true, Some(Duration::from_millis(100))); |  | ||||||
|  |  | ||||||
|         let executor = runtime.executor(); |         let executor = runtime.executor(); | ||||||
|         pool.spawn_expired_interval(&Exec::Executor(Arc::new(executor))); |         let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Executor(Arc::new(executor))); | ||||||
|  |  | ||||||
|         let key = (Arc::new("foo".to_string()), Ver::Http1); |         let key = (Arc::new("foo".to_string()), Ver::Http1); | ||||||
|  |  | ||||||
|         pool.pooled(c(key.clone()), Uniq(41)); |         pool.pooled(c(key.clone()), Uniq(41)); | ||||||
|         pool.pooled(c(key.clone()), Uniq(5)); |         pool.pooled(c(key.clone()), Uniq(5)); | ||||||
|         pool.pooled(c(key.clone()), Uniq(99)); |         pool.pooled(c(key.clone()), Uniq(99)); | ||||||
|  |  | ||||||
|         assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); |         assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); | ||||||
|  |  | ||||||
|         ::futures_timer::Delay::new( |         ::std::thread::sleep(Duration::from_millis(400)); // allow for too-good resolution | ||||||
|             Duration::from_millis(400) // allow for too-good resolution |  | ||||||
|         ).wait().unwrap(); |  | ||||||
|  |  | ||||||
|         assert!(pool.inner.lock().unwrap().idle.get(&key).is_none()); |         assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none()); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn test_pool_checkout_task_unparked() { |     fn test_pool_checkout_task_unparked() { | ||||||
|         let pool = Pool::new(true, Some(Duration::from_secs(10))); |         let pool = pool_no_timer(); | ||||||
|         let key = (Arc::new("foo".to_string()), Ver::Http1); |         let key = (Arc::new("foo".to_string()), Ver::Http1); | ||||||
|         let pooled = pool.pooled(c(key.clone()), Uniq(41)); |         let pooled = pool.pooled(c(key.clone()), Uniq(41)); | ||||||
|  |  | ||||||
| @@ -772,7 +833,7 @@ mod tests { | |||||||
|     #[test] |     #[test] | ||||||
|     fn test_pool_checkout_drop_cleans_up_waiters() { |     fn test_pool_checkout_drop_cleans_up_waiters() { | ||||||
|         future::lazy(|| { |         future::lazy(|| { | ||||||
|             let pool = Pool::<Uniq<i32>>::new(true, Some(Duration::from_secs(10))); |             let pool = pool_no_timer::<Uniq<i32>>(); | ||||||
|             let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); |             let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); | ||||||
|  |  | ||||||
|             let mut checkout1 = pool.checkout(key.clone()); |             let mut checkout1 = pool.checkout(key.clone()); | ||||||
| @@ -780,16 +841,16 @@ mod tests { | |||||||
|  |  | ||||||
|             // first poll needed to get into Pool's parked |             // first poll needed to get into Pool's parked | ||||||
|             checkout1.poll().unwrap(); |             checkout1.poll().unwrap(); | ||||||
|             assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1); |             assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1); | ||||||
|             checkout2.poll().unwrap(); |             checkout2.poll().unwrap(); | ||||||
|             assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 2); |             assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 2); | ||||||
|  |  | ||||||
|             // on drop, clean up Pool |             // on drop, clean up Pool | ||||||
|             drop(checkout1); |             drop(checkout1); | ||||||
|             assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1); |             assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1); | ||||||
|  |  | ||||||
|             drop(checkout2); |             drop(checkout2); | ||||||
|             assert!(pool.inner.lock().unwrap().waiters.get(&key).is_none()); |             assert!(pool.inner.connections.lock().unwrap().waiters.get(&key).is_none()); | ||||||
|  |  | ||||||
|             ::futures::future::ok::<(), ()>(()) |             ::futures::future::ok::<(), ()>(()) | ||||||
|         }).wait().unwrap(); |         }).wait().unwrap(); | ||||||
| @@ -813,13 +874,13 @@ mod tests { | |||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn pooled_drop_if_closed_doesnt_reinsert() { |     fn pooled_drop_if_closed_doesnt_reinsert() { | ||||||
|         let pool = Pool::new(true, Some(Duration::from_secs(10))); |         let pool = pool_no_timer(); | ||||||
|         let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); |         let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); | ||||||
|         pool.pooled(c(key.clone()), CanClose { |         pool.pooled(c(key.clone()), CanClose { | ||||||
|             val: 57, |             val: 57, | ||||||
|             closed: true, |             closed: true, | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|         assert!(!pool.inner.lock().unwrap().idle.contains_key(&key)); |         assert!(!pool.inner.connections.lock().unwrap().idle.contains_key(&key)); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -25,6 +25,8 @@ fn retryable_request() { | |||||||
|         .executor(executor.sender().clone()) |         .executor(executor.sender().clone()) | ||||||
|         .build::<_, ::Body>(connector); |         .build::<_, ::Body>(connector); | ||||||
|  |  | ||||||
|  |     client.pool.no_timer(); | ||||||
|  |  | ||||||
|     { |     { | ||||||
|  |  | ||||||
|         let req = Request::builder() |         let req = Request::builder() | ||||||
| @@ -71,6 +73,8 @@ fn conn_reset_after_write() { | |||||||
|         .executor(executor.sender().clone()) |         .executor(executor.sender().clone()) | ||||||
|         .build::<_, ::Body>(connector); |         .build::<_, ::Body>(connector); | ||||||
|  |  | ||||||
|  |     client.pool.no_timer(); | ||||||
|  |  | ||||||
|     { |     { | ||||||
|         let req = Request::builder() |         let req = Request::builder() | ||||||
|             .uri("http://mock.local/a") |             .uri("http://mock.local/a") | ||||||
| @@ -88,7 +92,7 @@ fn conn_reset_after_write() { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     // sleep to allow some time for the connection to return to the pool |     // sleep to allow some time for the connection to return to the pool | ||||||
|     thread::sleep(Duration::from_millis(50)); |     thread::sleep(Duration::from_millis(10)); | ||||||
|  |  | ||||||
|     let req = Request::builder() |     let req = Request::builder() | ||||||
|         .uri("http://mock.local/a") |         .uri("http://mock.local/a") | ||||||
|   | |||||||
| @@ -19,7 +19,6 @@ | |||||||
| extern crate bytes; | extern crate bytes; | ||||||
| #[macro_use] extern crate futures; | #[macro_use] extern crate futures; | ||||||
| #[cfg(feature = "runtime")] extern crate futures_cpupool; | #[cfg(feature = "runtime")] extern crate futures_cpupool; | ||||||
| extern crate futures_timer; |  | ||||||
| extern crate h2; | extern crate h2; | ||||||
| extern crate http; | extern crate http; | ||||||
| extern crate httparse; | extern crate httparse; | ||||||
| @@ -32,6 +31,7 @@ extern crate time; | |||||||
| #[macro_use] extern crate tokio_io; | #[macro_use] extern crate tokio_io; | ||||||
| #[cfg(feature = "runtime")] extern crate tokio_reactor; | #[cfg(feature = "runtime")] extern crate tokio_reactor; | ||||||
| #[cfg(feature = "runtime")] extern crate tokio_tcp; | #[cfg(feature = "runtime")] extern crate tokio_tcp; | ||||||
|  | #[cfg(feature = "runtime")] extern crate tokio_timer; | ||||||
| extern crate want; | extern crate want; | ||||||
|  |  | ||||||
| #[cfg(all(test, feature = "nightly"))] | #[cfg(all(test, feature = "nightly"))] | ||||||
|   | |||||||
| @@ -423,7 +423,7 @@ where | |||||||
|                 Ok(Async::Ready(None)) |                 Ok(Async::Ready(None)) | ||||||
|             }, |             }, | ||||||
|             Ok(Async::NotReady) => return Ok(Async::NotReady), |             Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|             Err(_) => unreachable!("receiver cannot error"), |             Err(never) => match never {}, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,12 +1,12 @@ | |||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::io; | use std::io; | ||||||
| use std::net::{SocketAddr, TcpListener as StdTcpListener}; | use std::net::{SocketAddr, TcpListener as StdTcpListener}; | ||||||
| use std::time::Duration; | use std::time::{Duration, Instant}; | ||||||
|  |  | ||||||
| use futures::{Async, Future, Poll, Stream}; | use futures::{Async, Future, Poll, Stream}; | ||||||
| use futures_timer::Delay; |  | ||||||
| use tokio_tcp::TcpListener; |  | ||||||
| use tokio_reactor::Handle; | use tokio_reactor::Handle; | ||||||
|  | use tokio_tcp::TcpListener; | ||||||
|  | use tokio_timer::Delay; | ||||||
|  |  | ||||||
| use self::addr_stream::AddrStream; | use self::addr_stream::AddrStream; | ||||||
|  |  | ||||||
| @@ -93,9 +93,12 @@ impl Stream for AddrIncoming { | |||||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|         // Check if a previous timeout is active that was set by IO errors. |         // Check if a previous timeout is active that was set by IO errors. | ||||||
|         if let Some(ref mut to) = self.timeout { |         if let Some(ref mut to) = self.timeout { | ||||||
|             match to.poll().expect("timeout never fails") { |             match to.poll() { | ||||||
|                 Async::Ready(_) => {} |                 Ok(Async::Ready(())) => {} | ||||||
|                 Async::NotReady => return Ok(Async::NotReady), |                 Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|  |                 Err(err) => { | ||||||
|  |                     error!("sleep timer error: {}", err); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|         self.timeout = None; |         self.timeout = None; | ||||||
| @@ -113,28 +116,38 @@ impl Stream for AddrIncoming { | |||||||
|                     return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); |                     return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); | ||||||
|                 }, |                 }, | ||||||
|                 Ok(Async::NotReady) => return Ok(Async::NotReady), |                 Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|                 Err(ref e) if self.sleep_on_errors => { |                 Err(e) => { | ||||||
|                     // Connection errors can be ignored directly, continue by |                     if self.sleep_on_errors { | ||||||
|                     // accepting the next request. |                         // Connection errors can be ignored directly, continue by | ||||||
|                     if is_connection_error(e) { |                         // accepting the next request. | ||||||
|                         debug!("accepted connection already errored: {}", e); |                         if is_connection_error(&e) { | ||||||
|                         continue; |                             debug!("accepted connection already errored: {}", e); | ||||||
|                     } |                             continue; | ||||||
|                     // Sleep 1s. |  | ||||||
|                     let delay = Duration::from_secs(1); |  | ||||||
|                     error!("accept error: {}", e); |  | ||||||
|                     let mut timeout = Delay::new(delay); |  | ||||||
|                     let result = timeout.poll() |  | ||||||
|                         .expect("timeout never fails"); |  | ||||||
|                     match result { |  | ||||||
|                         Async::Ready(()) => continue, |  | ||||||
|                         Async::NotReady => { |  | ||||||
|                             self.timeout = Some(timeout); |  | ||||||
|                             return Ok(Async::NotReady); |  | ||||||
|                         } |                         } | ||||||
|  |                         // Sleep 1s. | ||||||
|  |                         let delay = Instant::now() + Duration::from_secs(1); | ||||||
|  |                         let mut timeout = Delay::new(delay); | ||||||
|  |  | ||||||
|  |                         match timeout.poll() { | ||||||
|  |                             Ok(Async::Ready(())) => { | ||||||
|  |                                 // Wow, it's been a second already? Ok then... | ||||||
|  |                                 error!("accept error: {}", e); | ||||||
|  |                                 continue | ||||||
|  |                             }, | ||||||
|  |                             Ok(Async::NotReady) => { | ||||||
|  |                                 error!("accept error: {}", e); | ||||||
|  |                                 self.timeout = Some(timeout); | ||||||
|  |                                 return Ok(Async::NotReady); | ||||||
|  |                             }, | ||||||
|  |                             Err(timer_err) => { | ||||||
|  |                                 error!("couldn't sleep on error, timer error: {}", timer_err); | ||||||
|  |                                 return Err(e); | ||||||
|  |                             } | ||||||
|  |                         } | ||||||
|  |                     } else { | ||||||
|  |                         return Err(e); | ||||||
|                     } |                     } | ||||||
|                 }, |                 }, | ||||||
|                 Err(e) => return Err(e), |  | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -160,6 +160,8 @@ macro_rules! test { | |||||||
|             let expected_res_body = Option::<&[u8]>::from($response_body) |             let expected_res_body = Option::<&[u8]>::from($response_body) | ||||||
|                 .unwrap_or_default(); |                 .unwrap_or_default(); | ||||||
|             assert_eq!(body.as_ref(), expected_res_body); |             assert_eq!(body.as_ref(), expected_res_body); | ||||||
|  |  | ||||||
|  |             runtime.shutdown_on_idle().wait().expect("rt shutdown"); | ||||||
|         } |         } | ||||||
|     ); |     ); | ||||||
|     ( |     ( | ||||||
| @@ -202,8 +204,10 @@ macro_rules! test { | |||||||
|  |  | ||||||
|             let closure = infer_closure($err); |             let closure = infer_closure($err); | ||||||
|             if !closure(&err) { |             if !closure(&err) { | ||||||
|                 panic!("expected error, unexpected variant: {:?}", err) |                 panic!("expected error, unexpected variant: {:?}", err); | ||||||
|             } |             } | ||||||
|  |  | ||||||
|  |             runtime.shutdown_on_idle().wait().expect("rt shutdown"); | ||||||
|         } |         } | ||||||
|     ); |     ); | ||||||
|  |  | ||||||
| @@ -227,11 +231,6 @@ macro_rules! test { | |||||||
|         let addr = server.local_addr().expect("local_addr"); |         let addr = server.local_addr().expect("local_addr"); | ||||||
|         let runtime = $runtime; |         let runtime = $runtime; | ||||||
|  |  | ||||||
|         let mut config = Client::builder(); |  | ||||||
|         config.http1_title_case_headers($title_case_headers); |  | ||||||
|         if !$set_host { |  | ||||||
|             config.set_host(false); |  | ||||||
|         } |  | ||||||
|         let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone()); |         let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone()); | ||||||
|         let client = Client::builder() |         let client = Client::builder() | ||||||
|             .set_host($set_host) |             .set_host($set_host) | ||||||
| @@ -923,7 +922,6 @@ mod dispatch_impl { | |||||||
|             client.request(req) |             client.request(req) | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); |  | ||||||
|         res.select2(rx1).wait().unwrap(); |         res.select2(rx1).wait().unwrap(); | ||||||
|         // res now dropped |         // res now dropped | ||||||
|         let t = Delay::new(Duration::from_millis(100)) |         let t = Delay::new(Duration::from_millis(100)) | ||||||
| @@ -1088,54 +1086,6 @@ mod dispatch_impl { | |||||||
|         let _ = t.select(close).wait(); |         let _ = t.select(close).wait(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |  | ||||||
|     fn client_custom_executor() { |  | ||||||
|         let server = TcpListener::bind("127.0.0.1:0").unwrap(); |  | ||||||
|         let addr = server.local_addr().unwrap(); |  | ||||||
|         let runtime = Runtime::new().unwrap(); |  | ||||||
|         let handle = runtime.reactor(); |  | ||||||
|         let (closes_tx, closes) = mpsc::channel(10); |  | ||||||
|  |  | ||||||
|         let (tx1, rx1) = oneshot::channel(); |  | ||||||
|  |  | ||||||
|         thread::spawn(move || { |  | ||||||
|             let mut sock = server.accept().unwrap().0; |  | ||||||
|             sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); |  | ||||||
|             sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); |  | ||||||
|             let mut buf = [0; 4096]; |  | ||||||
|             sock.read(&mut buf).expect("read 1"); |  | ||||||
|             sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); |  | ||||||
|             let _ = tx1.send(()); |  | ||||||
|         }); |  | ||||||
|  |  | ||||||
|         let client = Client::builder() |  | ||||||
|             .executor(runtime.executor()) |  | ||||||
|             .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); |  | ||||||
|  |  | ||||||
|         let req = Request::builder() |  | ||||||
|             .uri(&*format!("http://{}/a", addr)) |  | ||||||
|             .body(Body::empty()) |  | ||||||
|             .unwrap(); |  | ||||||
|         let res = client.request(req).and_then(move |res| { |  | ||||||
|             assert_eq!(res.status(), hyper::StatusCode::OK); |  | ||||||
|             res.into_body().concat2() |  | ||||||
|         }); |  | ||||||
|         let rx = rx1.expect("thread panicked"); |  | ||||||
|  |  | ||||||
|         let timeout = Delay::new(Duration::from_millis(200)); |  | ||||||
|         let rx = rx.and_then(move |_| timeout.expect("timeout")); |  | ||||||
|         res.join(rx).map(|r| r.0).wait().unwrap(); |  | ||||||
|  |  | ||||||
|         let t = Delay::new(Duration::from_millis(100)) |  | ||||||
|             .map(|_| panic!("time out")); |  | ||||||
|         let close = closes.into_future() |  | ||||||
|             .map(|(opt, _)| { |  | ||||||
|                 opt.expect("closes"); |  | ||||||
|             }) |  | ||||||
|             .map_err(|_| panic!("closes dropped")); |  | ||||||
|         let _ = t.select(close).wait(); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn connect_call_is_lazy() { |     fn connect_call_is_lazy() { | ||||||
|         // We especially don't want connects() triggered if there's |         // We especially don't want connects() triggered if there's | ||||||
| @@ -1168,8 +1118,7 @@ mod dispatch_impl { | |||||||
|         let server = TcpListener::bind("127.0.0.1:0").unwrap(); |         let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||||||
|         let addr = server.local_addr().unwrap(); |         let addr = server.local_addr().unwrap(); | ||||||
|         let runtime = Runtime::new().unwrap(); |         let runtime = Runtime::new().unwrap(); | ||||||
|         let handle = runtime.reactor(); |         let connector = DebugConnector::new(runtime.reactor()); | ||||||
|         let connector = DebugConnector::new(&handle); |  | ||||||
|         let connects = connector.connects.clone(); |         let connects = connector.connects.clone(); | ||||||
|  |  | ||||||
|         let client = Client::builder() |         let client = Client::builder() | ||||||
| @@ -1222,6 +1171,9 @@ mod dispatch_impl { | |||||||
|         res.join(rx).map(|r| r.0).wait().unwrap(); |         res.join(rx).map(|r| r.0).wait().unwrap(); | ||||||
|  |  | ||||||
|         assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect"); |         assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect"); | ||||||
|  |         drop(client); | ||||||
|  |  | ||||||
|  |         runtime.shutdown_on_idle().wait().expect("rt shutdown"); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user