128 lines
4.0 KiB
Rust
128 lines
4.0 KiB
Rust
use std::error::Error as StdError;
|
|
|
|
use pin_project_lite::pin_project;
|
|
use tokio::io::{AsyncRead, AsyncWrite};
|
|
use tracing::debug;
|
|
|
|
use super::accept::Accept;
|
|
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
|
|
use crate::body::{Body, HttpBody};
|
|
use crate::common::drain::{self, Draining, Signal, Watch, Watching};
|
|
use crate::common::exec::{ConnStreamExec, NewSvcExec};
|
|
use crate::common::{task, Future, Pin, Poll, Unpin};
|
|
use crate::service::{HttpService, MakeServiceRef};
|
|
|
|
pin_project! {
|
|
#[allow(missing_debug_implementations)]
|
|
pub struct Graceful<I, S, F, E> {
|
|
#[pin]
|
|
state: State<I, S, F, E>,
|
|
}
|
|
}
|
|
|
|
pin_project! {
|
|
#[project = StateProj]
|
|
pub(super) enum State<I, S, F, E> {
|
|
Running {
|
|
drain: Option<(Signal, Watch)>,
|
|
#[pin]
|
|
spawn_all: SpawnAll<I, S, E>,
|
|
#[pin]
|
|
signal: F,
|
|
},
|
|
Draining { draining: Draining },
|
|
}
|
|
}
|
|
|
|
impl<I, S, F, E> Graceful<I, S, F, E> {
|
|
pub(super) fn new(spawn_all: SpawnAll<I, S, E>, signal: F) -> Self {
|
|
let drain = Some(drain::channel());
|
|
Graceful {
|
|
state: State::Running {
|
|
drain,
|
|
spawn_all,
|
|
signal,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
|
|
where
|
|
I: Accept<Conn = IO, Error = IE>,
|
|
IE: Into<Box<dyn StdError + Send + Sync>>,
|
|
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
S: MakeServiceRef<IO, Body, ResBody = B>,
|
|
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
B: HttpBody + 'static,
|
|
B::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
F: Future<Output = ()>,
|
|
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
|
|
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
|
|
{
|
|
type Output = crate::Result<()>;
|
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
|
let mut me = self.project();
|
|
loop {
|
|
let next = {
|
|
match me.state.as_mut().project() {
|
|
StateProj::Running {
|
|
drain,
|
|
spawn_all,
|
|
signal,
|
|
} => match signal.poll(cx) {
|
|
Poll::Ready(()) => {
|
|
debug!("signal received, starting graceful shutdown");
|
|
let sig = drain.take().expect("drain channel").0;
|
|
State::Draining {
|
|
draining: sig.drain(),
|
|
}
|
|
}
|
|
Poll::Pending => {
|
|
let watch = drain.as_ref().expect("drain channel").1.clone();
|
|
return spawn_all.poll_watch(cx, &GracefulWatcher(watch));
|
|
}
|
|
},
|
|
StateProj::Draining { ref mut draining } => {
|
|
return Pin::new(draining).poll(cx).map(Ok);
|
|
}
|
|
}
|
|
};
|
|
me.state.set(next);
|
|
}
|
|
}
|
|
}
|
|
|
|
#[allow(missing_debug_implementations)]
|
|
#[derive(Clone)]
|
|
pub struct GracefulWatcher(Watch);
|
|
|
|
impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
|
|
where
|
|
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
S: HttpService<Body>,
|
|
E: ConnStreamExec<S::Future, S::ResBody>,
|
|
S::ResBody: 'static,
|
|
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
{
|
|
type Future =
|
|
Watching<UpgradeableConnection<I, S, E>, fn(Pin<&mut UpgradeableConnection<I, S, E>>)>;
|
|
|
|
fn watch(&self, conn: UpgradeableConnection<I, S, E>) -> Self::Future {
|
|
self.0.clone().watch(conn, on_drain)
|
|
}
|
|
}
|
|
|
|
fn on_drain<I, S, E>(conn: Pin<&mut UpgradeableConnection<I, S, E>>)
|
|
where
|
|
S: HttpService<Body>,
|
|
S::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
I: AsyncRead + AsyncWrite + Unpin,
|
|
S::ResBody: HttpBody + 'static,
|
|
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
|
|
E: ConnStreamExec<S::Future, S::ResBody>,
|
|
{
|
|
conn.graceful_shutdown()
|
|
}
|