diff --git a/src/client.rs b/src/client.rs index 7633194..c1705fb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -636,9 +636,12 @@ impl ClientHandle { let res = match wait::timeout(fut, self.timeout.0) { Ok(res) => res, Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))), - Err(wait::Waited::Err(err)) => { + Err(wait::Waited::Executor(err)) => { + return Err(::error::from(err).with_url(url)) + }, + Err(wait::Waited::Inner(err)) => { return Err(err.with_url(url)); - } + }, }; res.map(|res| { Response::new(res, self.timeout.0, KeepCoreThreadAlive(Some(self.inner.clone()))) diff --git a/src/error.rs b/src/error.rs index 9f07766..84c9d13 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,6 +2,8 @@ use std::error::Error as StdError; use std::fmt; use std::io; +use tokio_executor::EnterError; + use {StatusCode, Url}; /// The Errors that may occur when processing a `Request`. @@ -146,6 +148,7 @@ impl Error { Kind::Io(ref e) => Some(e), Kind::UrlEncoded(ref e) => Some(e), Kind::Json(ref e) => Some(e), + Kind::Executor(ref e) => Some(e), Kind::UrlBadScheme | Kind::TooManyRedirects | Kind::RedirectLoop | @@ -284,6 +287,7 @@ impl fmt::Display for Error { } Kind::UnknownProxyScheme => f.write_str("Unknown proxy scheme"), Kind::Timer => f.write_str("timer unavailable"), + Kind::Executor(ref e) => fmt::Display::fmt(e, f), } } } @@ -320,6 +324,7 @@ impl StdError for Error { } Kind::UnknownProxyScheme => "Unknown proxy scheme", Kind::Timer => "timer unavailable", + Kind::Executor(ref e) => e.description(), } } @@ -342,6 +347,7 @@ impl StdError for Error { Kind::Io(ref e) => e.cause(), Kind::UrlEncoded(ref e) => e.cause(), Kind::Json(ref e) => e.cause(), + Kind::Executor(ref e) => e.cause(), Kind::UrlBadScheme | Kind::TooManyRedirects | Kind::RedirectLoop | @@ -368,6 +374,7 @@ impl StdError for Error { Kind::Io(ref e) => e.source(), Kind::UrlEncoded(ref e) => e.source(), Kind::Json(ref e) => e.source(), + Kind::Executor(ref e) => e.source(), Kind::UrlBadScheme | Kind::TooManyRedirects | Kind::RedirectLoop | @@ -401,6 +408,7 @@ pub(crate) enum Kind { Status(StatusCode), UnknownProxyScheme, Timer, + Executor(EnterError), } @@ -472,11 +480,18 @@ where T: Into { fn from(err: ::wait::Waited) -> Kind { match err { ::wait::Waited::TimedOut => io_timeout().into(), - ::wait::Waited::Err(e) => e.into(), + ::wait::Waited::Executor(e) => e.into(), + ::wait::Waited::Inner(e) => e.into(), } } } +impl From for Kind { + fn from(err: EnterError) -> Kind { + Kind::Executor(err) + } +} + impl From<::tokio::timer::Error> for Kind { fn from(_err: ::tokio::timer::Error) -> Kind { Kind::Timer diff --git a/src/lib.rs b/src/lib.rs index fae1015..4712a68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -197,6 +197,7 @@ extern crate serde_json; extern crate serde_urlencoded; extern crate time; extern crate tokio; +extern crate tokio_executor; #[cfg_attr(feature = "default-tls", macro_use)] extern crate tokio_io; extern crate tokio_timer; diff --git a/src/response.rs b/src/response.rs index fc6e46f..843c12e 100644 --- a/src/response.rs +++ b/src/response.rs @@ -411,7 +411,8 @@ impl Stream for WaitBody { Some(Err(e)) => { let req_err = match e { wait::Waited::TimedOut => ::error::timedout(None), - wait::Waited::Err(e) => e, + wait::Waited::Executor(e) => ::error::from(e), + wait::Waited::Inner(e) => e, }; Err(req_err) diff --git a/src/wait.rs b/src/wait.rs index d6c975e..794463f 100644 --- a/src/wait.rs +++ b/src/wait.rs @@ -2,34 +2,18 @@ use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; -use futures::{Async, Future, Stream}; +use futures::{Async, Future, Poll, Stream}; use futures::executor::{self, Notify}; +use tokio_executor::{enter, EnterError}; pub(crate) fn timeout(fut: F, timeout: Option) -> Result> -where F: Future { - if let Some(dur) = timeout { - let start = Instant::now(); - let deadline = start + dur; - let mut task = executor::spawn(fut); - let notify = Arc::new(ThreadNotify { - thread: thread::current(), - }); - - loop { - let now = Instant::now(); - if now >= deadline { - return Err(Waited::TimedOut); - } - match task.poll_future_notify(¬ify, 0)? { - Async::Ready(val) => return Ok(val), - Async::NotReady => { - thread::park_timeout(deadline - now); - } - } - } - } else { - fut.wait().map_err(From::from) - } +where + F: Future, +{ + let mut spawn = executor::spawn(fut); + block_on(timeout, |notify| { + spawn.poll_future_notify(notify, 0) + }) } pub(crate) fn stream(stream: S, timeout: Option) -> WaitStream @@ -43,12 +27,13 @@ where S: Stream { #[derive(Debug)] pub(crate) enum Waited { TimedOut, - Err(E), + Executor(EnterError), + Inner(E), } impl From for Waited { fn from(err: E) -> Waited { - Waited::Err(err) + Waited::Inner(err) } } @@ -62,42 +47,14 @@ where S: Stream { type Item = Result>; fn next(&mut self) -> Option { - if let Some(dur) = self.timeout { - let start = Instant::now(); - let deadline = start + dur; - let notify = Arc::new(ThreadNotify { - thread: thread::current(), - }); + let res = block_on(self.timeout, |notify| { + self.stream.poll_stream_notify(notify, 0) + }); - loop { - let now = Instant::now(); - if now >= deadline { - return Some(Err(Waited::TimedOut)); - } - match self.stream.poll_stream_notify(¬ify, 0) { - Ok(Async::Ready(Some(val))) => return Some(Ok(val)), - Ok(Async::Ready(None)) => return None, - Ok(Async::NotReady) => { - thread::park_timeout(deadline - now); - }, - Err(e) => return Some(Err(Waited::Err(e))), - } - } - } else { - let notify = Arc::new(ThreadNotify { - thread: thread::current(), - }); - - loop { - match self.stream.poll_stream_notify(¬ify, 0) { - Ok(Async::Ready(Some(val))) => return Some(Ok(val)), - Ok(Async::Ready(None)) => return None, - Ok(Async::NotReady) => { - thread::park(); - }, - Err(e) => return Some(Err(Waited::Err(e))), - } - } + match res { + Ok(Some(val)) => Some(Ok(val)), + Ok(None) => None, + Err(err) => Some(Err(err)), } } } @@ -111,3 +68,36 @@ impl Notify for ThreadNotify { self.thread.unpark(); } } + +fn block_on(timeout: Option, mut poll: F) -> Result> +where + F: FnMut(&Arc) -> Poll, +{ + let _entered = enter().map_err(Waited::Executor)?; + let deadline = timeout.map(|d| { + Instant::now() + d + }); + let notify = Arc::new(ThreadNotify { + thread: thread::current(), + }); + + loop { + match poll(¬ify)? { + Async::Ready(val) => return Ok(val), + Async::NotReady => {} + } + + if let Some(deadline) = deadline { + let now = Instant::now(); + if now >= deadline { + return Err(Waited::TimedOut); + } + + thread::park_timeout(deadline - now); + } else { + thread::park(); + } + } +} + +