use std::error::Error as StdError; use futures::{Async, Future, Stream, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use body::{Body, Payload}; use common::drain::{self, Draining, Signal, Watch, Watching}; use common::exec::{H2Exec, NewSvcExec}; use service::{MakeServiceRef, Service}; use super::conn::{SpawnAll, UpgradeableConnection, Watcher}; #[allow(missing_debug_implementations)] pub struct Graceful { state: State, } enum State { Running { drain: Option<(Signal, Watch)>, spawn_all: SpawnAll, signal: F, }, Draining(Draining), } impl Graceful { pub(super) fn new(spawn_all: SpawnAll, signal: F) -> Self { let drain = Some(drain::channel()); Graceful { state: State::Running { drain, spawn_all, signal, }, } } } impl Future for Graceful where I: Stream, I::Error: Into>, I::Item: AsyncRead + AsyncWrite + Send + 'static, S: MakeServiceRef, S::Service: 'static, S::Error: Into>, B: Payload, F: Future, E: H2Exec<::Future, B>, E: NewSvcExec, { type Item = (); type Error = ::Error; fn poll(&mut self) -> Poll { loop { let next = match self.state { State::Running { ref mut drain, ref mut spawn_all, ref mut signal, } => match signal.poll() { Ok(Async::Ready(())) | Err(_) => { debug!("signal received, starting graceful shutdown"); let sig = drain .take() .expect("drain channel") .0; State::Draining(sig.drain()) }, Ok(Async::NotReady) => { let watch = drain .as_ref() .expect("drain channel") .1 .clone(); return spawn_all.poll_watch(&GracefulWatcher(watch)); }, }, State::Draining(ref mut draining) => { return draining.poll() .map_err(|()| unreachable!("drain mpsc rx never errors")); } }; self.state = next; } } } #[allow(missing_debug_implementations)] #[derive(Clone)] pub struct GracefulWatcher(Watch); impl Watcher for GracefulWatcher where I: AsyncRead + AsyncWrite + Send + 'static, S: Service + 'static, E: H2Exec, { type Future = Watching, fn(&mut UpgradeableConnection)>; fn watch(&self, conn: UpgradeableConnection) -> Self::Future { self .0 .clone() .watch(conn, on_drain) } } fn on_drain(conn: &mut UpgradeableConnection) where S: Service, S::Error: Into>, I: AsyncRead + AsyncWrite, S::ResBody: Payload + 'static, E: H2Exec, { conn.graceful_shutdown() }