diff --git a/src/common/drain.rs b/src/common/drain.rs index 31ea9267..95eaaaeb 100644 --- a/src/common/drain.rs +++ b/src/common/drain.rs @@ -1,9 +1,9 @@ use std::mem; +use futures_util::FutureExt as _; use tokio_sync::{mpsc, watch}; use super::{Future, Never, Poll, Pin, task}; -use futures_util::FutureExt as _; // Sentinel value signaling that the watch is still open enum Action { @@ -100,9 +100,10 @@ where loop { match mem::replace(&mut me.state, State::Draining) { State::Watch(on_drain) => { - let mut recv_fut = me.watch.rx.recv_ref().boxed(); + let recv = me.watch.rx.recv_ref(); + futures_util::pin_mut!(recv); - match recv_fut.poll_unpin(cx) { + match recv.poll_unpin(cx) { Poll::Ready(None) => { // Drain has been triggered! on_drain(unsafe { Pin::new_unchecked(&mut me.future) });