Extract drive combinator to test support

This commit is contained in:
Carl Lerche
2017-09-07 15:36:54 -07:00
parent 38bbf30b2f
commit 9b42dafd24
2 changed files with 68 additions and 42 deletions

View File

@@ -174,43 +174,13 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
extern crate futures;
use futures::{Poll, Async};
use std::fmt;
// TODO: These types should be extracted out
// TODO: Extract this out?
struct WaitForCapacity {
stream: Option<client::Stream<Bytes>>,
target: usize,
}
struct Drive<T> {
conn: Option<Client<mock::Mock, Bytes>>,
fut: T,
}
impl<T> Future for Drive<T>
where T: Future,
T::Error: fmt::Debug,
{
type Item = (Client<mock::Mock, Bytes>, T::Item);
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Ok(Async::Ready(v)) => return Ok((self.conn.take().unwrap(), v).into()),
Ok(_) => {}
Err(e) => panic!("unexpected error; {:?}", e),
}
match self.conn.as_mut().unwrap().poll() {
Ok(Async::Ready(_)) => panic!(),
Ok(Async::NotReady) => {}
Err(e) => panic!("unexpected error; {:?}", e),
}
Ok(Async::NotReady)
}
}
impl WaitForCapacity {
fn stream(&mut self) -> &mut client::Stream<Bytes> {
self.stream.as_mut().unwrap()
@@ -255,15 +225,10 @@ fn send_data_receive_window_update() {
stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
// Wait for capacity
let fut = WaitForCapacity {
h2.drive(WaitForCapacity {
stream: Some(stream),
target: frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
};
Drive {
conn: Some(h2),
fut: fut,
}
})
})
.and_then(|(h2, mut stream)| {
let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];

View File

@@ -1,23 +1,43 @@
use futures::{Future, Poll};
use futures::{Future, Async, Poll};
use std::fmt;
/// Future extension helpers that are useful for tests
pub trait FutureExt: Future {
/// Panic on error
fn unwrap(self) -> Unwrap<Self>
where Self: Sized,
Self::Error: fmt::Debug,
{
Unwrap { inner: self }
}
}
pub struct Unwrap<T> {
inner: T,
/// Drive `other` by polling `self`.
///
/// `self` must not resolve before `other` does.
fn drive<T>(self, other: T) -> Drive<Self, T>
where T: Future,
T::Error: fmt::Debug,
Self: Future<Item = ()> + Sized,
Self::Error: fmt::Debug,
{
Drive {
driver: Some(self),
future: other,
}
}
}
impl<T: Future> FutureExt for T {
}
// ===== Unwrap ======
/// Panic on error
pub struct Unwrap<T> {
inner: T,
}
impl<T> Future for Unwrap<T>
where T: Future,
T::Item: fmt::Debug,
@@ -30,3 +50,44 @@ impl<T> Future for Unwrap<T>
Ok(self.inner.poll().unwrap())
}
}
// ===== Drive ======
/// Drive a future to completion while also polling the driver
///
/// This is useful for H2 futures that also require the connection to be polled.
pub struct Drive<T, U> {
driver: Option<T>,
future: U,
}
impl<T, U> Future for Drive<T, U>
where T: Future<Item = ()>,
U: Future,
T::Error: fmt::Debug,
U::Error: fmt::Debug,
{
type Item = (T, U::Item);
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.future.poll() {
Ok(Async::Ready(val)) => {
// Get the driver
let driver = self.driver.take().unwrap();
return Ok((driver, val).into())
}
Ok(_) => {}
Err(e) => panic!("unexpected error; {:?}", e),
}
match self.driver.as_mut().unwrap().poll() {
Ok(Async::Ready(_)) => panic!("driver resolved before future"),
Ok(Async::NotReady) => {}
Err(e) => panic!("unexpected error; {:?}", e),
}
Ok(Async::NotReady)
}
}