diff --git a/src/common/drain.rs b/src/common/drain.rs index e98f5b34..4f04fd61 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,37 +1,23 @@ use std::mem; use pin_project::pin_project; -use tokio::stream::Stream; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; -use super::{task, Future, Never, Pin, Poll}; +use super::{task, Future, Pin, Poll}; pub fn channel() -> (Signal, Watch) { let (tx, rx) = watch::channel(()); - let (drained_tx, drained_rx) = mpsc::channel(1); - ( - Signal { - drained_rx, - _tx: tx, - }, - Watch { drained_tx, rx }, - ) + (Signal { tx }, Watch { rx }) } pub struct Signal { - drained_rx: mpsc::Receiver, - _tx: watch::Sender<()>, + tx: watch::Sender<()>, } -#[pin_project::pin_project] -pub struct Draining { - #[pin] - drained_rx: mpsc::Receiver, -} +pub struct Draining(Pin + Send + Sync>>); #[derive(Clone)] pub struct Watch { - drained_tx: mpsc::Sender, rx: watch::Receiver<()>, } @@ -42,7 +28,7 @@ pub struct Watching { future: F, state: State, watch: Pin + Send + Sync>>, - _drained_tx: mpsc::Sender, + _rx: watch::Receiver<()>, } enum State { @@ -52,21 +38,16 @@ enum State { impl Signal { pub fn drain(self) -> Draining { - // Simply dropping `self.tx` will signal the watchers - Draining { - drained_rx: self.drained_rx, - } + let _ = self.tx.send(()); + Draining(Box::pin(async move { self.tx.closed().await })) } } impl Future for Draining { type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(self.project().drained_rx.poll_next(cx)) { - Some(never) => match never {}, - None => Poll::Ready(()), - } + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + Pin::new(&mut self.as_mut().0).poll(cx) } } @@ -76,14 +57,17 @@ impl Watch { F: Future, FN: FnOnce(Pin<&mut F>), { - let Self { drained_tx, mut rx } = self; + let Self { mut rx } = self; + let _rx = rx.clone(); Watching { future, state: State::Watch(on_drain), watch: Box::pin(async move { let _ = rx.changed().await; }), - _drained_tx: drained_tx, + // Keep the receiver alive until the future completes, so that + // dropping it can signal that draining has completed. + _rx, } } }