Update h2-support to std-future
This commit is contained in:
committed by
Sean McArthur
parent
c8fefd49f1
commit
529ef4cd40
@@ -9,8 +9,7 @@ h2 = { path = "../..", features = ["unstable"] }
|
||||
|
||||
bytes = "0.4.7"
|
||||
env_logger = "0.5.9"
|
||||
futures = "0.1.21"
|
||||
futures-preview = "0.3.0-alpha.18"
|
||||
http = "0.1.5"
|
||||
string = "0.2"
|
||||
tokio-io = "0.1.6"
|
||||
tokio-timer = "0.1.2"
|
||||
tokio = { git = "https://github.com/tokio-rs/tokio" }
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! assert_closed {
|
||||
($transport:expr) => {{
|
||||
assert_eq!($transport.poll().unwrap(), None.into());
|
||||
}}
|
||||
use futures::StreamExt;
|
||||
|
||||
assert!($transport.next().await.is_none());
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -13,7 +14,7 @@ macro_rules! assert_headers {
|
||||
h2::frame::Frame::Headers(v) => v,
|
||||
f => panic!("expected HEADERS; actual={:?}", f),
|
||||
}
|
||||
}}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -23,7 +24,7 @@ macro_rules! assert_data {
|
||||
h2::frame::Frame::Data(v) => v,
|
||||
f => panic!("expected DATA; actual={:?}", f),
|
||||
}
|
||||
}}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -33,7 +34,7 @@ macro_rules! assert_ping {
|
||||
h2::frame::Frame::Ping(v) => v,
|
||||
f => panic!("expected PING; actual={:?}", f),
|
||||
}
|
||||
}}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
@@ -43,28 +44,56 @@ macro_rules! assert_settings {
|
||||
h2::frame::Frame::Settings(v) => v,
|
||||
f => panic!("expected SETTINGS; actual={:?}", f),
|
||||
}
|
||||
}}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! poll_err {
|
||||
($transport:expr) => {{
|
||||
match $transport.poll() {
|
||||
Err(e) => e,
|
||||
use futures::StreamExt;
|
||||
match $transport.next().await {
|
||||
Some(Err(e)) => e,
|
||||
frame => panic!("expected error; actual={:?}", frame),
|
||||
}
|
||||
}}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! poll_frame {
|
||||
($type: ident, $transport:expr) => {{
|
||||
use futures::StreamExt;
|
||||
use h2::frame::Frame;
|
||||
use futures::Async;
|
||||
|
||||
match $transport.poll() {
|
||||
Ok(Async::Ready(Some(Frame::$type(frame)))) => frame,
|
||||
match $transport.next().await {
|
||||
Some(Ok(Frame::$type(frame))) => frame,
|
||||
frame => panic!("unexpected frame; actual={:?}", frame),
|
||||
}
|
||||
}}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! assert_default_settings {
|
||||
($settings: expr) => {{
|
||||
assert_frame_eq($settings, frame::Settings::default());
|
||||
}};
|
||||
}
|
||||
|
||||
use h2::frame::Frame;
|
||||
|
||||
pub fn assert_frame_eq<T: Into<Frame>, U: Into<Frame>>(t: T, u: U) {
|
||||
let actual: Frame = t.into();
|
||||
let expected: Frame = u.into();
|
||||
match (actual, expected) {
|
||||
(Frame::Data(a), Frame::Data(b)) => {
|
||||
assert_eq!(
|
||||
a.payload().len(),
|
||||
b.payload().len(),
|
||||
"assert_frame_eq data payload len"
|
||||
);
|
||||
assert_eq!(a, b, "assert_frame_eq");
|
||||
}
|
||||
(a, b) => {
|
||||
assert_eq!(a, b, "assert_frame_eq");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,8 +11,8 @@ pub trait SendRequestExt {
|
||||
|
||||
impl<B> SendRequestExt for SendRequest<B>
|
||||
where
|
||||
B: IntoBuf,
|
||||
B::Buf: 'static,
|
||||
B: IntoBuf + Unpin,
|
||||
B::Buf: Unpin + 'static,
|
||||
{
|
||||
fn get(&mut self, uri: &str) -> ResponseFuture {
|
||||
let req = Request::builder()
|
||||
|
||||
@@ -1,220 +1,54 @@
|
||||
use futures::{Async, Future, Poll};
|
||||
|
||||
use std::fmt;
|
||||
use futures::FutureExt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
/// Future extension helpers that are useful for tests
|
||||
pub trait FutureExt: Future {
|
||||
/// Panic on error
|
||||
fn unwrap(self) -> Unwrap<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
Unwrap {
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
|
||||
/// Panic on success, yielding the content of an `Err`.
|
||||
fn unwrap_err(self) -> UnwrapErr<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
UnwrapErr {
|
||||
inner: self,
|
||||
}
|
||||
}
|
||||
|
||||
/// Panic on success, with a message.
|
||||
fn expect_err<T>(self, msg: T) -> ExpectErr<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Error: fmt::Debug,
|
||||
T: fmt::Display,
|
||||
{
|
||||
ExpectErr{
|
||||
inner: self,
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Panic on error, with a message.
|
||||
fn expect<T>(self, msg: T) -> Expect<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
Self::Error: fmt::Debug,
|
||||
T: fmt::Display,
|
||||
{
|
||||
Expect {
|
||||
inner: self,
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TestFuture: Future {
|
||||
/// Drive `other` by polling `self`.
|
||||
///
|
||||
/// `self` must not resolve before `other` does.
|
||||
fn drive<T>(self, other: T) -> Drive<Self, T>
|
||||
fn drive<T>(&mut self, other: T) -> Drive<'_, Self, T>
|
||||
where
|
||||
T: Future,
|
||||
T::Error: fmt::Debug,
|
||||
Self: Future<Item = ()> + Sized,
|
||||
Self::Error: fmt::Debug,
|
||||
Self: Future + Sized,
|
||||
{
|
||||
Drive {
|
||||
driver: Some(self),
|
||||
future: other,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap this future in one that will yield NotReady once before continuing.
|
||||
///
|
||||
/// This allows the executor to poll other futures before trying this one
|
||||
/// again.
|
||||
fn yield_once(self) -> Box<dyn Future<Item = Self::Item, Error = Self::Error>>
|
||||
where
|
||||
Self: Future + Sized + 'static,
|
||||
{
|
||||
Box::new(super::util::yield_once().then(move |_| self))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Future> FutureExt for T {}
|
||||
|
||||
// ===== Unwrap ======
|
||||
|
||||
/// Panic on error
|
||||
pub struct Unwrap<T> {
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> Future for Unwrap<T>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: fmt::Debug,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<T::Item, ()> {
|
||||
Ok(self.inner.poll().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== UnwrapErr ======
|
||||
|
||||
/// Panic on success.
|
||||
pub struct UnwrapErr<T> {
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> Future for UnwrapErr<T>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: fmt::Debug,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Item = T::Error;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<T::Error, ()> {
|
||||
match self.inner.poll() {
|
||||
Ok(Async::Ready(v)) => panic!("Future::unwrap_err() on an Ok value: {:?}", v),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(e) => Ok(Async::Ready(e)),
|
||||
driver: self,
|
||||
future: Box::pin(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// ===== Expect ======
|
||||
|
||||
/// Panic on error
|
||||
pub struct Expect<T> {
|
||||
inner: T,
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl<T> Future for Expect<T>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: fmt::Debug,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<T::Item, ()> {
|
||||
Ok(self.inner.poll().expect(&self.msg))
|
||||
}
|
||||
}
|
||||
|
||||
// ===== ExpectErr ======
|
||||
|
||||
/// Panic on success
|
||||
pub struct ExpectErr<T> {
|
||||
inner: T,
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl<T> Future for ExpectErr<T>
|
||||
where
|
||||
T: Future,
|
||||
T::Item: fmt::Debug,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Item = T::Error;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<T::Error, ()> {
|
||||
match self.inner.poll() {
|
||||
Ok(Async::Ready(v)) => panic!("{}: {:?}", self.msg, v),
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(e) => Ok(Async::Ready(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl<T: Future> TestFuture for T {}
|
||||
|
||||
// ===== Drive ======
|
||||
|
||||
/// Drive a future to completion while also polling the driver
|
||||
///
|
||||
/// This is useful for H2 futures that also require the connection to be polled.
|
||||
pub struct Drive<T, U> {
|
||||
driver: Option<T>,
|
||||
future: U,
|
||||
pub struct Drive<'a, T, U> {
|
||||
driver: &'a mut T,
|
||||
future: Pin<Box<U>>,
|
||||
}
|
||||
|
||||
impl<T, U> Future for Drive<T, U>
|
||||
impl<'a, T, U> Future for Drive<'a, T, U>
|
||||
where
|
||||
T: Future<Item = ()>,
|
||||
T: Future + Unpin,
|
||||
U: Future,
|
||||
T::Error: fmt::Debug,
|
||||
U::Error: fmt::Debug,
|
||||
{
|
||||
type Item = (T, U::Item);
|
||||
type Error = ();
|
||||
type Output = U::Output;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let mut looped = false;
|
||||
|
||||
loop {
|
||||
match self.future.poll() {
|
||||
Ok(Async::Ready(val)) => {
|
||||
// Get the driver
|
||||
let driver = self.driver.take().unwrap();
|
||||
|
||||
return Ok((driver, val).into());
|
||||
},
|
||||
Ok(_) => {},
|
||||
Err(e) => panic!("unexpected error; {:?}", e),
|
||||
match self.future.poll_unpin(cx) {
|
||||
Poll::Ready(val) => return Poll::Ready(val),
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
match self.driver.as_mut().unwrap().poll() {
|
||||
Ok(Async::Ready(_)) => {
|
||||
match self.driver.poll_unpin(cx) {
|
||||
Poll::Ready(_) => {
|
||||
if looped {
|
||||
// Try polling the future one last time
|
||||
panic!("driver resolved before future")
|
||||
@@ -222,12 +56,11 @@ where
|
||||
looped = true;
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => {},
|
||||
Err(e) => panic!("unexpected error; {:?}", e),
|
||||
}
|
||||
Poll::Pending => {}
|
||||
}
|
||||
|
||||
return Ok(Async::NotReady);
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#![feature(async_await)]
|
||||
//! Utilities to support tests.
|
||||
|
||||
#[macro_use]
|
||||
@@ -16,7 +17,7 @@ mod client_ext;
|
||||
mod future_ext;
|
||||
|
||||
pub use crate::client_ext::{SendRequestExt};
|
||||
pub use crate::future_ext::{FutureExt, Unwrap};
|
||||
pub use crate::future_ext::TestFuture;
|
||||
|
||||
pub type WindowSize = usize;
|
||||
pub const DEFAULT_WINDOW_SIZE: WindowSize = (1 << 16) - 1;
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
use crate::{frames, FutureExt, SendFrame};
|
||||
use crate::SendFrame;
|
||||
|
||||
use h2::{self, RecvError, SendError};
|
||||
use h2::frame::{self, Frame};
|
||||
use h2::{self, RecvError, SendError};
|
||||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::sync::oneshot;
|
||||
use futures::task::{self, Task};
|
||||
use futures::future::poll_fn;
|
||||
use futures::{ready, Stream, StreamExt};
|
||||
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io::io::read_exact;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use tokio::timer::Delay;
|
||||
|
||||
use std::{cmp, fmt, io, usize};
|
||||
use std::io::ErrorKind::WouldBlock;
|
||||
use super::assert::assert_frame_eq;
|
||||
use futures::executor::block_on;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{cmp, io, usize};
|
||||
|
||||
/// A mock I/O
|
||||
#[derive(Debug)]
|
||||
@@ -36,19 +39,19 @@ struct Inner {
|
||||
rx: Vec<u8>,
|
||||
|
||||
/// Notify when data is ready to be received.
|
||||
rx_task: Option<Task>,
|
||||
rx_task: Option<Waker>,
|
||||
|
||||
/// Data written by the `h2` library to be read by the test case.
|
||||
tx: Vec<u8>,
|
||||
|
||||
/// Notify when data is written. This notifies the test case waiters.
|
||||
tx_task: Option<Task>,
|
||||
tx_task: Option<Waker>,
|
||||
|
||||
/// Number of bytes that can be written before `write` returns `NotReady`.
|
||||
/// Number of bytes that can be written before `write` returns `Poll::Pending`.
|
||||
tx_rem: usize,
|
||||
|
||||
/// Task to notify when write capacity becomes available.
|
||||
tx_rem_task: Option<Task>,
|
||||
tx_rem_task: Option<Waker>,
|
||||
|
||||
/// True when the pipe is closed.
|
||||
closed: bool,
|
||||
@@ -80,9 +83,7 @@ pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) {
|
||||
};
|
||||
|
||||
let handle = Handle {
|
||||
codec: h2::Codec::new(Pipe {
|
||||
inner,
|
||||
}),
|
||||
codec: h2::Codec::new(Pipe { inner }),
|
||||
};
|
||||
|
||||
(mock, handle)
|
||||
@@ -97,207 +98,258 @@ impl Handle {
|
||||
}
|
||||
|
||||
/// Send a frame
|
||||
pub fn send(&mut self, item: SendFrame) -> Result<(), SendError> {
|
||||
pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> {
|
||||
// Queue the frame
|
||||
self.codec.buffer(item).unwrap();
|
||||
|
||||
// Flush the frame
|
||||
assert!(self.codec.flush()?.is_ready());
|
||||
|
||||
poll_fn(|cx| {
|
||||
let p = self.codec.flush(cx);
|
||||
assert!(p.is_ready());
|
||||
p
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Writes the client preface
|
||||
pub fn write_preface(&mut self) {
|
||||
use std::io::Write;
|
||||
|
||||
// Write the connnection preface
|
||||
self.codec.get_mut().write(PREFACE).unwrap();
|
||||
pub async fn write_preface(&mut self) {
|
||||
self.codec.get_mut().write_all(PREFACE).await.unwrap();
|
||||
}
|
||||
|
||||
/// Read the client preface
|
||||
pub fn read_preface(self) -> Box<dyn Future<Item = Self, Error = io::Error>> {
|
||||
let buf = vec![0; PREFACE.len()];
|
||||
let ret = read_exact(self, buf).and_then(|(me, buf)| {
|
||||
assert_eq!(buf, PREFACE);
|
||||
Ok(me)
|
||||
});
|
||||
pub async fn read_preface(&mut self) -> io::Result<()> {
|
||||
let mut buf = vec![0u8; PREFACE.len()];
|
||||
self.read_exact(&mut buf).await?;
|
||||
assert_eq!(buf, PREFACE);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Box::new(ret)
|
||||
pub async fn recv_frame<F: Into<Frame>>(&mut self, expected: F) {
|
||||
let frame = self.next().await.unwrap().unwrap();
|
||||
assert_frame_eq(frame, expected);
|
||||
}
|
||||
|
||||
pub async fn send_frame<F: Into<SendFrame>>(&mut self, frame: F) {
|
||||
self.send(frame.into()).await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn recv_eof(&mut self) {
|
||||
let frame = self.next().await;
|
||||
assert!(frame.is_none());
|
||||
}
|
||||
|
||||
pub async fn send_bytes(&mut self, data: &[u8]) {
|
||||
use bytes::Buf;
|
||||
use std::io::Cursor;
|
||||
|
||||
let buf: Vec<_> = data.into();
|
||||
let mut buf = Cursor::new(buf);
|
||||
|
||||
poll_fn(move |cx| {
|
||||
while buf.has_remaining() {
|
||||
let res = Pin::new(self.codec.get_mut())
|
||||
.poll_write_buf(cx, &mut buf)
|
||||
.map_err(|e| panic!("write err={:?}", e));
|
||||
|
||||
ready!(res).unwrap();
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Perform the H2 handshake
|
||||
pub fn assert_client_handshake(
|
||||
self,
|
||||
) -> Box<dyn Future<Item = (frame::Settings, Self), Error = h2::Error>> {
|
||||
pub async fn assert_client_handshake(&mut self) -> frame::Settings {
|
||||
self.assert_client_handshake_with_settings(frame::Settings::default())
|
||||
.await
|
||||
}
|
||||
|
||||
/// Perform the H2 handshake
|
||||
pub fn assert_client_handshake_with_settings<T>(
|
||||
mut self,
|
||||
settings: T,
|
||||
) -> Box<dyn Future<Item = (frame::Settings, Self), Error = h2::Error>>
|
||||
pub async fn assert_client_handshake_with_settings<T>(&mut self, settings: T) -> frame::Settings
|
||||
where
|
||||
T: Into<frame::Settings>,
|
||||
{
|
||||
let settings = settings.into();
|
||||
// Send a settings frame
|
||||
self.send(settings.into()).unwrap();
|
||||
self.send(settings.into()).await.unwrap();
|
||||
self.read_preface().await.unwrap();
|
||||
|
||||
let ret = self.read_preface()
|
||||
.unwrap()
|
||||
.and_then(|me| me.into_future().unwrap())
|
||||
.map(|(frame, mut me)| {
|
||||
match frame {
|
||||
Some(Frame::Settings(settings)) => {
|
||||
// Send the ACK
|
||||
let ack = frame::Settings::ack();
|
||||
let settings = match self.next().await {
|
||||
Some(frame) => match frame.unwrap() {
|
||||
Frame::Settings(settings) => {
|
||||
// Send the ACK
|
||||
let ack = frame::Settings::ack();
|
||||
|
||||
// TODO: Don't unwrap?
|
||||
me.send(ack.into()).unwrap();
|
||||
// TODO: Don't unwrap?
|
||||
self.send(ack.into()).await.unwrap();
|
||||
|
||||
(settings, me)
|
||||
},
|
||||
Some(frame) => {
|
||||
panic!("unexpected frame; frame={:?}", frame);
|
||||
},
|
||||
None => {
|
||||
panic!("unexpected EOF");
|
||||
},
|
||||
settings
|
||||
}
|
||||
})
|
||||
.then(|res| {
|
||||
let (settings, me) = res.unwrap();
|
||||
frame => {
|
||||
panic!("unexpected frame; frame={:?}", frame);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
panic!("unexpected EOF");
|
||||
}
|
||||
};
|
||||
|
||||
me.into_future()
|
||||
.map_err(|_| unreachable!("all previous futures unwrapped"))
|
||||
.map(|(frame, me)| {
|
||||
let f = assert_settings!(frame.unwrap());
|
||||
let frame = self.next().await.unwrap().unwrap();
|
||||
let f = assert_settings!(frame);
|
||||
|
||||
// Is ACK
|
||||
assert!(f.is_ack());
|
||||
// Is ACK
|
||||
assert!(f.is_ack());
|
||||
|
||||
(settings, me)
|
||||
})
|
||||
});
|
||||
|
||||
Box::new(ret)
|
||||
settings
|
||||
}
|
||||
|
||||
|
||||
/// Perform the H2 handshake
|
||||
pub fn assert_server_handshake(
|
||||
self,
|
||||
) -> Box<dyn Future<Item = (frame::Settings, Self), Error = h2::Error>> {
|
||||
pub async fn assert_server_handshake(&mut self) -> frame::Settings {
|
||||
self.assert_server_handshake_with_settings(frame::Settings::default())
|
||||
.await
|
||||
}
|
||||
|
||||
/// Perform the H2 handshake
|
||||
pub fn assert_server_handshake_with_settings<T>(
|
||||
mut self,
|
||||
settings: T,
|
||||
) -> Box<dyn Future<Item = (frame::Settings, Self), Error = h2::Error>>
|
||||
pub async fn assert_server_handshake_with_settings<T>(&mut self, settings: T) -> frame::Settings
|
||||
where
|
||||
T: Into<frame::Settings>,
|
||||
{
|
||||
self.write_preface();
|
||||
self.write_preface().await;
|
||||
|
||||
let settings = settings.into();
|
||||
self.send(settings.into()).unwrap();
|
||||
self.send(settings.into()).await.unwrap();
|
||||
|
||||
let ret = self.into_future()
|
||||
.unwrap()
|
||||
.map(|(frame, mut me)| {
|
||||
match frame {
|
||||
Some(Frame::Settings(settings)) => {
|
||||
// Send the ACK
|
||||
let ack = frame::Settings::ack();
|
||||
let frame = self.next().await;
|
||||
let settings = match frame {
|
||||
Some(frame) => match frame.unwrap() {
|
||||
Frame::Settings(settings) => {
|
||||
// Send the ACK
|
||||
let ack = frame::Settings::ack();
|
||||
|
||||
// TODO: Don't unwrap?
|
||||
me.send(ack.into()).unwrap();
|
||||
// TODO: Don't unwrap?
|
||||
self.send(ack.into()).await.unwrap();
|
||||
|
||||
(settings, me)
|
||||
},
|
||||
Some(frame) => {
|
||||
panic!("unexpected frame; frame={:?}", frame);
|
||||
},
|
||||
None => {
|
||||
panic!("unexpected EOF");
|
||||
},
|
||||
settings
|
||||
}
|
||||
})
|
||||
.then(|res| {
|
||||
let (settings, me) = res.unwrap();
|
||||
frame => panic!("unexpected frame; frame={:?}", frame),
|
||||
},
|
||||
None => panic!("unexpected EOF"),
|
||||
};
|
||||
let frame = self.next().await;
|
||||
let f = assert_settings!(frame.unwrap().unwrap());
|
||||
|
||||
me.into_future()
|
||||
.map_err(|e| panic!("error: {:?}", e))
|
||||
.map(|(frame, me)| {
|
||||
let f = assert_settings!(frame.unwrap());
|
||||
// Is ACK
|
||||
assert!(f.is_ack());
|
||||
|
||||
// Is ACK
|
||||
assert!(f.is_ack());
|
||||
settings
|
||||
}
|
||||
|
||||
(settings, me)
|
||||
})
|
||||
});
|
||||
pub async fn ping_pong(&mut self, payload: [u8; 8]) {
|
||||
self.send_frame(crate::frames::ping(payload)).await;
|
||||
self.recv_frame(crate::frames::ping(payload).pong()).await;
|
||||
}
|
||||
|
||||
Box::new(ret)
|
||||
pub async fn buffer_bytes(&mut self, num: usize) {
|
||||
// Set tx_rem to num
|
||||
{
|
||||
let mut i = self.codec.get_mut().inner.lock().unwrap();
|
||||
i.tx_rem = num;
|
||||
}
|
||||
|
||||
poll_fn(move |cx| {
|
||||
{
|
||||
let mut inner = self.codec.get_mut().inner.lock().unwrap();
|
||||
if inner.tx_rem == 0 {
|
||||
inner.tx_rem = usize::MAX;
|
||||
} else {
|
||||
inner.tx_task = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn unbounded_bytes(&mut self) {
|
||||
let mut i = self.codec.get_mut().inner.lock().unwrap();
|
||||
i.tx_rem = usize::MAX;
|
||||
|
||||
if let Some(task) = i.tx_rem_task.take() {
|
||||
task.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Handle {
|
||||
type Item = Frame;
|
||||
type Error = RecvError;
|
||||
type Item = Result<Frame, RecvError>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, RecvError> {
|
||||
self.codec.poll()
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.codec).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Read for Handle {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.codec.get_mut().read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for Handle {}
|
||||
|
||||
impl io::Write for Handle {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.codec.get_mut().write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
impl AsyncRead for Handle {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(self.codec.get_mut()).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for Handle {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
use std::io::Write;
|
||||
tokio_io::try_nb!(self.flush());
|
||||
Ok(().into())
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
Pin::new(self.codec.get_mut()).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Pin::new(self.codec.get_mut()).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
Pin::new(self.codec.get_mut()).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Handle {
|
||||
fn drop(&mut self) {
|
||||
assert!(self.codec.shutdown().unwrap().is_ready());
|
||||
block_on(async {
|
||||
poll_fn(|cx| {
|
||||
assert!(self.codec.shutdown(cx).is_ready());
|
||||
|
||||
let mut me = self.codec.get_mut().inner.lock().unwrap();
|
||||
me.closed = true;
|
||||
let mut me = self.codec.get_mut().inner.lock().unwrap();
|
||||
me.closed = true;
|
||||
|
||||
if let Some(task) = me.rx_task.take() {
|
||||
task.notify();
|
||||
}
|
||||
if let Some(task) = me.rx_task.take() {
|
||||
task.wake();
|
||||
}
|
||||
Poll::Ready(())
|
||||
})
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Mock =====
|
||||
|
||||
impl io::Read for Mock {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
impl AsyncRead for Mock {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
assert!(
|
||||
buf.len() > 0,
|
||||
"attempted read with zero length buffer... wut?"
|
||||
@@ -307,34 +359,36 @@ impl io::Read for Mock {
|
||||
|
||||
if me.rx.is_empty() {
|
||||
if me.closed {
|
||||
return Ok(0);
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
|
||||
me.rx_task = Some(task::current());
|
||||
return Err(WouldBlock.into());
|
||||
me.rx_task = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
let n = cmp::min(buf.len(), me.rx.len());
|
||||
buf[..n].copy_from_slice(&me.rx[..n]);
|
||||
me.rx.drain(..n);
|
||||
|
||||
Ok(n)
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for Mock {}
|
||||
|
||||
impl io::Write for Mock {
|
||||
fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
|
||||
impl AsyncWrite for Mock {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
mut buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
let mut me = self.pipe.inner.lock().unwrap();
|
||||
|
||||
if me.closed {
|
||||
return Ok(buf.len());
|
||||
return Poll::Ready(Ok(buf.len()));
|
||||
}
|
||||
|
||||
if me.tx_rem == 0 {
|
||||
me.tx_rem_task = Some(task::current());
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
me.tx_rem_task = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
if buf.len() > me.tx_rem {
|
||||
@@ -345,22 +399,18 @@ impl io::Write for Mock {
|
||||
me.tx_rem -= buf.len();
|
||||
|
||||
if let Some(task) = me.tx_task.take() {
|
||||
task.notify();
|
||||
task.wake();
|
||||
}
|
||||
|
||||
Ok(buf.len())
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for Mock {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
use std::io::Write;
|
||||
tokio_io::try_nb!(self.flush());
|
||||
Ok(().into())
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -370,15 +420,19 @@ impl Drop for Mock {
|
||||
me.closed = true;
|
||||
|
||||
if let Some(task) = me.tx_task.take() {
|
||||
task.notify();
|
||||
task.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Pipe =====
|
||||
|
||||
impl io::Read for Pipe {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
impl AsyncRead for Pipe {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
assert!(
|
||||
buf.len() > 0,
|
||||
"attempted read with zero length buffer... wut?"
|
||||
@@ -386,382 +440,48 @@ impl io::Read for Pipe {
|
||||
|
||||
let mut me = self.inner.lock().unwrap();
|
||||
|
||||
|
||||
if me.tx.is_empty() {
|
||||
if me.closed {
|
||||
return Ok(0);
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
|
||||
me.tx_task = Some(task::current());
|
||||
return Err(WouldBlock.into());
|
||||
me.tx_task = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
let n = cmp::min(buf.len(), me.tx.len());
|
||||
buf[..n].copy_from_slice(&me.tx[..n]);
|
||||
me.tx.drain(..n);
|
||||
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for Pipe {}
|
||||
|
||||
impl io::Write for Pipe {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let mut me = self.inner.lock().unwrap();
|
||||
me.rx.extend(buf);
|
||||
|
||||
if let Some(task) = me.rx_task.take() {
|
||||
task.notify();
|
||||
}
|
||||
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for Pipe {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
use std::io::Write;
|
||||
tokio_io::try_nb!(self.flush());
|
||||
Ok(().into())
|
||||
}
|
||||
}
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
let mut me = self.inner.lock().unwrap();
|
||||
me.rx.extend(buf);
|
||||
|
||||
pub trait HandleFutureExt {
|
||||
fn recv_settings(self)
|
||||
-> RecvFrame<Box<dyn Future<Item = (Option<Frame>, Handle), Error = ()>>>
|
||||
where
|
||||
Self: Sized + 'static,
|
||||
Self: Future<Item = (frame::Settings, Handle)>,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
self.recv_custom_settings(frame::Settings::default())
|
||||
}
|
||||
|
||||
fn recv_custom_settings<T>(self, settings: T)
|
||||
-> RecvFrame<Box<dyn Future<Item = (Option<Frame>, Handle), Error = ()>>>
|
||||
where
|
||||
Self: Sized + 'static,
|
||||
Self: Future<Item = (frame::Settings, Handle)>,
|
||||
Self::Error: fmt::Debug,
|
||||
T: Into<frame::Settings>,
|
||||
{
|
||||
let map = self
|
||||
.map(|(settings, handle)| (Some(settings.into()), handle))
|
||||
.unwrap();
|
||||
|
||||
let boxed: Box<dyn Future<Item = (Option<Frame>, Handle), Error = ()>> =
|
||||
Box::new(map);
|
||||
RecvFrame {
|
||||
inner: boxed,
|
||||
frame: Some(settings.into().into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn ignore_settings(self) -> Box<dyn Future<Item = Handle, Error = ()>>
|
||||
where
|
||||
Self: Sized + 'static,
|
||||
Self: Future<Item = (frame::Settings, Handle)>,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
Box::new(self.map(|(_settings, handle)| handle).unwrap())
|
||||
}
|
||||
|
||||
fn recv_frame<T>(self, frame: T) -> RecvFrame<<Self as IntoRecvFrame>::Future>
|
||||
where
|
||||
Self: IntoRecvFrame + Sized,
|
||||
T: Into<Frame>,
|
||||
{
|
||||
self.into_recv_frame(Some(frame.into()))
|
||||
}
|
||||
|
||||
fn recv_eof(self) -> RecvFrame<<Self as IntoRecvFrame>::Future>
|
||||
where
|
||||
Self: IntoRecvFrame + Sized,
|
||||
{
|
||||
self.into_recv_frame(None)
|
||||
}
|
||||
|
||||
fn send_frame<T>(self, frame: T) -> SendFrameFut<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
T: Into<SendFrame>,
|
||||
{
|
||||
SendFrameFut {
|
||||
inner: self,
|
||||
frame: Some(frame.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_bytes(self, data: &[u8]) -> Box<dyn Future<Item = Handle, Error = Self::Error>>
|
||||
where
|
||||
Self: Future<Item = Handle> + Sized + 'static,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
use bytes::Buf;
|
||||
use futures::future::poll_fn;
|
||||
use std::io::Cursor;
|
||||
|
||||
let buf: Vec<_> = data.into();
|
||||
let mut buf = Cursor::new(buf);
|
||||
|
||||
Box::new(self.and_then(move |handle| {
|
||||
let mut handle = Some(handle);
|
||||
|
||||
poll_fn(move || {
|
||||
while buf.has_remaining() {
|
||||
let res = handle.as_mut().unwrap()
|
||||
.codec.get_mut()
|
||||
.write_buf(&mut buf)
|
||||
.map_err(|e| panic!("write err={:?}", e));
|
||||
|
||||
futures::try_ready!(res);
|
||||
}
|
||||
|
||||
Ok(handle.take().unwrap().into())
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
fn ping_pong(self, payload: [u8; 8]) -> RecvFrame<<SendFrameFut<Self> as IntoRecvFrame>::Future>
|
||||
where
|
||||
Self: Future<Item=Handle> + Sized + 'static,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
self.send_frame(frames::ping(payload))
|
||||
.recv_frame(frames::ping(payload).pong())
|
||||
}
|
||||
|
||||
fn idle_ms(self, ms: usize) -> Box<dyn Future<Item = Handle, Error = Self::Error>>
|
||||
where
|
||||
Self: Sized + 'static,
|
||||
Self: Future<Item = Handle>,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
|
||||
Box::new(self.and_then(move |handle| {
|
||||
// This is terrible... but oh well
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
thread::sleep(Duration::from_millis(ms as u64));
|
||||
tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
Idle {
|
||||
handle: Some(handle),
|
||||
timeout: rx,
|
||||
}.map_err(|_| unreachable!())
|
||||
}))
|
||||
}
|
||||
|
||||
fn buffer_bytes(self, num: usize) -> Box<dyn Future<Item = Handle, Error = Self::Error>>
|
||||
where Self: Sized + 'static,
|
||||
Self: Future<Item = Handle>,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
use futures::future::poll_fn;
|
||||
|
||||
Box::new(self.and_then(move |mut handle| {
|
||||
// Set tx_rem to num
|
||||
{
|
||||
let mut i = handle.codec.get_mut().inner.lock().unwrap();
|
||||
i.tx_rem = num;
|
||||
}
|
||||
|
||||
let mut handle = Some(handle);
|
||||
|
||||
poll_fn(move || {
|
||||
{
|
||||
let mut inner = handle.as_mut().unwrap()
|
||||
.codec.get_mut().inner.lock().unwrap();
|
||||
|
||||
if inner.tx_rem == 0 {
|
||||
inner.tx_rem = usize::MAX;
|
||||
} else {
|
||||
inner.tx_task = Some(task::current());
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(handle.take().unwrap().into())
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
fn unbounded_bytes(self) -> Box<dyn Future<Item = Handle, Error = Self::Error>>
|
||||
where Self: Sized + 'static,
|
||||
Self: Future<Item = Handle>,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
Box::new(self.and_then(|mut handle| {
|
||||
{
|
||||
let mut i = handle.codec.get_mut().inner.lock().unwrap();
|
||||
i.tx_rem = usize::MAX;
|
||||
|
||||
if let Some(task) = i.tx_rem_task.take() {
|
||||
task.notify();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(handle.into())
|
||||
}))
|
||||
}
|
||||
|
||||
fn then_notify(self, tx: oneshot::Sender<()>) -> Box<dyn Future<Item = Handle, Error = Self::Error>>
|
||||
where Self: Sized + 'static,
|
||||
Self: Future<Item = Handle>,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
Box::new(self.map(move |handle| {
|
||||
tx.send(()).unwrap();
|
||||
handle
|
||||
}))
|
||||
}
|
||||
|
||||
fn wait_for<F>(self, other: F) -> Box<dyn Future<Item = Self::Item, Error = Self::Error>>
|
||||
where
|
||||
F: Future + 'static,
|
||||
Self: Future + Sized + 'static
|
||||
{
|
||||
Box::new(self.then(move |result| {
|
||||
other.then(move |_| result)
|
||||
}))
|
||||
}
|
||||
|
||||
fn close(self) -> Box<dyn Future<Item = (), Error = ()>>
|
||||
where
|
||||
Self: Future<Error = ()> + Sized + 'static,
|
||||
{
|
||||
Box::new(self.map(drop))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RecvFrame<T> {
|
||||
inner: T,
|
||||
frame: Option<Frame>,
|
||||
}
|
||||
|
||||
impl<T> Future for RecvFrame<T>
|
||||
where
|
||||
T: Future<Item = (Option<Frame>, Handle)>,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Item = Handle;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
use self::Frame::Data;
|
||||
|
||||
let (frame, handle) = match self.inner.poll().unwrap() {
|
||||
Async::Ready((frame, handle)) => (frame, handle),
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
};
|
||||
|
||||
match (frame, &self.frame) {
|
||||
(Some(Data(ref a)), &Some(Data(ref b))) => {
|
||||
assert_eq!(a.payload().len(), b.payload().len(), "recv_frame data payload len");
|
||||
assert_eq!(a, b, "recv_frame");
|
||||
}
|
||||
(ref a, b) => {
|
||||
assert_eq!(a, b, "recv_frame");
|
||||
}
|
||||
if let Some(task) = me.rx_task.take() {
|
||||
task.wake();
|
||||
}
|
||||
|
||||
Ok(Async::Ready(handle))
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct SendFrameFut<T> {
|
||||
inner: T,
|
||||
frame: Option<SendFrame>,
|
||||
}
|
||||
|
||||
impl<T> Future for SendFrameFut<T>
|
||||
where
|
||||
T: Future<Item = Handle>,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Item = Handle;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let mut handle = match self.inner.poll().unwrap() {
|
||||
Async::Ready(handle) => handle,
|
||||
Async::NotReady => return Ok(Async::NotReady),
|
||||
};
|
||||
handle.send(self.frame.take().unwrap()).unwrap();
|
||||
Ok(Async::Ready(handle))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Idle {
|
||||
handle: Option<Handle>,
|
||||
timeout: oneshot::Receiver<()>,
|
||||
}
|
||||
|
||||
impl Future for Idle {
|
||||
type Item = Handle;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
if self.timeout.poll().unwrap().is_ready() {
|
||||
return Ok(self.handle.take().unwrap().into());
|
||||
}
|
||||
|
||||
match self.handle.as_mut().unwrap().poll() {
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
res => {
|
||||
panic!("Idle received unexpected frame on handle; frame={:?}", res);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> HandleFutureExt for T
|
||||
where
|
||||
T: Future + 'static,
|
||||
{
|
||||
}
|
||||
|
||||
pub trait IntoRecvFrame {
|
||||
type Future: Future;
|
||||
fn into_recv_frame(self, frame: Option<Frame>) -> RecvFrame<Self::Future>;
|
||||
}
|
||||
|
||||
impl IntoRecvFrame for Handle {
|
||||
type Future = ::futures::stream::StreamFuture<Self>;
|
||||
|
||||
fn into_recv_frame(self, frame: Option<Frame>) -> RecvFrame<Self::Future> {
|
||||
RecvFrame {
|
||||
inner: self.into_future(),
|
||||
frame: frame,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> IntoRecvFrame for T
|
||||
where
|
||||
T: Future<Item = Handle> + 'static,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
type Future = Box<dyn Future<Item = (Option<Frame>, Handle), Error = ()>>;
|
||||
|
||||
fn into_recv_frame(self, frame: Option<Frame>) -> RecvFrame<Self::Future> {
|
||||
let into_fut = Box::new(
|
||||
self.unwrap()
|
||||
.and_then(|handle| handle.into_future().unwrap()),
|
||||
);
|
||||
RecvFrame {
|
||||
inner: into_fut,
|
||||
frame: frame,
|
||||
}
|
||||
}
|
||||
pub async fn idle_ms(ms: u64) {
|
||||
Delay::new(Instant::now() + Duration::from_millis(ms)).await
|
||||
}
|
||||
|
||||
@@ -74,9 +74,9 @@
|
||||
|
||||
#![allow(deprecated)]
|
||||
|
||||
use std::{cmp, io};
|
||||
use std::collections::VecDeque;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{cmp, io};
|
||||
|
||||
/// An I/O handle that follows a predefined script.
|
||||
///
|
||||
@@ -85,13 +85,12 @@ use std::time::{Duration, Instant};
|
||||
#[derive(Debug)]
|
||||
pub struct Mock {
|
||||
inner: Inner,
|
||||
tokio: tokio::Inner,
|
||||
r#async: Option<bool>,
|
||||
tokio: tokio_::Inner,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Handle {
|
||||
inner: tokio::Handle,
|
||||
inner: tokio_::Handle,
|
||||
}
|
||||
|
||||
/// Builds `Mock` instances.
|
||||
@@ -99,9 +98,6 @@ pub struct Handle {
|
||||
pub struct Builder {
|
||||
// Sequence of actions for the Mock to take
|
||||
actions: VecDeque<Action>,
|
||||
|
||||
// true for Tokio, false for blocking, None to auto detect
|
||||
r#async: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -159,7 +155,7 @@ impl Builder {
|
||||
|
||||
/// Build a `Mock` value paired with a handle
|
||||
pub fn build_with_handle(&mut self) -> (Mock, Handle) {
|
||||
let (tokio, handle) = tokio::Inner::new();
|
||||
let (tokio, handle) = tokio_::Inner::new();
|
||||
|
||||
let src = self.clone();
|
||||
|
||||
@@ -169,7 +165,6 @@ impl Builder {
|
||||
waiting: None,
|
||||
},
|
||||
tokio: tokio,
|
||||
r#async: src.r#async,
|
||||
};
|
||||
|
||||
let handle = Handle { inner: handle };
|
||||
@@ -198,45 +193,10 @@ impl Handle {
|
||||
}
|
||||
}
|
||||
|
||||
impl Mock {
|
||||
fn sync_read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
|
||||
use std::thread;
|
||||
|
||||
loop {
|
||||
match self.inner.read(dst) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if let Some(rem) = self.inner.remaining_wait() {
|
||||
thread::sleep(rem);
|
||||
} else {
|
||||
// We've entered a dead lock scenario. The peer expects
|
||||
// a write but we are reading.
|
||||
panic!("mock_io::Mock expects write but currently blocked in read");
|
||||
}
|
||||
}
|
||||
ret => return ret,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_write(&mut self, src: &[u8]) -> io::Result<usize> {
|
||||
match self.inner.write(src) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
panic!("mock_io::Mock not currently expecting a write");
|
||||
}
|
||||
ret => ret,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if running in a futures-rs task context
|
||||
fn is_async(&self) -> bool {
|
||||
self.r#async.unwrap_or(tokio::is_task_ctx())
|
||||
}
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
|
||||
match self.action() {
|
||||
Some(&mut Action::Read(ref mut data)) =>{
|
||||
Some(&mut Action::Read(ref mut data)) => {
|
||||
// Figure out how much to copy
|
||||
let n = cmp::min(dst.len(), data.len());
|
||||
|
||||
@@ -253,9 +213,7 @@ impl Inner {
|
||||
// Either waiting or expecting a write
|
||||
Err(io::ErrorKind::WouldBlock.into())
|
||||
}
|
||||
None => {
|
||||
Ok(0)
|
||||
}
|
||||
None => Ok(0),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -347,55 +305,25 @@ impl Inner {
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Read for Mock {
|
||||
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
|
||||
if self.is_async() {
|
||||
tokio::async_read(self, dst)
|
||||
} else {
|
||||
self.sync_read(dst)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for Mock {
|
||||
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
|
||||
if self.is_async() {
|
||||
tokio::async_write(self, src)
|
||||
} else {
|
||||
self.sync_write(src)
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// use tokio::*;
|
||||
|
||||
mod tokio {
|
||||
mod tokio_ {
|
||||
use super::*;
|
||||
|
||||
use futures::{Future, Stream, Poll, Async};
|
||||
use futures::sync::mpsc;
|
||||
use futures::task::{self, Task};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::{Timer, Sleep};
|
||||
use futures::channel::mpsc;
|
||||
use futures::{ready, FutureExt, Stream};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::timer::Delay;
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
use std::io;
|
||||
|
||||
impl Builder {
|
||||
pub fn set_async(&mut self, is_async: bool) -> &mut Self {
|
||||
self.r#async = Some(is_async);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Inner {
|
||||
timer: Timer,
|
||||
sleep: Option<Sleep>,
|
||||
read_wait: Option<Task>,
|
||||
sleep: Option<Delay>,
|
||||
read_wait: Option<Waker>,
|
||||
rx: mpsc::UnboundedReceiver<Action>,
|
||||
}
|
||||
|
||||
@@ -408,11 +336,11 @@ mod tokio {
|
||||
|
||||
impl Handle {
|
||||
pub fn read(&mut self, buf: &[u8]) {
|
||||
mpsc::UnboundedSender::send(&mut self.tx, Action::Read(buf.into())).unwrap();
|
||||
self.tx.unbounded_send(Action::Read(buf.into())).unwrap();
|
||||
}
|
||||
|
||||
pub fn write(&mut self, buf: &[u8]) {
|
||||
mpsc::UnboundedSender::send(&mut self.tx, Action::Write(buf.into())).unwrap();
|
||||
self.tx.unbounded_send(Action::Write(buf.into())).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -420,16 +348,9 @@ mod tokio {
|
||||
|
||||
impl Inner {
|
||||
pub fn new() -> (Inner, Handle) {
|
||||
// TODO: We probably want a higher resolution timer.
|
||||
let timer = tokio_timer::wheel()
|
||||
.tick_duration(Duration::from_millis(1))
|
||||
.max_timeout(Duration::from_secs(3600))
|
||||
.build();
|
||||
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
let inner = Inner {
|
||||
timer: timer,
|
||||
sleep: None,
|
||||
read_wait: None,
|
||||
rx: rx,
|
||||
@@ -440,8 +361,8 @@ mod tokio {
|
||||
(inner, handle)
|
||||
}
|
||||
|
||||
pub(super) fn poll_action(&mut self) -> Poll<Option<Action>, ()> {
|
||||
self.rx.poll()
|
||||
pub(super) fn poll_action(&mut self, cx: &mut Context) -> Poll<Option<Action>> {
|
||||
Pin::new(&mut self.rx).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -450,7 +371,7 @@ mod tokio {
|
||||
match self.inner.action() {
|
||||
Some(&mut Action::Read(_)) | None => {
|
||||
if let Some(task) = self.tokio.read_wait.take() {
|
||||
task.notify();
|
||||
task.wake();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
@@ -458,106 +379,113 @@ mod tokio {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn async_read(me: &mut Mock, dst: &mut [u8]) -> io::Result<usize> {
|
||||
loop {
|
||||
if let Some(ref mut sleep) = me.tokio.sleep {
|
||||
let res = r#try!(sleep.poll());
|
||||
|
||||
if !res.is_ready() {
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
}
|
||||
}
|
||||
|
||||
// If a sleep is set, it has already fired
|
||||
me.tokio.sleep = None;
|
||||
|
||||
match me.inner.read(dst) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if let Some(rem) = me.inner.remaining_wait() {
|
||||
me.tokio.sleep = Some(me.tokio.timer.sleep(rem));
|
||||
} else {
|
||||
me.tokio.read_wait = Some(task::current());
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
}
|
||||
}
|
||||
Ok(0) => {
|
||||
// TODO: Extract
|
||||
match me.tokio.poll_action().unwrap() {
|
||||
Async::Ready(Some(action)) => {
|
||||
me.inner.actions.push_back(action);
|
||||
continue;
|
||||
}
|
||||
Async::Ready(None) => {
|
||||
return Ok(0);
|
||||
}
|
||||
Async::NotReady => {
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
ret => return ret,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn async_write(me: &mut Mock, src: &[u8]) -> io::Result<usize> {
|
||||
loop {
|
||||
if let Some(ref mut sleep) = me.tokio.sleep {
|
||||
let res = r#try!(sleep.poll());
|
||||
|
||||
if !res.is_ready() {
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
}
|
||||
}
|
||||
|
||||
// If a sleep is set, it has already fired
|
||||
me.tokio.sleep = None;
|
||||
|
||||
match me.inner.write(src) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if let Some(rem) = me.inner.remaining_wait() {
|
||||
me.tokio.sleep = Some(me.tokio.timer.sleep(rem));
|
||||
} else {
|
||||
panic!("unexpected WouldBlock");
|
||||
}
|
||||
}
|
||||
Ok(0) => {
|
||||
// TODO: Is this correct?
|
||||
if !me.inner.actions.is_empty() {
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
}
|
||||
|
||||
// TODO: Extract
|
||||
match me.tokio.poll_action().unwrap() {
|
||||
Async::Ready(Some(action)) => {
|
||||
me.inner.actions.push_back(action);
|
||||
continue;
|
||||
}
|
||||
Async::Ready(None) => {
|
||||
panic!("unexpected write");
|
||||
}
|
||||
Async::NotReady => {
|
||||
return Err(io::ErrorKind::WouldBlock.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
ret => {
|
||||
me.maybe_wakeup_reader();
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for Mock {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
loop {
|
||||
if let Some(sleep) = &mut self.tokio.sleep {
|
||||
ready!(sleep.poll_unpin(cx));
|
||||
}
|
||||
|
||||
// If a sleep is set, it has already fired
|
||||
self.tokio.sleep = None;
|
||||
|
||||
match self.inner.read(buf) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if let Some(rem) = self.inner.remaining_wait() {
|
||||
self.tokio.sleep = Some(Delay::new(Instant::now() + rem));
|
||||
} else {
|
||||
self.tokio.read_wait = Some(cx.waker().clone());
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
Ok(0) => {
|
||||
// TODO: Extract
|
||||
match self.tokio.poll_action(cx) {
|
||||
Poll::Ready(Some(action)) => {
|
||||
self.inner.actions.push_back(action);
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
Poll::Pending => {
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
ret => return Poll::Ready(ret),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for Mock {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
Ok(Async::Ready(()))
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
loop {
|
||||
if let Some(sleep) = &mut self.tokio.sleep {
|
||||
ready!(sleep.poll_unpin(cx));
|
||||
}
|
||||
|
||||
// If a sleep is set, it has already fired
|
||||
self.tokio.sleep = None;
|
||||
|
||||
match self.inner.write(buf) {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
if let Some(rem) = self.inner.remaining_wait() {
|
||||
self.tokio.sleep = Some(Delay::new(Instant::now() + rem));
|
||||
} else {
|
||||
panic!("unexpected WouldBlock");
|
||||
}
|
||||
}
|
||||
Ok(0) => {
|
||||
// TODO: Is this correct?
|
||||
if !self.inner.actions.is_empty() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
// TODO: Extract
|
||||
match self.tokio.poll_action(cx) {
|
||||
Poll::Ready(Some(action)) => {
|
||||
self.inner.actions.push_back(action);
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
panic!("unexpected write");
|
||||
}
|
||||
Poll::Pending => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
ret => {
|
||||
self.maybe_wakeup_reader();
|
||||
return Poll::Ready(ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
TODO: Is this required?
|
||||
|
||||
/// Returns `true` if called from the context of a futures-rs Task
|
||||
pub fn is_task_ctx() -> bool {
|
||||
use std::panic;
|
||||
@@ -577,4 +505,5 @@ mod tokio {
|
||||
// Return the result
|
||||
r
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use futures::executor::{self, Notify};
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering::SeqCst;
|
||||
@@ -21,22 +19,22 @@ impl MockNotify {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with<F: FnOnce() -> R, R>(&self, f: F) -> R {
|
||||
use futures::Async::Ready;
|
||||
use futures::future::poll_fn;
|
||||
pub fn with<F: FnOnce() -> R, R>(&self, _f: F) -> R {
|
||||
unimplemented!();
|
||||
// use futures::future::poll_fn;
|
||||
|
||||
self.clear();
|
||||
// self.clear();
|
||||
|
||||
let mut f = Some(f);
|
||||
// let mut f = Some(f);
|
||||
|
||||
let res = executor::spawn(poll_fn(move || {
|
||||
Ok::<_, ()>(Ready(f.take().unwrap()()))
|
||||
})).poll_future_notify(&self.inner, 0);
|
||||
// let res = tokio::spawn(poll_fn(move |cx| {
|
||||
// Poll::Ready(f.take().unwrap()())
|
||||
// })).poll_future_notify(&self.inner, 0);
|
||||
|
||||
match res {
|
||||
Ok(Ready(v)) => v,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
// match res {
|
||||
// Poll::Ready(v) => v,
|
||||
// _ => unreachable!(),
|
||||
// }
|
||||
}
|
||||
|
||||
pub fn clear(&self) {
|
||||
@@ -48,8 +46,8 @@ impl MockNotify {
|
||||
}
|
||||
}
|
||||
|
||||
impl Notify for Inner {
|
||||
fn notify(&self, _: usize) {
|
||||
self.notified.store(true, SeqCst);
|
||||
}
|
||||
}
|
||||
// impl Notify for Inner {
|
||||
// fn notify(&self, _: usize) {
|
||||
// self.notified.store(true, SeqCst);
|
||||
// }
|
||||
// }
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
|
||||
// Re-export H2 crate
|
||||
pub use h2;
|
||||
|
||||
pub use h2::*;
|
||||
pub use h2::client;
|
||||
pub use h2::frame::StreamId;
|
||||
pub use h2::server;
|
||||
pub use h2::*;
|
||||
|
||||
// Re-export mock
|
||||
pub use super::mock::{self, HandleFutureExt};
|
||||
pub use super::mock::{self, idle_ms};
|
||||
|
||||
// Re-export frames helpers
|
||||
pub use super::frames;
|
||||
@@ -23,28 +22,32 @@ pub use super::util;
|
||||
pub use super::{Codec, SendFrame};
|
||||
|
||||
// Re-export macros
|
||||
pub use super::{assert_ping, assert_data, assert_headers, assert_closed,
|
||||
raw_codec, poll_frame, poll_err};
|
||||
pub use super::{
|
||||
assert_closed, assert_data, assert_default_settings, assert_headers, assert_ping, poll_err,
|
||||
poll_frame, raw_codec,
|
||||
};
|
||||
|
||||
pub use super::assert::assert_frame_eq;
|
||||
|
||||
// Re-export useful crates
|
||||
pub use {bytes, env_logger, futures, http, tokio_io};
|
||||
pub use super::mock_io;
|
||||
pub use {bytes, env_logger, futures, http, tokio::io as tokio_io};
|
||||
|
||||
// Re-export primary future types
|
||||
pub use futures::{Future, IntoFuture, Sink, Stream};
|
||||
pub use futures::{Future, Sink, Stream};
|
||||
|
||||
// And our Future extensions
|
||||
pub use super::future_ext::{FutureExt, Unwrap};
|
||||
pub use super::future_ext::TestFuture;
|
||||
|
||||
// Our client_ext helpers
|
||||
pub use super::client_ext::{SendRequestExt};
|
||||
pub use super::client_ext::SendRequestExt;
|
||||
|
||||
// Re-export HTTP types
|
||||
pub use http::{uri, HeaderMap, Method, Request, Response, StatusCode, Version};
|
||||
|
||||
pub use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
|
||||
|
||||
pub use tokio_io::{AsyncRead, AsyncWrite};
|
||||
pub use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub use std::thread;
|
||||
pub use std::time::Duration;
|
||||
@@ -52,7 +55,10 @@ pub use std::time::Duration;
|
||||
// ===== Everything under here shouldn't be used =====
|
||||
// TODO: work on deleting this code
|
||||
|
||||
use futures::future;
|
||||
pub use futures::future::poll_fn;
|
||||
use futures::future::Either::*;
|
||||
use std::pin::Pin;
|
||||
|
||||
pub trait MockH2 {
|
||||
fn handshake(&mut self) -> &mut Self;
|
||||
@@ -69,29 +75,33 @@ impl MockH2 for super::mock_io::Builder {
|
||||
}
|
||||
|
||||
pub trait ClientExt {
|
||||
fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error>;
|
||||
fn run<'a, F: Future + Unpin + 'a>(
|
||||
&'a mut self,
|
||||
f: F,
|
||||
) -> Pin<Box<dyn Future<Output = F::Output> + 'a>>;
|
||||
}
|
||||
|
||||
impl<T, B> ClientExt for client::Connection<T, B>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + 'static,
|
||||
B: IntoBuf + 'static,
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
B: IntoBuf + Unpin + 'static,
|
||||
B::Buf: Unpin,
|
||||
{
|
||||
fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
|
||||
use futures::future;
|
||||
use futures::future::Either::*;
|
||||
|
||||
let res = future::poll_fn(|| self.poll()).select2(f).wait();
|
||||
|
||||
match res {
|
||||
Ok(A((_, b))) => {
|
||||
// Connection is done...
|
||||
b.wait()
|
||||
},
|
||||
Ok(B((v, _))) => return Ok(v),
|
||||
Err(A((e, _))) => panic!("err: {:?}", e),
|
||||
Err(B((e, _))) => return Err(e),
|
||||
}
|
||||
fn run<'a, F: Future + Unpin + 'a>(
|
||||
&'a mut self,
|
||||
f: F,
|
||||
) -> Pin<Box<dyn Future<Output = F::Output> + 'a>> {
|
||||
let res = future::select(self, f);
|
||||
Box::pin(async {
|
||||
match res.await {
|
||||
Left((Ok(_), b)) => {
|
||||
// Connection is done...
|
||||
b.await
|
||||
}
|
||||
Right((v, _)) => return v,
|
||||
Left((Err(e), _)) => panic!("err: {:?}", e),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,24 +1,28 @@
|
||||
use h2;
|
||||
|
||||
use string::{String, TryFrom};
|
||||
use bytes::Bytes;
|
||||
use futures::{Async, Future, Poll};
|
||||
use futures::ready;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use string::{String, TryFrom};
|
||||
|
||||
pub fn byte_str(s: &str) -> String<Bytes> {
|
||||
String::try_from(Bytes::from(s)).unwrap()
|
||||
}
|
||||
|
||||
pub fn yield_once() -> impl Future<Item=(), Error=()> {
|
||||
pub async fn yield_once() {
|
||||
let mut yielded = false;
|
||||
futures::future::poll_fn(move || {
|
||||
futures::future::poll_fn(move |cx| {
|
||||
if yielded {
|
||||
Ok(Async::Ready(()))
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
yielded = true;
|
||||
futures::task::current().notify();
|
||||
Ok(Async::NotReady)
|
||||
cx.waker().clone().wake();
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub fn wait_for_capacity(stream: h2::SendStream<Bytes>, target: usize) -> WaitForCapacity {
|
||||
@@ -40,18 +44,17 @@ impl WaitForCapacity {
|
||||
}
|
||||
|
||||
impl Future for WaitForCapacity {
|
||||
type Item = h2::SendStream<Bytes>;
|
||||
type Error = ();
|
||||
type Output = h2::SendStream<Bytes>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, ()> {
|
||||
let _ = futures::try_ready!(self.stream().poll_capacity().map_err(|_| panic!()));
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let _ = ready!(self.stream().poll_capacity(cx)).unwrap();
|
||||
|
||||
let act = self.stream().capacity();
|
||||
|
||||
if act >= self.target {
|
||||
return Ok(self.stream.take().unwrap().into());
|
||||
return Poll::Ready(self.stream.take().unwrap().into());
|
||||
}
|
||||
|
||||
Ok(Async::NotReady)
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user