committed by
Sean McArthur
parent
81e0f1ff2a
commit
cf8944a0f0
@@ -20,18 +20,18 @@ The following types directly support the gzip compression case:
|
||||
- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
|
||||
*/
|
||||
|
||||
use std::fmt;
|
||||
use std::mem;
|
||||
use std::cmp;
|
||||
use std::fmt;
|
||||
use std::io::{self, Read};
|
||||
use std::mem;
|
||||
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use flate2::read::GzDecoder;
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use hyper::{HeaderMap};
|
||||
use hyper::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||
use hyper::HeaderMap;
|
||||
|
||||
use log::{warn};
|
||||
use log::warn;
|
||||
|
||||
use super::{Body, Chunk};
|
||||
use crate::error;
|
||||
@@ -42,7 +42,7 @@ const INIT_BUFFER_SIZE: usize = 8192;
|
||||
///
|
||||
/// The inner decoder may be constructed asynchronously.
|
||||
pub struct Decoder {
|
||||
inner: Inner
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
enum Inner {
|
||||
@@ -51,7 +51,7 @@ enum Inner {
|
||||
/// A `Gzip` decoder will uncompress the gzipped response content before returning it.
|
||||
Gzip(Gzip),
|
||||
/// A decoder that doesn't have a value yet.
|
||||
Pending(Pending)
|
||||
Pending(Pending),
|
||||
}
|
||||
|
||||
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
|
||||
@@ -68,8 +68,7 @@ struct Gzip {
|
||||
|
||||
impl fmt::Debug for Decoder {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Decoder")
|
||||
.finish()
|
||||
f.debug_struct("Decoder").finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,7 +79,7 @@ impl Decoder {
|
||||
#[inline]
|
||||
pub fn empty() -> Decoder {
|
||||
Decoder {
|
||||
inner: Inner::PlainText(Body::empty())
|
||||
inner: Inner::PlainText(Body::empty()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,7 +89,7 @@ impl Decoder {
|
||||
#[inline]
|
||||
fn plain_text(body: Body) -> Decoder {
|
||||
Decoder {
|
||||
inner: Inner::PlainText(body)
|
||||
inner: Inner::PlainText(body),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +99,9 @@ impl Decoder {
|
||||
#[inline]
|
||||
fn gzip(body: Body) -> Decoder {
|
||||
Decoder {
|
||||
inner: Inner::Pending(Pending { body: ReadableChunks::new(body) })
|
||||
inner: Inner::Pending(Pending {
|
||||
body: ReadableChunks::new(body),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,11 +121,11 @@ impl Decoder {
|
||||
.get_all(CONTENT_ENCODING)
|
||||
.iter()
|
||||
.any(|enc| enc == "gzip");
|
||||
content_encoding_gzip ||
|
||||
headers
|
||||
.get_all(TRANSFER_ENCODING)
|
||||
.iter()
|
||||
.any(|enc| enc == "gzip")
|
||||
content_encoding_gzip
|
||||
|| headers
|
||||
.get_all(TRANSFER_ENCODING)
|
||||
.iter()
|
||||
.any(|enc| enc == "gzip")
|
||||
};
|
||||
if is_gzip {
|
||||
if let Some(content_length) = headers.get(CONTENT_LENGTH) {
|
||||
@@ -153,15 +154,13 @@ impl Stream for Decoder {
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// Do a read or poll for a pendidng decoder value.
|
||||
let new_value = match self.inner {
|
||||
Inner::Pending(ref mut future) => {
|
||||
match future.poll() {
|
||||
Ok(Async::Ready(inner)) => inner,
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => return Err(e)
|
||||
}
|
||||
Inner::Pending(ref mut future) => match future.poll() {
|
||||
Ok(Async::Ready(inner)) => inner,
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => return Err(e),
|
||||
},
|
||||
Inner::PlainText(ref mut body) => return body.poll(),
|
||||
Inner::Gzip(ref mut decoder) => return decoder.poll()
|
||||
Inner::Gzip(ref mut decoder) => return decoder.poll(),
|
||||
};
|
||||
|
||||
self.inner = new_value;
|
||||
@@ -177,13 +176,13 @@ impl Future for Pending {
|
||||
let body_state = match self.body.poll_stream() {
|
||||
Ok(Async::Ready(state)) => state,
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Err(e) => return Err(e)
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
let body = mem::replace(&mut self.body, ReadableChunks::new(Body::empty()));
|
||||
match body_state {
|
||||
StreamState::Eof => Ok(Async::Ready(Inner::PlainText(Body::empty()))),
|
||||
StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body))))
|
||||
StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body)))),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -258,7 +257,7 @@ enum StreamState {
|
||||
/// More bytes can be read from the stream.
|
||||
HasMore,
|
||||
/// No more bytes can be read from the stream.
|
||||
Eof
|
||||
Eof,
|
||||
}
|
||||
|
||||
impl<S> ReadableChunks<S> {
|
||||
@@ -273,8 +272,7 @@ impl<S> ReadableChunks<S> {
|
||||
|
||||
impl<S> fmt::Debug for ReadableChunks<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("ReadableChunks")
|
||||
.finish()
|
||||
f.debug_struct("ReadableChunks").finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,20 +294,12 @@ where
|
||||
} else {
|
||||
return Ok(len);
|
||||
}
|
||||
},
|
||||
ReadState::NotReady => {
|
||||
match self.poll_stream() {
|
||||
Ok(Async::Ready(StreamState::HasMore)) => continue,
|
||||
Ok(Async::Ready(StreamState::Eof)) => {
|
||||
return Ok(0)
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
return Err(io::ErrorKind::WouldBlock.into())
|
||||
},
|
||||
Err(e) => {
|
||||
return Err(error::into_io(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
ReadState::NotReady => match self.poll_stream() {
|
||||
Ok(Async::Ready(StreamState::HasMore)) => continue,
|
||||
Ok(Async::Ready(StreamState::Eof)) => return Ok(0),
|
||||
Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
|
||||
Err(e) => return Err(error::into_io(e)),
|
||||
},
|
||||
ReadState::Eof => return Ok(0),
|
||||
}
|
||||
@@ -320,7 +310,8 @@ where
|
||||
}
|
||||
|
||||
impl<S> ReadableChunks<S>
|
||||
where S: Stream<Item = Chunk, Error = error::Error>
|
||||
where
|
||||
S: Stream<Item = Chunk, Error = error::Error>,
|
||||
{
|
||||
/// Poll the readiness of the inner reader.
|
||||
///
|
||||
@@ -332,16 +323,14 @@ impl<S> ReadableChunks<S>
|
||||
self.state = ReadState::Ready(chunk);
|
||||
|
||||
Ok(Async::Ready(StreamState::HasMore))
|
||||
},
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
self.state = ReadState::Eof;
|
||||
|
||||
Ok(Async::Ready(StreamState::Eof))
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
Ok(Async::NotReady)
|
||||
},
|
||||
Err(e) => Err(e)
|
||||
}
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user