From c916dc03cc274759a29b9f582ad6fad88a29cfd6 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 27 Feb 2020 12:44:04 -0800 Subject: [PATCH] Improve performance of Response::bytes() (#827) --- src/async_impl/decoder.rs | 557 +++++++++++++++++++------------------ src/async_impl/response.rs | 20 +- tests/client.rs | 19 ++ 3 files changed, 317 insertions(+), 279 deletions(-) diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 15d1fca..2a6ec55 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -1,294 +1,319 @@ -pub(crate) use self::imp::Decoder; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; -mod imp { - use std::fmt; - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; +#[cfg(feature = "gzip")] +use async_compression::stream::GzipDecoder; + +#[cfg(feature = "brotli")] +use async_compression::stream::BrotliDecoder; + +use bytes::Bytes; +use futures_core::Stream; +use futures_util::stream::Peekable; +use http::HeaderMap; +use hyper::body::HttpBody; + +use super::super::Body; +use crate::error; + +/// A response decompressor over a non-blocking stream of chunks. +/// +/// The inner decoder may be constructed asynchronously. +pub(crate) struct Decoder { + inner: Inner, +} + +enum DecoderType { + #[cfg(feature = "gzip")] + Gzip, + #[cfg(feature = "brotli")] + Brotli, +} + +enum Inner { + /// A `PlainText` decoder just returns the response content as is. + PlainText(super::body::ImplStream), + + /// A `Gzip` decoder will uncompress the gzipped response content before returning it. + #[cfg(feature = "gzip")] + Gzip(GzipDecoder>), + + /// A `Brotli` decoder will uncompress the brotlied response content before returning it. + #[cfg(feature = "brotli")] + Brotli(BrotliDecoder>), + + /// A decoder that doesn't have a value yet. + #[cfg(any(feature = "brotli", feature = "gzip"))] + Pending(Pending), +} + +/// A future attempt to poll the response body for EOF so we know whether to use gzip or not. +struct Pending(Peekable, DecoderType); + +struct IoStream(super::body::ImplStream); + +impl fmt::Debug for Decoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Decoder").finish() + } +} + +impl Decoder { + #[cfg(feature = "blocking")] + pub(crate) fn empty() -> Decoder { + Decoder { + inner: Inner::PlainText(Body::empty().into_stream()), + } + } + + /// A plain text decoder. + /// + /// This decoder will emit the underlying chunks as-is. + fn plain_text(body: Body) -> Decoder { + Decoder { + inner: Inner::PlainText(body.into_stream()), + } + } + + /// A gzip decoder. + /// + /// This decoder will buffer and decompress chunks that are gzipped. + #[cfg(feature = "gzip")] + fn gzip(body: Body) -> Decoder { + use futures_util::StreamExt; + + Decoder { + inner: Inner::Pending(Pending( + IoStream(body.into_stream()).peekable(), + DecoderType::Gzip, + )), + } + } + + /// A brotli decoder. + /// + /// This decoder will buffer and decompress chunks that are brotlied. + #[cfg(feature = "brotli")] + fn brotli(body: Body) -> Decoder { + use futures_util::StreamExt; + + Decoder { + inner: Inner::Pending(Pending( + IoStream(body.into_stream()).peekable(), + DecoderType::Brotli, + )), + } + } #[cfg(feature = "gzip")] - use async_compression::stream::GzipDecoder; + fn detect_gzip(headers: &mut HeaderMap) -> bool { + use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; + use log::warn; + + let content_encoding_gzip: bool; + let mut is_gzip = { + content_encoding_gzip = headers + .get_all(CONTENT_ENCODING) + .iter() + .any(|enc| enc == "gzip"); + content_encoding_gzip + || headers + .get_all(TRANSFER_ENCODING) + .iter() + .any(|enc| enc == "gzip") + }; + if is_gzip { + if let Some(content_length) = headers.get(CONTENT_LENGTH) { + if content_length == "0" { + warn!("gzip response with content-length of 0"); + is_gzip = false; + } + } + } + if is_gzip { + headers.remove(CONTENT_ENCODING); + headers.remove(CONTENT_LENGTH); + } + is_gzip + } #[cfg(feature = "brotli")] - use async_compression::stream::BrotliDecoder; + fn detect_brotli(headers: &mut HeaderMap) -> bool { + use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; + use log::warn; - use bytes::Bytes; - use futures_core::Stream; - use futures_util::stream::Peekable; - use http::HeaderMap; + let content_encoding_gzip: bool; + let mut is_brotli = { + content_encoding_gzip = headers + .get_all(CONTENT_ENCODING) + .iter() + .any(|enc| enc == "br"); + content_encoding_gzip + || headers + .get_all(TRANSFER_ENCODING) + .iter() + .any(|enc| enc == "br") + }; + if is_brotli { + if let Some(content_length) = headers.get(CONTENT_LENGTH) { + if content_length == "0" { + warn!("brotli response with content-length of 0"); + is_brotli = false; + } + } + } + if is_brotli { + headers.remove(CONTENT_ENCODING); + headers.remove(CONTENT_LENGTH); + } + is_brotli + } - use super::super::Body; - use crate::error; - - /// A response decompressor over a non-blocking stream of chunks. + /// Constructs a Decoder from a hyper request. /// - /// The inner decoder may be constructed asynchronously. - pub(crate) struct Decoder { - inner: Inner, - } + /// A decoder is just a wrapper around the hyper request that knows + /// how to decode the content body of the request. + /// + /// Uses the correct variant by inspecting the Content-Encoding header. + pub(crate) fn detect( + _headers: &mut HeaderMap, + body: Body, + check_gzip: bool, + check_brotli: bool, + ) -> Decoder { + if !check_gzip && !check_brotli { + return Decoder::plain_text(body); + } - enum DecoderType { #[cfg(feature = "gzip")] - Gzip, + { + if Decoder::detect_gzip(_headers) { + return Decoder::gzip(body); + } + } + #[cfg(feature = "brotli")] - Brotli, + { + if Decoder::detect_brotli(_headers) { + return Decoder::brotli(body); + } + } + + Decoder::plain_text(body) } +} - enum Inner { - /// A `PlainText` decoder just returns the response content as is. - PlainText(super::super::body::ImplStream), +impl Stream for Decoder { + type Item = Result; - /// A `Gzip` decoder will uncompress the gzipped response content before returning it. - #[cfg(feature = "gzip")] - Gzip(GzipDecoder>), - - /// A `Brotli` decoder will uncompress the brotlied response content before returning it. - #[cfg(feature = "brotli")] - Brotli(BrotliDecoder>), - - /// A decoder that doesn't have a value yet. - #[cfg(any(feature = "brotli", feature = "gzip"))] - Pending(Pending), - } - - /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. - struct Pending(Peekable, DecoderType); - - struct IoStream(super::super::body::ImplStream); - - impl fmt::Debug for Decoder { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Decoder").finish() - } - } - - impl Decoder { - #[cfg(feature = "blocking")] - pub(crate) fn empty() -> Decoder { - Decoder { - inner: Inner::PlainText(Body::empty().into_stream()), - } - } - - /// A plain text decoder. - /// - /// This decoder will emit the underlying chunks as-is. - fn plain_text(body: Body) -> Decoder { - Decoder { - inner: Inner::PlainText(body.into_stream()), - } - } - - /// A gzip decoder. - /// - /// This decoder will buffer and decompress chunks that are gzipped. - #[cfg(feature = "gzip")] - fn gzip(body: Body) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Pending( - IoStream(body.into_stream()).peekable(), - DecoderType::Gzip, - )), - } - } - - /// A brotli decoder. - /// - /// This decoder will buffer and decompress chunks that are brotlied. - #[cfg(feature = "brotli")] - fn brotli(body: Body) -> Decoder { - use futures_util::StreamExt; - - Decoder { - inner: Inner::Pending(Pending( - IoStream(body.into_stream()).peekable(), - DecoderType::Brotli, - )), - } - } - - #[cfg(feature = "gzip")] - fn detect_gzip(headers: &mut HeaderMap) -> bool { - use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; - use log::warn; - - let content_encoding_gzip: bool; - let mut is_gzip = { - content_encoding_gzip = headers - .get_all(CONTENT_ENCODING) - .iter() - .any(|enc| enc == "gzip"); - content_encoding_gzip - || headers - .get_all(TRANSFER_ENCODING) - .iter() - .any(|enc| enc == "gzip") - }; - if is_gzip { - if let Some(content_length) = headers.get(CONTENT_LENGTH) { - if content_length == "0" { - warn!("gzip response with content-length of 0"); - is_gzip = false; - } + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + // Do a read or poll for a pending decoder value. + match self.inner { + #[cfg(any(feature = "brotli", feature = "gzip"))] + Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { + Poll::Ready(Ok(inner)) => { + self.inner = inner; + return self.poll_next(cx); } - } - if is_gzip { - headers.remove(CONTENT_ENCODING); - headers.remove(CONTENT_LENGTH); - } - is_gzip - } - - #[cfg(feature = "brotli")] - fn detect_brotli(headers: &mut HeaderMap) -> bool { - use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; - use log::warn; - - let content_encoding_gzip: bool; - let mut is_brotli = { - content_encoding_gzip = headers - .get_all(CONTENT_ENCODING) - .iter() - .any(|enc| enc == "br"); - content_encoding_gzip - || headers - .get_all(TRANSFER_ENCODING) - .iter() - .any(|enc| enc == "br") - }; - if is_brotli { - if let Some(content_length) = headers.get(CONTENT_LENGTH) { - if content_length == "0" { - warn!("brotli response with content-length of 0"); - is_brotli = false; - } + Poll::Ready(Err(e)) => { + return Poll::Ready(Some(Err(crate::error::decode_io(e)))); } - } - if is_brotli { - headers.remove(CONTENT_ENCODING); - headers.remove(CONTENT_LENGTH); - } - is_brotli - } - - /// Constructs a Decoder from a hyper request. - /// - /// A decoder is just a wrapper around the hyper request that knows - /// how to decode the content body of the request. - /// - /// Uses the correct variant by inspecting the Content-Encoding header. - pub(crate) fn detect( - _headers: &mut HeaderMap, - body: Body, - check_gzip: bool, - check_brotli: bool, - ) -> Decoder { - if !check_gzip && !check_brotli { - return Decoder::plain_text(body); - } - + Poll::Pending => return Poll::Pending, + }, + Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), #[cfg(feature = "gzip")] - { - if Decoder::detect_gzip(_headers) { - return Decoder::gzip(body); - } + Inner::Gzip(ref mut decoder) => { + return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), + Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), + None => Poll::Ready(None), + }; } - #[cfg(feature = "brotli")] - { - if Decoder::detect_brotli(_headers) { - return Decoder::brotli(body); - } + Inner::Brotli(ref mut decoder) => { + return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { + Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), + Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), + None => Poll::Ready(None), + }; } + }; + } +} - Decoder::plain_text(body) - } +impl HttpBody for Decoder { + type Data = Bytes; + type Error = crate::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>> { + self.poll_next(cx) } - impl Stream for Decoder { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - // Do a read or poll for a pending decoder value. - match self.inner { - #[cfg(any(feature = "brotli", feature = "gzip"))] - Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { - Poll::Ready(Ok(inner)) => { - self.inner = inner; - return self.poll_next(cx); - } - Poll::Ready(Err(e)) => { - return Poll::Ready(Some(Err(crate::error::decode_io(e)))); - } - Poll::Pending => return Poll::Pending, - }, - Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), - #[cfg(feature = "gzip")] - Inner::Gzip(ref mut decoder) => { - return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => Poll::Ready(None), - }; - } - #[cfg(feature = "brotli")] - Inner::Brotli(ref mut decoder) => { - return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { - Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), - Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), - None => Poll::Ready(None), - }; - } - }; - } + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) } - impl Future for Pending { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_util::StreamExt; - - match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { - Some(Ok(_)) => { - // fallthrough - } - Some(Err(_e)) => { - // error was just a ref, so we need to really poll to move it - return Poll::Ready(Err(futures_core::ready!( - Pin::new(&mut self.0).poll_next(cx) - ) - .expect("just peeked Some") - .unwrap_err())); - } - None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), - }; - - let _body = std::mem::replace( - &mut self.0, - IoStream(Body::empty().into_stream()).peekable(), - ); - - match self.1 { - #[cfg(feature = "brotli")] - DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))), - #[cfg(feature = "gzip")] - DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))), - } - } - } - - impl Stream for IoStream { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { - Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), - Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), - None => Poll::Ready(None), - } + fn size_hint(&self) -> http_body::SizeHint { + match self.inner { + Inner::PlainText(ref body) => HttpBody::size_hint(body), + // the rest are "unknown", so default + #[cfg(any(feature = "brotli", feature = "gzip"))] + _ => http_body::SizeHint::default(), + } + } +} + +impl Future for Pending { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_util::StreamExt; + + match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { + Some(Ok(_)) => { + // fallthrough + } + Some(Err(_e)) => { + // error was just a ref, so we need to really poll to move it + return Poll::Ready(Err(futures_core::ready!( + Pin::new(&mut self.0).poll_next(cx) + ) + .expect("just peeked Some") + .unwrap_err())); + } + None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), + }; + + let _body = std::mem::replace( + &mut self.0, + IoStream(Body::empty().into_stream()).peekable(), + ); + + match self.1 { + #[cfg(feature = "brotli")] + DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))), + #[cfg(feature = "gzip")] + DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))), + } + } +} + +impl Stream for IoStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { + Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), + Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), + None => Poll::Ready(None), } } } diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index 19c6f6c..1e095b8 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -2,12 +2,11 @@ use std::borrow::Cow; use std::fmt; use std::net::SocketAddr; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use encoding_rs::{Encoding, UTF_8}; use futures_util::stream::StreamExt; use http; use hyper::client::connect::HttpInfo; -use hyper::header::CONTENT_LENGTH; use hyper::{HeaderMap, StatusCode, Version}; use mime::Mime; #[cfg(feature = "json")] @@ -89,13 +88,12 @@ impl Response { /// Reasons it may not be known: /// /// - The server didn't send a `content-length` header. - /// - The response is gzipped and automatically decoded (thus changing + /// - The response is compressed and automatically decoded (thus changing /// the actual decoded length). pub fn content_length(&self) -> Option { - self.headers() - .get(CONTENT_LENGTH) - .and_then(|ct_len| ct_len.to_str().ok()) - .and_then(|ct_len| ct_len.parse().ok()) + use hyper::body::HttpBody; + + HttpBody::size_hint(&self.body).exact() } /// Retrieve the cookies contained in the response. @@ -259,12 +257,8 @@ impl Response { /// # Ok(()) /// # } /// ``` - pub async fn bytes(mut self) -> crate::Result { - let mut buf = BytesMut::new(); - while let Some(chunk) = self.body.next().await { - buf.extend(chunk?); - } - Ok(buf.freeze()) + pub async fn bytes(self) -> crate::Result { + hyper::body::to_bytes(self.body).await } /// Stream a chunk of the response body. diff --git a/tests/client.rs b/tests/client.rs index 3105d4f..3465a5f 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -68,10 +68,29 @@ async fn response_text() { .send() .await .expect("Failed to get"); + assert_eq!(res.content_length(), Some(5)); let text = res.text().await.expect("Failed to get text"); assert_eq!("Hello", text); } +#[tokio::test] +async fn response_bytes() { + let _ = env_logger::try_init(); + + let server = server::http(move |_req| async { http::Response::new("Hello".into()) }); + + let client = Client::new(); + + let res = client + .get(&format!("http://{}/bytes", server.addr())) + .send() + .await + .expect("Failed to get"); + assert_eq!(res.content_length(), Some(5)); + let bytes = res.bytes().await.expect("res.bytes()"); + assert_eq!("Hello", bytes); +} + #[tokio::test] #[cfg(feature = "json")] async fn response_json() {