Merge pull request #165 from KodrAus/feat/async-decoder
Support Async Gzip Decoding
This commit is contained in:
		| @@ -14,7 +14,7 @@ bytes = "0.4" | |||||||
| futures = "0.1.14" | futures = "0.1.14" | ||||||
| hyper = "0.11" | hyper = "0.11" | ||||||
| hyper-tls = "0.1.2" | hyper-tls = "0.1.2" | ||||||
| libflate = "0.1.5" | libflate = "0.1.11" | ||||||
| log = "0.3" | log = "0.3" | ||||||
| native-tls = "0.1.3" | native-tls = "0.1.3" | ||||||
| serde = "1.0" | serde = "1.0" | ||||||
|   | |||||||
| @@ -1,17 +1,40 @@ | |||||||
| #![deny(warnings)] | #![allow(warnings)] // remove when error_chain is fixed | ||||||
|  |  | ||||||
| extern crate futures; | extern crate futures; | ||||||
| extern crate reqwest; | extern crate reqwest; | ||||||
| extern crate tokio_core; | extern crate tokio_core; | ||||||
|  | #[macro_use] | ||||||
|  | extern crate error_chain; | ||||||
|  |  | ||||||
| use futures::Future; | use std::mem; | ||||||
|  | use std::io::{self, Cursor}; | ||||||
|  | use futures::{Future, Stream}; | ||||||
|  | use reqwest::unstable::async::{Client, Decoder}; | ||||||
|  |  | ||||||
|  | error_chain! { | ||||||
|  |     foreign_links { | ||||||
|  |         ReqError(reqwest::Error); | ||||||
|  |         IoError(io::Error); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| fn main() { | fn main() { | ||||||
|     let mut core = tokio_core::reactor::Core::new().unwrap(); |     let mut core = tokio_core::reactor::Core::new().unwrap(); | ||||||
|     let client = reqwest::unstable::async::Client::new(&core.handle()).unwrap(); |     let client = Client::new(&core.handle()).unwrap(); | ||||||
|  |  | ||||||
|     let work = client.get("https://hyper.rs").unwrap().send().map(|res| { |     let work = client.get("https://hyper.rs").unwrap() | ||||||
|         println!("{}", res.status()); |         .send() | ||||||
|     }); |         .map_err(|e| Error::from(e)) | ||||||
|  |         .and_then(|mut res| { | ||||||
|  |             println!("{}", res.status()); | ||||||
|  |  | ||||||
|  |             let body = mem::replace(res.body_mut(), Decoder::empty()); | ||||||
|  |             body.concat2().map_err(Into::into) | ||||||
|  |         }) | ||||||
|  |         .and_then(|body| { | ||||||
|  |             let mut body = Cursor::new(body); | ||||||
|  |             io::copy(&mut body, &mut io::stdout()).map_err(Into::into) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|     core.run(work).unwrap(); |     core.run(work).unwrap(); | ||||||
| } | } | ||||||
|   | |||||||
| @@ -66,6 +66,13 @@ pub struct Chunk { | |||||||
|     inner: ::hyper::Chunk, |     inner: ::hyper::Chunk, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl AsRef<[u8]> for Chunk { | ||||||
|  |     #[inline] | ||||||
|  |     fn as_ref(&self) -> &[u8] { | ||||||
|  |         &*self | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| impl ::std::ops::Deref for Chunk { | impl ::std::ops::Deref for Chunk { | ||||||
|     type Target = [u8]; |     type Target = [u8]; | ||||||
|     #[inline] |     #[inline] | ||||||
| @@ -121,6 +128,20 @@ pub fn take(body: &mut Body) -> Body { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[inline] | ||||||
|  | pub fn empty() -> Body { | ||||||
|  |     Body { | ||||||
|  |         inner: Inner::Hyper(::hyper::Body::empty()), | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[inline] | ||||||
|  | pub fn chunk(chunk: Bytes) -> Chunk { | ||||||
|  |     Chunk { | ||||||
|  |         inner: ::hyper::Chunk::from(chunk) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| #[inline] | #[inline] | ||||||
| pub fn reusable(chunk: Bytes) -> Body { | pub fn reusable(chunk: Bytes) -> Body { | ||||||
|     Body { |     Body { | ||||||
|   | |||||||
							
								
								
									
										441
									
								
								src/async_impl/decoder.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										441
									
								
								src/async_impl/decoder.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,441 @@ | |||||||
|  | /*! | ||||||
|  | A potentially non-blocking response decoder. | ||||||
|  |  | ||||||
|  | The decoder wraps a stream of chunks and produces a new stream of decompressed chunks. | ||||||
|  | The decompressed chunks aren't guaranteed to align to the compressed ones. | ||||||
|  |  | ||||||
|  | If the response is plaintext then no additional work is carried out. | ||||||
|  | Chunks are just passed along. | ||||||
|  |  | ||||||
|  | If the response is gzip, then the chunks are decompressed into a buffer. | ||||||
|  | Slices of that buffer are emitted as new chunks. | ||||||
|  |  | ||||||
|  | This module consists of a few main types: | ||||||
|  |  | ||||||
|  | - `ReadableChunks` is a `Read`-like wrapper around a stream | ||||||
|  | - `Decoder` is a layer over `ReadableChunks` that applies the right decompression | ||||||
|  |  | ||||||
|  | The following types directly support the gzip compression case: | ||||||
|  |  | ||||||
|  | - `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF | ||||||
|  | - `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail | ||||||
|  | */ | ||||||
|  |  | ||||||
|  | use std::fmt; | ||||||
|  | use std::mem; | ||||||
|  | use std::cmp; | ||||||
|  | use std::io::{self, Read}; | ||||||
|  | use std::marker::PhantomData; | ||||||
|  |  | ||||||
|  | use bytes::{BufMut, BytesMut}; | ||||||
|  | use libflate::non_blocking::gzip; | ||||||
|  | use tokio_io::AsyncRead; | ||||||
|  | use tokio_io::io as async_io; | ||||||
|  | use futures::{Async, Future, Poll, Stream}; | ||||||
|  | use futures::stream::Concat2; | ||||||
|  | use hyper::StatusCode; | ||||||
|  | use serde::de::DeserializeOwned; | ||||||
|  | use serde_json; | ||||||
|  | use url::Url; | ||||||
|  |  | ||||||
|  | use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; | ||||||
|  | use super::{body, Body, Chunk}; | ||||||
|  | use error; | ||||||
|  |  | ||||||
|  | const INIT_BUFFER_SIZE: usize = 8192; | ||||||
|  |  | ||||||
|  | /// A response decompressor over a non-blocking stream of chunks. | ||||||
|  | ///  | ||||||
|  | /// The inner decoder may be constructed asynchronously. | ||||||
|  | pub struct Decoder { | ||||||
|  |     inner: Inner | ||||||
|  | } | ||||||
|  |  | ||||||
|  | enum Inner { | ||||||
|  |     /// A `PlainText` decoder just returns the response content as is. | ||||||
|  |     PlainText(Body), | ||||||
|  |     /// A `Gzip` decoder will uncompress the gzipped response content before returning it. | ||||||
|  |     Gzip(Gzip), | ||||||
|  |     /// A decoder that doesn't have a value yet. | ||||||
|  |     Pending(Pending) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | enum Pending { | ||||||
|  |     /// An unreachable internal state. | ||||||
|  |     Empty, | ||||||
|  |     /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. | ||||||
|  |     Gzip(ReadableChunks<Body>) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results | ||||||
|  | /// as a `Chunk`. | ||||||
|  | struct Gzip { | ||||||
|  |     inner: gzip::Decoder<Peeked<ReadableChunks<Body>>>, | ||||||
|  |     buf: BytesMut, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl fmt::Debug for Decoder { | ||||||
|  |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         f.debug_struct("Decoder") | ||||||
|  |             .finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Decoder { | ||||||
|  |     /// An empty decoder. | ||||||
|  |     ///  | ||||||
|  |     /// This decoder will produce a single 0 byte chunk. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn empty() -> Decoder { | ||||||
|  |         Decoder { | ||||||
|  |             inner: Inner::PlainText(body::empty()) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// A plain text decoder. | ||||||
|  |     ///  | ||||||
|  |     /// This decoder will emit the underlying chunks as-is. | ||||||
|  |     #[inline] | ||||||
|  |     fn plain_text(body: Body) -> Decoder { | ||||||
|  |         Decoder { | ||||||
|  |             inner: Inner::PlainText(body) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// A gzip decoder. | ||||||
|  |     ///  | ||||||
|  |     /// This decoder will buffer and decompress chunks that are gzipped. | ||||||
|  |     #[inline] | ||||||
|  |     fn gzip(mut body: Body) -> Decoder { | ||||||
|  |         Decoder { | ||||||
|  |             inner: Inner::Pending(Pending::Gzip(ReadableChunks::new(body))) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Stream for Decoder { | ||||||
|  |     type Item = Chunk; | ||||||
|  |     type Error = error::Error; | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|  |         // Do a read or poll for a pendidng decoder value. | ||||||
|  |         let new_value = match self.inner { | ||||||
|  |             Inner::Pending(ref mut future) => { | ||||||
|  |                 match future.poll() { | ||||||
|  |                     Ok(Async::Ready(inner)) => inner, | ||||||
|  |                     Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|  |                     Err(e) => return Err(e) | ||||||
|  |                 } | ||||||
|  |             }, | ||||||
|  |             Inner::PlainText(ref mut body) => return body.poll(), | ||||||
|  |             Inner::Gzip(ref mut decoder) => return decoder.poll() | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         self.inner = new_value; | ||||||
|  |         self.poll() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Future for Pending { | ||||||
|  |     type Item = Inner; | ||||||
|  |     type Error = error::Error; | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||||
|  |         let body_state = match *self { | ||||||
|  |             Pending::Gzip(ref mut body) => { | ||||||
|  |                 match body.poll_stream() { | ||||||
|  |                     Ok(Async::Ready(state)) => state, | ||||||
|  |                     Ok(Async::NotReady) => return Ok(Async::NotReady), | ||||||
|  |                     Err(e) => return Err(e) | ||||||
|  |                 } | ||||||
|  |             }, | ||||||
|  |             Pending::Empty => panic!("poll for a decoder after it's done") | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         match mem::replace(self, Pending::Empty) { | ||||||
|  |             Pending::Gzip(body) => { | ||||||
|  |                 // libflate does a read_exact([0; 2]), so its impossible to tell | ||||||
|  |                 // if the stream was empty, or truly had an UnexpectedEof. | ||||||
|  |                 // Therefore, we need to check for EOF first. | ||||||
|  |                 match body_state { | ||||||
|  |                     StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))), | ||||||
|  |                     StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body)))) | ||||||
|  |                 } | ||||||
|  |             }, | ||||||
|  |             Pending::Empty => panic!("invalid internal state") | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Gzip { | ||||||
|  |     fn new(stream: ReadableChunks<Body>) -> Self { | ||||||
|  |         Gzip { | ||||||
|  |             buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), | ||||||
|  |             inner: gzip::Decoder::new(Peeked::new(stream)) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Stream for Gzip { | ||||||
|  |     type Item = Chunk; | ||||||
|  |     type Error = error::Error; | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|  |         if self.buf.remaining_mut() == 0 { | ||||||
|  |             self.buf.reserve(INIT_BUFFER_SIZE); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // The buffer contains uninitialised memory so getting a readable slice is unsafe. | ||||||
|  |         // We trust the `libflate` writer not to read from the memory given. | ||||||
|  |         //  | ||||||
|  |         // To be safe, this memory could be zeroed before passing to `libflate`. | ||||||
|  |         // Otherwise we might need to deal with the case where `libflate` panics. | ||||||
|  |         let read = { | ||||||
|  |             let mut buf = unsafe { self.buf.bytes_mut() }; | ||||||
|  |             self.inner.read(&mut buf) | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         match read { | ||||||
|  |             Ok(read) if read == 0 => { | ||||||
|  |                 Ok(Async::Ready(None)) | ||||||
|  |             }, | ||||||
|  |             Ok(read) => { | ||||||
|  |                 unsafe { self.buf.advance_mut(read) }; | ||||||
|  |                 let chunk = body::chunk(self.buf.split_to(read).freeze()); | ||||||
|  |  | ||||||
|  |                 Ok(Async::Ready(Some(chunk))) | ||||||
|  |             }, | ||||||
|  |             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | ||||||
|  |                 Ok(Async::NotReady) | ||||||
|  |             }, | ||||||
|  |             Err(e) => Err(error::from(e)) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// A `Read`able wrapper over a stream of chunks. | ||||||
|  | pub struct ReadableChunks<S> { | ||||||
|  |     state: ReadState, | ||||||
|  |     stream: S, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | enum ReadState { | ||||||
|  |     /// A chunk is ready to be read from. | ||||||
|  |     Ready(Chunk, usize), | ||||||
|  |     /// The next chunk isn't ready yet. | ||||||
|  |     NotReady, | ||||||
|  |     /// The stream has finished. | ||||||
|  |     Eof, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | enum StreamState { | ||||||
|  |     /// More bytes can be read from the stream. | ||||||
|  |     HasMore, | ||||||
|  |     /// No more bytes can be read from the stream. | ||||||
|  |     Eof | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// A buffering reader that ensures `Read`s return at least a few bytes. | ||||||
|  | struct Peeked<R> { | ||||||
|  |     state: PeekedState, | ||||||
|  |     peeked_buf: [u8; 2], | ||||||
|  |     pos: usize, | ||||||
|  |     inner: R, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | enum PeekedState { | ||||||
|  |     /// The internal buffer hasn't filled yet. | ||||||
|  |     NotReady, | ||||||
|  |     /// The internal buffer can be read. | ||||||
|  |     Ready(usize) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<R> Peeked<R> { | ||||||
|  |     #[inline] | ||||||
|  |     fn new(inner: R) -> Self { | ||||||
|  |         Peeked { | ||||||
|  |             state: PeekedState::NotReady, | ||||||
|  |             peeked_buf: [0; 2], | ||||||
|  |             inner: inner, | ||||||
|  |             pos: 0, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[inline] | ||||||
|  |     fn ready(&mut self) { | ||||||
|  |         self.state = PeekedState::Ready(self.pos); | ||||||
|  |         self.pos = 0; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[inline] | ||||||
|  |     fn not_ready(&mut self) { | ||||||
|  |         self.state = PeekedState::NotReady; | ||||||
|  |         self.pos = 0; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<R: Read> Read for Peeked<R> { | ||||||
|  |     #[inline] | ||||||
|  |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|  |         loop { | ||||||
|  |             match self.state { | ||||||
|  |                 PeekedState::Ready(peeked_buf_len) => { | ||||||
|  |                     let len = cmp::min(buf.len(), peeked_buf_len - self.pos); | ||||||
|  |                     let start = self.pos; | ||||||
|  |                     let end = self.pos + len; | ||||||
|  |  | ||||||
|  |                     buf[..len].copy_from_slice(&self.peeked_buf[start..end]); | ||||||
|  |                     self.pos += len; | ||||||
|  |                     if self.pos == peeked_buf_len { | ||||||
|  |                         self.not_ready(); | ||||||
|  |                     } | ||||||
|  |  | ||||||
|  |                     return Ok(len) | ||||||
|  |                 }, | ||||||
|  |                 PeekedState::NotReady => { | ||||||
|  |                     let read = self.inner.read(&mut self.peeked_buf[self.pos..]); | ||||||
|  |  | ||||||
|  |                     match read { | ||||||
|  |                         Ok(0) => { | ||||||
|  |                             self.ready(); | ||||||
|  |                         }, | ||||||
|  |                         Ok(read) => { | ||||||
|  |                             self.pos += read; | ||||||
|  |                             if self.pos == self.peeked_buf.len() { | ||||||
|  |                                 self.ready(); | ||||||
|  |                             } | ||||||
|  |                         }, | ||||||
|  |                         Err(e) => return Err(e) | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|  |             }; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<S> ReadableChunks<S> { | ||||||
|  |     #[inline] | ||||||
|  |     pub fn new(stream: S) -> Self { | ||||||
|  |         ReadableChunks { | ||||||
|  |             state: ReadState::NotReady, | ||||||
|  |             stream: stream, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<S> fmt::Debug for ReadableChunks<S> { | ||||||
|  |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         f.debug_struct("ReadableChunks") | ||||||
|  |             .finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<S> Read for ReadableChunks<S>  | ||||||
|  |     where S: Stream<Item = Chunk, Error = error::Error> | ||||||
|  | { | ||||||
|  |     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||||
|  |         loop { | ||||||
|  |             let ret; | ||||||
|  |             match self.state { | ||||||
|  |                 ReadState::Ready(ref mut chunk, ref mut pos) => { | ||||||
|  |                     let chunk_start = *pos; | ||||||
|  |                     let len = cmp::min(buf.len(), chunk.len() - chunk_start); | ||||||
|  |                     let chunk_end = chunk_start + len; | ||||||
|  |  | ||||||
|  |                     buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); | ||||||
|  |                     *pos += len; | ||||||
|  |                     if *pos == chunk.len() { | ||||||
|  |                         ret = len; | ||||||
|  |                     } else { | ||||||
|  |                         return Ok(len); | ||||||
|  |                     } | ||||||
|  |                 }, | ||||||
|  |                 ReadState::NotReady => { | ||||||
|  |                     match self.poll_stream() { | ||||||
|  |                         Ok(Async::Ready(StreamState::HasMore)) => continue, | ||||||
|  |                         Ok(Async::Ready(StreamState::Eof)) => { | ||||||
|  |                             return Ok(0) | ||||||
|  |                         }, | ||||||
|  |                         Ok(Async::NotReady) => { | ||||||
|  |                             return Err(io::ErrorKind::WouldBlock.into()) | ||||||
|  |                         }, | ||||||
|  |                         Err(e) => { | ||||||
|  |                             return Err(error::into_io(e)) | ||||||
|  |                         } | ||||||
|  |                     } | ||||||
|  |                 }, | ||||||
|  |                 ReadState::Eof => return Ok(0), | ||||||
|  |             } | ||||||
|  |             self.state = ReadState::NotReady; | ||||||
|  |             return Ok(ret); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<S> ReadableChunks<S>  | ||||||
|  |     where S: Stream<Item = Chunk, Error = error::Error> | ||||||
|  | { | ||||||
|  |     /// Poll the readiness of the inner reader. | ||||||
|  |     ///  | ||||||
|  |     /// This function will update the internal state and return a simplified | ||||||
|  |     /// version of the `ReadState`. | ||||||
|  |     fn poll_stream(&mut self) -> Poll<StreamState, error::Error> { | ||||||
|  |         match self.stream.poll() { | ||||||
|  |             Ok(Async::Ready(Some(chunk))) => { | ||||||
|  |                 self.state = ReadState::Ready(chunk, 0); | ||||||
|  |  | ||||||
|  |                 Ok(Async::Ready(StreamState::HasMore)) | ||||||
|  |             }, | ||||||
|  |             Ok(Async::Ready(None)) => { | ||||||
|  |                 self.state = ReadState::Eof; | ||||||
|  |  | ||||||
|  |                 Ok(Async::Ready(StreamState::Eof)) | ||||||
|  |             }, | ||||||
|  |             Ok(Async::NotReady) => { | ||||||
|  |                 Ok(Async::NotReady) | ||||||
|  |             }, | ||||||
|  |             Err(e) => Err(e) | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // pub(crate) | ||||||
|  |  | ||||||
|  | /// Constructs a Decoder from a hyper request. | ||||||
|  | /// | ||||||
|  | /// A decoder is just a wrapper around the hyper request that knows | ||||||
|  | /// how to decode the content body of the request. | ||||||
|  | /// | ||||||
|  | /// Uses the correct variant by inspecting the Content-Encoding header. | ||||||
|  | pub fn detect(headers: &mut Headers, body: Body, check_gzip: bool) -> Decoder { | ||||||
|  |     if !check_gzip { | ||||||
|  |         return Decoder::plain_text(body); | ||||||
|  |     } | ||||||
|  |     let content_encoding_gzip: bool; | ||||||
|  |     let mut is_gzip = { | ||||||
|  |         content_encoding_gzip = headers | ||||||
|  |             .get::<ContentEncoding>() | ||||||
|  |             .map_or(false, |encs| encs.contains(&Encoding::Gzip)); | ||||||
|  |         content_encoding_gzip || | ||||||
|  |         headers | ||||||
|  |             .get::<TransferEncoding>() | ||||||
|  |             .map_or(false, |encs| encs.contains(&Encoding::Gzip)) | ||||||
|  |     }; | ||||||
|  |     if is_gzip { | ||||||
|  |         if let Some(content_length) = headers.get::<ContentLength>() { | ||||||
|  |             if content_length.0 == 0 { | ||||||
|  |                 warn!("GZipped response with content-length of 0"); | ||||||
|  |                 is_gzip = false; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |     if content_encoding_gzip { | ||||||
|  |         headers.remove::<ContentEncoding>(); | ||||||
|  |         headers.remove::<ContentLength>(); | ||||||
|  |     } | ||||||
|  |     if is_gzip { | ||||||
|  |         Decoder::gzip(body) | ||||||
|  |     } else { | ||||||
|  |         Decoder::plain_text(body) | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -1,11 +1,13 @@ | |||||||
| #![cfg_attr(not(features = "unstable"), allow(unused))] | #![cfg_attr(not(features = "unstable"), allow(unused))] | ||||||
|  |  | ||||||
| pub use self::body::{Body, Chunk}; | pub use self::body::{Body, Chunk}; | ||||||
|  | pub use self::decoder::{Decoder, ReadableChunks}; | ||||||
| pub use self::client::{Client, ClientBuilder}; | pub use self::client::{Client, ClientBuilder}; | ||||||
| pub use self::request::{Request, RequestBuilder}; | pub use self::request::{Request, RequestBuilder}; | ||||||
| pub use self::response::Response; | pub use self::response::Response; | ||||||
|  |  | ||||||
| pub mod body; | pub mod body; | ||||||
| pub mod client; | pub mod client; | ||||||
|  | pub mod decoder; | ||||||
| mod request; | mod request; | ||||||
| mod response; | mod response; | ||||||
|   | |||||||
| @@ -1,15 +1,21 @@ | |||||||
| use std::fmt; | use std::fmt; | ||||||
|  | use std::mem; | ||||||
|  | use std::io::{self, Read}; | ||||||
| use std::marker::PhantomData; | use std::marker::PhantomData; | ||||||
|  |  | ||||||
|  | use libflate::non_blocking::gzip; | ||||||
|  | use tokio_io::AsyncRead; | ||||||
|  | use tokio_io::io as async_io; | ||||||
| use futures::{Async, Future, Poll, Stream}; | use futures::{Async, Future, Poll, Stream}; | ||||||
| use futures::stream::Concat2; | use futures::stream::Concat2; | ||||||
| use header::Headers; |  | ||||||
| use hyper::StatusCode; | use hyper::StatusCode; | ||||||
| use serde::de::DeserializeOwned; | use serde::de::DeserializeOwned; | ||||||
| use serde_json; | use serde_json; | ||||||
| use url::Url; | use url::Url; | ||||||
|  |  | ||||||
| use super::{body, Body}; | use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; | ||||||
|  | use super::{decoder, body, Body, Chunk, Decoder}; | ||||||
|  | use error; | ||||||
|  |  | ||||||
|  |  | ||||||
| /// A Response to a submitted `Request`. | /// A Response to a submitted `Request`. | ||||||
| @@ -17,7 +23,7 @@ pub struct Response { | |||||||
|     status: StatusCode, |     status: StatusCode, | ||||||
|     headers: Headers, |     headers: Headers, | ||||||
|     url: Url, |     url: Url, | ||||||
|     body: Body, |     body: Decoder, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Response { | impl Response { | ||||||
| @@ -45,17 +51,29 @@ impl Response { | |||||||
|         &mut self.headers |         &mut self.headers | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Get a mutable reference to the `Body` of this `Response`. |     /// Get a readable response body. | ||||||
|  |     ///  | ||||||
|  |     /// The response will be decoded. | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn body_mut(&mut self) -> &mut Body { |     pub fn body_mut(&mut self) -> &mut Decoder { | ||||||
|         &mut self.body |         &mut self.body | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Get a readable response body. | ||||||
|  |     ///  | ||||||
|  |     /// This function will replace the body on the response with an empty one. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn body(&self) -> &Decoder { | ||||||
|  |         &self.body | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Try to deserialize the response body as JSON using `serde`. |     /// Try to deserialize the response body as JSON using `serde`. | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn json<T: DeserializeOwned>(&mut self) -> Json<T> { |     pub fn json<T: DeserializeOwned>(&mut self) -> Json<T> { | ||||||
|  |         let body = mem::replace(&mut self.body, Decoder::empty()); | ||||||
|  |          | ||||||
|         Json { |         Json { | ||||||
|             concat: body::take(self.body_mut()).concat2(), |             concat: body.concat2(), | ||||||
|             _marker: PhantomData, |             _marker: PhantomData, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -95,7 +113,6 @@ impl Response { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| impl fmt::Debug for Response { | impl fmt::Debug for Response { | ||||||
|     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||||||
|         f.debug_struct("Response") |         f.debug_struct("Response") | ||||||
| @@ -107,7 +124,7 @@ impl fmt::Debug for Response { | |||||||
| } | } | ||||||
|  |  | ||||||
| pub struct Json<T> { | pub struct Json<T> { | ||||||
|     concat: Concat2<Body>, |     concat: Concat2<Decoder>, | ||||||
|     _marker: PhantomData<T>, |     _marker: PhantomData<T>, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -130,17 +147,15 @@ impl<T> fmt::Debug for Json<T> { | |||||||
|  |  | ||||||
| // pub(crate) | // pub(crate) | ||||||
|  |  | ||||||
| pub fn new(mut res: ::hyper::client::Response, url: Url, _gzip: bool) -> Response { | pub fn new(mut res: ::hyper::client::Response, url: Url, gzip: bool) -> Response { | ||||||
|     use std::mem; |  | ||||||
|  |  | ||||||
|     let status = res.status(); |     let status = res.status(); | ||||||
|     let headers = mem::replace(res.headers_mut(), Headers::new()); |     let mut headers = mem::replace(res.headers_mut(), Headers::new()); | ||||||
|     let body = res.body(); |     let decoder = decoder::detect(&mut headers, body::wrap(res.body()), gzip); | ||||||
|     debug!("Response: '{}' for {}", status, url); |     debug!("Response: '{}' for {}", status, url); | ||||||
|     Response { |     Response { | ||||||
|         status: status, |         status: status, | ||||||
|         headers: headers, |         headers: headers, | ||||||
|         url: url, |         url: url, | ||||||
|         body: super::body::wrap(body), |         body: decoder, | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -52,7 +52,6 @@ pub struct Client { | |||||||
| /// # } | /// # } | ||||||
| /// ``` | /// ``` | ||||||
| pub struct ClientBuilder { | pub struct ClientBuilder { | ||||||
|     gzip: bool, |  | ||||||
|     inner: async_impl::ClientBuilder, |     inner: async_impl::ClientBuilder, | ||||||
|     timeout: Option<Duration>, |     timeout: Option<Duration>, | ||||||
| } | } | ||||||
| @@ -66,7 +65,6 @@ impl ClientBuilder { | |||||||
|     pub fn new() -> ::Result<ClientBuilder> { |     pub fn new() -> ::Result<ClientBuilder> { | ||||||
|         async_impl::ClientBuilder::new().map(|builder| ClientBuilder { |         async_impl::ClientBuilder::new().map(|builder| ClientBuilder { | ||||||
|             inner: builder, |             inner: builder, | ||||||
|             gzip: true, |  | ||||||
|             timeout: None, |             timeout: None, | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| @@ -150,7 +148,6 @@ impl ClientBuilder { | |||||||
|     #[inline] |     #[inline] | ||||||
|     pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder { |     pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder { | ||||||
|         self.inner.gzip(enable); |         self.inner.gzip(enable); | ||||||
|         self.gzip = enable; |  | ||||||
|         self |         self | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -312,7 +309,6 @@ impl fmt::Debug for ClientBuilder { | |||||||
|  |  | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| struct ClientHandle { | struct ClientHandle { | ||||||
|     gzip: bool, |  | ||||||
|     timeout: Option<Duration>, |     timeout: Option<Duration>, | ||||||
|     inner: Arc<InnerClientHandle> |     inner: Arc<InnerClientHandle> | ||||||
| } | } | ||||||
| @@ -334,7 +330,6 @@ impl Drop for InnerClientHandle { | |||||||
| impl ClientHandle { | impl ClientHandle { | ||||||
|     fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> { |     fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> { | ||||||
|  |  | ||||||
|         let gzip = builder.gzip; |  | ||||||
|         let timeout = builder.timeout; |         let timeout = builder.timeout; | ||||||
|         let mut builder = async_impl::client::take_builder(&mut builder.inner); |         let mut builder = async_impl::client::take_builder(&mut builder.inner); | ||||||
|         let (tx, rx) = mpsc::unbounded(); |         let (tx, rx) = mpsc::unbounded(); | ||||||
| @@ -383,7 +378,6 @@ impl ClientHandle { | |||||||
|  |  | ||||||
|  |  | ||||||
|         Ok(ClientHandle { |         Ok(ClientHandle { | ||||||
|             gzip: gzip, |  | ||||||
|             timeout: timeout, |             timeout: timeout, | ||||||
|             inner: inner_handle, |             inner: inner_handle, | ||||||
|         }) |         }) | ||||||
| @@ -412,7 +406,7 @@ impl ClientHandle { | |||||||
|             } |             } | ||||||
|         }; |         }; | ||||||
|         res.map(|res| { |         res.map(|res| { | ||||||
|             response::new(res, self.gzip, self.timeout, KeepCoreThreadAlive(self.inner.clone())) |             response::new(res, self.timeout, KeepCoreThreadAlive(self.inner.clone())) | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -172,6 +172,7 @@ pub mod unstable { | |||||||
|         pub use ::async_impl::{ |         pub use ::async_impl::{ | ||||||
|             Body, |             Body, | ||||||
|             Chunk, |             Chunk, | ||||||
|  |             Decoder, | ||||||
|             Client, |             Client, | ||||||
|             ClientBuilder, |             ClientBuilder, | ||||||
|             Request, |             Request, | ||||||
|   | |||||||
							
								
								
									
										202
									
								
								src/response.rs
									
									
									
									
									
								
							
							
						
						
									
										202
									
								
								src/response.rs
									
									
									
									
									
								
							| @@ -1,20 +1,20 @@ | |||||||
|  | use std::mem; | ||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::io::{self, Read}; | use std::io::{self, Read}; | ||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
|  |  | ||||||
| use libflate::gzip; | use futures::{Async, Poll, Stream}; | ||||||
| use serde::de::DeserializeOwned; | use serde::de::DeserializeOwned; | ||||||
| use serde_json; | use serde_json; | ||||||
|  |  | ||||||
| use client::KeepCoreThreadAlive; | use client::KeepCoreThreadAlive; | ||||||
| use header::{Headers, ContentEncoding, ContentLength, Encoding, TransferEncoding}; | use header::Headers; | ||||||
| use {async_impl, StatusCode, Url, wait}; | use {async_impl, StatusCode, Url, wait}; | ||||||
|  |  | ||||||
|  |  | ||||||
| /// A Response to a submitted `Request`. | /// A Response to a submitted `Request`. | ||||||
| pub struct Response { | pub struct Response { | ||||||
|     body: Decoder, |  | ||||||
|     inner: async_impl::Response, |     inner: async_impl::Response, | ||||||
|  |     body: async_impl::ReadableChunks<WaitBody>, | ||||||
|     _thread_handle: KeepCoreThreadAlive, |     _thread_handle: KeepCoreThreadAlive, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -186,8 +186,8 @@ impl Response { | |||||||
|         let Response { body, inner, _thread_handle } = self; |         let Response { body, inner, _thread_handle } = self; | ||||||
|         inner.error_for_status().map(move |inner| { |         inner.error_for_status().map(move |inner| { | ||||||
|             Response { |             Response { | ||||||
|                 body: body, |  | ||||||
|                 inner: inner, |                 inner: inner, | ||||||
|  |                 body: body, | ||||||
|                 _thread_handle: _thread_handle, |                 _thread_handle: _thread_handle, | ||||||
|             } |             } | ||||||
|         }) |         }) | ||||||
| @@ -201,189 +201,41 @@ impl Read for Response { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| struct ReadableBody { | struct WaitBody { | ||||||
|     state: ReadState, |     inner: wait::WaitStream<async_impl::Decoder> | ||||||
|     stream:  wait::WaitStream<async_impl::Body>, |  | ||||||
| } | } | ||||||
|  |  | ||||||
| enum ReadState { | impl Stream for WaitBody { | ||||||
|     Ready(async_impl::Chunk, usize), |     type Item = <async_impl::Decoder as Stream>::Item; | ||||||
|     NotReady, |     type Error = <async_impl::Decoder as Stream>::Error; | ||||||
|     Eof, |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|  |         match self.inner.next() { | ||||||
|  |             Some(Ok(chunk)) => Ok(Async::Ready(Some(chunk))), | ||||||
|  |             Some(Err(e)) => { | ||||||
|  |                 let req_err = match e { | ||||||
|  |                     wait::Waited::TimedOut => ::error::timedout(None), | ||||||
|  |                     wait::Waited::Err(e) => e, | ||||||
|  |                 }; | ||||||
|  |  | ||||||
| impl Read for ReadableBody { |                 Err(req_err) | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |             }, | ||||||
|         use std::cmp; |             None => Ok(Async::Ready(None)), | ||||||
|  |  | ||||||
|         loop { |  | ||||||
|             let ret; |  | ||||||
|             match self.state { |  | ||||||
|                 ReadState::Ready(ref mut chunk, ref mut pos) => { |  | ||||||
|                     let chunk_start = *pos; |  | ||||||
|                     let len = cmp::min(buf.len(), chunk.len() - chunk_start); |  | ||||||
|                     let chunk_end = chunk_start + len; |  | ||||||
|                     buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); |  | ||||||
|                     *pos += len; |  | ||||||
|                     if *pos == chunk.len() { |  | ||||||
|                         ret = len; |  | ||||||
|                     } else { |  | ||||||
|                         return Ok(len); |  | ||||||
|                     } |  | ||||||
|                 }, |  | ||||||
|                 ReadState::NotReady => { |  | ||||||
|                     match self.stream.next() { |  | ||||||
|                         Some(Ok(chunk)) => { |  | ||||||
|                             self.state = ReadState::Ready(chunk, 0); |  | ||||||
|                             continue; |  | ||||||
|                         }, |  | ||||||
|                         Some(Err(e)) => { |  | ||||||
|                             let req_err = match e { |  | ||||||
|                                 wait::Waited::TimedOut => ::error::timedout(None), |  | ||||||
|                                 wait::Waited::Err(e) => e, |  | ||||||
|                             }; |  | ||||||
|                             return Err(::error::into_io(req_err)); |  | ||||||
|                         }, |  | ||||||
|                         None => { |  | ||||||
|                             self.state = ReadState::Eof; |  | ||||||
|                             return Ok(0); |  | ||||||
|                         }, |  | ||||||
|                     } |  | ||||||
|                 }, |  | ||||||
|                 ReadState::Eof => return Ok(0), |  | ||||||
|             } |  | ||||||
|             self.state = ReadState::NotReady; |  | ||||||
|             return Ok(ret); |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| enum Decoder { |  | ||||||
|     /// A `PlainText` decoder just returns the response content as is. |  | ||||||
|     PlainText(ReadableBody), |  | ||||||
|     /// A `Gzip` decoder will uncompress the gziped response content before returning it. |  | ||||||
|     Gzip(gzip::Decoder<Peeked>), |  | ||||||
|     /// An error occured reading the Gzip header, so return that error |  | ||||||
|     /// when the user tries to read on the `Response`. |  | ||||||
|     Errored(Option<io::Error>), |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Decoder { |  | ||||||
|     /// Constructs a Decoder from a hyper request. |  | ||||||
|     /// |  | ||||||
|     /// A decoder is just a wrapper around the hyper request that knows |  | ||||||
|     /// how to decode the content body of the request. |  | ||||||
|     /// |  | ||||||
|     /// Uses the correct variant by inspecting the Content-Encoding header. |  | ||||||
|     fn new(res: &mut async_impl::Response, check_gzip: bool, timeout: Option<Duration>) -> Self { |  | ||||||
|         let body = async_impl::body::take(res.body_mut()); |  | ||||||
|         let body = ReadableBody { |  | ||||||
|             state: ReadState::NotReady, |  | ||||||
|             stream: wait::stream(body, timeout), |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         if !check_gzip { |  | ||||||
|             return Decoder::PlainText(body); |  | ||||||
|         } |  | ||||||
|         let content_encoding_gzip: bool; |  | ||||||
|         let mut is_gzip = { |  | ||||||
|             content_encoding_gzip = res.headers() |  | ||||||
|                 .get::<ContentEncoding>() |  | ||||||
|                 .map_or(false, |encs| encs.contains(&Encoding::Gzip)); |  | ||||||
|             content_encoding_gzip || |  | ||||||
|             res.headers() |  | ||||||
|                 .get::<TransferEncoding>() |  | ||||||
|                 .map_or(false, |encs| encs.contains(&Encoding::Gzip)) |  | ||||||
|         }; |  | ||||||
|         if is_gzip { |  | ||||||
|             if let Some(content_length) = res.headers().get::<ContentLength>() { |  | ||||||
|                 if content_length.0 == 0 { |  | ||||||
|                     warn!("GZipped response with content-length of 0"); |  | ||||||
|                     is_gzip = false; |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         if content_encoding_gzip { |  | ||||||
|             res.headers_mut().remove::<ContentEncoding>(); |  | ||||||
|             res.headers_mut().remove::<ContentLength>(); |  | ||||||
|         } |  | ||||||
|         if is_gzip { |  | ||||||
|             new_gzip(body) |  | ||||||
|         } else { |  | ||||||
|             Decoder::PlainText(body) |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| fn new_gzip(mut body: ReadableBody) -> Decoder { |  | ||||||
|     // libflate does a read_exact([0; 2]), so its impossible to tell |  | ||||||
|     // if the stream was empty, or truly had an UnexpectedEof. |  | ||||||
|     // Therefore, we need to peek a byte to make check for EOF first. |  | ||||||
|     let mut peek = [0]; |  | ||||||
|     match body.read(&mut peek) { |  | ||||||
|         Ok(0) => return Decoder::PlainText(body), |  | ||||||
|         Ok(n) => debug_assert_eq!(n, 1), |  | ||||||
|         Err(e) => return Decoder::Errored(Some(e)), |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     let reader = Peeked { |  | ||||||
|         peeked: Some(peek[0]), |  | ||||||
|         inner: body, |  | ||||||
|     }; |  | ||||||
|     match gzip::Decoder::new(reader) { |  | ||||||
|         Ok(gzip) => Decoder::Gzip(gzip), |  | ||||||
|         Err(e) => Decoder::Errored(Some(e)), |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| struct Peeked { |  | ||||||
|     peeked: Option<u8>, |  | ||||||
|     inner: ReadableBody, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Read for Peeked { |  | ||||||
|     #[inline] |  | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |  | ||||||
|         if buf.is_empty() { |  | ||||||
|             return Ok(0); |  | ||||||
|         } |  | ||||||
|         if let Some(byte) = self.peeked.take() { |  | ||||||
|             buf[0] = byte; |  | ||||||
|             Ok(1) |  | ||||||
|         } else { |  | ||||||
|             self.inner.read(buf) |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Read for Decoder { |  | ||||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { |  | ||||||
|         match *self { |  | ||||||
|             Decoder::PlainText(ref mut body) => body.read(buf), |  | ||||||
|             Decoder::Gzip(ref mut decoder) => decoder.read(buf), |  | ||||||
|             Decoder::Errored(ref mut err) => { |  | ||||||
|                 Err(err.take().unwrap_or_else(previously_errored)) |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[inline] |  | ||||||
| fn previously_errored() -> io::Error { |  | ||||||
|     io::Error::new(io::ErrorKind::Other, "permanently errored") |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| // pub(crate) | // pub(crate) | ||||||
|  |  | ||||||
| pub fn new(mut res: async_impl::Response, gzip: bool, timeout: Option<Duration>, thread: KeepCoreThreadAlive) -> Response { | pub fn new(mut res: async_impl::Response, timeout: Option<Duration>, thread: KeepCoreThreadAlive) -> Response { | ||||||
|  |     let body = mem::replace(res.body_mut(), async_impl::Decoder::empty()); | ||||||
|  |     let body = async_impl::ReadableChunks::new(WaitBody { | ||||||
|  |         inner: wait::stream(body, timeout) | ||||||
|  |     }); | ||||||
|  |  | ||||||
|     let decoder = Decoder::new(&mut res, gzip, timeout); |  | ||||||
|     Response { |     Response { | ||||||
|         body: decoder, |  | ||||||
|         inner: res, |         inner: res, | ||||||
|  |         body: body, | ||||||
|         _thread_handle: thread, |         _thread_handle: thread, | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										82
									
								
								tests/async.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										82
									
								
								tests/async.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,82 @@ | |||||||
|  | #![cfg(feature="unstable")] | ||||||
|  |  | ||||||
|  | extern crate futures; | ||||||
|  | extern crate tokio_core; | ||||||
|  | extern crate tokio_io; | ||||||
|  | extern crate reqwest; | ||||||
|  | extern crate libflate; | ||||||
|  |  | ||||||
|  | #[macro_use] | ||||||
|  | mod support; | ||||||
|  |  | ||||||
|  | use std::mem; | ||||||
|  | use reqwest::unstable::async::{Client, Decoder}; | ||||||
|  | use futures::{Future, Stream}; | ||||||
|  | use tokio_core::reactor::Core; | ||||||
|  | use std::io::Write; | ||||||
|  | use std::time::Duration; | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn async_test_gzip_response() { | ||||||
|  |     test_gzip(10_000, 4096); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn async_test_gzip_single_byte_chunks() { | ||||||
|  |     test_gzip(10, 1); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | fn test_gzip(response_size: usize, chunk_size: usize) { | ||||||
|  |     let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect(); | ||||||
|  |     let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); | ||||||
|  |     match encoder.write(content.as_bytes()) { | ||||||
|  |         Ok(n) => assert!(n > 0, "Failed to write to encoder."), | ||||||
|  |         _ => panic!("Failed to gzip encode string."), | ||||||
|  |     }; | ||||||
|  |  | ||||||
|  |     let gzipped_content = encoder.finish().into_result().unwrap(); | ||||||
|  |  | ||||||
|  |     let mut response = format!("\ | ||||||
|  |             HTTP/1.1 200 OK\r\n\ | ||||||
|  |             Server: test-accept\r\n\ | ||||||
|  |             Content-Encoding: gzip\r\n\ | ||||||
|  |             Content-Length: {}\r\n\ | ||||||
|  |             \r\n", &gzipped_content.len()) | ||||||
|  |         .into_bytes(); | ||||||
|  |     response.extend(&gzipped_content); | ||||||
|  |  | ||||||
|  |     let server = server! { | ||||||
|  |         request: b"\ | ||||||
|  |             GET /gzip HTTP/1.1\r\n\ | ||||||
|  |             Host: $HOST\r\n\ | ||||||
|  |             User-Agent: $USERAGENT\r\n\ | ||||||
|  |             Accept: */*\r\n\ | ||||||
|  |             Accept-Encoding: gzip\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |         chunk_size: chunk_size, | ||||||
|  |         write_timeout: Duration::from_millis(10), | ||||||
|  |         response: response | ||||||
|  |     }; | ||||||
|  |  | ||||||
|  |     let mut core = Core::new().unwrap(); | ||||||
|  |  | ||||||
|  |     let client = Client::new(&core.handle()).unwrap(); | ||||||
|  |  | ||||||
|  |     let res_future = client.get(&format!("http://{}/gzip", server.addr())) | ||||||
|  |         .unwrap() | ||||||
|  |         .send() | ||||||
|  |         .and_then(|mut res| { | ||||||
|  |             let body = mem::replace(res.body_mut(), Decoder::empty()); | ||||||
|  |             body.concat2() | ||||||
|  |         }) | ||||||
|  |         .and_then(|buf| { | ||||||
|  |             let body = ::std::str::from_utf8(&buf).unwrap(); | ||||||
|  |  | ||||||
|  |             assert_eq!(body, &content); | ||||||
|  |  | ||||||
|  |             Ok(()) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     core.run(res_future).unwrap(); | ||||||
|  | } | ||||||
| @@ -4,12 +4,15 @@ extern crate libflate; | |||||||
| #[macro_use] | #[macro_use] | ||||||
| mod support; | mod support; | ||||||
|  |  | ||||||
|  | use std::time::Duration; | ||||||
| use std::io::{Read, Write}; | use std::io::{Read, Write}; | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| fn test_gzip_response() { | fn test_gzip_response() { | ||||||
|  |     let content: String = (0..50).into_iter().map(|i| format!("test {}", i)).collect(); | ||||||
|  |     let chunk_size = content.len() / 3; | ||||||
|     let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); |     let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); | ||||||
|     match encoder.write(b"test request") { |     match encoder.write(content.as_bytes()) { | ||||||
|         Ok(n) => assert!(n > 0, "Failed to write to encoder."), |         Ok(n) => assert!(n > 0, "Failed to write to encoder."), | ||||||
|         _ => panic!("Failed to gzip encode string."), |         _ => panic!("Failed to gzip encode string."), | ||||||
|     }; |     }; | ||||||
| @@ -34,6 +37,8 @@ fn test_gzip_response() { | |||||||
|             Accept-Encoding: gzip\r\n\ |             Accept-Encoding: gzip\r\n\ | ||||||
|             \r\n\ |             \r\n\ | ||||||
|             ", |             ", | ||||||
|  |         chunk_size: chunk_size, | ||||||
|  |         write_timeout: Duration::from_millis(10), | ||||||
|         response: response |         response: response | ||||||
|     }; |     }; | ||||||
|     let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); |     let mut res = reqwest::get(&format!("http://{}/gzip", server.addr())).unwrap(); | ||||||
| @@ -41,7 +46,7 @@ fn test_gzip_response() { | |||||||
|     let mut body = String::new(); |     let mut body = String::new(); | ||||||
|     res.read_to_string(&mut body).unwrap(); |     res.read_to_string(&mut body).unwrap(); | ||||||
|  |  | ||||||
|     assert_eq!(body, "test request"); |     assert_eq!(body, content); | ||||||
| } | } | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
|   | |||||||
| @@ -23,6 +23,7 @@ pub struct Txn { | |||||||
|     pub read_timeout: Option<Duration>, |     pub read_timeout: Option<Duration>, | ||||||
|     pub response_timeout: Option<Duration>, |     pub response_timeout: Option<Duration>, | ||||||
|     pub write_timeout: Option<Duration>, |     pub write_timeout: Option<Duration>, | ||||||
|  |     pub chunk_size: Option<usize>, | ||||||
| } | } | ||||||
|  |  | ||||||
| static DEFAULT_USER_AGENT: &'static str = | static DEFAULT_USER_AGENT: &'static str = | ||||||
| @@ -63,10 +64,21 @@ pub fn spawn(txns: Vec<Txn>) -> Server { | |||||||
|             } |             } | ||||||
|  |  | ||||||
|             if let Some(dur) = txn.write_timeout { |             if let Some(dur) = txn.write_timeout { | ||||||
|                 let headers_end = ::std::str::from_utf8(&reply).unwrap().find("\r\n\r\n").unwrap() + 4; |                 let headers_end = b"\r\n\r\n"; | ||||||
|  |                 let headers_end = reply.windows(headers_end.len()).position(|w| w == headers_end).unwrap() + 4; | ||||||
|                 socket.write_all(&reply[..headers_end]).unwrap(); |                 socket.write_all(&reply[..headers_end]).unwrap(); | ||||||
|                 thread::park_timeout(dur); |  | ||||||
|                 socket.write_all(&reply[headers_end..]).unwrap(); |                 let body = &reply[headers_end..]; | ||||||
|  |  | ||||||
|  |                 if let Some(chunk_size) = txn.chunk_size { | ||||||
|  |                     for content in body.chunks(chunk_size) { | ||||||
|  |                         thread::park_timeout(dur); | ||||||
|  |                         socket.write_all(&content).unwrap(); | ||||||
|  |                     } | ||||||
|  |                 } else { | ||||||
|  |                     thread::park_timeout(dur); | ||||||
|  |                     socket.write_all(&body).unwrap(); | ||||||
|  |                 } | ||||||
|             } else { |             } else { | ||||||
|                 socket.write_all(&reply).unwrap(); |                 socket.write_all(&reply).unwrap(); | ||||||
|             } |             } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user