refactor(client): replace signal mod with want crate
This commit is contained in:
		| @@ -35,6 +35,7 @@ tokio = "0.1.3" | |||||||
| tokio-executor = "0.1.0" | tokio-executor = "0.1.0" | ||||||
| tokio-service = "0.1" | tokio-service = "0.1" | ||||||
| tokio-io = "0.1" | tokio-io = "0.1" | ||||||
|  | want = "0.0.2" | ||||||
|  |  | ||||||
| [dev-dependencies] | [dev-dependencies] | ||||||
| num_cpus = "1.0" | num_cpus = "1.0" | ||||||
|   | |||||||
| @@ -1,8 +1,8 @@ | |||||||
| use futures::{Async, Poll, Stream}; | use futures::{Async, Poll, Stream}; | ||||||
| use futures::sync::{mpsc, oneshot}; | use futures::sync::{mpsc, oneshot}; | ||||||
|  | use want; | ||||||
|  |  | ||||||
| use common::Never; | use common::Never; | ||||||
| use super::signal; |  | ||||||
|  |  | ||||||
| //pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>; | //pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>; | ||||||
| pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>; | pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>; | ||||||
| @@ -10,7 +10,7 @@ pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>; | |||||||
|  |  | ||||||
| pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { | pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) { | ||||||
|     let (tx, rx) = mpsc::channel(0); |     let (tx, rx) = mpsc::channel(0); | ||||||
|     let (giver, taker) = signal::new(); |     let (giver, taker) = want::new(); | ||||||
|     let tx = Sender { |     let tx = Sender { | ||||||
|         giver: giver, |         giver: giver, | ||||||
|         inner: tx, |         inner: tx, | ||||||
| @@ -27,7 +27,7 @@ pub struct Sender<T, U> { | |||||||
|     // when the queue is empty. This helps us know when a request and |     // when the queue is empty. This helps us know when a request and | ||||||
|     // response have been fully processed, and a connection is ready |     // response have been fully processed, and a connection is ready | ||||||
|     // for more. |     // for more. | ||||||
|     giver: signal::Giver, |     giver: want::Giver, | ||||||
|     //inner: mpsc::Sender<(T, Callback<T, U>)>, |     //inner: mpsc::Sender<(T, Callback<T, U>)>, | ||||||
|     inner: mpsc::Sender<Envelope<T, U>>, |     inner: mpsc::Sender<Envelope<T, U>>, | ||||||
| } | } | ||||||
| @@ -68,7 +68,7 @@ impl<T, U> Sender<T, U> { | |||||||
| pub struct Receiver<T, U> { | pub struct Receiver<T, U> { | ||||||
|     //inner: mpsc::Receiver<(T, Callback<T, U>)>, |     //inner: mpsc::Receiver<(T, Callback<T, U>)>, | ||||||
|     inner: mpsc::Receiver<Envelope<T, U>>, |     inner: mpsc::Receiver<Envelope<T, U>>, | ||||||
|     taker: signal::Taker, |     taker: want::Taker, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T, U> Stream for Receiver<T, U> { | impl<T, U> Stream for Receiver<T, U> { | ||||||
| @@ -229,7 +229,7 @@ mod tests { | |||||||
|     #[cfg(feature = "nightly")] |     #[cfg(feature = "nightly")] | ||||||
|     #[bench] |     #[bench] | ||||||
|     fn giver_queue_cancel(b: &mut test::Bencher) { |     fn giver_queue_cancel(b: &mut test::Bencher) { | ||||||
|         let (_tx, rx) = super::channel::<i32, ()>(); |         let (_tx, mut rx) = super::channel::<i32, ()>(); | ||||||
|  |  | ||||||
|         b.iter(move || { |         b.iter(move || { | ||||||
|             rx.taker.cancel(); |             rx.taker.cancel(); | ||||||
|   | |||||||
| @@ -26,11 +26,9 @@ use self::connect::Destination; | |||||||
|  |  | ||||||
| pub mod conn; | pub mod conn; | ||||||
| pub mod connect; | pub mod connect; | ||||||
| //TODO(easy): move cancel and dispatch into common instead |  | ||||||
| pub(crate) mod dispatch; | pub(crate) mod dispatch; | ||||||
| mod dns; | mod dns; | ||||||
| mod pool; | mod pool; | ||||||
| mod signal; |  | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests; | mod tests; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,188 +0,0 @@ | |||||||
| use std::sync::Arc; |  | ||||||
| use std::sync::atomic::{AtomicUsize, Ordering}; |  | ||||||
|  |  | ||||||
| use futures::{Async, Poll}; |  | ||||||
| use futures::task::{self, Task}; |  | ||||||
|  |  | ||||||
| use self::lock::Lock; |  | ||||||
|  |  | ||||||
| pub fn new() -> (Giver, Taker) { |  | ||||||
|     let inner = Arc::new(Inner { |  | ||||||
|         state: AtomicUsize::new(STATE_IDLE), |  | ||||||
|         task: Lock::new(None), |  | ||||||
|     }); |  | ||||||
|     let inner2 = inner.clone(); |  | ||||||
|     ( |  | ||||||
|         Giver { |  | ||||||
|             inner: inner, |  | ||||||
|         }, |  | ||||||
|         Taker { |  | ||||||
|             inner: inner2, |  | ||||||
|         }, |  | ||||||
|     ) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| #[derive(Clone)] |  | ||||||
| pub struct Giver { |  | ||||||
|     inner: Arc<Inner>, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| pub struct Taker { |  | ||||||
|     inner: Arc<Inner>, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| const STATE_IDLE: usize = 0; |  | ||||||
| const STATE_WANT: usize = 1; |  | ||||||
| const STATE_GIVE: usize = 2; |  | ||||||
| const STATE_CLOSED: usize = 3; |  | ||||||
|  |  | ||||||
| struct Inner { |  | ||||||
|     state: AtomicUsize, |  | ||||||
|     task: Lock<Option<Task>>, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Giver { |  | ||||||
|     pub fn poll_want(&mut self) -> Poll<(), ()> { |  | ||||||
|         loop { |  | ||||||
|             let state = self.inner.state.load(Ordering::SeqCst); |  | ||||||
|             match state { |  | ||||||
|                 STATE_WANT => { |  | ||||||
|                     // only set to IDLE if it is still Want |  | ||||||
|                     self.inner.state.compare_and_swap( |  | ||||||
|                         STATE_WANT, |  | ||||||
|                         STATE_IDLE, |  | ||||||
|                         Ordering::SeqCst, |  | ||||||
|                     ); |  | ||||||
|                     return Ok(Async::Ready(())) |  | ||||||
|                 }, |  | ||||||
|                 STATE_GIVE => { |  | ||||||
|                     // we're already waiting, return |  | ||||||
|                     return Ok(Async::NotReady) |  | ||||||
|                 } |  | ||||||
|                 STATE_CLOSED => return Err(()), |  | ||||||
|                 // Taker doesn't want anything yet, so park. |  | ||||||
|                 _ => { |  | ||||||
|                     if let Some(mut locked) = self.inner.task.try_lock() { |  | ||||||
|  |  | ||||||
|                         // While we have the lock, try to set to GIVE. |  | ||||||
|                         let old = self.inner.state.compare_and_swap( |  | ||||||
|                             STATE_IDLE, |  | ||||||
|                             STATE_GIVE, |  | ||||||
|                             Ordering::SeqCst, |  | ||||||
|                         ); |  | ||||||
|                         // If it's not still IDLE, something happened! |  | ||||||
|                         // Go around the loop again. |  | ||||||
|                         if old == STATE_IDLE { |  | ||||||
|                             *locked = Some(task::current()); |  | ||||||
|                             return Ok(Async::NotReady) |  | ||||||
|                         } |  | ||||||
|                     } else { |  | ||||||
|                         // if we couldn't take the lock, then a Taker has it. |  | ||||||
|                         // The *ONLY* reason is because it is in the process of notifying us |  | ||||||
|                         // of its want. |  | ||||||
|                         // |  | ||||||
|                         // We need to loop again to see what state it was changed to. |  | ||||||
|                     } |  | ||||||
|                 }, |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn is_canceled(&self) -> bool { |  | ||||||
|         self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Taker { |  | ||||||
|     pub fn cancel(&self) { |  | ||||||
|         self.signal(STATE_CLOSED) |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn want(&self) { |  | ||||||
|         self.signal(STATE_WANT) |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     fn signal(&self, state: usize) { |  | ||||||
|         let old_state = self.inner.state.swap(state, Ordering::SeqCst); |  | ||||||
|         match old_state { |  | ||||||
|             STATE_WANT | STATE_CLOSED | STATE_IDLE => (), |  | ||||||
|             _ => { |  | ||||||
|                 loop { |  | ||||||
|                     if let Some(mut locked) = self.inner.task.try_lock() { |  | ||||||
|                         if let Some(task) = locked.take() { |  | ||||||
|                             task.notify(); |  | ||||||
|                         } |  | ||||||
|                         return; |  | ||||||
|                     } else { |  | ||||||
|                         // if we couldn't take the lock, then a Giver has it. |  | ||||||
|                         // The *ONLY* reason is because it is in the process of parking. |  | ||||||
|                         // |  | ||||||
|                         // We need to loop and take the lock so we can notify this task. |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             }, |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl Drop for Taker { |  | ||||||
|     fn drop(&mut self) { |  | ||||||
|         self.cancel(); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| // a sub module just to protect unsafety |  | ||||||
| mod lock { |  | ||||||
|     use std::cell::UnsafeCell; |  | ||||||
|     use std::ops::{Deref, DerefMut}; |  | ||||||
|     use std::sync::atomic::{AtomicBool, Ordering}; |  | ||||||
|  |  | ||||||
|     pub struct Lock<T> { |  | ||||||
|         is_locked: AtomicBool, |  | ||||||
|         value: UnsafeCell<T>, |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl<T> Lock<T> { |  | ||||||
|         pub fn new(val: T) -> Lock<T> { |  | ||||||
|             Lock { |  | ||||||
|                 is_locked: AtomicBool::new(false), |  | ||||||
|                 value: UnsafeCell::new(val), |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         pub fn try_lock(&self) -> Option<Locked<T>> { |  | ||||||
|             if !self.is_locked.swap(true, Ordering::SeqCst) { |  | ||||||
|                 Some(Locked { lock: self }) |  | ||||||
|             } else { |  | ||||||
|                 None |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     unsafe impl<T: Send> Send for Lock<T> {} |  | ||||||
|     unsafe impl<T: Send> Sync for Lock<T> {} |  | ||||||
|  |  | ||||||
|     pub struct Locked<'a, T: 'a> { |  | ||||||
|         lock: &'a Lock<T>, |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl<'a, T> Deref for Locked<'a, T> { |  | ||||||
|         type Target = T; |  | ||||||
|         fn deref(&self) -> &T { |  | ||||||
|             unsafe { &*self.lock.value.get() } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl<'a, T> DerefMut for Locked<'a, T> { |  | ||||||
|         fn deref_mut(&mut self) -> &mut T { |  | ||||||
|             unsafe { &mut *self.lock.value.get() } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     impl<'a, T> Drop for Locked<'a, T> { |  | ||||||
|         fn drop(&mut self) { |  | ||||||
|             self.lock.is_locked.store(false, Ordering::SeqCst); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| @@ -30,6 +30,7 @@ extern crate tokio; | |||||||
| extern crate tokio_executor; | extern crate tokio_executor; | ||||||
| #[macro_use] extern crate tokio_io; | #[macro_use] extern crate tokio_io; | ||||||
| extern crate tokio_service; | extern crate tokio_service; | ||||||
|  | extern crate want; | ||||||
|  |  | ||||||
| #[cfg(all(test, feature = "nightly"))] | #[cfg(all(test, feature = "nightly"))] | ||||||
| extern crate test; | extern crate test; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user