Use executor::enter before blocking a thread in wait
This commit is contained in:
		| @@ -636,9 +636,12 @@ impl ClientHandle { | |||||||
|         let res = match wait::timeout(fut, self.timeout.0) { |         let res = match wait::timeout(fut, self.timeout.0) { | ||||||
|             Ok(res) => res, |             Ok(res) => res, | ||||||
|             Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))), |             Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))), | ||||||
|             Err(wait::Waited::Err(err)) => { |             Err(wait::Waited::Executor(err)) => { | ||||||
|  |                 return Err(::error::from(err).with_url(url)) | ||||||
|  |             }, | ||||||
|  |             Err(wait::Waited::Inner(err)) => { | ||||||
|                 return Err(err.with_url(url)); |                 return Err(err.with_url(url)); | ||||||
|             } |             }, | ||||||
|         }; |         }; | ||||||
|         res.map(|res| { |         res.map(|res| { | ||||||
|             Response::new(res, self.timeout.0, KeepCoreThreadAlive(Some(self.inner.clone()))) |             Response::new(res, self.timeout.0, KeepCoreThreadAlive(Some(self.inner.clone()))) | ||||||
|   | |||||||
							
								
								
									
										17
									
								
								src/error.rs
									
									
									
									
									
								
							
							
						
						
									
										17
									
								
								src/error.rs
									
									
									
									
									
								
							| @@ -2,6 +2,8 @@ use std::error::Error as StdError; | |||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::io; | use std::io; | ||||||
|  |  | ||||||
|  | use tokio_executor::EnterError; | ||||||
|  |  | ||||||
| use {StatusCode, Url}; | use {StatusCode, Url}; | ||||||
|  |  | ||||||
| /// The Errors that may occur when processing a `Request`. | /// The Errors that may occur when processing a `Request`. | ||||||
| @@ -146,6 +148,7 @@ impl Error { | |||||||
|             Kind::Io(ref e) => Some(e), |             Kind::Io(ref e) => Some(e), | ||||||
|             Kind::UrlEncoded(ref e) => Some(e), |             Kind::UrlEncoded(ref e) => Some(e), | ||||||
|             Kind::Json(ref e) => Some(e), |             Kind::Json(ref e) => Some(e), | ||||||
|  |             Kind::Executor(ref e) => Some(e), | ||||||
|             Kind::UrlBadScheme | |             Kind::UrlBadScheme | | ||||||
|             Kind::TooManyRedirects | |             Kind::TooManyRedirects | | ||||||
|             Kind::RedirectLoop | |             Kind::RedirectLoop | | ||||||
| @@ -284,6 +287,7 @@ impl fmt::Display for Error { | |||||||
|             } |             } | ||||||
|             Kind::UnknownProxyScheme => f.write_str("Unknown proxy scheme"), |             Kind::UnknownProxyScheme => f.write_str("Unknown proxy scheme"), | ||||||
|             Kind::Timer => f.write_str("timer unavailable"), |             Kind::Timer => f.write_str("timer unavailable"), | ||||||
|  |             Kind::Executor(ref e) => fmt::Display::fmt(e, f), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -320,6 +324,7 @@ impl StdError for Error { | |||||||
|             } |             } | ||||||
|             Kind::UnknownProxyScheme => "Unknown proxy scheme", |             Kind::UnknownProxyScheme => "Unknown proxy scheme", | ||||||
|             Kind::Timer => "timer unavailable", |             Kind::Timer => "timer unavailable", | ||||||
|  |             Kind::Executor(ref e) => e.description(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -342,6 +347,7 @@ impl StdError for Error { | |||||||
|             Kind::Io(ref e) => e.cause(), |             Kind::Io(ref e) => e.cause(), | ||||||
|             Kind::UrlEncoded(ref e) => e.cause(), |             Kind::UrlEncoded(ref e) => e.cause(), | ||||||
|             Kind::Json(ref e) => e.cause(), |             Kind::Json(ref e) => e.cause(), | ||||||
|  |             Kind::Executor(ref e) => e.cause(), | ||||||
|             Kind::UrlBadScheme | |             Kind::UrlBadScheme | | ||||||
|             Kind::TooManyRedirects | |             Kind::TooManyRedirects | | ||||||
|             Kind::RedirectLoop | |             Kind::RedirectLoop | | ||||||
| @@ -368,6 +374,7 @@ impl StdError for Error { | |||||||
|             Kind::Io(ref e) => e.source(), |             Kind::Io(ref e) => e.source(), | ||||||
|             Kind::UrlEncoded(ref e) => e.source(), |             Kind::UrlEncoded(ref e) => e.source(), | ||||||
|             Kind::Json(ref e) => e.source(), |             Kind::Json(ref e) => e.source(), | ||||||
|  |             Kind::Executor(ref e) => e.source(), | ||||||
|             Kind::UrlBadScheme | |             Kind::UrlBadScheme | | ||||||
|             Kind::TooManyRedirects | |             Kind::TooManyRedirects | | ||||||
|             Kind::RedirectLoop | |             Kind::RedirectLoop | | ||||||
| @@ -401,6 +408,7 @@ pub(crate) enum Kind { | |||||||
|     Status(StatusCode), |     Status(StatusCode), | ||||||
|     UnknownProxyScheme, |     UnknownProxyScheme, | ||||||
|     Timer, |     Timer, | ||||||
|  |     Executor(EnterError), | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -472,11 +480,18 @@ where T: Into<Kind> { | |||||||
|     fn from(err: ::wait::Waited<T>) -> Kind { |     fn from(err: ::wait::Waited<T>) -> Kind { | ||||||
|         match err { |         match err { | ||||||
|             ::wait::Waited::TimedOut =>  io_timeout().into(), |             ::wait::Waited::TimedOut =>  io_timeout().into(), | ||||||
|             ::wait::Waited::Err(e) => e.into(), |             ::wait::Waited::Executor(e) => e.into(), | ||||||
|  |             ::wait::Waited::Inner(e) => e.into(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl From<EnterError> for Kind { | ||||||
|  |     fn from(err: EnterError) -> Kind { | ||||||
|  |         Kind::Executor(err) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| impl From<::tokio::timer::Error> for Kind { | impl From<::tokio::timer::Error> for Kind { | ||||||
|     fn from(_err: ::tokio::timer::Error) -> Kind { |     fn from(_err: ::tokio::timer::Error) -> Kind { | ||||||
|         Kind::Timer |         Kind::Timer | ||||||
|   | |||||||
| @@ -197,6 +197,7 @@ extern crate serde_json; | |||||||
| extern crate serde_urlencoded; | extern crate serde_urlencoded; | ||||||
| extern crate time; | extern crate time; | ||||||
| extern crate tokio; | extern crate tokio; | ||||||
|  | extern crate tokio_executor; | ||||||
| #[cfg_attr(feature = "default-tls", macro_use)] | #[cfg_attr(feature = "default-tls", macro_use)] | ||||||
| extern crate tokio_io; | extern crate tokio_io; | ||||||
| extern crate tokio_timer; | extern crate tokio_timer; | ||||||
|   | |||||||
| @@ -411,7 +411,8 @@ impl Stream for WaitBody { | |||||||
|             Some(Err(e)) => { |             Some(Err(e)) => { | ||||||
|                 let req_err = match e { |                 let req_err = match e { | ||||||
|                     wait::Waited::TimedOut => ::error::timedout(None), |                     wait::Waited::TimedOut => ::error::timedout(None), | ||||||
|                     wait::Waited::Err(e) => e, |                     wait::Waited::Executor(e) => ::error::from(e), | ||||||
|  |                     wait::Waited::Inner(e) => e, | ||||||
|                 }; |                 }; | ||||||
|  |  | ||||||
|                 Err(req_err) |                 Err(req_err) | ||||||
|   | |||||||
							
								
								
									
										112
									
								
								src/wait.rs
									
									
									
									
									
								
							
							
						
						
									
										112
									
								
								src/wait.rs
									
									
									
									
									
								
							| @@ -2,34 +2,18 @@ use std::sync::Arc; | |||||||
| use std::thread; | use std::thread; | ||||||
| use std::time::{Duration, Instant}; | use std::time::{Duration, Instant}; | ||||||
|  |  | ||||||
| use futures::{Async, Future, Stream}; | use futures::{Async, Future, Poll, Stream}; | ||||||
| use futures::executor::{self, Notify}; | use futures::executor::{self, Notify}; | ||||||
|  | use tokio_executor::{enter, EnterError}; | ||||||
|  |  | ||||||
| pub(crate) fn timeout<F>(fut: F, timeout: Option<Duration>) -> Result<F::Item, Waited<F::Error>> | pub(crate) fn timeout<F>(fut: F, timeout: Option<Duration>) -> Result<F::Item, Waited<F::Error>> | ||||||
| where F: Future { | where | ||||||
|     if let Some(dur) = timeout { |     F: Future, | ||||||
|         let start = Instant::now(); | { | ||||||
|         let deadline = start + dur; |     let mut spawn = executor::spawn(fut); | ||||||
|         let mut task = executor::spawn(fut); |     block_on(timeout, |notify| { | ||||||
|         let notify = Arc::new(ThreadNotify { |         spawn.poll_future_notify(notify, 0) | ||||||
|             thread: thread::current(), |     }) | ||||||
|         }); |  | ||||||
|  |  | ||||||
|         loop { |  | ||||||
|             let now = Instant::now(); |  | ||||||
|             if now >= deadline { |  | ||||||
|                 return Err(Waited::TimedOut); |  | ||||||
|             } |  | ||||||
|             match task.poll_future_notify(¬ify, 0)? { |  | ||||||
|                 Async::Ready(val) => return Ok(val), |  | ||||||
|                 Async::NotReady => { |  | ||||||
|                     thread::park_timeout(deadline - now); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } else { |  | ||||||
|         fut.wait().map_err(From::from) |  | ||||||
|     } |  | ||||||
| } | } | ||||||
|  |  | ||||||
| pub(crate) fn stream<S>(stream: S, timeout: Option<Duration>) -> WaitStream<S> | pub(crate) fn stream<S>(stream: S, timeout: Option<Duration>) -> WaitStream<S> | ||||||
| @@ -43,12 +27,13 @@ where S: Stream { | |||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub(crate) enum Waited<E> { | pub(crate) enum Waited<E> { | ||||||
|     TimedOut, |     TimedOut, | ||||||
|     Err(E), |     Executor(EnterError), | ||||||
|  |     Inner(E), | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<E> From<E> for Waited<E> { | impl<E> From<E> for Waited<E> { | ||||||
|     fn from(err: E) -> Waited<E> { |     fn from(err: E) -> Waited<E> { | ||||||
|         Waited::Err(err) |         Waited::Inner(err) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -62,42 +47,14 @@ where S: Stream { | |||||||
|     type Item = Result<S::Item, Waited<S::Error>>; |     type Item = Result<S::Item, Waited<S::Error>>; | ||||||
|  |  | ||||||
|     fn next(&mut self) -> Option<Self::Item> { |     fn next(&mut self) -> Option<Self::Item> { | ||||||
|         if let Some(dur) = self.timeout { |         let res = block_on(self.timeout, |notify| { | ||||||
|             let start = Instant::now(); |             self.stream.poll_stream_notify(notify, 0) | ||||||
|             let deadline = start + dur; |  | ||||||
|             let notify = Arc::new(ThreadNotify { |  | ||||||
|                 thread: thread::current(), |  | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
|             loop { |         match res { | ||||||
|                 let now = Instant::now(); |             Ok(Some(val)) => Some(Ok(val)), | ||||||
|                 if now >= deadline { |             Ok(None) => None, | ||||||
|                     return Some(Err(Waited::TimedOut)); |             Err(err) => Some(Err(err)), | ||||||
|                 } |  | ||||||
|                 match self.stream.poll_stream_notify(¬ify, 0) { |  | ||||||
|                     Ok(Async::Ready(Some(val))) => return Some(Ok(val)), |  | ||||||
|                     Ok(Async::Ready(None)) => return None, |  | ||||||
|                     Ok(Async::NotReady) => { |  | ||||||
|                         thread::park_timeout(deadline - now); |  | ||||||
|                     }, |  | ||||||
|                     Err(e) => return Some(Err(Waited::Err(e))), |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } else { |  | ||||||
|             let notify = Arc::new(ThreadNotify { |  | ||||||
|                 thread: thread::current(), |  | ||||||
|             }); |  | ||||||
|  |  | ||||||
|             loop { |  | ||||||
|                 match self.stream.poll_stream_notify(¬ify, 0) { |  | ||||||
|                     Ok(Async::Ready(Some(val))) => return Some(Ok(val)), |  | ||||||
|                     Ok(Async::Ready(None)) => return None, |  | ||||||
|                     Ok(Async::NotReady) => { |  | ||||||
|                         thread::park(); |  | ||||||
|                     }, |  | ||||||
|                     Err(e) => return Some(Err(Waited::Err(e))), |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -111,3 +68,36 @@ impl Notify for ThreadNotify { | |||||||
|         self.thread.unpark(); |         self.thread.unpark(); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | fn block_on<F, U, E>(timeout: Option<Duration>, mut poll: F) -> Result<U, Waited<E>> | ||||||
|  | where | ||||||
|  |     F: FnMut(&Arc<ThreadNotify>) -> Poll<U, E>, | ||||||
|  | { | ||||||
|  |     let _entered = enter().map_err(Waited::Executor)?; | ||||||
|  |     let deadline = timeout.map(|d| { | ||||||
|  |         Instant::now() + d | ||||||
|  |     }); | ||||||
|  |     let notify = Arc::new(ThreadNotify { | ||||||
|  |         thread: thread::current(), | ||||||
|  |     }); | ||||||
|  |  | ||||||
|  |     loop { | ||||||
|  |         match poll(¬ify)? { | ||||||
|  |             Async::Ready(val) => return Ok(val), | ||||||
|  |             Async::NotReady => {} | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         if let Some(deadline) = deadline { | ||||||
|  |             let now = Instant::now(); | ||||||
|  |             if now >= deadline { | ||||||
|  |                 return Err(Waited::TimedOut); | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             thread::park_timeout(deadline - now); | ||||||
|  |         } else { | ||||||
|  |             thread::park(); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user