Apply rustfmt to test/support crate (#116)

This commit is contained in:
Oliver Gould
2017-09-25 08:29:54 -07:00
committed by Sean McArthur
parent dad113e17b
commit b99c513334
6 changed files with 175 additions and 169 deletions

View File

@@ -3,8 +3,8 @@ use std::fmt;
use bytes::{Bytes, IntoBuf}; use bytes::{Bytes, IntoBuf};
use http::{self, HeaderMap, HttpTryFrom}; use http::{self, HeaderMap, HttpTryFrom};
use h2::frame::{self, Frame, StreamId};
use super::SendFrame; use super::SendFrame;
use h2::frame::{self, Frame, StreamId};
pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
@@ -12,7 +12,8 @@ pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];
// ==== helper functions to easily construct h2 Frames ==== // ==== helper functions to easily construct h2 Frames ====
pub fn headers<T>(id: T) -> Mock<frame::Headers> pub fn headers<T>(id: T) -> Mock<frame::Headers>
where T: Into<StreamId>, where
T: Into<StreamId>,
{ {
Mock(frame::Headers::new( Mock(frame::Headers::new(
id.into(), id.into(),
@@ -22,14 +23,16 @@ pub fn headers<T>(id: T) -> Mock<frame::Headers>
} }
pub fn data<T, B>(id: T, buf: B) -> Mock<frame::Data> pub fn data<T, B>(id: T, buf: B) -> Mock<frame::Data>
where T: Into<StreamId>, where
T: Into<StreamId>,
B: Into<Bytes>, B: Into<Bytes>,
{ {
Mock(frame::Data::new(id.into(), buf.into())) Mock(frame::Data::new(id.into(), buf.into()))
} }
pub fn push_promise<T1, T2>(id: T1, promised: T2) -> Mock<frame::PushPromise> pub fn push_promise<T1, T2>(id: T1, promised: T2) -> Mock<frame::PushPromise>
where T1: Into<StreamId>, where
T1: Into<StreamId>,
T2: Into<StreamId>, T2: Into<StreamId>,
{ {
Mock(frame::PushPromise::new( Mock(frame::PushPromise::new(
@@ -41,19 +44,22 @@ where T1: Into<StreamId>,
} }
pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate pub fn window_update<T>(id: T, sz: u32) -> frame::WindowUpdate
where T: Into<StreamId>, where
T: Into<StreamId>,
{ {
frame::WindowUpdate::new(id.into(), sz) frame::WindowUpdate::new(id.into(), sz)
} }
pub fn go_away<T>(id: T) -> Mock<frame::GoAway> pub fn go_away<T>(id: T) -> Mock<frame::GoAway>
where T: Into<StreamId>, where
T: Into<StreamId>,
{ {
Mock(frame::GoAway::new(id.into(), frame::Reason::NoError)) Mock(frame::GoAway::new(id.into(), frame::Reason::NoError))
} }
pub fn reset<T>(id: T) -> Mock<frame::Reset> pub fn reset<T>(id: T) -> Mock<frame::Reset>
where T: Into<StreamId>, where
T: Into<StreamId>,
{ {
Mock(frame::Reset::new(id.into(), frame::Reason::NoError)) Mock(frame::Reset::new(id.into(), frame::Reason::NoError))
} }
@@ -73,7 +79,9 @@ impl<T: fmt::Debug> fmt::Debug for Mock<T> {
} }
impl<T> From<Mock<T>> for Frame impl<T> From<Mock<T>> for Frame
where T: Into<Frame> { where
T: Into<Frame>,
{
fn from(src: Mock<T>) -> Self { fn from(src: Mock<T>) -> Self {
src.0.into() src.0.into()
} }
@@ -83,30 +91,24 @@ where T: Into<Frame> {
impl Mock<frame::Headers> { impl Mock<frame::Headers> {
pub fn request<M, U>(self, method: M, uri: U) -> Self pub fn request<M, U>(self, method: M, uri: U) -> Self
where M: HttpTryInto<http::Method>, where
M: HttpTryInto<http::Method>,
U: HttpTryInto<http::Uri>, U: HttpTryInto<http::Uri>,
{ {
let method = method.try_into().unwrap(); let method = method.try_into().unwrap();
let uri = uri.try_into().unwrap(); let uri = uri.try_into().unwrap();
let (id, _, fields) = self.into_parts(); let (id, _, fields) = self.into_parts();
let frame = frame::Headers::new( let frame = frame::Headers::new(id, frame::Pseudo::request(method, uri), fields);
id,
frame::Pseudo::request(method, uri),
fields
);
Mock(frame) Mock(frame)
} }
pub fn response<S>(self, status: S) -> Self pub fn response<S>(self, status: S) -> Self
where S: HttpTryInto<http::StatusCode>, where
S: HttpTryInto<http::StatusCode>,
{ {
let status = status.try_into().unwrap(); let status = status.try_into().unwrap();
let (id, _, fields) = self.into_parts(); let (id, _, fields) = self.into_parts();
let frame = frame::Headers::new( let frame = frame::Headers::new(id, frame::Pseudo::response(status), fields);
id,
frame::Pseudo::response(status),
fields
);
Mock(frame) Mock(frame)
} }
@@ -161,18 +163,15 @@ impl From<Mock<frame::Data>> for SendFrame {
impl Mock<frame::PushPromise> { impl Mock<frame::PushPromise> {
pub fn request<M, U>(self, method: M, uri: U) -> Self pub fn request<M, U>(self, method: M, uri: U) -> Self
where M: HttpTryInto<http::Method>, where
M: HttpTryInto<http::Method>,
U: HttpTryInto<http::Uri>, U: HttpTryInto<http::Uri>,
{ {
let method = method.try_into().unwrap(); let method = method.try_into().unwrap();
let uri = uri.try_into().unwrap(); let uri = uri.try_into().unwrap();
let (id, promised, _, fields) = self.into_parts(); let (id, promised, _, fields) = self.into_parts();
let frame = frame::PushPromise::new( let frame =
id, frame::PushPromise::new(id, promised, frame::Pseudo::request(method, uri), fields);
promised,
frame::Pseudo::request(method, uri),
fields
);
Mock(frame) Mock(frame)
} }
@@ -201,15 +200,24 @@ impl From<Mock<frame::PushPromise>> for SendFrame {
impl Mock<frame::GoAway> { impl Mock<frame::GoAway> {
pub fn protocol_error(self) -> Self { pub fn protocol_error(self) -> Self {
Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::ProtocolError)) Mock(frame::GoAway::new(
self.0.last_stream_id(),
frame::Reason::ProtocolError,
))
} }
pub fn flow_control(self) -> Self { pub fn flow_control(self) -> Self {
Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FlowControlError)) Mock(frame::GoAway::new(
self.0.last_stream_id(),
frame::Reason::FlowControlError,
))
} }
pub fn frame_size(self) -> Self { pub fn frame_size(self) -> Self {
Mock(frame::GoAway::new(self.0.last_stream_id(), frame::Reason::FrameSizeError)) Mock(frame::GoAway::new(
self.0.last_stream_id(),
frame::Reason::FrameSizeError,
))
} }
} }
@@ -264,7 +272,8 @@ pub trait HttpTryInto<T> {
} }
impl<T, U> HttpTryInto<T> for U impl<T, U> HttpTryInto<T> for U
where T: HttpTryFrom<U>, where
T: HttpTryFrom<U>,
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
type Error = T::Error; type Error = T::Error;

View File

@@ -1,4 +1,4 @@
use futures::{Future, Async, Poll}; use futures::{Async, Future, Poll};
use std::fmt; use std::fmt;
@@ -6,15 +6,19 @@ use std::fmt;
pub trait FutureExt: Future { pub trait FutureExt: Future {
/// Panic on error /// Panic on error
fn unwrap(self) -> Unwrap<Self> fn unwrap(self) -> Unwrap<Self>
where Self: Sized, where
Self: Sized,
Self::Error: fmt::Debug, Self::Error: fmt::Debug,
{ {
Unwrap { inner: self } Unwrap {
inner: self,
}
} }
/// Panic on error, with a message. /// Panic on error, with a message.
fn expect<T>(self, msg: T) -> Expect<Self> fn expect<T>(self, msg: T) -> Expect<Self>
where Self: Sized, where
Self: Sized,
Self::Error: fmt::Debug, Self::Error: fmt::Debug,
T: fmt::Display, T: fmt::Display,
{ {
@@ -28,7 +32,8 @@ pub trait FutureExt: Future {
/// ///
/// `self` must not resolve before `other` does. /// `self` must not resolve before `other` does.
fn drive<T>(self, other: T) -> Drive<Self, T> fn drive<T>(self, other: T) -> Drive<Self, T>
where T: Future, where
T: Future,
T::Error: fmt::Debug, T::Error: fmt::Debug,
Self: Future<Item = ()> + Sized, Self: Future<Item = ()> + Sized,
Self::Error: fmt::Debug, Self::Error: fmt::Debug,
@@ -40,8 +45,7 @@ pub trait FutureExt: Future {
} }
} }
impl<T: Future> FutureExt for T { impl<T: Future> FutureExt for T {}
}
// ===== Unwrap ====== // ===== Unwrap ======
@@ -51,7 +55,8 @@ pub struct Unwrap<T> {
} }
impl<T> Future for Unwrap<T> impl<T> Future for Unwrap<T>
where T: Future, where
T: Future,
T::Item: fmt::Debug, T::Item: fmt::Debug,
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
@@ -73,7 +78,8 @@ pub struct Expect<T> {
} }
impl<T> Future for Expect<T> impl<T> Future for Expect<T>
where T: Future, where
T: Future,
T::Item: fmt::Debug, T::Item: fmt::Debug,
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
@@ -96,7 +102,8 @@ pub struct Drive<T, U> {
} }
impl<T, U> Future for Drive<T, U> impl<T, U> Future for Drive<T, U>
where T: Future<Item = ()>, where
T: Future<Item = ()>,
U: Future, U: Future,
T::Error: fmt::Debug, T::Error: fmt::Debug,
U::Error: fmt::Debug, U::Error: fmt::Debug,
@@ -113,9 +120,9 @@ impl<T, U> Future for Drive<T, U>
// Get the driver // Get the driver
let driver = self.driver.take().unwrap(); let driver = self.driver.take().unwrap();
return Ok((driver, val).into()) return Ok((driver, val).into());
} },
Ok(_) => {} Ok(_) => {},
Err(e) => panic!("unexpected error; {:?}", e), Err(e) => panic!("unexpected error; {:?}", e),
} }
@@ -128,8 +135,8 @@ impl<T, U> Future for Drive<T, U>
looped = true; looped = true;
continue; continue;
} }
} },
Ok(Async::NotReady) => {} Ok(Async::NotReady) => {},
Err(e) => panic!("unexpected error; {:?}", e), Err(e) => panic!("unexpected error; {:?}", e),
} }

View File

@@ -7,10 +7,10 @@ pub extern crate http;
#[macro_use] #[macro_use]
pub extern crate tokio_io; pub extern crate tokio_io;
pub extern crate env_logger;
#[macro_use] #[macro_use]
pub extern crate futures; pub extern crate futures;
pub extern crate mock_io; pub extern crate mock_io;
pub extern crate env_logger;
#[macro_use] #[macro_use]
mod assert; mod assert;

View File

@@ -1,11 +1,11 @@
use {FutureExt, SendFrame}; use {FutureExt, SendFrame};
use h2::{self, SendError, RecvError}; use h2::{self, RecvError, SendError};
use h2::frame::{self, Frame}; use h2::frame::{self, Frame};
use futures::{Async, Future, Stream, Poll}; use futures::{Async, Future, Poll, Stream};
use futures::task::{self, Task};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::task::{self, Task};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::read_exact; use tokio_io::io::read_exact;
@@ -58,7 +58,9 @@ pub fn new() -> (Mock, Handle) {
}; };
let handle = Handle { let handle = Handle {
codec: h2::Codec::new(Pipe { inner }), codec: h2::Codec::new(Pipe {
inner,
}),
}; };
(mock, handle) (mock, handle)
@@ -92,12 +94,9 @@ impl Handle {
} }
/// Read the client preface /// Read the client preface
pub fn read_preface(self) pub fn read_preface(self) -> Box<Future<Item = Self, Error = io::Error>> {
-> Box<Future<Item = Self, Error = io::Error>>
{
let buf = vec![0; PREFACE.len()]; let buf = vec![0; PREFACE.len()];
let ret = read_exact(self, buf) let ret = read_exact(self, buf).and_then(|(me, buf)| {
.and_then(|(me, buf)| {
assert_eq!(buf, PREFACE); assert_eq!(buf, PREFACE);
Ok(me) Ok(me)
}); });
@@ -106,22 +105,26 @@ impl Handle {
} }
/// Perform the H2 handshake /// Perform the H2 handshake
pub fn assert_client_handshake(self) pub fn assert_client_handshake(
-> Box<Future<Item = (frame::Settings, Self), Error = h2::Error>> self,
{ ) -> Box<Future<Item = (frame::Settings, Self), Error = h2::Error>> {
self.assert_client_handshake_with_settings(frame::Settings::default()) self.assert_client_handshake_with_settings(frame::Settings::default())
} }
/// Perform the H2 handshake /// Perform the H2 handshake
pub fn assert_client_handshake_with_settings<T>(mut self, settings: T) pub fn assert_client_handshake_with_settings<T>(
-> Box<Future<Item = (frame::Settings, Self), Error = h2::Error>> mut self,
where T: Into<frame::Settings>, settings: T,
) -> Box<Future<Item = (frame::Settings, Self), Error = h2::Error>>
where
T: Into<frame::Settings>,
{ {
let settings = settings.into(); let settings = settings.into();
// Send a settings frame // Send a settings frame
self.send(settings.into()).unwrap(); self.send(settings.into()).unwrap();
let ret = self.read_preface().unwrap() let ret = self.read_preface()
.unwrap()
.and_then(|me| me.into_future().unwrap()) .and_then(|me| me.into_future().unwrap())
.map(|(frame, mut me)| { .map(|(frame, mut me)| {
match frame { match frame {
@@ -133,13 +136,13 @@ impl Handle {
me.send(ack.into()).unwrap(); me.send(ack.into()).unwrap();
(settings, me) (settings, me)
} },
Some(frame) => { Some(frame) => {
panic!("unexpected frame; frame={:?}", frame); panic!("unexpected frame; frame={:?}", frame);
} },
None => { None => {
panic!("unexpected EOF"); panic!("unexpected EOF");
} },
} }
}) })
.then(|res| { .then(|res| {
@@ -155,8 +158,7 @@ impl Handle {
(settings, me) (settings, me)
}) })
}) });
;
Box::new(ret) Box::new(ret)
} }
@@ -177,8 +179,7 @@ impl io::Read for Handle {
} }
} }
impl AsyncRead for Handle { impl AsyncRead for Handle {}
}
impl io::Write for Handle { impl io::Write for Handle {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
@@ -215,7 +216,10 @@ impl Drop for Handle {
impl io::Read for Mock { impl io::Read for Mock {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
assert!(buf.len() > 0, "attempted read with zero length buffer... wut?"); assert!(
buf.len() > 0,
"attempted read with zero length buffer... wut?"
);
let mut me = self.pipe.inner.lock().unwrap(); let mut me = self.pipe.inner.lock().unwrap();
@@ -236,8 +240,7 @@ impl io::Read for Mock {
} }
} }
impl AsyncRead for Mock { impl AsyncRead for Mock {}
}
impl io::Write for Mock { impl io::Write for Mock {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
@@ -273,7 +276,10 @@ impl AsyncWrite for Mock {
impl io::Read for Pipe { impl io::Read for Pipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
assert!(buf.len() > 0, "attempted read with zero length buffer... wut?"); assert!(
buf.len() > 0,
"attempted read with zero length buffer... wut?"
);
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
@@ -290,8 +296,7 @@ impl io::Read for Pipe {
} }
} }
impl AsyncRead for Pipe { impl AsyncRead for Pipe {}
}
impl io::Write for Pipe { impl io::Write for Pipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
@@ -320,11 +325,13 @@ impl AsyncWrite for Pipe {
pub trait HandleFutureExt { pub trait HandleFutureExt {
fn recv_settings(self) -> RecvFrame<Box<Future<Item = (Option<Frame>, Handle), Error = ()>>> fn recv_settings(self) -> RecvFrame<Box<Future<Item = (Option<Frame>, Handle), Error = ()>>>
where Self: Sized + 'static, where
Self: Sized + 'static,
Self: Future<Item = (frame::Settings, Handle)>, Self: Future<Item = (frame::Settings, Handle)>,
Self::Error: fmt::Debug, Self::Error: fmt::Debug,
{ {
let map = self.map(|(settings, handle)| (Some(settings.into()), handle)).unwrap(); let map = self.map(|(settings, handle)| (Some(settings.into()), handle))
.unwrap();
let boxed: Box<Future<Item = (Option<Frame>, Handle), Error = ()>> = Box::new(map); let boxed: Box<Future<Item = (Option<Frame>, Handle), Error = ()>> = Box::new(map);
RecvFrame { RecvFrame {
@@ -334,7 +341,8 @@ pub trait HandleFutureExt {
} }
fn ignore_settings(self) -> Box<Future<Item = Handle, Error = ()>> fn ignore_settings(self) -> Box<Future<Item = Handle, Error = ()>>
where Self: Sized + 'static, where
Self: Sized + 'static,
Self: Future<Item = (frame::Settings, Handle)>, Self: Future<Item = (frame::Settings, Handle)>,
Self::Error: fmt::Debug, Self::Error: fmt::Debug,
{ {
@@ -342,14 +350,16 @@ pub trait HandleFutureExt {
} }
fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future> fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future>
where Self: IntoRecvFrame + Sized, where
Self: IntoRecvFrame + Sized,
T: Into<Frame>, T: Into<Frame>,
{ {
self.into_recv_frame(frame.into()) self.into_recv_frame(frame.into())
} }
fn send_frame<T>(self, frame: T) -> SendFrameFut<Self> fn send_frame<T>(self, frame: T) -> SendFrameFut<Self>
where Self: Sized, where
Self: Sized,
T: Into<SendFrame>, T: Into<SendFrame>,
{ {
SendFrameFut { SendFrameFut {
@@ -359,7 +369,8 @@ pub trait HandleFutureExt {
} }
fn idle_ms(self, ms: usize) -> Box<Future<Item = Handle, Error = Self::Error>> fn idle_ms(self, ms: usize) -> Box<Future<Item = Handle, Error = Self::Error>>
where Self: Sized + 'static, where
Self: Sized + 'static,
Self: Future<Item = Handle>, Self: Future<Item = Handle>,
Self::Error: fmt::Debug, Self::Error: fmt::Debug,
{ {
@@ -384,7 +395,8 @@ pub trait HandleFutureExt {
} }
fn close(self) -> Box<Future<Item = (), Error = ()>> fn close(self) -> Box<Future<Item = (), Error = ()>>
where Self: Future<Error = ()> + Sized + 'static, where
Self: Future<Error = ()> + Sized + 'static,
{ {
Box::new(self.map(drop)) Box::new(self.map(drop))
} }
@@ -396,7 +408,8 @@ pub struct RecvFrame<T> {
} }
impl<T> Future for RecvFrame<T> impl<T> Future for RecvFrame<T>
where T: Future<Item=(Option<Frame>, Handle)>, where
T: Future<Item = (Option<Frame>, Handle)>,
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
type Item = Handle; type Item = Handle;
@@ -418,7 +431,8 @@ pub struct SendFrameFut<T> {
} }
impl<T> Future for SendFrameFut<T> impl<T> Future for SendFrameFut<T>
where T: Future<Item=Handle>, where
T: Future<Item = Handle>,
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
type Item = Handle; type Item = Handle;
@@ -452,13 +466,14 @@ impl Future for Idle {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
res => { res => {
panic!("Received unexpected frame on handle; frame={:?}", res); panic!("Received unexpected frame on handle; frame={:?}", res);
} },
} }
} }
} }
impl<T> HandleFutureExt for T impl<T> HandleFutureExt for T
where T: Future + 'static, where
T: Future + 'static,
{ {
} }
@@ -479,14 +494,16 @@ impl IntoRecvFrame for Handle {
} }
impl<T> IntoRecvFrame for T impl<T> IntoRecvFrame for T
where T: Future<Item=Handle> + 'static, where
T: Future<Item = Handle> + 'static,
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
type Future = Box<Future<Item = (Option<Frame>, Handle), Error = ()>>; type Future = Box<Future<Item = (Option<Frame>, Handle), Error = ()>>;
fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future> { fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future> {
let into_fut = Box::new(self.unwrap() let into_fut = Box::new(
.and_then(|handle| handle.into_future().unwrap()) self.unwrap()
.and_then(|handle| handle.into_future().unwrap()),
); );
RecvFrame { RecvFrame {
inner: into_fut, inner: into_fut,

View File

@@ -3,8 +3,8 @@
pub use super::h2; pub use super::h2;
pub use self::h2::*; pub use self::h2::*;
pub use self::h2::frame::StreamId;
pub use self::h2::client::{self, Client}; pub use self::h2::client::{self, Client};
pub use self::h2::frame::StreamId;
pub use self::h2::server::{self, Server}; pub use self::h2::server::{self, Server};
// Re-export mock // Re-export mock
@@ -20,41 +20,18 @@ pub use super::util;
pub use super::{Codec, SendFrame}; pub use super::{Codec, SendFrame};
// Re-export useful crates // Re-export useful crates
pub use super::{ pub use super::{bytes, env_logger, futures, http, mock_io, tokio_io};
http,
bytes,
tokio_io,
futures,
mock_io,
env_logger,
};
// Re-export primary future types // Re-export primary future types
pub use self::futures::{ pub use self::futures::{Future, Sink, Stream};
Future,
Sink,
Stream,
};
// And our Future extensions // And our Future extensions
pub use future_ext::{FutureExt, Unwrap}; pub use future_ext::{FutureExt, Unwrap};
// Re-export HTTP types // Re-export HTTP types
pub use self::http::{ pub use self::http::{HeaderMap, Method, Request, Response, StatusCode};
Request,
Response,
Method,
HeaderMap,
StatusCode,
};
pub use self::bytes::{ pub use self::bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
Buf,
BufMut,
Bytes,
BytesMut,
IntoBuf,
};
pub use tokio_io::{AsyncRead, AsyncWrite}; pub use tokio_io::{AsyncRead, AsyncWrite};
@@ -63,7 +40,7 @@ pub use std::time::Duration;
// ===== Everything under here shouldn't be used ===== // ===== Everything under here shouldn't be used =====
// TODO: work on deleting this code // TODO: work on deleting this code
pub use ::futures::future::poll_fn; pub use futures::future::poll_fn;
pub trait MockH2 { pub trait MockH2 {
fn handshake(&mut self) -> &mut Self; fn handshake(&mut self) -> &mut Self;
@@ -84,25 +61,24 @@ pub trait ClientExt {
} }
impl<T, B> ClientExt for Client<T, B> impl<T, B> ClientExt for Client<T, B>
where T: AsyncRead + AsyncWrite + 'static, where
T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static, B: IntoBuf + 'static,
{ {
fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> { fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
use futures::future::{self, Future}; use futures::future::{self, Future};
use futures::future::Either::*; use futures::future::Either::*;
let res = future::poll_fn(|| self.poll()) let res = future::poll_fn(|| self.poll()).select2(f).wait();
.select2(f).wait();
match res { match res {
Ok(A((_, b))) => { Ok(A((_, b))) => {
// Connection is done... // Connection is done...
b.wait() b.wait()
} },
Ok(B((v, _))) => return Ok(v), Ok(B((v, _))) => return Ok(v),
Err(A((e, _))) => panic!("err: {:?}", e), Err(A((e, _))) => panic!("err: {:?}", e),
Err(B((e, _))) => return Err(e), Err(B((e, _))) => return Err(e),
} }
} }
} }

View File

@@ -1,12 +1,9 @@
use h2::client; use h2::client;
use futures::{Poll, Async, Future};
use bytes::Bytes; use bytes::Bytes;
use futures::{Async, Future, Poll};
pub fn wait_for_capacity(stream: client::Stream<Bytes>, pub fn wait_for_capacity(stream: client::Stream<Bytes>, target: usize) -> WaitForCapacity {
target: usize)
-> WaitForCapacity
{
WaitForCapacity { WaitForCapacity {
stream: Some(stream), stream: Some(stream),
target: target, target: target,
@@ -36,7 +33,7 @@ impl Future for WaitForCapacity {
println!("CAP={:?}", act); println!("CAP={:?}", act);
if act >= self.target { if act >= self.target {
return Ok(self.stream.take().unwrap().into()) return Ok(self.stream.take().unwrap().into());
} }
Ok(Async::NotReady) Ok(Async::NotReady)