diff --git a/.appveyor.yml b/.appveyor.yml index a20323b..523f526 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -15,5 +15,5 @@ install: - cargo -vV build: false test_script: - - cargo test -- --test-threads=1 + - cargo test --features blocking,gzip -- --test-threads=1 skip_branch_with_pr: true diff --git a/.travis.yml b/.travis.yml index 9edd3e2..fb8294c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,16 @@ matrix: - rust: nightly env: FEATURES="--features cookies" + # optional blocking + #- rust: stable + - rust: nightly + env: FEATURES="--features blocking" + + # optional gzip + #- rust: stable + - rust: nightly + env: FEATURES="--features gzip" + # socks #- rust: stable #- rust: nightly diff --git a/Cargo.toml b/Cargo.toml index 133fa47..b3395f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,6 @@ time = "0.1.42" # TODO: candidates for optional features -async-compression = { version = "0.1.0-alpha.4", default-features = false, features = ["gzip", "stream"] } serde = "1.0" serde_json = "1.0" @@ -63,6 +62,9 @@ futures-channel-preview = { version = "=0.3.0-alpha.18", optional = true } cookie_crate = { version = "0.12", package = "cookie", optional = true } cookie_store = { version = "0.9", optional = true } +## gzip +async-compression = { version = "0.1.0-alpha.4", default-features = false, features = ["gzip", "stream"], optional = true } + ## socks #socks = { version = "0.3.2", optional = true } @@ -87,10 +89,12 @@ default-tls-vendored = ["default-tls", "native-tls/vendored"] rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "tls"] -blocking = ["futures-channel-preview"] +blocking = ["futures-channel-preview", "futures-util-preview/io"] cookies = ["cookie_crate", "cookie_store"] +gzip = ["async-compression"] + #trust-dns = ["trust-dns-resolver"] [target.'cfg(windows)'.dependencies] @@ -111,3 +115,8 @@ name = "cookie" path = "tests/cookie.rs" required-features = ["cookies"] +[[test]] +name = "gzip" +path = "tests/gzip.rs" +required-features = ["gzip"] + diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index dcfbf44..4919910 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -67,6 +67,7 @@ impl Body { } } + #[cfg(any(feature = "blocking", feature = "gzip",))] pub(crate) fn empty() -> Body { Body::wrap(hyper::Body::empty()) } diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 650341b..93590af 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -97,7 +97,7 @@ impl ClientBuilder { ClientBuilder { config: Config { - gzip: true, + gzip: cfg!(feature = "gzip"), headers, #[cfg(feature = "default-tls")] hostname_verification: true, @@ -312,22 +312,45 @@ impl ClientBuilder { self } - /// Enable auto gzip decompression by checking the ContentEncoding response header. + /// Enable auto gzip decompression by checking the `Content-Encoding` response header. /// /// If auto gzip decompresson is turned on: + /// /// - When sending a request and if the request's headers do not already contain /// an `Accept-Encoding` **and** `Range` values, the `Accept-Encoding` header is set to `gzip`. - /// The body is **not** automatically compressed. + /// The request body is **not** automatically compressed. /// - When receiving a response, if it's headers contain a `Content-Encoding` value that /// equals to `gzip`, both values `Content-Encoding` and `Content-Length` are removed from the - /// headers' set. The body is automatically decompressed. + /// headers' set. The response body is automatically decompressed. /// - /// Default is enabled. + /// If the `gzip` feature is turned on, the default option is enabled. + /// + /// # Optional + /// + /// This requires the optional `gzip` feature to be enabled + #[cfg(feature = "gzip")] pub fn gzip(mut self, enable: bool) -> ClientBuilder { self.config.gzip = enable; self } + /// Disable auto response body gzip decompression. + /// + /// This method exists even if the optional `gzip` feature is not enabled. + /// This can be used to ensure a `Client` doesn't use gzip decompression + /// even if another dependency were to enable the optional `gzip` feature. + pub fn no_gzip(self) -> ClientBuilder { + #[cfg(feature = "gzip")] + { + self.gzip(false) + } + + #[cfg(not(feature = "gzip"))] + { + self + } + } + /// Add a `Proxy` to the list of proxies the `Client` will use. pub fn proxy(mut self, proxy: Proxy) -> ClientBuilder { self.config.proxies.push(proxy); diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index b785b69..4efdc95 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -1,197 +1,229 @@ -/*! -A potentially non-blocking response decoder. +pub(crate) use self::imp::Decoder; -The decoder wraps a stream of chunks and produces a new stream of decompressed chunks. -The decompressed chunks aren't guaranteed to align to the compressed ones. +#[cfg(not(feature = "gzip"))] +mod imp { + use std::pin::Pin; + use std::task::{Context, Poll}; -If the response is plaintext then no additional work is carried out. -Chunks are just passed along. + use bytes::Bytes; + use futures_core::Stream; + use http::HeaderMap; -If the response is gzip, then the chunks are decompressed into a buffer. -Slices of that buffer are emitted as new chunks. -*/ - -use std::fmt; -use std::future::Future; -use std::mem; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use async_compression::stream::GzipDecoder; -use bytes::Bytes; -use futures_core::Stream; -use futures_util::stream::Peekable; -use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; -use hyper::HeaderMap; - -use log::warn; - -use super::Body; -use crate::error; - -/// A response decompressor over a non-blocking stream of chunks. -/// -/// The inner decoder may be constructed asynchronously. -pub(crate) struct Decoder { - inner: Inner, -} - -enum Inner { - /// A `PlainText` decoder just returns the response content as is. - PlainText(super::body::ImplStream), - /// A `Gzip` decoder will uncompress the gzipped response content before returning it. - Gzip(GzipDecoder>), - /// A decoder that doesn't have a value yet. - Pending(Pending), -} - -/// A future attempt to poll the response body for EOF so we know whether to use gzip or not. -struct Pending(Peekable); - -struct IoStream(super::body::ImplStream); - -impl fmt::Debug for Decoder { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Decoder").finish() - } -} - -impl Decoder { - /// An empty decoder. - /// - /// This decoder will produce a single 0 byte chunk. - #[cfg(feature = "blocking")] - pub(crate) fn empty() -> Decoder { - Decoder { - inner: Inner::PlainText(Body::empty().into_stream()), - } + use super::super::Body; + pub(crate) struct Decoder { + inner: super::super::body::ImplStream, } - /// A plain text decoder. - /// - /// This decoder will emit the underlying chunks as-is. - fn plain_text(body: Body) -> Decoder { - Decoder { - inner: Inner::PlainText(body.into_stream()), + impl Decoder { + #[cfg(feature = "blocking")] + pub(crate) fn empty() -> Decoder { + Decoder::plain_text(Body::empty()) } - } - /// A gzip decoder. - /// - /// This decoder will buffer and decompress chunks that are gzipped. - fn gzip(body: Body) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Pending(IoStream(body.into_stream()).peekable())), - } - } - - /// Constructs a Decoder from a hyper request. - /// - /// A decoder is just a wrapper around the hyper request that knows - /// how to decode the content body of the request. - /// - /// Uses the correct variant by inspecting the Content-Encoding header. - pub(crate) fn detect(headers: &mut HeaderMap, body: Body, check_gzip: bool) -> Decoder { - if !check_gzip { - return Decoder::plain_text(body); - } - let content_encoding_gzip: bool; - let mut is_gzip = { - content_encoding_gzip = headers - .get_all(CONTENT_ENCODING) - .iter() - .any(|enc| enc == "gzip"); - content_encoding_gzip - || headers - .get_all(TRANSFER_ENCODING) - .iter() - .any(|enc| enc == "gzip") - }; - if is_gzip { - if let Some(content_length) = headers.get(CONTENT_LENGTH) { - if content_length == "0" { - warn!("gzip response with content-length of 0"); - is_gzip = false; - } + /// A plain text decoder. + /// + /// This decoder will emit the underlying chunks as-is. + fn plain_text(body: Body) -> Decoder { + Decoder { + inner: body.into_stream(), } } - if content_encoding_gzip { - headers.remove(CONTENT_ENCODING); - headers.remove(CONTENT_LENGTH); - } - if is_gzip { - Decoder::gzip(body) - } else { + + pub(crate) fn detect(_: &mut HeaderMap, body: Body, _: bool) -> Decoder { Decoder::plain_text(body) } } -} -impl Stream for Decoder { - type Item = Result; + impl Stream for Decoder { + type Item = crate::Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // Do a read or poll for a pending decoder value. - let new_value = match self.inner { - Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { - Poll::Ready(Ok(inner)) => inner, - Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(crate::error::decode_io(e)))), - Poll::Pending => return Poll::Pending, - }, - Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), - Inner::Gzip(ref mut decoder) => { - return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => Poll::Ready(None), - } - } - }; - - self.inner = new_value; - self.poll_next(cx) - } -} - -impl Future for Pending { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_util::StreamExt; - - match futures_core::ready!(Pin::new(&mut self.0).peek(cx)) { - Some(Ok(_)) => { - // fallthrough - } - Some(Err(_e)) => { - // error was just a ref, so we need to really poll to move it - return Poll::Ready(Err(futures_core::ready!( - Pin::new(&mut self.0).poll_next(cx) - ) - .expect("just peeked Some") - .unwrap_err())); - } - None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), - }; - - let body = mem::replace( - &mut self.0, - IoStream(Body::empty().into_stream()).peekable(), - ); - Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(body)))) - } -} - -impl Stream for IoStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { - Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), - Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), - None => Poll::Ready(None), + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } + } +} + +#[cfg(feature = "gzip")] +mod imp { + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::{fmt, mem}; + + use async_compression::stream::GzipDecoder; + use bytes::Bytes; + use futures_core::Stream; + use futures_util::stream::Peekable; + use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; + use http::HeaderMap; + use log::warn; + + use super::super::Body; + use crate::error; + + /// A response decompressor over a non-blocking stream of chunks. + /// + /// The inner decoder may be constructed asynchronously. + pub(crate) struct Decoder { + inner: Inner, + } + + enum Inner { + /// A `PlainText` decoder just returns the response content as is. + PlainText(super::super::body::ImplStream), + /// A `Gzip` decoder will uncompress the gzipped response content before returning it. + Gzip(GzipDecoder>), + /// A decoder that doesn't have a value yet. + Pending(Pending), + } + + /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. + struct Pending(Peekable); + + struct IoStream(super::super::body::ImplStream); + + impl fmt::Debug for Decoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Decoder").finish() + } + } + + impl Decoder { + #[cfg(feature = "blocking")] + pub(crate) fn empty() -> Decoder { + Decoder { + inner: Inner::PlainText(Body::empty().into_stream()), + } + } + + /// A plain text decoder. + /// + /// This decoder will emit the underlying chunks as-is. + fn plain_text(body: Body) -> Decoder { + Decoder { + inner: Inner::PlainText(body.into_stream()), + } + } + + /// A gzip decoder. + /// + /// This decoder will buffer and decompress chunks that are gzipped. + fn gzip(body: Body) -> Decoder { + use futures_util::StreamExt; + + Decoder { + inner: Inner::Pending(Pending(IoStream(body.into_stream()).peekable())), + } + } + + /// Constructs a Decoder from a hyper request. + /// + /// A decoder is just a wrapper around the hyper request that knows + /// how to decode the content body of the request. + /// + /// Uses the correct variant by inspecting the Content-Encoding header. + pub(crate) fn detect(headers: &mut HeaderMap, body: Body, check_gzip: bool) -> Decoder { + if !check_gzip { + return Decoder::plain_text(body); + } + let content_encoding_gzip: bool; + let mut is_gzip = { + content_encoding_gzip = headers + .get_all(CONTENT_ENCODING) + .iter() + .any(|enc| enc == "gzip"); + content_encoding_gzip + || headers + .get_all(TRANSFER_ENCODING) + .iter() + .any(|enc| enc == "gzip") + }; + if is_gzip { + if let Some(content_length) = headers.get(CONTENT_LENGTH) { + if content_length == "0" { + warn!("gzip response with content-length of 0"); + is_gzip = false; + } + } + } + if content_encoding_gzip { + headers.remove(CONTENT_ENCODING); + headers.remove(CONTENT_LENGTH); + } + if is_gzip { + Decoder::gzip(body) + } else { + Decoder::plain_text(body) + } + } + } + + impl Stream for Decoder { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // Do a read or poll for a pending decoder value. + let new_value = match self.inner { + Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { + Poll::Ready(Ok(inner)) => inner, + Poll::Ready(Err(e)) => { + return Poll::Ready(Some(Err(crate::error::decode_io(e)))) + } + Poll::Pending => return Poll::Pending, + }, + Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), + Inner::Gzip(ref mut decoder) => { + return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), + Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), + None => Poll::Ready(None), + } + } + }; + + self.inner = new_value; + self.poll_next(cx) + } + } + + impl Future for Pending { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_util::StreamExt; + + match futures_core::ready!(Pin::new(&mut self.0).peek(cx)) { + Some(Ok(_)) => { + // fallthrough + } + Some(Err(_e)) => { + // error was just a ref, so we need to really poll to move it + return Poll::Ready(Err(futures_core::ready!( + Pin::new(&mut self.0).poll_next(cx) + ) + .expect("just peeked Some") + .unwrap_err())); + } + None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), + }; + + let body = mem::replace( + &mut self.0, + IoStream(Body::empty().into_stream()).peekable(), + ); + Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(body)))) + } + } + + impl Stream for IoStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { + Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), + Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), + None => Poll::Ready(None), + } } } } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 86918c4..38b8b99 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -53,7 +53,6 @@ pub struct Client { /// use std::time::Duration; /// /// let client = reqwest::blocking::Client::builder() -/// .gzip(true) /// .timeout(Duration::from_secs(10)) /// .build()?; /// # Ok(()) @@ -256,21 +255,36 @@ impl ClientBuilder { self.with_inner(move |inner| inner.default_headers(headers)) } - /// Enable auto gzip decompression by checking the ContentEncoding response header. + /// Enable auto gzip decompression by checking the `Content-Encoding` response header. /// /// If auto gzip decompresson is turned on: + /// /// - When sending a request and if the request's headers do not already contain /// an `Accept-Encoding` **and** `Range` values, the `Accept-Encoding` header is set to `gzip`. - /// The body is **not** automatically compressed. + /// The request body is **not** automatically compressed. /// - When receiving a response, if it's headers contain a `Content-Encoding` value that /// equals to `gzip`, both values `Content-Encoding` and `Content-Length` are removed from the - /// headers' set. The body is automatically decompressed. + /// headers' set. The response body is automatically decompressed. /// - /// Default is enabled. + /// If the `gzip` feature is turned on, the default option is enabled. + /// + /// # Optional + /// + /// This requires the optional `gzip` feature to be enabled + #[cfg(feature = "gzip")] pub fn gzip(self, enable: bool) -> ClientBuilder { self.with_inner(|inner| inner.gzip(enable)) } + /// Disable auto response body gzip decompression. + /// + /// This method exists even if the optional `gzip` feature is not enabled. + /// This can be used to ensure a `Client` doesn't use gzip decompression + /// even if another dependency were to enable the optional `gzip` feature. + pub fn no_gzip(self) -> ClientBuilder { + self.with_inner(|inner| inner.no_gzip()) + } + /// Add a `Proxy` to the list of proxies the `Client` will use. pub fn proxy(self, proxy: Proxy) -> ClientBuilder { self.with_inner(move |inner| inner.proxy(proxy)) diff --git a/src/error.rs b/src/error.rs index 09039d3..2c15528 100644 --- a/src/error.rs +++ b/src/error.rs @@ -99,6 +99,7 @@ impl Error { self } + #[allow(unused)] pub(crate) fn into_io(self) -> io::Error { io::Error::new(io::ErrorKind::Other, self) } @@ -214,11 +215,12 @@ pub(crate) fn url_bad_scheme(url: Url) -> Error { // io::Error helpers -#[cfg(feature = "blocking")] +#[allow(unused)] pub(crate) fn into_io(e: Error) -> io::Error { e.into_io() } +#[allow(unused)] pub(crate) fn decode_io(e: io::Error) -> Error { if e.get_ref().map(|r| r.is::()).unwrap_or(false) { *e.into_inner() diff --git a/src/lib.rs b/src/lib.rs index 3127621..4aaf0c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,6 +158,7 @@ //! - **rustls-tls**: Provides TLS support via the `rustls` library. //! - **blocking**: Provides the [blocking][] client API. //! - **cookies**: Provides cookie session support. +//! - **gzip**: Provides response body gzip decompression. //! //! //! [hyper]: http://hyper.rs diff --git a/tests/client.rs b/tests/client.rs index c490bc3..08b76f8 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -11,7 +11,9 @@ async fn auto_headers() { assert_eq!(req.headers()["accept"], "*/*"); assert_eq!(req.headers()["user-agent"], DEFAULT_USER_AGENT); - assert_eq!(req.headers()["accept-encoding"], "gzip"); + if cfg!(feature = "gzip") { + assert_eq!(req.headers()["accept-encoding"], "gzip"); + } http::Response::default() }