diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 0138d93..ff408af 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -174,43 +174,13 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { extern crate futures; use futures::{Poll, Async}; -use std::fmt; -// TODO: These types should be extracted out +// TODO: Extract this out? struct WaitForCapacity { stream: Option>, target: usize, } -struct Drive { - conn: Option>, - fut: T, -} - -impl Future for Drive - where T: Future, - T::Error: fmt::Debug, -{ - type Item = (Client, T::Item); - type Error = (); - - fn poll(&mut self) -> Poll { - match self.fut.poll() { - Ok(Async::Ready(v)) => return Ok((self.conn.take().unwrap(), v).into()), - Ok(_) => {} - Err(e) => panic!("unexpected error; {:?}", e), - } - - match self.conn.as_mut().unwrap().poll() { - Ok(Async::Ready(_)) => panic!(), - Ok(Async::NotReady) => {} - Err(e) => panic!("unexpected error; {:?}", e), - } - - Ok(Async::NotReady) - } -} - impl WaitForCapacity { fn stream(&mut self) -> &mut client::Stream { self.stream.as_mut().unwrap() @@ -255,15 +225,10 @@ fn send_data_receive_window_update() { stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); // Wait for capacity - let fut = WaitForCapacity { + h2.drive(WaitForCapacity { stream: Some(stream), target: frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, - }; - - Drive { - conn: Some(h2), - fut: fut, - } + }) }) .and_then(|(h2, mut stream)| { let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; diff --git a/tests/support/src/future_ext.rs b/tests/support/src/future_ext.rs index 295581c..2794e51 100644 --- a/tests/support/src/future_ext.rs +++ b/tests/support/src/future_ext.rs @@ -1,23 +1,43 @@ -use futures::{Future, Poll}; +use futures::{Future, Async, Poll}; use std::fmt; +/// Future extension helpers that are useful for tests pub trait FutureExt: Future { + /// Panic on error fn unwrap(self) -> Unwrap where Self: Sized, Self::Error: fmt::Debug, { Unwrap { inner: self } } -} -pub struct Unwrap { - inner: T, + /// Drive `other` by polling `self`. + /// + /// `self` must not resolve before `other` does. + fn drive(self, other: T) -> Drive + where T: Future, + T::Error: fmt::Debug, + Self: Future + Sized, + Self::Error: fmt::Debug, + { + Drive { + driver: Some(self), + future: other, + } + } } impl FutureExt for T { } +// ===== Unwrap ====== + +/// Panic on error +pub struct Unwrap { + inner: T, +} + impl Future for Unwrap where T: Future, T::Item: fmt::Debug, @@ -30,3 +50,44 @@ impl Future for Unwrap Ok(self.inner.poll().unwrap()) } } + +// ===== Drive ====== + +/// Drive a future to completion while also polling the driver +/// +/// This is useful for H2 futures that also require the connection to be polled. +pub struct Drive { + driver: Option, + future: U, +} + +impl Future for Drive + where T: Future, + U: Future, + T::Error: fmt::Debug, + U::Error: fmt::Debug, +{ + type Item = (T, U::Item); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.future.poll() { + Ok(Async::Ready(val)) => { + // Get the driver + let driver = self.driver.take().unwrap(); + + return Ok((driver, val).into()) + } + Ok(_) => {} + Err(e) => panic!("unexpected error; {:?}", e), + } + + match self.driver.as_mut().unwrap().poll() { + Ok(Async::Ready(_)) => panic!("driver resolved before future"), + Ok(Async::NotReady) => {} + Err(e) => panic!("unexpected error; {:?}", e), + } + + Ok(Async::NotReady) + } +}