test(drain): re-enable drain tests

This commit is contained in:
Sean McArthur
2019-09-11 11:31:02 -07:00
parent d36e028b21
commit 8479c2aaed

View File

@@ -128,12 +128,6 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
// FIXME: re-implement tests with `async/await`, this import should
// trigger a warning to remind us
use crate::Error;
/*
use futures::{future, Async, Future, Poll};
use super::*; use super::*;
struct TestMe { struct TestMe {
@@ -143,22 +137,21 @@ mod tests {
} }
impl Future for TestMe { impl Future for TestMe {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
self.poll_cnt += 1; self.poll_cnt += 1;
if self.finished { if self.finished {
Ok(Async::Ready(())) Poll::Ready(())
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} }
} }
#[test] #[test]
fn watch() { fn watch() {
future::lazy(|| { tokio_test::task::mock(|cx| {
let (tx, rx) = channel(); let (tx, rx) = channel();
let fut = TestMe { let fut = TestMe {
draining: false, draining: false,
@@ -166,18 +159,18 @@ mod tests {
poll_cnt: 0, poll_cnt: 0,
}; };
let mut watch = rx.watch(fut, |fut| { let mut watch = rx.watch(fut, |mut fut| {
fut.draining = true; fut.draining = true;
}); });
assert_eq!(watch.future.poll_cnt, 0); assert_eq!(watch.future.poll_cnt, 0);
// First poll should poll the inner future // First poll should poll the inner future
assert!(watch.poll().unwrap().is_not_ready()); assert!(Pin::new(&mut watch).poll(cx).is_pending());
assert_eq!(watch.future.poll_cnt, 1); assert_eq!(watch.future.poll_cnt, 1);
// Second poll should poll the inner future again // Second poll should poll the inner future again
assert!(watch.poll().unwrap().is_not_ready()); assert!(Pin::new(&mut watch).poll(cx).is_pending());
assert_eq!(watch.future.poll_cnt, 2); assert_eq!(watch.future.poll_cnt, 2);
let mut draining = tx.drain(); let mut draining = tx.drain();
@@ -186,28 +179,26 @@ mod tests {
assert_eq!(watch.future.poll_cnt, 2); assert_eq!(watch.future.poll_cnt, 2);
// Now, poll after drain has been signaled. // Now, poll after drain has been signaled.
assert!(watch.poll().unwrap().is_not_ready()); assert!(Pin::new(&mut watch).poll(cx).is_pending());
assert_eq!(watch.future.poll_cnt, 3); assert_eq!(watch.future.poll_cnt, 3);
assert!(watch.future.draining); assert!(watch.future.draining);
// Draining is not ready until watcher completes // Draining is not ready until watcher completes
assert!(draining.poll().unwrap().is_not_ready()); assert!(Pin::new(&mut draining).poll(cx).is_pending());
// Finishing up the watch future // Finishing up the watch future
watch.future.finished = true; watch.future.finished = true;
assert!(watch.poll().unwrap().is_ready()); assert!(Pin::new(&mut watch).poll(cx).is_ready());
assert_eq!(watch.future.poll_cnt, 4); assert_eq!(watch.future.poll_cnt, 4);
drop(watch); drop(watch);
assert!(draining.poll().unwrap().is_ready()); assert!(Pin::new(&mut draining).poll(cx).is_ready());
})
Ok::<_, ()>(())
}).wait().unwrap();
} }
#[test] #[test]
fn watch_clones() { fn watch_clones() {
future::lazy(|| { tokio_test::task::mock(|cx| {
let (tx, rx) = channel(); let (tx, rx) = channel();
let fut1 = TestMe { let fut1 = TestMe {
@@ -221,32 +212,29 @@ mod tests {
poll_cnt: 0, poll_cnt: 0,
}; };
let watch1 = rx.clone().watch(fut1, |fut| { let watch1 = rx.clone().watch(fut1, |mut fut| {
fut.draining = true; fut.draining = true;
}); });
let watch2 = rx.watch(fut2, |fut| { let watch2 = rx.watch(fut2, |mut fut| {
fut.draining = true; fut.draining = true;
}); });
let mut draining = tx.drain(); let mut draining = tx.drain();
// Still 2 outstanding watchers // Still 2 outstanding watchers
assert!(draining.poll().unwrap().is_not_ready()); assert!(Pin::new(&mut draining).poll(cx).is_pending());
// drop 1 for whatever reason // drop 1 for whatever reason
drop(watch1); drop(watch1);
// Still not ready, 1 other watcher still pending // Still not ready, 1 other watcher still pending
assert!(draining.poll().unwrap().is_not_ready()); assert!(Pin::new(&mut draining).poll(cx).is_pending());
drop(watch2); drop(watch2);
// Now all watchers are gone, draining is complete // Now all watchers are gone, draining is complete
assert!(draining.poll().unwrap().is_ready()); assert!(Pin::new(&mut draining).poll(cx).is_ready());
});
Ok::<_, ()>(())
}).wait().unwrap();
} }
*/
} }