Make Stream impls part of optional 'stream' cargo feature (#397)

This commit is contained in:
Sean McArthur
2019-08-20 16:01:03 -07:00
committed by Carl Lerche
parent f31ec5d0da
commit a1db5428db
8 changed files with 137 additions and 85 deletions

View File

@@ -24,6 +24,9 @@ repository = "hyperium/h2"
branch = "master" branch = "master"
[features] [features]
# Enables `futures::Stream` implementations for various types.
# This is an optional feature due to `Stream` not being stable.
stream = []
# Enables **unstable** APIs. Any API exposed by this feature has no backwards # Enables **unstable** APIs. Any API exposed by this feature has no backwards
# compatibility guarantees. In other words, you should not use this feature for # compatibility guarantees. In other words, you should not use this feature for
@@ -65,7 +68,7 @@ serde = "1.0.0"
serde_json = "1.0.0" serde_json = "1.0.0"
# Akamai example # Akamai example
tokio = "0.2.0-alpha.1" tokio = "0.2.0-alpha.2"
env_logger = { version = "0.5.3", default-features = false } env_logger = { version = "0.5.3", default-features = false }
rustls = "0.12" rustls = "0.12"
tokio-rustls = "0.5.0" tokio-rustls = "0.5.0"

View File

@@ -1,7 +1,5 @@
#![feature(async_await)] #![feature(async_await)]
use futures::future::poll_fn;
use futures::StreamExt;
use h2::client; use h2::client;
use http::{HeaderMap, Request}; use http::{HeaderMap, Request};
@@ -42,13 +40,13 @@ pub async fn main() -> Result<(), Box<dyn Error>> {
println!("GOT RESPONSE: {:?}", response); println!("GOT RESPONSE: {:?}", response);
// Get the body // Get the body
let (_, mut body) = response.into_parts(); let mut body = response.into_body();
while let Some(chunk) = body.next().await { while let Some(chunk) = body.data().await {
println!("GOT CHUNK = {:?}", chunk?); println!("GOT CHUNK = {:?}", chunk?);
} }
if let Some(trailers) = poll_fn(|cx| body.poll_trailers(cx)).await { if let Some(trailers) = body.trailers().await {
println!("GOT TRAILERS: {:?}", trailers?); println!("GOT TRAILERS: {:?}", trailers?);
} }

View File

@@ -3,7 +3,6 @@
use h2::server; use h2::server;
use bytes::*; use bytes::*;
use futures::*;
use http::{Response, StatusCode}; use http::{Response, StatusCode};
use std::error::Error; use std::error::Error;
@@ -13,27 +12,26 @@ use tokio::net::{TcpListener, TcpStream};
pub async fn main() -> Result<(), Box<dyn Error>> { pub async fn main() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init(); let _ = env_logger::try_init();
let listener = TcpListener::bind(&"127.0.0.1:5928".parse().unwrap()).unwrap(); let mut listener = TcpListener::bind(&"127.0.0.1:5928".parse()?)?;
println!("listening on {:?}", listener.local_addr()); println!("listening on {:?}", listener.local_addr());
let mut incoming = listener.incoming();
while let Some(socket) = incoming.next().await { loop {
if let Ok((socket, _peer_addr)) = listener.accept().await {
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = handle(socket).await { if let Err(e) = handle(socket).await {
println!(" -> err={:?}", e); println!(" -> err={:?}", e);
} }
}); });
} }
}
Ok(())
} }
async fn handle(socket: io::Result<TcpStream>) -> Result<(), Box<dyn Error>> { async fn handle(socket: TcpStream) -> Result<(), Box<dyn Error>> {
let mut connection = server::handshake(socket?).await?; let mut connection = server::handshake(socket).await?;
println!("H2 connection bound"); println!("H2 connection bound");
while let Some(result) = connection.next().await { while let Some(result) = connection.accept().await {
let (request, mut respond) = result?; let (request, mut respond) = result?;
println!("GOT request: {:?}", request); println!("GOT request: {:?}", request);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); let response = Response::builder().status(StatusCode::OK).body(()).unwrap();

View File

@@ -69,7 +69,6 @@
//! //!
//! use h2::client; //! use h2::client;
//! //!
//! use futures::*;
//! use http::{Request, Method}; //! use http::{Request, Method};
//! use std::error::Error; //! use std::error::Error;
//! use tokio::net::TcpStream; //! use tokio::net::TcpStream;
@@ -109,7 +108,7 @@
//! // the data from memory. //! // the data from memory.
//! let mut release_capacity = body.release_capacity().clone(); //! let mut release_capacity = body.release_capacity().clone();
//! //!
//! while let Some(chunk) = body.next().await { //! while let Some(chunk) = body.data().await {
//! let chunk = chunk?; //! let chunk = chunk?;
//! println!("RX: {:?}", chunk); //! println!("RX: {:?}", chunk);
//! //!
@@ -145,7 +144,7 @@ use crate::proto;
use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream};
use bytes::{Bytes, IntoBuf}; use bytes::{Bytes, IntoBuf};
use futures::{ready, FutureExt, Stream}; use futures::{ready, FutureExt};
use http::{uri, HeaderMap, Method, Request, Response, Version}; use http::{uri, HeaderMap, Method, Request, Response, Version};
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
@@ -1301,10 +1300,17 @@ impl ResponseFuture {
// ===== impl PushPromises ===== // ===== impl PushPromises =====
impl Stream for PushPromises { impl PushPromises {
type Item = Result<PushPromise, crate::Error>; /// Get the next `PushPromise`.
pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
futures::future::poll_fn(move |cx| self.poll_push_promise(cx)).await
}
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { #[doc(hidden)]
pub fn poll_push_promise(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<PushPromise, crate::Error>>> {
match self.inner.poll_pushed(cx) { match self.inner.poll_pushed(cx) {
Poll::Ready(Some(Ok((request, response)))) => { Poll::Ready(Some(Ok((request, response)))) => {
let response = PushedResponseFuture { let response = PushedResponseFuture {
@@ -1322,6 +1328,15 @@ impl Stream for PushPromises {
} }
} }
#[cfg(feature = "stream")]
impl futures::Stream for PushPromises {
type Item = Result<PushPromise, crate::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_push_promise(cx)
}
}
// ===== impl PushPromise ===== // ===== impl PushPromise =====
impl PushPromise { impl PushPromise {

View File

@@ -63,9 +63,8 @@
//! knowledge], i.e. both the client and the server assume that the TCP socket //! knowledge], i.e. both the client and the server assume that the TCP socket
//! will use the HTTP/2.0 protocol without prior negotiation. //! will use the HTTP/2.0 protocol without prior negotiation.
//! //!
//! ```rust //! ```no_run
//! #![feature(async_await)] //! #![feature(async_await)]
//! use futures::StreamExt;
//! use h2::server; //! use h2::server;
//! use http::{Response, StatusCode}; //! use http::{Response, StatusCode};
//! use tokio::net::TcpListener; //! use tokio::net::TcpListener;
@@ -73,19 +72,18 @@
//! #[tokio::main] //! #[tokio::main]
//! pub async fn main() { //! pub async fn main() {
//! let addr = "127.0.0.1:5928".parse().unwrap(); //! let addr = "127.0.0.1:5928".parse().unwrap();
//! let listener = TcpListener::bind(&addr,).unwrap(); //! let mut listener = TcpListener::bind(&addr).unwrap();
//! //!
//! // Accept all incoming TCP connections. //! // Accept all incoming TCP connections.
//! let mut incoming = listener.incoming(); //! loop {
//! # futures::future::select(Box::pin(async { //! if let Ok((socket, _peer_addr)) = listener.accept().await {
//! while let Some(socket) = incoming.next().await {
//! // Spawn a new task to process each connection. //! // Spawn a new task to process each connection.
//! tokio::spawn(async { //! tokio::spawn(async {
//! // Start the HTTP/2.0 connection handshake //! // Start the HTTP/2.0 connection handshake
//! let mut h2 = server::handshake(socket.unwrap()).await.unwrap(); //! let mut h2 = server::handshake(socket).await.unwrap();
//! // Accept all inbound HTTP/2.0 streams sent over the //! // Accept all inbound HTTP/2.0 streams sent over the
//! // connection. //! // connection.
//! while let Some(request) = h2.next().await { //! while let Some(request) = h2.accept().await {
//! let (request, mut respond) = request.unwrap(); //! let (request, mut respond) = request.unwrap();
//! println!("Received request: {:?}", request); //! println!("Received request: {:?}", request);
//! //!
@@ -102,7 +100,7 @@
//! //!
//! }); //! });
//! } //! }
//! # }), Box::pin(async {})).await; //! }
//! } //! }
//! ``` //! ```
//! //!
@@ -178,14 +176,13 @@ pub struct Handshake<T, B: IntoBuf = Bytes> {
/// ///
/// ``` /// ```
/// # #![feature(async_await)] /// # #![feature(async_await)]
/// # use futures::StreamExt;
/// # use tokio_io::*; /// # use tokio_io::*;
/// # use h2::server; /// # use h2::server;
/// # use h2::server::*; /// # use h2::server::*;
/// # /// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) { /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) {
/// let mut server = server::handshake(my_io).await.unwrap(); /// let mut server = server::handshake(my_io).await.unwrap();
/// while let Some(request) = server.next().await { /// while let Some(request) = server.accept().await {
/// let (request, respond) = request.unwrap(); /// let (request, respond) = request.unwrap();
/// // Process the request and send the response back to the client /// // Process the request and send the response back to the client
/// // using `respond`. /// // using `respond`.
@@ -341,7 +338,7 @@ impl<T, B> Connection<T, B>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
B: IntoBuf + Unpin, B: IntoBuf + Unpin,
B::Buf: Unpin, B::Buf: Unpin + 'static,
{ {
fn handshake2(io: T, builder: Builder) -> Handshake<T, B> { fn handshake2(io: T, builder: Builder) -> Handshake<T, B> {
// Create the codec. // Create the codec.
@@ -366,6 +363,40 @@ where
Handshake { builder, state } Handshake { builder, state }
} }
/// Accept the next incoming request on this connection.
pub async fn accept(
&mut self,
) -> Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>> {
futures::future::poll_fn(move |cx| self.poll_accept(cx)).await
}
#[doc(hidden)]
pub fn poll_accept(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<(Request<RecvStream>, SendResponse<B>), crate::Error>>> {
// Always try to advance the internal state. Getting Pending also is
// needed to allow this function to return Pending.
if let Poll::Ready(_) = self.poll_closed(cx)? {
// If the socket is closed, don't return anything
// TODO: drop any pending streams
return Poll::Ready(None);
}
if let Some(inner) = self.connection.next_incoming() {
log::trace!("received incoming");
let (head, _) = inner.take_request().into_parts();
let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque()));
let request = Request::from_parts(head, body);
let respond = SendResponse { inner };
return Poll::Ready(Some(Ok((request, respond))));
}
Poll::Pending
}
/// Sets the target window size for the whole connection. /// Sets the target window size for the whole connection.
/// ///
/// If `size` is greater than the current value, then a `WINDOW_UPDATE` /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
@@ -390,21 +421,27 @@ where
/// Returns `Ready` when the underlying connection has closed. /// Returns `Ready` when the underlying connection has closed.
/// ///
/// If any new inbound streams are received during a call to `poll_close`, /// If any new inbound streams are received during a call to `poll_closed`,
/// they will be queued and returned on the next call to [`poll`]. /// they will be queued and returned on the next call to [`poll_accept`].
/// ///
/// This function will advance the internal connection state, driving /// This function will advance the internal connection state, driving
/// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]). /// progress on all the other handles (e.g. [`RecvStream`] and [`SendStream`]).
/// ///
/// See [here](index.html#managing-the-connection) for more details. /// See [here](index.html#managing-the-connection) for more details.
/// ///
/// [`poll`]: struct.Connection.html#method.poll /// [`poll_accept`]: struct.Connection.html#method.poll_accept
/// [`RecvStream`]: ../struct.RecvStream.html /// [`RecvStream`]: ../struct.RecvStream.html
/// [`SendStream`]: ../struct.SendStream.html /// [`SendStream`]: ../struct.SendStream.html
pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> { pub fn poll_closed(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
self.connection.poll(cx).map_err(Into::into) self.connection.poll(cx).map_err(Into::into)
} }
#[doc(hidden)]
#[deprecated(note = "renamed to poll_closed")]
pub fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
self.poll_closed(cx)
}
/// Sets the connection to a GOAWAY state. /// Sets the connection to a GOAWAY state.
/// ///
/// Does not terminate the connection. Must continue being polled to close /// Does not terminate the connection. Must continue being polled to close
@@ -445,6 +482,7 @@ where
} }
} }
#[cfg(feature = "stream")]
impl<T, B> futures::Stream for Connection<T, B> impl<T, B> futures::Stream for Connection<T, B>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
@@ -454,26 +492,7 @@ where
type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>; type Item = Result<(Request<RecvStream>, SendResponse<B>), crate::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Always try to advance the internal state. Getting Pending also is self.poll_accept(cx)
// needed to allow this function to return Pending.
if let Poll::Ready(_) = self.poll_close(cx)? {
// If the socket is closed, don't return anything
// TODO: drop any pending streams
return Poll::Ready(None);
}
if let Some(inner) = self.connection.next_incoming() {
log::trace!("received incoming");
let (head, _) = inner.take_request().into_parts();
let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque()));
let request = Request::from_parts(head, body);
let respond = SendResponse { inner };
return Poll::Ready(Some(Ok((request, respond))));
}
Poll::Pending
} }
} }
@@ -1035,7 +1054,7 @@ impl<T, B: IntoBuf> Future for Handshake<T, B>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
B: IntoBuf + Unpin, B: IntoBuf + Unpin,
B::Buf: Unpin, B::Buf: Unpin + 'static,
{ {
type Output = Result<Connection<T, B>, crate::Error>; type Output = Result<Connection<T, B>, crate::Error>;

View File

@@ -8,6 +8,7 @@ use http::HeaderMap;
use crate::PollExt; use crate::PollExt;
use futures::ready; use futures::ready;
use std::fmt; use std::fmt;
#[cfg(feature = "stream")]
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@@ -415,7 +416,22 @@ impl RecvStream {
&mut self.inner &mut self.inner
} }
/// Returns received trailers. /// Get the next data frame.
pub async fn data(&mut self) -> Option<Result<Bytes, crate::Error>> {
futures::future::poll_fn(move |cx| self.poll_data(cx)).await
}
/// Get optional trailers for this stream.
pub async fn trailers(&mut self) -> Option<Result<HeaderMap, crate::Error>> {
futures::future::poll_fn(move |cx| self.poll_trailers(cx)).await
}
#[doc(hidden)]
pub fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>> {
self.inner.inner.poll_data(cx).map_err_(Into::into)
}
#[doc(hidden)]
pub fn poll_trailers( pub fn poll_trailers(
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
@@ -433,11 +449,12 @@ impl RecvStream {
} }
} }
#[cfg(feature = "stream")]
impl futures::Stream for RecvStream { impl futures::Stream for RecvStream {
type Item = Result<Bytes, crate::Error>; type Item = Result<Bytes, crate::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.inner.poll_data(cx).map_err_(Into::into) self.poll_data(cx)
} }
} }

View File

@@ -5,7 +5,7 @@ authors = ["Carl Lerche <me@carllerche.com>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
h2 = { path = "../..", features = ["unstable"] } h2 = { path = "../..", features = ["stream", "unstable"] }
bytes = "0.4.7" bytes = "0.4.7"
env_logger = "0.5.9" env_logger = "0.5.9"

View File

@@ -239,7 +239,9 @@ async fn abrupt_shutdown() {
srv.abrupt_shutdown(Reason::INTERNAL_ERROR); srv.abrupt_shutdown(Reason::INTERNAL_ERROR);
let srv_fut = async move { let srv_fut = async move {
poll_fn(move |cx| srv.poll_close(cx)).await.expect("server"); poll_fn(move |cx| srv.poll_closed(cx))
.await
.expect("server");
}; };
join(req_fut, srv_fut).await; join(req_fut, srv_fut).await;