From e6a6ddef7c1a52e27b3d1a30d1f26eb16cd9b866 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 25 Mar 2020 13:04:45 -0700 Subject: [PATCH] refactor(client): replace futures mpsc for tokio mpsc in dispatcher --- src/client/dispatch.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index d28007df..87ac0b09 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,6 +1,6 @@ -use futures_channel::{mpsc, oneshot}; -use futures_core::Stream; +use futures_channel::oneshot; use futures_util::future; +use tokio::sync::mpsc; use crate::common::{task, Future, Pin, Poll}; @@ -8,7 +8,7 @@ pub type RetryPromise = oneshot::Receiver = oneshot::Receiver>; pub fn channel() -> (Sender, Receiver) { - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = mpsc::unbounded_channel(); let (giver, taker) = want::new(); let tx = Sender { buffered_once: false, @@ -81,9 +81,9 @@ impl Sender { } let (tx, rx) = oneshot::channel(); self.inner - .unbounded_send(Envelope(Some((val, Callback::Retry(tx))))) + .send(Envelope(Some((val, Callback::Retry(tx))))) .map(move |_| rx) - .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } pub fn send(&mut self, val: T) -> Result, T> { @@ -92,9 +92,9 @@ impl Sender { } let (tx, rx) = oneshot::channel(); self.inner - .unbounded_send(Envelope(Some((val, Callback::NoRetry(tx))))) + .send(Envelope(Some((val, Callback::NoRetry(tx))))) .map(move |_| rx) - .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } pub fn unbound(self) -> UnboundedSender { @@ -117,9 +117,9 @@ impl UnboundedSender { pub fn try_send(&mut self, val: T) -> Result, T> { let (tx, rx) = oneshot::channel(); self.inner - .unbounded_send(Envelope(Some((val, Callback::Retry(tx))))) + .send(Envelope(Some((val, Callback::Retry(tx))))) .map(move |_| rx) - .map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0) + .map_err(|mut e| (e.0).0.take().expect("envelope not dropped").0) } } @@ -142,7 +142,7 @@ impl Receiver { &mut self, cx: &mut task::Context<'_>, ) -> Poll)>> { - match Pin::new(&mut self.inner).poll_next(cx) { + match self.inner.poll_recv(cx) { Poll::Ready(item) => { Poll::Ready(item.map(|mut env| env.0.take().expect("envelope not dropped"))) } @@ -159,9 +159,8 @@ impl Receiver { } pub(crate) fn try_recv(&mut self) -> Option<(T, Callback)> { - match self.inner.try_next() { - Ok(Some(mut env)) => env.0.take(), - Ok(None) => None, + match self.inner.try_recv() { + Ok(mut env) => env.0.take(), Err(_) => None, } }