Improve performance of Response::bytes() (#827)
This commit is contained in:
		| @@ -1,294 +1,319 @@ | ||||
| pub(crate) use self::imp::Decoder; | ||||
| use std::fmt; | ||||
| use std::future::Future; | ||||
| use std::pin::Pin; | ||||
| use std::task::{Context, Poll}; | ||||
|  | ||||
| mod imp { | ||||
|     use std::fmt; | ||||
|     use std::future::Future; | ||||
|     use std::pin::Pin; | ||||
|     use std::task::{Context, Poll}; | ||||
| #[cfg(feature = "gzip")] | ||||
| use async_compression::stream::GzipDecoder; | ||||
|  | ||||
| #[cfg(feature = "brotli")] | ||||
| use async_compression::stream::BrotliDecoder; | ||||
|  | ||||
| use bytes::Bytes; | ||||
| use futures_core::Stream; | ||||
| use futures_util::stream::Peekable; | ||||
| use http::HeaderMap; | ||||
| use hyper::body::HttpBody; | ||||
|  | ||||
| use super::super::Body; | ||||
| use crate::error; | ||||
|  | ||||
| /// A response decompressor over a non-blocking stream of chunks. | ||||
| /// | ||||
| /// The inner decoder may be constructed asynchronously. | ||||
| pub(crate) struct Decoder { | ||||
|     inner: Inner, | ||||
| } | ||||
|  | ||||
| enum DecoderType { | ||||
|     #[cfg(feature = "gzip")] | ||||
|     Gzip, | ||||
|     #[cfg(feature = "brotli")] | ||||
|     Brotli, | ||||
| } | ||||
|  | ||||
| enum Inner { | ||||
|     /// A `PlainText` decoder just returns the response content as is. | ||||
|     PlainText(super::body::ImplStream), | ||||
|  | ||||
|     /// A `Gzip` decoder will uncompress the gzipped response content before returning it. | ||||
|     #[cfg(feature = "gzip")] | ||||
|     Gzip(GzipDecoder<Peekable<IoStream>>), | ||||
|  | ||||
|     /// A `Brotli` decoder will uncompress the brotlied response content before returning it. | ||||
|     #[cfg(feature = "brotli")] | ||||
|     Brotli(BrotliDecoder<Peekable<IoStream>>), | ||||
|  | ||||
|     /// A decoder that doesn't have a value yet. | ||||
|     #[cfg(any(feature = "brotli", feature = "gzip"))] | ||||
|     Pending(Pending), | ||||
| } | ||||
|  | ||||
| /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. | ||||
| struct Pending(Peekable<IoStream>, DecoderType); | ||||
|  | ||||
| struct IoStream(super::body::ImplStream); | ||||
|  | ||||
| impl fmt::Debug for Decoder { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|         f.debug_struct("Decoder").finish() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Decoder { | ||||
|     #[cfg(feature = "blocking")] | ||||
|     pub(crate) fn empty() -> Decoder { | ||||
|         Decoder { | ||||
|             inner: Inner::PlainText(Body::empty().into_stream()), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// A plain text decoder. | ||||
|     /// | ||||
|     /// This decoder will emit the underlying chunks as-is. | ||||
|     fn plain_text(body: Body) -> Decoder { | ||||
|         Decoder { | ||||
|             inner: Inner::PlainText(body.into_stream()), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// A gzip decoder. | ||||
|     /// | ||||
|     /// This decoder will buffer and decompress chunks that are gzipped. | ||||
|     #[cfg(feature = "gzip")] | ||||
|     fn gzip(body: Body) -> Decoder { | ||||
|         use futures_util::StreamExt; | ||||
|  | ||||
|         Decoder { | ||||
|             inner: Inner::Pending(Pending( | ||||
|                 IoStream(body.into_stream()).peekable(), | ||||
|                 DecoderType::Gzip, | ||||
|             )), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// A brotli decoder. | ||||
|     /// | ||||
|     /// This decoder will buffer and decompress chunks that are brotlied. | ||||
|     #[cfg(feature = "brotli")] | ||||
|     fn brotli(body: Body) -> Decoder { | ||||
|         use futures_util::StreamExt; | ||||
|  | ||||
|         Decoder { | ||||
|             inner: Inner::Pending(Pending( | ||||
|                 IoStream(body.into_stream()).peekable(), | ||||
|                 DecoderType::Brotli, | ||||
|             )), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[cfg(feature = "gzip")] | ||||
|     use async_compression::stream::GzipDecoder; | ||||
|     fn detect_gzip(headers: &mut HeaderMap) -> bool { | ||||
|         use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; | ||||
|         use log::warn; | ||||
|  | ||||
|         let content_encoding_gzip: bool; | ||||
|         let mut is_gzip = { | ||||
|             content_encoding_gzip = headers | ||||
|                 .get_all(CONTENT_ENCODING) | ||||
|                 .iter() | ||||
|                 .any(|enc| enc == "gzip"); | ||||
|             content_encoding_gzip | ||||
|                 || headers | ||||
|                     .get_all(TRANSFER_ENCODING) | ||||
|                     .iter() | ||||
|                     .any(|enc| enc == "gzip") | ||||
|         }; | ||||
|         if is_gzip { | ||||
|             if let Some(content_length) = headers.get(CONTENT_LENGTH) { | ||||
|                 if content_length == "0" { | ||||
|                     warn!("gzip response with content-length of 0"); | ||||
|                     is_gzip = false; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         if is_gzip { | ||||
|             headers.remove(CONTENT_ENCODING); | ||||
|             headers.remove(CONTENT_LENGTH); | ||||
|         } | ||||
|         is_gzip | ||||
|     } | ||||
|  | ||||
|     #[cfg(feature = "brotli")] | ||||
|     use async_compression::stream::BrotliDecoder; | ||||
|     fn detect_brotli(headers: &mut HeaderMap) -> bool { | ||||
|         use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; | ||||
|         use log::warn; | ||||
|  | ||||
|     use bytes::Bytes; | ||||
|     use futures_core::Stream; | ||||
|     use futures_util::stream::Peekable; | ||||
|     use http::HeaderMap; | ||||
|         let content_encoding_gzip: bool; | ||||
|         let mut is_brotli = { | ||||
|             content_encoding_gzip = headers | ||||
|                 .get_all(CONTENT_ENCODING) | ||||
|                 .iter() | ||||
|                 .any(|enc| enc == "br"); | ||||
|             content_encoding_gzip | ||||
|                 || headers | ||||
|                     .get_all(TRANSFER_ENCODING) | ||||
|                     .iter() | ||||
|                     .any(|enc| enc == "br") | ||||
|         }; | ||||
|         if is_brotli { | ||||
|             if let Some(content_length) = headers.get(CONTENT_LENGTH) { | ||||
|                 if content_length == "0" { | ||||
|                     warn!("brotli response with content-length of 0"); | ||||
|                     is_brotli = false; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         if is_brotli { | ||||
|             headers.remove(CONTENT_ENCODING); | ||||
|             headers.remove(CONTENT_LENGTH); | ||||
|         } | ||||
|         is_brotli | ||||
|     } | ||||
|  | ||||
|     use super::super::Body; | ||||
|     use crate::error; | ||||
|  | ||||
|     /// A response decompressor over a non-blocking stream of chunks. | ||||
|     /// Constructs a Decoder from a hyper request. | ||||
|     /// | ||||
|     /// The inner decoder may be constructed asynchronously. | ||||
|     pub(crate) struct Decoder { | ||||
|         inner: Inner, | ||||
|     } | ||||
|     /// A decoder is just a wrapper around the hyper request that knows | ||||
|     /// how to decode the content body of the request. | ||||
|     /// | ||||
|     /// Uses the correct variant by inspecting the Content-Encoding header. | ||||
|     pub(crate) fn detect( | ||||
|         _headers: &mut HeaderMap, | ||||
|         body: Body, | ||||
|         check_gzip: bool, | ||||
|         check_brotli: bool, | ||||
|     ) -> Decoder { | ||||
|         if !check_gzip && !check_brotli { | ||||
|             return Decoder::plain_text(body); | ||||
|         } | ||||
|  | ||||
|     enum DecoderType { | ||||
|         #[cfg(feature = "gzip")] | ||||
|         Gzip, | ||||
|         { | ||||
|             if Decoder::detect_gzip(_headers) { | ||||
|                 return Decoder::gzip(body); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         #[cfg(feature = "brotli")] | ||||
|         Brotli, | ||||
|         { | ||||
|             if Decoder::detect_brotli(_headers) { | ||||
|                 return Decoder::brotli(body); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Decoder::plain_text(body) | ||||
|     } | ||||
| } | ||||
|  | ||||
|     enum Inner { | ||||
|         /// A `PlainText` decoder just returns the response content as is. | ||||
|         PlainText(super::super::body::ImplStream), | ||||
| impl Stream for Decoder { | ||||
|     type Item = Result<Bytes, error::Error>; | ||||
|  | ||||
|         /// A `Gzip` decoder will uncompress the gzipped response content before returning it. | ||||
|         #[cfg(feature = "gzip")] | ||||
|         Gzip(GzipDecoder<Peekable<IoStream>>), | ||||
|  | ||||
|         /// A `Brotli` decoder will uncompress the brotlied response content before returning it. | ||||
|         #[cfg(feature = "brotli")] | ||||
|         Brotli(BrotliDecoder<Peekable<IoStream>>), | ||||
|  | ||||
|         /// A decoder that doesn't have a value yet. | ||||
|         #[cfg(any(feature = "brotli", feature = "gzip"))] | ||||
|         Pending(Pending), | ||||
|     } | ||||
|  | ||||
|     /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. | ||||
|     struct Pending(Peekable<IoStream>, DecoderType); | ||||
|  | ||||
|     struct IoStream(super::super::body::ImplStream); | ||||
|  | ||||
|     impl fmt::Debug for Decoder { | ||||
|         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||
|             f.debug_struct("Decoder").finish() | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     impl Decoder { | ||||
|         #[cfg(feature = "blocking")] | ||||
|         pub(crate) fn empty() -> Decoder { | ||||
|             Decoder { | ||||
|                 inner: Inner::PlainText(Body::empty().into_stream()), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         /// A plain text decoder. | ||||
|         /// | ||||
|         /// This decoder will emit the underlying chunks as-is. | ||||
|         fn plain_text(body: Body) -> Decoder { | ||||
|             Decoder { | ||||
|                 inner: Inner::PlainText(body.into_stream()), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         /// A gzip decoder. | ||||
|         /// | ||||
|         /// This decoder will buffer and decompress chunks that are gzipped. | ||||
|         #[cfg(feature = "gzip")] | ||||
|         fn gzip(body: Body) -> Decoder { | ||||
|             use futures_util::StreamExt; | ||||
|  | ||||
|             Decoder { | ||||
|                 inner: Inner::Pending(Pending( | ||||
|                     IoStream(body.into_stream()).peekable(), | ||||
|                     DecoderType::Gzip, | ||||
|                 )), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         /// A brotli decoder. | ||||
|         /// | ||||
|         /// This decoder will buffer and decompress chunks that are brotlied. | ||||
|         #[cfg(feature = "brotli")] | ||||
|         fn brotli(body: Body) -> Decoder { | ||||
|             use futures_util::StreamExt; | ||||
|  | ||||
|             Decoder { | ||||
|                 inner: Inner::Pending(Pending( | ||||
|                     IoStream(body.into_stream()).peekable(), | ||||
|                     DecoderType::Brotli, | ||||
|                 )), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         #[cfg(feature = "gzip")] | ||||
|         fn detect_gzip(headers: &mut HeaderMap) -> bool { | ||||
|             use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; | ||||
|             use log::warn; | ||||
|  | ||||
|             let content_encoding_gzip: bool; | ||||
|             let mut is_gzip = { | ||||
|                 content_encoding_gzip = headers | ||||
|                     .get_all(CONTENT_ENCODING) | ||||
|                     .iter() | ||||
|                     .any(|enc| enc == "gzip"); | ||||
|                 content_encoding_gzip | ||||
|                     || headers | ||||
|                         .get_all(TRANSFER_ENCODING) | ||||
|                         .iter() | ||||
|                         .any(|enc| enc == "gzip") | ||||
|             }; | ||||
|             if is_gzip { | ||||
|                 if let Some(content_length) = headers.get(CONTENT_LENGTH) { | ||||
|                     if content_length == "0" { | ||||
|                         warn!("gzip response with content-length of 0"); | ||||
|                         is_gzip = false; | ||||
|                     } | ||||
|     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||||
|         // Do a read or poll for a pending decoder value. | ||||
|         match self.inner { | ||||
|             #[cfg(any(feature = "brotli", feature = "gzip"))] | ||||
|             Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { | ||||
|                 Poll::Ready(Ok(inner)) => { | ||||
|                     self.inner = inner; | ||||
|                     return self.poll_next(cx); | ||||
|                 } | ||||
|             } | ||||
|             if is_gzip { | ||||
|                 headers.remove(CONTENT_ENCODING); | ||||
|                 headers.remove(CONTENT_LENGTH); | ||||
|             } | ||||
|             is_gzip | ||||
|         } | ||||
|  | ||||
|         #[cfg(feature = "brotli")] | ||||
|         fn detect_brotli(headers: &mut HeaderMap) -> bool { | ||||
|             use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; | ||||
|             use log::warn; | ||||
|  | ||||
|             let content_encoding_gzip: bool; | ||||
|             let mut is_brotli = { | ||||
|                 content_encoding_gzip = headers | ||||
|                     .get_all(CONTENT_ENCODING) | ||||
|                     .iter() | ||||
|                     .any(|enc| enc == "br"); | ||||
|                 content_encoding_gzip | ||||
|                     || headers | ||||
|                         .get_all(TRANSFER_ENCODING) | ||||
|                         .iter() | ||||
|                         .any(|enc| enc == "br") | ||||
|             }; | ||||
|             if is_brotli { | ||||
|                 if let Some(content_length) = headers.get(CONTENT_LENGTH) { | ||||
|                     if content_length == "0" { | ||||
|                         warn!("brotli response with content-length of 0"); | ||||
|                         is_brotli = false; | ||||
|                     } | ||||
|                 Poll::Ready(Err(e)) => { | ||||
|                     return Poll::Ready(Some(Err(crate::error::decode_io(e)))); | ||||
|                 } | ||||
|             } | ||||
|             if is_brotli { | ||||
|                 headers.remove(CONTENT_ENCODING); | ||||
|                 headers.remove(CONTENT_LENGTH); | ||||
|             } | ||||
|             is_brotli | ||||
|         } | ||||
|  | ||||
|         /// Constructs a Decoder from a hyper request. | ||||
|         /// | ||||
|         /// A decoder is just a wrapper around the hyper request that knows | ||||
|         /// how to decode the content body of the request. | ||||
|         /// | ||||
|         /// Uses the correct variant by inspecting the Content-Encoding header. | ||||
|         pub(crate) fn detect( | ||||
|             _headers: &mut HeaderMap, | ||||
|             body: Body, | ||||
|             check_gzip: bool, | ||||
|             check_brotli: bool, | ||||
|         ) -> Decoder { | ||||
|             if !check_gzip && !check_brotli { | ||||
|                 return Decoder::plain_text(body); | ||||
|             } | ||||
|  | ||||
|                 Poll::Pending => return Poll::Pending, | ||||
|             }, | ||||
|             Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), | ||||
|             #[cfg(feature = "gzip")] | ||||
|             { | ||||
|                 if Decoder::detect_gzip(_headers) { | ||||
|                     return Decoder::gzip(body); | ||||
|                 } | ||||
|             Inner::Gzip(ref mut decoder) => { | ||||
|                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { | ||||
|                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), | ||||
|                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), | ||||
|                     None => Poll::Ready(None), | ||||
|                 }; | ||||
|             } | ||||
|  | ||||
|             #[cfg(feature = "brotli")] | ||||
|             { | ||||
|                 if Decoder::detect_brotli(_headers) { | ||||
|                     return Decoder::brotli(body); | ||||
|                 } | ||||
|             Inner::Brotli(ref mut decoder) => { | ||||
|                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { | ||||
|                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), | ||||
|                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), | ||||
|                     None => Poll::Ready(None), | ||||
|                 }; | ||||
|             } | ||||
|         }; | ||||
|     } | ||||
| } | ||||
|  | ||||
|             Decoder::plain_text(body) | ||||
|         } | ||||
| impl HttpBody for Decoder { | ||||
|     type Data = Bytes; | ||||
|     type Error = crate::Error; | ||||
|  | ||||
|     fn poll_data( | ||||
|         self: Pin<&mut Self>, | ||||
|         cx: &mut Context, | ||||
|     ) -> Poll<Option<Result<Self::Data, Self::Error>>> { | ||||
|         self.poll_next(cx) | ||||
|     } | ||||
|  | ||||
|     impl Stream for Decoder { | ||||
|         type Item = Result<Bytes, error::Error>; | ||||
|  | ||||
|         fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||||
|             // Do a read or poll for a pending decoder value. | ||||
|             match self.inner { | ||||
|                 #[cfg(any(feature = "brotli", feature = "gzip"))] | ||||
|                 Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { | ||||
|                     Poll::Ready(Ok(inner)) => { | ||||
|                         self.inner = inner; | ||||
|                         return self.poll_next(cx); | ||||
|                     } | ||||
|                     Poll::Ready(Err(e)) => { | ||||
|                         return Poll::Ready(Some(Err(crate::error::decode_io(e)))); | ||||
|                     } | ||||
|                     Poll::Pending => return Poll::Pending, | ||||
|                 }, | ||||
|                 Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), | ||||
|                 #[cfg(feature = "gzip")] | ||||
|                 Inner::Gzip(ref mut decoder) => { | ||||
|                     return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { | ||||
|                         Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), | ||||
|                         Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), | ||||
|                         None => Poll::Ready(None), | ||||
|                     }; | ||||
|                 } | ||||
|                 #[cfg(feature = "brotli")] | ||||
|                 Inner::Brotli(ref mut decoder) => { | ||||
|                     return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { | ||||
|                         Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), | ||||
|                         Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), | ||||
|                         None => Poll::Ready(None), | ||||
|                     }; | ||||
|                 } | ||||
|             }; | ||||
|         } | ||||
|     fn poll_trailers( | ||||
|         self: Pin<&mut Self>, | ||||
|         _cx: &mut Context, | ||||
|     ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { | ||||
|         Poll::Ready(Ok(None)) | ||||
|     } | ||||
|  | ||||
|     impl Future for Pending { | ||||
|         type Output = Result<Inner, std::io::Error>; | ||||
|  | ||||
|         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|             use futures_util::StreamExt; | ||||
|  | ||||
|             match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { | ||||
|                 Some(Ok(_)) => { | ||||
|                     // fallthrough | ||||
|                 } | ||||
|                 Some(Err(_e)) => { | ||||
|                     // error was just a ref, so we need to really poll to move it | ||||
|                     return Poll::Ready(Err(futures_core::ready!( | ||||
|                         Pin::new(&mut self.0).poll_next(cx) | ||||
|                     ) | ||||
|                     .expect("just peeked Some") | ||||
|                     .unwrap_err())); | ||||
|                 } | ||||
|                 None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), | ||||
|             }; | ||||
|  | ||||
|             let _body = std::mem::replace( | ||||
|                 &mut self.0, | ||||
|                 IoStream(Body::empty().into_stream()).peekable(), | ||||
|             ); | ||||
|  | ||||
|             match self.1 { | ||||
|                 #[cfg(feature = "brotli")] | ||||
|                 DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))), | ||||
|                 #[cfg(feature = "gzip")] | ||||
|                 DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     impl Stream for IoStream { | ||||
|         type Item = Result<Bytes, std::io::Error>; | ||||
|  | ||||
|         fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||||
|             match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { | ||||
|                 Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), | ||||
|                 Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), | ||||
|                 None => Poll::Ready(None), | ||||
|             } | ||||
|     fn size_hint(&self) -> http_body::SizeHint { | ||||
|         match self.inner { | ||||
|             Inner::PlainText(ref body) => HttpBody::size_hint(body), | ||||
|             // the rest are "unknown", so default | ||||
|             #[cfg(any(feature = "brotli", feature = "gzip"))] | ||||
|             _ => http_body::SizeHint::default(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Future for Pending { | ||||
|     type Output = Result<Inner, std::io::Error>; | ||||
|  | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         use futures_util::StreamExt; | ||||
|  | ||||
|         match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { | ||||
|             Some(Ok(_)) => { | ||||
|                 // fallthrough | ||||
|             } | ||||
|             Some(Err(_e)) => { | ||||
|                 // error was just a ref, so we need to really poll to move it | ||||
|                 return Poll::Ready(Err(futures_core::ready!( | ||||
|                     Pin::new(&mut self.0).poll_next(cx) | ||||
|                 ) | ||||
|                 .expect("just peeked Some") | ||||
|                 .unwrap_err())); | ||||
|             } | ||||
|             None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), | ||||
|         }; | ||||
|  | ||||
|         let _body = std::mem::replace( | ||||
|             &mut self.0, | ||||
|             IoStream(Body::empty().into_stream()).peekable(), | ||||
|         ); | ||||
|  | ||||
|         match self.1 { | ||||
|             #[cfg(feature = "brotli")] | ||||
|             DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))), | ||||
|             #[cfg(feature = "gzip")] | ||||
|             DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Stream for IoStream { | ||||
|     type Item = Result<Bytes, std::io::Error>; | ||||
|  | ||||
|     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||||
|         match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { | ||||
|             Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), | ||||
|             Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), | ||||
|             None => Poll::Ready(None), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -2,12 +2,11 @@ use std::borrow::Cow; | ||||
| use std::fmt; | ||||
| use std::net::SocketAddr; | ||||
|  | ||||
| use bytes::{Bytes, BytesMut}; | ||||
| use bytes::Bytes; | ||||
| use encoding_rs::{Encoding, UTF_8}; | ||||
| use futures_util::stream::StreamExt; | ||||
| use http; | ||||
| use hyper::client::connect::HttpInfo; | ||||
| use hyper::header::CONTENT_LENGTH; | ||||
| use hyper::{HeaderMap, StatusCode, Version}; | ||||
| use mime::Mime; | ||||
| #[cfg(feature = "json")] | ||||
| @@ -89,13 +88,12 @@ impl Response { | ||||
|     /// Reasons it may not be known: | ||||
|     /// | ||||
|     /// - The server didn't send a `content-length` header. | ||||
|     /// - The response is gzipped and automatically decoded (thus changing | ||||
|     /// - The response is compressed and automatically decoded (thus changing | ||||
|     ///   the actual decoded length). | ||||
|     pub fn content_length(&self) -> Option<u64> { | ||||
|         self.headers() | ||||
|             .get(CONTENT_LENGTH) | ||||
|             .and_then(|ct_len| ct_len.to_str().ok()) | ||||
|             .and_then(|ct_len| ct_len.parse().ok()) | ||||
|         use hyper::body::HttpBody; | ||||
|  | ||||
|         HttpBody::size_hint(&self.body).exact() | ||||
|     } | ||||
|  | ||||
|     /// Retrieve the cookies contained in the response. | ||||
| @@ -259,12 +257,8 @@ impl Response { | ||||
|     /// # Ok(()) | ||||
|     /// # } | ||||
|     /// ``` | ||||
|     pub async fn bytes(mut self) -> crate::Result<Bytes> { | ||||
|         let mut buf = BytesMut::new(); | ||||
|         while let Some(chunk) = self.body.next().await { | ||||
|             buf.extend(chunk?); | ||||
|         } | ||||
|         Ok(buf.freeze()) | ||||
|     pub async fn bytes(self) -> crate::Result<Bytes> { | ||||
|         hyper::body::to_bytes(self.body).await | ||||
|     } | ||||
|  | ||||
|     /// Stream a chunk of the response body. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user