diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index da48a9f..c0542cf 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -43,29 +43,34 @@ pub(crate) struct Decoder { inner: Inner, } +type PeekableIoStream = Peekable; + +#[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))] +type PeekableIoStreamReader = StreamReader; + enum Inner { /// A `PlainText` decoder just returns the response content as is. PlainText(super::body::ImplStream), /// A `Gzip` decoder will uncompress the gzipped response content before returning it. #[cfg(feature = "gzip")] - Gzip(FramedRead, Bytes>>, BytesCodec>), + Gzip(Pin, BytesCodec>>>), /// A `Brotli` decoder will uncompress the brotlied response content before returning it. #[cfg(feature = "brotli")] - Brotli(FramedRead, Bytes>>, BytesCodec>), + Brotli(Pin, BytesCodec>>>), /// A `Deflate` decoder will uncompress the deflated response content before returning it. #[cfg(feature = "deflate")] - Deflate(FramedRead, Bytes>>, BytesCodec>), + Deflate(Pin, BytesCodec>>>), /// A decoder that doesn't have a value yet. #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] - Pending(Pending), + Pending(Pin>), } /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. -struct Pending(Peekable, DecoderType); +struct Pending(PeekableIoStream, DecoderType); struct IoStream(super::body::ImplStream); @@ -109,10 +114,10 @@ impl Decoder { use futures_util::StreamExt; Decoder { - inner: Inner::Pending(Pending( + inner: Inner::Pending(Box::pin(Pending( IoStream(body.into_stream()).peekable(), DecoderType::Gzip, - )), + ))), } } @@ -124,10 +129,10 @@ impl Decoder { use futures_util::StreamExt; Decoder { - inner: Inner::Pending(Pending( + inner: Inner::Pending(Box::pin(Pending( IoStream(body.into_stream()).peekable(), DecoderType::Brotli, - )), + ))), } } @@ -139,10 +144,10 @@ impl Decoder { use futures_util::StreamExt; Decoder { - inner: Inner::Pending(Pending( + inner: Inner::Pending(Box::pin(Pending( IoStream(body.into_stream()).peekable(), DecoderType::Deflate, - )), + ))), } } @@ -218,37 +223,35 @@ impl Stream for Decoder { Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { Poll::Ready(Ok(inner)) => { self.inner = inner; - return self.poll_next(cx); + self.poll_next(cx) } - Poll::Ready(Err(e)) => { - return Poll::Ready(Some(Err(crate::error::decode_io(e)))); - } - Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(crate::error::decode_io(e)))), + Poll::Pending => Poll::Pending, }, Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx), #[cfg(feature = "gzip")] Inner::Gzip(ref mut decoder) => { - return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { + match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), - }; + } } #[cfg(feature = "brotli")] Inner::Brotli(ref mut decoder) => { - return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { + match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), - }; + } } #[cfg(feature = "deflate")] Inner::Deflate(ref mut decoder) => { - return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { + match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), None => Poll::Ready(None), - }; + } } } } @@ -310,20 +313,20 @@ impl Future for Pending { match self.1 { #[cfg(feature = "brotli")] - DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::new( + DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(Box::pin(FramedRead::new( BrotliDecoder::new(StreamReader::new(_body)), BytesCodec::new(), - )))), + ))))), #[cfg(feature = "gzip")] - DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::new( + DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(Box::pin(FramedRead::new( GzipDecoder::new(StreamReader::new(_body)), BytesCodec::new(), - )))), + ))))), #[cfg(feature = "deflate")] - DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(FramedRead::new( + DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(Box::pin(FramedRead::new( ZlibDecoder::new(StreamReader::new(_body)), BytesCodec::new(), - )))), + ))))), } } }