diff --git a/Cargo.toml b/Cargo.toml
index b6f5598..3cdae3f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,7 +14,7 @@ bytes = "0.4"
futures = "0.1.14"
hyper = "0.11"
hyper-tls = "0.1.2"
-libflate = "0.1.5"
+libflate = "0.1.11"
log = "0.3"
native-tls = "0.1.3"
serde = "1.0"
diff --git a/examples/async.rs b/examples/async.rs
index 06f0378..3cafb5f 100644
--- a/examples/async.rs
+++ b/examples/async.rs
@@ -1,17 +1,40 @@
-#![deny(warnings)]
+#![allow(warnings)] // remove when error_chain is fixed
+
extern crate futures;
extern crate reqwest;
extern crate tokio_core;
+#[macro_use]
+extern crate error_chain;
-use futures::Future;
+use std::mem;
+use std::io::{self, Cursor};
+use futures::{Future, Stream};
+use reqwest::unstable::async::{Client, Decoder};
+
+error_chain! {
+ foreign_links {
+ ReqError(reqwest::Error);
+ IoError(io::Error);
+ }
+}
fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();
- let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap();
+ let client = Client::new(&core.handle()).unwrap();
- let work = client.get("https://hyper.rs").unwrap().send().map(|res| {
- println!("{}", res.status());
- });
+ let work = client.get("https://hyper.rs").unwrap()
+ .send()
+ .map_err(|e| Error::from(e))
+ .and_then(|mut res| {
+ println!("{}", res.status());
+
+ let body = mem::replace(res.body_mut(), Decoder::empty());
+ body.concat2().map_err(Into::into)
+ })
+ .and_then(|body| {
+ let mut body = Cursor::new(body);
+ io::copy(&mut body, &mut io::stdout()).map_err(Into::into)
+ });
core.run(work).unwrap();
}
diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs
index a55ba74..de8ea97 100644
--- a/src/async_impl/body.rs
+++ b/src/async_impl/body.rs
@@ -66,6 +66,13 @@ pub struct Chunk {
inner: ::hyper::Chunk,
}
+impl AsRef<[u8]> for Chunk {
+ #[inline]
+ fn as_ref(&self) -> &[u8] {
+ &*self
+ }
+}
+
impl ::std::ops::Deref for Chunk {
type Target = [u8];
#[inline]
@@ -121,6 +128,20 @@ pub fn take(body: &mut Body) -> Body {
}
}
+#[inline]
+pub fn empty() -> Body {
+ Body {
+ inner: Inner::Hyper(::hyper::Body::empty()),
+ }
+}
+
+#[inline]
+pub fn chunk(chunk: Bytes) -> Chunk {
+ Chunk {
+ inner: ::hyper::Chunk::from(chunk)
+ }
+}
+
#[inline]
pub fn reusable(chunk: Bytes) -> Body {
Body {
diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs
new file mode 100644
index 0000000..441c8cc
--- /dev/null
+++ b/src/async_impl/decoder.rs
@@ -0,0 +1,441 @@
+/*!
+A potentially non-blocking response decoder.
+
+The decoder wraps a stream of chunks and produces a new stream of decompressed chunks.
+The decompressed chunks aren't guaranteed to align to the compressed ones.
+
+If the response is plaintext then no additional work is carried out.
+Chunks are just passed along.
+
+If the response is gzip, then the chunks are decompressed into a buffer.
+Slices of that buffer are emitted as new chunks.
+
+This module consists of a few main types:
+
+- `ReadableChunks` is a `Read`-like wrapper around a stream
+- `Decoder` is a layer over `ReadableChunks` that applies the right decompression
+
+The following types directly support the gzip compression case:
+
+- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
+- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail
+*/
+
+use std::fmt;
+use std::mem;
+use std::cmp;
+use std::io::{self, Read};
+use std::marker::PhantomData;
+
+use bytes::{BufMut, BytesMut};
+use libflate::non_blocking::gzip;
+use tokio_io::AsyncRead;
+use tokio_io::io as async_io;
+use futures::{Async, Future, Poll, Stream};
+use futures::stream::Concat2;
+use hyper::StatusCode;
+use serde::de::DeserializeOwned;
+use serde_json;
+use url::Url;
+
+use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding};
+use super::{body, Body, Chunk};
+use error;
+
+const INIT_BUFFER_SIZE: usize = 8192;
+
+/// A response decompressor over a non-blocking stream of chunks.
+///
+/// The inner decoder may be constructed asynchronously.
+pub struct Decoder {
+ inner: Inner
+}
+
+enum Inner {
+ /// A `PlainText` decoder just returns the response content as is.
+ PlainText(Body),
+ /// A `Gzip` decoder will uncompress the gzipped response content before returning it.
+ Gzip(Gzip),
+ /// A decoder that doesn't have a value yet.
+ Pending(Pending)
+}
+
+enum Pending {
+ /// An unreachable internal state.
+ Empty,
+ /// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
+ Gzip(ReadableChunks
)
+}
+
+/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
+/// as a `Chunk`.
+struct Gzip {
+ inner: gzip::Decoder>>,
+ buf: BytesMut,
+}
+
+impl fmt::Debug for Decoder {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("Decoder")
+ .finish()
+ }
+}
+
+impl Decoder {
+ /// An empty decoder.
+ ///
+ /// This decoder will produce a single 0 byte chunk.
+ #[inline]
+ pub fn empty() -> Decoder {
+ Decoder {
+ inner: Inner::PlainText(body::empty())
+ }
+ }
+
+ /// A plain text decoder.
+ ///
+ /// This decoder will emit the underlying chunks as-is.
+ #[inline]
+ fn plain_text(body: Body) -> Decoder {
+ Decoder {
+ inner: Inner::PlainText(body)
+ }
+ }
+
+ /// A gzip decoder.
+ ///
+ /// This decoder will buffer and decompress chunks that are gzipped.
+ #[inline]
+ fn gzip(mut body: Body) -> Decoder {
+ Decoder {
+ inner: Inner::Pending(Pending::Gzip(ReadableChunks::new(body)))
+ }
+ }
+}
+
+impl Stream for Decoder {
+ type Item = Chunk;
+ type Error = error::Error;
+
+ fn poll(&mut self) -> Poll