From b3b8010e1c6eadc2769103e13f1c1d8cca521a5b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 Nov 2020 13:34:19 -0800 Subject: [PATCH] refactor(server): simplify drain (#2328) The current implementation of `drain` uses a `tokio::sync::watch` channel to send the shutdown signal, and a `tokio::sync::mpsc` to signal when all draining tasks have completed. No data is ever actually sent on the MPSC; instead, it is simply used to notify the task that signalled the drain when all draining tasks have been dropped. Tokio 0.3's `watch::Sender` has a `closed` method that can be used to await the dropping of all receivers. This can be used instead of the MPSC channel. This commit updates `drain` to use `watch::Sender::closed` instead. This has fewer moving parts, and may have slightly less overhead (as it doesn't require additional allocation forthe MPSC which is never actually used). Signed-off-by: Eliza Weisman --- src/common/drain.rs | 46 +++++++++++++++------------------------------ 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/src/common/drain.rs b/src/common/drain.rs index e98f5b34..4f04fd61 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,37 +1,23 @@ use std::mem; use pin_project::pin_project; -use tokio::stream::Stream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; -use super::{task, Future, Never, Pin, Poll}; +use super::{task, Future, Pin, Poll}; pub fn channel() -> (Signal, Watch) { let (tx, rx) = watch::channel(()); - let (drained_tx, drained_rx) = mpsc::channel(1); - ( - Signal { - drained_rx, - _tx: tx, - }, - Watch { drained_tx, rx }, - ) + (Signal { tx }, Watch { rx }) } pub struct Signal { - drained_rx: mpsc::Receiver, - _tx: watch::Sender<()>, + tx: watch::Sender<()>, } -#[pin_project::pin_project] -pub struct Draining { - #[pin] - drained_rx: mpsc::Receiver, -} +pub struct Draining(Pin + Send + Sync>>); #[derive(Clone)] pub struct Watch { - drained_tx: mpsc::Sender, rx: watch::Receiver<()>, } @@ -42,7 +28,7 @@ pub struct Watching { future: F, state: State, watch: Pin + Send + Sync>>, - _drained_tx: mpsc::Sender, + _rx: watch::Receiver<()>, } enum State { @@ -52,21 +38,16 @@ enum State { impl Signal { pub fn drain(self) -> Draining { - // Simply dropping `self.tx` will signal the watchers - Draining { - drained_rx: self.drained_rx, - } + let _ = self.tx.send(()); + Draining(Box::pin(async move { self.tx.closed().await })) } } impl Future for Draining { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(self.project().drained_rx.poll_next(cx)) { - Some(never) => match never {}, - None => Poll::Ready(()), - } + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + Pin::new(&mut self.as_mut().0).poll(cx) } } @@ -76,14 +57,17 @@ impl Watch { F: Future, FN: FnOnce(Pin<&mut F>), { - let Self { drained_tx, mut rx } = self; + let Self { mut rx } = self; + let _rx = rx.clone(); Watching { future, state: State::Watch(on_drain), watch: Box::pin(async move { let _ = rx.changed().await; }), - _drained_tx: drained_tx, + // Keep the receiver alive until the future completes, so that + // dropping it can signal that draining has completed. + _rx, } } }