diff --git a/Cargo.toml b/Cargo.toml index eb551cd..5569b07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,9 +44,12 @@ members = [ ] [dependencies] -futures-preview = "=0.3.0-alpha.18" -tokio-io = { version = "=0.2.0-alpha.4", features = ["util"] } +futures-core-preview = "=0.3.0-alpha.18" +futures-sink-preview = "=0.3.0-alpha.18" +futures-util-preview = "=0.3.0-alpha.18" tokio-codec = "=0.2.0-alpha.4" +tokio-io = { version = "=0.2.0-alpha.4", features = ["util"] } +tokio-sync = "=0.2.0-alpha.4" bytes = "0.4.7" http = "0.1.8" log = "0.4.1" diff --git a/src/client.rs b/src/client.rs index ec2d1fe..81ac093 100644 --- a/src/client.rs +++ b/src/client.rs @@ -141,7 +141,6 @@ use crate::proto; use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; use bytes::{Bytes, IntoBuf}; -use futures::{ready, FutureExt}; use http::{uri, HeaderMap, Method, Request, Response, Version}; use std::fmt; use std::future::Future; @@ -1282,7 +1281,7 @@ impl ResponseFuture { impl PushPromises { /// Get the next `PushPromise`. pub async fn push_promise(&mut self) -> Option> { - futures::future::poll_fn(move |cx| self.poll_push_promise(cx)).await + futures_util::future::poll_fn(move |cx| self.poll_push_promise(cx)).await } #[doc(hidden)] @@ -1308,7 +1307,7 @@ impl PushPromises { } #[cfg(feature = "stream")] -impl futures::Stream for PushPromises { +impl futures_core::Stream for PushPromises { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -1342,7 +1341,7 @@ impl Future for PushedResponseFuture { type Output = Result, crate::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.inner.poll_unpin(cx) + Pin::new(&mut self.inner).poll(cx) } } diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 88d1007..84fe305 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -6,7 +6,7 @@ use crate::frame::{ use crate::hpack; -use futures::{ready, Stream}; +use futures_core::Stream; use bytes::BytesMut; diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index cfcbdd4..62a3156 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -4,7 +4,6 @@ use crate::frame::{self, Frame, FrameSize}; use crate::hpack; use bytes::{Buf, BufMut, BytesMut}; -use futures::ready; use std::pin::Pin; use std::task::{Context, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 243d280..b4cb077 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -9,9 +9,9 @@ use self::framed_write::FramedWrite; use crate::frame::{self, Data, Frame}; -use futures::*; - use bytes::Buf; +use futures_core::Stream; +use futures_sink::Sink; use std::pin::Pin; use std::task::{Context, Poll}; use tokio_codec::length_delimited; diff --git a/src/lib.rs b/src/lib.rs index 44fdb18..bfd433f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -91,6 +91,15 @@ macro_rules! proto_err { }; } +macro_rules! ready { + ($e:expr) => { + match $e { + ::std::task::Poll::Ready(r) => r, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + }; +} + #[cfg_attr(feature = "unstable", allow(missing_docs))] mod codec; mod error; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 332bcc9..434b4ad 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -6,7 +6,7 @@ use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::proto::*; use bytes::{Bytes, IntoBuf}; -use futures::{ready, Stream}; +use futures_core::Stream; use std::io; use std::marker::PhantomData; use std::pin::Pin; diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 0dbbec2..59b7000 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -3,12 +3,12 @@ use crate::frame::Ping; use crate::proto::{self, PingPayload}; use bytes::Buf; -use futures::task::AtomicWaker; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use tokio_io::AsyncWrite; +use tokio_sync::AtomicWaker; /// Acknowledges ping requests from the remote. #[derive(Debug)] @@ -190,7 +190,7 @@ impl PingPong { .state .store(USER_STATE_PENDING_PONG, Ordering::Release); } else { - users.0.ping_task.register(cx.waker()); + users.0.ping_task.register_by_ref(cx.waker()); } } @@ -233,7 +233,7 @@ impl UserPings { pub(crate) fn poll_pong(&self, cx: &mut Context) -> Poll> { // Must register before checking state, in case state were to change // before we could register, and then the ping would just be lost. - self.0.pong_task.register(cx.waker()); + self.0.pong_task.register_by_ref(cx.waker()); let prev = self.0.state.compare_and_swap( USER_STATE_RECEIVED_PONG, // current USER_STATE_EMPTY, // new diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 8aa8c5e..ce39b76 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -7,7 +7,6 @@ use crate::codec::UserError; use crate::codec::UserError::*; use bytes::buf::Take; -use futures::ready; use std::io; use std::task::{Context, Poll, Waker}; use std::{cmp, fmt, mem}; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 31c264c..27e8049 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -4,7 +4,6 @@ use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use crate::{frame, proto}; use std::task::Context; -use futures::ready; use http::{HeaderMap, Method, Request, Response}; use std::io; diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f7319f3..4662195 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -7,7 +7,6 @@ use crate::proto::{peer, Open, Peer, WindowSize}; use crate::{client, proto, server}; use bytes::{Buf, Bytes}; -use futures::ready; use http::{HeaderMap, Request, Response}; use std::task::{Context, Poll, Waker}; use tokio_io::AsyncWrite; diff --git a/src/server.rs b/src/server.rs index 924e8ce..17f41e4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -121,7 +121,6 @@ use crate::proto::{self, Config, Prioritized}; use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; use bytes::{Buf, Bytes, IntoBuf}; -use futures::ready; use http::{HeaderMap, Request, Response}; use std::future::Future; use std::pin::Pin; @@ -363,7 +362,7 @@ where pub async fn accept( &mut self, ) -> Option, SendResponse), crate::Error>> { - futures::future::poll_fn(move |cx| self.poll_accept(cx)).await + futures_util::future::poll_fn(move |cx| self.poll_accept(cx)).await } #[doc(hidden)] @@ -479,7 +478,7 @@ where } #[cfg(feature = "stream")] -impl futures::Stream for Connection +impl futures_core::Stream for Connection where T: AsyncRead + AsyncWrite + Unpin, B: IntoBuf + Unpin, diff --git a/src/share.rs b/src/share.rs index 8ec8529..d6dfa72 100644 --- a/src/share.rs +++ b/src/share.rs @@ -6,7 +6,6 @@ use bytes::{Bytes, IntoBuf}; use http::HeaderMap; use crate::PollExt; -use futures::ready; use std::fmt; #[cfg(feature = "stream")] use std::pin::Pin; @@ -272,9 +271,6 @@ impl SendStream { /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) { /// send_stream.reserve_capacity(100); /// - /// let capacity = futures::future::poll_fn(|cx| send_stream.poll_capacity(cx)).await; - /// // capacity == 5; - /// /// send_stream.send_data(b"hello", false).unwrap(); /// // At this point, the total amount of requested capacity is 95 bytes. /// @@ -417,12 +413,12 @@ impl RecvStream { /// Get the next data frame. pub async fn data(&mut self) -> Option> { - futures::future::poll_fn(move |cx| self.poll_data(cx)).await + futures_util::future::poll_fn(move |cx| self.poll_data(cx)).await } /// Get optional trailers for this stream. pub async fn trailers(&mut self) -> Result, crate::Error> { - futures::future::poll_fn(move |cx| self.poll_trailers(cx)).await + futures_util::future::poll_fn(move |cx| self.poll_trailers(cx)).await } #[doc(hidden)] @@ -453,7 +449,7 @@ impl RecvStream { } #[cfg(feature = "stream")] -impl futures::Stream for RecvStream { +impl futures_core::Stream for RecvStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -538,21 +534,13 @@ impl PingPong { PingPong { inner } } - /// Send a `PING` frame to the peer. - /// - /// Only one ping can be pending at a time, so trying to send while - /// a pong has not be received means this will return a user error. - /// - /// # Example - /// - /// ``` - /// # fn doc(mut ping_pong: h2::PingPong) { - /// // let mut ping_pong = ... - /// ping_pong - /// .send_ping(h2::Ping::opaque()) - /// .unwrap(); - /// # } - /// ``` + /// Send a PING frame and wait for the peer to send the pong. + pub async fn ping(&mut self, ping: Ping) -> Result { + self.send_ping(ping)?; + futures_util::future::poll_fn(|cx| self.poll_pong(cx)).await + } + + #[doc(hidden)] pub fn send_ping(&mut self, ping: Ping) -> Result<(), crate::Error> { // Passing a `Ping` here is just to be forwards-compatible with // eventually allowing choosing a ping payload. For now, we can @@ -565,28 +553,7 @@ impl PingPong { }) } - /// Polls for the acknowledgement of a previously [sent][] `PING` frame. - /// - /// # Example - /// - /// ``` - /// # async fn doc(mut ping_pong: h2::PingPong) { - /// // let mut ping_pong = ... - /// - /// // First, send a PING. - /// ping_pong - /// .send_ping(h2::Ping::opaque()) - /// .unwrap(); - /// - /// // And then wait for the PONG. - /// futures::future::poll_fn(move |cx| { - /// ping_pong.poll_pong(cx) - /// }).await.unwrap(); - /// # } - /// # fn main() {} - /// ``` - /// - /// [sent]: struct.PingPong.html#method.send_ping + #[doc(hidden)] pub fn poll_pong(&mut self, cx: &mut Context) -> Poll> { ready!(self.inner.poll_pong(cx))?; Poll::Ready(Ok(Pong { _p: () }))