fix(lib): return an error instead of panic if execute fails

If executing an internal task fails, a new variant of `hyper::Error` is
returned to the user, with improved messaging.

If a non-critical task fails to spawn, it no longer panics, instead just
logging a warning.

Closes #1566
This commit is contained in:
Sean McArthur
2018-06-18 16:01:01 -07:00
parent 27db8b0061
commit 482a5f589e
7 changed files with 63 additions and 30 deletions

View File

@@ -270,13 +270,20 @@ where C: Connect + Sync + 'static,
.http2_only(pool_key.1 == Ver::Http2) .http2_only(pool_key.1 == Ver::Http2)
.handshake(io) .handshake(io)
.and_then(move |(tx, conn)| { .and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| { let bg = executor.execute(conn.map_err(|e| {
debug!("client connection error: {}", e) debug!("client connection error: {}", e)
})); }));
// This task is critical, so an execute error
// should be returned.
if let Err(err) = bg {
warn!("error spawning critical client task: {}", err);
return Either::A(future::err(err));
}
// Wait for 'conn' to ready up before we // Wait for 'conn' to ready up before we
// declare this tx as usable // declare this tx as usable
tx.when_ready() Either::B(tx.when_ready())
}) })
.map(move |tx| { .map(move |tx| {
pool.pooled(connecting, PoolClient { pool.pooled(connecting, PoolClient {
@@ -373,26 +380,32 @@ where C: Connect + Sync + 'static,
} else if !res.body().is_end_stream() { } else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel(); let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx); res.body_mut().delayed_eof(delayed_rx);
executor.execute( let on_idle = future::poll_fn(move || {
future::poll_fn(move || { pooled.poll_ready()
pooled.poll_ready() })
})
.then(move |_| { .then(move |_| {
// At this point, `pooled` is dropped, and had a chance // At this point, `pooled` is dropped, and had a chance
// to insert into the pool (if conn was idle) // to insert into the pool (if conn was idle)
drop(delayed_tx); drop(delayed_tx);
Ok(()) Ok(())
}) });
);
if let Err(err) = executor.execute(on_idle) {
// This task isn't critical, so just log and ignore.
warn!("error spawning task to insert idle connection: {}", err);
}
} else { } else {
// There's no body to delay, but the connection isn't // There's no body to delay, but the connection isn't
// ready yet. Only re-insert when it's ready // ready yet. Only re-insert when it's ready
executor.execute( let on_idle = future::poll_fn(move || {
future::poll_fn(move || { pooled.poll_ready()
pooled.poll_ready() })
}) .then(|_| Ok(()));
.then(|_| Ok(()))
); if let Err(err) = executor.execute(on_idle) {
// This task isn't critical, so just log and ignore.
warn!("error spawning task to insert idle connection: {}", err);
}
} }
Ok(res) Ok(res)
}); });

View File

@@ -407,12 +407,16 @@ impl<T: Poolable> Connections<T> {
let start = Instant::now() + dur; let start = Instant::now() + dur;
let interval = Interval::new(start, dur); let interval = IdleInterval {
self.exec.execute(IdleInterval { interval: Interval::new(start, dur),
interval: interval,
pool: WeakOpt::downgrade(pool_ref), pool: WeakOpt::downgrade(pool_ref),
pool_drop_notifier: rx, pool_drop_notifier: rx,
}); };
if let Err(err) = self.exec.execute(interval) {
// This task isn't critical, so simply log and ignore.
warn!("error spawning connection pool idle interval: {}", err);
}
} }
} }

View File

@@ -13,7 +13,7 @@ pub(crate) enum Exec {
impl Exec { impl Exec {
pub(crate) fn execute<F>(&self, fut: F) pub(crate) fn execute<F>(&self, fut: F) -> ::Result<()>
where where
F: Future<Item=(), Error=()> + Send + 'static, F: Future<Item=(), Error=()> + Send + 'static,
{ {
@@ -21,7 +21,13 @@ impl Exec {
Exec::Default => { Exec::Default => {
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
{ {
::tokio_executor::spawn(fut) use ::tokio_executor::Executor;
::tokio_executor::DefaultExecutor::current()
.spawn(Box::new(fut))
.map_err(|err| {
warn!("executor error: {:?}", err);
::Error::new_execute()
})
} }
#[cfg(not(feature = "runtime"))] #[cfg(not(feature = "runtime"))]
{ {
@@ -30,10 +36,11 @@ impl Exec {
} }
}, },
Exec::Executor(ref e) => { Exec::Executor(ref e) => {
let _ = e.execute(Box::new(fut)) e.execute(Box::new(fut))
.map_err(|err| { .map_err(|err| {
panic!("executor error: {:?}", err.kind()); warn!("executor error: {:?}", err.kind());
}); ::Error::new_execute()
})
}, },
} }
} }

View File

@@ -67,6 +67,9 @@ pub(crate) enum Kind {
/// User polled for an upgrade, but low-level API is not using upgrades. /// User polled for an upgrade, but low-level API is not using upgrades.
ManualUpgrade, ManualUpgrade,
/// Error trying to call `Executor::execute`.
Execute,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@@ -114,7 +117,8 @@ impl Error {
Kind::Closed | Kind::Closed |
Kind::UnsupportedVersion | Kind::UnsupportedVersion |
Kind::UnsupportedRequestMethod | Kind::UnsupportedRequestMethod |
Kind::NoUpgrade => true, Kind::NoUpgrade |
Kind::Execute => true,
_ => false, _ => false,
} }
} }
@@ -130,7 +134,7 @@ impl Error {
} }
/// Returns the error's cause. /// Returns the error's cause.
/// ///
/// This is identical to `Error::cause` except that it provides extra /// This is identical to `Error::cause` except that it provides extra
/// bounds required to be able to downcast the error. /// bounds required to be able to downcast the error.
pub fn cause2(&self) -> Option<&(StdError + 'static + Sync + Send)> { pub fn cause2(&self) -> Option<&(StdError + 'static + Sync + Send)> {
@@ -244,6 +248,10 @@ impl Error {
Error::new(Kind::Shutdown, Some(Box::new(cause))) Error::new(Kind::Shutdown, Some(Box::new(cause)))
} }
pub(crate) fn new_execute() -> Error {
Error::new(Kind::Execute, None)
}
pub(crate) fn new_h2(cause: ::h2::Error) -> Error { pub(crate) fn new_h2(cause: ::h2::Error) -> Error {
Error::new(Kind::Http2, Some(Box::new(cause))) Error::new(Kind::Http2, Some(Box::new(cause)))
} }
@@ -297,6 +305,7 @@ impl StdError for Error {
Kind::UnsupportedRequestMethod => "request has unsupported HTTP method", Kind::UnsupportedRequestMethod => "request has unsupported HTTP method",
Kind::NoUpgrade => "no upgrade available", Kind::NoUpgrade => "no upgrade available",
Kind::ManualUpgrade => "upgrade expected but low level API in use", Kind::ManualUpgrade => "upgrade expected but low level API in use",
Kind::Execute => "executor failed to spawn task",
Kind::Io => "an IO error occurred", Kind::Io => "an IO error occurred",
} }

View File

@@ -94,7 +94,7 @@ where
} }
Err(Either::B((never, _))) => match never {}, Err(Either::B((never, _))) => match never {},
}); });
self.executor.execute(fut); self.executor.execute(fut)?;
State::Ready(request_tx, tx) State::Ready(request_tx, tx)
}, },
State::Ready(ref mut tx, ref conn_dropper) => { State::Ready(ref mut tx, ref conn_dropper) => {
@@ -129,7 +129,7 @@ where
drop(conn_drop_ref); drop(conn_drop_ref);
x x
}); });
self.executor.execute(pipe); self.executor.execute(pipe)?;
} }
let fut = fut let fut = fut
@@ -148,7 +148,7 @@ where
} }
Ok(()) Ok(())
}); });
self.executor.execute(fut); self.executor.execute(fut)?;
continue; continue;
}, },

View File

@@ -132,7 +132,7 @@ where
::Body::h2(stream, content_length) ::Body::h2(stream, content_length)
}); });
let fut = H2Stream::new(service.call(req), respond); let fut = H2Stream::new(service.call(req), respond);
exec.execute(fut); exec.execute(fut)?;
} }
// no more incoming streams... // no more incoming streams...

View File

@@ -641,7 +641,7 @@ where
// flatten basically // flatten basically
.and_then(|conn| conn.with_upgrades()) .and_then(|conn| conn.with_upgrades())
.map_err(|err| debug!("conn error: {}", err)); .map_err(|err| debug!("conn error: {}", err));
self.serve.protocol.exec.execute(fut); self.serve.protocol.exec.execute(fut)?;
} else { } else {
return Ok(Async::Ready(())) return Ok(Async::Ready(()))
} }