use std::cmp; use std::io::{self, Read, Write}; use futures::Async; use tokio::io::Io; #[derive(Debug)] pub struct Buf { vec: Vec, pos: usize, } impl Buf { pub fn new() -> Buf { Buf::wrap(vec![]) } pub fn wrap(vec: Vec) -> Buf { Buf { vec: vec, pos: 0, } } } impl ::std::ops::Deref for Buf { type Target = [u8]; fn deref(&self) -> &[u8] { &self.vec } } impl> PartialEq for Buf { fn eq(&self, other: &S) -> bool { self.vec == other.as_ref() } } impl Write for Buf { fn write(&mut self, data: &[u8]) -> io::Result { self.vec.extend(data); Ok(data.len()) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl Read for Buf { fn read(&mut self, buf: &mut [u8]) -> io::Result { (&self.vec[self.pos..]).read(buf).map(|n| { self.pos += n; n }) } } #[derive(Debug)] pub struct AsyncIo { inner: T, bytes_until_block: usize, error: Option, } impl AsyncIo { pub fn new(inner: T, bytes: usize) -> AsyncIo { AsyncIo { inner: inner, bytes_until_block: bytes, error: None, } } pub fn block_in(&mut self, bytes: usize) { self.bytes_until_block = bytes; } pub fn error(&mut self, err: io::Error) { self.error = Some(err); } } impl AsyncIo { pub fn new_buf>>(buf: T, bytes: usize) -> AsyncIo { AsyncIo::new(Buf::wrap(buf.into()), bytes) } } impl Read for AsyncIo { fn read(&mut self, buf: &mut [u8]) -> io::Result { if let Some(err) = self.error.take() { Err(err) } else if self.bytes_until_block == 0 { Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) } else { let n = cmp::min(self.bytes_until_block, buf.len()); let n = try!(self.inner.read(&mut buf[..n])); self.bytes_until_block -= n; Ok(n) } } } impl Write for AsyncIo { fn write(&mut self, data: &[u8]) -> io::Result { if let Some(err) = self.error.take() { Err(err) } else if self.bytes_until_block == 0 { Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) } else { trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len()); let n = cmp::min(self.bytes_until_block, data.len()); let n = try!(self.inner.write(&data[..n])); self.bytes_until_block -= n; Ok(n) } } fn flush(&mut self) -> io::Result<()> { self.inner.flush() } } impl Io for AsyncIo { fn poll_read(&mut self) -> Async<()> { if self.bytes_until_block == 0 { Async::NotReady } else { Async::Ready(()) } } fn poll_write(&mut self) -> Async<()> { if self.bytes_until_block == 0 { Async::NotReady } else { Async::Ready(()) } } } impl ::std::ops::Deref for AsyncIo { type Target = [u8]; fn deref(&self) -> &[u8] { &self.inner } }