236 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| // FIXME: re-implement tests with `async/await`
 | |
| /*
 | |
| #[cfg(feature = "runtime")]
 | |
| use std::collections::HashMap;
 | |
| use std::cmp;
 | |
| use std::io::{self, Read, Write};
 | |
| #[cfg(feature = "runtime")]
 | |
| use std::sync::{Arc, Mutex};
 | |
| 
 | |
| use bytes::Buf;
 | |
| use futures::{Async, Poll};
 | |
| #[cfg(feature = "runtime")]
 | |
| use futures::Future;
 | |
| use futures::task::{self, Task};
 | |
| use tokio_io::{AsyncRead, AsyncWrite};
 | |
| 
 | |
| #[cfg(feature = "runtime")]
 | |
| use crate::client::connect::{Connect, Connected, Destination};
 | |
| 
 | |
| 
 | |
| 
 | |
/// Mock transport: one end of an in-memory, mutex-guarded duplex channel.
///
/// The other end is a `DuplexHandle` obtained from `Duplex::channel`; both
/// ends point at the same `DuplexInner`.
#[cfg(feature = "runtime")]
pub struct Duplex {
    inner: Arc<Mutex<DuplexInner>>,
}
 | |
| 
 | |
/// State shared by a `Duplex` and its `DuplexHandle`.
#[cfg(feature = "runtime")]
struct DuplexInner {
    /// Task stored by `DuplexHandle::read` when it found nothing to read;
    /// taken and notified when the `Duplex` side writes.
    handle_read_task: Option<Task>,
    /// Buffer the `Duplex` reads from; filled by `DuplexHandle::write`.
    read: AsyncIo<MockCursor>,
    /// Buffer the `Duplex` writes into; drained by `DuplexHandle::read`.
    write: AsyncIo<MockCursor>,
}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl Duplex {
    /// Creates a `Duplex` together with the `DuplexHandle` that drives it.
    ///
    /// Both values share one `DuplexInner`. The read buffer is created with
    /// a second argument of `0` and the write buffer with `usize::MAX`
    /// (NOTE(review): presumably the number of bytes allowed before the mock
    /// blocks; confirm against `AsyncIo::new_buf`). Task parking is enabled
    /// on both buffers.
    pub(crate) fn channel() -> (Duplex, DuplexHandle) {
        let mut read = AsyncIo::new_buf(Vec::new(), 0);
        let mut write = AsyncIo::new_buf(Vec::new(), std::usize::MAX);
        read.park_tasks(true);
        write.park_tasks(true);

        let shared = Arc::new(Mutex::new(DuplexInner {
            handle_read_task: None,
            read,
            write,
        }));

        let handle = DuplexHandle {
            inner: Arc::clone(&shared),
        };
        (Duplex { inner: shared }, handle)
    }
}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl Read for Duplex {
    /// Reads from the shared `read` buffer while holding the channel lock.
    ///
    /// Panics if the mutex is poisoned.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut shared = self.inner.lock().unwrap();
        shared.read.read(buf)
    }
}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl Write for Duplex {
    /// Writes into the shared `write` buffer, then wakes the handle's parked
    /// reader (if any). The wake-up happens regardless of the write result.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut shared = self.inner.lock().unwrap();
        let outcome = shared.write.write(buf);

        // A `DuplexHandle::read` that found nothing left its task here.
        let parked = shared.handle_read_task.take();
        if let Some(reader) = parked {
            trace!("waking DuplexHandle read");
            reader.notify();
        }

        outcome
    }

    /// Flushes the shared `write` buffer.
    fn flush(&mut self) -> io::Result<()> {
        let mut shared = self.inner.lock().unwrap();
        shared.write.flush()
    }
}
 | |
| 
 | |
// No overrides: every `AsyncRead` method uses the trait's provided default.
#[cfg(feature = "runtime")]
impl AsyncRead for Duplex {}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl AsyncWrite for Duplex {
    /// Shutdown is a no-op for the mock; it is immediately ready.
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        Ok(Async::Ready(()))
    }

    /// Wakes the handle's parked reader (if any), then forwards the buffer
    /// to the shared `write` side. Both steps run under the channel lock.
    fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
        let mut shared = self.inner.lock().unwrap();

        if let Some(reader) = shared.handle_read_task.take() {
            reader.notify();
        }

        shared.write.write_buf(buf)
    }
}
 | |
| 
 | |
/// Test-side handle to a `Duplex`.
///
/// `read` returns what the `Duplex` has written and `write` supplies the
/// bytes the `Duplex` will read. Dropping the handle closes both buffers
/// (unless the thread is panicking).
#[cfg(feature = "runtime")]
pub struct DuplexHandle {
    inner: Arc<Mutex<DuplexInner>>,
}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl DuplexHandle {
    /// Reads the data the `Duplex` has written so far.
    ///
    /// If nothing has been written, stores the current task so that the next
    /// `Duplex` write can wake it, and returns `NotReady`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than the pending data (as reported by
    /// `write.inner.len()`), or if the mutex is poisoned.
    pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
        let mut shared = self.inner.lock().unwrap();
        assert!(buf.len() >= shared.write.inner.len());

        if !shared.write.inner.is_empty() {
            return shared.write.read(buf).map(Async::Ready);
        }

        trace!("DuplexHandle read parking");
        shared.handle_read_task = Some(task::current());
        Ok(Async::NotReady)
    }

    /// Supplies `bytes` for the `Duplex` to read and reports them all as
    /// written.
    ///
    /// # Panics
    ///
    /// Panics if the read buffer's position is not `0` or its backing vector
    /// is not empty, or if the mutex is poisoned.
    pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
        let count = bytes.len();
        let mut shared = self.inner.lock().unwrap();

        assert_eq!(shared.read.inner.pos, 0);
        assert_eq!(shared.read.inner.vec.len(), 0, "write but read isn't empty");

        shared.read.inner.vec.extend(bytes);
        shared.read.block_in(count);

        Ok(Async::Ready(count))
    }
}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl Drop for DuplexHandle {
    /// Closes both buffers of the channel when the handle goes away.
    fn drop(&mut self) {
        trace!("mock duplex handle drop");

        // NOTE(review): skipped while unwinding, presumably so a poisoned
        // lock cannot trigger a second panic; confirm the intent.
        if ::std::thread::panicking() {
            return;
        }

        let mut shared = self.inner.lock().unwrap();
        shared.read.close();
        shared.write.close();
    }
}
 | |
| 
 | |
/// Boxed, sendable future resolving to a mock transport plus its `Connected`
/// metadata, or failing with an `io::Error`.
#[cfg(feature = "runtime")]
type BoxedConnectFut = Box<dyn Future<Item = (Duplex, Connected), Error = io::Error> + Send>;
 | |
| 
 | |
/// Connector that hands out pre-registered mock connections.
///
/// Connections are registered per key with `mock`, `mock_fut` or `mock_opts`
/// and consumed by `connect`. Clones share the same registry because it lives
/// behind an `Arc`.
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub struct MockConnector {
    mocks: Arc<Mutex<MockedConnections>>,
}
 | |
| 
 | |
/// Registry of pending mock connections: key -> connect futures not yet
/// handed out. Its `Drop` impl asserts that every list has been emptied.
#[cfg(feature = "runtime")]
struct MockedConnections(HashMap<String, Vec<BoxedConnectFut>>);
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl MockConnector {
    /// Creates a connector with an empty registry.
    pub fn new() -> MockConnector {
        let registry = MockedConnections(HashMap::new());
        MockConnector {
            mocks: Arc::new(Mutex::new(registry)),
        }
    }

    /// Registers a mock connection for `key` that is ready immediately.
    ///
    /// Returns the handle used to drive the connection from the test.
    pub fn mock(&mut self, key: &str) -> DuplexHandle {
        use futures::future;

        let ready = future::ok::<_, ()>(());
        self.mock_fut(key, ready)
    }

    /// Registers a mock connection for `key` that becomes available once
    /// `fut` completes, using default `Connected` metadata.
    pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle
    where
        F: Future + Send + 'static,
    {
        let connected = Connected::new();
        self.mock_opts(key, connected, fut)
    }

    /// Registers a mock connection for `key` with explicit `connected`
    /// metadata.
    ///
    /// The connection resolves when `fut` completes; the outcome of `fut`
    /// (success or error) is ignored. Connections registered under the same
    /// key are appended to that key's list in registration order.
    pub fn mock_opts<F>(&mut self, key: &str, connected: Connected, fut: F) -> DuplexHandle
    where
        F: Future + Send + 'static,
    {
        let (duplex, handle) = Duplex::channel();

        let pending = Box::new(fut.then(move |_| {
            trace!("MockConnector mocked fut ready");
            Ok((duplex, connected))
        }));

        let mut registry = self.mocks.lock().unwrap();
        registry
            .0
            .entry(key.to_owned())
            .or_insert_with(Vec::new)
            .push(pending);

        handle
    }
}
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl Connect for MockConnector {
    type Transport = Duplex;
    type Error = io::Error;
    type Future = BoxedConnectFut;

    /// Hands out the oldest unused mock registered for `dst`.
    ///
    /// The lookup key is `scheme://host`, with `:port` appended when the
    /// destination carries a port.
    ///
    /// # Panics
    ///
    /// Panics if no mock was ever registered under the key, if all mocks for
    /// the key have already been consumed, or if the registry mutex is
    /// poisoned.
    fn connect(&self, dst: Destination) -> Self::Future {
        trace!("mock connect: {:?}", dst);

        // One `format!` per arm avoids allocating a throwaway port string.
        let key = match dst.port() {
            Some(port) => format!("{}://{}:{}", dst.scheme(), dst.host(), port),
            None => format!("{}://{}", dst.scheme(), dst.host()),
        };

        let mut registry = self.mocks.lock().unwrap();
        // The panic message is built lazily, only when the lookup fails.
        let pending = registry
            .0
            .get_mut(&key)
            .unwrap_or_else(|| panic!("unknown mocks uri: {}", key));
        assert!(!pending.is_empty(), "no additional mocks for {}", key);

        // Front removal keeps mocks in registration (FIFO) order.
        pending.remove(0)
    }
}
 | |
| 
 | |
| 
 | |
#[cfg(feature = "runtime")]
impl Drop for MockedConnections {
    /// Asserts that every registered mock connection was consumed.
    ///
    /// The check is skipped while the thread is already panicking.
    fn drop(&mut self) {
        if ::std::thread::panicking() {
            return;
        }

        for (key, mocks) in &self.0 {
            assert_eq!(
                mocks.len(),
                0,
                "not all mocked connects for {:?} were used",
                key,
            );
        }
    }
}
 | |
| */
 |