From e2fa97254e9ba4e5251a39d079bb61b49f392958 Mon Sep 17 00:00:00 2001 From: Ashley Mannix Date: Fri, 18 Aug 2017 19:43:06 +1000 Subject: [PATCH] support async gzip decoding --- Cargo.toml | 2 +- src/async_impl/body.rs | 14 ++ src/async_impl/decoder.rs | 433 +++++++++++++++++++++++++++++++++++++ src/async_impl/mod.rs | 2 + src/async_impl/response.rs | 43 ++-- src/client.rs | 8 +- src/response.rs | 202 +++-------------- tests/async.rs | 77 +++++++ tests/gzip.rs | 9 +- tests/support/server.rs | 18 +- 10 files changed, 605 insertions(+), 203 deletions(-) create mode 100644 src/async_impl/decoder.rs create mode 100644 tests/async.rs diff --git a/Cargo.toml b/Cargo.toml index 13948fa..fc7d9eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ bytes = "0.4" futures = "0.1.14" hyper = "0.11" hyper-tls = "0.1.2" -libflate = "0.1.5" +libflate = "0.1.11" log = "0.3" native-tls = "0.1.3" serde = "1.0" diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index a55ba74..7c7bf59 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -121,6 +121,20 @@ pub fn take(body: &mut Body) -> Body { } } +#[inline] +pub fn empty() -> Body { + Body { + inner: Inner::Hyper(::hyper::Body::empty()), + } +} + +#[inline] +pub fn chunk(chunk: Bytes) -> Chunk { + Chunk { + inner: ::hyper::Chunk::from(chunk) + } +} + #[inline] pub fn reusable(chunk: Bytes) -> Body { Body { diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs new file mode 100644 index 0000000..724c1ca --- /dev/null +++ b/src/async_impl/decoder.rs @@ -0,0 +1,433 @@ +/*! +A potentially non-blocking response decoder. + +The decoder wraps a stream of chunks and produces a new stream of decompressed chunks. +The decompressed chunks aren't guaranteed to align to the compressed ones. + +If the response is plaintext then no additional work is carried out. +Chunks are just passed along. + +If the response is gzip, then the chunks are decompressed into a buffer. +Slices of that buffer are emitted as new chunks. + +This module consists of a few main types: + +- `ReadableChunks` is a `Read`-like wrapper around a stream +- `Decoder` is a layer over `ReadableChunks` that applies the right decompression + +The following types directly support the gzip compression case: + +- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF +- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail +*/ + +use std::fmt; +use std::mem; +use std::cmp; +use std::io::{self, Read}; +use std::marker::PhantomData; + +use bytes::{BufMut, BytesMut}; +use libflate::non_blocking::gzip; +use tokio_io::AsyncRead; +use tokio_io::io as async_io; +use futures::{Async, Future, Poll, Stream}; +use futures::stream::Concat2; +use hyper::StatusCode; +use serde::de::DeserializeOwned; +use serde_json; +use url::Url; + +use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; +use super::{body, Body, Chunk}; +use error; + +const INIT_BUFFER_SIZE: usize = 8192; + +/// A response decompressor over a non-blocking stream of chunks. +/// +/// The inner decoder may be constructed asynchronously. +pub struct Decoder { + inner: Inner +} + +enum Inner { + /// A `PlainText` decoder just returns the response content as is. + PlainText(Body), + /// A `Gzip` decoder will uncompress the gzipped response content before returning it. + Gzip(Gzip), + /// A decoder that doesn't have a value yet. + Pending(Pending) +} + +enum Pending { + /// An unreachable internal state. + Empty, + /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. + Gzip(ReadableChunks) +} + +/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results +/// as a `Chunk`. +struct Gzip { + inner: gzip::Decoder>>, + buf: BytesMut, +} + +impl fmt::Debug for Decoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Decoder") + .finish() + } +} + +impl Decoder { + #[inline] + fn plain_text(body: Body) -> Decoder { + Decoder { + inner: Inner::PlainText(body) + } + } + + #[inline] + fn gzip(mut body: Body) -> Decoder { + Decoder { + inner: Inner::Pending(Pending::Gzip(ReadableChunks::new(body))) + } + } +} + +impl Stream for Decoder { + type Item = Chunk; + type Error = error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // Do a read or poll for a pendidng decoder value. + let new_value = match self.inner { + Inner::Pending(ref mut future) => { + match future.poll() { + Ok(Async::Ready(inner)) => inner, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(e) + } + }, + Inner::PlainText(ref mut body) => return body.poll(), + Inner::Gzip(ref mut decoder) => return decoder.poll() + }; + + self.inner = new_value; + self.poll() + } +} + +impl Future for Pending { + type Item = Inner; + type Error = error::Error; + + fn poll(&mut self) -> Poll { + let body_state = match *self { + Pending::Gzip(ref mut body) => { + match body.poll_stream() { + Ok(Async::Ready(state)) => state, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(e) + } + }, + Pending::Empty => panic!("poll for a decoder after it's done") + }; + + match mem::replace(self, Pending::Empty) { + Pending::Gzip(body) => { + // libflate does a read_exact([0; 2]), so its impossible to tell + // if the stream was empty, or truly had an UnexpectedEof. + // Therefore, we need to check for EOF first. + match body_state { + StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))), + StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body)))) + } + }, + Pending::Empty => panic!("invalid internal state") + } + } +} + +impl Gzip { + fn new(stream: ReadableChunks) -> Self { + Gzip { + buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), + inner: gzip::Decoder::new(Peeked::new(stream)) + } + } +} + +impl Stream for Gzip { + type Item = Chunk; + type Error = error::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + if self.buf.remaining_mut() == 0 { + self.buf.reserve(INIT_BUFFER_SIZE); + } + + // The buffer contains uninitialised memory so getting a readable slice is unsafe. + // We trust the `libflate` writer not to read from the memory given. + // + // To be safe, this memory could be zeroed before passing to `libflate`. + // Otherwise we might need to deal with the case where `libflate` panics. + let read = { + let mut buf = unsafe { self.buf.bytes_mut() }; + self.inner.read(&mut buf) + }; + + match read { + Ok(read) if read == 0 => { + Ok(Async::Ready(None)) + }, + Ok(read) => { + unsafe { self.buf.advance_mut(read) }; + let chunk = body::chunk(self.buf.split_to(read).freeze()); + + Ok(Async::Ready(Some(chunk))) + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + Ok(Async::NotReady) + }, + Err(e) => Err(error::from(e)) + } + } +} + +/// A `Read`able wrapper over a stream of chunks. +pub struct ReadableChunks { + state: ReadState, + stream: S, +} + +enum ReadState { + /// A chunk is ready to be read from. + Ready(Chunk, usize), + /// The next chunk isn't ready yet. + NotReady, + /// The stream has finished. + Eof, +} + +enum StreamState { + /// More bytes can be read from the stream. + HasMore, + /// No more bytes can be read from the stream. + Eof +} + +/// A buffering reader that ensures `Read`s return at least a few bytes. +struct Peeked { + state: PeekedState, + peeked_buf: [u8; 2], + pos: usize, + inner: R, +} + +enum PeekedState { + /// The internal buffer hasn't filled yet. + NotReady, + /// The internal buffer can be read. + Ready(usize) +} + +impl Peeked { + #[inline] + fn new(inner: R) -> Self { + Peeked { + state: PeekedState::NotReady, + peeked_buf: [0; 2], + inner: inner, + pos: 0, + } + } + + #[inline] + fn ready(&mut self) { + self.state = PeekedState::Ready(self.pos); + self.pos = 0; + } + + #[inline] + fn not_ready(&mut self) { + self.state = PeekedState::NotReady; + self.pos = 0; + } +} + +impl Read for Peeked { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + loop { + match self.state { + PeekedState::Ready(peeked_buf_len) => { + let len = cmp::min(buf.len(), peeked_buf_len - self.pos); + let start = self.pos; + let end = self.pos + len; + + buf[..len].copy_from_slice(&self.peeked_buf[start..end]); + self.pos += len; + if self.pos == peeked_buf_len { + self.not_ready(); + } + + return Ok(len) + }, + PeekedState::NotReady => { + let read = self.inner.read(&mut self.peeked_buf[self.pos..]); + + match read { + Ok(0) => { + self.ready(); + }, + Ok(read) => { + self.pos += read; + if self.pos == self.peeked_buf.len() { + self.ready(); + } + }, + Err(e) => return Err(e) + } + } + }; + } + } +} + +impl ReadableChunks { + #[inline] + pub fn new(stream: S) -> Self { + ReadableChunks { + state: ReadState::NotReady, + stream: stream, + } + } +} + +impl fmt::Debug for ReadableChunks { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("ReadableChunks") + .finish() + } +} + +impl Read for ReadableChunks + where S: Stream +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + loop { + let ret; + match self.state { + ReadState::Ready(ref mut chunk, ref mut pos) => { + let chunk_start = *pos; + let len = cmp::min(buf.len(), chunk.len() - chunk_start); + let chunk_end = chunk_start + len; + + buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); + *pos += len; + if *pos == chunk.len() { + ret = len; + } else { + return Ok(len); + } + }, + ReadState::NotReady => { + match self.poll_stream() { + Ok(Async::Ready(StreamState::HasMore)) => continue, + Ok(Async::Ready(StreamState::Eof)) => { + return Ok(0) + }, + Ok(Async::NotReady) => { + return Err(io::ErrorKind::WouldBlock.into()) + }, + Err(e) => { + return Err(error::into_io(e)) + } + } + }, + ReadState::Eof => return Ok(0), + } + self.state = ReadState::NotReady; + return Ok(ret); + } + } +} + +impl ReadableChunks + where S: Stream +{ + /// Poll the readiness of the inner reader. + /// + /// This function will update the internal state and return a simplified + /// version of the `ReadState`. + fn poll_stream(&mut self) -> Poll { + match self.stream.poll() { + Ok(Async::Ready(Some(chunk))) => { + self.state = ReadState::Ready(chunk, 0); + + Ok(Async::Ready(StreamState::HasMore)) + }, + Ok(Async::Ready(None)) => { + self.state = ReadState::Eof; + + Ok(Async::Ready(StreamState::Eof)) + }, + Ok(Async::NotReady) => { + Ok(Async::NotReady) + }, + Err(e) => Err(e) + } + } +} + +// pub(crate) + +#[inline] +pub fn take(decoder: &mut Decoder) -> Decoder { + let inner = mem::replace(&mut decoder.inner, Inner::PlainText(body::empty())); + Decoder { + inner: inner, + } +} + +/// Constructs a Decoder from a hyper request. +/// +/// A decoder is just a wrapper around the hyper request that knows +/// how to decode the content body of the request. +/// +/// Uses the correct variant by inspecting the Content-Encoding header. +pub fn detect(headers: &mut Headers, body: Body, check_gzip: bool) -> Decoder { + if !check_gzip { + return Decoder::plain_text(body); + } + let content_encoding_gzip: bool; + let mut is_gzip = { + content_encoding_gzip = headers + .get::() + .map_or(false, |encs| encs.contains(&Encoding::Gzip)); + content_encoding_gzip || + headers + .get::() + .map_or(false, |encs| encs.contains(&Encoding::Gzip)) + }; + if is_gzip { + if let Some(content_length) = headers.get::() { + if content_length.0 == 0 { + warn!("GZipped response with content-length of 0"); + is_gzip = false; + } + } + } + if content_encoding_gzip { + headers.remove::(); + headers.remove::(); + } + if is_gzip { + Decoder::gzip(body) + } else { + Decoder::plain_text(body) + } +} diff --git a/src/async_impl/mod.rs b/src/async_impl/mod.rs index 0ee4a66..309a672 100644 --- a/src/async_impl/mod.rs +++ b/src/async_impl/mod.rs @@ -1,11 +1,13 @@ #![cfg_attr(not(features = "unstable"), allow(unused))] pub use self::body::{Body, Chunk}; +pub use self::decoder::{Decoder, ReadableChunks}; pub use self::client::{Client, ClientBuilder}; pub use self::request::{Request, RequestBuilder}; pub use self::response::Response; pub mod body; pub mod client; +pub mod decoder; mod request; mod response; diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index 6378cd8..c0080e0 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -1,15 +1,21 @@ use std::fmt; +use std::mem; +use std::io::{self, Read}; use std::marker::PhantomData; +use libflate::non_blocking::gzip; +use tokio_io::AsyncRead; +use tokio_io::io as async_io; use futures::{Async, Future, Poll, Stream}; use futures::stream::Concat2; -use header::Headers; use hyper::StatusCode; use serde::de::DeserializeOwned; use serde_json; use url::Url; -use super::{body, Body}; +use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; +use super::{decoder, body, Body, Chunk, Decoder}; +use error; /// A Response to a submitted `Request`. @@ -17,7 +23,7 @@ pub struct Response { status: StatusCode, headers: Headers, url: Url, - body: Body, + body: Decoder, } impl Response { @@ -45,17 +51,29 @@ impl Response { &mut self.headers } - /// Get a mutable reference to the `Body` of this `Response`. + /// Get a readable response body. + /// + /// The response will be decoded. #[inline] - pub fn body_mut(&mut self) -> &mut Body { + pub fn body_mut(&mut self) -> &mut Decoder { &mut self.body } + /// Get a readable response body. + /// + /// This function will replace the body on the response with an empty one. + #[inline] + pub fn body(&mut self) -> Decoder { + decoder::take(&mut self.body) + } + /// Try to deserialize the response body as JSON using `serde`. #[inline] pub fn json(&mut self) -> Json { + let body = self.body().concat2(); + Json { - concat: body::take(self.body_mut()).concat2(), + concat: body, _marker: PhantomData, } } @@ -95,7 +113,6 @@ impl Response { } } - impl fmt::Debug for Response { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Response") @@ -107,7 +124,7 @@ impl fmt::Debug for Response { } pub struct Json { - concat: Concat2, + concat: Concat2, _marker: PhantomData, } @@ -130,17 +147,15 @@ impl fmt::Debug for Json { // pub(crate) -pub fn new(mut res: ::hyper::client::Response, url: Url, _gzip: bool) -> Response { - use std::mem; - +pub fn new(mut res: ::hyper::client::Response, url: Url, gzip: bool) -> Response { let status = res.status(); - let headers = mem::replace(res.headers_mut(), Headers::new()); - let body = res.body(); + let mut headers = mem::replace(res.headers_mut(), Headers::new()); + let decoder = decoder::detect(&mut headers, body::wrap(res.body()), gzip); debug!("Response: '{}' for {}", status, url); Response { status: status, headers: headers, url: url, - body: super::body::wrap(body), + body: decoder, } } diff --git a/src/client.rs b/src/client.rs index a486b54..d357832 100644 --- a/src/client.rs +++ b/src/client.rs @@ -52,7 +52,6 @@ pub struct Client { /// # } /// ``` pub struct ClientBuilder { - gzip: bool, inner: async_impl::ClientBuilder, timeout: Option, } @@ -66,7 +65,6 @@ impl ClientBuilder { pub fn new() -> ::Result { async_impl::ClientBuilder::new().map(|builder| ClientBuilder { inner: builder, - gzip: true, timeout: None, }) } @@ -150,7 +148,6 @@ impl ClientBuilder { #[inline] pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder { self.inner.gzip(enable); - self.gzip = enable; self } @@ -312,7 +309,6 @@ impl fmt::Debug for ClientBuilder { #[derive(Clone)] struct ClientHandle { - gzip: bool, timeout: Option, inner: Arc } @@ -334,7 +330,6 @@ impl Drop for InnerClientHandle { impl ClientHandle { fn new(builder: &mut ClientBuilder) -> ::Result { - let gzip = builder.gzip; let timeout = builder.timeout; let mut builder = async_impl::client::take_builder(&mut builder.inner); let (tx, rx) = mpsc::unbounded(); @@ -383,7 +378,6 @@ impl ClientHandle { Ok(ClientHandle { - gzip: gzip, timeout: timeout, inner: inner_handle, }) @@ -412,7 +406,7 @@ impl ClientHandle { } }; res.map(|res| { - response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.inner.clone())) + response::new(res, self.timeout, KeepCoreThreadAlive(self.inner.clone())) }) } } diff --git a/src/response.rs b/src/response.rs index 06e9c2d..cdc7827 100644 --- a/src/response.rs +++ b/src/response.rs @@ -2,19 +2,18 @@ use std::fmt; use std::io::{self, Read}; use std::time::Duration; -use libflate::gzip; +use futures::{Async, Poll, Stream}; use serde::de::DeserializeOwned; use serde_json; use client::KeepCoreThreadAlive; -use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; +use header::Headers; use {async_impl, StatusCode, Url, wait}; - /// A Response to a submitted `Request`. pub struct Response { - body: Decoder, inner: async_impl::Response, + body: async_impl::ReadableChunks, _thread_handle: KeepCoreThreadAlive, } @@ -183,11 +182,11 @@ impl Response { /// ``` #[inline] pub fn error_for_status(self) -> ::Result { - let Response { body, inner, _thread_handle } = self; + let Response { inner, body, _thread_handle } = self; inner.error_for_status().map(move |inner| { Response { - body: body, inner: inner, + body: body, _thread_handle: _thread_handle, } }) @@ -201,189 +200,40 @@ impl Read for Response { } } -struct ReadableBody { - state: ReadState, - stream: wait::WaitStream, +struct WaitBody { + inner: wait::WaitStream } -enum ReadState { - Ready(async_impl::Chunk, usize), - NotReady, - Eof, -} +impl Stream for WaitBody { + type Item = ::Item; + type Error = ::Error; + fn poll(&mut self) -> Poll, Self::Error> { + match self.inner.next() { + Some(Ok(chunk)) => Ok(Async::Ready(Some(chunk))), + Some(Err(e)) => { + let req_err = match e { + wait::Waited::TimedOut => ::error::timedout(None), + wait::Waited::Err(e) => e, + }; -impl Read for ReadableBody { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - use std::cmp; - - loop { - let ret; - match self.state { - ReadState::Ready(ref mut chunk, ref mut pos) => { - let chunk_start = *pos; - let len = cmp::min(buf.len(), chunk.len() - chunk_start); - let chunk_end = chunk_start + len; - buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); - *pos += len; - if *pos == chunk.len() { - ret = len; - } else { - return Ok(len); - } - }, - ReadState::NotReady => { - match self.stream.next() { - Some(Ok(chunk)) => { - self.state = ReadState::Ready(chunk, 0); - continue; - }, - Some(Err(e)) => { - let req_err = match e { - wait::Waited::TimedOut => ::error::timedout(None), - wait::Waited::Err(e) => e, - }; - return Err(::error::into_io(req_err)); - }, - None => { - self.state = ReadState::Eof; - return Ok(0); - }, - } - }, - ReadState::Eof => return Ok(0), - } - self.state = ReadState::NotReady; - return Ok(ret); + Err(req_err) + }, + None => Ok(Async::Ready(None)), } } } - -enum Decoder { - /// A `PlainText` decoder just returns the response content as is. - PlainText(ReadableBody), - /// A `Gzip` decoder will uncompress the gziped response content before returning it. - Gzip(gzip::Decoder), - /// An error occured reading the Gzip header, so return that error - /// when the user tries to read on the `Response`. - Errored(Option), -} - -impl Decoder { - /// Constructs a Decoder from a hyper request. - /// - /// A decoder is just a wrapper around the hyper request that knows - /// how to decode the content body of the request. - /// - /// Uses the correct variant by inspecting the Content-Encoding header. - fn new(res: &mut async_impl::Response, check_gzip: bool, timeout: Option) -> Self { - let body = async_impl::body::take(res.body_mut()); - let body = ReadableBody { - state: ReadState::NotReady, - stream: wait::stream(body, timeout), - }; - - if !check_gzip { - return Decoder::PlainText(body); - } - let content_encoding_gzip: bool; - let mut is_gzip = { - content_encoding_gzip = res.headers() - .get::() - .map_or(false, |encs| encs.contains(&Encoding::Gzip)); - content_encoding_gzip || - res.headers() - .get::() - .map_or(false, |encs| encs.contains(&Encoding::Gzip)) - }; - if is_gzip { - if let Some(content_length) = res.headers().get::() { - if content_length.0 == 0 { - warn!("GZipped response with content-length of 0"); - is_gzip = false; - } - } - } - if content_encoding_gzip { - res.headers_mut().remove::(); - res.headers_mut().remove::(); - } - if is_gzip { - new_gzip(body) - } else { - Decoder::PlainText(body) - } - } -} - -fn new_gzip(mut body: ReadableBody) -> Decoder { - // libflate does a read_exact([0; 2]), so its impossible to tell - // if the stream was empty, or truly had an UnexpectedEof. - // Therefore, we need to peek a byte to make check for EOF first. - let mut peek = [0]; - match body.read(&mut peek) { - Ok(0) => return Decoder::PlainText(body), - Ok(n) => debug_assert_eq!(n, 1), - Err(e) => return Decoder::Errored(Some(e)), - } - - let reader = Peeked { - peeked: Some(peek[0]), - inner: body, - }; - match gzip::Decoder::new(reader) { - Ok(gzip) => Decoder::Gzip(gzip), - Err(e) => Decoder::Errored(Some(e)), - } -} - -struct Peeked { - peeked: Option, - inner: ReadableBody, -} - -impl Read for Peeked { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if buf.is_empty() { - return Ok(0); - } - if let Some(byte) = self.peeked.take() { - buf[0] = byte; - Ok(1) - } else { - self.inner.read(buf) - } - } -} - -impl Read for Decoder { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match *self { - Decoder::PlainText(ref mut body) => body.read(buf), - Decoder::Gzip(ref mut decoder) => decoder.read(buf), - Decoder::Errored(ref mut err) => { - Err(err.take().unwrap_or_else(previously_errored)) - } - } - } -} - -#[inline] -fn previously_errored() -> io::Error { - io::Error::new(io::ErrorKind::Other, "permanently errored") -} - - // pub(crate) -pub fn new(mut res: async_impl::Response, gzip: bool, timeout: Option, thread: KeepCoreThreadAlive) -> Response { +pub fn new(mut res: async_impl::Response, timeout: Option, thread: KeepCoreThreadAlive) -> Response { + let body = async_impl::ReadableChunks::new(WaitBody { + inner: wait::stream(res.body(), timeout) + }); - let decoder = Decoder::new(&mut res, gzip, timeout); Response { - body: decoder, inner: res, + body: body, _thread_handle: thread, } } diff --git a/tests/async.rs b/tests/async.rs new file mode 100644 index 0000000..90af803 --- /dev/null +++ b/tests/async.rs @@ -0,0 +1,77 @@ +#![cfg(feature="unstable")] + +extern crate futures; +extern crate tokio_core; +extern crate tokio_io; +extern crate reqwest; +extern crate libflate; + +#[macro_use] +mod support; + +use futures::{Future, Stream}; +use tokio_core::reactor::Core; +use std::io::Write; +use std::time::Duration; + +#[test] +fn async_test_gzip_response() { + test_gzip(10_000, 4096); +} + +#[test] +fn async_test_gzip_single_byte_chunks() { + test_gzip(10, 1); +} + +fn test_gzip(response_size: usize, chunk_size: usize) { + let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect(); + let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); + match encoder.write(content.as_bytes()) { + Ok(n) => assert!(n > 0, "Failed to write to encoder."), + _ => panic!("Failed to gzip encode string."), + }; + + let gzipped_content = encoder.finish().into_result().unwrap(); + + let mut response = format!("\ + HTTP/1.1 200 OK\r\n\ + Server: test-accept\r\n\ + Content-Encoding: gzip\r\n\ + Content-Length: {}\r\n\ + \r\n", &gzipped_content.len()) + .into_bytes(); + response.extend(&gzipped_content); + + let server = server! { + request: b"\ + GET /gzip HTTP/1.1\r\n\ + Host: $HOST\r\n\ + User-Agent: $USERAGENT\r\n\ + Accept: */*\r\n\ + Accept-Encoding: gzip\r\n\ + \r\n\ + ", + chunk_size: chunk_size, + write_timeout: Duration::from_millis(10), + response: response + }; + + let mut core = Core::new().unwrap(); + + let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap(); + + let res_future = client.get(&format!("http://{}/gzip", server.addr())) + .unwrap() + .send() + .and_then(|mut res| res.body().concat2()) + .and_then(|buf| { + let body = ::std::str::from_utf8(&buf).unwrap(); + + assert_eq!(body, &content); + + Ok(()) + }); + + core.run(res_future).unwrap(); +} diff --git a/tests/gzip.rs b/tests/gzip.rs index 6e3a478..e720eef 100644 --- a/tests/gzip.rs +++ b/tests/gzip.rs @@ -4,12 +4,15 @@ extern crate libflate; #[macro_use] mod support; +use std::time::Duration; use std::io::{Read, Write}; #[test] fn test_gzip_response() { + let content: String = (0..50).into_iter().map(|i| format!("test {}", i)).collect(); + let chunk_size = content.len() / 3; let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); - match encoder.write(b"test request") { + match encoder.write(content.as_bytes()) { Ok(n) => assert!(n > 0, "Failed to write to encoder."), _ => panic!("Failed to gzip encode string."), }; @@ -34,6 +37,8 @@ fn test_gzip_response() { Accept-Encoding: gzip\r\n\ \r\n\ ", + chunk_size: chunk_size, + write_timeout: Duration::from_millis(10), response: response }; let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); @@ -41,7 +46,7 @@ fn test_gzip_response() { let mut body = String::new(); res.read_to_string(&mut body).unwrap(); - assert_eq!(body, "test request"); + assert_eq!(body, content); } #[test] diff --git a/tests/support/server.rs b/tests/support/server.rs index b56cb9f..420b49c 100644 --- a/tests/support/server.rs +++ b/tests/support/server.rs @@ -23,6 +23,7 @@ pub struct Txn { pub read_timeout: Option, pub response_timeout: Option, pub write_timeout: Option, + pub chunk_size: Option, } static DEFAULT_USER_AGENT: &'static str = @@ -55,10 +56,21 @@ pub fn spawn(txns: Vec) -> Server { } if let Some(dur) = txn.write_timeout { - let headers_end = ::std::str::from_utf8(&reply).unwrap().find("\r\n\r\n").unwrap() + 4; + let headers_end = b"\r\n\r\n"; + let headers_end = reply.windows(headers_end.len()).position(|w| w == headers_end).unwrap() + 4; socket.write_all(&reply[..headers_end]).unwrap(); - thread::park_timeout(dur); - socket.write_all(&reply[headers_end..]).unwrap(); + + let body = &reply[headers_end..]; + + if let Some(chunk_size) = txn.chunk_size { + for content in body.chunks(chunk_size) { + thread::park_timeout(dur); + socket.write_all(&content).unwrap(); + } + } else { + thread::park_timeout(dur); + socket.write_all(&body).unwrap(); + } } else { socket.write_all(&reply).unwrap(); }