feat(body): put Stream impl for Body behind stream feature

BREAKING CHANGE: Using a `Body` as a `Stream`, and constructing one via
  `Body::wrap_stream`, require enabling the unstable `stream` feature.
This commit is contained in:
Sean McArthur
2019-09-05 14:46:12 -07:00
parent b3e5506261
commit 511ea3889b
4 changed files with 43 additions and 25 deletions

View File

@@ -57,7 +57,6 @@ tokio-fs = "=0.2.0-alpha.4"
tokio-test = "=0.2.0-alpha.4" tokio-test = "=0.2.0-alpha.4"
url = "1.0" url = "1.0"
[features] [features]
default = [ default = [
"__internal_flaky_tests", "__internal_flaky_tests",
@@ -78,6 +77,12 @@ nightly = []
__internal_flaky_tests = [] __internal_flaky_tests = []
__internal_happy_eyeballs_tests = [] __internal_happy_eyeballs_tests = []
[package.metadata.docs.rs]
features = [
"runtime",
"stream",
]
[profile.release] [profile.release]
codegen-units = 1 codegen-units = 1
incremental = false incremental = false
@@ -94,12 +99,12 @@ required-features = ["runtime"]
[[example]] [[example]]
name = "client_json" name = "client_json"
path = "examples/client_json.rs" path = "examples/client_json.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[example]] [[example]]
name = "echo" name = "echo"
path = "examples/echo.rs" path = "examples/echo.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[example]] [[example]]
name = "hello" name = "hello"
@@ -114,7 +119,7 @@ required-features = ["runtime"]
[[example]] [[example]]
name = "params" name = "params"
path = "examples/params.rs" path = "examples/params.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[example]] [[example]]
name = "proxy" name = "proxy"
@@ -155,7 +160,7 @@ required-features = ["runtime"]
[[example]] [[example]]
name = "web_api" name = "web_api"
path = "examples/web_api.rs" path = "examples/web_api.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[bench]] [[bench]]
@@ -171,20 +176,20 @@ required-features = ["runtime"]
[[bench]] [[bench]]
name = "server" name = "server"
path = "benches/server.rs" path = "benches/server.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[test]] [[test]]
name = "client" name = "client"
path = "tests/client.rs" path = "tests/client.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[test]] [[test]]
name = "integration" name = "integration"
path = "tests/integration.rs" path = "tests/integration.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]
[[test]] [[test]]
name = "server" name = "server"
path = "tests/server.rs" path = "tests/server.rs"
required-features = ["runtime"] required-features = ["runtime", "stream"]

View File

@@ -1,10 +1,14 @@
use std::borrow::Cow; use std::borrow::Cow;
#[cfg(feature = "stream")]
use std::error::Error as StdError; use std::error::Error as StdError;
use std::fmt; use std::fmt;
use bytes::Bytes; use bytes::Bytes;
use futures_core::{Stream, TryStream}; use futures_core::Stream; // for mpsc::Receiver
#[cfg(feature = "stream")]
use futures_core::TryStream;
use futures_channel::{mpsc, oneshot}; use futures_channel::{mpsc, oneshot};
#[cfg(feature = "stream")]
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use http_body::{SizeHint, Body as HttpBody}; use http_body::{SizeHint, Body as HttpBody};
use http::HeaderMap; use http::HeaderMap;
@@ -18,8 +22,6 @@ type BodySender = mpsc::Sender<Result<Chunk, crate::Error>>;
/// A stream of `Chunk`s, used when receiving bodies. /// A stream of `Chunk`s, used when receiving bodies.
/// ///
/// A good default `Payload` to use in many applications. /// A good default `Payload` to use in many applications.
///
/// Also implements `futures::Stream`, so stream combinators may be used.
#[must_use = "streams do nothing unless polled"] #[must_use = "streams do nothing unless polled"]
pub struct Body { pub struct Body {
kind: Kind, kind: Kind,
@@ -43,6 +45,7 @@ enum Kind {
// while a borrow of a `Request<Body>` exists. // while a borrow of a `Request<Body>` exists.
// //
// See https://github.com/rust-lang/rust/issues/57017 // See https://github.com/rust-lang/rust/issues/57017
#[cfg(feature = "stream")]
Wrapped(Pin<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>), Wrapped(Pin<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>),
} }
@@ -140,6 +143,12 @@ impl Body {
/// let body = Body::wrap_stream(stream); /// let body = Body::wrap_stream(stream);
/// # } /// # }
/// ``` /// ```
///
/// # Unstable
///
/// This function requires enabling the unstable `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
pub fn wrap_stream<S>(stream: S) -> Body pub fn wrap_stream<S>(stream: S) -> Body
where where
S: TryStream + Send + Sync + 'static, S: TryStream + Send + Sync + 'static,
@@ -277,6 +286,8 @@ impl Body {
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
None => Poll::Ready(None), None => Poll::Ready(None),
}, },
#[cfg(feature = "stream")]
Kind::Wrapped(ref mut s) => { Kind::Wrapped(ref mut s) => {
match ready!(s.as_mut().poll_next(cx)) { match ready!(s.as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
@@ -326,6 +337,7 @@ impl HttpBody for Body {
Kind::Once(ref val) => val.is_none(), Kind::Once(ref val) => val.is_none(),
Kind::Chan { content_length, .. } => content_length == Some(0), Kind::Chan { content_length, .. } => content_length == Some(0),
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "stream")]
Kind::Wrapped(..) => false, Kind::Wrapped(..) => false,
} }
} }
@@ -340,6 +352,7 @@ impl HttpBody for Body {
Kind::Once(None) => { Kind::Once(None) => {
SizeHint::default() SizeHint::default()
}, },
#[cfg(feature = "stream")]
Kind::Wrapped(..) => SizeHint::default(), Kind::Wrapped(..) => SizeHint::default(),
Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => { Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => {
let mut hint = SizeHint::default(); let mut hint = SizeHint::default();
@@ -361,12 +374,12 @@ impl fmt::Debug for Body {
#[derive(Debug)] #[derive(Debug)]
struct Empty; struct Empty;
#[derive(Debug)] #[derive(Debug)]
struct Once<'a>(&'a Chunk); struct Full<'a>(&'a Chunk);
let mut builder = f.debug_tuple("Body"); let mut builder = f.debug_tuple("Body");
match self.kind { match self.kind {
Kind::Once(None) => builder.field(&Empty), Kind::Once(None) => builder.field(&Empty),
Kind::Once(Some(ref chunk)) => builder.field(&Once(chunk)), Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
_ => builder.field(&Streaming), _ => builder.field(&Streaming),
}; };
@@ -374,6 +387,7 @@ impl fmt::Debug for Body {
} }
} }
#[cfg(feature = "stream")]
impl Stream for Body { impl Stream for Body {
type Item = crate::Result<Chunk>; type Item = crate::Result<Chunk>;
@@ -383,6 +397,7 @@ impl Stream for Body {
} }
#[cfg(feature = "stream")]
impl impl
From<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>> From<Box<dyn Stream<Item = Result<Chunk, Box<dyn StdError + Send + Sync>>> + Send + Sync>>
for Body for Body

View File

@@ -18,6 +18,15 @@
//! //!
//! If looking for just a convenient HTTP client, consider the //! If looking for just a convenient HTTP client, consider the
//! [reqwest](https://crates.io/crates/reqwest) crate. //! [reqwest](https://crates.io/crates/reqwest) crate.
//!
//! # Optional Features
//!
//! The following optional features are available:
//!
//! - `runtime` (*enabled by default*): Enables convenient integration with
//! `tokio`, providing connectors and acceptors for TCP, and a default
//! executor.
//! - `stream` (*unstable*): Provides `futures::Stream` capabilities.
#[doc(hidden)] pub use http; #[doc(hidden)] pub use http;
#[macro_use] extern crate log; #[macro_use] extern crate log;

View File

@@ -156,17 +156,6 @@ impl AddrIncoming {
} }
} }
/*
impl Stream for AddrIncoming {
type Item = io::Result<AddrStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let result = ready!(self.poll_next_(cx));
Poll::Ready(Some(result))
}
}
*/
impl Accept for AddrIncoming { impl Accept for AddrIncoming {
type Conn = AddrStream; type Conn = AddrStream;
type Error = io::Error; type Error = io::Error;