| @@ -772,7 +772,7 @@ impl Client { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(super) fn execute_request(&self, req: Request) -> Pending { |     pub(super) fn execute_request(&self, req: Request) -> Pending { | ||||||
|         let (method, url, mut headers, body) = req.pieces(); |         let (method, url, mut headers, body, timeout) = req.pieces(); | ||||||
|  |  | ||||||
|         // insert default headers in the request headers |         // insert default headers in the request headers | ||||||
|         // without overwriting already appended headers. |         // without overwriting already appended headers. | ||||||
| @@ -816,15 +816,15 @@ impl Client { | |||||||
|             .body(body.into_stream()) |             .body(body.into_stream()) | ||||||
|             .expect("valid request parts"); |             .expect("valid request parts"); | ||||||
|  |  | ||||||
|  |         let timeout = timeout | ||||||
|  |             .or(self.inner.request_timeout) | ||||||
|  |             .map(|dur| tokio::time::delay_for(dur)); | ||||||
|  |  | ||||||
|  |  | ||||||
|         *req.headers_mut() = headers.clone(); |         *req.headers_mut() = headers.clone(); | ||||||
|  |  | ||||||
|         let in_flight = self.inner.hyper.request(req); |         let in_flight = self.inner.hyper.request(req); | ||||||
|  |  | ||||||
|         let timeout = self |  | ||||||
|             .inner |  | ||||||
|             .request_timeout |  | ||||||
|             .map(|dur| tokio::time::delay_for(dur)); |  | ||||||
|  |  | ||||||
|         Pending { |         Pending { | ||||||
|             inner: PendingInner::Request(PendingRequest { |             inner: PendingInner::Request(PendingRequest { | ||||||
|                 method, |                 method, | ||||||
|   | |||||||
| @@ -2,6 +2,7 @@ use std::convert::TryFrom; | |||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::future::Future; | use std::future::Future; | ||||||
| use std::io::Write; | use std::io::Write; | ||||||
|  | use std::time::Duration; | ||||||
|  |  | ||||||
| use base64; | use base64; | ||||||
| use base64::write::EncoderWriter as Base64Encoder; | use base64::write::EncoderWriter as Base64Encoder; | ||||||
| @@ -23,6 +24,7 @@ pub struct Request { | |||||||
|     url: Url, |     url: Url, | ||||||
|     headers: HeaderMap, |     headers: HeaderMap, | ||||||
|     body: Option<Body>, |     body: Option<Body>, | ||||||
|  |     timeout: Option<Duration>, | ||||||
| } | } | ||||||
|  |  | ||||||
| /// A builder to construct the properties of a `Request`. | /// A builder to construct the properties of a `Request`. | ||||||
| @@ -40,6 +42,7 @@ impl Request { | |||||||
|             url, |             url, | ||||||
|             headers: HeaderMap::new(), |             headers: HeaderMap::new(), | ||||||
|             body: None, |             body: None, | ||||||
|  |             timeout: None | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -91,6 +94,18 @@ impl Request { | |||||||
|         &mut self.body |         &mut self.body | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Get the timeout. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn timeout(&self) -> Option<&Duration> { | ||||||
|  |         self.timeout.as_ref() | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Get a mutable reference to the timeout. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn timeout_mut(&mut self) -> &mut Option<Duration> { | ||||||
|  |         &mut self.timeout | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Attempt to clone the request. |     /// Attempt to clone the request. | ||||||
|     /// |     /// | ||||||
|     /// `None` is returned if the request can not be cloned, i.e. if the body is a stream. |     /// `None` is returned if the request can not be cloned, i.e. if the body is a stream. | ||||||
| @@ -100,13 +115,14 @@ impl Request { | |||||||
|             None => None, |             None => None, | ||||||
|         }; |         }; | ||||||
|         let mut req = Request::new(self.method().clone(), self.url().clone()); |         let mut req = Request::new(self.method().clone(), self.url().clone()); | ||||||
|  |         *req.timeout_mut() = self.timeout().cloned(); | ||||||
|         *req.headers_mut() = self.headers().clone(); |         *req.headers_mut() = self.headers().clone(); | ||||||
|         req.body = body; |         req.body = body; | ||||||
|         Some(req) |         Some(req) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(super) fn pieces(self) -> (Method, Url, HeaderMap, Option<Body>) { |     pub(super) fn pieces(self) -> (Method, Url, HeaderMap, Option<Body>, Option<Duration>) { | ||||||
|         (self.method, self.url, self.headers, self.body) |         (self.method, self.url, self.headers, self.body, self.timeout) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -199,6 +215,18 @@ impl RequestBuilder { | |||||||
|         self |         self | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Enables a request timeout. | ||||||
|  |     /// | ||||||
|  |     /// The timeout is applied from the when the request starts connecting | ||||||
|  |     /// until the response body has finished. It affects only this request | ||||||
|  |     /// and overrides the timeout configured using `ClientBuilder::timeout()`. | ||||||
|  |     pub fn timeout(mut self, timeout: Duration) -> RequestBuilder { | ||||||
|  |         if let Ok(ref mut req) = self.request { | ||||||
|  |             *req.timeout_mut() = Some(timeout); | ||||||
|  |         } | ||||||
|  |         self | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Sends a multipart/form-data body. |     /// Sends a multipart/form-data body. | ||||||
|     /// |     /// | ||||||
|     /// ``` |     /// ``` | ||||||
|   | |||||||
| @@ -4,7 +4,7 @@ use support::*; | |||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
|  |  | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn request_timeout() { | async fn client_timeout() { | ||||||
|     let _ = env_logger::try_init(); |     let _ = env_logger::try_init(); | ||||||
|  |  | ||||||
|     let server = server::http(move |_req| { |     let server = server::http(move |_req| { | ||||||
| @@ -30,6 +30,34 @@ async fn request_timeout() { | |||||||
|     assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); |     assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[tokio::test] | ||||||
|  | async fn request_timeout() { | ||||||
|  |     let _ = env_logger::try_init(); | ||||||
|  |  | ||||||
|  |     let server = server::http(move |_req| { | ||||||
|  |         async { | ||||||
|  |             // delay returning the response | ||||||
|  |             tokio::time::delay_for(Duration::from_secs(2)).await; | ||||||
|  |             http::Response::default() | ||||||
|  |         } | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     let client = reqwest::Client::builder().build().unwrap(); | ||||||
|  |  | ||||||
|  |     let url = format!("http://{}/slow", server.addr()); | ||||||
|  |  | ||||||
|  |     let res = client | ||||||
|  |         .get(&url) | ||||||
|  |         .timeout(Duration::from_millis(500)) | ||||||
|  |         .send() | ||||||
|  |         .await; | ||||||
|  |  | ||||||
|  |     let err = res.unwrap_err(); | ||||||
|  |  | ||||||
|  |     assert!(err.is_timeout()); | ||||||
|  |     assert_eq!(err.url().map(|u| u.as_str()), Some(url.as_str())); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[tokio::test] | #[tokio::test] | ||||||
| async fn response_timeout() { | async fn response_timeout() { | ||||||
|     let _ = env_logger::try_init(); |     let _ = env_logger::try_init(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user