diff --git a/.travis.yml b/.travis.yml index c8c19d9..bd3a531 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,10 +15,10 @@ addons: matrix: include: - rust: nightly - - rust: stable +# - rust: stable before_deploy: cargo doc --no-deps - allow_failures: - - rust: nightly +# allow_failures: +# - rust: nightly before_script: - cargo clean @@ -39,8 +39,8 @@ script: # Run integration tests - cargo test -p h2-tests - # Run h2spec on stable - - if [ "${TRAVIS_RUST_VERSION}" = "stable" ]; then ./ci/h2spec.sh; fi + # Run h2spec on nightly for the time being. TODO: Change it to stable after Rust 1.38 release + - if [ "${TRAVIS_RUST_VERSION}" = "nightly" ]; then ./ci/h2spec.sh; fi # Check minimal versions - if [ "${TRAVIS_RUST_VERSION}" = "nightly" ]; then cargo clean; cargo check -Z minimal-versions; fi diff --git a/Cargo.toml b/Cargo.toml index bc60a99..0a280f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,8 +41,9 @@ members = [ ] [dependencies] -futures = "0.1" -tokio-io = "0.1.4" +futures-preview = "0.3.0-alpha.18" +tokio-io = { git = "https://github.com/tokio-rs/tokio" } +tokio-codec = { git = "https://github.com/tokio-rs/tokio" } bytes = "0.4.7" http = "0.1.8" log = "0.4.1" @@ -64,7 +65,7 @@ serde = "1.0.0" serde_json = "1.0.0" # Akamai example -tokio = "0.1.8" +tokio = { git = "https://github.com/tokio-rs/tokio" } env_logger = { version = "0.5.3", default-features = false } rustls = "0.12" tokio-rustls = "0.5.0" diff --git a/src/client.rs b/src/client.rs index 17375b3..5854611 100644 --- a/src/client.rs +++ b/src/client.rs @@ -64,72 +64,60 @@ //! //! # Example //! -//! ```rust +//! ```rust, no_run +//! #![feature(async_await)] +//! //! use h2::client; //! //! use futures::*; -//! use http::*; -//! +//! use http::{Request, Method}; +//! use std::error::Error; //! use tokio::net::TcpStream; //! -//! pub fn main() { +//! #[tokio::main] +//! pub async fn main() -> Result<(), Box> { //! let addr = "127.0.0.1:5928".parse().unwrap(); +//! +//! // Establish TCP connection to the server. +//! let tcp = TcpStream::connect(&addr).await?; +//! let (h2, connection) = client::handshake(tcp).await?; +//! tokio::spawn(async move { +//! connection.await.unwrap(); +//! }); //! -//! tokio::run( -//! // Establish TCP connection to the server. -//! TcpStream::connect(&addr) -//! .map_err(|_| { -//! panic!("failed to establish TCP connection") -//! }) -//! .and_then(|tcp| client::handshake(tcp)) -//! .and_then(|(h2, connection)| { -//! let connection = connection -//! .map_err(|_| panic!("HTTP/2.0 connection failed")); -//! -//! // Spawn a new task to drive the connection state -//! tokio::spawn(connection); -//! -//! // Wait until the `SendRequest` handle has available -//! // capacity. -//! h2.ready() -//! }) -//! .and_then(|mut h2| { -//! // Prepare the HTTP request to send to the server. -//! let request = Request::builder() +//! let mut h2 = h2.ready().await?; +//! // Prepare the HTTP request to send to the server. +//! let request = Request::builder() //! .method(Method::GET) //! .uri("https://www.example.com/") //! .body(()) //! .unwrap(); //! -//! // Send the request. The second tuple item allows the caller -//! // to stream a request body. -//! let (response, _) = h2.send_request(request, true).unwrap(); +//! // Send the request. The second tuple item allows the caller +//! // to stream a request body. +//! let (response, _) = h2.send_request(request, true).unwrap(); +//! +//! let (head, mut body) = response.await?.into_parts(); //! -//! response.and_then(|response| { -//! 
let (head, mut body) = response.into_parts(); +//! println!("Received response: {:?}", head); //! -//! println!("Received response: {:?}", head); +//! // The `release_capacity` handle allows the caller to manage +//! // flow control. +//! // +//! // Whenever data is received, the caller is responsible for +//! // releasing capacity back to the server once it has freed +//! // the data from memory. +//! let mut release_capacity = body.release_capacity().clone(); //! -//! // The `release_capacity` handle allows the caller to manage -//! // flow control. -//! // -//! // Whenever data is received, the caller is responsible for -//! // releasing capacity back to the server once it has freed -//! // the data from memory. -//! let mut release_capacity = body.release_capacity().clone(); +//! while let Some(chunk) = body.next().await { +//! let chunk = chunk?; +//! println!("RX: {:?}", chunk); //! -//! body.for_each(move |chunk| { -//! println!("RX: {:?}", chunk); +//! // Let the server send more data. +//! let _ = release_capacity.release_capacity(chunk.len()); +//! } //! -//! // Let the server send more data. -//! let _ = release_capacity.release_capacity(chunk.len()); -//! -//! Ok(()) -//! }) -//! }) -//! }) -//! .map_err(|e| panic!("failed to perform HTTP/2.0 request: {:?}", e)) -//! ) +//! Ok(()) //! } //! ``` //! @@ -151,21 +139,23 @@ //! [`Builder`]: struct.Builder.html //! [`Error`]: ../struct.Error.html -use crate::{SendStream, RecvStream, ReleaseCapacity, PingPong}; use crate::codec::{Codec, RecvError, SendError, UserError}; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; use crate::proto; +use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; use bytes::{Bytes, IntoBuf}; -use futures::{Async, Future, Poll, Stream, try_ready}; -use http::{uri, HeaderMap, Request, Response, Method, Version}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::io::WriteAll; - +use futures::{ready, FutureExt, Stream}; +use http::{uri, HeaderMap, Method, Request, Response, Version}; use std::fmt; +use std::future::Future; +use std::io; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use std::usize; +use tokio_io::{AsyncRead, AsyncWrite, AsyncWriteExt}; /// Performs the HTTP/2.0 connection handshake. /// @@ -182,9 +172,9 @@ use std::usize; /// /// [module]: index.html #[must_use = "futures do nothing unless polled"] -pub struct Handshake { +pub struct Handshake<'a, T, B = Bytes> { builder: Builder, - inner: WriteAll, + inner: Pin> + 'a>>, _marker: PhantomData, } @@ -246,31 +236,20 @@ pub struct ReadySendRequest { /// # Examples /// /// ``` -/// # use futures::{Future, Stream}; -/// # use futures::future::Executor; +/// #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client; /// # use h2::client::*; /// # -/// # fn doc(my_io: T, my_executor: E) -/// # where T: AsyncRead + AsyncWrite + 'static, -/// # E: Executor>>, +/// # async fn doc(my_io: T) +/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, /// # { -/// client::handshake(my_io) -/// .and_then(|(send_request, connection)| { -/// // Submit the connection handle to an executor. -/// my_executor.execute( -/// # Box::new( -/// connection.map_err(|_| panic!("connection failed")) -/// # ) -/// ).unwrap(); +/// let (send_request, connection) = client::handshake(my_io).await.unwrap(); +/// // Submit the connection handle to an executor. 
+/// tokio::spawn(async { connection.await.expect("connection failed"); }); /// -/// // Now, use `send_request` to initialize HTTP/2.0 streams. -/// // ... -/// # drop(send_request); -/// # Ok(()) -/// }) -/// # .wait().unwrap(); +/// // Now, use `send_request` to initialize HTTP/2.0 streams. +/// // ... /// # } /// # /// # pub fn main() {} @@ -338,8 +317,8 @@ pub struct PushPromises { /// # use tokio_io::*; /// # use h2::client::*; /// # -/// # fn doc(my_io: T) -/// # -> Handshake +/// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) +/// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -384,23 +363,23 @@ pub(crate) struct Peer; impl SendRequest where - B: IntoBuf, - B::Buf: 'static, + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { /// Returns `Ready` when the connection can initialize a new HTTP/2.0 /// stream. /// /// This function must return `Ready` before `send_request` is called. When - /// `NotReady` is returned, the task will be notified once the readiness + /// `Poll::Pending` is returned, the task will be notified once the readiness /// state changes. /// /// See [module] level docs for more details. /// /// [module]: index.html - pub fn poll_ready(&mut self) -> Poll<(), crate::Error> { - try_ready!(self.inner.poll_pending_open(self.pending.as_ref())); + pub fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?; self.pending = None; - Ok(().into()) + Poll::Ready(Ok(())) } /// Consumes `self`, returning a future that returns `self` back once it is @@ -415,19 +394,15 @@ where /// # Examples /// /// ```rust - /// # use futures::*; + /// #![feature(async_await)] /// # use h2::client::*; /// # use http::*; - /// # fn doc(send_request: SendRequest<&'static [u8]>) + /// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # { /// // First, wait until the `send_request` handle is ready to send a new /// // request - /// send_request.ready() - /// .and_then(|mut send_request| { - /// // Use `send_request` here. - /// # Ok(()) - /// }) - /// # .wait().unwrap(); + /// let mut send_request = send_request.ready().await.unwrap(); + /// // Use `send_request` here. /// # } /// # pub fn main() {} /// ``` @@ -479,32 +454,24 @@ where /// Sending a request with no body /// /// ```rust - /// # use futures::*; + /// #![feature(async_await)] /// # use h2::client::*; /// # use http::*; - /// # fn doc(send_request: SendRequest<&'static [u8]>) + /// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # { /// // First, wait until the `send_request` handle is ready to send a new /// // request - /// send_request.ready() - /// .and_then(|mut send_request| { - /// // Prepare the HTTP request to send to the server. - /// let request = Request::get("https://www.example.com/") - /// .body(()) - /// .unwrap(); + /// let mut send_request = send_request.ready().await.unwrap(); + /// // Prepare the HTTP request to send to the server. + /// let request = Request::get("https://www.example.com/") + /// .body(()) + /// .unwrap(); /// - /// // Send the request to the server. Since we are not sending a - /// // body or trailers, we can drop the `SendStream` instance. - /// let (response, _) = send_request - /// .send_request(request, true).unwrap(); - /// - /// response - /// }) - /// .and_then(|response| { - /// // Process the response - /// # Ok(()) - /// }) - /// # .wait().unwrap(); + /// // Send the request to the server. 
Since we are not sending a + /// // body or trailers, we can drop the `SendStream` instance. + /// let (response, _) = send_request.send_request(request, true).unwrap(); + /// let response = response.await.unwrap(); + /// // Process the response /// # } /// # pub fn main() {} /// ``` @@ -512,48 +479,43 @@ where /// Sending a request with a body and trailers /// /// ```rust - /// # use futures::*; + /// #![feature(async_await)] /// # use h2::client::*; /// # use http::*; - /// # fn doc(send_request: SendRequest<&'static [u8]>) + /// # async fn doc(send_request: SendRequest<&'static [u8]>) /// # { /// // First, wait until the `send_request` handle is ready to send a new /// // request - /// send_request.ready() - /// .and_then(|mut send_request| { - /// // Prepare the HTTP request to send to the server. - /// let request = Request::get("https://www.example.com/") - /// .body(()) - /// .unwrap(); + /// let mut send_request = send_request.ready().await.unwrap(); /// - /// // Send the request to the server. If we are not sending a - /// // body or trailers, we can drop the `SendStream` instance. - /// let (response, mut send_stream) = send_request - /// .send_request(request, false).unwrap(); + /// // Prepare the HTTP request to send to the server. + /// let request = Request::get("https://www.example.com/") + /// .body(()) + /// .unwrap(); /// - /// // At this point, one option would be to wait for send capacity. - /// // Doing so would allow us to not hold data in memory that - /// // cannot be sent. However, this is not a requirement, so this - /// // example will skip that step. See `SendStream` documentation - /// // for more details. - /// send_stream.send_data(b"hello", false).unwrap(); - /// send_stream.send_data(b"world", false).unwrap(); + /// // Send the request to the server. If we are not sending a + /// // body or trailers, we can drop the `SendStream` instance. + /// let (response, mut send_stream) = send_request + /// .send_request(request, false).unwrap(); /// - /// // Send the trailers. - /// let mut trailers = HeaderMap::new(); - /// trailers.insert( - /// header::HeaderName::from_bytes(b"my-trailer").unwrap(), - /// header::HeaderValue::from_bytes(b"hello").unwrap()); + /// // At this point, one option would be to wait for send capacity. + /// // Doing so would allow us to not hold data in memory that + /// // cannot be sent. However, this is not a requirement, so this + /// // example will skip that step. See `SendStream` documentation + /// // for more details. + /// send_stream.send_data(b"hello", false).unwrap(); + /// send_stream.send_data(b"world", false).unwrap(); /// - /// send_stream.send_trailers(trailers).unwrap(); + /// // Send the trailers. 
+ /// let mut trailers = HeaderMap::new(); + /// trailers.insert( + /// header::HeaderName::from_bytes(b"my-trailer").unwrap(), + /// header::HeaderValue::from_bytes(b"hello").unwrap()); /// - /// response - /// }) - /// .and_then(|response| { - /// // Process the response - /// # Ok(()) - /// }) - /// # .wait().unwrap(); + /// send_stream.send_trailers(trailers).unwrap(); + /// + /// let response = response.await.unwrap(); + /// // Process the response /// # } /// # pub fn main() {} /// ``` @@ -634,21 +596,21 @@ where // ===== impl ReadySendRequest ===== impl Future for ReadySendRequest -where B: IntoBuf, - B::Buf: 'static, +where + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { - type Item = SendRequest; - type Error = crate::Error; + type Output = Result, crate::Error>; - fn poll(&mut self) -> Poll { - match self.inner { - Some(ref mut send_request) => { - let _ = try_ready!(send_request.poll_ready()); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match &mut self.inner { + Some(send_request) => { + let _ = ready!(send_request.poll_ready(cx))?; } None => panic!("called `poll` after future completed"), } - Ok(self.inner.take().unwrap().into()) + Poll::Ready(Ok(self.inner.take().unwrap())) } } @@ -666,8 +628,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -707,8 +669,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -741,8 +703,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -774,8 +736,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -813,8 +775,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -861,8 +823,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. 
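The `Handshake<'a, T>` return type that these Builder examples now use comes from the struct change earlier in this file: the connection preface write is an `async move` block, so `Handshake` stores it as a pinned, boxed trait object bounded by `'a` instead of the old `WriteAll`. A minimal sketch of that pattern under the same nightly async/await setup the patch targets, mirroring what `Connection::handshake2` does further down; the `Preface` type and `start` function are made-up names for illustration, not from h2:

```rust
use std::future::Future;
use std::pin::Pin;
use tokio_io::{AsyncWrite, AsyncWriteExt};

struct Preface<'a, T> {
    // The async block's concrete type cannot be named, so it is boxed and
    // pinned; the `'a` bound is what lets non-'static I/O types be used.
    inner: Pin<Box<dyn Future<Output = std::io::Result<T>> + 'a>>,
}

fn start<'a, T>(mut io: T) -> Preface<'a, T>
where
    T: AsyncWrite + Unpin + 'a,
{
    Preface {
        inner: Box::pin(async move {
            // Same shape as `handshake2` below: write the client preface,
            // then hand the I/O handle back to the caller.
            io.write_all(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n").await?;
            Ok(io)
        }),
    }
}
```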
@@ -901,8 +863,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -945,8 +907,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -990,8 +952,8 @@ impl Builder { /// # use h2::client::*; /// # use std::time::Duration; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -1027,8 +989,8 @@ impl Builder { /// # use h2::client::*; /// # use std::time::Duration; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -1081,8 +1043,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -1101,8 +1063,8 @@ impl Builder { /// # use tokio_io::*; /// # use h2::client::*; /// # - /// # fn doc(my_io: T) - /// # -> Handshake + /// # fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) + /// # -> Handshake<'a, T, &'static [u8]> /// # { /// // `client_fut` is a future representing the completion of the HTTP/2.0 /// // handshake. @@ -1113,11 +1075,11 @@ impl Builder { /// # /// # pub fn main() {} /// ``` - pub fn handshake(&self, io: T) -> Handshake + pub fn handshake<'a, T, B>(&self, io: T) -> Handshake<'a, T, B> where - T: AsyncRead + AsyncWrite, - B: IntoBuf, - B::Buf: 'static, + T: AsyncRead + AsyncWrite + Unpin + 'a, + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { Connection::handshake2(io, self.clone()) } @@ -1149,45 +1111,41 @@ impl Default for Builder { /// # Examples /// /// ``` -/// # use futures::*; +/// #![feature(async_await)] /// # use tokio_io::*; /// # use h2::client; /// # use h2::client::*; /// # -/// # fn doc(my_io: T) +/// # async fn doc<'a, T: AsyncRead + AsyncWrite + Unpin + 'a>(my_io: T) /// # { -/// client::handshake(my_io) -/// .and_then(|(send_request, connection)| { -/// // The HTTP/2.0 handshake has completed, now start polling -/// // `connection` and use `send_request` to send requests to the -/// // server. -/// # Ok(()) -/// }) -/// # .wait().unwrap(); +/// let (send_request, connection) = client::handshake(my_io).await.unwrap(); +/// // The HTTP/2.0 handshake has completed, now start polling +/// // `connection` and use `send_request` to send requests to the +/// // server. 
/// # } /// # /// # pub fn main() {} /// ``` -pub fn handshake(io: T) -> Handshake -where T: AsyncRead + AsyncWrite, +pub fn handshake<'a, T>(io: T) -> Handshake<'a, T, Bytes> +where + T: AsyncRead + AsyncWrite + Unpin + 'a, { Builder::new().handshake(io) } // ===== impl Connection ===== -impl Connection +impl<'a, T, B> Connection where - T: AsyncRead + AsyncWrite, - B: IntoBuf, + T: AsyncRead + AsyncWrite + Unpin + 'a, + B: IntoBuf + Unpin, + B::Buf: Unpin, { - fn handshake2(io: T, builder: Builder) -> Handshake { - use tokio_io::io; - + fn handshake2(mut io: T, builder: Builder) -> Handshake<'a, T, B> { log::debug!("binding client connection"); let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - let handshake = io::write_all(io, msg); + let handshake = Box::pin(async move { io.write_all(msg).await.map(|_| io) }); Handshake { builder, @@ -1224,23 +1182,21 @@ where /// /// This may only be called once. Calling multiple times will return `None`. pub fn ping_pong(&mut self) -> Option { - self.inner - .take_user_pings() - .map(PingPong::new) + self.inner.take_user_pings().map(PingPong::new) } } impl Future for Connection where - T: AsyncRead + AsyncWrite, - B: IntoBuf, + T: AsyncRead + AsyncWrite + Unpin, + B: IntoBuf + Unpin, + B::Buf: Unpin, { - type Item = (); - type Error = crate::Error; + type Output = Result<(), crate::Error>; - fn poll(&mut self) -> Poll<(), crate::Error> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.inner.maybe_close_connection_if_no_streams(); - self.inner.poll().map_err(Into::into) + self.inner.poll(cx).map_err(Into::into) } } @@ -1258,20 +1214,16 @@ where // ===== impl Handshake ===== -impl Future for Handshake +impl<'a, T, B> Future for Handshake<'_, T, B> where - T: AsyncRead + AsyncWrite, - B: IntoBuf, - B::Buf: 'static, + T: AsyncRead + AsyncWrite + Unpin + 'a, + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { - type Item = (SendRequest, Connection); - type Error = crate::Error; + type Output = Result<(SendRequest, Connection), crate::Error>; - fn poll(&mut self) -> Poll { - let res = self.inner.poll() - .map_err(crate::Error::from); - - let (io, _) = try_ready!(res); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let io = ready!(self.inner.poll_unpin(cx))?; log::debug!("client connection bound"); @@ -1291,13 +1243,16 @@ where .buffer(self.builder.settings.clone().into()) .expect("invalid SETTINGS frame"); - let inner = proto::Connection::new(codec, proto::Config { - next_stream_id: self.builder.stream_id, - initial_max_send_streams: self.builder.initial_max_send_streams, - reset_stream_duration: self.builder.reset_stream_duration, - reset_stream_max: self.builder.reset_stream_max, - settings: self.builder.settings.clone(), - }); + let inner = proto::Connection::new( + codec, + proto::Config { + next_stream_id: self.builder.stream_id, + initial_max_send_streams: self.builder.initial_max_send_streams, + reset_stream_duration: self.builder.reset_stream_duration, + reset_stream_max: self.builder.reset_stream_max, + settings: self.builder.settings.clone(), + }, + ); let send_request = SendRequest { inner: inner.streams().clone(), pending: None, @@ -1308,11 +1263,11 @@ where connection.set_target_window_size(sz); } - Ok(Async::Ready((send_request, connection))) + Poll::Ready(Ok((send_request, connection))) } } -impl fmt::Debug for Handshake +impl fmt::Debug for Handshake<'_, T, B> where T: AsyncRead + AsyncWrite, T: fmt::Debug, @@ -1327,14 +1282,13 @@ where // ===== impl ResponseFuture ===== impl 
Future for ResponseFuture { - type Item = Response; - type Error = crate::Error; + type Output = Result, crate::Error>; - fn poll(&mut self) -> Poll { - let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts(); let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone())); - Ok(Response::from_parts(parts, body).into()) + Poll::Ready(Ok(Response::from_parts(parts, body).into())) } } @@ -1358,27 +1312,31 @@ impl ResponseFuture { panic!("Reference to push promises stream taken!"); } self.push_promise_consumed = true; - PushPromises { inner: self.inner.clone() } + PushPromises { + inner: self.inner.clone(), + } } } // ===== impl PushPromises ===== impl Stream for PushPromises { - type Item = PushPromise; - type Error = crate::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - match try_ready!(self.inner.poll_pushed()) { - Some((request, response)) => { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.inner.poll_pushed(cx) { + Poll::Ready(Some(Ok((request, response)))) => { let response = PushedResponseFuture { inner: ResponseFuture { - inner: response, push_promise_consumed: false - } + inner: response, + push_promise_consumed: false, + }, }; - Ok(Async::Ready(Some(PushPromise{request, response}))) + Poll::Ready(Some(Ok(PushPromise { request, response }))) } - None => Ok(Async::Ready(None)), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } } @@ -1406,11 +1364,10 @@ impl PushPromise { // ===== impl PushedResponseFuture ===== impl Future for PushedResponseFuture { - type Item = Response; - type Error = crate::Error; + type Output = Result, crate::Error>; - fn poll(&mut self) -> Poll { - self.inner.poll() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner.poll_unpin(cx) } } @@ -1431,8 +1388,8 @@ impl Peer { pub fn convert_send_message( id: StreamId, request: Request<()>, - end_of_stream: bool) -> Result - { + end_of_stream: bool, + ) -> Result { use http::request::Parts; let ( @@ -1503,7 +1460,9 @@ impl proto::Peer for Peer { } fn convert_poll_message( - pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId + pseudo: Pseudo, + fields: HeaderMap, + stream_id: StreamId, ) -> Result { let mut b = Response::builder(); @@ -1522,7 +1481,7 @@ impl proto::Peer for Peer { id: stream_id, reason: Reason::PROTOCOL_ERROR, }); - }, + } }; *response.headers_mut() = fields; diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index 4a6bc43..59d08a8 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -1,24 +1,29 @@ use crate::codec::RecvError; use crate::frame::{self, Frame, Kind, Reason}; -use crate::frame::{DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE}; +use crate::frame::{ + DEFAULT_MAX_FRAME_SIZE, DEFAULT_SETTINGS_HEADER_TABLE_SIZE, MAX_MAX_FRAME_SIZE, +}; use crate::hpack; -use futures::*; +use futures::{ready, Stream}; use bytes::BytesMut; use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio_codec::{LengthDelimitedCodec, LengthDelimitedCodecError}; +use tokio_codec::FramedRead as InnerFramedRead; use tokio_io::AsyncRead; -use tokio_io::codec::length_delimited; // 16 MB "sane default" taken from golang http2 const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: usize = 16 << 20; 
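The remainder of this file converts `FramedRead` from the futures 0.1 `Stream` (separate `Item`/`Error` associated types, `poll(&mut self)`) to the futures 0.3 shape, where fallible streams yield `Result` items through `poll_next(Pin<&mut Self>, &mut Context)`. A standalone sketch of that trait shape, using a made-up `Counter` stream rather than h2's types:

```rust
use futures::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter {
    remaining: u32,
}

impl Stream for Counter {
    // Errors are no longer a separate associated type; a fallible stream
    // yields `Result` items, as `FramedRead` does with `Result<Frame, RecvError>`.
    type Item = Result<u32, io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.remaining == 0 {
            // End of stream, the equivalent of `Ok(Async::Ready(None))`.
            return Poll::Ready(None);
        }
        // `Counter` is `Unpin`, so mutating through the pin is allowed.
        self.remaining -= 1;
        Poll::Ready(Some(Ok(self.remaining)))
    }
}
```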
#[derive(Debug)] pub struct FramedRead { - inner: length_delimited::FramedRead, + inner: InnerFramedRead, // hpack decoder state hpack: hpack::Decoder, @@ -45,7 +50,7 @@ enum Continuable { } impl FramedRead { - pub fn new(inner: length_delimited::FramedRead) -> FramedRead { + pub fn new(inner: InnerFramedRead) -> FramedRead { FramedRead { inner: inner, hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), @@ -138,24 +143,27 @@ impl FramedRead { res.map_err(|e| { proto_err!(conn: "failed to load SETTINGS frame; err={:?}", e); Connection(Reason::PROTOCOL_ERROR) - })?.into() - }, + })? + .into() + } Kind::Ping => { let res = frame::Ping::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load PING frame; err={:?}", e); Connection(Reason::PROTOCOL_ERROR) - })?.into() - }, + })? + .into() + } Kind::WindowUpdate => { let res = frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load WINDOW_UPDATE frame; err={:?}", e); Connection(Reason::PROTOCOL_ERROR) - })?.into() - }, + })? + .into() + } Kind::Data => { let _ = bytes.split_to(frame::HEADER_LEN); let res = frame::Data::load(head, bytes.freeze()); @@ -164,28 +172,27 @@ impl FramedRead { res.map_err(|e| { proto_err!(conn: "failed to load DATA frame; err={:?}", e); Connection(Reason::PROTOCOL_ERROR) - })?.into() - }, - Kind::Headers => { - header_block!(Headers, head, bytes) - }, + })? + .into() + } + Kind::Headers => header_block!(Headers, head, bytes), Kind::Reset => { let res = frame::Reset::load(head, &bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load RESET frame; err={:?}", e); Connection(Reason::PROTOCOL_ERROR) - })?.into() - }, + })? + .into() + } Kind::GoAway => { let res = frame::GoAway::load(&bytes[frame::HEADER_LEN..]); res.map_err(|e| { proto_err!(conn: "failed to load GO_AWAY frame; err={:?}", e); Connection(Reason::PROTOCOL_ERROR) - })?.into() - }, - Kind::PushPromise => { - header_block!(PushPromise, head, bytes) - }, + })? 
+ .into() + } + Kind::PushPromise => header_block!(PushPromise, head, bytes), Kind::Priority => { if head.stream_id() == 0 { // Invalid stream identifier @@ -205,13 +212,13 @@ impl FramedRead { id, reason: Reason::PROTOCOL_ERROR, }); - }, + } Err(e) => { proto_err!(conn: "failed to load PRIORITY frame; err={:?};", e); return Err(Connection(Reason::PROTOCOL_ERROR)); } } - }, + } Kind::Continuation => { let is_end_headers = (head.flag() & 0x4) == 0x4; @@ -229,8 +236,6 @@ impl FramedRead { return Err(Connection(Reason::PROTOCOL_ERROR)); } - - // Extend the buf if partial.buf.is_empty() { partial.buf = bytes.split_off(frame::HEADER_LEN); @@ -257,9 +262,14 @@ impl FramedRead { partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]); } - match partial.frame.load_hpack(&mut partial.buf, self.max_header_list_size, &mut self.hpack) { - Ok(_) => {}, - Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) if !is_end_headers => {}, + match partial.frame.load_hpack( + &mut partial.buf, + self.max_header_list_size, + &mut self.hpack, + ) { + Ok(_) => {} + Err(frame::Error::Hpack(hpack::DecoderError::NeedMore(_))) + if !is_end_headers => {} Err(frame::Error::MalformedMessage) => { let id = head.stream_id(); proto_err!(stream: "malformed CONTINUATION frame; stream={:?}", id); @@ -267,11 +277,11 @@ impl FramedRead { id, reason: Reason::PROTOCOL_ERROR, }); - }, + } Err(e) => { proto_err!(conn: "failed HPACK decoding; err={:?}", e); return Err(Connection(Reason::PROTOCOL_ERROR)); - }, + } } if is_end_headers { @@ -280,11 +290,11 @@ impl FramedRead { self.partial = Some(partial); return Ok(None); } - }, + } Kind::Unknown => { // Unknown frames are ignored return Ok(None); - }, + } }; Ok(Some(frame)) @@ -302,7 +312,7 @@ impl FramedRead { #[cfg(feature = "unstable")] #[inline] pub fn max_frame_size(&self) -> usize { - self.inner.max_frame_length() + self.inner.decoder().max_frame_length() } /// Updates the max frame size setting. @@ -311,7 +321,7 @@ impl FramedRead { #[inline] pub fn set_max_frame_size(&mut self, val: usize) { assert!(DEFAULT_MAX_FRAME_SIZE as usize <= val && val <= MAX_MAX_FRAME_SIZE as usize); - self.inner.set_max_frame_length(val) + self.inner.decoder_mut().set_max_frame_length(val) } /// Update the max header list size setting. @@ -323,34 +333,32 @@ impl FramedRead { impl Stream for FramedRead where - T: AsyncRead, + T: AsyncRead + Unpin, { - type Item = Frame; - type Error = RecvError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { log::trace!("poll"); - let bytes = match try_ready!(self.inner.poll().map_err(map_err)) { - Some(bytes) => bytes, - None => return Ok(Async::Ready(None)), + let bytes = match ready!(Pin::new(&mut self.inner).poll_next(cx)) { + Some(Ok(bytes)) => bytes, + Some(Err(e)) => return Poll::Ready(Some(Err(map_err(e)))), + None => return Poll::Ready(None), }; log::trace!("poll; bytes={}B", bytes.len()); if let Some(frame) = self.decode_frame(bytes)? 
{ log::debug!("received; frame={:?}", frame); - return Ok(Async::Ready(Some(frame))); + return Poll::Ready(Some(Ok(frame))); } } } } fn map_err(err: io::Error) -> RecvError { - use tokio_io::codec::length_delimited::FrameTooBig; - if let io::ErrorKind::InvalidData = err.kind() { if let Some(custom) = err.get_ref() { - if custom.is::() { + if custom.is::() { return RecvError::Connection(Reason::FRAME_SIZE_ERROR); } } diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index fa6ac18..ce95620 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -4,8 +4,10 @@ use crate::frame::{self, Frame, FrameSize}; use crate::hpack; use bytes::{Buf, BufMut, BytesMut}; -use futures::*; -use tokio_io::{AsyncRead, AsyncWrite, try_nb}; +use futures::ready; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio_io::{AsyncRead, AsyncWrite}; use std::io::{self, Cursor}; @@ -55,12 +57,12 @@ const CHAIN_THRESHOLD: usize = 256; // TODO: Make generic impl FramedWrite where - T: AsyncWrite, + T: AsyncWrite + Unpin, B: Buf, { pub fn new(inner: T) -> FramedWrite { FramedWrite { - inner: inner, + inner, hpack: hpack::Encoder::default(), buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), next: None, @@ -73,17 +75,17 @@ where /// /// Calling this function may result in the current contents of the buffer /// to be flushed to `T`. - pub fn poll_ready(&mut self) -> Poll<(), io::Error> { + pub fn poll_ready(&mut self, cx: &mut Context) -> Poll> { if !self.has_capacity() { // Try flushing - self.flush()?; + ready!(self.flush(cx))?; if !self.has_capacity() { - return Ok(Async::NotReady); + return Poll::Pending; } } - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } /// Buffer a frame. @@ -123,33 +125,33 @@ where // Save off the last frame... 
self.last_data_frame = Some(v); } - }, + } Frame::Headers(v) => { if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) { self.next = Some(Next::Continuation(continuation)); } - }, + } Frame::PushPromise(v) => { if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) { self.next = Some(Next::Continuation(continuation)); } - }, + } Frame::Settings(v) => { v.encode(self.buf.get_mut()); log::trace!("encoded settings; rem={:?}", self.buf.remaining()); - }, + } Frame::GoAway(v) => { v.encode(self.buf.get_mut()); log::trace!("encoded go_away; rem={:?}", self.buf.remaining()); - }, + } Frame::Ping(v) => { v.encode(self.buf.get_mut()); log::trace!("encoded ping; rem={:?}", self.buf.remaining()); - }, + } Frame::WindowUpdate(v) => { v.encode(self.buf.get_mut()); log::trace!("encoded window_update; rem={:?}", self.buf.remaining()); - }, + } Frame::Priority(_) => { /* @@ -157,18 +159,18 @@ where log::trace!("encoded priority; rem={:?}", self.buf.remaining()); */ unimplemented!(); - }, + } Frame::Reset(v) => { v.encode(self.buf.get_mut()); log::trace!("encoded reset; rem={:?}", self.buf.remaining()); - }, + } } Ok(()) } /// Flush buffered data to the wire - pub fn flush(&mut self) -> Poll<(), io::Error> { + pub fn flush(&mut self, cx: &mut Context) -> Poll> { log::trace!("flush"); loop { @@ -177,12 +179,12 @@ where Some(Next::Data(ref mut frame)) => { log::trace!(" -> queued data frame"); let mut buf = Buf::by_ref(&mut self.buf).chain(frame.payload_mut()); - try_ready!(self.inner.write_buf(&mut buf)); - }, + ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut buf))?; + } _ => { log::trace!(" -> not a queued data frame"); - try_ready!(self.inner.write_buf(&mut self.buf)); - }, + ready!(Pin::new(&mut self.inner).poll_write_buf(cx, &mut self.buf))?; + } } } @@ -196,11 +198,10 @@ where self.last_data_frame = Some(frame); debug_assert!(self.is_empty()); break; - }, + } Some(Next::Continuation(frame)) => { // Buffer the continuation frame, then try to write again if let Some(continuation) = frame.encode(&mut self.hpack, self.buf.get_mut()) { - // We previously had a CONTINUATION, and after encoding // it, we got *another* one? Let's just double check // that at least some progress is being made... 
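The `flush` rewrite above shows the recurring I/O pattern in this patch: the new `AsyncWrite` methods take `Pin<&mut Self>`, and because the inner `T` now carries an `Unpin` bound, a pin can be created safely on the spot with `Pin::new`. A small standalone sketch of that idea; the helper name is illustrative, not from h2:

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_io::AsyncWrite;

fn poll_flush_io<T>(io: &mut T, cx: &mut Context<'_>) -> Poll<io::Result<()>>
where
    T: AsyncWrite + Unpin,
{
    // `Pin::new` is only safe for `Unpin` targets, which is why the patch
    // adds `Unpin` bounds wherever `Pin::new(&mut self.inner)` appears.
    Pin::new(io).poll_flush(cx)
}
```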
@@ -213,7 +214,7 @@ where self.next = Some(Next::Continuation(continuation)); } - }, + } None => { break; } @@ -222,15 +223,15 @@ where log::trace!("flushing buffer"); // Flush the upstream - try_nb!(self.inner.flush()); + ready!(Pin::new(&mut self.inner).poll_flush(cx))?; - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } /// Close the codec - pub fn shutdown(&mut self) -> Poll<(), io::Error> { - try_ready!(self.flush()); - self.inner.shutdown().map_err(Into::into) + pub fn shutdown(&mut self, cx: &mut Context) -> Poll> { + ready!(self.flush(cx))?; + Pin::new(&mut self.inner).poll_shutdown(cx) } fn has_capacity(&self) -> bool { @@ -267,23 +268,18 @@ impl FramedWrite { } } -impl io::Read for FramedWrite { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.inner.read(dst) - } -} - -impl AsyncRead for FramedWrite { - fn read_buf(&mut self, buf: &mut B2) -> Poll - where - Self: Sized, - { - self.inner.read_buf(buf) - } - +impl AsyncRead for FramedWrite { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } + + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_read(cx, buf) + } } #[cfg(feature = "unstable")] diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 3ae00eb..322174c 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -14,10 +14,11 @@ use crate::frame::{self, Data, Frame}; use futures::*; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; - use bytes::Buf; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio_codec::length_delimited; +use tokio_io::{AsyncRead, AsyncWrite}; use std::io; @@ -28,8 +29,8 @@ pub struct Codec { impl Codec where - T: AsyncRead + AsyncWrite, - B: Buf, + T: AsyncRead + AsyncWrite + Unpin, + B: Buf + Unpin, { /// Returns a new `Codec` with the default max frame size #[inline] @@ -55,9 +56,7 @@ where // Use FramedRead's method since it checks the value is within range. inner.set_max_frame_size(max_frame_size); - Codec { - inner, - } + Codec { inner } } } @@ -121,12 +120,12 @@ impl Codec { impl Codec where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { /// Returns `Ready` when the codec can buffer a frame - pub fn poll_ready(&mut self) -> Poll<(), io::Error> { - self.framed_write().poll_ready() + pub fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + self.framed_write().poll_ready(cx) } /// Buffer a frame. 
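The hunk that follows also replaces the futures 0.1 `Sink` implementation, where `start_send` could hand the item back via `AsyncSink::NotReady(item)`, with the futures 0.3 `Sink<Frame<B::Buf>>`: the item type becomes a generic parameter and backpressure moves into a separate `poll_ready`. A sketch of how a caller drives that split, using a hypothetical `poll_send_item` helper rather than anything in h2:

```rust
use futures::{ready, sink::Sink};
use std::pin::Pin;
use std::task::{Context, Poll};

fn poll_send_item<S, T>(
    sink: &mut S,
    cx: &mut Context<'_>,
    item: &mut Option<T>,
) -> Poll<Result<(), S::Error>>
where
    S: Sink<T> + Unpin,
{
    // Wait for capacity first; the item stays parked in `item` until the
    // sink is ready, which is what `AsyncSink::NotReady(item)` used to
    // express by returning the item to the caller.
    ready!(Pin::new(&mut *sink).poll_ready(cx))?;
    let item = item.take().expect("poll_send_item called without an item");
    Poll::Ready(Pin::new(sink).start_send(item))
}
```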
@@ -140,60 +139,59 @@ where } /// Flush buffered data to the wire - pub fn flush(&mut self) -> Poll<(), io::Error> { - self.framed_write().flush() + pub fn flush(&mut self, cx: &mut Context) -> Poll> { + self.framed_write().flush(cx) } /// Shutdown the send half - pub fn shutdown(&mut self) -> Poll<(), io::Error> { - self.framed_write().shutdown() + pub fn shutdown(&mut self, cx: &mut Context) -> Poll> { + self.framed_write().shutdown(cx) } } impl Stream for Codec where - T: AsyncRead, + T: AsyncRead + Unpin, + B: Unpin, { - type Item = Frame; - type Error = RecvError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) } } -impl Sink for Codec +impl Sink> for Codec where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { - type SinkItem = Frame; - type SinkError = SendError; + type Error = SendError; - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - if !self.poll_ready()?.is_ready() { - return Ok(AsyncSink::NotReady(item)); - } - - self.buffer(item)?; - Ok(AsyncSink::Ready) + fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> { + Codec::buffer(&mut self, item)?; + Ok(()) + } + /// Returns `Ready` when the codec can buffer a frame + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.framed_write().poll_ready(cx).map_err(Into::into) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.flush()?; - Ok(Async::Ready(())) + /// Flush buffered data to the wire + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.framed_write().flush(cx).map_err(Into::into) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.shutdown()?; - Ok(Async::Ready(())) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + ready!(self.shutdown(cx))?; + Poll::Ready(Ok(())) } } // TODO: remove (or improve) this impl From for Codec> where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { fn from(src: T) -> Self { Self::new(src) diff --git a/src/lib.rs b/src/lib.rs index 41a2d24..a03ea96 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,6 +81,7 @@ #![doc(html_root_url = "https://docs.rs/h2/0.1.25")] #![deny(missing_debug_implementations, missing_docs)] #![cfg_attr(test, deny(warnings))] +#![feature(async_await)] macro_rules! proto_err { (conn: $($msg:tt)+) => { @@ -91,9 +92,9 @@ macro_rules! proto_err { }; } -mod error; #[cfg_attr(feature = "unstable", allow(missing_docs))] mod codec; +mod error; mod hpack; mod proto; @@ -109,7 +110,48 @@ pub mod server; mod share; pub use crate::error::{Error, Reason}; -pub use crate::share::{SendStream, StreamId, RecvStream, ReleaseCapacity, PingPong, Ping, Pong}; +pub use crate::share::{Ping, PingPong, Pong, RecvStream, ReleaseCapacity, SendStream, StreamId}; #[cfg(feature = "unstable")] pub use codec::{Codec, RecvError, SendError, UserError}; + +use std::task::Poll; + +// TODO: Get rid of this trait once https://github.com/rust-lang/rust/pull/63512 +// is stablized. +trait PollExt { + /// Changes the success value of this `Poll` with the closure provided. + fn map_ok_(self, f: F) -> Poll>> + where + F: FnOnce(T) -> U; + /// Changes the error value of this `Poll` with the closure provided. 
+ fn map_err_(self, f: F) -> Poll>> + where + F: FnOnce(E) -> U; +} + +impl PollExt for Poll>> { + fn map_ok_(self, f: F) -> Poll>> + where + F: FnOnce(T) -> U, + { + match self { + Poll::Ready(Some(Ok(t))) => Poll::Ready(Some(Ok(f(t)))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn map_err_(self, f: F) -> Poll>> + where + F: FnOnce(E) -> U, + { + match self { + Poll::Ready(Some(Ok(t))) => Poll::Ready(Some(Ok(t))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(f(e)))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 6183369..4c38e25 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,17 +1,18 @@ -use crate::{client, frame, proto, server}; use crate::codec::RecvError; use crate::frame::{Reason, StreamId}; +use crate::{client, frame, proto, server}; use crate::frame::DEFAULT_INITIAL_WINDOW_SIZE; use crate::proto::*; use bytes::{Bytes, IntoBuf}; -use futures::{Stream, try_ready}; -use tokio_io::{AsyncRead, AsyncWrite}; - -use std::marker::PhantomData; +use futures::{ready, Stream}; use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; +use tokio_io::{AsyncRead, AsyncWrite}; /// An H2 connection #[derive(Debug)] @@ -70,16 +71,15 @@ enum State { impl Connection where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, P: Peer, - B: IntoBuf, + B: IntoBuf + Unpin, + B::Buf: Unpin, { - pub fn new( - codec: Codec>, - config: Config, - ) -> Connection { + pub fn new(codec: Codec>, config: Config) -> Connection { let streams = Streams::new(streams::Config { - local_init_window_sz: config.settings + local_init_window_sz: config + .settings .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), initial_max_send_streams: config.initial_max_send_streams, @@ -88,7 +88,8 @@ where local_reset_duration: config.reset_stream_duration, local_reset_max: config.reset_stream_max, remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - remote_max_initiated: config.settings + remote_max_initiated: config + .settings .max_concurrent_streams() .map(|max| max as usize), }); @@ -112,25 +113,24 @@ where /// /// Returns `RecvError` as this may raise errors that are caused by delayed /// processing of received frames. - fn poll_ready(&mut self) -> Poll<(), RecvError> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { // The order of these calls don't really matter too much - try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); - try_ready!(self.ping_pong.send_pending_ping(&mut self.codec)); - try_ready!( - self.settings - .send_pending_ack(&mut self.codec, &mut self.streams) - ); - try_ready!(self.streams.send_pending_refusal(&mut self.codec)); + ready!(self.ping_pong.send_pending_pong(cx, &mut self.codec))?; + ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?; + ready!(self + .settings + .send_pending_ack(cx, &mut self.codec, &mut self.streams))?; + ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?; - Ok(().into()) + Poll::Ready(Ok(())) } /// Send any pending GOAWAY frames. /// /// This will return `Some(reason)` if the connection should be closed /// afterwards. If this is a graceful shutdown, this returns `None`. 
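As in `poll_ready` just above, `try_ready!` becomes `ready!` plus `?` throughout `connection.rs`: the new macro only short-circuits on `Poll::Pending`, so errors are propagated explicitly. A tiny standalone illustration of the equivalence (not h2 code):

```rust
use futures::ready;
use std::task::Poll;

fn add_one(step: Poll<Result<u32, &'static str>>) -> Poll<Result<u32, &'static str>> {
    // futures 0.1:  let n = try_ready!(step);  // returned early on NotReady *and* Err
    // std::task:    let n = ready!(step)?;     // Pending via `ready!`, errors via `?`
    let n = ready!(step)?;
    Poll::Ready(Ok(n + 1))
}
```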
- fn poll_go_away(&mut self) -> Poll, io::Error> { - self.go_away.send_pending_go_away(&mut self.codec) + fn poll_go_away(&mut self, cx: &mut Context) -> Poll>> { + self.go_away.send_pending_go_away(cx, &mut self.codec) } fn go_away(&mut self, id: StreamId, e: Reason) { @@ -154,7 +154,7 @@ where self.streams.recv_err(&proto::Error::Proto(e)); } - fn take_error(&mut self, ours: Reason) -> Poll<(), proto::Error> { + fn take_error(&mut self, ours: Reason) -> Poll> { let reason = if let Some(theirs) = self.error.take() { match (ours, theirs) { // If either side reported an error, return that @@ -171,9 +171,9 @@ where }; if reason == Reason::NO_ERROR { - Ok(().into()) + Poll::Ready(Ok(())) } else { - Err(proto::Error::Proto(reason)) + Poll::Ready(Err(proto::Error::Proto(reason))) } } @@ -192,7 +192,7 @@ where } /// Advances the internal state of the connection. - pub fn poll(&mut self) -> Poll<(), proto::Error> { + pub fn poll(&mut self, cx: &mut Context) -> Poll> { use crate::codec::RecvError::*; loop { @@ -200,15 +200,15 @@ where match self.state { // When open, continue to poll a frame State::Open => { - match self.poll2() { + match self.poll2(cx) { // The connection has shutdown normally - Ok(Async::Ready(())) => self.state = State::Closing(Reason::NO_ERROR), + Poll::Ready(Ok(())) => self.state = State::Closing(Reason::NO_ERROR), // The connection is not ready to make progress - Ok(Async::NotReady) => { + Poll::Pending => { // Ensure all window updates have been sent. // // This will also handle flushing `self.codec` - try_ready!(self.streams.poll_complete(&mut self.codec)); + ready!(self.streams.poll_complete(cx, &mut self.codec))?; if self.error.is_some() || self.go_away.should_close_on_idle() { if !self.streams.has_streams() { @@ -217,12 +217,12 @@ where } } - return Ok(Async::NotReady); - }, + return Poll::Pending; + } // Attempting to read a frame resulted in a connection level // error. This is handled by setting a GOAWAY frame followed by // terminating the connection. - Err(Connection(e)) => { + Poll::Ready(Err(Connection(e))) => { log::debug!("Connection::poll; connection error={:?}", e); // We may have already sent a GOAWAY for this error, @@ -238,22 +238,19 @@ where // Reset all active streams self.streams.recv_err(&e.into()); self.go_away_now(e); - }, + } // Attempting to read a frame resulted in a stream level error. // This is handled by resetting the frame then trying to read // another frame. - Err(Stream { - id, - reason, - }) => { + Poll::Ready(Err(Stream { id, reason })) => { log::trace!("stream error; id={:?}; reason={:?}", id, reason); self.streams.send_reset(id, reason); - }, + } // Attempting to read a frame resulted in an I/O error. All // active streams must be reset. // // TODO: Are I/O errors recoverable? 
- Err(Io(e)) => { + Poll::Ready(Err(Io(e))) => { log::debug!("Connection::poll; IO error={:?}", e); let e = e.into(); @@ -261,24 +258,24 @@ where self.streams.recv_err(&e); // Return the error - return Err(e); - }, + return Poll::Ready(Err(e)); + } } } State::Closing(reason) => { log::trace!("connection closing after flush"); // Flush/shutdown the codec - try_ready!(self.codec.shutdown()); + ready!(self.codec.shutdown(cx))?; // Transition the state to error self.state = State::Closed(reason); - }, + } State::Closed(reason) => return self.take_error(reason), } } } - fn poll2(&mut self) -> Poll<(), RecvError> { + fn poll2(&mut self, cx: &mut Context) -> Poll> { use crate::frame::Frame::*; // This happens outside of the loop to prevent needing to do a clock @@ -292,43 +289,51 @@ where // The order here matters: // - poll_go_away may buffer a graceful shutdown GOAWAY frame // - If it has, we've also added a PING to be sent in poll_ready - if let Some(reason) = try_ready!(self.poll_go_away()) { - if self.go_away.should_close_now() { - if self.go_away.is_user_initiated() { - // A user initiated abrupt shutdown shouldn't return - // the same error back to the user. - return Ok(Async::Ready(())); - } else { - return Err(RecvError::Connection(reason)); + match ready!(self.poll_go_away(cx)) { + Some(Ok(reason)) => { + if self.go_away.should_close_now() { + if self.go_away.is_user_initiated() { + // A user initiated abrupt shutdown shouldn't return + // the same error back to the user. + return Poll::Ready(Ok(())); + } else { + return Poll::Ready(Err(RecvError::Connection(reason))); + } } + // Only NO_ERROR should be waiting for idle + debug_assert_eq!( + reason, + Reason::NO_ERROR, + "graceful GOAWAY should be NO_ERROR" + ); } - // Only NO_ERROR should be waiting for idle - debug_assert_eq!(reason, Reason::NO_ERROR, "graceful GOAWAY should be NO_ERROR"); + Some(Err(e)) => return Poll::Ready(Err(e.into())), + None => (), } - try_ready!(self.poll_ready()); + ready!(self.poll_ready(cx))?; - match try_ready!(self.codec.poll()) { - Some(Headers(frame)) => { + match ready!(Pin::new(&mut self.codec).poll_next(cx)) { + Some(Ok(Headers(frame))) => { log::trace!("recv HEADERS; frame={:?}", frame); self.streams.recv_headers(frame)?; - }, - Some(Data(frame)) => { + } + Some(Ok(Data(frame))) => { log::trace!("recv DATA; frame={:?}", frame); self.streams.recv_data(frame)?; - }, - Some(Reset(frame)) => { + } + Some(Ok(Reset(frame))) => { log::trace!("recv RST_STREAM; frame={:?}", frame); self.streams.recv_reset(frame)?; - }, - Some(PushPromise(frame)) => { + } + Some(Ok(PushPromise(frame))) => { log::trace!("recv PUSH_PROMISE; frame={:?}", frame); self.streams.recv_push_promise(frame)?; - }, - Some(Settings(frame)) => { + } + Some(Ok(Settings(frame))) => { log::trace!("recv SETTINGS; frame={:?}", frame); self.settings.recv_settings(frame); - }, - Some(GoAway(frame)) => { + } + Some(Ok(GoAway(frame))) => { log::trace!("recv GOAWAY; frame={:?}", frame); // This should prevent starting new streams, // but should allow continuing to process current streams @@ -336,8 +341,8 @@ where // transition to GoAway. 
self.streams.recv_go_away(&frame)?; self.error = Some(frame.reason()); - }, - Some(Ping(frame)) => { + } + Some(Ok(Ping(frame))) => { log::trace!("recv PING; frame={:?}", frame); let status = self.ping_pong.recv_ping(frame); if status.is_shutdown() { @@ -349,21 +354,21 @@ where let last_processed_id = self.streams.last_processed_id(); self.go_away(last_processed_id, Reason::NO_ERROR); } - }, - Some(WindowUpdate(frame)) => { + } + Some(Ok(WindowUpdate(frame))) => { log::trace!("recv WINDOW_UPDATE; frame={:?}", frame); self.streams.recv_window_update(frame)?; - }, - Some(Priority(frame)) => { + } + Some(Ok(Priority(frame))) => { log::trace!("recv PRIORITY; frame={:?}", frame); // TODO: handle - }, + } + Some(Err(e)) => return Poll::Ready(Err(e)), None => { log::trace!("codec closed"); - self.streams.recv_eof(false) - .ok().expect("mutex poisoned"); - return Ok(Async::Ready(())); - }, + self.streams.recv_eof(false).ok().expect("mutex poisoned"); + return Poll::Ready(Ok(())); + } } } } @@ -385,8 +390,9 @@ where impl Connection where - T: AsyncRead + AsyncWrite, - B: IntoBuf, + T: AsyncRead + AsyncWrite + Unpin, + B: IntoBuf + Unpin, + B::Buf: Unpin, { pub fn next_incoming(&mut self) -> Option> { self.streams.next_incoming() diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs index 42ff0f0..1ac2f2e 100644 --- a/src/proto/go_away.rs +++ b/src/proto/go_away.rs @@ -2,8 +2,8 @@ use crate::codec::Codec; use crate::frame::{self, Reason, StreamId}; use bytes::Buf; -use futures::{Async, Poll}; use std::io; +use std::task::{Context, Poll}; use tokio_io::AsyncWrite; /// Manages our sending of GOAWAY frames. @@ -59,7 +59,7 @@ impl GoAway { assert!( f.last_stream_id() <= going_away.last_processed_id, "GOAWAY stream IDs shouldn't be higher; \ - last_processed_id = {:?}, f.last_stream_id() = {:?}", + last_processed_id = {:?}, f.last_stream_id() = {:?}", going_away.last_processed_id, f.last_stream_id(), ); @@ -76,8 +76,8 @@ impl GoAway { self.close_now = true; if let Some(ref going_away) = self.going_away { // Prevent sending the same GOAWAY twice. - if going_away.last_processed_id == f.last_stream_id() - && going_away.reason == f.reason() { + if going_away.last_processed_id == f.last_stream_id() && going_away.reason == f.reason() + { return; } } @@ -100,9 +100,7 @@ impl GoAway { /// Return the last Reason we've sent. pub fn going_away_reason(&self) -> Option { - self.going_away - .as_ref() - .map(|g| g.reason) + self.going_away.as_ref().map(|g| g.reason) } /// Returns if the connection should close now, or wait until idle. @@ -112,36 +110,43 @@ impl GoAway { /// Returns if the connection should be closed when idle. pub fn should_close_on_idle(&self) -> bool { - !self.close_now && self.going_away - .as_ref() - .map(|g| g.last_processed_id != StreamId::MAX) - .unwrap_or(false) + !self.close_now + && self + .going_away + .as_ref() + .map(|g| g.last_processed_id != StreamId::MAX) + .unwrap_or(false) } /// Try to write a pending GOAWAY frame to the buffer. /// /// If a frame is written, the `Reason` of the GOAWAY is returned. 
- pub fn send_pending_go_away(&mut self, dst: &mut Codec) -> Poll, io::Error> + pub fn send_pending_go_away( + &mut self, + cx: &mut Context, + dst: &mut Codec, + ) -> Poll>> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { if let Some(frame) = self.pending.take() { - if !dst.poll_ready()?.is_ready() { + if !dst.poll_ready(cx)?.is_ready() { self.pending = Some(frame); - return Ok(Async::NotReady); + return Poll::Pending; } let reason = frame.reason(); - dst.buffer(frame.into()) - .ok() - .expect("invalid GOAWAY frame"); + dst.buffer(frame.into()).ok().expect("invalid GOAWAY frame"); - return Ok(Async::Ready(Some(reason))); + return Poll::Ready(Some(Ok(reason))); } else if self.should_close_now() { - return Ok(Async::Ready(self.going_away_reason())); + return match self.going_away_reason() { + Some(reason) => Poll::Ready(Some(Ok(reason))), + None => Poll::Ready(None), + }; } - Ok(Async::Ready(None)) + Poll::Ready(None) } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ae43bda..4b6e090 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -8,10 +8,10 @@ mod streams; pub(crate) use self::connection::{Config, Connection}; pub(crate) use self::error::Error; -pub(crate) use self::peer::{Peer, Dyn as DynPeer}; +pub(crate) use self::peer::{Dyn as DynPeer, Peer}; pub(crate) use self::ping_pong::UserPings; -pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; -pub(crate) use self::streams::{PollReset, Prioritized, Open}; +pub(crate) use self::streams::{OpaqueStreamRef, StreamRef, Streams}; +pub(crate) use self::streams::{Open, PollReset, Prioritized}; use crate::codec::Codec; @@ -21,9 +21,6 @@ use self::settings::Settings; use crate::frame::{self, Frame}; -use futures::{task, Async, Poll}; -use futures::task::Task; - use bytes::Buf; use tokio_io::AsyncWrite; diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index bc24c82..0dbbec2 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -3,11 +3,11 @@ use crate::frame::Ping; use crate::proto::{self, PingPayload}; use bytes::Buf; -use futures::{Async, Poll}; -use futures::task::AtomicTask; +use futures::task::AtomicWaker; use std::io; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; use tokio_io::AsyncWrite; /// Acknowledges ping requests from the remote. @@ -28,9 +28,9 @@ struct UserPingsRx(Arc); struct UserPingsInner { state: AtomicUsize, /// Task to wake up the main `Connection`. - ping_task: AtomicTask, + ping_task: AtomicWaker, /// Task to wake up `share::PingPong::poll_pong`. - pong_task: AtomicTask, + pong_task: AtomicWaker, } #[derive(Debug)] @@ -77,8 +77,8 @@ impl PingPong { let user_pings = Arc::new(UserPingsInner { state: AtomicUsize::new(USER_STATE_EMPTY), - ping_task: AtomicTask::new(), - pong_task: AtomicTask::new(), + ping_task: AtomicWaker::new(), + pong_task: AtomicWaker::new(), }); self.user_pings = Some(UserPingsRx(user_pings.clone())); Some(UserPings(user_pings)) @@ -135,34 +135,42 @@ impl PingPong { } /// Send any pending pongs. 
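The ping/pong plumbing above swaps `futures::task::AtomicTask` for `futures::task::AtomicWaker`: the polling side registers the current task's waker and the other side wakes it. A standalone sketch of that handshake; the `Shared` type and its flag are illustrative, not h2's user-ping state machine:

```rust
use futures::task::AtomicWaker;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};

struct Shared {
    ready: AtomicBool,
    waker: AtomicWaker,
}

fn poll_ready_flag(shared: &Shared, cx: &mut Context<'_>) -> Poll<()> {
    // Register before checking, so a wake that races with this poll cannot
    // be lost; `poll_pong` below makes the same ordering argument.
    shared.waker.register(cx.waker());
    if shared.ready.load(Ordering::Acquire) {
        Poll::Ready(())
    } else {
        Poll::Pending
    }
}

fn set_ready(shared: &Shared) {
    shared.ready.store(true, Ordering::Release);
    // Wake whichever task last registered; with `AtomicTask` this used to
    // be spelled `notify()`.
    shared.waker.wake();
}
```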
- pub(crate) fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), io::Error> + pub(crate) fn send_pending_pong( + &mut self, + cx: &mut Context, + dst: &mut Codec, + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { if let Some(pong) = self.pending_pong.take() { - if !dst.poll_ready()?.is_ready() { + if !dst.poll_ready(cx)?.is_ready() { self.pending_pong = Some(pong); - return Ok(Async::NotReady); + return Poll::Pending; } dst.buffer(Ping::pong(pong).into()) .expect("invalid pong frame"); } - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } /// Send any pending pings. - pub(crate) fn send_pending_ping(&mut self, dst: &mut Codec) -> Poll<(), io::Error> + pub(crate) fn send_pending_ping( + &mut self, + cx: &mut Context, + dst: &mut Codec, + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { if let Some(ref mut ping) = self.pending_ping { if !ping.sent { - if !dst.poll_ready()?.is_ready() { - return Ok(Async::NotReady); + if !dst.poll_ready(cx)?.is_ready() { + return Poll::Pending; } dst.buffer(Ping::new(ping.payload).into()) @@ -171,19 +179,22 @@ impl PingPong { } } else if let Some(ref users) = self.user_pings { if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING { - if !dst.poll_ready()?.is_ready() { - return Ok(Async::NotReady); + if !dst.poll_ready(cx)?.is_ready() { + return Poll::Pending; } dst.buffer(Ping::new(Ping::USER).into()) .expect("invalid ping frame"); - users.0.state.store(USER_STATE_PENDING_PONG, Ordering::Release); + users + .0 + .state + .store(USER_STATE_PENDING_PONG, Ordering::Release); } else { - users.0.ping_task.register(); + users.0.ping_task.register(cx.waker()); } } - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -201,19 +212,17 @@ impl ReceivedPing { impl UserPings { pub(crate) fn send_ping(&self) -> Result<(), Option> { let prev = self.0.state.compare_and_swap( - USER_STATE_EMPTY, // current + USER_STATE_EMPTY, // current USER_STATE_PENDING_PING, // new Ordering::AcqRel, ); match prev { USER_STATE_EMPTY => { - self.0.ping_task.notify(); + self.0.ping_task.wake(); Ok(()) - }, - USER_STATE_CLOSED => { - Err(Some(broken_pipe().into())) } + USER_STATE_CLOSED => Err(Some(broken_pipe().into())), _ => { // Was already pending, user error! Err(None) @@ -221,20 +230,20 @@ impl UserPings { } } - pub(crate) fn poll_pong(&self) -> Poll<(), proto::Error> { + pub(crate) fn poll_pong(&self, cx: &mut Context) -> Poll> { // Must register before checking state, in case state were to change // before we could register, and then the ping would just be lost. 
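The comment above ("must register before checking state") is the reason the port swaps `AtomicTask` for `futures::task::AtomicWaker`: registering the waker first means a wake that races with the state check cannot be lost. A standalone sketch of that pattern; the `Flag` type is hypothetical and only illustrates `AtomicWaker`'s intended use:

```rust
use futures::task::AtomicWaker;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};

struct Flag {
    set: AtomicBool,
    waker: AtomicWaker,
}

impl Flag {
    fn poll_set(&self, cx: &mut Context<'_>) -> Poll<()> {
        // Register first so a wake that races with the swap below is not lost.
        self.waker.register(cx.waker());
        if self.set.swap(false, Ordering::AcqRel) {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    fn signal(&self) {
        self.set.store(true, Ordering::Release);
        // Wake whichever task registered most recently.
        self.waker.wake();
    }
}
```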
- self.0.pong_task.register(); + self.0.pong_task.register(cx.waker()); let prev = self.0.state.compare_and_swap( USER_STATE_RECEIVED_PONG, // current - USER_STATE_EMPTY, // new + USER_STATE_EMPTY, // new Ordering::AcqRel, ); match prev { - USER_STATE_RECEIVED_PONG => Ok(Async::Ready(())), - USER_STATE_CLOSED => Err(broken_pipe().into()), - _ => Ok(Async::NotReady), + USER_STATE_RECEIVED_PONG => Poll::Ready(Ok(())), + USER_STATE_CLOSED => Poll::Ready(Err(broken_pipe().into())), + _ => Poll::Pending, } } } @@ -244,13 +253,13 @@ impl UserPings { impl UserPingsRx { fn receive_pong(&self) -> bool { let prev = self.0.state.compare_and_swap( - USER_STATE_PENDING_PONG, // current + USER_STATE_PENDING_PONG, // current USER_STATE_RECEIVED_PONG, // new Ordering::AcqRel, ); if prev == USER_STATE_PENDING_PONG { - self.0.pong_task.notify(); + self.0.pong_task.wake(); true } else { false @@ -261,7 +270,7 @@ impl UserPingsRx { impl Drop for UserPingsRx { fn drop(&mut self) { self.0.state.store(USER_STATE_CLOSED, Ordering::Release); - self.0.pong_task.notify(); + self.0.pong_task.wake(); } } diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 4007993..f35aefa 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,6 +1,7 @@ use crate::codec::RecvError; use crate::frame; use crate::proto::*; +use std::task::{Poll, Context}; #[derive(Debug)] pub(crate) struct Settings { @@ -29,21 +30,22 @@ impl Settings { pub fn send_pending_ack( &mut self, + cx: &mut Context, dst: &mut Codec, streams: &mut Streams, - ) -> Poll<(), RecvError> + ) -> Poll> where - T: AsyncWrite, - B: Buf, - C: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, + C: Buf + Unpin, P: Peer, { log::trace!("send_pending_ack; pending={:?}", self.pending); - if let Some(ref settings) = self.pending { - if !dst.poll_ready()?.is_ready() { + if let Some(settings) = &self.pending { + if !dst.poll_ready(cx)?.is_ready() { log::trace!("failed to send ACK"); - return Ok(Async::NotReady); + return Poll::Pending; } // Create an ACK settings frame @@ -65,6 +67,6 @@ impl Settings { self.pending = None; - Ok(().into()) + Poll::Ready(Ok(())) } } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index fee9e57..efa1050 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -7,10 +7,10 @@ use crate::codec::UserError; use crate::codec::UserError::*; use bytes::buf::Take; -use futures::try_ready; - +use futures::ready; use std::{cmp, fmt, mem}; use std::io; +use std::task::{Context, Poll, Waker}; /// # Warning /// @@ -104,14 +104,14 @@ impl Prioritize { frame: Frame, buffer: &mut Buffer>, stream: &mut store::Ptr, - task: &mut Option, + task: &mut Option, ) { // Queue the frame in the buffer stream.pending_send.push_back(buffer, frame); self.schedule_send(stream, task); } - pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option) { + pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option) { // If the stream is waiting to be opened, nothing more to do. if !stream.is_pending_open { log::trace!("schedule_send; {:?}", stream.id); @@ -120,7 +120,7 @@ impl Prioritize { // Notify the connection. 
if let Some(task) = task.take() { - task.notify(); + task.wake(); } } } @@ -136,7 +136,7 @@ impl Prioritize { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) -> Result<(), UserError> where B: Buf, @@ -483,17 +483,18 @@ impl Prioritize { pub fn poll_complete( &mut self, + cx: &mut Context, buffer: &mut Buffer>, store: &mut Store, counts: &mut Counts, dst: &mut Codec>, - ) -> Poll<(), io::Error> + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { // Ensure codec is ready - try_ready!(dst.poll_ready()); + ready!(dst.poll_ready(cx))?; // Reclaim any frame that has previously been written self.reclaim_frame(buffer, store, dst); @@ -517,18 +518,18 @@ impl Prioritize { dst.buffer(frame).ok().expect("invalid frame"); // Ensure the codec is ready to try the loop again. - try_ready!(dst.poll_ready()); + ready!(dst.poll_ready(cx))?; // Because, always try to reclaim... self.reclaim_frame(buffer, store, dst); }, None => { // Try to flush the codec. - try_ready!(dst.flush()); + ready!(dst.flush(cx))?; // This might release a data frame... if !self.reclaim_frame(buffer, store, dst) { - return Ok(().into()); + return Poll::Ready(Ok(())) } // No need to poll ready as poll_complete() does this for diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index eb03a68..9c78f7c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,13 +1,15 @@ +use std::task::Context; use super::*; use crate::{frame, proto}; use crate::codec::{RecvError, UserError}; use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use http::{HeaderMap, Response, Request, Method}; -use futures::try_ready; +use futures::ready; use std::io; use std::time::{Duration, Instant}; +use std::task::{Poll, Waker}; #[derive(Debug)] pub(super) struct Recv { @@ -257,15 +259,17 @@ impl Recv { /// Called by the client to get pushed response pub fn poll_pushed( - &mut self, stream: &mut store::Ptr - ) -> Poll, store::Key)>, proto::Error> { + &mut self, + cx: &Context, + stream: &mut store::Ptr + ) -> Poll, store::Key), proto::Error>>> { use super::peer::PollMessage::*; let mut ppp = stream.pending_push_promises.take(); let pushed = ppp.pop(stream.store_mut()).map( |mut pushed| match pushed.pending_recv.pop_front(&mut self.buffer) { Some(Event::Headers(Server(headers))) => - Async::Ready(Some((headers, pushed.key()))), + (headers, pushed.key()), // When frames are pushed into the queue, it is verified that // the first frame is a HEADERS frame. _ => panic!("Headers not set on pushed stream") @@ -273,15 +277,15 @@ impl Recv { ); stream.pending_push_promises = ppp; if let Some(p) = pushed { - Ok(p) + Poll::Ready(Some(Ok(p))) } else { let is_open = stream.state.ensure_recv_open()?; if is_open { - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending } else { - Ok(Async::Ready(None)) + Poll::Ready(None) } } } @@ -289,20 +293,21 @@ impl Recv { /// Called by the client to get the response pub fn poll_response( &mut self, + cx: &Context, stream: &mut store::Ptr, - ) -> Poll, proto::Error> { + ) -> Poll, proto::Error>> { use super::peer::PollMessage::*; // If the buffer is not empty, then the first frame must be a HEADERS // frame or the user violated the contract. 
match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(Client(response))) => Ok(response.into()), + Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response.into())), Some(_) => panic!("poll_response called after response returned"), None => { stream.state.ensure_recv_open()?; - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending }, } } @@ -339,7 +344,7 @@ impl Recv { pub fn release_connection_capacity( &mut self, capacity: WindowSize, - task: &mut Option, + task: &mut Option, ) { log::trace!( "release_connection_capacity; size={}, connection in_flight_data={}", @@ -355,7 +360,7 @@ impl Recv { if self.flow.unclaimed_capacity().is_some() { if let Some(task) = task.take() { - task.notify(); + task.wake(); } } } @@ -365,7 +370,7 @@ impl Recv { &mut self, capacity: WindowSize, stream: &mut store::Ptr, - task: &mut Option, + task: &mut Option, ) -> Result<(), UserError> { log::trace!("release_capacity; size={}", capacity); @@ -387,7 +392,7 @@ impl Recv { self.pending_window_updates.push(stream); if let Some(task) = task.take() { - task.notify(); + task.wake(); } } @@ -398,7 +403,7 @@ impl Recv { pub fn release_closed_capacity( &mut self, stream: &mut store::Ptr, - task: &mut Option, + task: &mut Option, ) { debug_assert_eq!(stream.ref_count, 0); @@ -433,7 +438,7 @@ impl Recv { /// /// The `task` is an optional parked task for the `Connection` that might /// be blocked on needing more window capacity. - pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option) { + pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option) { log::trace!( "set_target_connection_window; target={}; available={}, reserved={}", target, @@ -458,7 +463,7 @@ impl Recv { // a connection WINDOW_UPDATE. if self.flow.unclaimed_capacity().is_some() { if let Some(task) = task.take() { - task.notify(); + task.wake(); } } } @@ -824,14 +829,15 @@ impl Recv { /// Send any pending refusals. 
pub fn send_pending_refusal( &mut self, + cx: &mut Context, dst: &mut Codec>, - ) -> Poll<(), io::Error> + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { if let Some(stream_id) = self.refused { - try_ready!(dst.poll_ready()); + ready!(dst.poll_ready(cx))?; // Create the RST_STREAM frame let frame = frame::Reset::new(stream_id, Reason::REFUSED_STREAM); @@ -844,7 +850,7 @@ impl Recv { self.refused = None; - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { @@ -894,37 +900,39 @@ impl Recv { pub fn poll_complete( &mut self, + cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec>, - ) -> Poll<(), io::Error> + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { // Send any pending connection level window updates - try_ready!(self.send_connection_window_update(dst)); + ready!(self.send_connection_window_update(cx, dst))?; // Send any pending stream level window updates - try_ready!(self.send_stream_window_updates(store, counts, dst)); + ready!(self.send_stream_window_updates(cx, store, counts, dst))?; - Ok(().into()) + Poll::Ready(Ok(())) } /// Send connection level window update fn send_connection_window_update( &mut self, + cx: &mut Context, dst: &mut Codec>, - ) -> Poll<(), io::Error> + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { if let Some(incr) = self.flow.unclaimed_capacity() { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); // Ensure the codec has capacity - try_ready!(dst.poll_ready()); + ready!(dst.poll_ready(cx))?; // Buffer the WINDOW_UPDATE frame dst.buffer(frame.into()) @@ -938,28 +946,29 @@ impl Recv { .expect("unexpected flow control state"); } - Ok(().into()) + Poll::Ready(Ok(())) } /// Send stream level window update pub fn send_stream_window_updates( &mut self, + cx: &mut Context, store: &mut Store, counts: &mut Counts, dst: &mut Codec>, - ) -> Poll<(), io::Error> + ) -> Poll> where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { loop { // Ensure the codec has capacity - try_ready!(dst.poll_ready()); + ready!(dst.poll_ready(cx))?; // Get the next stream let stream = match self.pending_window_updates.pop(store) { Some(stream) => stream, - None => return Ok(().into()), + None => return Poll::Ready(Ok(())), }; counts.transition(stream, |_, stream| { @@ -1001,10 +1010,10 @@ impl Recv { self.pending_accept.pop(store).map(|ptr| ptr.key()) } - pub fn poll_data(&mut self, stream: &mut Stream) -> Poll, proto::Error> { + pub fn poll_data(&mut self, cx: &Context, stream: &mut Stream) -> Poll>> { // TODO: Return error when the stream is reset match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Data(payload)) => Ok(Some(payload).into()), + Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))), Some(event) => { // Frame is trailer stream.pending_recv.push_front(&mut self.buffer, event); @@ -1020,36 +1029,37 @@ impl Recv { stream.notify_recv(); // No more data frames - Ok(None.into()) + Poll::Ready(None) }, - None => self.schedule_recv(stream), + None => self.schedule_recv(cx, stream), } } pub fn poll_trailers( &mut self, + cx: &Context, stream: &mut Stream, - ) -> Poll, proto::Error> { + ) -> Poll>> { match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Trailers(trailers)) => Ok(Some(trailers).into()), + Some(Event::Trailers(trailers)) => Poll::Ready(Some(Ok(trailers))), Some(event) => { 
// Frame is not trailers.. not ready to poll trailers yet. stream.pending_recv.push_front(&mut self.buffer, event); - Ok(Async::NotReady) + Poll::Pending }, - None => self.schedule_recv(stream), + None => self.schedule_recv(cx, stream), } } - fn schedule_recv(&mut self, stream: &mut Stream) -> Poll, proto::Error> { + fn schedule_recv(&mut self, cx: &Context, stream: &mut Stream) -> Poll>> { if stream.state.ensure_recv_open()? { // Request to get notified once more frames arrive - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) + stream.recv_task = Some(cx.waker().clone()); + Poll::Pending } else { // No more frames will be received - Ok(None.into()) + Poll::Ready(None) } } } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 9bbd043..4a723ce 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,14 +1,13 @@ +use super::{ + store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId, + StreamIdOverflow, WindowSize, +}; use crate::codec::{RecvError, UserError}; use crate::frame::{self, Reason}; -use super::{ - store, Buffer, Codec, Config, Counts, Frame, Prioritize, - Prioritized, Store, Stream, StreamId, StreamIdOverflow, WindowSize, -}; use bytes::Buf; use http; -use futures::{Async, Poll}; -use futures::task::Task; +use std::task::{Context, Poll, Waker}; use tokio_io::AsyncWrite; use std::io; @@ -60,7 +59,7 @@ impl Send { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) -> Result<(), UserError> { log::trace!( "send_headers; frame={:?}; init_window={:?}", @@ -81,7 +80,6 @@ impl Send { if te != "trailers" { log::debug!("illegal connection-specific headers found"); return Err(UserError::MalformedHeaders); - } } @@ -103,7 +101,8 @@ impl Send { } // Queue the frame for sending - self.prioritize.queue_frame(frame.into(), buffer, stream, task); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); Ok(()) } @@ -115,7 +114,7 @@ impl Send { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) { let is_reset = stream.state.is_reset(); let is_closed = stream.state.is_closed(); @@ -125,7 +124,7 @@ impl Send { "send_reset(..., reason={:?}, stream={:?}, ..., \ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ state={:?} \ - ", + ", reason, stream.id, is_reset, @@ -151,7 +150,7 @@ impl Send { if is_closed && is_empty { log::trace!( " -> not sending explicit RST_STREAM ({:?} was closed \ - and send queue was flushed)", + and send queue was flushed)", stream.id ); return; @@ -166,7 +165,8 @@ impl Send { let frame = frame::Reset::new(stream.id, reason); log::trace!("send_reset -- queueing; frame={:?}", frame); - self.prioritize.queue_frame(frame.into(), buffer, stream, task); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); self.prioritize.reclaim_all_capacity(stream, counts); } @@ -175,7 +175,7 @@ impl Send { stream: &mut store::Ptr, reason: Reason, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) { if stream.state.is_closed() { // Stream is already closed, nothing more to do @@ -194,11 +194,13 @@ impl Send { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) -> Result<(), UserError> - where B: Buf, + where + B: Buf, { - self.prioritize.send_data(frame, buffer, stream, counts, task) + self.prioritize + .send_data(frame, buffer, stream, counts, task) } pub fn 
send_trailers( @@ -207,7 +209,7 @@ impl Send { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) -> Result<(), UserError> { // TODO: Should this logic be moved into state.rs? if !stream.state.is_send_streaming() { @@ -221,7 +223,8 @@ impl Send { stream.state.send_close(); log::trace!("send_trailers -- queuing; frame={:?}", frame); - self.prioritize.queue_frame(frame.into(), buffer, stream, task); + self.prioritize + .queue_frame(frame.into(), buffer, stream, task); // Release any excess capacity self.prioritize.reserve_capacity(0, stream, counts); @@ -231,15 +234,18 @@ impl Send { pub fn poll_complete( &mut self, + cx: &mut Context, buffer: &mut Buffer>, store: &mut Store, counts: &mut Counts, dst: &mut Codec>, - ) -> Poll<(), io::Error> - where T: AsyncWrite, - B: Buf, + ) -> Poll> + where + T: AsyncWrite + Unpin, + B: Buf + Unpin, { - self.prioritize.poll_complete(buffer, store, counts, dst) + self.prioritize + .poll_complete(cx, buffer, store, counts, dst) } /// Request capacity to send data @@ -247,27 +253,28 @@ impl Send { &mut self, capacity: WindowSize, stream: &mut store::Ptr, - counts: &mut Counts) - { + counts: &mut Counts, + ) { self.prioritize.reserve_capacity(capacity, stream, counts) } pub fn poll_capacity( &mut self, + cx: &Context, stream: &mut store::Ptr, - ) -> Poll, UserError> { + ) -> Poll>> { if !stream.state.is_send_streaming() { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } if !stream.send_capacity_inc { - stream.wait_send(); - return Ok(Async::NotReady); + stream.wait_send(cx); + return Poll::Pending; } stream.send_capacity_inc = false; - Ok(Async::Ready(Some(self.capacity(stream)))) + Poll::Ready(Some(Ok(self.capacity(stream)))) } /// Current available stream send capacity @@ -284,15 +291,16 @@ impl Send { pub fn poll_reset( &self, + cx: &Context, stream: &mut Stream, mode: PollReset, - ) -> Poll { + ) -> Poll> { match stream.state.ensure_reason(mode)? { - Some(reason) => Ok(reason.into()), + Some(reason) => Poll::Ready(Ok(reason)), None => { - stream.wait_send(); - Ok(Async::NotReady) - }, + stream.wait_send(cx); + Poll::Pending + } } } @@ -312,14 +320,18 @@ impl Send { buffer: &mut Buffer>, stream: &mut store::Ptr, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) -> Result<(), Reason> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { log::debug!("recv_stream_window_update !!; err={:?}", e); self.send_reset( Reason::FLOW_CONTROL_ERROR.into(), - buffer, stream, counts, task); + buffer, + stream, + counts, + task, + ); return Err(e); } @@ -344,7 +356,7 @@ impl Send { buffer: &mut Buffer>, store: &mut Store, counts: &mut Counts, - task: &mut Option, + task: &mut Option, ) -> Result<(), RecvError> { // Applies an update to the remote endpoint's initial window size. 
// @@ -444,16 +456,14 @@ impl Send { } pub fn ensure_next_stream_id(&self) -> Result { - self.next_stream_id.map_err(|_| UserError::OverflowedStreamId) + self.next_stream_id + .map_err(|_| UserError::OverflowedStreamId) } pub fn may_have_created_stream(&self, id: StreamId) -> bool { if let Ok(next_id) = self.next_stream_id { // Peer::is_local_init should have been called beforehand - debug_assert_eq!( - id.is_server_initiated(), - next_id.is_server_initiated(), - ); + debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),); id < next_id } else { true diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index c677a4a..d3caf5c 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -2,6 +2,7 @@ use super::*; use std::time::Instant; use std::usize; +use std::task::{Context, Waker}; /// Tracks Stream related state /// @@ -47,7 +48,7 @@ pub(super) struct Stream { pub buffered_send_data: WindowSize, /// Task tracking additional send capacity (i.e. window updates). - send_task: Option, + send_task: Option, /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, @@ -96,7 +97,7 @@ pub(super) struct Stream { pub pending_recv: buffer::Deque, /// Task tracking receiving frames - pub recv_task: Option, + pub recv_task: Option, /// The stream's pending push promises pub pending_push_promises: store::Queue, @@ -280,17 +281,17 @@ impl Stream { pub fn notify_send(&mut self) { if let Some(task) = self.send_task.take() { - task.notify(); + task.wake(); } } - pub fn wait_send(&mut self) { - self.send_task = Some(task::current()); + pub fn wait_send(&mut self, cx: &Context) { + self.send_task = Some(cx.waker().clone()); } pub fn notify_recv(&mut self) { if let Some(task) = self.recv_task.take() { - task.notify(); + task.wake(); } } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 7ba818b..59f74aa 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,18 +1,20 @@ -use crate::{client, proto, server}; -use crate::codec::{Codec, RecvError, SendError, UserError}; -use crate::frame::{self, Frame, Reason}; -use crate::proto::{peer, Peer, Open, WindowSize}; -use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; use super::recv::RecvHeaderBlockError; use super::store::{self, Entry, Resolve, Store}; +use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; +use crate::codec::{Codec, RecvError, SendError, UserError}; +use crate::frame::{self, Frame, Reason}; +use crate::proto::{peer, Open, Peer, WindowSize}; +use crate::{client, proto, server}; use bytes::{Buf, Bytes}; -use futures::{task, Async, Poll, try_ready}; +use futures::ready; use http::{HeaderMap, Request, Response}; +use std::task::{Context, Poll, Waker}; use tokio_io::AsyncWrite; -use std::{fmt, io}; +use crate::PollExt; use std::sync::{Arc, Mutex}; +use std::{fmt, io}; #[derive(Debug)] pub(crate) struct Streams @@ -77,7 +79,7 @@ struct Actions { send: Send, /// Task that calls `poll_complete`. - task: Option, + task: Option, /// If the connection errors, a copy is kept for any StreamRefs. conn_error: Option, @@ -93,7 +95,7 @@ struct SendBuffer { impl Streams where - B: Buf, + B: Buf + Unpin, P: Peer, { pub fn new(config: Config) -> Self { @@ -134,7 +136,11 @@ where // The GOAWAY process has begun. All streams with a greater ID than // specified as part of GOAWAY should be ignored. 
if id > me.actions.recv.max_stream_id() { - log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id()); + log::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", + id, + me.actions.recv.max_stream_id() + ); return Ok(()); } @@ -170,10 +176,10 @@ where ); e.insert(stream) - }, + } None => return Ok(()), } - }, + } }; let stream = me.store.resolve(key); @@ -254,15 +260,16 @@ where // The GOAWAY process has begun. All streams with a greater ID // than specified as part of GOAWAY should be ignored. if id > me.actions.recv.max_stream_id() { - log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id()); + log::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring DATA", + id, + me.actions.recv.max_stream_id() + ); return Ok(()); } if me.actions.may_have_forgotten_stream::
<P>
(id) { - log::debug!( - "recv_data for old stream={:?}, sending STREAM_CLOSED", - id, - ); + log::debug!("recv_data for old stream={:?}, sending STREAM_CLOSED", id,); let sz = frame.payload().len(); // This should have been enforced at the codec::FramedRead layer, so @@ -279,7 +286,7 @@ where proto_err!(conn: "recv_data: stream not found; id={:?}", id); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - }, + } }; let actions = &mut me.actions; @@ -294,7 +301,9 @@ where // we won't give the data to the user, and so they can't // release the capacity. We do it automatically. if let Err(RecvError::Stream { .. }) = res { - actions.recv.release_connection_capacity(sz as WindowSize, &mut None); + actions + .recv + .release_connection_capacity(sz as WindowSize, &mut None); } actions.reset_on_recv_stream_err(send_buffer, stream, counts, res) }) @@ -314,7 +323,11 @@ where // The GOAWAY process has begun. All streams with a greater ID than // specified as part of GOAWAY should be ignored. if id > me.actions.recv.max_stream_id() { - log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id()); + log::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", + id, + me.actions.recv.max_stream_id() + ); return Ok(()); } @@ -327,7 +340,7 @@ where .map_err(RecvError::Connection)?; return Ok(()); - }, + } }; let mut send_buffer = self.send_buffer.inner.lock().unwrap(); @@ -400,14 +413,16 @@ where actions.recv.go_away(last_stream_id); me.store - .for_each(|stream| if stream.id > last_stream_id { - counts.transition(stream, |counts, stream| { - actions.recv.recv_err(&err, &mut *stream); - actions.send.recv_err(send_buffer, stream, counts); + .for_each(|stream| { + if stream.id > last_stream_id { + counts.transition(stream, |counts, stream| { + actions.recv.recv_err(&err, &mut *stream); + actions.send.recv_err(send_buffer, stream, counts); + Ok::<_, ()>(()) + }) + } else { Ok::<_, ()>(()) - }) - } else { - Ok::<_, ()>(()) + } }) .unwrap(); @@ -470,7 +485,11 @@ where // The GOAWAY process has begun. All streams with a greater ID // than specified as part of GOAWAY should be ignored. if id > me.actions.recv.max_stream_id() { - log::trace!("id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", id, me.actions.recv.max_stream_id()); + log::trace!( + "id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", + id, + me.actions.recv.max_stream_id() + ); return Ok(()); } @@ -480,8 +499,8 @@ where } None => { proto_err!(conn: "recv_push_promise: initiating stream is in an invalid state"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) - }, + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } }; // TODO: Streams in the reserved states do not count towards the concurrency @@ -495,7 +514,12 @@ where // // If `None` is returned, then the stream is being refused. There is no // further work to be done. - if me.actions.recv.open(promised_id, Open::PushPromise, &mut me.counts)?.is_none() { + if me + .actions + .recv + .open(promised_id, Open::PushPromise, &mut me.counts)? 
+ .is_none() + { return Ok(()); } @@ -507,21 +531,26 @@ where Stream::new( promised_id, me.actions.send.init_window_sz(), - me.actions.recv.init_window_sz()) + me.actions.recv.init_window_sz(), + ) }); let actions = &mut me.actions; me.counts.transition(stream, |counts, stream| { - let stream_valid = - actions.recv.recv_push_promise(frame, stream); + let stream_valid = actions.recv.recv_push_promise(frame, stream); match stream_valid { - Ok(()) => - Ok(Some(stream.key())), + Ok(()) => Ok(Some(stream.key())), _ => { let mut send_buffer = self.send_buffer.inner.lock().unwrap(); - actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, stream_valid) + actions + .reset_on_recv_stream_err( + &mut *send_buffer, + stream, + counts, + stream_valid, + ) .map(|()| None) } } @@ -549,7 +578,11 @@ where me.refs += 1; key.map(|key| { let stream = &mut me.store.resolve(key); - log::trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state); + log::trace!( + "next_incoming; id={:?}, state={:?}", + stream.id, + stream.state + ); StreamRef { opaque: OpaqueStreamRef::new(self.inner.clone(), stream), send_buffer: self.send_buffer.clone(), @@ -559,25 +592,33 @@ where pub fn send_pending_refusal( &mut self, + cx: &mut Context, dst: &mut Codec>, - ) -> Poll<(), io::Error> + ) -> Poll> where - T: AsyncWrite, + T: AsyncWrite + Unpin, + B: Unpin, { let mut me = self.inner.lock().unwrap(); let me = &mut *me; - me.actions.recv.send_pending_refusal(dst) + me.actions.recv.send_pending_refusal(cx, dst) } pub fn clear_expired_reset_streams(&mut self) { let mut me = self.inner.lock().unwrap(); let me = &mut *me; - me.actions.recv.clear_expired_reset_streams(&mut me.store, &mut me.counts); + me.actions + .recv + .clear_expired_reset_streams(&mut me.store, &mut me.counts); } - pub fn poll_complete(&mut self, dst: &mut Codec>) -> Poll<(), io::Error> + pub fn poll_complete( + &mut self, + cx: &mut Context, + dst: &mut Codec>, + ) -> Poll> where - T: AsyncWrite, + T: AsyncWrite + Unpin, { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -589,20 +630,21 @@ where // // TODO: It would probably be better to interleave updates w/ data // frames. - try_ready!(me.actions.recv.poll_complete(&mut me.store, &mut me.counts, dst)); + ready!(me + .actions + .recv + .poll_complete(cx, &mut me.store, &mut me.counts, dst))?; // Send any other pending frames - try_ready!(me.actions.send.poll_complete( - send_buffer, - &mut me.store, - &mut me.counts, - dst - )); + ready!(me + .actions + .send + .poll_complete(cx, send_buffer, &mut me.store, &mut me.counts, dst))?; // Nothing else to do, track the task - me.actions.task = Some(task::current()); + me.actions.task = Some(cx.waker().clone()); - Ok(().into()) + Poll::Ready(Ok(())) } pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { @@ -615,7 +657,12 @@ where me.counts.apply_remote_settings(frame); me.actions.send.apply_remote_settings( - frame, send_buffer, &mut me.store, &mut me.counts, &mut me.actions.task) + frame, + send_buffer, + &mut me.store, + &mut me.counts, + &mut me.actions.task, + ) } pub fn send_request( @@ -624,8 +671,8 @@ where end_of_stream: bool, pending: Option<&OpaqueStreamRef>, ) -> Result, SendError> { - use http::Method; use super::stream::ContentLength; + use http::Method; // TODO: There is a hazard with assigning a stream ID before the // prioritize layer. 
If prioritization reorders new streams, this @@ -671,8 +718,7 @@ where } // Convert the message - let headers = client::Peer::convert_send_message( - stream_id, request, end_of_stream)?; + let headers = client::Peer::convert_send_message(stream_id, request, end_of_stream)?; let mut stream = me.store.insert(stream.id, stream); @@ -701,10 +747,7 @@ where me.refs += 1; Ok(StreamRef { - opaque: OpaqueStreamRef::new( - self.inner.clone(), - &mut stream, - ), + opaque: OpaqueStreamRef::new(self.inner.clone(), &mut stream), send_buffer: self.send_buffer.clone(), }) } @@ -719,13 +762,14 @@ where let stream = Stream::new(id, 0, 0); e.insert(stream) - }, + } }; let stream = me.store.resolve(key); let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.actions.send_reset(stream, reason, &mut me.counts, send_buffer); + me.actions + .send_reset(stream, reason, &mut me.counts, send_buffer); } pub fn send_go_away(&mut self, last_processed_id: StreamId) { @@ -740,7 +784,11 @@ impl Streams where B: Buf, { - pub fn poll_pending_open(&mut self, pending: Option<&OpaqueStreamRef>) -> Poll<(), crate::Error> { + pub fn poll_pending_open( + &mut self, + cx: &Context, + pending: Option<&OpaqueStreamRef>, + ) -> Poll> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -751,11 +799,11 @@ where let mut stream = me.store.resolve(pending.key); log::trace!("poll_pending_open; stream = {:?}", stream.is_pending_open); if stream.is_pending_open { - stream.wait_send(); - return Ok(Async::NotReady); + stream.wait_send(cx); + return Poll::Pending; } } - Ok(().into()) + Poll::Ready(Ok(())) } } @@ -845,7 +893,6 @@ where } } - // ===== impl StreamRef ===== impl StreamRef { @@ -867,12 +914,9 @@ impl StreamRef { frame.set_end_stream(end_stream); // Send the data frame - actions.send.send_data( - frame, - send_buffer, - stream, - counts, - &mut actions.task) + actions + .send + .send_data(frame, send_buffer, stream, counts, &mut actions.task) }) } @@ -890,8 +934,9 @@ impl StreamRef { let frame = frame::Headers::trailers(stream.id, trailers); // Send the trailers frame - actions.send.send_trailers( - frame, send_buffer, stream, counts, &mut actions.task) + actions + .send + .send_trailers(frame, send_buffer, stream, counts, &mut actions.task) }) } @@ -903,7 +948,8 @@ impl StreamRef { let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.actions.send_reset(stream, reason, &mut me.counts, send_buffer); + me.actions + .send_reset(stream, reason, &mut me.counts, send_buffer); } pub fn send_response( @@ -922,8 +968,9 @@ impl StreamRef { me.counts.transition(stream, |counts, stream| { let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream); - actions.send.send_headers( - frame, send_buffer, stream, counts, &mut actions.task) + actions + .send + .send_headers(frame, send_buffer, stream, counts, &mut actions.task) }) } @@ -955,7 +1002,9 @@ impl StreamRef { let mut stream = me.store.resolve(self.opaque.key); - me.actions.send.reserve_capacity(capacity, &mut stream, &mut me.counts) + me.actions + .send + .reserve_capacity(capacity, &mut stream, &mut me.counts) } /// Returns the stream's current send capacity. 
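Another pattern that repeats through these `streams.rs` hunks: a parked `Option<Task>` filled by `task::current()` and woken with `notify()` becomes an `Option<Waker>` filled from `cx.waker().clone()` and woken with `wake()`. A minimal sketch of that parking scheme; the `Slot` type and its fields are hypothetical:

```rust
use std::task::{Context, Poll, Waker};

struct Slot {
    value: Option<u32>,
    waker: Option<Waker>,
}

impl Slot {
    fn poll_value(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        match self.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Old code: `self.waker = Some(task::current())`.
                self.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }

    fn set(&mut self, v: u32) {
        self.value = Some(v);
        // Old code: `task.notify()`.
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}
```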
@@ -969,28 +1018,35 @@ impl StreamRef { } /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, UserError> { + pub fn poll_capacity(&mut self, cx: &Context) -> Poll>> { let mut me = self.opaque.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.opaque.key); - me.actions.send.poll_capacity(&mut stream) + me.actions.send.poll_capacity(cx, &mut stream) } /// Request to be notified for if a `RST_STREAM` is received for this stream. - pub(crate) fn poll_reset(&mut self, mode: proto::PollReset) -> Poll { + pub(crate) fn poll_reset( + &mut self, + cx: &Context, + mode: proto::PollReset, + ) -> Poll> { let mut me = self.opaque.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.opaque.key); - me.actions.send.poll_reset(&mut stream, mode) + me.actions + .send + .poll_reset(cx, &mut stream, mode) .map_err(From::from) } pub fn clone_to_opaque(&self) -> OpaqueStreamRef - where B: 'static, + where + B: 'static, { self.opaque.clone() } @@ -1015,35 +1071,37 @@ impl OpaqueStreamRef { fn new(inner: Arc>, stream: &mut store::Ptr) -> OpaqueStreamRef { stream.ref_inc(); OpaqueStreamRef { - inner, key: stream.key() + inner, + key: stream.key(), } } /// Called by a client to check for a received response. - pub fn poll_response(&mut self) -> Poll, proto::Error> { + pub fn poll_response(&mut self, cx: &Context) -> Poll, proto::Error>> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.key); - me.actions.recv.poll_response(&mut stream) + me.actions.recv.poll_response(cx, &mut stream) } /// Called by a client to check for a pushed request. pub fn poll_pushed( - &mut self - ) -> Poll, OpaqueStreamRef)>, proto::Error> { + &mut self, + cx: &Context, + ) -> Poll, OpaqueStreamRef), proto::Error>>> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let res = { - let mut stream = me.store.resolve(self.key); - try_ready!(me.actions.recv.poll_pushed(&mut stream)) - }; - Ok(Async::Ready(res.map(|(h, key)| { - me.refs += 1; - let opaque_ref = - OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key)); - (h, opaque_ref) - }))) + let mut stream = me.store.resolve(self.key); + me.actions + .recv + .poll_pushed(cx, &mut stream) + .map_ok_(|(h, key)| { + me.refs += 1; + let opaque_ref = + OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key)); + (h, opaque_ref) + }) } pub fn body_is_empty(&self) -> bool { @@ -1064,22 +1122,22 @@ impl OpaqueStreamRef { me.actions.recv.is_end_stream(&stream) } - pub fn poll_data(&mut self) -> Poll, proto::Error> { + pub fn poll_data(&mut self, cx: &Context) -> Poll>> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.key); - me.actions.recv.poll_data(&mut stream) + me.actions.recv.poll_data(cx, &mut stream) } - pub fn poll_trailers(&mut self) -> Poll, proto::Error> { + pub fn poll_trailers(&mut self, cx: &Context) -> Poll>> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.key); - me.actions.recv.poll_trailers(&mut stream) + me.actions.recv.poll_trailers(cx, &mut stream) } /// Releases recv capacity back to the peer. 
This may result in sending @@ -1101,16 +1159,11 @@ impl OpaqueStreamRef { let mut stream = me.store.resolve(self.key); - me.actions - .recv - .clear_recv_buffer(&mut stream); + me.actions.recv.clear_recv_buffer(&mut stream); } pub fn stream_id(&self) -> StreamId { - self.inner.lock() - .unwrap() - .store[self.key] - .id + self.inner.lock().unwrap().store[self.key].id } } @@ -1125,17 +1178,15 @@ impl fmt::Debug for OpaqueStreamRef { .field("stream_id", &stream.id) .field("ref_count", &stream.ref_count) .finish() - }, - Err(Poisoned(_)) => { - fmt.debug_struct("OpaqueStreamRef") - .field("inner", &"") - .finish() - } - Err(WouldBlock) => { - fmt.debug_struct("OpaqueStreamRef") - .field("inner", &"") - .finish() } + Err(Poisoned(_)) => fmt + .debug_struct("OpaqueStreamRef") + .field("inner", &"") + .finish(), + Err(WouldBlock) => fmt + .debug_struct("OpaqueStreamRef") + .field("inner", &"") + .finish(), } } } @@ -1164,12 +1215,14 @@ impl Drop for OpaqueStreamRef { fn drop_stream_ref(inner: &Mutex, key: store::Key) { let mut me = match inner.lock() { Ok(inner) => inner, - Err(_) => if ::std::thread::panicking() { - log::trace!("StreamRef::drop; mutex poisoned"); - return; - } else { - panic!("StreamRef::drop; mutex poisoned"); - }, + Err(_) => { + if ::std::thread::panicking() { + log::trace!("StreamRef::drop; mutex poisoned"); + return; + } else { + panic!("StreamRef::drop; mutex poisoned"); + } + } }; let me = &mut *me; @@ -1189,19 +1242,19 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { // (connection) so that it can close properly if stream.ref_count == 0 && stream.is_closed() { if let Some(task) = actions.task.take() { - task.notify(); + task.wake(); } } - me.counts.transition(stream, |counts, stream| { maybe_cancel(stream, actions, counts); if stream.ref_count == 0 { - // Release any recv window back to connection, no one can access // it anymore. - actions.recv.release_closed_capacity(stream, &mut actions.task); + actions + .recv + .release_closed_capacity(stream, &mut actions.task); // We won't be able to reach our push promises anymore let mut ppp = stream.pending_push_promises.take(); @@ -1216,11 +1269,9 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Counts) { if stream.is_canceled_interest() { - actions.send.schedule_implicit_reset( - stream, - Reason::CANCEL, - counts, - &mut actions.task); + actions + .send + .schedule_implicit_reset(stream, Reason::CANCEL, counts, &mut actions.task); actions.recv.enqueue_reset_expiration(stream, counts); } } @@ -1245,8 +1296,8 @@ impl Actions { send_buffer: &mut Buffer>, ) { counts.transition(stream, |counts, stream| { - self.send.send_reset( - reason, send_buffer, stream, counts, &mut self.task); + self.send + .send_reset(reason, send_buffer, stream, counts, &mut self.task); self.recv.enqueue_reset_expiration(stream, counts); // if a RecvStream is parked, ensure it's notified stream.notify_recv(); @@ -1260,12 +1311,10 @@ impl Actions { counts: &mut Counts, res: Result<(), RecvError>, ) -> Result<(), RecvError> { - if let Err(RecvError::Stream { - reason, .. - }) = res - { + if let Err(RecvError::Stream { reason, .. }) = res { // Reset the stream. 
- self.send.send_reset(reason, buffer, stream, counts, &mut self.task); + self.send + .send_reset(reason, buffer, stream, counts, &mut self.task); Ok(()) } else { res @@ -1308,11 +1357,7 @@ impl Actions { } } - fn clear_queues(&mut self, - clear_pending_accept: bool, - store: &mut Store, - counts: &mut Counts) - { + fn clear_queues(&mut self, clear_pending_accept: bool, store: &mut Store, counts: &mut Counts) { self.recv.clear_queues(clear_pending_accept, store, counts); self.send.clear_queues(store, counts); } diff --git a/src/server.rs b/src/server.rs index 3f6d825..8a1c6fc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -64,50 +64,45 @@ //! will use the HTTP/2.0 protocol without prior negotiation. //! //! ```rust -//! use futures::{Future, Stream}; -//! # use futures::future::ok; +//! #![feature(async_await)] +//! use futures::StreamExt; //! use h2::server; //! use http::{Response, StatusCode}; //! use tokio::net::TcpListener; //! -//! pub fn main () { +//! #[tokio::main] +//! pub async fn main () { //! let addr = "127.0.0.1:5928".parse().unwrap(); //! let listener = TcpListener::bind(&addr,).unwrap(); //! -//! tokio::run({ -//! // Accept all incoming TCP connections. -//! listener.incoming().for_each(move |socket| { -//! // Spawn a new task to process each connection. -//! tokio::spawn({ -//! // Start the HTTP/2.0 connection handshake -//! server::handshake(socket) -//! .and_then(|h2| { -//! // Accept all inbound HTTP/2.0 streams sent over the -//! // connection. -//! h2.for_each(|(request, mut respond)| { -//! println!("Received request: {:?}", request); +//! // Accept all incoming TCP connections. +//! let mut incoming = listener.incoming(); +//! # futures::future::select(Box::pin(async { +//! while let Some(socket) = incoming.next().await { +//! // Spawn a new task to process each connection. +//! tokio::spawn(async { +//! // Start the HTTP/2.0 connection handshake +//! let mut h2 = server::handshake(socket.unwrap()).await.unwrap(); +//! // Accept all inbound HTTP/2.0 streams sent over the +//! // connection. +//! while let Some(request) = h2.next().await { +//! let (request, mut respond) = request.unwrap(); +//! println!("Received request: {:?}", request); //! -//! // Build a response with no body -//! let response = Response::builder() -//! .status(StatusCode::OK) -//! .body(()) -//! .unwrap(); +//! // Build a response with no body +//! let response = Response::builder() +//! .status(StatusCode::OK) +//! .body(()) +//! .unwrap(); //! -//! // Send the response back to the client -//! respond.send_response(response, true) -//! .unwrap(); +//! // Send the response back to the client +//! respond.send_response(response, true) +//! .unwrap(); +//! } //! -//! Ok(()) -//! }) -//! }) -//! .map_err(|e| panic!("unexpected error = {:?}", e)) -//! }); -//! -//! Ok(()) -//! }) -//! .map_err(|e| panic!("failed to run HTTP/2.0 server: {:?}", e)) -//! # .select(ok(())).map(|_|()).map_err(|_|()) -//! }); +//! }); +//! } +//! # }), Box::pin(async {})).await; //! } //! ``` //! @@ -124,17 +119,20 @@ //! [`SendStream`]: ../struct.SendStream.html //! 
[`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html -use crate::{SendStream, RecvStream, ReleaseCapacity, PingPong}; use crate::codec::{Codec, RecvError}; use crate::frame::{self, Pseudo, Reason, Settings, StreamId}; use crate::proto::{self, Config, Prioritized}; +use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; use bytes::{Buf, Bytes, IntoBuf}; -use futures::{self, Async, Future, Poll, try_ready}; +use futures::ready; use http::{HeaderMap, Request, Response}; -use std::{convert, fmt, io, mem}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; -use tokio_io::{AsyncRead, AsyncWrite, try_nb}; +use std::{convert, fmt, io, mem}; +use tokio_io::{AsyncRead, AsyncWrite}; /// In progress HTTP/2.0 connection handshake future. /// @@ -155,7 +153,7 @@ pub struct Handshake { /// The config to pass to Connection::new after handshake succeeds. builder: Builder, /// The current state of the handshake. - state: Handshaking + state: Handshaking, } /// Accepts inbound HTTP/2.0 streams on a connection. @@ -179,21 +177,19 @@ pub struct Handshake { /// # Examples /// /// ``` -/// # use futures::{Future, Stream}; +/// #![feature(async_await)] +/// # use futures::StreamExt; /// # use tokio_io::*; /// # use h2::server; /// # use h2::server::*; /// # -/// # fn doc(my_io: T) { -/// server::handshake(my_io) -/// .and_then(|server| { -/// server.for_each(|(request, respond)| { -/// // Process the request and send the response back to the client -/// // using `respond`. -/// # Ok(()) -/// }) -/// }) -/// # .wait().unwrap(); +/// # async fn doc(my_io: T) { +/// let mut server = server::handshake(my_io).await.unwrap(); +/// while let Some(request) = server.next().await { +/// let (request, respond) = request.unwrap(); +/// // Process the request and send the response back to the client +/// // using `respond`. +/// } /// # } /// # /// # pub fn main() {} @@ -224,7 +220,7 @@ pub struct Connection { /// # use tokio_io::*; /// # use h2::server::*; /// # -/// # fn doc(my_io: T) +/// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -318,26 +314,23 @@ const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; /// # Examples /// /// ``` +/// #![feature(async_await)] /// # use tokio_io::*; -/// # use futures::*; /// # use h2::server; /// # use h2::server::*; /// # -/// # fn doc(my_io: T) +/// # async fn doc(my_io: T) /// # { -/// server::handshake(my_io) -/// .and_then(|connection| { -/// // The HTTP/2.0 handshake has completed, now use `connection` to -/// // accept inbound HTTP/2.0 streams. -/// # Ok(()) -/// }) -/// # .wait().unwrap(); +/// let connection = server::handshake(my_io).await.unwrap(); +/// // The HTTP/2.0 handshake has completed, now use `connection` to +/// // accept inbound HTTP/2.0 streams. /// # } /// # /// # pub fn main() {} /// ``` pub fn handshake(io: T) -> Handshake -where T: AsyncRead + AsyncWrite, +where + T: AsyncRead + AsyncWrite + Unpin, { Builder::new().handshake(io) } @@ -346,8 +339,9 @@ where T: AsyncRead + AsyncWrite, impl Connection where - T: AsyncRead + AsyncWrite, - B: IntoBuf, + T: AsyncRead + AsyncWrite + Unpin, + B: IntoBuf + Unpin, + B::Buf: Unpin, { fn handshake2(io: T, builder: Builder) -> Handshake { // Create the codec. 
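Later in this file, `Flush`, `ReadPreface`, and `Handshake` are rewritten against `std::future::Future`, whose `poll` takes `Pin<&mut Self>` plus a `Context`; delegating through `Pin::new` is also what drives the `Unpin` bounds added across the diff. A sketch of that shape with a hypothetical `Wrap` future:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Wrap<F> {
    inner: F,
}

impl<F> Future for Wrap<F>
where
    F: Future + Unpin, // `Pin::new(&mut F)` below is only allowed for `Unpin` types
{
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Because `Wrap<F>: Unpin`, `Pin<&mut Self>` can hand out `&mut self.inner`,
        // which is then re-pinned to delegate the poll.
        Pin::new(&mut self.inner).poll(cx)
    }
}
```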
@@ -407,11 +401,14 @@ where /// [`poll`]: struct.Connection.html#method.poll /// [`RecvStream`]: ../struct.RecvStream.html /// [`SendStream`]: ../struct.SendStream.html - pub fn poll_close(&mut self) -> Poll<(), crate::Error> { - self.connection.poll().map_err(Into::into) + pub fn poll_close(&mut self, cx: &mut Context) -> Poll> { + self.connection.poll(cx).map_err(Into::into) } - #[deprecated(note="use abrupt_shutdown or graceful_shutdown instead", since="0.1.4")] + #[deprecated( + note = "use abrupt_shutdown or graceful_shutdown instead", + since = "0.1.4" + )] #[doc(hidden)] pub fn close_connection(&mut self) { self.graceful_shutdown(); @@ -453,31 +450,28 @@ where /// /// This may only be called once. Calling multiple times will return `None`. pub fn ping_pong(&mut self) -> Option { - self.connection - .take_user_pings() - .map(PingPong::new) + self.connection.take_user_pings().map(PingPong::new) } } impl futures::Stream for Connection where - T: AsyncRead + AsyncWrite, - B: IntoBuf, - B::Buf: 'static, + T: AsyncRead + AsyncWrite + Unpin, + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { - type Item = (Request, SendResponse); - type Error = crate::Error; + type Item = Result<(Request, SendResponse), crate::Error>; - fn poll(&mut self) -> Poll, crate::Error> { - // Always try to advance the internal state. Getting NotReady also is - // needed to allow this function to return NotReady. - match self.poll_close()? { - Async::Ready(_) => { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Always try to advance the internal state. Getting Pending also is + // needed to allow this function to return Pending. + match self.poll_close(cx)? { + Poll::Ready(_) => { // If the socket is closed, don't return anything // TODO: drop any pending streams - return Ok(None.into()); - }, - _ => {}, + return Poll::Ready(None); + } + _ => {} } if let Some(inner) = self.connection.next_incoming() { @@ -488,10 +482,10 @@ where let request = Request::from_parts(head, body); let respond = SendResponse { inner }; - return Ok(Some((request, respond)).into()); + return Poll::Ready(Some(Ok((request, respond)))); } - Ok(Async::NotReady) + Poll::Pending } } @@ -522,7 +516,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -561,7 +555,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -595,7 +589,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -628,7 +622,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -667,7 +661,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -715,7 +709,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> 
Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -761,7 +755,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -808,7 +802,7 @@ impl Builder { /// # use h2::server::*; /// # use std::time::Duration; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -850,7 +844,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -870,7 +864,7 @@ impl Builder { /// # use tokio_io::*; /// # use h2::server::*; /// # - /// # fn doc(my_io: T) + /// # fn doc(my_io: T) /// # -> Handshake /// # { /// // `server_fut` is a future representing the completion of the HTTP/2.0 @@ -884,9 +878,9 @@ impl Builder { /// ``` pub fn handshake(&self, io: T) -> Handshake where - T: AsyncRead + AsyncWrite, - B: IntoBuf, - B::Buf: 'static, + T: AsyncRead + AsyncWrite + Unpin, + B: IntoBuf + Unpin, + B::Buf: Unpin + 'static, { Connection::handshake2(io, self.clone()) } @@ -949,7 +943,7 @@ impl SendResponse { /// Polls to be notified when the client resets this stream. /// - /// If stream is still open, this returns `Ok(Async::NotReady)`, and + /// If stream is still open, this returns `Poll::Pending`, and /// registers the task to be notified if a `RST_STREAM` is received. /// /// If a `RST_STREAM` frame is received for this stream, calling this @@ -959,8 +953,8 @@ impl SendResponse { /// /// Calling this method after having called `send_response` will return /// a user error. - pub fn poll_reset(&mut self) -> Poll { - self.inner.poll_reset(proto::PollReset::AwaitingHeaders) + pub fn poll_reset(&mut self, cx: &mut Context) -> Poll> { + self.inner.poll_reset(cx, proto::PollReset::AwaitingHeaders) } /// Returns the stream ID of the response stream. 
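Since `poll_reset` and the other poll-style methods now take a `Context`, async callers can drive them with `futures::future::poll_fn`. A usage sketch assuming a server `SendResponse<Bytes>` handle and the post-migration signature shown above:

```rust
use bytes::Bytes;
use futures::future::poll_fn;
use h2::server::SendResponse;
use h2::{Error, Reason};

// Wait until the client resets the stream (or the call fails).
async fn wait_for_reset(mut respond: SendResponse<Bytes>) -> Result<Reason, Error> {
    poll_fn(|cx| respond.poll_reset(cx)).await
}
```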
@@ -979,26 +973,23 @@ impl SendResponse { impl Flush { fn new(codec: Codec) -> Self { - Flush { - codec: Some(codec), - } + Flush { codec: Some(codec) } } } impl Future for Flush where - T: AsyncWrite, - B: Buf, + T: AsyncWrite + Unpin, + B: Buf + Unpin, { - type Item = Codec; - type Error = crate::Error; + type Output = Result, crate::Error>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Flush the codec - try_ready!(self.codec.as_mut().unwrap().flush()); + ready!(self.codec.as_mut().unwrap().flush(cx))?; // Return the codec - Ok(Async::Ready(self.codec.take().unwrap())) + Poll::Ready(Ok(self.codec.take().unwrap())) } } @@ -1017,49 +1008,50 @@ impl ReadPreface { impl Future for ReadPreface where - T: AsyncRead, - B: Buf, + T: AsyncRead + Unpin, + B: Buf + Unpin, { - type Item = Codec; - type Error = crate::Error; + type Output = Result, crate::Error>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut buf = [0; 24]; let mut rem = PREFACE.len() - self.pos; while rem > 0 { - let n = try_nb!(self.inner_mut().read(&mut buf[..rem])); + let n = ready!(Pin::new(self.inner_mut()).poll_read(cx, &mut buf[..rem]))?; if n == 0 { - return Err(io::Error::new( + return Poll::Ready(Err(io::Error::new( io::ErrorKind::ConnectionReset, "connection closed unexpectedly", - ).into()); + ) + .into())); } if PREFACE[self.pos..self.pos + n] != buf[..n] { proto_err!(conn: "read_preface: invalid preface"); // TODO: Should this just write the GO_AWAY frame directly? - return Err(Reason::PROTOCOL_ERROR.into()); + return Poll::Ready(Err(Reason::PROTOCOL_ERROR.into())); } self.pos += n; rem -= n; // TODO test } - Ok(Async::Ready(self.codec.take().unwrap())) + Poll::Ready(Ok(self.codec.take().unwrap())) } } // ===== impl Handshake ===== impl Future for Handshake - where T: AsyncRead + AsyncWrite, - B: IntoBuf, +where + T: AsyncRead + AsyncWrite + Unpin, + B: IntoBuf + Unpin, + B::Buf: Unpin, { - type Item = Connection; - type Error = crate::Error; + type Output = Result, crate::Error>; - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { log::trace!("Handshake::poll(); state={:?};", self.state); use crate::server::Handshaking::*; @@ -1067,12 +1059,12 @@ impl Future for Handshake // We're currently flushing a pending SETTINGS frame. Poll the // flush future, and, if it's completed, advance our state to wait // for the client preface. - let codec = match flush.poll()? { - Async::NotReady => { - log::trace!("Handshake::poll(); flush.poll()=NotReady"); - return Ok(Async::NotReady); - }, - Async::Ready(flushed) => { + let codec = match Pin::new(flush).poll(cx)? { + Poll::Pending => { + log::trace!("Handshake::poll(); flush.poll()=Pending"); + return Poll::Pending; + } + Poll::Ready(flushed) => { log::trace!("Handshake::poll(); flush.poll()=Ready"); flushed } @@ -1089,38 +1081,41 @@ impl Future for Handshake // We're now waiting for the client preface. Poll the `ReadPreface` // future. If it has completed, we will create a `Connection` handle // for the connection. - read.poll() - // Actually creating the `Connection` has to occur outside of this - // `if let` block, because we've borrowed `self` mutably in order - // to poll the state and won't be able to borrow the SETTINGS frame - // as well until we release the borrow for `poll()`. 
@@ -1089,38 +1081,41 @@ impl<T, B> Future for Handshake<T, B>
             // We're now waiting for the client preface. Poll the `ReadPreface`
             // future. If it has completed, we will create a `Connection` handle
             // for the connection.
-            read.poll()
-            // Actually creating the `Connection` has to occur outside of this
-            // `if let` block, because we've borrowed `self` mutably in order
-            // to poll the state and won't be able to borrow the SETTINGS frame
-            // as well until we release the borrow for `poll()`.
+            Pin::new(read).poll(cx)
+            // Actually creating the `Connection` has to occur outside of this
+            // `if let` block, because we've borrowed `self` mutably in order
+            // to poll the state and won't be able to borrow the SETTINGS frame
+            // as well until we release the borrow for `poll()`.
         } else {
             unreachable!("Handshake::poll() state was not advanced completely!")
         };

-        let server = poll?.map(|codec| {
-            let connection = proto::Connection::new(codec, Config {
-                next_stream_id: 2.into(),
-                // Server does not need to locally initiate any streams
-                initial_max_send_streams: 0,
-                reset_stream_duration: self.builder.reset_stream_duration,
-                reset_stream_max: self.builder.reset_stream_max,
-                settings: self.builder.settings.clone(),
-            });
+        poll?.map(|codec| {
+            let connection = proto::Connection::new(
+                codec,
+                Config {
+                    next_stream_id: 2.into(),
+                    // Server does not need to locally initiate any streams
+                    initial_max_send_streams: 0,
+                    reset_stream_duration: self.builder.reset_stream_duration,
+                    reset_stream_max: self.builder.reset_stream_max,
+                    settings: self.builder.settings.clone(),
+                },
+            );

             log::trace!("Handshake::poll(); connection established!");

             let mut c = Connection { connection };

             if let Some(sz) = self.builder.initial_target_connection_window_size {
                 c.set_target_window_size(sz);
             }
-            c
-        });
-        Ok(server)
+            Ok(c)
+        })
     }
 }

 impl<T, B> fmt::Debug for Handshake<T, B>
-    where T: AsyncRead + AsyncWrite + fmt::Debug,
-          B: fmt::Debug + IntoBuf,
+where
+    T: AsyncRead + AsyncWrite + fmt::Debug,
+    B: fmt::Debug + IntoBuf,
 {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         write!(fmt, "server::Handshake")
@@ -1131,16 +1126,14 @@ impl Peer {
     pub fn convert_send_message(
         id: StreamId,
         response: Response<()>,
-        end_of_stream: bool) -> frame::Headers
-    {
+        end_of_stream: bool,
+    ) -> frame::Headers {
         use http::response::Parts;

         // Extract the components of the HTTP request
         let (
             Parts {
-                status,
-                headers,
-                ..
+                status, headers, ..
             },
             _,
         ) = response.into_parts();
@@ -1172,7 +1165,9 @@ impl proto::Peer for Peer {
     }

     fn convert_poll_message(
-        pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId
+        pseudo: Pseudo,
+        fields: HeaderMap,
+        stream_id: StreamId,
     ) -> Result<Self::Poll, RecvError> {
         use http::{uri, Version};

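Sketch (not part of the patch): with `Handshake` now implementing `std::future::Future`, the server handshake can simply be awaited instead of being driven as a 0.1 combinator chain. `start_server` is an illustrative helper; the trait bounds mirror the ones this patch puts on `Builder::handshake`.

```rust
#![feature(async_await)]

use h2::server;
use tokio_io::{AsyncRead, AsyncWrite};

// Perform the HTTP/2.0 handshake and obtain a server `Connection`.
async fn start_server<T>(io: T) -> Result<(), h2::Error>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    // Resolves once the SETTINGS frame has been flushed and the client
    // preface has been read -- the two states polled above.
    let _connection = server::handshake(io).await?;
    // ... accept streams on `_connection` here ...
    Ok(())
}
```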
@@ -1205,23 +1200,29 @@ impl proto::Peer for Peer {
         // Convert the URI
         let mut parts = uri::Parts::default();

-
         // A request translated from HTTP/1 must not include the :authority
         // header
         if let Some(authority) = pseudo.authority {
             let maybe_authority = uri::Authority::from_shared(authority.clone().into_inner());
-            parts.authority = Some(maybe_authority.or_else(|why| malformed!(
-                "malformed headers: malformed authority ({:?}): {}", authority, why,
-            ))?);
-
+            parts.authority = Some(maybe_authority.or_else(|why| {
+                malformed!(
+                    "malformed headers: malformed authority ({:?}): {}",
+                    authority,
+                    why,
+                )
+            })?);
         }

         // A :scheme is always required.
         if let Some(scheme) = pseudo.scheme {
             let maybe_scheme = uri::Scheme::from_shared(scheme.clone().into_inner());
-            let scheme = maybe_scheme.or_else(|why| malformed!(
-                "malformed headers: malformed scheme ({:?}): {}", scheme, why,
-            ))?;
+            let scheme = maybe_scheme.or_else(|why| {
+                malformed!(
+                    "malformed headers: malformed scheme ({:?}): {}",
+                    scheme,
+                    why,
+                )
+            })?;

             // It's not possible to build an `Uri` from a scheme and path. So,
             // after validating is was a valid scheme, we just have to drop it
@@ -1240,9 +1241,9 @@ impl proto::Peer for Peer {
             }

             let maybe_path = uri::PathAndQuery::from_shared(path.clone().into_inner());
-            parts.path_and_query = Some(maybe_path.or_else(|why| malformed!(
-                "malformed headers: malformed path ({:?}): {}", path, why,
-            ))?);
+            parts.path_and_query = Some(maybe_path.or_else(|why| {
+                malformed!("malformed headers: malformed path ({:?}): {}", path, why,)
+            })?);
         }

         b.uri(parts);
@@ -1257,7 +1258,7 @@ impl proto::Peer for Peer {
                     id: stream_id,
                     reason: Reason::PROTOCOL_ERROR,
                 });
-            },
+            }
         };

         *request.headers_mut() = fields;
@@ -1270,18 +1271,15 @@ impl proto::Peer for Peer {

 impl<T, B> fmt::Debug for Handshaking<T, B>
 where
-    B: IntoBuf
+    B: IntoBuf,
 {
-    #[inline] fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+    #[inline]
+    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
         match *self {
-            Handshaking::Flushing(_) =>
-                write!(f, "Handshaking::Flushing(_)"),
-            Handshaking::ReadingPreface(_) =>
-                write!(f, "Handshaking::ReadingPreface(_)"),
-            Handshaking::Empty =>
-                write!(f, "Handshaking::Empty"),
+            Handshaking::Flushing(_) => write!(f, "Handshaking::Flushing(_)"),
+            Handshaking::ReadingPreface(_) => write!(f, "Handshaking::ReadingPreface(_)"),
+            Handshaking::Empty => write!(f, "Handshaking::Empty"),
         }
-
     }
 }

@@ -1290,18 +1288,19 @@ where
     T: AsyncRead + AsyncWrite,
     B: IntoBuf,
 {
-    #[inline] fn from(flush: Flush<T, Prioritized<B::Buf>>) -> Self {
+    #[inline]
+    fn from(flush: Flush<T, Prioritized<B::Buf>>) -> Self {
         Handshaking::Flushing(flush)
     }
 }

-impl<T, B> convert::From<ReadPreface<T, Prioritized<B::Buf>>> for
-    Handshaking<T, B>
+impl<T, B> convert::From<ReadPreface<T, Prioritized<B::Buf>>> for Handshaking<T, B>
 where
     T: AsyncRead + AsyncWrite,
     B: IntoBuf,
 {
-    #[inline] fn from(read: ReadPreface<T, Prioritized<B::Buf>>) -> Self {
+    #[inline]
+    fn from(read: ReadPreface<T, Prioritized<B::Buf>>) -> Self {
         Handshaking::ReadingPreface(read)
     }
 }
@@ -1311,7 +1310,8 @@ where
     T: AsyncRead + AsyncWrite,
     B: IntoBuf,
 {
-    #[inline] fn from(codec: Codec<T, Prioritized<B::Buf>>) -> Self {
+    #[inline]
+    fn from(codec: Codec<T, Prioritized<B::Buf>>) -> Self {
         Handshaking::from(Flush::new(codec))
     }
 }
diff --git a/src/share.rs b/src/share.rs
index 2d6df07..9f28186 100644
--- a/src/share.rs
+++ b/src/share.rs
@@ -3,10 +3,13 @@ use crate::frame::Reason;
 use crate::proto::{self, WindowSize};

 use bytes::{Bytes, IntoBuf};
-use futures::{self, Poll, Async, try_ready};
-use http::{HeaderMap};
+use http::HeaderMap;

+use crate::PollExt;
+use futures::ready;
 use std::fmt;
+use std::pin::Pin;
+use std::task::{Context, Poll};

 /// Sends the body stream and trailers to the remote peer.
 ///
@@ -264,11 +267,12 @@ impl<B: IntoBuf> SendStream<B> {
     /// is sent. For example:
     ///
     /// ```rust
+    /// #![feature(async_await)]
     /// # use h2::*;
-    /// # fn doc(mut send_stream: SendStream<&'static [u8]>) {
+    /// # async fn doc(mut send_stream: SendStream<&'static [u8]>) {
     /// send_stream.reserve_capacity(100);
     ///
-    /// let capacity = send_stream.poll_capacity();
+    /// let capacity = futures::future::poll_fn(|cx| send_stream.poll_capacity(cx)).await;
     /// // capacity == 5;
     ///
     /// send_stream.send_data(b"hello", false).unwrap();
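Sketch (not part of the patch): driving the new `poll_capacity` shape, `Poll<Option<Result<usize, Error>>>`, from async code. `send_hello` is an illustrative helper in the spirit of the updated doc example above; `poll_fn` is the `futures-preview` 0.3 alpha helper.

```rust
#![feature(async_await)]

use futures::future::poll_fn;
use h2::SendStream;

// Reserve capacity and wait for some of it to be assigned before sending.
async fn send_hello(send_stream: &mut SendStream<&'static [u8]>) -> Result<(), h2::Error> {
    send_stream.reserve_capacity(5);

    // `poll_capacity` now needs a `Context`, so wrap it in `poll_fn`;
    // binding the result first keeps the borrow on `send_stream` short.
    let assigned = poll_fn(|cx| send_stream.poll_capacity(cx)).await;

    match assigned {
        // Some flow-control capacity is now available.
        Some(Ok(_n)) => send_stream.send_data(b"hello", true),
        Some(Err(e)) => Err(e),
        // The stream closed before any capacity was assigned.
        None => Ok(()),
    }
}
```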
@@ -309,9 +313,11 @@ impl<B: IntoBuf> SendStream<B> {
     /// amount of assigned capacity at that point in time. It is also possible
     /// that `n` is lower than the previous call if, since then, the caller has
     /// sent data.
-    pub fn poll_capacity(&mut self) -> Poll<Option<usize>, crate::Error> {
-        let res = try_ready!(self.inner.poll_capacity());
-        Ok(Async::Ready(res.map(|v| v as usize)))
+    pub fn poll_capacity(&mut self, cx: &mut Context) -> Poll<Option<Result<usize, crate::Error>>> {
+        self.inner
+            .poll_capacity(cx)
+            .map_ok_(|w| w as usize)
+            .map_err_(Into::into)
     }

     /// Sends a single data frame to the remote peer.
@@ -356,7 +362,7 @@ impl<B: IntoBuf> SendStream<B> {
     /// Polls to be notified when the client resets this stream.
     ///
-    /// If stream is still open, this returns `Ok(Async::NotReady)`, and
+    /// If stream is still open, this returns `Poll::Pending`, and
     /// registers the task to be notified if a `RST_STREAM` is received.
     ///
     /// If a `RST_STREAM` frame is received for this stream, calling this
@@ -366,8 +372,8 @@ impl<B: IntoBuf> SendStream<B> {
     ///
     /// If connection sees an error, this returns that error instead of a
     /// `Reason`.
-    pub fn poll_reset(&mut self) -> Poll<Reason, crate::Error> {
-        self.inner.poll_reset(proto::PollReset::Streaming)
+    pub fn poll_reset(&mut self, cx: &mut Context) -> Poll<Result<Reason, crate::Error>> {
+        self.inner.poll_reset(cx, proto::PollReset::Streaming)
     }

     /// Returns the stream ID of this `SendStream`.
@@ -417,8 +423,11 @@ impl RecvStream {
     }

     /// Returns received trailers.
-    pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, crate::Error> {
-        self.inner.inner.poll_trailers().map_err(Into::into)
+    pub fn poll_trailers(
+        &mut self,
+        cx: &mut Context,
+    ) -> Poll<Option<Result<HeaderMap, crate::Error>>> {
+        self.inner.inner.poll_trailers(cx).map_err_(Into::into)
     }

     /// Returns the stream ID of this stream.
@@ -432,11 +441,10 @@ impl RecvStream {
 }

 impl futures::Stream for RecvStream {
-    type Item = Bytes;
-    type Error = crate::Error;
+    type Item = Result<Bytes, crate::Error>;

-    fn poll(&mut self) -> Poll<Option<Bytes>, Self::Error> {
-        self.inner.inner.poll_data().map_err(Into::into)
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.inner.inner.poll_data(cx).map_err_(Into::into)
     }
 }

@@ -514,9 +522,7 @@ impl Clone for ReleaseCapacity {

 impl PingPong {
     pub(crate) fn new(inner: proto::UserPings) -> Self {
-        PingPong {
-            inner,
-        }
+        PingPong { inner }
     }

     /// Send a `PING` frame to the peer.
@@ -540,12 +546,10 @@ impl PingPong {
         // just drop it.
         drop(ping);

-        self.inner
-            .send_ping()
-            .map_err(|err| match err {
-                Some(err) => err.into(),
-                None => UserError::SendPingWhilePending.into()
-            })
+        self.inner.send_ping().map_err(|err| match err {
+            Some(err) => err.into(),
+            None => UserError::SendPingWhilePending.into(),
+        })
     }

     /// Polls for the acknowledgement of a previously [sent][] `PING` frame.
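Sketch (not part of the patch): consuming a `RecvStream` through its new `Stream` impl (`Item = Result<Bytes, Error>`) and the `Context`-taking `poll_trailers`. `read_body` is an illustrative helper; `StreamExt`/`poll_fn` come from the `futures-preview` alpha this branch depends on.

```rust
use futures::future::poll_fn;
use futures::stream::StreamExt;

// Drain a body, releasing flow-control capacity as data is freed, then
// wait for the trailers (if any).
async fn read_body(mut body: h2::RecvStream) -> Result<Option<http::HeaderMap>, h2::Error> {
    let mut release_capacity = body.release_capacity().clone();

    while let Some(chunk) = body.next().await {
        let chunk = chunk?;
        // Tell the peer it may send `chunk.len()` more bytes.
        let _ = release_capacity.release_capacity(chunk.len());
    }

    // `poll_trailers` also takes a `Context` now, so wrap it in `poll_fn`.
    match poll_fn(|cx| body.poll_trailers(cx)).await {
        Some(trailers) => Ok(Some(trailers?)),
        None => Ok(None),
    }
}
```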
@@ -553,8 +557,8 @@ impl PingPong {
     /// # Example
     ///
     /// ```
-    /// # use futures::Future;
-    /// # fn doc(mut ping_pong: h2::PingPong) {
+    /// #![feature(async_await)]
+    /// # async fn doc(mut ping_pong: h2::PingPong) {
     /// // let mut ping_pong = ...
     ///
     /// // First, send a PING.
@@ -563,26 +567,23 @@ impl PingPong {
     ///     .unwrap();
     ///
     /// // And then wait for the PONG.
-    /// futures::future::poll_fn(move || {
-    ///     ping_pong.poll_pong()
-    /// }).wait().unwrap();
+    /// futures::future::poll_fn(move |cx| {
+    ///     ping_pong.poll_pong(cx)
+    /// }).await.unwrap();
     /// # }
     /// # fn main() {}
     /// ```
     ///
     /// [sent]: struct.PingPong.html#method.send_ping
-    pub fn poll_pong(&mut self) -> Poll<Pong, crate::Error> {
-        try_ready!(self.inner.poll_pong());
-        Ok(Async::Ready(Pong {
-            _p: (),
-        }))
+    pub fn poll_pong(&mut self, cx: &mut Context) -> Poll<Result<Pong, crate::Error>> {
+        ready!(self.inner.poll_pong(cx))?;
+        Poll::Ready(Ok(Pong { _p: () }))
     }
 }

 impl fmt::Debug for PingPong {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        fmt.debug_struct("PingPong")
-            .finish()
+        fmt.debug_struct("PingPong").finish()
     }
 }

@@ -595,16 +596,13 @@ impl Ping {
     ///
     /// [`PingPong`]: struct.PingPong.html
     pub fn opaque() -> Ping {
-        Ping {
-            _p: (),
-        }
+        Ping { _p: () }
     }
 }

 impl fmt::Debug for Ping {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        fmt.debug_struct("Ping")
-            .finish()
+        fmt.debug_struct("Ping").finish()
     }
 }

@@ -612,7 +610,6 @@ impl fmt::Debug for Ping {

 impl fmt::Debug for Pong {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
-        fmt.debug_struct("Pong")
-            .finish()
+        fmt.debug_struct("Pong").finish()
     }
 }
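Sketch (not part of the patch): wrapping the migrated PING API into a small async round-trip helper. `ping_roundtrip` is illustrative; it assumes `Ping`/`PingPong` are re-exported at the crate root as in this branch, and uses the `futures-preview` `poll_fn` helper.

```rust
use futures::future::poll_fn;
use h2::{Ping, PingPong};

// Send an opaque PING and wait for the peer's PONG using the new
// `Context`-based `poll_pong`.
async fn ping_roundtrip(ping_pong: &mut PingPong) -> Result<(), h2::Error> {
    ping_pong.send_ping(Ping::opaque())?;
    // Discard the returned `Pong`; reaching this point means the peer
    // acknowledged the PING.
    poll_fn(|cx| ping_pong.poll_pong(cx)).await?;
    Ok(())
}
```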