From 5db85316a10d0b7bdd36524d85a746a23bd10190 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 16 Mar 2018 17:48:59 -0700 Subject: [PATCH] refactor(client): replace signal mod with want crate --- Cargo.toml | 1 + src/client/dispatch.rs | 10 +-- src/client/mod.rs | 2 - src/client/signal.rs | 188 ----------------------------------------- src/lib.rs | 1 + 5 files changed, 7 insertions(+), 195 deletions(-) delete mode 100644 src/client/signal.rs diff --git a/Cargo.toml b/Cargo.toml index b5713262..ed5e1052 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ tokio = "0.1.3" tokio-executor = "0.1.0" tokio-service = "0.1" tokio-io = "0.1" +want = "0.0.2" [dev-dependencies] num_cpus = "1.0" diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 1dffc1f8..51eae1d4 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -1,8 +1,8 @@ use futures::{Async, Poll, Stream}; use futures::sync::{mpsc, oneshot}; +use want; use common::Never; -use super::signal; //pub type Callback = oneshot::Sender)>>; pub type RetryPromise = oneshot::Receiver)>>; @@ -10,7 +10,7 @@ pub type Promise = oneshot::Receiver>; pub fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::channel(0); - let (giver, taker) = signal::new(); + let (giver, taker) = want::new(); let tx = Sender { giver: giver, inner: tx, @@ -27,7 +27,7 @@ pub struct Sender { // when the queue is empty. This helps us know when a request and // response have been fully processed, and a connection is ready // for more. - giver: signal::Giver, + giver: want::Giver, //inner: mpsc::Sender<(T, Callback)>, inner: mpsc::Sender>, } @@ -68,7 +68,7 @@ impl Sender { pub struct Receiver { //inner: mpsc::Receiver<(T, Callback)>, inner: mpsc::Receiver>, - taker: signal::Taker, + taker: want::Taker, } impl Stream for Receiver { @@ -229,7 +229,7 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn giver_queue_cancel(b: &mut test::Bencher) { - let (_tx, rx) = super::channel::(); + let (_tx, mut rx) = super::channel::(); b.iter(move || { rx.taker.cancel(); diff --git a/src/client/mod.rs b/src/client/mod.rs index 1761ae7a..94c96f63 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -26,11 +26,9 @@ use self::connect::Destination; pub mod conn; pub mod connect; -//TODO(easy): move cancel and dispatch into common instead pub(crate) mod dispatch; mod dns; mod pool; -mod signal; #[cfg(test)] mod tests; diff --git a/src/client/signal.rs b/src/client/signal.rs deleted file mode 100644 index 2ddf67f7..00000000 --- a/src/client/signal.rs +++ /dev/null @@ -1,188 +0,0 @@ -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use futures::{Async, Poll}; -use futures::task::{self, Task}; - -use self::lock::Lock; - -pub fn new() -> (Giver, Taker) { - let inner = Arc::new(Inner { - state: AtomicUsize::new(STATE_IDLE), - task: Lock::new(None), - }); - let inner2 = inner.clone(); - ( - Giver { - inner: inner, - }, - Taker { - inner: inner2, - }, - ) -} - -#[derive(Clone)] -pub struct Giver { - inner: Arc, -} - -pub struct Taker { - inner: Arc, -} - -const STATE_IDLE: usize = 0; -const STATE_WANT: usize = 1; -const STATE_GIVE: usize = 2; -const STATE_CLOSED: usize = 3; - -struct Inner { - state: AtomicUsize, - task: Lock>, -} - -impl Giver { - pub fn poll_want(&mut self) -> Poll<(), ()> { - loop { - let state = self.inner.state.load(Ordering::SeqCst); - match state { - STATE_WANT => { - // only set to IDLE if it is still Want - self.inner.state.compare_and_swap( - STATE_WANT, - STATE_IDLE, - Ordering::SeqCst, - ); - return Ok(Async::Ready(())) - }, - STATE_GIVE => { - // we're already waiting, return - return Ok(Async::NotReady) - } - STATE_CLOSED => return Err(()), - // Taker doesn't want anything yet, so park. - _ => { - if let Some(mut locked) = self.inner.task.try_lock() { - - // While we have the lock, try to set to GIVE. - let old = self.inner.state.compare_and_swap( - STATE_IDLE, - STATE_GIVE, - Ordering::SeqCst, - ); - // If it's not still IDLE, something happened! - // Go around the loop again. - if old == STATE_IDLE { - *locked = Some(task::current()); - return Ok(Async::NotReady) - } - } else { - // if we couldn't take the lock, then a Taker has it. - // The *ONLY* reason is because it is in the process of notifying us - // of its want. - // - // We need to loop again to see what state it was changed to. - } - }, - } - } - } - - pub fn is_canceled(&self) -> bool { - self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED - } -} - -impl Taker { - pub fn cancel(&self) { - self.signal(STATE_CLOSED) - } - - pub fn want(&self) { - self.signal(STATE_WANT) - } - - fn signal(&self, state: usize) { - let old_state = self.inner.state.swap(state, Ordering::SeqCst); - match old_state { - STATE_WANT | STATE_CLOSED | STATE_IDLE => (), - _ => { - loop { - if let Some(mut locked) = self.inner.task.try_lock() { - if let Some(task) = locked.take() { - task.notify(); - } - return; - } else { - // if we couldn't take the lock, then a Giver has it. - // The *ONLY* reason is because it is in the process of parking. - // - // We need to loop and take the lock so we can notify this task. - } - } - }, - } - } -} - -impl Drop for Taker { - fn drop(&mut self) { - self.cancel(); - } -} - - -// a sub module just to protect unsafety -mod lock { - use std::cell::UnsafeCell; - use std::ops::{Deref, DerefMut}; - use std::sync::atomic::{AtomicBool, Ordering}; - - pub struct Lock { - is_locked: AtomicBool, - value: UnsafeCell, - } - - impl Lock { - pub fn new(val: T) -> Lock { - Lock { - is_locked: AtomicBool::new(false), - value: UnsafeCell::new(val), - } - } - - pub fn try_lock(&self) -> Option> { - if !self.is_locked.swap(true, Ordering::SeqCst) { - Some(Locked { lock: self }) - } else { - None - } - } - } - - unsafe impl Send for Lock {} - unsafe impl Sync for Lock {} - - pub struct Locked<'a, T: 'a> { - lock: &'a Lock, - } - - impl<'a, T> Deref for Locked<'a, T> { - type Target = T; - fn deref(&self) -> &T { - unsafe { &*self.lock.value.get() } - } - } - - impl<'a, T> DerefMut for Locked<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.lock.value.get() } - } - } - - impl<'a, T> Drop for Locked<'a, T> { - fn drop(&mut self) { - self.lock.is_locked.store(false, Ordering::SeqCst); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index cf1702d1..9ef8176e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ extern crate tokio; extern crate tokio_executor; #[macro_use] extern crate tokio_io; extern crate tokio_service; +extern crate want; #[cfg(all(test, feature = "nightly"))] extern crate test;