Put Stream APIs behind unstable-stream feature

This commit is contained in:
Sean McArthur
2019-09-27 15:44:00 -07:00
parent 8e1a29ce1b
commit ebe57e10a3
7 changed files with 32 additions and 6 deletions

View File

@@ -47,6 +47,13 @@ matrix:
- rust: beta - rust: beta
env: FEATURES="--features json" env: FEATURES="--features json"
# optional unstable-stream
#- rust: stable
- rust: beta
env: FEATURES="--features unstable-stream"
# socks # socks
#- rust: stable #- rust: stable
# env: FEATURES="--features socks" # env: FEATURES="--features socks"

View File

@@ -37,6 +37,8 @@ json = ["serde_json"]
#trust-dns = ["trust-dns-resolver"] #trust-dns = ["trust-dns-resolver"]
unstable-stream = []
[dependencies] [dependencies]
http = "0.1.15" http = "0.1.15"
url = "2.1" url = "2.1"

View File

@@ -5,7 +5,6 @@ use std::task::{Context, Poll};
use bytes::Bytes; use bytes::Bytes;
use futures_core::Stream; use futures_core::Stream;
use futures_util::TryStreamExt;
use http_body::Body as HttpBody; use http_body::Body as HttpBody;
use tokio::timer::Delay; use tokio::timer::Delay;
@@ -55,12 +54,28 @@ impl Body {
/// let body = Body::wrap_stream(stream); /// let body = Body::wrap_stream(stream);
/// # } /// # }
/// ``` /// ```
///
/// # Unstable
///
/// This requires the `unstable-stream` feature to be enabled.
#[cfg(feature = "unstable-stream")]
pub fn wrap_stream<S>(stream: S) -> Body pub fn wrap_stream<S>(stream: S) -> Body
where where
S: futures_core::stream::TryStream + Send + Sync + 'static, S: futures_core::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
hyper::Chunk: From<S::Ok>, hyper::Chunk: From<S::Ok>,
{ {
Body::stream(stream)
}
pub(crate) fn stream<S>(stream: S) -> Body
where
S: futures_core::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
hyper::Chunk: From<S::Ok>,
{
use futures_util::TryStreamExt;
let body = Box::pin(WrapStream( let body = Box::pin(WrapStream(
stream.map_ok(hyper::Chunk::from).map_err(Into::into), stream.map_ok(hyper::Chunk::from).map_err(Into::into),
)); ));

View File

@@ -121,7 +121,7 @@ impl Form {
let last = stream::once(future::ready(Ok( let last = stream::once(future::ready(Ok(
format!("--{}--\r\n", self.boundary()).into() format!("--{}--\r\n", self.boundary()).into()
))); )));
Body::wrap_stream(stream.chain(last)) Body::stream(stream.chain(last))
} }
/// Generate a hyper::Body stream for a single Part instance of a Form request. /// Generate a hyper::Body stream for a single Part instance of a Form request.
@@ -505,7 +505,7 @@ mod tests {
let mut form = Form::new() let mut form = Form::new()
.part( .part(
"reader1", "reader1",
Part::stream(Body::wrap_stream(stream::once(future::ready::< Part::stream(Body::stream(stream::once(future::ready::<
Result<String, crate::Error>, Result<String, crate::Error>,
>(Ok( >(Ok(
"part1".to_owned(), "part1".to_owned(),
@@ -515,7 +515,7 @@ mod tests {
.part("key2", Part::text("value2").mime(mime::IMAGE_BMP)) .part("key2", Part::text("value2").mime(mime::IMAGE_BMP))
.part( .part(
"reader2", "reader2",
Part::stream(Body::wrap_stream(stream::once(future::ready::< Part::stream(Body::stream(stream::once(future::ready::<
Result<String, crate::Error>, Result<String, crate::Error>,
>(Ok( >(Ok(
"part2".to_owned(), "part2".to_owned(),

View File

@@ -394,7 +394,7 @@ impl<T: Into<Body>> From<http::Response<T>> for Response {
/// A `Response` can be piped as the `Body` of another request. /// A `Response` can be piped as the `Body` of another request.
impl From<Response> for Body { impl From<Response> for Body {
fn from(r: Response) -> Body { fn from(r: Response) -> Body {
Body::wrap_stream(r.body) Body::stream(r.body)
} }
} }

View File

@@ -156,11 +156,11 @@
//! - **default-tls** *(enabled by default)*: Provides TLS support via the //! - **default-tls** *(enabled by default)*: Provides TLS support via the
//! `native-tls` library to connect over HTTPS. //! `native-tls` library to connect over HTTPS.
//! - **default-tls-vendored**: Enables the `vendored` feature of `native-tls`. //! - **default-tls-vendored**: Enables the `vendored` feature of `native-tls`.
//! - **rustls-tls**: Provides TLS support via the `rustls` library.
//! - **blocking**: Provides the [blocking][] client API. //! - **blocking**: Provides the [blocking][] client API.
//! - **cookies**: Provides cookie session support. //! - **cookies**: Provides cookie session support.
//! - **gzip**: Provides response body gzip decompression. //! - **gzip**: Provides response body gzip decompression.
//! - **json**: Provides serialization and deserialization for JSON bodies. //! - **json**: Provides serialization and deserialization for JSON bodies.
//! - **unstable-stream** *(unstable)*: Adds support for `futures::Stream`.
//! //!
//! //!
//! [hyper]: http://hyper.rs //! [hyper]: http://hyper.rs
@@ -173,6 +173,7 @@
//! [Proxy]: ./struct.Proxy.html //! [Proxy]: ./struct.Proxy.html
//! [cargo-features]: https://doc.rust-lang.org/stable/cargo/reference/manifest.html#the-features-section //! [cargo-features]: https://doc.rust-lang.org/stable/cargo/reference/manifest.html#the-features-section
////! - **rustls-tls**: Provides TLS support via the `rustls` library.
////! - **socks**: Provides SOCKS5 proxy support. ////! - **socks**: Provides SOCKS5 proxy support.
////! - **trust-dns**: Enables a trust-dns async resolver instead of default ////! - **trust-dns**: Enables a trust-dns async resolver instead of default
////! threadpool using `getaddrinfo`. ////! threadpool using `getaddrinfo`.

View File

@@ -51,6 +51,7 @@ async fn text_part() {
assert_eq!(res.status(), reqwest::StatusCode::OK); assert_eq!(res.status(), reqwest::StatusCode::OK);
} }
#[cfg(feature = "unstable-stream")]
#[tokio::test] #[tokio::test]
async fn stream_part() { async fn stream_part() {
use futures_util::{future, stream}; use futures_util::{future, stream};