feat(server): add Server::with_graceful_shutdown method
This adds a "combinator" method to `Server`, which accepts a user's future to "select" on. All connections received by the `Server` will be tracked, and if the user's future finishes, graceful shutdown will begin. - The listener will be closed immediately. - The currently active connections will all be notified to start a graceful shutdown. For HTTP/1, that means finishing the existing response and using `connection: clone`. For HTTP/2, the graceful `GOAWAY` process is started. - Once all active connections have terminated, the graceful future will return. Closes #1575
This commit is contained in:
115
src/common/drain.rs
Normal file
115
src/common/drain.rs
Normal file
@@ -0,0 +1,115 @@
|
||||
use std::mem;
|
||||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::future::Shared;
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
|
||||
use super::Never;
|
||||
|
||||
pub fn channel() -> (Signal, Watch) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let (drained_tx, drained_rx) = mpsc::channel(0);
|
||||
(
|
||||
Signal {
|
||||
drained_rx,
|
||||
tx,
|
||||
},
|
||||
Watch {
|
||||
drained_tx,
|
||||
rx: rx.shared(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub struct Signal {
|
||||
drained_rx: mpsc::Receiver<Never>,
|
||||
tx: oneshot::Sender<()>,
|
||||
}
|
||||
|
||||
pub struct Draining {
|
||||
drained_rx: mpsc::Receiver<Never>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Watch {
|
||||
drained_tx: mpsc::Sender<Never>,
|
||||
rx: Shared<oneshot::Receiver<()>>,
|
||||
}
|
||||
|
||||
pub struct Watching<F, FN> {
|
||||
future: F,
|
||||
state: State<FN>,
|
||||
watch: Watch,
|
||||
}
|
||||
|
||||
enum State<F> {
|
||||
Watch(F),
|
||||
Draining,
|
||||
}
|
||||
|
||||
impl Signal {
|
||||
pub fn drain(self) -> Draining {
|
||||
let _ = self.tx.send(());
|
||||
Draining {
|
||||
drained_rx: self.drained_rx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Draining {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match try_ready!(self.drained_rx.poll()) {
|
||||
Some(never) => match never {},
|
||||
None => Ok(Async::Ready(())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Watch {
|
||||
pub fn watch<F, FN>(self, future: F, on_drain: FN) -> Watching<F, FN>
|
||||
where
|
||||
F: Future,
|
||||
FN: FnOnce(&mut F),
|
||||
{
|
||||
Watching {
|
||||
future,
|
||||
state: State::Watch(on_drain),
|
||||
watch: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, FN> Future for Watching<F, FN>
|
||||
where
|
||||
F: Future,
|
||||
FN: FnOnce(&mut F),
|
||||
{
|
||||
type Item = F::Item;
|
||||
type Error = F::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
match mem::replace(&mut self.state, State::Draining) {
|
||||
State::Watch(on_drain) => {
|
||||
match self.watch.rx.poll() {
|
||||
Ok(Async::Ready(_)) | Err(_) => {
|
||||
// Drain has been triggered!
|
||||
on_drain(&mut self.future);
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
self.state = State::Watch(on_drain);
|
||||
return self.future.poll();
|
||||
},
|
||||
}
|
||||
},
|
||||
State::Draining => {
|
||||
return self.future.poll();
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
mod buf;
|
||||
pub(crate) mod drain;
|
||||
mod exec;
|
||||
pub(crate) mod io;
|
||||
mod lazy;
|
||||
|
||||
@@ -85,7 +85,7 @@ pub struct Connecting<I, F> {
|
||||
#[must_use = "futures do nothing unless polled"]
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SpawnAll<I, S> {
|
||||
serve: Serve<I, S>,
|
||||
pub(super) serve: Serve<I, S>,
|
||||
}
|
||||
|
||||
/// A future binding a connection with a Service.
|
||||
@@ -618,7 +618,7 @@ impl<I, S> SpawnAll<I, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Future for SpawnAll<I, S>
|
||||
impl<I, S, B> SpawnAll<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
@@ -630,16 +630,19 @@ where
|
||||
<S::Service as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
pub(super) fn poll_with<F1, F2, R>(&mut self, per_connection: F1) -> Poll<(), ::Error>
|
||||
where
|
||||
F1: Fn() -> F2,
|
||||
F2: FnOnce(UpgradeableConnection<I::Item, S::Service>) -> R + Send + 'static,
|
||||
R: Future<Item=(), Error=::Error> + Send + 'static,
|
||||
{
|
||||
loop {
|
||||
if let Some(connecting) = try_ready!(self.serve.poll()) {
|
||||
let and_then = per_connection();
|
||||
let fut = connecting
|
||||
.map_err(::Error::new_user_new_service)
|
||||
// flatten basically
|
||||
.and_then(|conn| conn.with_upgrades())
|
||||
.and_then(|conn| and_then(conn.with_upgrades()))
|
||||
.map_err(|err| debug!("conn error: {}", err));
|
||||
self.serve.protocol.exec.execute(fut)?;
|
||||
} else {
|
||||
|
||||
@@ -51,6 +51,7 @@
|
||||
//! ```
|
||||
|
||||
pub mod conn;
|
||||
mod shutdown;
|
||||
#[cfg(feature = "runtime")] mod tcp;
|
||||
|
||||
use std::fmt;
|
||||
@@ -67,6 +68,7 @@ use service::{NewService, Service};
|
||||
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
|
||||
// error that `hyper::server::Http` is private...
|
||||
use self::conn::{Http as Http_, SpawnAll};
|
||||
use self::shutdown::Graceful;
|
||||
#[cfg(feature = "runtime")] use self::tcp::AddrIncoming;
|
||||
|
||||
/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
|
||||
@@ -136,6 +138,65 @@ impl<S> Server<AddrIncoming, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Server<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
I::Item: AsyncRead + AsyncWrite + Send + 'static,
|
||||
S: NewService<ReqBody=Body, ResBody=B> + Send + 'static,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
S::Service: Send,
|
||||
S::Future: Send + 'static,
|
||||
<S::Service as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
{
|
||||
/// Prepares a server to handle graceful shutdown when the provided future
|
||||
/// completes.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate hyper;
|
||||
/// # extern crate futures;
|
||||
/// # use futures::Future;
|
||||
/// # fn main() {}
|
||||
/// # #[cfg(feature = "runtime")]
|
||||
/// # fn run() {
|
||||
/// # use hyper::{Body, Response, Server};
|
||||
/// # use hyper::service::service_fn_ok;
|
||||
/// # let new_service = || {
|
||||
/// # service_fn_ok(|_req| {
|
||||
/// # Response::new(Body::from("Hello World"))
|
||||
/// # })
|
||||
/// # };
|
||||
///
|
||||
/// // Make a server from the previous examples...
|
||||
/// let server = Server::bind(&([127, 0, 0, 1], 3000).into())
|
||||
/// .serve(new_service);
|
||||
///
|
||||
/// // Prepare some signal for when the server should start
|
||||
/// // shutting down...
|
||||
/// let (tx, rx) = futures::sync::oneshot::channel::<()>();
|
||||
///
|
||||
/// let graceful = server
|
||||
/// .with_graceful_shutdown(rx)
|
||||
/// .map_err(|err| eprintln!("server error: {}", err));
|
||||
///
|
||||
/// // Spawn `server` onto an Executor...
|
||||
/// hyper::rt::spawn(graceful);
|
||||
///
|
||||
/// // And later, trigger the signal by calling `tx.send(())`.
|
||||
/// let _ = tx.send(());
|
||||
/// # }
|
||||
/// ```
|
||||
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F>
|
||||
where
|
||||
F: Future<Item=()>
|
||||
{
|
||||
Graceful::new(self.spawn_all, signal)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, S, B> Future for Server<I, S>
|
||||
where
|
||||
I: Stream,
|
||||
@@ -152,7 +213,7 @@ where
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.spawn_all.poll()
|
||||
self.spawn_all.poll_with(|| |conn| conn)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
93
src/server/shutdown.rs
Normal file
93
src/server/shutdown.rs
Normal file
@@ -0,0 +1,93 @@
|
||||
use futures::{Async, Future, Stream, Poll};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use body::{Body, Payload};
|
||||
use common::drain::{self, Draining, Signal, Watch};
|
||||
use service::{Service, NewService};
|
||||
use super::SpawnAll;
|
||||
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Graceful<I, S, F> {
|
||||
state: State<I, S, F>,
|
||||
}
|
||||
|
||||
enum State<I, S, F> {
|
||||
Running {
|
||||
drain: Option<(Signal, Watch)>,
|
||||
spawn_all: SpawnAll<I, S>,
|
||||
signal: F,
|
||||
},
|
||||
Draining(Draining),
|
||||
}
|
||||
|
||||
impl<I, S, F> Graceful<I, S, F> {
|
||||
pub(super) fn new(spawn_all: SpawnAll<I, S>, signal: F) -> Self {
|
||||
let drain = Some(drain::channel());
|
||||
Graceful {
|
||||
state: State::Running {
|
||||
drain,
|
||||
spawn_all,
|
||||
signal,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<I, S, B, F> Future for Graceful<I, S, F>
|
||||
where
|
||||
I: Stream,
|
||||
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
I::Item: AsyncRead + AsyncWrite + Send + 'static,
|
||||
S: NewService<ReqBody=Body, ResBody=B> + Send + 'static,
|
||||
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
|
||||
S::Service: Send,
|
||||
S::Future: Send + 'static,
|
||||
<S::Service as Service>::Future: Send + 'static,
|
||||
B: Payload,
|
||||
F: Future<Item=()>,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
loop {
|
||||
let next = match self.state {
|
||||
State::Running {
|
||||
ref mut drain,
|
||||
ref mut spawn_all,
|
||||
ref mut signal,
|
||||
} => match signal.poll() {
|
||||
Ok(Async::Ready(())) | Err(_) => {
|
||||
debug!("signal received, starting graceful shutdown");
|
||||
let sig = drain
|
||||
.take()
|
||||
.expect("drain channel")
|
||||
.0;
|
||||
State::Draining(sig.drain())
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
let watch = &drain
|
||||
.as_ref()
|
||||
.expect("drain channel")
|
||||
.1;
|
||||
return spawn_all.poll_with(|| {
|
||||
let watch = watch.clone();
|
||||
move |conn| {
|
||||
watch.watch(conn, |conn| {
|
||||
// on_drain, start conn graceful shutdown
|
||||
conn.graceful_shutdown()
|
||||
})
|
||||
}
|
||||
});
|
||||
},
|
||||
},
|
||||
State::Draining(ref mut draining) => {
|
||||
return draining.poll()
|
||||
.map_err(|()| unreachable!("drain mpsc rx never errors"));
|
||||
}
|
||||
};
|
||||
self.state = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user