pub(crate) use self::imp::Decoder; #[cfg(not(feature = "gzip"))] mod imp { use std::pin::Pin; use std::task::{Context, Poll}; use bytes::Bytes; use futures_core::Stream; use http::HeaderMap; use super::super::Body; pub(crate) struct Decoder { inner: super::super::body::ImplStream, } impl Decoder { #[cfg(feature = "blocking")] pub(crate) fn empty() -> Decoder { Decoder::plain_text(Body::empty()) } /// A plain text decoder. /// /// This decoder will emit the underlying chunks as-is. fn plain_text(body: Body) -> Decoder { Decoder { inner: body.into_stream(), } } pub(crate) fn detect(_: &mut HeaderMap, body: Body, _: bool) -> Decoder { Decoder::plain_text(body) } } impl Stream for Decoder { type Item = crate::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Pin::new(&mut self.inner).poll_next(cx) } } } #[cfg(feature = "gzip")] mod imp { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, mem}; use async_compression::stream::GzipDecoder; use bytes::Bytes; use futures_core::Stream; use futures_util::stream::Peekable; use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::HeaderMap; use log::warn; use super::super::Body; use crate::error; /// A response decompressor over a non-blocking stream of chunks. /// /// The inner decoder may be constructed asynchronously. pub(crate) struct Decoder { inner: Inner, } enum Inner { /// A `PlainText` decoder just returns the response content as is. PlainText(super::super::body::ImplStream), /// A `Gzip` decoder will uncompress the gzipped response content before returning it. Gzip(GzipDecoder>), /// A decoder that doesn't have a value yet. Pending(Pending), } /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. struct Pending(Peekable); struct IoStream(super::super::body::ImplStream); impl fmt::Debug for Decoder { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Decoder").finish() } } impl Decoder { #[cfg(feature = "blocking")] pub(crate) fn empty() -> Decoder { Decoder { inner: Inner::PlainText(Body::empty().into_stream()), } } /// A plain text decoder. /// /// This decoder will emit the underlying chunks as-is. fn plain_text(body: Body) -> Decoder { Decoder { inner: Inner::PlainText(body.into_stream()), } } /// A gzip decoder. /// /// This decoder will buffer and decompress chunks that are gzipped. fn gzip(body: Body) -> Decoder { use futures_util::StreamExt; Decoder { inner: Inner::Pending(Pending(IoStream(body.into_stream()).peekable())), } } /// Constructs a Decoder from a hyper request. /// /// A decoder is just a wrapper around the hyper request that knows /// how to decode the content body of the request. /// /// Uses the correct variant by inspecting the Content-Encoding header. pub(crate) fn detect(headers: &mut HeaderMap, body: Body, check_gzip: bool) -> Decoder { if !check_gzip { return Decoder::plain_text(body); } let content_encoding_gzip: bool; let mut is_gzip = { content_encoding_gzip = headers .get_all(CONTENT_ENCODING) .iter() .any(|enc| enc == "gzip"); content_encoding_gzip || headers .get_all(TRANSFER_ENCODING) .iter() .any(|enc| enc == "gzip") }; if is_gzip { if let Some(content_length) = headers.get(CONTENT_LENGTH) { if content_length == "0" { warn!("gzip response with content-length of 0"); is_gzip = false; } } } if content_encoding_gzip { headers.remove(CONTENT_ENCODING); headers.remove(CONTENT_LENGTH); } if is_gzip { Decoder::gzip(body) } else { Decoder::plain_text(body) } } } impl Stream for Decoder { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { // Do a read or poll for a pending decoder value. let new_value = match self.inner { Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { Poll::Ready(Ok(inner)) => inner, Poll::Ready(Err(e)) => { return Poll::Ready(Some(Err(crate::error::decode_io(e)))) } Poll::Pending => return Poll::Pending, }, Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx), Inner::Gzip(ref mut decoder) => { return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), } } }; self.inner = new_value; self.poll_next(cx) } } impl Future for Pending { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { use futures_util::StreamExt; match futures_core::ready!(Pin::new(&mut self.0).peek(cx)) { Some(Ok(_)) => { // fallthrough } Some(Err(_e)) => { // error was just a ref, so we need to really poll to move it return Poll::Ready(Err(futures_core::ready!( Pin::new(&mut self.0).poll_next(cx) ) .expect("just peeked Some") .unwrap_err())); } None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), }; let body = mem::replace( &mut self.0, IoStream(Body::empty().into_stream()).peekable(), ); Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(body)))) } } impl Stream for IoStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), None => Poll::Ready(None), } } } }