diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index b30c227..f062ad3 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -9,8 +9,7 @@ h2 = { path = "../..", features = ["unstable"] } bytes = "0.4.7" env_logger = "0.5.9" -futures = "0.1.21" +futures-preview = "0.3.0-alpha.18" http = "0.1.5" string = "0.2" -tokio-io = "0.1.6" -tokio-timer = "0.1.2" +tokio = { git = "https://github.com/tokio-rs/tokio" } diff --git a/tests/h2-support/src/assert.rs b/tests/h2-support/src/assert.rs index eeb89db..8bc6d25 100644 --- a/tests/h2-support/src/assert.rs +++ b/tests/h2-support/src/assert.rs @@ -1,9 +1,10 @@ - #[macro_export] macro_rules! assert_closed { ($transport:expr) => {{ - assert_eq!($transport.poll().unwrap(), None.into()); - }} + use futures::StreamExt; + + assert!($transport.next().await.is_none()); + }}; } #[macro_export] @@ -13,7 +14,7 @@ macro_rules! assert_headers { h2::frame::Frame::Headers(v) => v, f => panic!("expected HEADERS; actual={:?}", f), } - }} + }}; } #[macro_export] @@ -23,7 +24,7 @@ macro_rules! assert_data { h2::frame::Frame::Data(v) => v, f => panic!("expected DATA; actual={:?}", f), } - }} + }}; } #[macro_export] @@ -33,7 +34,7 @@ macro_rules! assert_ping { h2::frame::Frame::Ping(v) => v, f => panic!("expected PING; actual={:?}", f), } - }} + }}; } #[macro_export] @@ -43,28 +44,56 @@ macro_rules! assert_settings { h2::frame::Frame::Settings(v) => v, f => panic!("expected SETTINGS; actual={:?}", f), } - }} + }}; } #[macro_export] macro_rules! poll_err { ($transport:expr) => {{ - match $transport.poll() { - Err(e) => e, + use futures::StreamExt; + match $transport.next().await { + Some(Err(e)) => e, frame => panic!("expected error; actual={:?}", frame), } - }} + }}; } #[macro_export] macro_rules! poll_frame { ($type: ident, $transport:expr) => {{ + use futures::StreamExt; use h2::frame::Frame; - use futures::Async; - match $transport.poll() { - Ok(Async::Ready(Some(Frame::$type(frame)))) => frame, + match $transport.next().await { + Some(Ok(Frame::$type(frame))) => frame, frame => panic!("unexpected frame; actual={:?}", frame), } - }} + }}; +} + +#[macro_export] +macro_rules! assert_default_settings { + ($settings: expr) => {{ + assert_frame_eq($settings, frame::Settings::default()); + }}; +} + +use h2::frame::Frame; + +pub fn assert_frame_eq, U: Into>(t: T, u: U) { + let actual: Frame = t.into(); + let expected: Frame = u.into(); + match (actual, expected) { + (Frame::Data(a), Frame::Data(b)) => { + assert_eq!( + a.payload().len(), + b.payload().len(), + "assert_frame_eq data payload len" + ); + assert_eq!(a, b, "assert_frame_eq"); + } + (a, b) => { + assert_eq!(a, b, "assert_frame_eq"); + } + } } diff --git a/tests/h2-support/src/client_ext.rs b/tests/h2-support/src/client_ext.rs index 9a4d6f9..4320301 100644 --- a/tests/h2-support/src/client_ext.rs +++ b/tests/h2-support/src/client_ext.rs @@ -11,8 +11,8 @@ pub trait SendRequestExt { impl SendRequestExt for SendRequest where - B: IntoBuf, - B::Buf: 'static, + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { fn get(&mut self, uri: &str) -> ResponseFuture { let req = Request::builder() diff --git a/tests/h2-support/src/future_ext.rs b/tests/h2-support/src/future_ext.rs index f08710d..9f659b3 100644 --- a/tests/h2-support/src/future_ext.rs +++ b/tests/h2-support/src/future_ext.rs @@ -1,220 +1,54 @@ -use futures::{Async, Future, Poll}; - -use std::fmt; +use futures::FutureExt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; /// Future extension helpers that are useful for tests -pub trait FutureExt: Future { - /// Panic on error - fn unwrap(self) -> Unwrap - where - Self: Sized, - Self::Error: fmt::Debug, - { - Unwrap { - inner: self, - } - } - - /// Panic on success, yielding the content of an `Err`. - fn unwrap_err(self) -> UnwrapErr - where - Self: Sized, - Self::Error: fmt::Debug, - { - UnwrapErr { - inner: self, - } - } - - /// Panic on success, with a message. - fn expect_err(self, msg: T) -> ExpectErr - where - Self: Sized, - Self::Error: fmt::Debug, - T: fmt::Display, - { - ExpectErr{ - inner: self, - msg: msg.to_string(), - } - } - - /// Panic on error, with a message. - fn expect(self, msg: T) -> Expect - where - Self: Sized, - Self::Error: fmt::Debug, - T: fmt::Display, - { - Expect { - inner: self, - msg: msg.to_string(), - } - } - +pub trait TestFuture: Future { /// Drive `other` by polling `self`. /// /// `self` must not resolve before `other` does. - fn drive(self, other: T) -> Drive + fn drive(&mut self, other: T) -> Drive<'_, Self, T> where T: Future, - T::Error: fmt::Debug, - Self: Future + Sized, - Self::Error: fmt::Debug, + Self: Future + Sized, { Drive { - driver: Some(self), - future: other, - } - } - - /// Wrap this future in one that will yield NotReady once before continuing. - /// - /// This allows the executor to poll other futures before trying this one - /// again. - fn yield_once(self) -> Box> - where - Self: Future + Sized + 'static, - { - Box::new(super::util::yield_once().then(move |_| self)) - } -} - -impl FutureExt for T {} - -// ===== Unwrap ====== - -/// Panic on error -pub struct Unwrap { - inner: T, -} - -impl Future for Unwrap -where - T: Future, - T::Item: fmt::Debug, - T::Error: fmt::Debug, -{ - type Item = T::Item; - type Error = (); - - fn poll(&mut self) -> Poll { - Ok(self.inner.poll().unwrap()) - } -} - -// ===== UnwrapErr ====== - -/// Panic on success. -pub struct UnwrapErr { - inner: T, -} - -impl Future for UnwrapErr -where - T: Future, - T::Item: fmt::Debug, - T::Error: fmt::Debug, -{ - type Item = T::Error; - type Error = (); - - fn poll(&mut self) -> Poll { - match self.inner.poll() { - Ok(Async::Ready(v)) => panic!("Future::unwrap_err() on an Ok value: {:?}", v), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Ok(Async::Ready(e)), + driver: self, + future: Box::pin(other), } } } - - -// ===== Expect ====== - -/// Panic on error -pub struct Expect { - inner: T, - msg: String, -} - -impl Future for Expect -where - T: Future, - T::Item: fmt::Debug, - T::Error: fmt::Debug, -{ - type Item = T::Item; - type Error = (); - - fn poll(&mut self) -> Poll { - Ok(self.inner.poll().expect(&self.msg)) - } -} - -// ===== ExpectErr ====== - -/// Panic on success -pub struct ExpectErr { - inner: T, - msg: String, -} - -impl Future for ExpectErr -where - T: Future, - T::Item: fmt::Debug, - T::Error: fmt::Debug, -{ - type Item = T::Error; - type Error = (); - - fn poll(&mut self) -> Poll { - match self.inner.poll() { - Ok(Async::Ready(v)) => panic!("{}: {:?}", self.msg, v), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => Ok(Async::Ready(e)), - } - } -} +impl TestFuture for T {} // ===== Drive ====== /// Drive a future to completion while also polling the driver /// /// This is useful for H2 futures that also require the connection to be polled. -pub struct Drive { - driver: Option, - future: U, +pub struct Drive<'a, T, U> { + driver: &'a mut T, + future: Pin>, } -impl Future for Drive +impl<'a, T, U> Future for Drive<'a, T, U> where - T: Future, + T: Future + Unpin, U: Future, - T::Error: fmt::Debug, - U::Error: fmt::Debug, { - type Item = (T, U::Item); - type Error = (); + type Output = U::Output; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut looped = false; - loop { - match self.future.poll() { - Ok(Async::Ready(val)) => { - // Get the driver - let driver = self.driver.take().unwrap(); - - return Ok((driver, val).into()); - }, - Ok(_) => {}, - Err(e) => panic!("unexpected error; {:?}", e), + match self.future.poll_unpin(cx) { + Poll::Ready(val) => return Poll::Ready(val), + Poll::Pending => {} } - match self.driver.as_mut().unwrap().poll() { - Ok(Async::Ready(_)) => { + match self.driver.poll_unpin(cx) { + Poll::Ready(_) => { if looped { // Try polling the future one last time panic!("driver resolved before future") @@ -222,12 +56,11 @@ where looped = true; continue; } - }, - Ok(Async::NotReady) => {}, - Err(e) => panic!("unexpected error; {:?}", e), + } + Poll::Pending => {} } - return Ok(Async::NotReady); + return Poll::Pending; } } } diff --git a/tests/h2-support/src/lib.rs b/tests/h2-support/src/lib.rs index 6b572fb..a8ae0e8 100644 --- a/tests/h2-support/src/lib.rs +++ b/tests/h2-support/src/lib.rs @@ -1,3 +1,4 @@ +#![feature(async_await)] //! Utilities to support tests. #[macro_use] @@ -16,7 +17,7 @@ mod client_ext; mod future_ext; pub use crate::client_ext::{SendRequestExt}; -pub use crate::future_ext::{FutureExt, Unwrap}; +pub use crate::future_ext::TestFuture; pub type WindowSize = usize; pub const DEFAULT_WINDOW_SIZE: WindowSize = (1 << 16) - 1; diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index ea5723c..19ae804 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -1,18 +1,21 @@ -use crate::{frames, FutureExt, SendFrame}; +use crate::SendFrame; -use h2::{self, RecvError, SendError}; use h2::frame::{self, Frame}; +use h2::{self, RecvError, SendError}; -use futures::{Async, Future, Poll, Stream}; -use futures::sync::oneshot; -use futures::task::{self, Task}; +use futures::future::poll_fn; +use futures::{ready, Stream, StreamExt}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::io::read_exact; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::timer::Delay; -use std::{cmp, fmt, io, usize}; -use std::io::ErrorKind::WouldBlock; +use super::assert::assert_frame_eq; +use futures::executor::block_on; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use std::time::{Duration, Instant}; +use std::{cmp, io, usize}; /// A mock I/O #[derive(Debug)] @@ -36,19 +39,19 @@ struct Inner { rx: Vec, /// Notify when data is ready to be received. - rx_task: Option, + rx_task: Option, /// Data written by the `h2` library to be read by the test case. tx: Vec, /// Notify when data is written. This notifies the test case waiters. - tx_task: Option, + tx_task: Option, - /// Number of bytes that can be written before `write` returns `NotReady`. + /// Number of bytes that can be written before `write` returns `Poll::Pending`. tx_rem: usize, /// Task to notify when write capacity becomes available. - tx_rem_task: Option, + tx_rem_task: Option, /// True when the pipe is closed. closed: bool, @@ -80,9 +83,7 @@ pub fn new_with_write_capacity(cap: usize) -> (Mock, Handle) { }; let handle = Handle { - codec: h2::Codec::new(Pipe { - inner, - }), + codec: h2::Codec::new(Pipe { inner }), }; (mock, handle) @@ -97,207 +98,258 @@ impl Handle { } /// Send a frame - pub fn send(&mut self, item: SendFrame) -> Result<(), SendError> { + pub async fn send(&mut self, item: SendFrame) -> Result<(), SendError> { // Queue the frame self.codec.buffer(item).unwrap(); // Flush the frame - assert!(self.codec.flush()?.is_ready()); - + poll_fn(|cx| { + let p = self.codec.flush(cx); + assert!(p.is_ready()); + p + }) + .await?; Ok(()) } /// Writes the client preface - pub fn write_preface(&mut self) { - use std::io::Write; - - // Write the connnection preface - self.codec.get_mut().write(PREFACE).unwrap(); + pub async fn write_preface(&mut self) { + self.codec.get_mut().write_all(PREFACE).await.unwrap(); } /// Read the client preface - pub fn read_preface(self) -> Box> { - let buf = vec![0; PREFACE.len()]; - let ret = read_exact(self, buf).and_then(|(me, buf)| { - assert_eq!(buf, PREFACE); - Ok(me) - }); + pub async fn read_preface(&mut self) -> io::Result<()> { + let mut buf = vec![0u8; PREFACE.len()]; + self.read_exact(&mut buf).await?; + assert_eq!(buf, PREFACE); + Ok(()) + } - Box::new(ret) + pub async fn recv_frame>(&mut self, expected: F) { + let frame = self.next().await.unwrap().unwrap(); + assert_frame_eq(frame, expected); + } + + pub async fn send_frame>(&mut self, frame: F) { + self.send(frame.into()).await.unwrap(); + } + + pub async fn recv_eof(&mut self) { + let frame = self.next().await; + assert!(frame.is_none()); + } + + pub async fn send_bytes(&mut self, data: &[u8]) { + use bytes::Buf; + use std::io::Cursor; + + let buf: Vec<_> = data.into(); + let mut buf = Cursor::new(buf); + + poll_fn(move |cx| { + while buf.has_remaining() { + let res = Pin::new(self.codec.get_mut()) + .poll_write_buf(cx, &mut buf) + .map_err(|e| panic!("write err={:?}", e)); + + ready!(res).unwrap(); + } + + Poll::Ready(()) + }) + .await; } /// Perform the H2 handshake - pub fn assert_client_handshake( - self, - ) -> Box> { + pub async fn assert_client_handshake(&mut self) -> frame::Settings { self.assert_client_handshake_with_settings(frame::Settings::default()) + .await } /// Perform the H2 handshake - pub fn assert_client_handshake_with_settings( - mut self, - settings: T, - ) -> Box> + pub async fn assert_client_handshake_with_settings(&mut self, settings: T) -> frame::Settings where T: Into, { let settings = settings.into(); // Send a settings frame - self.send(settings.into()).unwrap(); + self.send(settings.into()).await.unwrap(); + self.read_preface().await.unwrap(); - let ret = self.read_preface() - .unwrap() - .and_then(|me| me.into_future().unwrap()) - .map(|(frame, mut me)| { - match frame { - Some(Frame::Settings(settings)) => { - // Send the ACK - let ack = frame::Settings::ack(); + let settings = match self.next().await { + Some(frame) => match frame.unwrap() { + Frame::Settings(settings) => { + // Send the ACK + let ack = frame::Settings::ack(); - // TODO: Don't unwrap? - me.send(ack.into()).unwrap(); + // TODO: Don't unwrap? + self.send(ack.into()).await.unwrap(); - (settings, me) - }, - Some(frame) => { - panic!("unexpected frame; frame={:?}", frame); - }, - None => { - panic!("unexpected EOF"); - }, + settings } - }) - .then(|res| { - let (settings, me) = res.unwrap(); + frame => { + panic!("unexpected frame; frame={:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + }; - me.into_future() - .map_err(|_| unreachable!("all previous futures unwrapped")) - .map(|(frame, me)| { - let f = assert_settings!(frame.unwrap()); + let frame = self.next().await.unwrap().unwrap(); + let f = assert_settings!(frame); - // Is ACK - assert!(f.is_ack()); + // Is ACK + assert!(f.is_ack()); - (settings, me) - }) - }); - - Box::new(ret) + settings } - /// Perform the H2 handshake - pub fn assert_server_handshake( - self, - ) -> Box> { + pub async fn assert_server_handshake(&mut self) -> frame::Settings { self.assert_server_handshake_with_settings(frame::Settings::default()) + .await } /// Perform the H2 handshake - pub fn assert_server_handshake_with_settings( - mut self, - settings: T, - ) -> Box> + pub async fn assert_server_handshake_with_settings(&mut self, settings: T) -> frame::Settings where T: Into, { - self.write_preface(); + self.write_preface().await; let settings = settings.into(); - self.send(settings.into()).unwrap(); + self.send(settings.into()).await.unwrap(); - let ret = self.into_future() - .unwrap() - .map(|(frame, mut me)| { - match frame { - Some(Frame::Settings(settings)) => { - // Send the ACK - let ack = frame::Settings::ack(); + let frame = self.next().await; + let settings = match frame { + Some(frame) => match frame.unwrap() { + Frame::Settings(settings) => { + // Send the ACK + let ack = frame::Settings::ack(); - // TODO: Don't unwrap? - me.send(ack.into()).unwrap(); + // TODO: Don't unwrap? + self.send(ack.into()).await.unwrap(); - (settings, me) - }, - Some(frame) => { - panic!("unexpected frame; frame={:?}", frame); - }, - None => { - panic!("unexpected EOF"); - }, + settings } - }) - .then(|res| { - let (settings, me) = res.unwrap(); + frame => panic!("unexpected frame; frame={:?}", frame), + }, + None => panic!("unexpected EOF"), + }; + let frame = self.next().await; + let f = assert_settings!(frame.unwrap().unwrap()); - me.into_future() - .map_err(|e| panic!("error: {:?}", e)) - .map(|(frame, me)| { - let f = assert_settings!(frame.unwrap()); + // Is ACK + assert!(f.is_ack()); - // Is ACK - assert!(f.is_ack()); + settings + } - (settings, me) - }) - }); + pub async fn ping_pong(&mut self, payload: [u8; 8]) { + self.send_frame(crate::frames::ping(payload)).await; + self.recv_frame(crate::frames::ping(payload).pong()).await; + } - Box::new(ret) + pub async fn buffer_bytes(&mut self, num: usize) { + // Set tx_rem to num + { + let mut i = self.codec.get_mut().inner.lock().unwrap(); + i.tx_rem = num; + } + + poll_fn(move |cx| { + { + let mut inner = self.codec.get_mut().inner.lock().unwrap(); + if inner.tx_rem == 0 { + inner.tx_rem = usize::MAX; + } else { + inner.tx_task = Some(cx.waker().clone()); + return Poll::Pending; + } + } + + Poll::Ready(()) + }) + .await; + } + + pub async fn unbounded_bytes(&mut self) { + let mut i = self.codec.get_mut().inner.lock().unwrap(); + i.tx_rem = usize::MAX; + + if let Some(task) = i.tx_rem_task.take() { + task.wake(); + } } } impl Stream for Handle { - type Item = Frame; - type Error = RecvError; + type Item = Result; - fn poll(&mut self) -> Poll, RecvError> { - self.codec.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.codec).poll_next(cx) } } -impl io::Read for Handle { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.codec.get_mut().read(buf) - } -} - -impl AsyncRead for Handle {} - -impl io::Write for Handle { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.codec.get_mut().write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) +impl AsyncRead for Handle { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(self.codec.get_mut()).poll_read(cx, buf) } } impl AsyncWrite for Handle { - fn shutdown(&mut self) -> Poll<(), io::Error> { - use std::io::Write; - tokio_io::try_nb!(self.flush()); - Ok(().into()) + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(self.codec.get_mut()).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.codec.get_mut()).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(self.codec.get_mut()).poll_shutdown(cx) } } impl Drop for Handle { fn drop(&mut self) { - assert!(self.codec.shutdown().unwrap().is_ready()); + block_on(async { + poll_fn(|cx| { + assert!(self.codec.shutdown(cx).is_ready()); - let mut me = self.codec.get_mut().inner.lock().unwrap(); - me.closed = true; + let mut me = self.codec.get_mut().inner.lock().unwrap(); + me.closed = true; - if let Some(task) = me.rx_task.take() { - task.notify(); - } + if let Some(task) = me.rx_task.take() { + task.wake(); + } + Poll::Ready(()) + }) + .await; + }); } } // ===== impl Mock ===== -impl io::Read for Mock { - fn read(&mut self, buf: &mut [u8]) -> io::Result { +impl AsyncRead for Mock { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { assert!( buf.len() > 0, "attempted read with zero length buffer... wut?" @@ -307,34 +359,36 @@ impl io::Read for Mock { if me.rx.is_empty() { if me.closed { - return Ok(0); + return Poll::Ready(Ok(0)); } - me.rx_task = Some(task::current()); - return Err(WouldBlock.into()); + me.rx_task = Some(cx.waker().clone()); + return Poll::Pending; } let n = cmp::min(buf.len(), me.rx.len()); buf[..n].copy_from_slice(&me.rx[..n]); me.rx.drain(..n); - Ok(n) + Poll::Ready(Ok(n)) } } -impl AsyncRead for Mock {} - -impl io::Write for Mock { - fn write(&mut self, mut buf: &[u8]) -> io::Result { +impl AsyncWrite for Mock { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: &[u8], + ) -> Poll> { let mut me = self.pipe.inner.lock().unwrap(); if me.closed { - return Ok(buf.len()); + return Poll::Ready(Ok(buf.len())); } if me.tx_rem == 0 { - me.tx_rem_task = Some(task::current()); - return Err(io::ErrorKind::WouldBlock.into()); + me.tx_rem_task = Some(cx.waker().clone()); + return Poll::Pending; } if buf.len() > me.tx_rem { @@ -345,22 +399,18 @@ impl io::Write for Mock { me.tx_rem -= buf.len(); if let Some(task) = me.tx_task.take() { - task.notify(); + task.wake(); } - Ok(buf.len()) + Poll::Ready(Ok(buf.len())) } - fn flush(&mut self) -> io::Result<()> { - Ok(()) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } -} -impl AsyncWrite for Mock { - fn shutdown(&mut self) -> Poll<(), io::Error> { - use std::io::Write; - tokio_io::try_nb!(self.flush()); - Ok(().into()) + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } @@ -370,15 +420,19 @@ impl Drop for Mock { me.closed = true; if let Some(task) = me.tx_task.take() { - task.notify(); + task.wake(); } } } // ===== impl Pipe ===== -impl io::Read for Pipe { - fn read(&mut self, buf: &mut [u8]) -> io::Result { +impl AsyncRead for Pipe { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { assert!( buf.len() > 0, "attempted read with zero length buffer... wut?" @@ -386,382 +440,48 @@ impl io::Read for Pipe { let mut me = self.inner.lock().unwrap(); - if me.tx.is_empty() { if me.closed { - return Ok(0); + return Poll::Ready(Ok(0)); } - me.tx_task = Some(task::current()); - return Err(WouldBlock.into()); + me.tx_task = Some(cx.waker().clone()); + return Poll::Pending; } let n = cmp::min(buf.len(), me.tx.len()); buf[..n].copy_from_slice(&me.tx[..n]); me.tx.drain(..n); - Ok(n) - } -} - -impl AsyncRead for Pipe {} - -impl io::Write for Pipe { - fn write(&mut self, buf: &[u8]) -> io::Result { - let mut me = self.inner.lock().unwrap(); - me.rx.extend(buf); - - if let Some(task) = me.rx_task.take() { - task.notify(); - } - - Ok(buf.len()) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) + Poll::Ready(Ok(n)) } } impl AsyncWrite for Pipe { - fn shutdown(&mut self) -> Poll<(), io::Error> { - use std::io::Write; - tokio_io::try_nb!(self.flush()); - Ok(().into()) - } -} + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut me = self.inner.lock().unwrap(); + me.rx.extend(buf); -pub trait HandleFutureExt { - fn recv_settings(self) - -> RecvFrame, Handle), Error = ()>>> - where - Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - { - self.recv_custom_settings(frame::Settings::default()) - } - - fn recv_custom_settings(self, settings: T) - -> RecvFrame, Handle), Error = ()>>> - where - Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - T: Into, - { - let map = self - .map(|(settings, handle)| (Some(settings.into()), handle)) - .unwrap(); - - let boxed: Box, Handle), Error = ()>> = - Box::new(map); - RecvFrame { - inner: boxed, - frame: Some(settings.into().into()), - } - } - - fn ignore_settings(self) -> Box> - where - Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - { - Box::new(self.map(|(_settings, handle)| handle).unwrap()) - } - - fn recv_frame(self, frame: T) -> RecvFrame<::Future> - where - Self: IntoRecvFrame + Sized, - T: Into, - { - self.into_recv_frame(Some(frame.into())) - } - - fn recv_eof(self) -> RecvFrame<::Future> - where - Self: IntoRecvFrame + Sized, - { - self.into_recv_frame(None) - } - - fn send_frame(self, frame: T) -> SendFrameFut - where - Self: Sized, - T: Into, - { - SendFrameFut { - inner: self, - frame: Some(frame.into()), - } - } - - fn send_bytes(self, data: &[u8]) -> Box> - where - Self: Future + Sized + 'static, - Self::Error: fmt::Debug, - { - use bytes::Buf; - use futures::future::poll_fn; - use std::io::Cursor; - - let buf: Vec<_> = data.into(); - let mut buf = Cursor::new(buf); - - Box::new(self.and_then(move |handle| { - let mut handle = Some(handle); - - poll_fn(move || { - while buf.has_remaining() { - let res = handle.as_mut().unwrap() - .codec.get_mut() - .write_buf(&mut buf) - .map_err(|e| panic!("write err={:?}", e)); - - futures::try_ready!(res); - } - - Ok(handle.take().unwrap().into()) - }) - })) - } - - fn ping_pong(self, payload: [u8; 8]) -> RecvFrame< as IntoRecvFrame>::Future> - where - Self: Future + Sized + 'static, - Self::Error: fmt::Debug, - { - self.send_frame(frames::ping(payload)) - .recv_frame(frames::ping(payload).pong()) - } - - fn idle_ms(self, ms: usize) -> Box> - where - Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - { - use std::thread; - use std::time::Duration; - - - Box::new(self.and_then(move |handle| { - // This is terrible... but oh well - let (tx, rx) = oneshot::channel(); - - thread::spawn(move || { - thread::sleep(Duration::from_millis(ms as u64)); - tx.send(()).unwrap(); - }); - - Idle { - handle: Some(handle), - timeout: rx, - }.map_err(|_| unreachable!()) - })) - } - - fn buffer_bytes(self, num: usize) -> Box> - where Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - { - use futures::future::poll_fn; - - Box::new(self.and_then(move |mut handle| { - // Set tx_rem to num - { - let mut i = handle.codec.get_mut().inner.lock().unwrap(); - i.tx_rem = num; - } - - let mut handle = Some(handle); - - poll_fn(move || { - { - let mut inner = handle.as_mut().unwrap() - .codec.get_mut().inner.lock().unwrap(); - - if inner.tx_rem == 0 { - inner.tx_rem = usize::MAX; - } else { - inner.tx_task = Some(task::current()); - return Ok(Async::NotReady); - } - } - - Ok(handle.take().unwrap().into()) - }) - })) - } - - fn unbounded_bytes(self) -> Box> - where Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - { - Box::new(self.and_then(|mut handle| { - { - let mut i = handle.codec.get_mut().inner.lock().unwrap(); - i.tx_rem = usize::MAX; - - if let Some(task) = i.tx_rem_task.take() { - task.notify(); - } - } - - Ok(handle.into()) - })) - } - - fn then_notify(self, tx: oneshot::Sender<()>) -> Box> - where Self: Sized + 'static, - Self: Future, - Self::Error: fmt::Debug, - { - Box::new(self.map(move |handle| { - tx.send(()).unwrap(); - handle - })) - } - - fn wait_for(self, other: F) -> Box> - where - F: Future + 'static, - Self: Future + Sized + 'static - { - Box::new(self.then(move |result| { - other.then(move |_| result) - })) - } - - fn close(self) -> Box> - where - Self: Future + Sized + 'static, - { - Box::new(self.map(drop)) - } -} - -pub struct RecvFrame { - inner: T, - frame: Option, -} - -impl Future for RecvFrame -where - T: Future, Handle)>, - T::Error: fmt::Debug, -{ - type Item = Handle; - type Error = (); - - fn poll(&mut self) -> Poll { - use self::Frame::Data; - - let (frame, handle) = match self.inner.poll().unwrap() { - Async::Ready((frame, handle)) => (frame, handle), - Async::NotReady => return Ok(Async::NotReady), - }; - - match (frame, &self.frame) { - (Some(Data(ref a)), &Some(Data(ref b))) => { - assert_eq!(a.payload().len(), b.payload().len(), "recv_frame data payload len"); - assert_eq!(a, b, "recv_frame"); - } - (ref a, b) => { - assert_eq!(a, b, "recv_frame"); - } + if let Some(task) = me.rx_task.take() { + task.wake(); } - Ok(Async::Ready(handle)) + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } -pub struct SendFrameFut { - inner: T, - frame: Option, -} - -impl Future for SendFrameFut -where - T: Future, - T::Error: fmt::Debug, -{ - type Item = Handle; - type Error = (); - - fn poll(&mut self) -> Poll { - let mut handle = match self.inner.poll().unwrap() { - Async::Ready(handle) => handle, - Async::NotReady => return Ok(Async::NotReady), - }; - handle.send(self.frame.take().unwrap()).unwrap(); - Ok(Async::Ready(handle)) - } -} - -pub struct Idle { - handle: Option, - timeout: oneshot::Receiver<()>, -} - -impl Future for Idle { - type Item = Handle; - type Error = (); - - fn poll(&mut self) -> Poll { - if self.timeout.poll().unwrap().is_ready() { - return Ok(self.handle.take().unwrap().into()); - } - - match self.handle.as_mut().unwrap().poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - res => { - panic!("Idle received unexpected frame on handle; frame={:?}", res); - }, - } - } -} - -impl HandleFutureExt for T -where - T: Future + 'static, -{ -} - -pub trait IntoRecvFrame { - type Future: Future; - fn into_recv_frame(self, frame: Option) -> RecvFrame; -} - -impl IntoRecvFrame for Handle { - type Future = ::futures::stream::StreamFuture; - - fn into_recv_frame(self, frame: Option) -> RecvFrame { - RecvFrame { - inner: self.into_future(), - frame: frame, - } - } -} - -impl IntoRecvFrame for T -where - T: Future + 'static, - T::Error: fmt::Debug, -{ - type Future = Box, Handle), Error = ()>>; - - fn into_recv_frame(self, frame: Option) -> RecvFrame { - let into_fut = Box::new( - self.unwrap() - .and_then(|handle| handle.into_future().unwrap()), - ); - RecvFrame { - inner: into_fut, - frame: frame, - } - } +pub async fn idle_ms(ms: u64) { + Delay::new(Instant::now() + Duration::from_millis(ms)).await } diff --git a/tests/h2-support/src/mock_io.rs b/tests/h2-support/src/mock_io.rs index b556554..8b3c95b 100644 --- a/tests/h2-support/src/mock_io.rs +++ b/tests/h2-support/src/mock_io.rs @@ -74,9 +74,9 @@ #![allow(deprecated)] -use std::{cmp, io}; use std::collections::VecDeque; use std::time::{Duration, Instant}; +use std::{cmp, io}; /// An I/O handle that follows a predefined script. /// @@ -85,13 +85,12 @@ use std::time::{Duration, Instant}; #[derive(Debug)] pub struct Mock { inner: Inner, - tokio: tokio::Inner, - r#async: Option, + tokio: tokio_::Inner, } #[derive(Debug)] pub struct Handle { - inner: tokio::Handle, + inner: tokio_::Handle, } /// Builds `Mock` instances. @@ -99,9 +98,6 @@ pub struct Handle { pub struct Builder { // Sequence of actions for the Mock to take actions: VecDeque, - - // true for Tokio, false for blocking, None to auto detect - r#async: Option, } #[derive(Debug, Clone)] @@ -159,7 +155,7 @@ impl Builder { /// Build a `Mock` value paired with a handle pub fn build_with_handle(&mut self) -> (Mock, Handle) { - let (tokio, handle) = tokio::Inner::new(); + let (tokio, handle) = tokio_::Inner::new(); let src = self.clone(); @@ -169,7 +165,6 @@ impl Builder { waiting: None, }, tokio: tokio, - r#async: src.r#async, }; let handle = Handle { inner: handle }; @@ -198,45 +193,10 @@ impl Handle { } } -impl Mock { - fn sync_read(&mut self, dst: &mut [u8]) -> io::Result { - use std::thread; - - loop { - match self.inner.read(dst) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if let Some(rem) = self.inner.remaining_wait() { - thread::sleep(rem); - } else { - // We've entered a dead lock scenario. The peer expects - // a write but we are reading. - panic!("mock_io::Mock expects write but currently blocked in read"); - } - } - ret => return ret, - } - } - } - - fn sync_write(&mut self, src: &[u8]) -> io::Result { - match self.inner.write(src) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - panic!("mock_io::Mock not currently expecting a write"); - } - ret => ret, - } - } - - /// Returns `true` if running in a futures-rs task context - fn is_async(&self) -> bool { - self.r#async.unwrap_or(tokio::is_task_ctx()) - } -} - impl Inner { fn read(&mut self, dst: &mut [u8]) -> io::Result { match self.action() { - Some(&mut Action::Read(ref mut data)) =>{ + Some(&mut Action::Read(ref mut data)) => { // Figure out how much to copy let n = cmp::min(dst.len(), data.len()); @@ -253,9 +213,7 @@ impl Inner { // Either waiting or expecting a write Err(io::ErrorKind::WouldBlock.into()) } - None => { - Ok(0) - } + None => Ok(0), } } @@ -347,55 +305,25 @@ impl Inner { } } -impl io::Read for Mock { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - if self.is_async() { - tokio::async_read(self, dst) - } else { - self.sync_read(dst) - } - } -} - -impl io::Write for Mock { - fn write(&mut self, src: &[u8]) -> io::Result { - if self.is_async() { - tokio::async_write(self, src) - } else { - self.sync_write(src) - } - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - // use tokio::*; -mod tokio { +mod tokio_ { use super::*; - use futures::{Future, Stream, Poll, Async}; - use futures::sync::mpsc; - use futures::task::{self, Task}; - use tokio_io::{AsyncRead, AsyncWrite}; - use tokio_timer::{Timer, Sleep}; + use futures::channel::mpsc; + use futures::{ready, FutureExt, Stream}; + use std::task::{Context, Poll, Waker}; + use tokio::io::{AsyncRead, AsyncWrite}; + use tokio::timer::Delay; + + use std::pin::Pin; use std::io; - impl Builder { - pub fn set_async(&mut self, is_async: bool) -> &mut Self { - self.r#async = Some(is_async); - self - } - } - #[derive(Debug)] pub struct Inner { - timer: Timer, - sleep: Option, - read_wait: Option, + sleep: Option, + read_wait: Option, rx: mpsc::UnboundedReceiver, } @@ -408,11 +336,11 @@ mod tokio { impl Handle { pub fn read(&mut self, buf: &[u8]) { - mpsc::UnboundedSender::send(&mut self.tx, Action::Read(buf.into())).unwrap(); + self.tx.unbounded_send(Action::Read(buf.into())).unwrap(); } pub fn write(&mut self, buf: &[u8]) { - mpsc::UnboundedSender::send(&mut self.tx, Action::Write(buf.into())).unwrap(); + self.tx.unbounded_send(Action::Write(buf.into())).unwrap(); } } @@ -420,16 +348,9 @@ mod tokio { impl Inner { pub fn new() -> (Inner, Handle) { - // TODO: We probably want a higher resolution timer. - let timer = tokio_timer::wheel() - .tick_duration(Duration::from_millis(1)) - .max_timeout(Duration::from_secs(3600)) - .build(); - let (tx, rx) = mpsc::unbounded(); let inner = Inner { - timer: timer, sleep: None, read_wait: None, rx: rx, @@ -440,8 +361,8 @@ mod tokio { (inner, handle) } - pub(super) fn poll_action(&mut self) -> Poll, ()> { - self.rx.poll() + pub(super) fn poll_action(&mut self, cx: &mut Context) -> Poll> { + Pin::new(&mut self.rx).poll_next(cx) } } @@ -450,7 +371,7 @@ mod tokio { match self.inner.action() { Some(&mut Action::Read(_)) | None => { if let Some(task) = self.tokio.read_wait.take() { - task.notify(); + task.wake(); } } _ => {} @@ -458,106 +379,113 @@ mod tokio { } } - pub fn async_read(me: &mut Mock, dst: &mut [u8]) -> io::Result { - loop { - if let Some(ref mut sleep) = me.tokio.sleep { - let res = r#try!(sleep.poll()); - - if !res.is_ready() { - return Err(io::ErrorKind::WouldBlock.into()); - } - } - - // If a sleep is set, it has already fired - me.tokio.sleep = None; - - match me.inner.read(dst) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if let Some(rem) = me.inner.remaining_wait() { - me.tokio.sleep = Some(me.tokio.timer.sleep(rem)); - } else { - me.tokio.read_wait = Some(task::current()); - return Err(io::ErrorKind::WouldBlock.into()); - } - } - Ok(0) => { - // TODO: Extract - match me.tokio.poll_action().unwrap() { - Async::Ready(Some(action)) => { - me.inner.actions.push_back(action); - continue; - } - Async::Ready(None) => { - return Ok(0); - } - Async::NotReady => { - return Err(io::ErrorKind::WouldBlock.into()); - } - } - } - ret => return ret, - } - } - } - - pub fn async_write(me: &mut Mock, src: &[u8]) -> io::Result { - loop { - if let Some(ref mut sleep) = me.tokio.sleep { - let res = r#try!(sleep.poll()); - - if !res.is_ready() { - return Err(io::ErrorKind::WouldBlock.into()); - } - } - - // If a sleep is set, it has already fired - me.tokio.sleep = None; - - match me.inner.write(src) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - if let Some(rem) = me.inner.remaining_wait() { - me.tokio.sleep = Some(me.tokio.timer.sleep(rem)); - } else { - panic!("unexpected WouldBlock"); - } - } - Ok(0) => { - // TODO: Is this correct? - if !me.inner.actions.is_empty() { - return Err(io::ErrorKind::WouldBlock.into()); - } - - // TODO: Extract - match me.tokio.poll_action().unwrap() { - Async::Ready(Some(action)) => { - me.inner.actions.push_back(action); - continue; - } - Async::Ready(None) => { - panic!("unexpected write"); - } - Async::NotReady => { - return Err(io::ErrorKind::WouldBlock.into()); - } - } - } - ret => { - me.maybe_wakeup_reader(); - return ret; - } - } - } - } - impl AsyncRead for Mock { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + loop { + if let Some(sleep) = &mut self.tokio.sleep { + ready!(sleep.poll_unpin(cx)); + } + + // If a sleep is set, it has already fired + self.tokio.sleep = None; + + match self.inner.read(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + if let Some(rem) = self.inner.remaining_wait() { + self.tokio.sleep = Some(Delay::new(Instant::now() + rem)); + } else { + self.tokio.read_wait = Some(cx.waker().clone()); + return Poll::Pending; + } + } + Ok(0) => { + // TODO: Extract + match self.tokio.poll_action(cx) { + Poll::Ready(Some(action)) => { + self.inner.actions.push_back(action); + continue; + } + Poll::Ready(None) => { + return Poll::Ready(Ok(0)); + } + Poll::Pending => { + return Poll::Pending; + } + } + } + ret => return Poll::Ready(ret), + } + } + } } impl AsyncWrite for Mock { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(Async::Ready(())) + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + loop { + if let Some(sleep) = &mut self.tokio.sleep { + ready!(sleep.poll_unpin(cx)); + } + + // If a sleep is set, it has already fired + self.tokio.sleep = None; + + match self.inner.write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + if let Some(rem) = self.inner.remaining_wait() { + self.tokio.sleep = Some(Delay::new(Instant::now() + rem)); + } else { + panic!("unexpected WouldBlock"); + } + } + Ok(0) => { + // TODO: Is this correct? + if !self.inner.actions.is_empty() { + return Poll::Pending; + } + + // TODO: Extract + match self.tokio.poll_action(cx) { + Poll::Ready(Some(action)) => { + self.inner.actions.push_back(action); + continue; + } + Poll::Ready(None) => { + panic!("unexpected write"); + } + Poll::Pending => return Poll::Pending, + } + } + ret => { + self.maybe_wakeup_reader(); + return Poll::Ready(ret); + } + } + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } } + /* + TODO: Is this required? + /// Returns `true` if called from the context of a futures-rs Task pub fn is_task_ctx() -> bool { use std::panic; @@ -577,4 +505,5 @@ mod tokio { // Return the result r } + */ } diff --git a/tests/h2-support/src/notify.rs b/tests/h2-support/src/notify.rs index 6f3e96a..783297a 100644 --- a/tests/h2-support/src/notify.rs +++ b/tests/h2-support/src/notify.rs @@ -1,5 +1,3 @@ -use futures::executor::{self, Notify}; - use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::SeqCst; @@ -21,22 +19,22 @@ impl MockNotify { } } - pub fn with R, R>(&self, f: F) -> R { - use futures::Async::Ready; - use futures::future::poll_fn; + pub fn with R, R>(&self, _f: F) -> R { + unimplemented!(); + // use futures::future::poll_fn; - self.clear(); + // self.clear(); - let mut f = Some(f); + // let mut f = Some(f); - let res = executor::spawn(poll_fn(move || { - Ok::<_, ()>(Ready(f.take().unwrap()())) - })).poll_future_notify(&self.inner, 0); + // let res = tokio::spawn(poll_fn(move |cx| { + // Poll::Ready(f.take().unwrap()()) + // })).poll_future_notify(&self.inner, 0); - match res { - Ok(Ready(v)) => v, - _ => unreachable!(), - } + // match res { + // Poll::Ready(v) => v, + // _ => unreachable!(), + // } } pub fn clear(&self) { @@ -48,8 +46,8 @@ impl MockNotify { } } -impl Notify for Inner { - fn notify(&self, _: usize) { - self.notified.store(true, SeqCst); - } -} +// impl Notify for Inner { +// fn notify(&self, _: usize) { +// self.notified.store(true, SeqCst); +// } +// } diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index 16f3bd5..d0d5630 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -1,14 +1,13 @@ - // Re-export H2 crate pub use h2; -pub use h2::*; pub use h2::client; pub use h2::frame::StreamId; pub use h2::server; +pub use h2::*; // Re-export mock -pub use super::mock::{self, HandleFutureExt}; +pub use super::mock::{self, idle_ms}; // Re-export frames helpers pub use super::frames; @@ -23,28 +22,32 @@ pub use super::util; pub use super::{Codec, SendFrame}; // Re-export macros -pub use super::{assert_ping, assert_data, assert_headers, assert_closed, - raw_codec, poll_frame, poll_err}; +pub use super::{ + assert_closed, assert_data, assert_default_settings, assert_headers, assert_ping, poll_err, + poll_frame, raw_codec, +}; + +pub use super::assert::assert_frame_eq; // Re-export useful crates -pub use {bytes, env_logger, futures, http, tokio_io}; pub use super::mock_io; +pub use {bytes, env_logger, futures, http, tokio::io as tokio_io}; // Re-export primary future types -pub use futures::{Future, IntoFuture, Sink, Stream}; +pub use futures::{Future, Sink, Stream}; // And our Future extensions -pub use super::future_ext::{FutureExt, Unwrap}; +pub use super::future_ext::TestFuture; // Our client_ext helpers -pub use super::client_ext::{SendRequestExt}; +pub use super::client_ext::SendRequestExt; // Re-export HTTP types pub use http::{uri, HeaderMap, Method, Request, Response, StatusCode, Version}; pub use bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; -pub use tokio_io::{AsyncRead, AsyncWrite}; +pub use tokio::io::{AsyncRead, AsyncWrite}; pub use std::thread; pub use std::time::Duration; @@ -52,7 +55,10 @@ pub use std::time::Duration; // ===== Everything under here shouldn't be used ===== // TODO: work on deleting this code +use futures::future; pub use futures::future::poll_fn; +use futures::future::Either::*; +use std::pin::Pin; pub trait MockH2 { fn handshake(&mut self) -> &mut Self; @@ -69,29 +75,33 @@ impl MockH2 for super::mock_io::Builder { } pub trait ClientExt { - fn run(&mut self, f: F) -> Result; + fn run<'a, F: Future + Unpin + 'a>( + &'a mut self, + f: F, + ) -> Pin + 'a>>; } impl ClientExt for client::Connection where - T: AsyncRead + AsyncWrite + 'static, - B: IntoBuf + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, + B: IntoBuf + Unpin + 'static, + B::Buf: Unpin, { - fn run(&mut self, f: F) -> Result { - use futures::future; - use futures::future::Either::*; - - let res = future::poll_fn(|| self.poll()).select2(f).wait(); - - match res { - Ok(A((_, b))) => { - // Connection is done... - b.wait() - }, - Ok(B((v, _))) => return Ok(v), - Err(A((e, _))) => panic!("err: {:?}", e), - Err(B((e, _))) => return Err(e), - } + fn run<'a, F: Future + Unpin + 'a>( + &'a mut self, + f: F, + ) -> Pin + 'a>> { + let res = future::select(self, f); + Box::pin(async { + match res.await { + Left((Ok(_), b)) => { + // Connection is done... + b.await + } + Right((v, _)) => return v, + Left((Err(e), _)) => panic!("err: {:?}", e), + } + }) } } diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index 4c1a705..b854e6e 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -1,24 +1,28 @@ use h2; -use string::{String, TryFrom}; use bytes::Bytes; -use futures::{Async, Future, Poll}; +use futures::ready; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use string::{String, TryFrom}; pub fn byte_str(s: &str) -> String { String::try_from(Bytes::from(s)).unwrap() } -pub fn yield_once() -> impl Future { +pub async fn yield_once() { let mut yielded = false; - futures::future::poll_fn(move || { + futures::future::poll_fn(move |cx| { if yielded { - Ok(Async::Ready(())) + Poll::Ready(()) } else { yielded = true; - futures::task::current().notify(); - Ok(Async::NotReady) + cx.waker().clone().wake(); + Poll::Pending } }) + .await; } pub fn wait_for_capacity(stream: h2::SendStream, target: usize) -> WaitForCapacity { @@ -40,18 +44,17 @@ impl WaitForCapacity { } impl Future for WaitForCapacity { - type Item = h2::SendStream; - type Error = (); + type Output = h2::SendStream; - fn poll(&mut self) -> Poll { - let _ = futures::try_ready!(self.stream().poll_capacity().map_err(|_| panic!())); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let _ = ready!(self.stream().poll_capacity(cx)).unwrap(); let act = self.stream().capacity(); if act >= self.target { - return Ok(self.stream.take().unwrap().into()); + return Poll::Ready(self.stream.take().unwrap().into()); } - Ok(Async::NotReady) + Poll::Pending } }