diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index f54cd2e7..eac06f97 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -630,58 +630,76 @@ impl ConnectingTcp { #[cfg(test)] mod tests { - // FIXME: re-implement tests with `async/await`, this import should - // trigger a warning to remind us - use crate::Error; - /* use std::io; - use futures::Future; + + use tokio::runtime::current_thread::Runtime; + use tokio_net::driver::Handle; + use super::{Connect, Destination, HttpConnector}; #[test] fn test_errors_missing_authority() { + let mut rt = Runtime::new().unwrap(); let uri = "/foo/bar?baz".parse().unwrap(); let dst = Destination { uri, }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); - assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + rt.block_on(async { + assert_eq!( + connector.connect(dst).await.unwrap_err().kind(), + io::ErrorKind::InvalidInput, + ); + }) } #[test] fn test_errors_enforce_http() { + let mut rt = Runtime::new().unwrap(); let uri = "https://example.domain/foo/bar?baz".parse().unwrap(); let dst = Destination { uri, }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); - assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + rt.block_on(async { + assert_eq!( + connector.connect(dst).await.unwrap_err().kind(), + io::ErrorKind::InvalidInput, + ); + }) } #[test] fn test_errors_missing_scheme() { + let mut rt = Runtime::new().unwrap(); let uri = "example.domain".parse().unwrap(); let dst = Destination { uri, }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); - assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); + rt.block_on(async { + assert_eq!( + connector.connect(dst).await.unwrap_err().kind(), + io::ErrorKind::InvalidInput, + ); + }); } #[test] #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)] fn client_happy_eyeballs() { + use std::future::Future; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; + use std::task::Poll; use std::time::{Duration, Instant}; - use futures::{Async, Poll}; use tokio::runtime::current_thread::Runtime; - use tokio_net::driver::Handle; + use crate::common::{Pin, task}; use super::dns; use super::ConnectingTcp; @@ -768,16 +786,15 @@ mod tests { struct ConnectingTcpFuture(ConnectingTcp); impl Future for ConnectingTcpFuture { - type Item = u8; - type Error = ::std::io::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.0.poll(&Some(Handle::default())) { + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match self.0.poll(cx,&Some(Handle::default())) { Poll::Ready(Ok(stream)) => Poll::Ready(Ok( if stream.peer_addr().unwrap().is_ipv4() { 4 } else { 6 } )), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), Poll::Pending => Poll::Pending, - Err(err) => Err(err), } } } @@ -818,6 +835,5 @@ mod tests { (reachable, duration) } } - */ } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 4d4723ee..9eb69d2e 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -246,70 +246,102 @@ impl Callback { #[cfg(test)] mod tests { - // FIXME: re-implement tests with `async/await`, this import should - // trigger a warning to remind us - use crate::Error; - /* #[cfg(feature = "nightly")] extern crate test; - use futures::{future, Future, Stream}; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + use tokio::runtime::current_thread::Runtime; + + use super::{Callback, channel, Receiver}; #[derive(Debug)] struct Custom(i32); + impl Future for Receiver { + type Output = Option<(T, Callback)>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.poll_next(cx) + } + } + + /// Helper to check if the future is ready after polling once. + struct PollOnce<'a, F>(&'a mut F); + + impl Future for PollOnce<'_, F> + where + F: Future + Unpin + { + type Output = Option<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.0).poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(())), + Poll::Pending => Poll::Ready(None) + } + } + } + #[test] fn drop_receiver_sends_cancel_errors() { let _ = pretty_env_logger::try_init(); + let mut rt = Runtime::new().unwrap(); - future::lazy(|| { - let (mut tx, mut rx) = super::channel::(); + let (mut tx, mut rx) = channel::(); - // must poll once for try_send to succeed - assert!(rx.poll().expect("rx empty").is_not_ready()); + // must poll once for try_send to succeed + rt.block_on(async { + let poll_once = PollOnce(&mut rx); + assert!(poll_once.await.is_none(), "rx empty"); + }); - let promise = tx.try_send(Custom(43)).unwrap(); - drop(rx); + let promise = tx.try_send(Custom(43)).unwrap(); + drop(rx); - promise.then(|fulfilled| { - let err = fulfilled - .expect("fulfilled") - .expect_err("promise should error"); - - match (err.0.kind(), err.1) { - (&crate::error::Kind::Canceled, Some(_)) => (), - e => panic!("expected Error::Cancel(_), found {:?}", e), - } - - Ok::<(), ()>(()) - }) - }).wait().unwrap(); + rt.block_on(async { + let fulfilled = promise.await; + let err = fulfilled + .expect("fulfilled") + .expect_err("promise should error"); + match (err.0.kind(), err.1) { + (&crate::error::Kind::Canceled, Some(_)) => (), + e => panic!("expected Error::Cancel(_), found {:?}", e), + } + }); } #[test] fn sender_checks_for_want_on_send() { - future::lazy(|| { - let (mut tx, mut rx) = super::channel::(); - // one is allowed to buffer, second is rejected - let _ = tx.try_send(Custom(1)).expect("1 buffered"); - tx.try_send(Custom(2)).expect_err("2 not ready"); + let mut rt = Runtime::new().unwrap(); + let (mut tx, mut rx) = channel::(); - assert!(rx.poll().expect("rx 1").is_ready()); - // Even though 1 has been popped, only 1 could be buffered for the - // lifetime of the channel. - tx.try_send(Custom(2)).expect_err("2 still not ready"); + // one is allowed to buffer, second is rejected + let _ = tx.try_send(Custom(1)).expect("1 buffered"); + tx.try_send(Custom(2)).expect_err("2 not ready"); - assert!(rx.poll().expect("rx empty").is_not_ready()); - let _ = tx.try_send(Custom(2)).expect("2 ready"); + rt.block_on(async { + let poll_once = PollOnce(&mut rx); + assert!(poll_once.await.is_some(), "rx empty"); + }); - Ok::<(), ()>(()) - }).wait().unwrap(); + // Even though 1 has been popped, only 1 could be buffered for the + // lifetime of the channel. + tx.try_send(Custom(2)).expect_err("2 still not ready"); + + rt.block_on(async { + let poll_once = PollOnce(&mut rx); + assert!(poll_once.await.is_none(), "rx empty"); + }); + + let _ = tx.try_send(Custom(2)).expect("2 ready"); } #[test] fn unbounded_sender_doesnt_bound_on_want() { - let (tx, rx) = super::channel::(); + let (tx, rx) = channel::(); let mut tx = tx.unbound(); let _ = tx.try_send(Custom(1)).unwrap(); @@ -325,46 +357,44 @@ mod tests { #[bench] fn giver_queue_throughput(b: &mut test::Bencher) { use crate::{Body, Request, Response}; - let (mut tx, mut rx) = super::channel::, Response>(); + + let mut rt = Runtime::new().unwrap(); + let (mut tx, mut rx) = channel::, Response>(); b.iter(move || { - ::futures::future::lazy(|| { - let _ = tx.send(Request::default()).unwrap(); + let _ = tx.send(Request::default()).unwrap(); + rt.block_on(async { loop { - let ok = rx.poll().unwrap(); - if ok.is_not_ready() { - break; + let poll_once = PollOnce(&mut rx); + let opt = poll_once.await; + if opt.is_none() { + break } } - - - Ok::<_, ()>(()) - }).wait().unwrap(); + }); }) } #[cfg(feature = "nightly")] #[bench] fn giver_queue_not_ready(b: &mut test::Bencher) { - let (_tx, mut rx) = super::channel::(); - + let mut rt = Runtime::new().unwrap(); + let (_tx, mut rx) = channel::(); b.iter(move || { - ::futures::future::lazy(|| { - assert!(rx.poll().unwrap().is_not_ready()); - - Ok::<(), ()>(()) - }).wait().unwrap(); + rt.block_on(async { + let poll_once = PollOnce(&mut rx); + assert!(poll_once.await.is_none()); + }); }) } #[cfg(feature = "nightly")] #[bench] fn giver_queue_cancel(b: &mut test::Bencher) { - let (_tx, mut rx) = super::channel::(); + let (_tx, mut rx) = channel::(); b.iter(move || { rx.taker.cancel(); }) } - */ } diff --git a/src/client/pool.rs b/src/client/pool.rs index 716a2c92..e6a1f545 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -774,23 +774,20 @@ impl WeakOpt { #[cfg(test)] mod tests { - // FIXME: re-implement tests with `async/await`, this import should - // trigger a warning to remind us - use crate::Error; - - /* use std::sync::Arc; + use std::task::Poll; use std::time::Duration; - use futures::{Async, Future}; - use futures::future; - use crate::common::Exec; + + use tokio::runtime::current_thread::Runtime; + + use crate::common::{Exec, Future, Pin, task}; use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] struct Uniq(T); - impl Poolable for Uniq { + impl Poolable for Uniq { fn is_open(&self) -> bool { true } @@ -829,75 +826,97 @@ mod tests { #[test] fn test_pool_checkout_smoke() { + let mut rt = Runtime::new().unwrap(); let pool = pool_no_timer(); let key = Arc::new("foo".to_string()); let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); - match pool.checkout(key).poll().unwrap() { - Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)), - _ => panic!("not ready"), + rt.block_on(async { + match pool.checkout(key).await { + Ok(pooled) => assert_eq!(*pooled, Uniq(41)), + Err(_) => panic!("not ready"), + }; + }) + } + + /// Helper to check if the future is ready after polling once. + struct PollOnce<'a, F>(&'a mut F); + + impl Future for PollOnce<'_, F> + where F: Future> + Unpin + { + type Output = Option<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + match Pin::new(&mut self.0).poll(cx) { + Poll::Ready(Ok(_)) => Poll::Ready(Some(())), + Poll::Ready(Err(_)) => Poll::Ready(Some(())), + Poll::Pending => Poll::Ready(None) + } } } #[test] fn test_pool_checkout_returns_none_if_expired() { - future::lazy(|| { - let pool = pool_no_timer(); - let key = Arc::new("foo".to_string()); - let pooled = pool.pooled(c(key.clone()), Uniq(41)); - drop(pooled); - ::std::thread::sleep(pool.locked().timeout.unwrap()); - assert!(pool.checkout(key).poll().unwrap().is_not_ready()); - ::futures::future::ok::<(), ()>(()) - }).wait().unwrap(); + let mut rt = Runtime::new().unwrap(); + let pool = pool_no_timer(); + let key = Arc::new("foo".to_string()); + let pooled = pool.pooled(c(key.clone()), Uniq(41)); + + drop(pooled); + std::thread::sleep(pool.locked().timeout.unwrap()); + rt.block_on(async { + let mut checkout = pool.checkout(key); + let poll_once = PollOnce(&mut checkout); + let is_not_ready = poll_once.await.is_none(); + assert!(is_not_ready); + }); } + #[cfg(feature = "runtime")] #[test] fn test_pool_checkout_removes_expired() { - future::lazy(|| { - let pool = pool_no_timer(); - let key = Arc::new("foo".to_string()); + let mut rt = Runtime::new().unwrap(); + let pool = pool_no_timer(); + let key = Arc::new("foo".to_string()); - pool.pooled(c(key.clone()), Uniq(41)); - pool.pooled(c(key.clone()), Uniq(5)); - pool.pooled(c(key.clone()), Uniq(99)); + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); - assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); - ::std::thread::sleep(pool.locked().timeout.unwrap()); + assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); + std::thread::sleep(pool.locked().timeout.unwrap()); - // checkout.poll() should clean out the expired - pool.checkout(key.clone()).poll().unwrap(); + rt.block_on(async { + let mut checkout = pool.checkout(key.clone()); + let poll_once = PollOnce(&mut checkout); + // checkout.await should clean out the expired + poll_once.await; assert!(pool.locked().idle.get(&key).is_none()); - - Ok::<(), ()>(()) - }).wait().unwrap(); + }); } #[test] fn test_pool_max_idle_per_host() { - future::lazy(|| { - let pool = pool_max_idle_no_timer(2); - let key = Arc::new("foo".to_string()); + let pool = pool_max_idle_no_timer(2); + let key = Arc::new("foo".to_string()); - pool.pooled(c(key.clone()), Uniq(41)); - pool.pooled(c(key.clone()), Uniq(5)); - pool.pooled(c(key.clone()), Uniq(99)); + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); - // pooled and dropped 3, max_idle should only allow 2 - assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2)); - - Ok::<(), ()>(()) - }).wait().unwrap(); + // pooled and dropped 3, max_idle should only allow 2 + assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2)); } #[cfg(feature = "runtime")] #[test] fn test_pool_timer_removes_expired() { use std::time::Instant; - use tokio_timer::Delay; - let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap(); + use tokio_timer::delay; + let mut rt = Runtime::new().unwrap(); let pool = Pool::new(super::Config { enabled: true, keep_alive_timeout: Some(Duration::from_millis(100)), @@ -911,65 +930,76 @@ mod tests { // Since pool.pooled() will be calling spawn on drop, need to be sure // those drops are called while `rt` is the current executor. To do so, // call those inside a future. - rt.block_on(::futures::future::lazy(|| { + rt.block_on(async { pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(99)); - Ok::<_, ()>(()) - })).unwrap(); + }); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); // Let the timer tick passed the expiration... - rt - .block_on(Delay::new(Instant::now() + Duration::from_millis(200))) - .expect("rt block_on 200ms"); + rt.block_on(async { + let deadline = Instant::now() + Duration::from_millis(200); + delay(deadline).await; + }); assert!(pool.locked().idle.get(&key).is_none()); } #[test] fn test_pool_checkout_task_unparked() { + use futures_util::future::join; + use futures_util::FutureExt; + + let mut rt = Runtime::new().unwrap(); let pool = pool_no_timer(); let key = Arc::new("foo".to_string()); let pooled = pool.pooled(c(key.clone()), Uniq(41)); - let checkout = pool.checkout(key).join(future::lazy(move || { - // the checkout future will park first, - // and then this lazy future will be polled, which will insert - // the pooled back into the pool - // - // this test makes sure that doing so will unpark the checkout - drop(pooled); - Ok(()) - })).map(|(entry, _)| entry); - assert_eq!(*checkout.wait().unwrap(), Uniq(41)); + let checkout = join( + pool.checkout(key), + async { + // the checkout future will park first, + // and then this lazy future will be polled, which will insert + // the pooled back into the pool + // + // this test makes sure that doing so will unpark the checkout + drop(pooled); + }, + ).map(|(entry, _)| entry); + + rt.block_on(async { + assert_eq!(*checkout.await.unwrap(), Uniq(41)); + }); } #[test] fn test_pool_checkout_drop_cleans_up_waiters() { - future::lazy(|| { - let pool = pool_no_timer::>(); - let key = Arc::new("localhost:12345".to_string()); + let mut rt = Runtime::new().unwrap(); + let pool = pool_no_timer::>(); + let key = Arc::new("localhost:12345".to_string()); - let mut checkout1 = pool.checkout(key.clone()); - let mut checkout2 = pool.checkout(key.clone()); + let mut checkout1 = pool.checkout(key.clone()); + let mut checkout2 = pool.checkout(key.clone()); - // first poll needed to get into Pool's parked - checkout1.poll().unwrap(); + let poll_once1 = PollOnce(&mut checkout1); + let poll_once2 = PollOnce(&mut checkout2); + + // first poll needed to get into Pool's parked + rt.block_on(async { + poll_once1.await; assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); - checkout2.poll().unwrap(); + poll_once2.await; assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); + }); - // on drop, clean up Pool - drop(checkout1); - assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); + // on drop, clean up Pool + drop(checkout1); + assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); - drop(checkout2); - assert!(pool.locked().waiters.get(&key).is_none()); - - ::futures::future::ok::<(), ()>(()) - }).wait().unwrap(); + drop(checkout2); + assert!(pool.locked().waiters.get(&key).is_none()); } #[derive(Debug)] @@ -1003,5 +1033,4 @@ mod tests { assert!(!pool.locked().idle.contains_key(&key)); } - */ }