diff --git a/Cargo.toml b/Cargo.toml
index 13948fa..fc7d9eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,7 +14,7 @@ bytes = "0.4"
futures = "0.1.14"
hyper = "0.11"
hyper-tls = "0.1.2"
-libflate = "0.1.5"
+libflate = "0.1.11"
log = "0.3"
native-tls = "0.1.3"
serde = "1.0"
diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs
index a55ba74..7c7bf59 100644
--- a/src/async_impl/body.rs
+++ b/src/async_impl/body.rs
@@ -121,6 +121,20 @@ pub fn take(body: &mut Body) -> Body {
}
}
+#[inline]
+pub fn empty() -> Body {
+ Body {
+ inner: Inner::Hyper(::hyper::Body::empty()),
+ }
+}
+
+#[inline]
+pub fn chunk(chunk: Bytes) -> Chunk {
+ Chunk {
+ inner: ::hyper::Chunk::from(chunk)
+ }
+}
+
#[inline]
pub fn reusable(chunk: Bytes) -> Body {
Body {
diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs
new file mode 100644
index 0000000..724c1ca
--- /dev/null
+++ b/src/async_impl/decoder.rs
@@ -0,0 +1,433 @@
+/*!
+A potentially non-blocking response decoder.
+
+The decoder wraps a stream of chunks and produces a new stream of decompressed chunks.
+The decompressed chunks aren't guaranteed to align to the compressed ones.
+
+If the response is plaintext then no additional work is carried out.
+Chunks are just passed along.
+
+If the response is gzip, then the chunks are decompressed into a buffer.
+Slices of that buffer are emitted as new chunks.
+
+This module consists of a few main types:
+
+- `ReadableChunks` is a `Read`-like wrapper around a stream
+- `Decoder` is a layer over `ReadableChunks` that applies the right decompression
+
+The following types directly support the gzip compression case:
+
+- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
+- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail
+*/
+
+use std::fmt;
+use std::mem;
+use std::cmp;
+use std::io::{self, Read};
+use std::marker::PhantomData;
+
+use bytes::{BufMut, BytesMut};
+use libflate::non_blocking::gzip;
+use tokio_io::AsyncRead;
+use tokio_io::io as async_io;
+use futures::{Async, Future, Poll, Stream};
+use futures::stream::Concat2;
+use hyper::StatusCode;
+use serde::de::DeserializeOwned;
+use serde_json;
+use url::Url;
+
+use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding};
+use super::{body, Body, Chunk};
+use error;
+
+const INIT_BUFFER_SIZE: usize = 8192;
+
+/// A response decompressor over a non-blocking stream of chunks.
+///
+/// The inner decoder may be constructed asynchronously.
+pub struct Decoder {
+ inner: Inner
+}
+
+enum Inner {
+ /// A `PlainText` decoder just returns the response content as is.
+ PlainText(Body),
+ /// A `Gzip` decoder will uncompress the gzipped response content before returning it.
+ Gzip(Gzip),
+ /// A decoder that doesn't have a value yet.
+ Pending(Pending)
+}
+
+enum Pending {
+ /// An unreachable internal state.
+ Empty,
+ /// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
+ Gzip(ReadableChunks
)
+}
+
+/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
+/// as a `Chunk`.
+struct Gzip {
+ inner: gzip::Decoder>>,
+ buf: BytesMut,
+}
+
+impl fmt::Debug for Decoder {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Decoder")
+ .finish()
+ }
+}
+
+impl Decoder {
+ #[inline]
+ fn plain_text(body: Body) -> Decoder {
+ Decoder {
+ inner: Inner::PlainText(body)
+ }
+ }
+
+ #[inline]
+ fn gzip(mut body: Body) -> Decoder {
+ Decoder {
+ inner: Inner::Pending(Pending::Gzip(ReadableChunks::new(body)))
+ }
+ }
+}
+
+impl Stream for Decoder {
+ type Item = Chunk;
+ type Error = error::Error;
+
+ fn poll(&mut self) -> Poll, Self::Error> {
+ // Do a read or poll for a pendidng decoder value.
+ let new_value = match self.inner {
+ Inner::Pending(ref mut future) => {
+ match future.poll() {
+ Ok(Async::Ready(inner)) => inner,
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(e)
+ }
+ },
+ Inner::PlainText(ref mut body) => return body.poll(),
+ Inner::Gzip(ref mut decoder) => return decoder.poll()
+ };
+
+ self.inner = new_value;
+ self.poll()
+ }
+}
+
+impl Future for Pending {
+ type Item = Inner;
+ type Error = error::Error;
+
+ fn poll(&mut self) -> Poll {
+ let body_state = match *self {
+ Pending::Gzip(ref mut body) => {
+ match body.poll_stream() {
+ Ok(Async::Ready(state)) => state,
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(e)
+ }
+ },
+ Pending::Empty => panic!("poll for a decoder after it's done")
+ };
+
+ match mem::replace(self, Pending::Empty) {
+ Pending::Gzip(body) => {
+ // libflate does a read_exact([0; 2]), so its impossible to tell
+ // if the stream was empty, or truly had an UnexpectedEof.
+ // Therefore, we need to check for EOF first.
+ match body_state {
+ StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))),
+ StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body))))
+ }
+ },
+ Pending::Empty => panic!("invalid internal state")
+ }
+ }
+}
+
+impl Gzip {
+ fn new(stream: ReadableChunks) -> Self {
+ Gzip {
+ buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
+ inner: gzip::Decoder::new(Peeked::new(stream))
+ }
+ }
+}
+
+impl Stream for Gzip {
+ type Item = Chunk;
+ type Error = error::Error;
+
+ fn poll(&mut self) -> Poll, Self::Error> {
+ if self.buf.remaining_mut() == 0 {
+ self.buf.reserve(INIT_BUFFER_SIZE);
+ }
+
+ // The buffer contains uninitialised memory so getting a readable slice is unsafe.
+ // We trust the `libflate` writer not to read from the memory given.
+ //
+ // To be safe, this memory could be zeroed before passing to `libflate`.
+ // Otherwise we might need to deal with the case where `libflate` panics.
+ let read = {
+ let mut buf = unsafe { self.buf.bytes_mut() };
+ self.inner.read(&mut buf)
+ };
+
+ match read {
+ Ok(read) if read == 0 => {
+ Ok(Async::Ready(None))
+ },
+ Ok(read) => {
+ unsafe { self.buf.advance_mut(read) };
+ let chunk = body::chunk(self.buf.split_to(read).freeze());
+
+ Ok(Async::Ready(Some(chunk)))
+ },
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
+ Ok(Async::NotReady)
+ },
+ Err(e) => Err(error::from(e))
+ }
+ }
+}
+
+/// A `Read`able wrapper over a stream of chunks.
+pub struct ReadableChunks {
+ state: ReadState,
+ stream: S,
+}
+
+enum ReadState {
+ /// A chunk is ready to be read from.
+ Ready(Chunk, usize),
+ /// The next chunk isn't ready yet.
+ NotReady,
+ /// The stream has finished.
+ Eof,
+}
+
+enum StreamState {
+ /// More bytes can be read from the stream.
+ HasMore,
+ /// No more bytes can be read from the stream.
+ Eof
+}
+
+/// A buffering reader that ensures `Read`s return at least a few bytes.
+struct Peeked {
+ state: PeekedState,
+ peeked_buf: [u8; 2],
+ pos: usize,
+ inner: R,
+}
+
+enum PeekedState {
+ /// The internal buffer hasn't filled yet.
+ NotReady,
+ /// The internal buffer can be read.
+ Ready(usize)
+}
+
+impl Peeked {
+ #[inline]
+ fn new(inner: R) -> Self {
+ Peeked {
+ state: PeekedState::NotReady,
+ peeked_buf: [0; 2],
+ inner: inner,
+ pos: 0,
+ }
+ }
+
+ #[inline]
+ fn ready(&mut self) {
+ self.state = PeekedState::Ready(self.pos);
+ self.pos = 0;
+ }
+
+ #[inline]
+ fn not_ready(&mut self) {
+ self.state = PeekedState::NotReady;
+ self.pos = 0;
+ }
+}
+
+impl Read for Peeked {
+ #[inline]
+ fn read(&mut self, buf: &mut [u8]) -> io::Result {
+ loop {
+ match self.state {
+ PeekedState::Ready(peeked_buf_len) => {
+ let len = cmp::min(buf.len(), peeked_buf_len - self.pos);
+ let start = self.pos;
+ let end = self.pos + len;
+
+ buf[..len].copy_from_slice(&self.peeked_buf[start..end]);
+ self.pos += len;
+ if self.pos == peeked_buf_len {
+ self.not_ready();
+ }
+
+ return Ok(len)
+ },
+ PeekedState::NotReady => {
+ let read = self.inner.read(&mut self.peeked_buf[self.pos..]);
+
+ match read {
+ Ok(0) => {
+ self.ready();
+ },
+ Ok(read) => {
+ self.pos += read;
+ if self.pos == self.peeked_buf.len() {
+ self.ready();
+ }
+ },
+ Err(e) => return Err(e)
+ }
+ }
+ };
+ }
+ }
+}
+
+impl ReadableChunks {
+ #[inline]
+ pub fn new(stream: S) -> Self {
+ ReadableChunks {
+ state: ReadState::NotReady,
+ stream: stream,
+ }
+ }
+}
+
+impl fmt::Debug for ReadableChunks {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("ReadableChunks")
+ .finish()
+ }
+}
+
+impl Read for ReadableChunks
+ where S: Stream-
+{
+ fn read(&mut self, buf: &mut [u8]) -> io::Result
{
+ loop {
+ let ret;
+ match self.state {
+ ReadState::Ready(ref mut chunk, ref mut pos) => {
+ let chunk_start = *pos;
+ let len = cmp::min(buf.len(), chunk.len() - chunk_start);
+ let chunk_end = chunk_start + len;
+
+ buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
+ *pos += len;
+ if *pos == chunk.len() {
+ ret = len;
+ } else {
+ return Ok(len);
+ }
+ },
+ ReadState::NotReady => {
+ match self.poll_stream() {
+ Ok(Async::Ready(StreamState::HasMore)) => continue,
+ Ok(Async::Ready(StreamState::Eof)) => {
+ return Ok(0)
+ },
+ Ok(Async::NotReady) => {
+ return Err(io::ErrorKind::WouldBlock.into())
+ },
+ Err(e) => {
+ return Err(error::into_io(e))
+ }
+ }
+ },
+ ReadState::Eof => return Ok(0),
+ }
+ self.state = ReadState::NotReady;
+ return Ok(ret);
+ }
+ }
+}
+
+impl ReadableChunks
+ where S: Stream-
+{
+ /// Poll the readiness of the inner reader.
+ ///
+ /// This function will update the internal state and return a simplified
+ /// version of the `ReadState`.
+ fn poll_stream(&mut self) -> Poll
{
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(chunk))) => {
+ self.state = ReadState::Ready(chunk, 0);
+
+ Ok(Async::Ready(StreamState::HasMore))
+ },
+ Ok(Async::Ready(None)) => {
+ self.state = ReadState::Eof;
+
+ Ok(Async::Ready(StreamState::Eof))
+ },
+ Ok(Async::NotReady) => {
+ Ok(Async::NotReady)
+ },
+ Err(e) => Err(e)
+ }
+ }
+}
+
+// pub(crate)
+
+#[inline]
+pub fn take(decoder: &mut Decoder) -> Decoder {
+ let inner = mem::replace(&mut decoder.inner, Inner::PlainText(body::empty()));
+ Decoder {
+ inner: inner,
+ }
+}
+
+/// Constructs a Decoder from a hyper request.
+///
+/// A decoder is just a wrapper around the hyper request that knows
+/// how to decode the content body of the request.
+///
+/// Uses the correct variant by inspecting the Content-Encoding header.
+pub fn detect(headers: &mut Headers, body: Body, check_gzip: bool) -> Decoder {
+ if !check_gzip {
+ return Decoder::plain_text(body);
+ }
+ let content_encoding_gzip: bool;
+ let mut is_gzip = {
+ content_encoding_gzip = headers
+ .get::()
+ .map_or(false, |encs| encs.contains(&Encoding::Gzip));
+ content_encoding_gzip ||
+ headers
+ .get::()
+ .map_or(false, |encs| encs.contains(&Encoding::Gzip))
+ };
+ if is_gzip {
+ if let Some(content_length) = headers.get::() {
+ if content_length.0 == 0 {
+ warn!("GZipped response with content-length of 0");
+ is_gzip = false;
+ }
+ }
+ }
+ if content_encoding_gzip {
+ headers.remove::();
+ headers.remove::();
+ }
+ if is_gzip {
+ Decoder::gzip(body)
+ } else {
+ Decoder::plain_text(body)
+ }
+}
diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs
index 0ee4a66..309a672 100644
--- a/src/async_impl/mod.rs
+++ b/src/async_impl/mod.rs
@@ -1,11 +1,13 @@
#![cfg_attr(not(features = "unstable"), allow(unused))]
pub use self::body::{Body, Chunk};
+pub use self::decoder::{Decoder, ReadableChunks};
pub use self::client::{Client, ClientBuilder};
pub use self::request::{Request, RequestBuilder};
pub use self::response::Response;
pub mod body;
pub mod client;
+pub mod decoder;
mod request;
mod response;
diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs
index 6378cd8..c0080e0 100644
--- a/src/async_impl/response.rs
+++ b/src/async_impl/response.rs
@@ -1,15 +1,21 @@
use std::fmt;
+use std::mem;
+use std::io::{self, Read};
use std::marker::PhantomData;
+use libflate::non_blocking::gzip;
+use tokio_io::AsyncRead;
+use tokio_io::io as async_io;
use futures::{Async, Future, Poll, Stream};
use futures::stream::Concat2;
-use header::Headers;
use hyper::StatusCode;
use serde::de::DeserializeOwned;
use serde_json;
use url::Url;
-use super::{body, Body};
+use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding};
+use super::{decoder, body, Body, Chunk, Decoder};
+use error;
/// A Response to a submitted `Request`.
@@ -17,7 +23,7 @@ pub struct Response {
status: StatusCode,
headers: Headers,
url: Url,
- body: Body,
+ body: Decoder,
}
impl Response {
@@ -45,17 +51,29 @@ impl Response {
&mut self.headers
}
- /// Get a mutable reference to the `Body` of this `Response`.
+ /// Get a readable response body.
+ ///
+ /// The response will be decoded.
#[inline]
- pub fn body_mut(&mut self) -> &mut Body {
+ pub fn body_mut(&mut self) -> &mut Decoder {
&mut self.body
}
+ /// Get a readable response body.
+ ///
+ /// This function will replace the body on the response with an empty one.
+ #[inline]
+ pub fn body(&mut self) -> Decoder {
+ decoder::take(&mut self.body)
+ }
+
/// Try to deserialize the response body as JSON using `serde`.
#[inline]
pub fn json(&mut self) -> Json {
+ let body = self.body().concat2();
+
Json {
- concat: body::take(self.body_mut()).concat2(),
+ concat: body,
_marker: PhantomData,
}
}
@@ -95,7 +113,6 @@ impl Response {
}
}
-
impl fmt::Debug for Response {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Response")
@@ -107,7 +124,7 @@ impl fmt::Debug for Response {
}
pub struct Json {
- concat: Concat2,
+ concat: Concat2,
_marker: PhantomData,
}
@@ -130,17 +147,15 @@ impl fmt::Debug for Json {
// pub(crate)
-pub fn new(mut res: ::hyper::client::Response, url: Url, _gzip: bool) -> Response {
- use std::mem;
-
+pub fn new(mut res: ::hyper::client::Response, url: Url, gzip: bool) -> Response {
let status = res.status();
- let headers = mem::replace(res.headers_mut(), Headers::new());
- let body = res.body();
+ let mut headers = mem::replace(res.headers_mut(), Headers::new());
+ let decoder = decoder::detect(&mut headers, body::wrap(res.body()), gzip);
debug!("Response: '{}' for {}", status, url);
Response {
status: status,
headers: headers,
url: url,
- body: super::body::wrap(body),
+ body: decoder,
}
}
diff --git a/src/client.rs b/src/client.rs
index a486b54..d357832 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -52,7 +52,6 @@ pub struct Client {
/// # }
/// ```
pub struct ClientBuilder {
- gzip: bool,
inner: async_impl::ClientBuilder,
timeout: Option,
}
@@ -66,7 +65,6 @@ impl ClientBuilder {
pub fn new() -> ::Result {
async_impl::ClientBuilder::new().map(|builder| ClientBuilder {
inner: builder,
- gzip: true,
timeout: None,
})
}
@@ -150,7 +148,6 @@ impl ClientBuilder {
#[inline]
pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder {
self.inner.gzip(enable);
- self.gzip = enable;
self
}
@@ -312,7 +309,6 @@ impl fmt::Debug for ClientBuilder {
#[derive(Clone)]
struct ClientHandle {
- gzip: bool,
timeout: Option,
inner: Arc
}
@@ -334,7 +330,6 @@ impl Drop for InnerClientHandle {
impl ClientHandle {
fn new(builder: &mut ClientBuilder) -> ::Result {
- let gzip = builder.gzip;
let timeout = builder.timeout;
let mut builder = async_impl::client::take_builder(&mut builder.inner);
let (tx, rx) = mpsc::unbounded();
@@ -383,7 +378,6 @@ impl ClientHandle {
Ok(ClientHandle {
- gzip: gzip,
timeout: timeout,
inner: inner_handle,
})
@@ -412,7 +406,7 @@ impl ClientHandle {
}
};
res.map(|res| {
- response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.inner.clone()))
+ response::new(res, self.timeout, KeepCoreThreadAlive(self.inner.clone()))
})
}
}
diff --git a/src/response.rs b/src/response.rs
index 06e9c2d..cdc7827 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -2,19 +2,18 @@ use std::fmt;
use std::io::{self, Read};
use std::time::Duration;
-use libflate::gzip;
+use futures::{Async, Poll, Stream};
use serde::de::DeserializeOwned;
use serde_json;
use client::KeepCoreThreadAlive;
-use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding};
+use header::Headers;
use {async_impl, StatusCode, Url, wait};
-
/// A Response to a submitted `Request`.
pub struct Response {
- body: Decoder,
inner: async_impl::Response,
+ body: async_impl::ReadableChunks,
_thread_handle: KeepCoreThreadAlive,
}
@@ -183,11 +182,11 @@ impl Response {
/// ```
#[inline]
pub fn error_for_status(self) -> ::Result {
- let Response { body, inner, _thread_handle } = self;
+ let Response { inner, body, _thread_handle } = self;
inner.error_for_status().map(move |inner| {
Response {
- body: body,
inner: inner,
+ body: body,
_thread_handle: _thread_handle,
}
})
@@ -201,189 +200,40 @@ impl Read for Response {
}
}
-struct ReadableBody {
- state: ReadState,
- stream: wait::WaitStream,
+struct WaitBody {
+ inner: wait::WaitStream
}
-enum ReadState {
- Ready(async_impl::Chunk, usize),
- NotReady,
- Eof,
-}
+impl Stream for WaitBody {
+ type Item = ::Item;
+ type Error = ::Error;
+ fn poll(&mut self) -> Poll, Self::Error> {
+ match self.inner.next() {
+ Some(Ok(chunk)) => Ok(Async::Ready(Some(chunk))),
+ Some(Err(e)) => {
+ let req_err = match e {
+ wait::Waited::TimedOut => ::error::timedout(None),
+ wait::Waited::Err(e) => e,
+ };
-impl Read for ReadableBody {
- fn read(&mut self, buf: &mut [u8]) -> io::Result {
- use std::cmp;
-
- loop {
- let ret;
- match self.state {
- ReadState::Ready(ref mut chunk, ref mut pos) => {
- let chunk_start = *pos;
- let len = cmp::min(buf.len(), chunk.len() - chunk_start);
- let chunk_end = chunk_start + len;
- buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
- *pos += len;
- if *pos == chunk.len() {
- ret = len;
- } else {
- return Ok(len);
- }
- },
- ReadState::NotReady => {
- match self.stream.next() {
- Some(Ok(chunk)) => {
- self.state = ReadState::Ready(chunk, 0);
- continue;
- },
- Some(Err(e)) => {
- let req_err = match e {
- wait::Waited::TimedOut => ::error::timedout(None),
- wait::Waited::Err(e) => e,
- };
- return Err(::error::into_io(req_err));
- },
- None => {
- self.state = ReadState::Eof;
- return Ok(0);
- },
- }
- },
- ReadState::Eof => return Ok(0),
- }
- self.state = ReadState::NotReady;
- return Ok(ret);
+ Err(req_err)
+ },
+ None => Ok(Async::Ready(None)),
}
}
}
-
-enum Decoder {
- /// A `PlainText` decoder just returns the response content as is.
- PlainText(ReadableBody),
- /// A `Gzip` decoder will uncompress the gziped response content before returning it.
- Gzip(gzip::Decoder),
- /// An error occured reading the Gzip header, so return that error
- /// when the user tries to read on the `Response`.
- Errored(Option),
-}
-
-impl Decoder {
- /// Constructs a Decoder from a hyper request.
- ///
- /// A decoder is just a wrapper around the hyper request that knows
- /// how to decode the content body of the request.
- ///
- /// Uses the correct variant by inspecting the Content-Encoding header.
- fn new(res: &mut async_impl::Response, check_gzip: bool, timeout: Option) -> Self {
- let body = async_impl::body::take(res.body_mut());
- let body = ReadableBody {
- state: ReadState::NotReady,
- stream: wait::stream(body, timeout),
- };
-
- if !check_gzip {
- return Decoder::PlainText(body);
- }
- let content_encoding_gzip: bool;
- let mut is_gzip = {
- content_encoding_gzip = res.headers()
- .get::()
- .map_or(false, |encs| encs.contains(&Encoding::Gzip));
- content_encoding_gzip ||
- res.headers()
- .get::()
- .map_or(false, |encs| encs.contains(&Encoding::Gzip))
- };
- if is_gzip {
- if let Some(content_length) = res.headers().get::() {
- if content_length.0 == 0 {
- warn!("GZipped response with content-length of 0");
- is_gzip = false;
- }
- }
- }
- if content_encoding_gzip {
- res.headers_mut().remove::();
- res.headers_mut().remove::();
- }
- if is_gzip {
- new_gzip(body)
- } else {
- Decoder::PlainText(body)
- }
- }
-}
-
-fn new_gzip(mut body: ReadableBody) -> Decoder {
- // libflate does a read_exact([0; 2]), so its impossible to tell
- // if the stream was empty, or truly had an UnexpectedEof.
- // Therefore, we need to peek a byte to make check for EOF first.
- let mut peek = [0];
- match body.read(&mut peek) {
- Ok(0) => return Decoder::PlainText(body),
- Ok(n) => debug_assert_eq!(n, 1),
- Err(e) => return Decoder::Errored(Some(e)),
- }
-
- let reader = Peeked {
- peeked: Some(peek[0]),
- inner: body,
- };
- match gzip::Decoder::new(reader) {
- Ok(gzip) => Decoder::Gzip(gzip),
- Err(e) => Decoder::Errored(Some(e)),
- }
-}
-
-struct Peeked {
- peeked: Option,
- inner: ReadableBody,
-}
-
-impl Read for Peeked {
- #[inline]
- fn read(&mut self, buf: &mut [u8]) -> io::Result {
- if buf.is_empty() {
- return Ok(0);
- }
- if let Some(byte) = self.peeked.take() {
- buf[0] = byte;
- Ok(1)
- } else {
- self.inner.read(buf)
- }
- }
-}
-
-impl Read for Decoder {
- fn read(&mut self, buf: &mut [u8]) -> io::Result {
- match *self {
- Decoder::PlainText(ref mut body) => body.read(buf),
- Decoder::Gzip(ref mut decoder) => decoder.read(buf),
- Decoder::Errored(ref mut err) => {
- Err(err.take().unwrap_or_else(previously_errored))
- }
- }
- }
-}
-
-#[inline]
-fn previously_errored() -> io::Error {
- io::Error::new(io::ErrorKind::Other, "permanently errored")
-}
-
-
// pub(crate)
-pub fn new(mut res: async_impl::Response, gzip: bool, timeout: Option, thread: KeepCoreThreadAlive) -> Response {
+pub fn new(mut res: async_impl::Response, timeout: Option, thread: KeepCoreThreadAlive) -> Response {
+ let body = async_impl::ReadableChunks::new(WaitBody {
+ inner: wait::stream(res.body(), timeout)
+ });
- let decoder = Decoder::new(&mut res, gzip, timeout);
Response {
- body: decoder,
inner: res,
+ body: body,
_thread_handle: thread,
}
}
diff --git a/tests/async.rs b/tests/async.rs
new file mode 100644
index 0000000..90af803
--- /dev/null
+++ b/tests/async.rs
@@ -0,0 +1,77 @@
+#![cfg(feature="unstable")]
+
+extern crate futures;
+extern crate tokio_core;
+extern crate tokio_io;
+extern crate reqwest;
+extern crate libflate;
+
+#[macro_use]
+mod support;
+
+use futures::{Future, Stream};
+use tokio_core::reactor::Core;
+use std::io::Write;
+use std::time::Duration;
+
+#[test]
+fn async_test_gzip_response() {
+ test_gzip(10_000, 4096);
+}
+
+#[test]
+fn async_test_gzip_single_byte_chunks() {
+ test_gzip(10, 1);
+}
+
+fn test_gzip(response_size: usize, chunk_size: usize) {
+ let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect();
+ let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap();
+ match encoder.write(content.as_bytes()) {
+ Ok(n) => assert!(n > 0, "Failed to write to encoder."),
+ _ => panic!("Failed to gzip encode string."),
+ };
+
+ let gzipped_content = encoder.finish().into_result().unwrap();
+
+ let mut response = format!("\
+ HTTP/1.1 200 OK\r\n\
+ Server: test-accept\r\n\
+ Content-Encoding: gzip\r\n\
+ Content-Length: {}\r\n\
+ \r\n", &gzipped_content.len())
+ .into_bytes();
+ response.extend(&gzipped_content);
+
+ let server = server! {
+ request: b"\
+ GET /gzip HTTP/1.1\r\n\
+ Host: $HOST\r\n\
+ User-Agent: $USERAGENT\r\n\
+ Accept: */*\r\n\
+ Accept-Encoding: gzip\r\n\
+ \r\n\
+ ",
+ chunk_size: chunk_size,
+ write_timeout: Duration::from_millis(10),
+ response: response
+ };
+
+ let mut core = Core::new().unwrap();
+
+ let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap();
+
+ let res_future = client.get(&format!("http://{}/gzip", server.addr()))
+ .unwrap()
+ .send()
+ .and_then(|mut res| res.body().concat2())
+ .and_then(|buf| {
+ let body = ::std::str::from_utf8(&buf).unwrap();
+
+ assert_eq!(body, &content);
+
+ Ok(())
+ });
+
+ core.run(res_future).unwrap();
+}
diff --git a/tests/gzip.rs b/tests/gzip.rs
index 6e3a478..e720eef 100644
--- a/tests/gzip.rs
+++ b/tests/gzip.rs
@@ -4,12 +4,15 @@ extern crate libflate;
#[macro_use]
mod support;
+use std::time::Duration;
use std::io::{Read, Write};
#[test]
fn test_gzip_response() {
+ let content: String = (0..50).into_iter().map(|i| format!("test {}", i)).collect();
+ let chunk_size = content.len() / 3;
let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap();
- match encoder.write(b"test request") {
+ match encoder.write(content.as_bytes()) {
Ok(n) => assert!(n > 0, "Failed to write to encoder."),
_ => panic!("Failed to gzip encode string."),
};
@@ -34,6 +37,8 @@ fn test_gzip_response() {
Accept-Encoding: gzip\r\n\
\r\n\
",
+ chunk_size: chunk_size,
+ write_timeout: Duration::from_millis(10),
response: response
};
let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap();
@@ -41,7 +46,7 @@ fn test_gzip_response() {
let mut body = String::new();
res.read_to_string(&mut body).unwrap();
- assert_eq!(body, "test request");
+ assert_eq!(body, content);
}
#[test]
diff --git a/tests/support/server.rs b/tests/support/server.rs
index b56cb9f..420b49c 100644
--- a/tests/support/server.rs
+++ b/tests/support/server.rs
@@ -23,6 +23,7 @@ pub struct Txn {
pub read_timeout: Option,
pub response_timeout: Option,
pub write_timeout: Option,
+ pub chunk_size: Option,
}
static DEFAULT_USER_AGENT: &'static str =
@@ -55,10 +56,21 @@ pub fn spawn(txns: Vec) -> Server {
}
if let Some(dur) = txn.write_timeout {
- let headers_end = ::std::str::from_utf8(&reply).unwrap().find("\r\n\r\n").unwrap() + 4;
+ let headers_end = b"\r\n\r\n";
+ let headers_end = reply.windows(headers_end.len()).position(|w| w == headers_end).unwrap() + 4;
socket.write_all(&reply[..headers_end]).unwrap();
- thread::park_timeout(dur);
- socket.write_all(&reply[headers_end..]).unwrap();
+
+ let body = &reply[headers_end..];
+
+ if let Some(chunk_size) = txn.chunk_size {
+ for content in body.chunks(chunk_size) {
+ thread::park_timeout(dur);
+ socket.write_all(&content).unwrap();
+ }
+ } else {
+ thread::park_timeout(dur);
+ socket.write_all(&body).unwrap();
+ }
} else {
socket.write_all(&reply).unwrap();
}