From a1db5428db2c8f65d466fc2b4d2cd115699d79b3 Mon Sep 17 00:00:00 2001
From: Sean McArthur
Date: Tue, 20 Aug 2019 16:01:03 -0700
Subject: [PATCH] Make Stream impls part of optional 'stream' cargo feature
 (#397)

---
 Cargo.toml                     |   5 +-
 examples/client.rs             |   8 +-
 examples/server.rs             |  26 +++----
 src/client.rs                  |  27 +++++--
 src/server.rs                  | 129 +++++++++++++++++++--------------
 src/share.rs                   |  21 +++++-
 tests/h2-support/Cargo.toml    |   2 +-
 tests/h2-tests/tests/server.rs |   4 +-
 8 files changed, 137 insertions(+), 85 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index cec4cde..203a3b3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,6 +24,9 @@ repository = "hyperium/h2"
 branch = "master"
 
 [features]
+# Enables `futures::Stream` implementations for various types.
+# This is an optional feature due to `Stream` not being stable.
+stream = []
 
 # Enables **unstable** APIs. Any API exposed by this feature has no backwards
 # compatibility guarantees. In other words, you should not use this feature for
@@ -65,7 +68,7 @@ serde = "1.0.0"
 serde_json = "1.0.0"
 
 # Akamai example
-tokio = "0.2.0-alpha.1"
+tokio = "0.2.0-alpha.2"
 env_logger = { version = "0.5.3", default-features = false }
 rustls = "0.12"
 tokio-rustls = "0.5.0"
diff --git a/examples/client.rs b/examples/client.rs
index 3993344..9df87f2 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -1,7 +1,5 @@
 #![feature(async_await)]
 
-use futures::future::poll_fn;
-use futures::StreamExt;
 use h2::client;
 use http::{HeaderMap, Request};
 
@@ -42,13 +40,13 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
     println!("GOT RESPONSE: {:?}", response);
 
     // Get the body
-    let (_, mut body) = response.into_parts();
+    let mut body = response.into_body();
 
-    while let Some(chunk) = body.next().await {
+    while let Some(chunk) = body.data().await {
         println!("GOT CHUNK = {:?}", chunk?);
     }
 
-    if let Some(trailers) = poll_fn(|cx| body.poll_trailers(cx)).await {
+    if let Some(trailers) = body.trailers().await {
         println!("GOT TRAILERS: {:?}", trailers?);
     }
 
diff --git a/examples/server.rs b/examples/server.rs
index 86f7a3d..4e38c68 100644
--- a/examples/server.rs
+++ b/examples/server.rs
@@ -3,7 +3,6 @@
 use h2::server;
 
 use bytes::*;
-use futures::*;
 use http::{Response, StatusCode};
 
 use std::error::Error;
@@ -13,26 +12,25 @@ use tokio::net::{TcpListener, TcpStream};
 pub async fn main() -> Result<(), Box<dyn Error>> {
     let _ = env_logger::try_init();
 
-    let listener = TcpListener::bind(&"127.0.0.1:5928".parse().unwrap()).unwrap();
+    let mut listener = TcpListener::bind(&"127.0.0.1:5928".parse()?)?;
     println!("listening on {:?}", listener.local_addr());
 
-    let mut incoming = listener.incoming();
-    while let Some(socket) = incoming.next().await {
-        tokio::spawn(async move {
-            if let Err(e) = handle(socket).await {
-                println!(" -> err={:?}", e);
-            }
-        });
+    loop {
+        if let Ok((socket, _peer_addr)) = listener.accept().await {
+            tokio::spawn(async move {
+                if let Err(e) = handle(socket).await {
+                    println!(" -> err={:?}", e);
+                }
+            });
+        }
     }
-
-    Ok(())
 }
 
-async fn handle(socket: io::Result<TcpStream>) -> Result<(), Box<dyn Error>> {
-    let mut connection = server::handshake(socket?).await?;
+async fn handle(socket: TcpStream) -> Result<(), Box<dyn Error>> {
+    let mut connection = server::handshake(socket).await?;
     println!("H2 connection bound");
 
-    while let Some(result) = connection.next().await {
+    while let Some(result) = connection.accept().await {
         let (request, mut respond) = result?;
         println!("GOT request: {:?}", request);
         let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
diff --git a/src/client.rs b/src/client.rs
index c632c6d..24b51ad 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -69,7 +69,6 @@
 //!
 //! use h2::client;
 //!
-//! use futures::*;
 //! use http::{Request, Method};
 //! use std::error::Error;
 //! use tokio::net::TcpStream;
@@ -109,7 +108,7 @@
 //!     // the data from memory.
 //!     let mut release_capacity = body.release_capacity().clone();
 //!
-//!     while let Some(chunk) = body.next().await {
+//!     while let Some(chunk) = body.data().await {
 //!         let chunk = chunk?;
 //!         println!("RX: {:?}", chunk);
 //!
@@ -145,7 +144,7 @@ use crate::proto;
 use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream};
 
 use bytes::{Bytes, IntoBuf};
-use futures::{ready, FutureExt, Stream};
+use futures::{ready, FutureExt};
 use http::{uri, HeaderMap, Method, Request, Response, Version};
 use std::fmt;
 use std::future::Future;
@@ -1301,10 +1300,17 @@ impl ResponseFuture {
 
 // ===== impl PushPromises =====
 
-impl Stream for PushPromises {
-    type Item = Result<PushPromise, crate::Error>;
+impl PushPromises {
+    /// Get the next `PushPromise`.
+    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
+        futures::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
+    }
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+    #[doc(hidden)]
+    pub fn poll_push_promise(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
         match self.inner.poll_pushed(cx) {
             Poll::Ready(Some(Ok((request, response)))) => {
                 let response = PushedResponseFuture {
@@ -1322,6 +1328,15 @@
     }
 }
 
+#[cfg(feature = "stream")]
+impl futures::Stream for PushPromises {
+    type Item = Result<PushPromise, crate::Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.poll_push_promise(cx)
+    }
+}
+
 // ===== impl PushPromise =====
 
 impl PushPromise {
diff --git a/src/server.rs b/src/server.rs
index 2bc7562..657f697 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -63,46 +63,44 @@
 //! knowledge], i.e. both the client and the server assume that the TCP socket
 //! will use the HTTP/2.0 protocol without prior negotiation.
 //!
-//! ```rust
+//! ```no_run
 //! #![feature(async_await)]
-//! use futures::StreamExt;
 //! use h2::server;
 //! use http::{Response, StatusCode};
 //! use tokio::net::TcpListener;
 //!
 //! #[tokio::main]
-//! pub async fn main () {
+//! pub async fn main() {
 //!     let addr = "127.0.0.1:5928".parse().unwrap();
-//!     let listener = TcpListener::bind(&addr,).unwrap();
+//!     let mut listener = TcpListener::bind(&addr).unwrap();
 //!
 //!     // Accept all incoming TCP connections.
-//!     let mut incoming = listener.incoming();
-//! # futures::future::select(Box::pin(async {
-//!     while let Some(socket) = incoming.next().await {
-//!         // Spawn a new task to process each connection.
-//!         tokio::spawn(async {
-//!             // Start the HTTP/2.0 connection handshake
-//!             let mut h2 = server::handshake(socket.unwrap()).await.unwrap();
-//!             // Accept all inbound HTTP/2.0 streams sent over the
-//!             // connection.
-//!             while let Some(request) = h2.next().await {
-//!                 let (request, mut respond) = request.unwrap();
-//!                 println!("Received request: {:?}", request);
+//!     loop {
+//!         if let Ok((socket, _peer_addr)) = listener.accept().await {
+//!             // Spawn a new task to process each connection.
+//!             tokio::spawn(async {
+//!                 // Start the HTTP/2.0 connection handshake
+//!                 let mut h2 = server::handshake(socket).await.unwrap();
+//!                 // Accept all inbound HTTP/2.0 streams sent over the
+//!                 // connection.
+//!                 while let Some(request) = h2.accept().await {
+//!                     let (request, mut respond) = request.unwrap();
+//!                     println!("Received request: {:?}", request);
 //!
-//!                 // Build a response with no body
-//!                 let response = Response::builder()
-//!                     .status(StatusCode::OK)
-//!                     .body(())
-//!                     .unwrap();
-//!
-//!                 // Send the response back to the client
-//!                 respond.send_response(response, true)
+//!                     // Build a response with no body
+//!                     let response = Response::builder()
+//!                         .status(StatusCode::OK)
+//!                         .body(())
 //!                         .unwrap();
-//!             }
 //!
-//!         });
+//!                     // Send the response back to the client
+//!                     respond.send_response(response, true)
+//!                         .unwrap();
+//!                 }
+//!
+//!             });
+//!         }
 //!     }
-//! # }), Box::pin(async {})).await;
 //! }
 //! ```
 //!
@@ -178,14 +176,13 @@ pub struct Handshake<T, B: IntoBuf = Bytes> {
 ///
 /// ```
 /// # #![feature(async_await)]
-/// # use futures::StreamExt;
 /// # use tokio_io::*;
 /// # use h2::server;
 /// # use h2::server::*;
 /// #
 /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
 /// let mut server = server::handshake(my_io).await.unwrap();
-/// while let Some(request) = server.next().await {
+/// while let Some(request) = server.accept().await {
 ///     let (request, respond) = request.unwrap();
 ///     // Process the request and send the response back to the client
 ///     // using `respond`.
@@ -341,7 +338,7 @@ impl<T, B> Connection<T, B>
 where
     T: AsyncRead + AsyncWrite + Unpin,
     B: IntoBuf + Unpin,
-    B::Buf: Unpin,
+    B::Buf: Unpin + 'static,
 {
     fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
         // Create the codec.
@@ -366,6 +363,40 @@
         Handshake { builder, state }
     }
 
+    /// Accept the next incoming request on this connection.
+    pub async fn accept(
+        &mut self,
+    ) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
+        futures::future::poll_fn(move |cx| self.poll_accept(cx)).await
+    }
+
+    #[doc(hidden)]
+    pub fn poll_accept(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
+        // Always try to advance the internal state. Getting Pending also is
+        // needed to allow this function to return Pending.
+        if let Poll::Ready(_) = self.poll_closed(cx)? {
+            // If the socket is closed, don't return anything
+            // TODO: drop any pending streams
+            return Poll::Ready(None);
+        }
+
+        if let Some(inner) = self.connection.next_incoming() {
+            log::trace!("received incoming");
+            let (head, _) = inner.take_request().into_parts();
+            let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque()));
+
+            let request = Request::from_parts(head, body);
+            let respond = SendResponse { inner };
+
+            return Poll::Ready(Some(Ok((request, respond))));
+        }
+
+        Poll::Pending
+    }
+
     /// Sets the target window size for the whole connection.
     ///
     /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
@@ -390,21 +421,27 @@
 
     /// Returns `Ready` when the underlying connection has closed.
     ///
-    /// If any new inbound streams are received during a call to `poll_close`,
-    /// they will be queued and returned on the next call to [`poll`].
+    /// If any new inbound streams are received during a call to `poll_closed`,
+    /// they will be queued and returned on the next call to [`poll_accept`].
     ///
     /// This function will advance the internal connection state, driving
    /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
     ///
     /// See [here](index.html#managing-the-connection) for more details.
     ///
-    /// [`poll`]: struct.Connection.html#method.poll
+    /// [`poll_accept`]: struct.Connection.html#method.poll_accept
     /// [`RecvStream`]: ../struct.RecvStream.html
     /// [`SendStream`]: ../struct.SendStream.html
-    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
+    pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
         self.connection.poll(cx).map_err(Into::into)
     }
 
+    #[doc(hidden)]
+    #[deprecated(note = "renamed to poll_closed")]
+    pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
+        self.poll_closed(cx)
+    }
+
     /// Sets the connection to a GOAWAY state.
     ///
     /// Does not terminate the connection. Must continue being polled to close
@@ -445,6 +482,7 @@
     }
 }
 
+#[cfg(feature = "stream")]
 impl<T, B> futures::Stream for Connection<T, B>
 where
     T: AsyncRead + AsyncWrite + Unpin,
@@ -454,26 +492,7 @@ where
     type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        // Always try to advance the internal state. Getting Pending also is
-        // needed to allow this function to return Pending.
-        if let Poll::Ready(_) = self.poll_close(cx)? {
-            // If the socket is closed, don't return anything
-            // TODO: drop any pending streams
-            return Poll::Ready(None);
-        }
-
-        if let Some(inner) = self.connection.next_incoming() {
-            log::trace!("received incoming");
-            let (head, _) = inner.take_request().into_parts();
-            let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque()));
-
-            let request = Request::from_parts(head, body);
-            let respond = SendResponse { inner };
-
-            return Poll::Ready(Some(Ok((request, respond))));
-        }
-
-        Poll::Pending
+        self.poll_accept(cx)
     }
 }
 
@@ -1035,7 +1054,7 @@ impl<T, B> Future for Handshake<T, B>
 where
     T: AsyncRead + AsyncWrite + Unpin,
     B: IntoBuf + Unpin,
-    B::Buf: Unpin,
+    B::Buf: Unpin + 'static,
 {
     type Output = Result<Connection<T, B>, crate::Error>;
 
diff --git a/src/share.rs b/src/share.rs
index d209332..5ac723f 100644
--- a/src/share.rs
+++ b/src/share.rs
@@ -8,6 +8,7 @@ use http::HeaderMap;
 use crate::PollExt;
 use futures::ready;
 use std::fmt;
+#[cfg(feature = "stream")]
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
@@ -415,7 +416,22 @@ impl RecvStream {
         &mut self.inner
     }
 
-    /// Returns received trailers.
+    /// Get the next data frame.
+    pub async fn data(&mut self) -> Option<Result<Bytes, crate::Error>> {
+        futures::future::poll_fn(move |cx| self.poll_data(cx)).await
+    }
+
+    /// Get optional trailers for this stream.
+    pub async fn trailers(&mut self) -> Option<Result<HeaderMap, crate::Error>> {
+        futures::future::poll_fn(move |cx| self.poll_trailers(cx)).await
+    }
+
+    #[doc(hidden)]
+    pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>> {
+        self.inner.inner.poll_data(cx).map_err_(Into::into)
+    }
+
+    #[doc(hidden)]
     pub fn poll_trailers(
         &mut self,
         cx: &mut Context,
@@ -433,11 +449,12 @@
     }
 }
 
+#[cfg(feature = "stream")]
 impl futures::Stream for RecvStream {
     type Item = Result<Bytes, crate::Error>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.inner.inner.poll_data(cx).map_err_(Into::into)
+        self.poll_data(cx)
     }
 }
 
diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml
index 04a1588..0ec64b9 100644
--- a/tests/h2-support/Cargo.toml
+++ b/tests/h2-support/Cargo.toml
@@ -5,7 +5,7 @@ authors = ["Carl Lerche "]
 edition = "2018"
 
 [dependencies]
-h2 = { path = "../..", features = ["unstable"] }
+h2 = { path = "../..", features = ["stream", "unstable"] }
 bytes = "0.4.7"
 env_logger = "0.5.9"
 
diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs
index 857d081..5ca318e 100644
--- a/tests/h2-tests/tests/server.rs
+++ b/tests/h2-tests/tests/server.rs
@@ -239,7 +239,9 @@ async fn abrupt_shutdown() {
         srv.abrupt_shutdown(Reason::INTERNAL_ERROR);
 
         let srv_fut = async move {
-            poll_fn(move |cx| srv.poll_close(cx)).await.expect("server");
+            poll_fn(move |cx| srv.poll_closed(cx))
+                .await
+                .expect("server");
         };
 
         join(req_fut, srv_fut).await;
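
For downstream users, the net effect of this patch is that the `futures::Stream` impls on `RecvStream`, `Connection`, and `PushPromises` exist only when the `stream` cargo feature is enabled, while the new inherent async methods (`data`, `trailers`, `accept`, `push_promise`) are always available; per the new Cargo.toml comment, the impls are optional because `Stream` itself is not yet stable. The sketch below is not part of the patch: it shows both styles against a client response body, the helper names (`read_body`, `read_body_as_stream`) are illustrative only, and the second helper assumes h2 was built with `features = ["stream"]`.

```rust
use h2::RecvStream;

// Always available after this patch: the inherent async accessors.
// (Flow-control capacity release is elided here; see the module docs.)
async fn read_body(mut body: RecvStream) -> Result<(), h2::Error> {
    while let Some(chunk) = body.data().await {
        println!("chunk = {:?}", chunk?);
    }
    if let Some(trailers) = body.trailers().await {
        println!("trailers = {:?}", trailers?);
    }
    Ok(())
}

// Usable only when h2 is built with the `stream` feature enabled,
// e.g. `h2 = { version = "...", features = ["stream"] }` in Cargo.toml:
// `RecvStream` then implements `futures::Stream` again.
async fn read_body_as_stream(mut body: RecvStream) -> Result<(), h2::Error> {
    use futures::StreamExt;
    while let Some(chunk) = body.next().await {
        println!("chunk = {:?}", chunk?);
    }
    Ok(())
}
```

Both helpers drain the same body; the feature gate only decides whether the trait-based form compiles, so code that sticks to the inherent methods is unaffected by the `Stream` trait's instability.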