diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index 1ec034e..7aa6848 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -142,14 +142,6 @@ impl Decoder { Decoder::plain_text(body) } } - - - pub(crate) fn content_length(&self) -> Option { - match self.inner { - Inner::PlainText(ref body) => body.content_length(), - _ => None, - } - } } impl Stream for Decoder { diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index fc5ea8c..0277208 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -271,7 +271,7 @@ impl> From> for Response { } /// A JSON object. -pub struct Json { +struct Json { concat: Concat2, _marker: PhantomData, } @@ -294,7 +294,7 @@ impl fmt::Debug for Json { } #[derive(Debug)] -pub struct Text { +struct Text { concat: Concat2, encoding: &'static Encoding, } diff --git a/src/response.rs b/src/response.rs index 843c12e..8ffb43f 100644 --- a/src/response.rs +++ b/src/response.rs @@ -3,14 +3,10 @@ use std::fmt; use std::io::{self, Read}; use std::net::SocketAddr; use std::time::Duration; -use std::borrow::Cow; -use encoding_rs::{Encoding, UTF_8}; use futures::{Async, Poll, Stream}; use http; -use mime::Mime; use serde::de::DeserializeOwned; -use serde_json; use cookie; use client::KeepCoreThreadAlive; @@ -20,8 +16,8 @@ use {async_impl, StatusCode, Url, Version, wait}; /// A Response to a submitted `Request`. pub struct Response { inner: async_impl::Response, - body: async_impl::ReadableChunks, - content_length: Option, + body: Option>, + timeout: Option, _thread_handle: KeepCoreThreadAlive, } @@ -32,17 +28,11 @@ impl fmt::Debug for Response { } impl Response { - pub(crate) fn new(mut res: async_impl::Response, timeout: Option, thread: KeepCoreThreadAlive) -> Response { - let body = mem::replace(res.body_mut(), async_impl::Decoder::empty()); - let len = body.content_length(); - let body = async_impl::ReadableChunks::new(WaitBody { - inner: wait::stream(body, timeout) - }); - + pub(crate) fn new(res: async_impl::Response, timeout: Option, thread: KeepCoreThreadAlive) -> Response { Response { inner: res, - body: body, - content_length: len, + body: None, + timeout, _thread_handle: thread, } } @@ -212,21 +202,13 @@ impl Response { /// [`serde_json::from_reader`]: https://docs.serde.rs/serde_json/fn.from_reader.html #[inline] pub fn json(&mut self) -> ::Result { - // There's 2 ways we could implement this: - // - // 1. Just using from_reader(self), making use of our blocking read adapter - // 2. Just use self.inner.json().wait() - // - // Doing 1 is pretty easy, but it means we have the `serde_json` code - // in more than one place, doing basically the same thing. - // - // Doing 2 would mean `serde_json` is only in one place, but we'd - // need to update the sync Response to lazily make a blocking read - // adapter, so that our `inner` could possibly still have the original - // body. - // - // Went for easier for now, just to get it working. - serde_json::from_reader(self).map_err(::error::from) + wait::timeout(self.inner.json(), self.timeout).map_err(|e| { + match e { + wait::Waited::TimedOut => ::error::timedout(None), + wait::Waited::Executor(e) => ::error::from(e), + wait::Waited::Inner(e) => e, + } + }) } /// Get the response text. @@ -278,38 +260,13 @@ impl Response { /// This consumes the body. Trying to read more, or use of `response.json()` /// will return empty values. pub fn text_with_charset(&mut self, default_encoding: &str) -> ::Result { - let len = self.content_length.unwrap_or(0); - let mut content = Vec::with_capacity(len as usize); - self.read_to_end(&mut content).map_err(::error::from)?; - let content_type = self.headers().get(::header::CONTENT_TYPE) - .and_then(|value| { - value.to_str().ok() - }) - .and_then(|value| { - value.parse::().ok() - }); - let encoding_name = content_type - .as_ref() - .and_then(|mime| { - mime - .get_param("charset") - .map(|charset| charset.as_str()) - }) - .unwrap_or(default_encoding); - let encoding = Encoding::for_label(encoding_name.as_bytes()).unwrap_or(UTF_8); - // a block because of borrow checker - { - let (text, _, _) = encoding.decode(&content); - match text { - Cow::Owned(s) => return Ok(s), - _ => (), + wait::timeout(self.inner.text_with_charset(default_encoding), self.timeout).map_err(|e| { + match e { + wait::Waited::TimedOut => ::error::timedout(None), + wait::Waited::Executor(e) => ::error::from(e), + wait::Waited::Inner(e) => e, } - } - unsafe { - // decoding returned Cow::Borrowed, meaning these bytes - // are already valid utf8 - Ok(String::from_utf8_unchecked(content)) - } + }) } /// Copy the response body into a writer. @@ -357,12 +314,12 @@ impl Response { /// ``` #[inline] pub fn error_for_status(self) -> ::Result { - let Response { body, content_length, inner, _thread_handle } = self; + let Response { body, inner, timeout, _thread_handle } = self; inner.error_for_status().map(move |inner| { Response { inner, body, - content_length, + timeout, _thread_handle, } }) @@ -393,7 +350,17 @@ impl Response { impl Read for Response { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.body.read(buf) + if self.body.is_none() { + let body = mem::replace(self.inner.body_mut(), async_impl::Decoder::empty()); + let body = async_impl::ReadableChunks::new(WaitBody { + inner: wait::stream(body, self.timeout) + }); + self.body = Some(body); + } + let mut body = self.body.take().unwrap(); + let bytes = body.read(buf); + self.body = Some(body); + bytes } }