Propagate async timeout to response body (#503)
This commit is contained in:
		| @@ -1,8 +1,9 @@ | |||||||
| use std::{fmt, mem}; | use std::fmt; | ||||||
|  |  | ||||||
| use futures::{Stream, Poll, Async}; | use futures::{Future, Stream, Poll, Async}; | ||||||
| use bytes::{Buf, Bytes}; | use bytes::{Buf, Bytes}; | ||||||
| use hyper::body::Payload; | use hyper::body::Payload; | ||||||
|  | use tokio::timer::Delay; | ||||||
|  |  | ||||||
| /// An asynchronous `Stream`. | /// An asynchronous `Stream`. | ||||||
| pub struct Body { | pub struct Body { | ||||||
| @@ -11,48 +12,43 @@ pub struct Body { | |||||||
|  |  | ||||||
| enum Inner { | enum Inner { | ||||||
|     Reusable(Bytes), |     Reusable(Bytes), | ||||||
|     Hyper(::hyper::Body), |     Hyper { | ||||||
|  |         body: ::hyper::Body, | ||||||
|  |         timeout: Option<Delay>, | ||||||
|  |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Body { | impl Body { | ||||||
|     fn poll_inner(&mut self) -> &mut ::hyper::Body { |  | ||||||
|         match self.inner { |  | ||||||
|             Inner::Hyper(ref mut body) => return body, |  | ||||||
|             Inner::Reusable(_) => (), |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         let bytes = match mem::replace(&mut self.inner, Inner::Reusable(Bytes::new())) { |  | ||||||
|             Inner::Reusable(bytes) => bytes, |  | ||||||
|             Inner::Hyper(_) => unreachable!(), |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         self.inner = Inner::Hyper(bytes.into()); |  | ||||||
|  |  | ||||||
|         match self.inner { |  | ||||||
|             Inner::Hyper(ref mut body) => return body, |  | ||||||
|             Inner::Reusable(_) => unreachable!(), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub(crate) fn content_length(&self) -> Option<u64> { |     pub(crate) fn content_length(&self) -> Option<u64> { | ||||||
|         match self.inner { |         match self.inner { | ||||||
|             Inner::Reusable(ref bytes) => Some(bytes.len() as u64), |             Inner::Reusable(ref bytes) => Some(bytes.len() as u64), | ||||||
|             Inner::Hyper(ref body) => body.content_length(), |             Inner::Hyper { ref body, .. } => body.content_length(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     #[inline] | ||||||
|  |     pub(crate) fn response(body: ::hyper::Body, timeout: Option<Delay>) -> Body { | ||||||
|  |         Body { | ||||||
|  |             inner: Inner::Hyper { | ||||||
|  |                 body, | ||||||
|  |                 timeout, | ||||||
|  |             }, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub(crate) fn wrap(body: ::hyper::Body) -> Body { |     pub(crate) fn wrap(body: ::hyper::Body) -> Body { | ||||||
|         Body { |         Body { | ||||||
|             inner: Inner::Hyper(body), |             inner: Inner::Hyper { | ||||||
|  |                 body, | ||||||
|  |                 timeout: None, | ||||||
|  |             }, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[inline] |     #[inline] | ||||||
|     pub(crate) fn empty() -> Body { |     pub(crate) fn empty() -> Body { | ||||||
|         Body { |         Body::wrap(::hyper::Body::empty()) | ||||||
|             inner: Inner::Hyper(::hyper::Body::empty()), |  | ||||||
|         } |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     #[inline] |     #[inline] | ||||||
| @@ -66,7 +62,10 @@ impl Body { | |||||||
|     pub(crate) fn into_hyper(self) -> (Option<Bytes>, ::hyper::Body) { |     pub(crate) fn into_hyper(self) -> (Option<Bytes>, ::hyper::Body) { | ||||||
|         match self.inner { |         match self.inner { | ||||||
|             Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()), |             Inner::Reusable(chunk) => (Some(chunk.clone()), chunk.into()), | ||||||
|             Inner::Hyper(b) => (None, b), |             Inner::Hyper { body, timeout } => { | ||||||
|  |                 debug_assert!(timeout.is_none()); | ||||||
|  |                 (None, body) | ||||||
|  |             }, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -77,12 +76,29 @@ impl Stream for Body { | |||||||
|  |  | ||||||
|     #[inline] |     #[inline] | ||||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|         match try_!(self.poll_inner().poll()) { |         let opt = match self.inner { | ||||||
|             Async::Ready(opt) => Ok(Async::Ready(opt.map(|chunk| Chunk { |             Inner::Hyper { ref mut body, ref mut timeout } => { | ||||||
|                 inner: chunk, |                 if let Some(ref mut timeout) = timeout { | ||||||
|             }))), |                     if let Async::Ready(()) = try_!(timeout.poll()) { | ||||||
|             Async::NotReady => Ok(Async::NotReady), |                         return Err(::error::timedout(None)); | ||||||
|         } |                     } | ||||||
|  |                 } | ||||||
|  |                 try_ready!(body.poll_data().map_err(::error::from)) | ||||||
|  |             }, | ||||||
|  |             Inner::Reusable(ref mut bytes) => { | ||||||
|  |                 return if bytes.is_empty() { | ||||||
|  |                     Ok(Async::Ready(None)) | ||||||
|  |                 } else { | ||||||
|  |                     let chunk = Chunk::from_chunk(bytes.clone()); | ||||||
|  |                     *bytes = Bytes::new(); | ||||||
|  |                     Ok(Async::Ready(Some(chunk))) | ||||||
|  |                 }; | ||||||
|  |             }, | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         Ok(Async::Ready(opt.map(|chunk| Chunk { | ||||||
|  |             inner: chunk, | ||||||
|  |         }))) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -346,7 +346,7 @@ impl ClientBuilder { | |||||||
|     /// Enables a request timeout. |     /// Enables a request timeout. | ||||||
|     /// |     /// | ||||||
|     /// The timeout is applied from the when the request starts connecting |     /// The timeout is applied from the when the request starts connecting | ||||||
|     /// until the response headers are received. Bodies are not affected. |     /// until the response body has finished. | ||||||
|     /// |     /// | ||||||
|     /// Default is no timeout. |     /// Default is no timeout. | ||||||
|     pub fn timeout(mut self, timeout: Duration) -> ClientBuilder { |     pub fn timeout(mut self, timeout: Duration) -> ClientBuilder { | ||||||
| @@ -839,7 +839,7 @@ impl Future for PendingRequest { | |||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|             let res = Response::new(res, self.url.clone(), self.client.gzip); |             let res = Response::new(res, self.url.clone(), self.client.gzip, self.timeout.take()); | ||||||
|             return Ok(Async::Ready(res)); |             return Ok(Async::Ready(res)); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -5,13 +5,15 @@ use std::net::SocketAddr; | |||||||
|  |  | ||||||
| use futures::{Async, Future, Poll, Stream}; | use futures::{Async, Future, Poll, Stream}; | ||||||
| use futures::stream::Concat2; | use futures::stream::Concat2; | ||||||
|  | use http; | ||||||
| use hyper::{HeaderMap, StatusCode, Version}; | use hyper::{HeaderMap, StatusCode, Version}; | ||||||
| use hyper::client::connect::HttpInfo; | use hyper::client::connect::HttpInfo; | ||||||
| use hyper::header::{CONTENT_LENGTH}; | use hyper::header::{CONTENT_LENGTH}; | ||||||
|  | use tokio::timer::Delay; | ||||||
| use serde::de::DeserializeOwned; | use serde::de::DeserializeOwned; | ||||||
| use serde_json; | use serde_json; | ||||||
| use url::Url; | use url::Url; | ||||||
| use http; |  | ||||||
|  |  | ||||||
| use cookie; | use cookie; | ||||||
| use super::Decoder; | use super::Decoder; | ||||||
| @@ -31,14 +33,14 @@ pub struct Response { | |||||||
| } | } | ||||||
|  |  | ||||||
| impl Response { | impl Response { | ||||||
|     pub(super) fn new(res: ::hyper::Response<::hyper::Body>, url: Url, gzip: bool) -> Response { |     pub(super) fn new(res: ::hyper::Response<::hyper::Body>, url: Url, gzip: bool, timeout: Option<Delay>) -> Response { | ||||||
|         let (parts, body) = res.into_parts(); |         let (parts, body) = res.into_parts(); | ||||||
|         let status = parts.status; |         let status = parts.status; | ||||||
|         let version = parts.version; |         let version = parts.version; | ||||||
|         let extensions = parts.extensions; |         let extensions = parts.extensions; | ||||||
|  |  | ||||||
|         let mut headers = parts.headers; |         let mut headers = parts.headers; | ||||||
|         let decoder = Decoder::detect(&mut headers, Body::wrap(body), gzip); |         let decoder = Decoder::detect(&mut headers, Body::response(body, timeout), gzip); | ||||||
|  |  | ||||||
|         debug!("Response: '{}' for {}", status, url); |         debug!("Response: '{}' for {}", status, url); | ||||||
|         Response { |         Response { | ||||||
|   | |||||||
| @@ -138,6 +138,46 @@ fn request_timeout() { | |||||||
|     assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); |     assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn response_timeout() { | ||||||
|  |     let _ = env_logger::try_init(); | ||||||
|  |  | ||||||
|  |     let server = server! { | ||||||
|  |         request: b"\ | ||||||
|  |             GET /slow HTTP/1.1\r\n\ | ||||||
|  |             user-agent: $USERAGENT\r\n\ | ||||||
|  |             accept: */*\r\n\ | ||||||
|  |             accept-encoding: gzip\r\n\ | ||||||
|  |             host: $HOST\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             ", | ||||||
|  |         response: b"\ | ||||||
|  |             HTTP/1.1 200 OK\r\n\ | ||||||
|  |             Content-Length: 5\r\n\ | ||||||
|  |             \r\n\ | ||||||
|  |             Hello\ | ||||||
|  |             ", | ||||||
|  |         write_timeout: Duration::from_secs(2) | ||||||
|  |     }; | ||||||
|  |  | ||||||
|  |     let mut rt = Runtime::new().expect("new rt"); | ||||||
|  |  | ||||||
|  |     let client = Client::builder() | ||||||
|  |         .timeout(Duration::from_millis(500)) | ||||||
|  |         .build() | ||||||
|  |         .unwrap(); | ||||||
|  |  | ||||||
|  |     let url = format!("http://{}/slow", server.addr()); | ||||||
|  |     let fut = client | ||||||
|  |         .get(&url) | ||||||
|  |         .send() | ||||||
|  |         .and_then(|res| res.into_body().concat2()); | ||||||
|  |  | ||||||
|  |     let err = rt.block_on(fut).unwrap_err(); | ||||||
|  |  | ||||||
|  |     assert!(err.is_timeout()); | ||||||
|  | } | ||||||
|  |  | ||||||
| fn gzip_case(response_size: usize, chunk_size: usize) { | fn gzip_case(response_size: usize, chunk_size: usize) { | ||||||
|     let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect(); |     let content: String = (0..response_size).into_iter().map(|i| format!("test {}", i)).collect(); | ||||||
|     let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); |     let mut encoder = ::libflate::gzip::Encoder::new(Vec::new()).unwrap(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user