From 7a7453ba520b4d691ad81ee55dae3b8850f93121 Mon Sep 17 00:00:00 2001
From: Sean McArthur
Date: Mon, 30 Apr 2018 14:23:05 -0700
Subject: [PATCH] refactor(lib): change from futures-timer to tokio-timer

---
 .travis.yml              |   2 +-
 Cargo.toml               |   4 +-
 src/client/mod.rs        |  18 +--
 src/client/pool.rs       | 311 +++++++++++++++++++++++----------------
 src/client/tests.rs      |   6 +-
 src/lib.rs               |   2 +-
 src/proto/h1/dispatch.rs |   2 +-
 src/server/tcp.rs        |  63 ++++----
 tests/client.rs          |  66 ++-------
 9 files changed, 247 insertions(+), 227 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 871cbfae..b9bd8e49 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -18,7 +18,7 @@ cache:
 script:
   - ./.travis/readme.py
   - cargo build $FEATURES
-  - 'if [ "$BUILD_ONLY" != "1" ]; then RUST_LOG=hyper cargo test $FEATURES; fi'
+  - 'if [ "$BUILD_ONLY" != "1" ]; then RUST_LOG=hyper cargo test $FEATURES -- --test-threads=1; fi'
   - 'if [ $TRAVIS_RUST_VERSION = nightly ]; then for f in ./benches/*.rs; do cargo test --bench $(basename $f .rs) $FEATURES; done; fi'

 addons:
diff --git a/Cargo.toml b/Cargo.toml
index b263e340..9b0ac9ae 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,7 +24,6 @@ include = [
 bytes = "0.4.4"
 futures = "0.1.21"
 futures-cpupool = { version = "0.1.6", optional = true }
-futures-timer = "0.1.0"
 http = "0.1.5"
 httparse = "1.0"
 h2 = "0.1.5"
@@ -37,9 +36,11 @@ tokio-executor = { version = "0.1.0", optional = true }
 tokio-io = "0.1"
 tokio-reactor = { version = "0.1", optional = true }
 tokio-tcp = { version = "0.1", optional = true }
+tokio-timer = { version = "0.2", optional = true }
 want = "0.0.4"

 [dev-dependencies]
+futures-timer = "0.1"
 num_cpus = "1.0"
 pretty_env_logger = "0.2.0"
 spmc = "0.2"
@@ -55,6 +56,7 @@ runtime = [
   "tokio-executor",
   "tokio-reactor",
   "tokio-tcp",
+  "tokio-timer",
 ]

 [[example]]
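The shape of every change below follows from one API difference: futures-timer 0.1 builds timers from a Duration and drives itself on any executor, while tokio-timer 0.2 builds them from an Instant deadline and needs a timer running in the host runtime. A side-by-side sketch of the constructors as they are used in this patch (crate versions as in Cargo.toml above; illustration only, not part of the diff):

    use std::time::{Duration, Instant};

    let dur = Duration::from_secs(1);

    // futures-timer 0.1: duration-based, self-contained
    let _delay = futures_timer::Delay::new(dur);
    let _interval = futures_timer::Interval::new(dur);

    // tokio-timer 0.2: deadline-based; an Interval takes the instant of
    // its first tick plus the period between ticks
    let _delay = tokio_timer::Delay::new(Instant::now() + dur);
    let _interval = tokio_timer::Interval::new(Instant::now() + dur, dur);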
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 06e0a2de..3d55d82f 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -113,14 +113,6 @@ where C: Connect + Sync + 'static,

     /// Send a constructed Request using this Client.
     pub fn request(&self, mut req: Request<B>) -> FutureResponse {
-        // TODO(0.12): do this at construction time.
-        //
-        // It cannot be done in the constructor because the Client::configured
-        // does not have `B: 'static` bounds, which are required to spawn
-        // the interval. In 0.12, add a static bounds to the constructor,
-        // and move this.
-        self.schedule_pool_timer();
-
         match req.version() {
             Version::HTTP_10 |
             Version::HTTP_11 => (),
@@ -302,7 +294,7 @@ where C: Connect + Sync + 'static,
                 // for a new request to start.
                 //
                 // It won't be ready if there is a body to stream.
-                if ver == Ver::Http2 || pooled.is_ready() {
+                if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
                     drop(pooled);
                 } else if !res.body().is_empty() {
                     let (delayed_tx, delayed_rx) = oneshot::channel();
@@ -336,10 +328,6 @@ where C: Connect + Sync + 'static,

         Box::new(resp)
     }
-
-    fn schedule_pool_timer(&self) {
-        self.pool.spawn_expired_interval(&self.executor);
-    }
 }

 impl<C, B> Clone for Client<C, B> {
@@ -474,7 +462,7 @@ impl<B> PoolClient<B> {

 impl<B> Poolable for PoolClient<B>
 where
-    B: 'static,
+    B: Send + 'static,
 {
     fn is_open(&self) -> bool {
         match self.tx {
@@ -700,7 +688,7 @@ impl Builder {
             executor: self.exec.clone(),
             h1_writev: self.h1_writev,
             h1_title_case_headers: self.h1_title_case_headers,
-            pool: Pool::new(self.keep_alive, self.keep_alive_timeout),
+            pool: Pool::new(self.keep_alive, self.keep_alive_timeout, &self.exec),
             retry_canceled_requests: self.retry_canceled_requests,
             set_host: self.set_host,
             ver: self.ver,
diff --git a/src/client/pool.rs b/src/client/pool.rs
index 0d4c8902..658b1c29 100644
--- a/src/client/pool.rs
+++ b/src/client/pool.rs
@@ -4,15 +4,16 @@ use std::ops::{Deref, DerefMut};
 use std::sync::{Arc, Mutex, Weak};
 use std::time::{Duration, Instant};

-use futures::{Future, Async, Poll, Stream};
+use futures::{Future, Async, Poll};
 use futures::sync::oneshot;
-use futures_timer::Interval;
+#[cfg(feature = "runtime")]
+use tokio_timer::Interval;

-use common::{Exec, Never};
+use common::Exec;
 use super::Ver;

 pub(super) struct Pool<T> {
-    inner: Arc<Mutex<PoolInner<T>>>,
+    inner: Arc<PoolInner<T>>,
 }

 // Before using a pooled connection, make sure the sender is not dead.
 //
@@ -20,7 +21,7 @@ pub(super) struct Pool<T> {
 // This is a trait to allow the `client::pool::tests` to work for `i32`.
 //
 // See https://github.com/hyperium/hyper/issues/1429
-pub(super) trait Poolable: Sized {
+pub(super) trait Poolable: Send + Sized + 'static {
     fn is_open(&self) -> bool;
     /// Reserve this connection.
     ///
@@ -47,11 +48,20 @@ pub(super) enum Reservation<T> {
 type Key = (Arc<String>, Ver);

 struct PoolInner<T> {
+    connections: Mutex<Connections<T>>,
+    enabled: bool,
+    /// A single Weak pointer used every time a proper weak reference
+    /// is not needed. This prevents allocating space in the heap to hold
+    /// a PoolInner *every single time*, and instead we just allocate
+    /// this one extra per pool.
+    weak: Weak<PoolInner<T>>,
+}
+
+struct Connections<T> {
     // A flag that a connection is being established, and the connection
     // should be shared. This prevents making multiple HTTP/2 connections
     // to the same host.
     connecting: HashSet<Key>,
-    enabled: bool,
     // These are internal Conns sitting in the event loop in the KeepAlive
     // state, waiting to receive a new Request to send on the socket.
     idle: HashMap<Key, Vec<Idle<T>>>,
@@ -65,23 +75,44 @@ struct PoolInner<T> {
     // them that the Conn could be used instead of waiting for a brand new
     // connection.
     waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
-    timeout: Option<Duration>,
     // A oneshot channel is used to allow the interval to be notified when
     // the Pool completely drops. That way, the interval can cancel immediately.
-    idle_interval_ref: Option<oneshot::Sender<Never>>,
+    #[cfg(feature = "runtime")]
+    idle_interval_ref: Option<oneshot::Sender<::common::Never>>,
+    #[cfg(feature = "runtime")]
+    exec: Exec,
+    timeout: Option<Duration>,
 }

 impl<T: Poolable> Pool<T> {
-    pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
+    pub fn new(enabled: bool, timeout: Option<Duration>, __exec: &Exec) -> Pool<T> {
         Pool {
-            inner: Arc::new(Mutex::new(PoolInner {
-                connecting: HashSet::new(),
-                enabled: enabled,
-                idle: HashMap::new(),
-                idle_interval_ref: None,
-                waiters: HashMap::new(),
-                timeout: timeout,
-            })),
+            inner: Arc::new(PoolInner {
+                connections: Mutex::new(Connections {
+                    connecting: HashSet::new(),
+                    idle: HashMap::new(),
+                    #[cfg(feature = "runtime")]
+                    idle_interval_ref: None,
+                    waiters: HashMap::new(),
+                    #[cfg(feature = "runtime")]
+                    exec: __exec.clone(),
+                    timeout,
+                }),
+                enabled,
+                weak: Weak::new(),
+            }),
         }
     }
+
+    #[cfg(test)]
+    pub(super) fn no_timer(&self) {
+        // Prevent an actual interval from being created for this pool...
+        #[cfg(feature = "runtime")]
+        {
+            let mut inner = self.inner.connections.lock().unwrap();
+            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
+            let (tx, _) = oneshot::channel();
+            inner.idle_interval_ref = Some(tx);
+        }
+    }
 }
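The `weak` sentinel added to PoolInner above works because a dangling Weak (made by `Weak::new()` rather than by downgrading a live Arc) clones without touching any refcount and always fails to upgrade. A freestanding sketch of that std behavior (illustration only, not hyper API):

    use std::sync::{Arc, Weak};

    let sentinel: Weak<String> = Weak::new();
    let copy = sentinel.clone();       // cheap: no live allocation behind it
    assert!(copy.upgrade().is_none()); // a dangling Weak never upgrades

    let live = Arc::new(String::from("conn"));
    assert!(Arc::downgrade(&live).upgrade().is_some());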
@@ -100,8 +131,8 @@ impl<T: Poolable> Pool<T> {
     /// Ensure that there is only ever 1 connecting task for HTTP/2
     /// connections. This does nothing for HTTP/1.
     pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> {
-        if key.1 == Ver::Http2 {
-            let mut inner = self.inner.lock().unwrap();
+        if key.1 == Ver::Http2 && self.inner.enabled {
+            let mut inner = self.inner.connections.lock().unwrap();
             if inner.connecting.insert(key.clone()) {
                 let connecting = Connecting {
                     key: key.clone(),
@@ -117,14 +148,14 @@
                 key: key.clone(),
                 // in HTTP/1's case, there is never a lock, so we don't
                 // need to do anything in Drop.
-                pool: Weak::new(),
+                pool: self.inner.weak.clone(),
             })
         }
     }

     fn take(&self, key: &Key) -> Option<Idle<T>> {
         let entry = {
-            let mut inner = self.inner.lock().unwrap();
+            let mut inner = self.inner.connections.lock().unwrap();
             let expiration = Expiration::new(inner.timeout);
             let maybe_entry = inner.idle.get_mut(key)
                 .and_then(|list| {
@@ -158,35 +189,45 @@
     }

     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
-        let (value, pool_ref) = match value.reserve() {
-            Reservation::Shared(to_insert, to_return) => {
-                debug_assert_eq!(
-                    connecting.key.1,
-                    Ver::Http2,
-                    "shared reservation without Http2"
-                );
-                let mut inner = self.inner.lock().unwrap();
-                inner.put(connecting.key.clone(), to_insert);
-                // Do this here instead of Drop for Connecting because we
-                // already have a lock, no need to lock the mutex twice.
-                inner.connected(&connecting.key);
-                // prevent the Drop of Connecting from repeating inner.connected()
-                connecting.pool = Weak::new();
-
-                // Shared reservations don't need a reference to the pool,
-                // since the pool always keeps a copy.
-                (to_return, Weak::new())
-            },
-            Reservation::Unique(value) => {
-                // Unique reservations must take a reference to the pool
-                // since they hope to reinsert once the reservation is
-                // completed
-                (value, Arc::downgrade(&self.inner))
-            },
+        let (value, pool_ref, has_pool) = if self.inner.enabled {
+            match value.reserve() {
+                Reservation::Shared(to_insert, to_return) => {
+                    debug_assert_eq!(
+                        connecting.key.1,
+                        Ver::Http2,
+                        "shared reservation without Http2"
+                    );
+                    let mut inner = self.inner.connections.lock().unwrap();
+                    inner.put(connecting.key.clone(), to_insert, &self.inner);
+                    // Do this here instead of Drop for Connecting because we
+                    // already have a lock, no need to lock the mutex twice.
+                    inner.connected(&connecting.key);
+                    // prevent the Drop of Connecting from repeating inner.connected()
+                    connecting.pool = self.inner.weak.clone();
+
+                    // Shared reservations don't need a reference to the pool,
+                    // since the pool always keeps a copy.
+                    (to_return, self.inner.weak.clone(), false)
+                },
+                Reservation::Unique(value) => {
+                    // Unique reservations must take a reference to the pool
+                    // since they hope to reinsert once the reservation is
+                    // completed
+                    (value, Arc::downgrade(&self.inner), true)
+                },
+            }
+        } else {
+            // If pool is not enabled, skip all the things...

+            // The Connecting should have had no pool ref
+            debug_assert!(connecting.pool.upgrade().is_none());
+
+            (value, self.inner.weak.clone(), false)
         };
         Pooled {
-            is_reused: false,
             key: connecting.key.clone(),
+            has_pool,
+            is_reused: false,
             pool: pool_ref,
             value: Some(value)
         }
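pooled() above branches on Poolable::reserve, which decides whether a connection may be shared. The real implementation is PoolClient's, in src/client/mod.rs; here is a toy impl showing both reservation kinds (hypothetical `H2ish` type, illustration only):

    #[derive(Clone)]
    struct H2ish(u32);

    impl Poolable for H2ish {
        fn is_open(&self) -> bool {
            true
        }
        fn reserve(self) -> Reservation<Self> {
            // HTTP/2 style: keep one handle idle in the pool and hand the
            // other back to the caller. An exclusive HTTP/1-style type
            // would return Reservation::Unique(self) instead.
            let idle = self.clone();
            Reservation::Shared(idle, self)
        }
    }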
@@ -202,13 +243,14 @@
         // we just have the final value, without knowledge of if this is
         // unique or shared. So, the hack is to just assume Ver::Http2 means
         // shared... :(
-        let pool_ref = if key.1 == Ver::Http2 {
-            Weak::new()
+        let (pool_ref, has_pool) = if key.1 == Ver::Http2 {
+            (self.inner.weak.clone(), false)
         } else {
-            Arc::downgrade(&self.inner)
+            (Arc::downgrade(&self.inner), true)
         };

         Pooled {
+            has_pool,
             is_reused: true,
             key: key.clone(),
             pool: pool_ref,
@@ -218,7 +260,7 @@

     fn waiter(&mut self, key: Key, tx: oneshot::Sender<T>) {
         trace!("checkout waiting for idle connection: {:?}", key);
-        self.inner.lock().unwrap()
+        self.inner.connections.lock().unwrap()
             .waiters.entry(key)
             .or_insert(VecDeque::new())
             .push_back(tx);
@@ -274,11 +316,8 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
     }
 }

-impl<T: Poolable> PoolInner<T> {
-    fn put(&mut self, key: Key, value: T) {
-        if !self.enabled {
-            return;
-        }
+impl<T: Poolable> Connections<T> {
+    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<PoolInner<T>>) {
         if key.1 == Ver::Http2 && self.idle.contains_key(&key) {
             trace!("put; existing idle HTTP/2 connection for {:?}", key);
             return;
@@ -328,6 +367,11 @@ impl<T: Poolable> PoolInner<T> {
                     value: value,
                     idle_at: Instant::now(),
                 });
+
+                #[cfg(feature = "runtime")]
+                {
+                    self.spawn_idle_interval(__pool_ref);
+                }
             }
             None => trace!("put; found waiter for {:?}", key),
         }
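spawn_idle_interval in the next hunk arms the idle reaper lazily, the first time a connection is actually parked, rather than on the first request as schedule_pool_timer used to. The oneshot it stores in idle_interval_ref is never sent on; it exists only so the spawned task sees Err(Canceled) the instant the pool goes away. A freestanding sketch of that cancel-on-drop pattern (assuming futures 0.1; a local `Never` stands in for `::common::Never`):

    use futures::Future;
    use futures::sync::oneshot;

    enum Never {}

    let (tx, rx) = oneshot::channel::<Never>();
    drop(tx);                    // what dropping the pool does implicitly
    assert!(rx.wait().is_err()); // the receiver resolves to Err(Canceled)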
@@ -346,9 +390,37 @@
     // those waiters would never receive a connection.
         self.waiters.remove(key);
     }
+
+    #[cfg(feature = "runtime")]
+    fn spawn_idle_interval(&mut self, pool_ref: &Arc<PoolInner<T>>) {
+        let (dur, rx) = {
+            debug_assert!(pool_ref.enabled);
+
+            if self.idle_interval_ref.is_some() {
+                return;
+            }
+
+            if let Some(dur) = self.timeout {
+                let (tx, rx) = oneshot::channel();
+                self.idle_interval_ref = Some(tx);
+                (dur, rx)
+            } else {
+                return
+            }
+        };
+
+        let start = Instant::now() + dur;
+
+        let interval = Interval::new(start, dur);
+        self.exec.execute(IdleInterval {
+            interval: interval,
+            pool: Arc::downgrade(pool_ref),
+            pool_drop_notifier: rx,
+        });
+    }
 }

-impl<T> PoolInner<T> {
+impl<T> Connections<T> {
     /// Any `FutureResponse`s that were created will have made a `Checkout`,
     /// and possibly inserted into the pool that it is waiting for an idle
     /// connection. If a user ever dropped that future, we need to clean out
@@ -367,7 +439,8 @@
     }
 }

-impl<T: Poolable> PoolInner<T> {
+#[cfg(feature = "runtime")]
+impl<T: Poolable> Connections<T> {
     /// This should *only* be called by the IdleInterval.
     fn clear_expired(&mut self) {
         let dur = self.timeout.expect("interval assumes timeout");
@@ -396,38 +469,6 @@
     }
 }

-
-impl<T: Poolable + Send + 'static> Pool<T> {
-    pub(super) fn spawn_expired_interval(&self, exec: &Exec) {
-        let (dur, rx) = {
-            let mut inner = self.inner.lock().unwrap();
-
-            if !inner.enabled {
-                return;
-            }
-
-            if inner.idle_interval_ref.is_some() {
-                return;
-            }
-
-            if let Some(dur) = inner.timeout {
-                let (tx, rx) = oneshot::channel();
-                inner.idle_interval_ref = Some(tx);
-                (dur, rx)
-            } else {
-                return
-            }
-        };
-
-        let interval = Interval::new(dur);
-        exec.execute(IdleInterval {
-            interval: interval,
-            pool: Arc::downgrade(&self.inner),
-            pool_drop_notifier: rx,
-        });
-    }
-}
-
 impl<T> Clone for Pool<T> {
     fn clone(&self) -> Pool<T> {
         Pool {
             inner: self.inner.clone(),
         }
     }
 }

 // Note: The bounds `T: Poolable` is needed for the Drop impl.
 pub(super) struct Pooled<T: Poolable> {
     value: Option<T>,
+    has_pool: bool,
     is_reused: bool,
     key: Key,
-    pool: Weak<Mutex<PoolInner<T>>>,
+    pool: Weak<PoolInner<T>>,
 }

 impl<T: Poolable> Pooled<T> {
     pub fn is_reused(&self) -> bool {
         self.is_reused
     }

+    pub fn is_pool_enabled(&self) -> bool {
+        self.has_pool
+    }
+
     fn as_ref(&self) -> &T {
         self.value.as_ref().expect("not dropped")
     }
@@ -481,9 +527,13 @@
             return;
         }

-        if let Some(inner) = self.pool.upgrade() {
-            if let Ok(mut inner) = inner.lock() {
-                inner.put(self.key.clone(), value);
+        if let Some(pool) = self.pool.upgrade() {
+            // Pooled should not have had a real reference if pool is
+            // not enabled!
+            debug_assert!(pool.enabled);
+
+            if let Ok(mut inner) = pool.connections.lock() {
+                inner.put(self.key.clone(), value, &pool);
             }
         } else if self.key.1 == Ver::Http1 {
             trace!("pool dropped, dropping pooled ({:?})", self.key);
@@ -569,7 +619,7 @@ impl<T: Poolable> Future for Checkout<T> {
 impl<T> Drop for Checkout<T> {
     fn drop(&mut self) {
         if self.waiter.take().is_some() {
-            if let Ok(mut inner) = self.pool.inner.lock() {
+            if let Ok(mut inner) = self.pool.inner.connections.lock() {
                 inner.clean_waiters(&self.key);
             }
         }
     }
 }

 pub(super) struct Connecting<T: Poolable> {
     key: Key,
-    pool: Weak<Mutex<PoolInner<T>>>,
+    pool: Weak<PoolInner<T>>,
 }

 impl<T: Poolable> Drop for Connecting<T> {
     fn drop(&mut self) {
         if let Some(pool) = self.pool.upgrade() {
             // No need to panic on drop, that could abort!
-            if let Ok(mut inner) = pool.lock() {
+            if let Ok(mut inner) = pool.connections.lock() {
                 debug_assert_eq!(
                     self.key.1,
                     Ver::Http2,
@@ -612,20 +662,25 @@ impl Expiration {
     }
 }

+#[cfg(feature = "runtime")]
 struct IdleInterval<T> {
     interval: Interval,
-    pool: Weak<Mutex<PoolInner<T>>>,
+    pool: Weak<PoolInner<T>>,
     // This allows the IdleInterval to be notified as soon as the entire
     // Pool is fully dropped, and shutdown. This channel is never sent on,
     // but Err(Canceled) will be received when the Pool is dropped.
-    pool_drop_notifier: oneshot::Receiver<Never>,
+    pool_drop_notifier: oneshot::Receiver<::common::Never>,
 }

+#[cfg(feature = "runtime")]
 impl<T: Poolable + Send + 'static> Future for IdleInterval<T> {
     type Item = ();
     type Error = ();

     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        // Interval is a Stream
+        use futures::Stream;
+
         loop {
             match self.pool_drop_notifier.poll() {
                 Ok(Async::Ready(n)) => match n {},
@@ -636,10 +691,13 @@
                 }
             }

-            try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
+            try_ready!(self.interval.poll().map_err(|err| {
+                error!("idle interval timer error: {}", err);
+            }));

             if let Some(inner) = self.pool.upgrade() {
-                if let Ok(mut inner) = inner.lock() {
+                if let Ok(mut inner) = inner.connections.lock() {
+                    trace!("idle interval checking for expired");
                     inner.clear_expired();
                     continue;
                 }
             }
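Both `match n {}` in the poll loop above and `Err(never) => match never {}` in the src/proto/h1/dispatch.rs hunk further down lean on `Never` being uninhabited: a match over a type with no values needs no arms, so the compiler itself proves the branch unreachable and no `unreachable!()` panic machinery is compiled in. A freestanding sketch:

    enum Never {}

    // Evidence of an impossible branch can become any type at all.
    fn absurd<T>(never: Never) -> T {
        match never {}
    }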
@@ -655,13 +713,14 @@ mod tests {
     use std::time::Duration;
     use futures::{Async, Future};
     use futures::future;
+    use common::Exec;
     use super::{Connecting, Key, Poolable, Pool, Reservation, Ver};

     /// Test unique reservations.
     #[derive(Debug, PartialEq, Eq)]
     struct Uniq<T>(T);

-    impl<T> Poolable for Uniq<T> {
+    impl<T: Send + 'static> Poolable for Uniq<T> {
         fn is_open(&self) -> bool {
             true
         }
@@ -678,9 +737,15 @@ mod tests {
         }
     }

+    fn pool_no_timer<T>() -> Pool<T> {
+        let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default);
+        pool.no_timer();
+        pool
+    }
+
     #[test]
     fn test_pool_checkout_smoke() {
-        let pool = Pool::new(true, Some(Duration::from_secs(5)));
+        let pool = pool_no_timer();
         let key = (Arc::new("foo".to_string()), Ver::Http1);
         let pooled = pool.pooled(c(key.clone()), Uniq(41));

     #[test]
     fn test_pool_checkout_returns_none_if_expired() {
         future::lazy(|| {
-            let pool = Pool::new(true, Some(Duration::from_millis(100)));
+            let pool = pool_no_timer();
             let key = (Arc::new("foo".to_string()), Ver::Http1);
             let pooled = pool.pooled(c(key.clone()), Uniq(41));
             drop(pooled);
-            ::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
+            ::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap());
             assert!(pool.checkout(key).poll().unwrap().is_not_ready());
             ::futures::future::ok::<(), ()>(())
         }).wait().unwrap();
     }

     #[test]
     fn test_pool_checkout_removes_expired() {
         future::lazy(|| {
-            let pool = Pool::new(true, Some(Duration::from_millis(100)));
+            let pool = pool_no_timer();
             let key = (Arc::new("foo".to_string()), Ver::Http1);

             pool.pooled(c(key.clone()), Uniq(41));
             pool.pooled(c(key.clone()), Uniq(5));
             pool.pooled(c(key.clone()), Uniq(99));

-            assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
-            ::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
+            assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
+            ::std::thread::sleep(pool.inner.connections.lock().unwrap().timeout.unwrap());

             // checkout.poll() should clean out the expired
             pool.checkout(key.clone()).poll().unwrap();
-            assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
+            assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none());

             Ok::<(), ()>(())
         }).wait().unwrap();
     }

@@ -730,30 +795,26 @@ mod tests {
     #[test]
     fn test_pool_timer_removes_expired() {
         use std::sync::Arc;
-        use common::Exec;
         let runtime = ::tokio::runtime::Runtime::new().unwrap();
-        let pool = Pool::new(true, Some(Duration::from_millis(100)));
-
         let executor = runtime.executor();
-        pool.spawn_expired_interval(&Exec::Executor(Arc::new(executor)));
+        let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Executor(Arc::new(executor)));
+
         let key = (Arc::new("foo".to_string()), Ver::Http1);

         pool.pooled(c(key.clone()), Uniq(41));
         pool.pooled(c(key.clone()), Uniq(5));
         pool.pooled(c(key.clone()), Uniq(99));

-        assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
+        assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));

-        ::futures_timer::Delay::new(
-            Duration::from_millis(400) // allow for too-good resolution
-        ).wait().unwrap();
+        ::std::thread::sleep(Duration::from_millis(400)); // allow for too-good resolution

-        assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
+        assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none());
     }
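pool_no_timer() above pins down a tokio-timer subtlety: a Delay or Interval only fires on a runtime that is actually running a timer, which tokio::runtime::Runtime sets up per worker thread. Plain unit tests have no such runtime, so the helper stuffs a dummy sender into idle_interval_ref to keep Connections::put from ever spawning the interval, while test_pool_timer_removes_expired just above builds a real Runtime and hands its executor to Pool::new. A minimal sketch of driving a timer to completion on a runtime (assuming tokio 0.1):

    extern crate futures;
    extern crate tokio;
    extern crate tokio_timer;

    use std::time::{Duration, Instant};
    use futures::Future;

    fn main() {
        // tokio::run blocks until the future (and the runtime) finishes;
        // the Runtime it creates provides the timer context the Delay needs.
        tokio::run(
            tokio_timer::Delay::new(Instant::now() + Duration::from_millis(100))
                .map_err(|e| panic!("timer error: {}", e)),
        );
    }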
     #[test]
     fn test_pool_checkout_task_unparked() {
-        let pool = Pool::new(true, Some(Duration::from_secs(10)));
+        let pool = pool_no_timer();
         let key = (Arc::new("foo".to_string()), Ver::Http1);
         let pooled = pool.pooled(c(key.clone()), Uniq(41));

@@ -772,7 +833,7 @@ mod tests {
     #[test]
     fn test_pool_checkout_drop_cleans_up_waiters() {
         future::lazy(|| {
-            let pool = Pool::<Uniq<i32>>::new(true, Some(Duration::from_secs(10)));
+            let pool = pool_no_timer::<Uniq<i32>>();
             let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);

             let mut checkout1 = pool.checkout(key.clone());
             let mut checkout2 = pool.checkout(key.clone());

             // first poll needed to get into Pool's parked
             checkout1.poll().unwrap();
-            assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
+            assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
             checkout2.poll().unwrap();
-            assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 2);
+            assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 2);

             // on drop, clean up Pool
             drop(checkout1);
-            assert_eq!(pool.inner.lock().unwrap().waiters.get(&key).unwrap().len(), 1);
+            assert_eq!(pool.inner.connections.lock().unwrap().waiters.get(&key).unwrap().len(), 1);

             drop(checkout2);
-            assert!(pool.inner.lock().unwrap().waiters.get(&key).is_none());
+            assert!(pool.inner.connections.lock().unwrap().waiters.get(&key).is_none());

             ::futures::future::ok::<(), ()>(())
         }).wait().unwrap();
     }

     #[test]
     fn pooled_drop_if_closed_doesnt_reinsert() {
-        let pool = Pool::new(true, Some(Duration::from_secs(10)));
+        let pool = pool_no_timer();
         let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
         pool.pooled(c(key.clone()), CanClose {
             val: 57,
             closed: true,
         });

-        assert!(!pool.inner.lock().unwrap().idle.contains_key(&key));
+        assert!(!pool.inner.connections.lock().unwrap().idle.contains_key(&key));
     }
 }
diff --git a/src/client/tests.rs b/src/client/tests.rs
index 94091b58..18a48a1f 100644
--- a/src/client/tests.rs
+++ b/src/client/tests.rs
@@ -25,6 +25,8 @@ fn retryable_request() {
         .executor(executor.sender().clone())
         .build::<_, ::Body>(connector);

+    client.pool.no_timer();
+
     {
         let req = Request::builder()
@@ -71,6 +73,8 @@ fn conn_reset_after_write() {
         .executor(executor.sender().clone())
         .build::<_, ::Body>(connector);

+    client.pool.no_timer();
+
     {
         let req = Request::builder()
             .uri("http://mock.local/a")
@@ -88,7 +92,7 @@ fn conn_reset_after_write() {
     }

     // sleep to allow some time for the connection to return to the pool
-    thread::sleep(Duration::from_millis(50));
+    thread::sleep(Duration::from_millis(10));

     let req = Request::builder()
         .uri("http://mock.local/a")
diff --git a/src/lib.rs b/src/lib.rs
index 0d70bff0..52492014 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -19,7 +19,6 @@ extern crate bytes;
 #[macro_use] extern crate futures;
 #[cfg(feature = "runtime")] extern crate futures_cpupool;
-extern crate futures_timer;
 extern crate h2;
 extern crate http;
 extern crate httparse;
@@ -32,6 +31,7 @@ extern crate time;
 #[macro_use] extern crate tokio_io;
 #[cfg(feature = "runtime")] extern crate tokio_reactor;
 #[cfg(feature = "runtime")] extern crate tokio_tcp;
+#[cfg(feature = "runtime")] extern crate tokio_timer;
 extern crate want;

 #[cfg(all(test, feature = "nightly"))]
diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs
index b9bfacad..db2c471d 100644
--- a/src/proto/h1/dispatch.rs
+++ b/src/proto/h1/dispatch.rs
@@ -423,7 +423,7 @@ where
                     Ok(Async::Ready(None))
                 },
                 Ok(Async::NotReady) => return Ok(Async::NotReady),
-                Err(_) => unreachable!("receiver cannot error"),
+                Err(never) => match never {},
             }
         }
diff --git a/src/server/tcp.rs b/src/server/tcp.rs
index e1092947..720b05a1 100644
--- a/src/server/tcp.rs
+++ b/src/server/tcp.rs
@@ -1,12 +1,12 @@
 use std::fmt;
 use std::io;
 use std::net::{SocketAddr, TcpListener as StdTcpListener};
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use futures::{Async, Future, Poll, Stream};
-use futures_timer::Delay;
-use tokio_tcp::TcpListener;
 use tokio_reactor::Handle;
+use tokio_tcp::TcpListener;
+use tokio_timer::Delay;

 use self::addr_stream::AddrStream;
@@ -93,9 +93,12 @@ impl Stream for AddrIncoming {
     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
         // Check if a previous timeout is active that was set by IO errors.
         if let Some(ref mut to) = self.timeout {
-            match to.poll().expect("timeout never fails") {
-                Async::Ready(_) => {}
-                Async::NotReady => return Ok(Async::NotReady),
+            match to.poll() {
+                Ok(Async::Ready(())) => {}
+                Ok(Async::NotReady) => return Ok(Async::NotReady),
+                Err(err) => {
+                    error!("sleep timer error: {}", err);
+                }
             }
         }
         self.timeout = None;
@@ -113,28 +116,38 @@ impl Stream for AddrIncoming {
                     return Ok(Async::Ready(Some(AddrStream::new(socket, addr))));
                 },
                 Ok(Async::NotReady) => return Ok(Async::NotReady),
-                Err(ref e) if self.sleep_on_errors => {
-                    // Connection errors can be ignored directly, continue by
-                    // accepting the next request.
-                    if is_connection_error(e) {
-                        debug!("accepted connection already errored: {}", e);
-                        continue;
-                    }
-                    // Sleep 1s.
-                    let delay = Duration::from_secs(1);
-                    error!("accept error: {}", e);
-                    let mut timeout = Delay::new(delay);
-                    let result = timeout.poll()
-                        .expect("timeout never fails");
-                    match result {
-                        Async::Ready(()) => continue,
-                        Async::NotReady => {
-                            self.timeout = Some(timeout);
-                            return Ok(Async::NotReady);
+                Err(e) => {
+                    if self.sleep_on_errors {
+                        // Connection errors can be ignored directly, continue by
+                        // accepting the next request.
+                        if is_connection_error(&e) {
+                            debug!("accepted connection already errored: {}", e);
+                            continue;
                         }
+                        // Sleep 1s.
+                        let delay = Instant::now() + Duration::from_secs(1);
+                        let mut timeout = Delay::new(delay);
+
+                        match timeout.poll() {
+                            Ok(Async::Ready(())) => {
+                                // Wow, it's been a second already? Ok then...
+ error!("accept error: {}", e); + continue + }, + Ok(Async::NotReady) => { + error!("accept error: {}", e); + self.timeout = Some(timeout); + return Ok(Async::NotReady); + }, + Err(timer_err) => { + error!("couldn't sleep on error, timer error: {}", timer_err); + return Err(e); + } + } + } else { + return Err(e); } }, - Err(e) => return Err(e), } } } diff --git a/tests/client.rs b/tests/client.rs index beb7455a..3436ca7e 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -160,6 +160,8 @@ macro_rules! test { let expected_res_body = Option::<&[u8]>::from($response_body) .unwrap_or_default(); assert_eq!(body.as_ref(), expected_res_body); + + runtime.shutdown_on_idle().wait().expect("rt shutdown"); } ); ( @@ -202,8 +204,10 @@ macro_rules! test { let closure = infer_closure($err); if !closure(&err) { - panic!("expected error, unexpected variant: {:?}", err) + panic!("expected error, unexpected variant: {:?}", err); } + + runtime.shutdown_on_idle().wait().expect("rt shutdown"); } ); @@ -227,11 +231,6 @@ macro_rules! test { let addr = server.local_addr().expect("local_addr"); let runtime = $runtime; - let mut config = Client::builder(); - config.http1_title_case_headers($title_case_headers); - if !$set_host { - config.set_host(false); - } let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone()); let client = Client::builder() .set_host($set_host) @@ -923,7 +922,6 @@ mod dispatch_impl { client.request(req) }; - //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); res.select2(rx1).wait().unwrap(); // res now dropped let t = Delay::new(Duration::from_millis(100)) @@ -1088,54 +1086,6 @@ mod dispatch_impl { let _ = t.select(close).wait(); } - #[test] - fn client_custom_executor() { - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); - let (closes_tx, closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); - let _ = tx1.send(()); - }); - - let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Body::empty()) - .unwrap(); - let res = client.request(req).and_then(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() - }); - let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - res.join(rx).map(|r| r.0).wait().unwrap(); - - let t = Delay::new(Duration::from_millis(100)) - .map(|_| panic!("time out")); - let close = closes.into_future() - .map(|(opt, _)| { - opt.expect("closes"); - }) - .map_err(|_| panic!("closes dropped")); - let _ = t.select(close).wait(); - } - #[test] fn connect_call_is_lazy() { // We especially don't want connects() triggered if there's @@ -1168,8 +1118,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); let runtime 
diff --git a/tests/client.rs b/tests/client.rs
index beb7455a..3436ca7e 100644
--- a/tests/client.rs
+++ b/tests/client.rs
@@ -160,6 +160,8 @@ macro_rules! test {
             let expected_res_body = Option::<&[u8]>::from($response_body)
                 .unwrap_or_default();
             assert_eq!(body.as_ref(), expected_res_body);
+
+            runtime.shutdown_on_idle().wait().expect("rt shutdown");
         }
     );
     (
@@ -202,8 +204,10 @@ macro_rules! test {

             let closure = infer_closure($err);
             if !closure(&err) {
-                panic!("expected error, unexpected variant: {:?}", err)
+                panic!("expected error, unexpected variant: {:?}", err);
             }
+
+            runtime.shutdown_on_idle().wait().expect("rt shutdown");
         }
     );

@@ -227,11 +231,6 @@ macro_rules! test {
         let addr = server.local_addr().expect("local_addr");
         let runtime = $runtime;

-        let mut config = Client::builder();
-        config.http1_title_case_headers($title_case_headers);
-        if !$set_host {
-            config.set_host(false);
-        }
         let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone());
         let client = Client::builder()
             .set_host($set_host)
@@ -923,7 +922,6 @@ mod dispatch_impl {
             client.request(req)
         };

-        //let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
         res.select2(rx1).wait().unwrap(); // res now dropped
         let t = Delay::new(Duration::from_millis(100))
@@ -1088,54 +1086,6 @@ mod dispatch_impl {
         let _ = t.select(close).wait();
     }

-    #[test]
-    fn client_custom_executor() {
-        let server = TcpListener::bind("127.0.0.1:0").unwrap();
-        let addr = server.local_addr().unwrap();
-        let runtime = Runtime::new().unwrap();
-        let handle = runtime.reactor();
-        let (closes_tx, closes) = mpsc::channel(10);
-
-        let (tx1, rx1) = oneshot::channel();
-
-        thread::spawn(move || {
-            let mut sock = server.accept().unwrap().0;
-            sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
-            sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
-            let mut buf = [0; 4096];
-            sock.read(&mut buf).expect("read 1");
-            sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
-            let _ = tx1.send(());
-        });
-
-        let client = Client::builder()
-            .executor(runtime.executor())
-            .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
-
-        let req = Request::builder()
-            .uri(&*format!("http://{}/a", addr))
-            .body(Body::empty())
-            .unwrap();
-        let res = client.request(req).and_then(move |res| {
-            assert_eq!(res.status(), hyper::StatusCode::OK);
-            res.into_body().concat2()
-        });
-        let rx = rx1.expect("thread panicked");
-
-        let timeout = Delay::new(Duration::from_millis(200));
-        let rx = rx.and_then(move |_| timeout.expect("timeout"));
-        res.join(rx).map(|r| r.0).wait().unwrap();
-
-        let t = Delay::new(Duration::from_millis(100))
-            .map(|_| panic!("time out"));
-        let close = closes.into_future()
-            .map(|(opt, _)| {
-                opt.expect("closes");
-            })
-            .map_err(|_| panic!("closes dropped"));
-        let _ = t.select(close).wait();
-    }
-
     #[test]
     fn connect_call_is_lazy() {
         // We especially don't want connects() triggered if there's
@@ -1168,8 +1118,7 @@ mod dispatch_impl {
         let server = TcpListener::bind("127.0.0.1:0").unwrap();
         let addr = server.local_addr().unwrap();
         let runtime = Runtime::new().unwrap();
-        let handle = runtime.reactor();
-        let connector = DebugConnector::new(&handle);
+        let connector = DebugConnector::new(runtime.reactor());
         let connects = connector.connects.clone();

         let client = Client::builder()
@@ -1222,6 +1171,9 @@ mod dispatch_impl {
         res.join(rx).map(|r| r.0).wait().unwrap();

         assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect");
+        drop(client);
+
+        runtime.shutdown_on_idle().wait().expect("rt shutdown");
     }

     #[test]
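The `runtime.shutdown_on_idle()` calls added throughout these tests matter because the pool's idle interval is now a task on the test's own Runtime: the runtime only goes idle after that task ends, which happens once the Client (and with it the pool and its drop-notifier oneshot) is dropped, hence the explicit `drop(client)` above. The closing pattern, in miniature (assuming tokio 0.1):

    use futures::Future;

    let runtime = tokio::runtime::Runtime::new().unwrap();
    // ... run requests, then drop whatever keeps background tasks alive ...
    runtime.shutdown_on_idle().wait().expect("rt shutdown");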