diff --git a/.travis.yml b/.travis.yml index fb8294c..b3d59de 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,13 +19,13 @@ matrix: # rustls-tls #- rust: stable - - rust: nightly - env: FEATURES="--no-default-features --features rustls-tls" + #- rust: nightly + # env: FEATURES="--no-default-features --features rustls-tls" # default-tls and rustls-tls #- rust: stable - - rust: nightly - env: FEATURES="--features rustls-tls" + #- rust: nightly + # env: FEATURES="--features rustls-tls" # optional cookies #- rust: stable diff --git a/Cargo.toml b/Cargo.toml index b3395f5..3dfa788 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,13 +24,14 @@ encoding_rs = "0.8" futures-core-preview = { version = "=0.3.0-alpha.18" } futures-util-preview = { version = "=0.3.0-alpha.18" } http = "0.1.15" -hyper = "=0.13.0-alpha.1" +http-body = "=0.2.0-alpha.1" +hyper = "=0.13.0-alpha.2" log = "0.4" mime = "0.3.7" mime_guess = "2.0" percent-encoding = "2.1" -tokio = { version = "=0.2.0-alpha.4", default-features = false, features = ["rt-full", "tcp"] } -tokio-executor = "=0.2.0-alpha.4" +tokio = { version = "=0.2.0-alpha.5", default-features = false, features = ["rt-full", "tcp"] } +tokio-executor = "=0.2.0-alpha.5" url = "2.1" uuid = { version = "0.7", features = ["v4"] } time = "0.1.42" @@ -45,15 +46,15 @@ serde_urlencoded = "0.6.1" # Optional deps... ## default-tls -hyper-tls = { version = "=0.4.0-alpha.1", optional = true } +hyper-tls = { version = "=0.4.0-alpha.2", optional = true } native-tls = { version = "0.2", optional = true } -tokio-tls = { version = "=0.3.0-alpha.4", optional = true } +tokio-tls = { version = "=0.3.0-alpha.5", optional = true } ## rustls-tls -hyper-rustls = { version = "=0.18.0-alpha.1", optional = true } -rustls = { version = "0.16", features = ["dangerous_configuration"], optional = true } -tokio-rustls = { version = "=0.12.0-alpha.2", optional = true } -webpki-roots = { version = "0.17", optional = true } +#hyper-rustls = { version = "=0.18.0-alpha.1", optional = true } +#rustls = { version = "0.16", features = ["dangerous_configuration"], optional = true } +#tokio-rustls = { version = "=0.12.0-alpha.2", optional = true } +#webpki-roots = { version = "0.17", optional = true } ## blocking futures-channel-preview = { version = "=0.3.0-alpha.18", optional = true } @@ -73,11 +74,11 @@ async-compression = { version = "0.1.0-alpha.4", default-features = false, featu [dev-dependencies] env_logger = "0.6" +hyper = { version = "=0.13.0-alpha.2", features = ["unstable-stream"] } serde = { version = "1.0", features = ["derive"] } libflate = "0.1" doc-comment = "0.3" -bytes = "0.4" -tokio-fs = { version = "=0.2.0-alpha.4" } +tokio-fs = { version = "=0.2.0-alpha.5" } [features] default = ["default-tls"] @@ -87,7 +88,8 @@ tls = [] default-tls = ["hyper-tls", "native-tls", "tls", "tokio-tls"] default-tls-vendored = ["default-tls", "native-tls/vendored"] -rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "tls"] +# re-enable CI also +#rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "tls"] blocking = ["futures-channel-preview", "futures-util-preview/io"] diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 4919910..e26918d 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -1,10 +1,12 @@ -use bytes::Bytes; -use futures_core::Stream; -use hyper::body::Payload; use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; + +use bytes::Bytes; +use futures_core::Stream; +use futures_util::TryStreamExt; +use http_body::Body as HttpBody; use tokio::timer::Delay; /// An asynchronous request body. @@ -17,12 +19,22 @@ pub(crate) struct ImplStream(Body); enum Inner { Reusable(Bytes), - Hyper { - body: hyper::Body, + Streaming { + body: Pin< + Box< + dyn HttpBody> + + Send + + Sync, + >, + >, timeout: Option, }, } +struct WrapStream(S); + +struct WrapHyper(hyper::Body); + impl Body { /// Wrap a futures `Stream` in a box inside `Body`. /// @@ -49,27 +61,38 @@ impl Body { S::Error: Into>, hyper::Chunk: From, { - Body::wrap(hyper::body::Body::wrap_stream(stream)) - } - - pub(crate) fn response(body: hyper::Body, timeout: Option) -> Body { + let body = Box::pin(WrapStream( + stream.map_ok(hyper::Chunk::from).map_err(Into::into), + )); Body { - inner: Inner::Hyper { body, timeout }, - } - } - - pub(crate) fn wrap(body: hyper::Body) -> Body { - Body { - inner: Inner::Hyper { + inner: Inner::Streaming { body, timeout: None, }, } } - #[cfg(any(feature = "blocking", feature = "gzip",))] + pub(crate) fn response(body: hyper::Body, timeout: Option) -> Body { + Body { + inner: Inner::Streaming { + body: Box::pin(WrapHyper(body)), + timeout, + }, + } + } + + #[cfg(feature = "blocking")] + pub(crate) fn wrap(body: hyper::Body) -> Body { + Body { + inner: Inner::Streaming { + body: Box::pin(WrapHyper(body)), + timeout: None, + }, + } + } + pub(crate) fn empty() -> Body { - Body::wrap(hyper::Body::empty()) + Body::reusable(Bytes::new()) } pub(crate) fn reusable(chunk: Bytes) -> Body { @@ -78,14 +101,13 @@ impl Body { } } - pub(crate) fn into_hyper(self) -> (Option, hyper::Body) { - match self.inner { - Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()), - Inner::Hyper { body, timeout } => { - debug_assert!(timeout.is_none()); - (None, body) - } - } + pub(crate) fn try_reuse(self) -> (Option, Self) { + let reuse = match self.inner { + Inner::Reusable(ref chunk) => Some(chunk.clone()), + Inner::Streaming { .. } => None, + }; + + (reuse, self) } pub(crate) fn into_stream(self) -> ImplStream { @@ -95,7 +117,7 @@ impl Body { pub(crate) fn content_length(&self) -> Option { match self.inner { Inner::Reusable(ref bytes) => Some(bytes.len() as u64), - Inner::Hyper { ref body, .. } => body.size_hint().exact(), + Inner::Streaming { ref body, .. } => body.size_hint().exact(), } } } @@ -143,13 +165,71 @@ impl fmt::Debug for Body { // ===== impl ImplStream ===== +impl HttpBody for ImplStream { + type Data = hyper::Chunk; + type Error = crate::Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + let opt_try_chunk = match self.0.inner { + Inner::Streaming { + ref mut body, + ref mut timeout, + } => { + if let Some(ref mut timeout) = timeout { + if let Poll::Ready(()) = Pin::new(timeout).poll(cx) { + return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut)))); + } + } + futures_core::ready!(Pin::new(body).poll_data(cx)) + .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body)) + } + Inner::Reusable(ref mut bytes) => { + if bytes.is_empty() { + None + } else { + Some(Ok(std::mem::replace(bytes, Bytes::new()).into())) + } + } + }; + + Poll::Ready(opt_try_chunk) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + match self.0.inner { + Inner::Streaming { ref body, .. } => body.is_end_stream(), + Inner::Reusable(ref bytes) => bytes.is_empty(), + } + } + + fn size_hint(&self) -> http_body::SizeHint { + match self.0.inner { + Inner::Streaming { ref body, .. } => body.size_hint(), + Inner::Reusable(ref bytes) => { + let mut hint = http_body::SizeHint::default(); + hint.set_exact(bytes.len() as u64); + hint + } + } + } +} + impl Stream for ImplStream { type Item = Result; - #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let opt_try_chunk = match self.0.inner { - Inner::Hyper { + Inner::Streaming { ref mut body, ref mut timeout, } => { @@ -173,3 +253,67 @@ impl Stream for ImplStream { Poll::Ready(opt_try_chunk) } } + +// ===== impl WrapStream ===== + +impl HttpBody for WrapStream +where + S: Stream>, + D: Into, + E: Into>, +{ + type Data = hyper::Chunk; + type Error = E; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + // safe pin projection + let item = + futures_core::ready!( + unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }.poll_next(cx)? + ); + + Poll::Ready(item.map(|val| Ok(val.into()))) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +// ===== impl WrapHyper ===== + +impl HttpBody for WrapHyper { + type Data = hyper::Chunk; + type Error = Box; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + // safe pin projection + Pin::new(&mut self.0) + .poll_data(cx) + .map(|opt| opt.map(|res| res.map_err(Into::into))) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } + + fn is_end_stream(&self) -> bool { + self.0.is_end_stream() + } + + fn size_hint(&self) -> http_body::SizeHint { + self.0.size_hint() + } +} diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 93590af..629d9ae 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -5,11 +5,11 @@ use std::sync::RwLock; use std::time::Duration; use std::{fmt, str}; -use crate::header::{ +use bytes::Bytes; +use http::header::{ Entry, HeaderMap, HeaderValue, ACCEPT, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE, LOCATION, PROXY_AUTHORIZATION, RANGE, REFERER, TRANSFER_ENCODING, USER_AGENT, }; -use bytes::Bytes; use http::Uri; use hyper::client::ResponseFuture; use mime; @@ -24,6 +24,7 @@ use log::debug; use super::request::{Request, RequestBuilder}; use super::response::Response; +use super::Body; use crate::connect::Connector; #[cfg(feature = "cookies")] use crate::cookie; @@ -472,7 +473,7 @@ impl ClientBuilder { } } -type HyperClient = hyper::Client; +type HyperClient = hyper::Client; impl Client { /// Constructs a new `Client`. @@ -612,10 +613,10 @@ impl Client { let (reusable, body) = match body { Some(body) => { - let (reusable, body) = body.into_hyper(); + let (reusable, body) = body.try_reuse(); (Some(reusable), body) } - None => (None, hyper::Body::empty()), + None => (None, Body::empty()), }; self.proxy_auth(&uri, &mut headers); @@ -623,7 +624,7 @@ impl Client { let mut req = hyper::Request::builder() .method(method.clone()) .uri(uri.clone()) - .body(body) + .body(body.into_stream()) .expect("valid request parts"); *req.headers_mut() = headers.clone(); @@ -884,13 +885,13 @@ impl Future for PendingRequest { debug!("redirecting to {:?} '{}'", self.method, self.url); let uri = expect_uri(&self.url); let body = match self.body { - Some(Some(ref body)) => hyper::Body::from(body.clone()), - _ => hyper::Body::empty(), + Some(Some(ref body)) => Body::reusable(body.clone()), + _ => Body::empty(), }; let mut req = hyper::Request::builder() .method(self.method.clone()) .uri(uri.clone()) - .body(body) + .body(body.into_stream()) .expect("valid request parts"); // Add cookies from the cookie store. diff --git a/src/async_impl/multipart.rs b/src/async_impl/multipart.rs index 0287f26..78d08cd 100644 --- a/src/async_impl/multipart.rs +++ b/src/async_impl/multipart.rs @@ -1,13 +1,16 @@ //! multipart/form-data use std::borrow::Cow; use std::fmt; +use std::pin::Pin; +use bytes::Bytes; use http::HeaderMap; use mime_guess::Mime; use percent_encoding::{self, AsciiSet, NON_ALPHANUMERIC}; use uuid::Uuid; -use futures_util::StreamExt; +use futures_core::Stream; +use futures_util::{future, stream, StreamExt}; use super::Body; @@ -96,50 +99,58 @@ impl Form { self.with_inner(|inner| inner.percent_encode_noop()) } - /// Consume this instance and transform into an instance of hyper::Body for use in a request. - pub(crate) fn stream(mut self) -> hyper::Body { + /// Consume this instance and transform into an instance of Body for use in a request. + pub(crate) fn stream(mut self) -> Body { if self.inner.fields.is_empty() { - return hyper::Body::empty(); + return Body::empty(); } // create initial part to init reduce chain let (name, part) = self.inner.fields.remove(0); - let start = self.part_stream(name, part); + let start = Box::pin(self.part_stream(name, part)) + as Pin> + Send + Sync>>; let fields = self.inner.take_fields(); // for each field, chain an additional stream let stream = fields.into_iter().fold(start, |memo, (name, part)| { let part_stream = self.part_stream(name, part); - hyper::Body::wrap_stream(memo.chain(part_stream)) + Box::pin(memo.chain(part_stream)) + as Pin> + Send + Sync>> }); // append special ending boundary - let last = hyper::Body::from(format!("--{}--\r\n", self.boundary())); - hyper::Body::wrap_stream(stream.chain(last)) + let last = stream::once(future::ready(Ok( + format!("--{}--\r\n", self.boundary()).into() + ))); + Body::wrap_stream(stream.chain(last)) } /// Generate a hyper::Body stream for a single Part instance of a Form request. - pub(crate) fn part_stream(&mut self, name: T, part: Part) -> hyper::Body + pub(crate) fn part_stream( + &mut self, + name: T, + part: Part, + ) -> impl Stream> where T: Into>, { // start with boundary - let boundary = hyper::Body::from(format!("--{}\r\n", self.boundary())); + let boundary = stream::once(future::ready(Ok( + format!("--{}\r\n", self.boundary()).into() + ))); // append headers - let header = hyper::Body::from({ + let header = stream::once(future::ready(Ok({ let mut h = self .inner .percent_encoding .encode_headers(&name.into(), &part.meta); h.extend_from_slice(b"\r\n\r\n"); - h - }); + h.into() + }))); // then append form data followed by terminating CRLF - hyper::Body::wrap_stream( - boundary - .chain(header) - .chain(hyper::Body::wrap_stream(part.value.into_stream())) - .chain(hyper::Body::from("\r\n".to_owned())), - ) + boundary + .chain(header) + .chain(part.value.into_stream()) + .chain(stream::once(future::ready(Ok("\r\n".into())))) } pub(crate) fn compute_length(&mut self) -> Option { @@ -482,8 +493,8 @@ mod tests { let form = Form::new(); let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); - let body = form.stream(); - let s = body.map(|try_c| try_c.map(|c| c.into_bytes())).try_concat(); + let body = form.stream().into_stream(); + let s = body.map(|try_c| try_c.map(Bytes::from)).try_concat(); let out = rt.block_on(s); assert_eq!(out.unwrap(), Vec::new()); @@ -530,8 +541,8 @@ mod tests { Content-Disposition: form-data; name=\"key3\"; filename=\"filename\"\r\n\r\n\ value3\r\n--boundary--\r\n"; let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); - let body = form.stream(); - let s = body.map(|try_c| try_c.map(|c| c.into_bytes())).try_concat(); + let body = form.stream().into_stream(); + let s = body.map(|try_c| try_c.map(Bytes::from)).try_concat(); let out = rt.block_on(s).unwrap(); // These prints are for debug purposes in case the test fails @@ -557,8 +568,8 @@ mod tests { value2\r\n\ --boundary--\r\n"; let mut rt = tokio::runtime::current_thread::Runtime::new().expect("new rt"); - let body = form.stream(); - let s = body.map(|try_c| try_c.map(|c| c.into_bytes())).try_concat(); + let body = form.stream().into_stream(); + let s = body.map(|try_c| try_c.map(Bytes::from)).try_concat(); let out = rt.block_on(s).unwrap(); // These prints are for debug purposes in case the test fails diff --git a/src/async_impl/request.rs b/src/async_impl/request.rs index 53be49b..b8366e4 100644 --- a/src/async_impl/request.rs +++ b/src/async_impl/request.rs @@ -219,7 +219,7 @@ impl RequestBuilder { }; if let Ok(ref mut req) = builder.request { - *req.body_mut() = Some(Body::wrap(multipart.stream())) + *req.body_mut() = Some(multipart.stream()) } builder } diff --git a/tests/redirect.rs b/tests/redirect.rs index f863fd4..e0c3818 100644 --- a/tests/redirect.rs +++ b/tests/redirect.rs @@ -80,12 +80,14 @@ async fn test_redirect_307_and_308_tries_to_get_again() { #[tokio::test] async fn test_redirect_307_and_308_tries_to_post_again() { + let _ = env_logger::try_init(); let client = reqwest::Client::new(); let codes = [307u16, 308]; for &code in codes.iter() { let redirect = server::http(move |mut req| { async move { assert_eq!(req.method(), "POST"); + assert_eq!(req.headers()["content-length"], "5"); let data = req.body_mut().next().await.unwrap().unwrap(); assert_eq!(&*data, b"Hello");