refactor(server): simplify drain (#2328)
The current implementation of `drain` uses a `tokio::sync::watch` channel to send the shutdown signal, and a `tokio::sync::mpsc` to signal when all draining tasks have completed. No data is ever actually sent on the MPSC; instead, it is simply used to notify the task that signalled the drain when all draining tasks have been dropped. Tokio 0.3's `watch::Sender` has a `closed` method that can be used to await the dropping of all receivers. This can be used instead of the MPSC channel. This commit updates `drain` to use `watch::Sender::closed` instead. This has fewer moving parts, and may have slightly less overhead (as it doesn't require additional allocation forthe MPSC which is never actually used). Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
@@ -1,37 +1,23 @@
|
|||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
use tokio::stream::Stream;
|
use tokio::sync::watch;
|
||||||
use tokio::sync::{mpsc, watch};
|
|
||||||
|
|
||||||
use super::{task, Future, Never, Pin, Poll};
|
use super::{task, Future, Pin, Poll};
|
||||||
|
|
||||||
pub fn channel() -> (Signal, Watch) {
|
pub fn channel() -> (Signal, Watch) {
|
||||||
let (tx, rx) = watch::channel(());
|
let (tx, rx) = watch::channel(());
|
||||||
let (drained_tx, drained_rx) = mpsc::channel(1);
|
(Signal { tx }, Watch { rx })
|
||||||
(
|
|
||||||
Signal {
|
|
||||||
drained_rx,
|
|
||||||
_tx: tx,
|
|
||||||
},
|
|
||||||
Watch { drained_tx, rx },
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Signal {
|
pub struct Signal {
|
||||||
drained_rx: mpsc::Receiver<Never>,
|
tx: watch::Sender<()>,
|
||||||
_tx: watch::Sender<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project::pin_project]
|
pub struct Draining(Pin<Box<dyn Future<Output = ()> + Send + Sync>>);
|
||||||
pub struct Draining {
|
|
||||||
#[pin]
|
|
||||||
drained_rx: mpsc::Receiver<Never>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Watch {
|
pub struct Watch {
|
||||||
drained_tx: mpsc::Sender<Never>,
|
|
||||||
rx: watch::Receiver<()>,
|
rx: watch::Receiver<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -42,7 +28,7 @@ pub struct Watching<F, FN> {
|
|||||||
future: F,
|
future: F,
|
||||||
state: State<FN>,
|
state: State<FN>,
|
||||||
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
|
watch: Pin<Box<dyn Future<Output = ()> + Send + Sync>>,
|
||||||
_drained_tx: mpsc::Sender<Never>,
|
_rx: watch::Receiver<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum State<F> {
|
enum State<F> {
|
||||||
@@ -52,21 +38,16 @@ enum State<F> {
|
|||||||
|
|
||||||
impl Signal {
|
impl Signal {
|
||||||
pub fn drain(self) -> Draining {
|
pub fn drain(self) -> Draining {
|
||||||
// Simply dropping `self.tx` will signal the watchers
|
let _ = self.tx.send(());
|
||||||
Draining {
|
Draining(Box::pin(async move { self.tx.closed().await }))
|
||||||
drained_rx: self.drained_rx,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for Draining {
|
impl Future for Draining {
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||||
match ready!(self.project().drained_rx.poll_next(cx)) {
|
Pin::new(&mut self.as_mut().0).poll(cx)
|
||||||
Some(never) => match never {},
|
|
||||||
None => Poll::Ready(()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,14 +57,17 @@ impl Watch {
|
|||||||
F: Future,
|
F: Future,
|
||||||
FN: FnOnce(Pin<&mut F>),
|
FN: FnOnce(Pin<&mut F>),
|
||||||
{
|
{
|
||||||
let Self { drained_tx, mut rx } = self;
|
let Self { mut rx } = self;
|
||||||
|
let _rx = rx.clone();
|
||||||
Watching {
|
Watching {
|
||||||
future,
|
future,
|
||||||
state: State::Watch(on_drain),
|
state: State::Watch(on_drain),
|
||||||
watch: Box::pin(async move {
|
watch: Box::pin(async move {
|
||||||
let _ = rx.changed().await;
|
let _ = rx.changed().await;
|
||||||
}),
|
}),
|
||||||
_drained_tx: drained_tx,
|
// Keep the receiver alive until the future completes, so that
|
||||||
|
// dropping it can signal that draining has completed.
|
||||||
|
_rx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user