Upgrade some lib tests to async/.await version (#1882)

* test(http): use async/.await

Signed-off-by: Weihang Lo <me@weihanglo.tw>

* test(pool): use async/.await

* test(pool): pass &mut Future into PollOnce

* test(client): tests/benches using async/.await

* test(client): change due to PR #1917

* test(client): change Delay to delay fucntion

Ref: https://github.com/tokio-rs/tokio/pull/1440

* test(client): remove warning triggers
This commit is contained in:
Weihang Lo
2019-09-07 00:54:11 +08:00
committed by Sean McArthur
parent 511ea3889b
commit 144893b409
3 changed files with 231 additions and 156 deletions

View File

@@ -630,58 +630,76 @@ impl ConnectingTcp {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
// FIXME: re-implement tests with `async/await`, this import should
// trigger a warning to remind us
use crate::Error;
/*
use std::io; use std::io;
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio_net::driver::Handle;
use super::{Connect, Destination, HttpConnector}; use super::{Connect, Destination, HttpConnector};
#[test] #[test]
fn test_errors_missing_authority() { fn test_errors_missing_authority() {
let mut rt = Runtime::new().unwrap();
let uri = "/foo/bar?baz".parse().unwrap(); let uri = "/foo/bar?baz".parse().unwrap();
let dst = Destination { let dst = Destination {
uri, uri,
}; };
let connector = HttpConnector::new(1); let connector = HttpConnector::new();
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
} }
#[test] #[test]
fn test_errors_enforce_http() { fn test_errors_enforce_http() {
let mut rt = Runtime::new().unwrap();
let uri = "https://example.domain/foo/bar?baz".parse().unwrap(); let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
let dst = Destination { let dst = Destination {
uri, uri,
}; };
let connector = HttpConnector::new(1); let connector = HttpConnector::new();
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
} }
#[test] #[test]
fn test_errors_missing_scheme() { fn test_errors_missing_scheme() {
let mut rt = Runtime::new().unwrap();
let uri = "example.domain".parse().unwrap(); let uri = "example.domain".parse().unwrap();
let dst = Destination { let dst = Destination {
uri, uri,
}; };
let connector = HttpConnector::new(1); let connector = HttpConnector::new();
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
});
} }
#[test] #[test]
#[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)] #[cfg_attr(not(feature = "__internal_happy_eyeballs_tests"), ignore)]
fn client_happy_eyeballs() { fn client_happy_eyeballs() {
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
use std::task::Poll;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::{Async, Poll};
use tokio::runtime::current_thread::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio_net::driver::Handle;
use crate::common::{Pin, task};
use super::dns; use super::dns;
use super::ConnectingTcp; use super::ConnectingTcp;
@@ -768,16 +786,15 @@ mod tests {
struct ConnectingTcpFuture(ConnectingTcp); struct ConnectingTcpFuture(ConnectingTcp);
impl Future for ConnectingTcpFuture { impl Future for ConnectingTcpFuture {
type Item = u8; type Output = Result<u8, std::io::Error>;
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match self.0.poll(&Some(Handle::default())) { match self.0.poll(cx,&Some(Handle::default())) {
Poll::Ready(Ok(stream)) => Poll::Ready(Ok( Poll::Ready(Ok(stream)) => Poll::Ready(Ok(
if stream.peer_addr().unwrap().is_ipv4() { 4 } else { 6 } if stream.peer_addr().unwrap().is_ipv4() { 4 } else { 6 }
)), )),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
Err(err) => Err(err),
} }
} }
} }
@@ -818,6 +835,5 @@ mod tests {
(reachable, duration) (reachable, duration)
} }
} }
*/
} }

View File

@@ -246,70 +246,102 @@ impl<T, U> Callback<T, U> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
// FIXME: re-implement tests with `async/await`, this import should
// trigger a warning to remind us
use crate::Error;
/*
#[cfg(feature = "nightly")] #[cfg(feature = "nightly")]
extern crate test; extern crate test;
use futures::{future, Future, Stream}; use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime::current_thread::Runtime;
use super::{Callback, channel, Receiver};
#[derive(Debug)] #[derive(Debug)]
struct Custom(i32); struct Custom(i32);
impl<T, U> Future for Receiver<T, U> {
type Output = Option<(T, Callback<T, U>)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_next(cx)
}
}
/// Helper to check if the future is ready after polling once.
struct PollOnce<'a, F>(&'a mut F);
impl<F, T> Future for PollOnce<'_, F>
where
F: Future<Output = T> + Unpin
{
type Output = Option<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(_) => Poll::Ready(Some(())),
Poll::Pending => Poll::Ready(None)
}
}
}
#[test] #[test]
fn drop_receiver_sends_cancel_errors() { fn drop_receiver_sends_cancel_errors() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
future::lazy(|| { let (mut tx, mut rx) = channel::<Custom, ()>();
let (mut tx, mut rx) = super::channel::<Custom, ()>();
// must poll once for try_send to succeed // must poll once for try_send to succeed
assert!(rx.poll().expect("rx empty").is_not_ready()); rt.block_on(async {
let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_none(), "rx empty");
});
let promise = tx.try_send(Custom(43)).unwrap(); let promise = tx.try_send(Custom(43)).unwrap();
drop(rx); drop(rx);
promise.then(|fulfilled| { rt.block_on(async {
let err = fulfilled let fulfilled = promise.await;
.expect("fulfilled") let err = fulfilled
.expect_err("promise should error"); .expect("fulfilled")
.expect_err("promise should error");
match (err.0.kind(), err.1) { match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (), (&crate::error::Kind::Canceled, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e), e => panic!("expected Error::Cancel(_), found {:?}", e),
} }
});
Ok::<(), ()>(())
})
}).wait().unwrap();
} }
#[test] #[test]
fn sender_checks_for_want_on_send() { fn sender_checks_for_want_on_send() {
future::lazy(|| { let mut rt = Runtime::new().unwrap();
let (mut tx, mut rx) = super::channel::<Custom, ()>(); let (mut tx, mut rx) = channel::<Custom, ()>();
// one is allowed to buffer, second is rejected
let _ = tx.try_send(Custom(1)).expect("1 buffered");
tx.try_send(Custom(2)).expect_err("2 not ready");
assert!(rx.poll().expect("rx 1").is_ready()); // one is allowed to buffer, second is rejected
// Even though 1 has been popped, only 1 could be buffered for the let _ = tx.try_send(Custom(1)).expect("1 buffered");
// lifetime of the channel. tx.try_send(Custom(2)).expect_err("2 not ready");
tx.try_send(Custom(2)).expect_err("2 still not ready");
assert!(rx.poll().expect("rx empty").is_not_ready()); rt.block_on(async {
let _ = tx.try_send(Custom(2)).expect("2 ready"); let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_some(), "rx empty");
});
Ok::<(), ()>(()) // Even though 1 has been popped, only 1 could be buffered for the
}).wait().unwrap(); // lifetime of the channel.
tx.try_send(Custom(2)).expect_err("2 still not ready");
rt.block_on(async {
let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_none(), "rx empty");
});
let _ = tx.try_send(Custom(2)).expect("2 ready");
} }
#[test] #[test]
fn unbounded_sender_doesnt_bound_on_want() { fn unbounded_sender_doesnt_bound_on_want() {
let (tx, rx) = super::channel::<Custom, ()>(); let (tx, rx) = channel::<Custom, ()>();
let mut tx = tx.unbound(); let mut tx = tx.unbound();
let _ = tx.try_send(Custom(1)).unwrap(); let _ = tx.try_send(Custom(1)).unwrap();
@@ -325,46 +357,44 @@ mod tests {
#[bench] #[bench]
fn giver_queue_throughput(b: &mut test::Bencher) { fn giver_queue_throughput(b: &mut test::Bencher) {
use crate::{Body, Request, Response}; use crate::{Body, Request, Response};
let (mut tx, mut rx) = super::channel::<Request<Body>, Response<Body>>();
let mut rt = Runtime::new().unwrap();
let (mut tx, mut rx) = channel::<Request<Body>, Response<Body>>();
b.iter(move || { b.iter(move || {
::futures::future::lazy(|| { let _ = tx.send(Request::default()).unwrap();
let _ = tx.send(Request::default()).unwrap(); rt.block_on(async {
loop { loop {
let ok = rx.poll().unwrap(); let poll_once = PollOnce(&mut rx);
if ok.is_not_ready() { let opt = poll_once.await;
break; if opt.is_none() {
break
} }
} }
});
Ok::<_, ()>(())
}).wait().unwrap();
}) })
} }
#[cfg(feature = "nightly")] #[cfg(feature = "nightly")]
#[bench] #[bench]
fn giver_queue_not_ready(b: &mut test::Bencher) { fn giver_queue_not_ready(b: &mut test::Bencher) {
let (_tx, mut rx) = super::channel::<i32, ()>(); let mut rt = Runtime::new().unwrap();
let (_tx, mut rx) = channel::<i32, ()>();
b.iter(move || { b.iter(move || {
::futures::future::lazy(|| { rt.block_on(async {
assert!(rx.poll().unwrap().is_not_ready()); let poll_once = PollOnce(&mut rx);
assert!(poll_once.await.is_none());
Ok::<(), ()>(()) });
}).wait().unwrap();
}) })
} }
#[cfg(feature = "nightly")] #[cfg(feature = "nightly")]
#[bench] #[bench]
fn giver_queue_cancel(b: &mut test::Bencher) { fn giver_queue_cancel(b: &mut test::Bencher) {
let (_tx, mut rx) = super::channel::<i32, ()>(); let (_tx, mut rx) = channel::<i32, ()>();
b.iter(move || { b.iter(move || {
rx.taker.cancel(); rx.taker.cancel();
}) })
} }
*/
} }

View File

@@ -774,23 +774,20 @@ impl<T> WeakOpt<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
// FIXME: re-implement tests with `async/await`, this import should
// trigger a warning to remind us
use crate::Error;
/*
use std::sync::Arc; use std::sync::Arc;
use std::task::Poll;
use std::time::Duration; use std::time::Duration;
use futures::{Async, Future};
use futures::future; use tokio::runtime::current_thread::Runtime;
use crate::common::Exec;
use crate::common::{Exec, Future, Pin, task};
use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt}; use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt};
/// Test unique reservations. /// Test unique reservations.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
struct Uniq<T>(T); struct Uniq<T>(T);
impl<T: Send + 'static> Poolable for Uniq<T> { impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
fn is_open(&self) -> bool { fn is_open(&self) -> bool {
true true
} }
@@ -829,75 +826,97 @@ mod tests {
#[test] #[test]
fn test_pool_checkout_smoke() { fn test_pool_checkout_smoke() {
let mut rt = Runtime::new().unwrap();
let pool = pool_no_timer(); let pool = pool_no_timer();
let key = Arc::new("foo".to_string()); let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41)); let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled); drop(pooled);
match pool.checkout(key).poll().unwrap() { rt.block_on(async {
Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)), match pool.checkout(key).await {
_ => panic!("not ready"), Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
Err(_) => panic!("not ready"),
};
})
}
/// Helper to check if the future is ready after polling once.
struct PollOnce<'a, F>(&'a mut F);
impl<F, T, U> Future for PollOnce<'_, F>
where F: Future<Output = Result<T, U>> + Unpin
{
type Output = Option<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
Poll::Ready(Err(_)) => Poll::Ready(Some(())),
Poll::Pending => Poll::Ready(None)
}
} }
} }
#[test] #[test]
fn test_pool_checkout_returns_none_if_expired() { fn test_pool_checkout_returns_none_if_expired() {
future::lazy(|| { let mut rt = Runtime::new().unwrap();
let pool = pool_no_timer(); let pool = pool_no_timer();
let key = Arc::new("foo".to_string()); let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41)); let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled);
::std::thread::sleep(pool.locked().timeout.unwrap()); drop(pooled);
assert!(pool.checkout(key).poll().unwrap().is_not_ready()); std::thread::sleep(pool.locked().timeout.unwrap());
::futures::future::ok::<(), ()>(()) rt.block_on(async {
}).wait().unwrap(); let mut checkout = pool.checkout(key);
let poll_once = PollOnce(&mut checkout);
let is_not_ready = poll_once.await.is_none();
assert!(is_not_ready);
});
} }
#[cfg(feature = "runtime")]
#[test] #[test]
fn test_pool_checkout_removes_expired() { fn test_pool_checkout_removes_expired() {
future::lazy(|| { let mut rt = Runtime::new().unwrap();
let pool = pool_no_timer(); let pool = pool_no_timer();
let key = Arc::new("foo".to_string()); let key = Arc::new("foo".to_string());
pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99)); pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.locked().timeout.unwrap()); std::thread::sleep(pool.locked().timeout.unwrap());
// checkout.poll() should clean out the expired rt.block_on(async {
pool.checkout(key.clone()).poll().unwrap(); let mut checkout = pool.checkout(key.clone());
let poll_once = PollOnce(&mut checkout);
// checkout.await should clean out the expired
poll_once.await;
assert!(pool.locked().idle.get(&key).is_none()); assert!(pool.locked().idle.get(&key).is_none());
});
Ok::<(), ()>(())
}).wait().unwrap();
} }
#[test] #[test]
fn test_pool_max_idle_per_host() { fn test_pool_max_idle_per_host() {
future::lazy(|| { let pool = pool_max_idle_no_timer(2);
let pool = pool_max_idle_no_timer(2); let key = Arc::new("foo".to_string());
let key = Arc::new("foo".to_string());
pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99)); pool.pooled(c(key.clone()), Uniq(99));
// pooled and dropped 3, max_idle should only allow 2 // pooled and dropped 3, max_idle should only allow 2
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2)); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(2));
Ok::<(), ()>(())
}).wait().unwrap();
} }
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
#[test] #[test]
fn test_pool_timer_removes_expired() { fn test_pool_timer_removes_expired() {
use std::time::Instant; use std::time::Instant;
use tokio_timer::Delay; use tokio_timer::delay;
let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let pool = Pool::new(super::Config { let pool = Pool::new(super::Config {
enabled: true, enabled: true,
keep_alive_timeout: Some(Duration::from_millis(100)), keep_alive_timeout: Some(Duration::from_millis(100)),
@@ -911,65 +930,76 @@ mod tests {
// Since pool.pooled() will be calling spawn on drop, need to be sure // Since pool.pooled() will be calling spawn on drop, need to be sure
// those drops are called while `rt` is the current executor. To do so, // those drops are called while `rt` is the current executor. To do so,
// call those inside a future. // call those inside a future.
rt.block_on(::futures::future::lazy(|| { rt.block_on(async {
pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5)); pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99)); pool.pooled(c(key.clone()), Uniq(99));
Ok::<_, ()>(()) });
})).unwrap();
assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3)); assert_eq!(pool.locked().idle.get(&key).map(|entries| entries.len()), Some(3));
// Let the timer tick passed the expiration... // Let the timer tick passed the expiration...
rt rt.block_on(async {
.block_on(Delay::new(Instant::now() + Duration::from_millis(200))) let deadline = Instant::now() + Duration::from_millis(200);
.expect("rt block_on 200ms"); delay(deadline).await;
});
assert!(pool.locked().idle.get(&key).is_none()); assert!(pool.locked().idle.get(&key).is_none());
} }
#[test] #[test]
fn test_pool_checkout_task_unparked() { fn test_pool_checkout_task_unparked() {
use futures_util::future::join;
use futures_util::FutureExt;
let mut rt = Runtime::new().unwrap();
let pool = pool_no_timer(); let pool = pool_no_timer();
let key = Arc::new("foo".to_string()); let key = Arc::new("foo".to_string());
let pooled = pool.pooled(c(key.clone()), Uniq(41)); let pooled = pool.pooled(c(key.clone()), Uniq(41));
let checkout = pool.checkout(key).join(future::lazy(move || { let checkout = join(
// the checkout future will park first, pool.checkout(key),
// and then this lazy future will be polled, which will insert async {
// the pooled back into the pool // the checkout future will park first,
// // and then this lazy future will be polled, which will insert
// this test makes sure that doing so will unpark the checkout // the pooled back into the pool
drop(pooled); //
Ok(()) // this test makes sure that doing so will unpark the checkout
})).map(|(entry, _)| entry); drop(pooled);
assert_eq!(*checkout.wait().unwrap(), Uniq(41)); },
).map(|(entry, _)| entry);
rt.block_on(async {
assert_eq!(*checkout.await.unwrap(), Uniq(41));
});
} }
#[test] #[test]
fn test_pool_checkout_drop_cleans_up_waiters() { fn test_pool_checkout_drop_cleans_up_waiters() {
future::lazy(|| { let mut rt = Runtime::new().unwrap();
let pool = pool_no_timer::<Uniq<i32>>(); let pool = pool_no_timer::<Uniq<i32>>();
let key = Arc::new("localhost:12345".to_string()); let key = Arc::new("localhost:12345".to_string());
let mut checkout1 = pool.checkout(key.clone()); let mut checkout1 = pool.checkout(key.clone());
let mut checkout2 = pool.checkout(key.clone()); let mut checkout2 = pool.checkout(key.clone());
// first poll needed to get into Pool's parked let poll_once1 = PollOnce(&mut checkout1);
checkout1.poll().unwrap(); let poll_once2 = PollOnce(&mut checkout2);
// first poll needed to get into Pool's parked
rt.block_on(async {
poll_once1.await;
assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap(); poll_once2.await;
assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
});
// on drop, clean up Pool // on drop, clean up Pool
drop(checkout1); drop(checkout1);
assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1); assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
drop(checkout2); drop(checkout2);
assert!(pool.locked().waiters.get(&key).is_none()); assert!(pool.locked().waiters.get(&key).is_none());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
} }
#[derive(Debug)] #[derive(Debug)]
@@ -1003,5 +1033,4 @@ mod tests {
assert!(!pool.locked().idle.contains_key(&key)); assert!(!pool.locked().idle.contains_key(&key));
} }
*/
} }