refactor all to async/await (#617)

Co-authored-by: Danny Browning <danny.browning@protectwise.com>
Co-authored-by: Daniel Eades <danieleades@hotmail.com>
This commit is contained in:
Sean McArthur
2019-09-06 17:22:56 -07:00
committed by GitHub
parent d7fcd8ac2e
commit ba7b2a754e
30 changed files with 1106 additions and 1430 deletions

View File

@@ -9,25 +9,16 @@ Chunks are just passed along.
If the response is gzip, then the chunks are decompressed into a buffer.
Slices of that buffer are emitted as new chunks.
This module consists of a few main types:
- `ReadableChunks` is a `Read`-like wrapper around a stream
- `Decoder` is a layer over `ReadableChunks` that applies the right decompression
The following types directly support the gzip compression case:
- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
*/
use std::cmp;
use std::fmt;
use std::io::{self, Read};
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Buf, BufMut, BytesMut};
use flate2::read::GzDecoder;
use futures::{Async, Future, Poll, Stream};
use bytes::Bytes;
use futures::Stream;
use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
use hyper::HeaderMap;
@@ -36,8 +27,6 @@ use log::warn;
use super::{Body, Chunk};
use crate::error;
const INIT_BUFFER_SIZE: usize = 8192;
/// A response decompressor over a non-blocking stream of chunks.
///
/// The inner decoder may be constructed asynchronously.
@@ -49,22 +38,15 @@ enum Inner {
/// A `PlainText` decoder just returns the response content as is.
PlainText(Body),
/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
Gzip(Gzip),
Gzip(async_compression::stream::GzipDecoder<futures::stream::Peekable<BodyBytes>>),
/// A decoder that doesn't have a value yet.
Pending(Pending),
}
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
struct Pending {
body: ReadableChunks<Body>,
}
struct Pending(futures::stream::Peekable<BodyBytes>);
/// A gzip decoder that reads from a `flate2::read::GzDecoder` into a `BytesMut` and emits the results
/// as a `Chunk`.
struct Gzip {
inner: Box<GzDecoder<ReadableChunks<Body>>>,
buf: BytesMut,
}
struct BodyBytes(Body);
impl fmt::Debug for Decoder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -86,7 +68,6 @@ impl Decoder {
/// A plain text decoder.
///
/// This decoder will emit the underlying chunks as-is.
#[inline]
fn plain_text(body: Body) -> Decoder {
Decoder {
inner: Inner::PlainText(body),
@@ -96,12 +77,11 @@ impl Decoder {
/// A gzip decoder.
///
/// This decoder will buffer and decompress chunks that are gzipped.
#[inline]
fn gzip(body: Body) -> Decoder {
use futures::stream::StreamExt;
Decoder {
inner: Inner::Pending(Pending {
body: ReadableChunks::new(body),
}),
inner: Inner::Pending(Pending(BodyBytes(body).peekable())),
}
}
@@ -148,189 +128,65 @@ impl Decoder {
}
impl Stream for Decoder {
type Item = Chunk;
type Error = error::Error;
type Item = Result<Chunk, error::Error>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// Do a read or poll for a pendidng decoder value.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// Do a read or poll for a pending decoder value.
let new_value = match self.inner {
Inner::Pending(ref mut future) => match future.poll() {
Ok(Async::Ready(inner)) => inner,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(e),
Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
Poll::Ready(Ok(inner)) => inner,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(crate::error::from_io(e)))),
Poll::Pending => return Poll::Pending,
},
Inner::PlainText(ref mut body) => return body.poll(),
Inner::Gzip(ref mut decoder) => return decoder.poll(),
Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx),
Inner::Gzip(ref mut decoder) => {
return match futures::ready!(Pin::new(decoder).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(crate::error::from_io(err)))),
None => Poll::Ready(None),
}
}
};
self.inner = new_value;
self.poll()
self.poll_next(cx)
}
}
impl Future for Pending {
type Item = Inner;
type Error = error::Error;
type Output = Result<Inner, std::io::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let body_state = match self.body.poll_stream() {
Ok(Async::Ready(state)) => state,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(e),
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures::stream::StreamExt;
match futures::ready!(Pin::new(&mut self.0).peek(cx)) {
Some(Ok(_)) => {
// fallthrough
}
Some(Err(_e)) => {
// error was just a ref, so we need to really poll to move it
return Poll::Ready(Err(futures::ready!(Pin::new(&mut self.0).poll_next(cx))
.expect("just peeked Some")
.unwrap_err()));
}
None => return Poll::Ready(Ok(Inner::PlainText(Body::empty()))),
};
let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty()));
match body_state {
StreamState::Eof => Ok(Async::Ready(Inner::PlainText(Body::empty()))),
StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body)))),
}
let body = mem::replace(&mut self.0, BodyBytes(Body::empty()).peekable());
Poll::Ready(Ok(Inner::Gzip(
async_compression::stream::GzipDecoder::new(body),
)))
}
}
impl Gzip {
fn new(stream: ReadableChunks<Body>) -> Self {
Gzip {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: Box::new(GzDecoder::new(stream)),
}
}
}
impl Stream for Gzip {
type Item = Chunk;
type Error = error::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.buf.remaining_mut() == 0 {
self.buf.reserve(INIT_BUFFER_SIZE);
}
// The buffer contains uninitialised memory so getting a readable slice is unsafe.
// We trust the `flate2` and `miniz` writer not to read from the memory given.
//
// To be safe, this memory could be zeroed before passing to `flate2`.
// Otherwise we might need to deal with the case where `flate2` panics.
let read = try_io!(self.inner.read(unsafe { self.buf.bytes_mut() }));
if read == 0 {
// If GzDecoder reports EOF, it doesn't necessarily mean the
// underlying stream reached EOF (such as the `0\r\n\r\n`
// header meaning a chunked transfer has completed). If it
// isn't polled till EOF, the connection may not be able
// to be re-used.
//
// See https://github.com/seanmonstar/reqwest/issues/508.
let inner_read = try_io!(self.inner.get_mut().read(&mut [0]));
if inner_read == 0 {
Ok(Async::Ready(None))
} else {
Err(error::from(io::Error::new(
io::ErrorKind::InvalidData,
"unexpected data after gzip decoder signaled end-of-file",
)))
}
} else {
unsafe { self.buf.advance_mut(read) };
let chunk = Chunk::from_chunk(self.buf.split_to(read).freeze());
Ok(Async::Ready(Some(chunk)))
}
}
}
/// A `Read`able wrapper over a stream of chunks.
pub struct ReadableChunks<S> {
state: ReadState,
stream: S,
}
enum ReadState {
/// A chunk is ready to be read from.
Ready(Chunk),
/// The next chunk isn't ready yet.
NotReady,
/// The stream has finished.
Eof,
}
enum StreamState {
/// More bytes can be read from the stream.
HasMore,
/// No more bytes can be read from the stream.
Eof,
}
impl<S> ReadableChunks<S> {
#[inline]
pub(crate) fn new(stream: S) -> Self {
ReadableChunks {
state: ReadState::NotReady,
stream,
}
}
}
impl<S> fmt::Debug for ReadableChunks<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ReadableChunks").finish()
}
}
impl<S> Read for ReadableChunks<S>
where
S: Stream<Item = Chunk, Error = error::Error>,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
let ret;
match self.state {
ReadState::Ready(ref mut chunk) => {
let len = cmp::min(buf.len(), chunk.remaining());
buf[..len].copy_from_slice(&chunk[..len]);
chunk.advance(len);
if chunk.is_empty() {
ret = len;
} else {
return Ok(len);
}
}
ReadState::NotReady => match self.poll_stream() {
Ok(Async::Ready(StreamState::HasMore)) => continue,
Ok(Async::Ready(StreamState::Eof)) => return Ok(0),
Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
Err(e) => return Err(error::into_io(e)),
},
ReadState::Eof => return Ok(0),
}
self.state = ReadState::NotReady;
return Ok(ret);
}
}
}
impl<S> ReadableChunks<S>
where
S: Stream<Item = Chunk, Error = error::Error>,
{
/// Poll the readiness of the inner reader.
///
/// This function will update the internal state and return a simplified
/// version of the `ReadState`.
fn poll_stream(&mut self) -> Poll<StreamState, error::Error> {
match self.stream.poll() {
Ok(Async::Ready(Some(chunk))) => {
self.state = ReadState::Ready(chunk);
Ok(Async::Ready(StreamState::HasMore))
}
Ok(Async::Ready(None)) => {
self.state = ReadState::Eof;
Ok(Async::Ready(StreamState::Eof))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
impl Stream for BodyBytes {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match futures::ready!(Pin::new(&mut self.0).poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))),
None => Poll::Ready(None),
}
}
}