use std::cmp; use std::io::{self, Read, Write}; use futures::Poll; use tokio_io::{AsyncRead, AsyncWrite}; #[derive(Debug)] pub struct Buf { vec: Vec, pos: usize, } impl Buf { pub fn new() -> Buf { Buf::wrap(vec![]) } pub fn wrap(vec: Vec) -> Buf { Buf { vec: vec, pos: 0, } } } impl ::std::ops::Deref for Buf { type Target = [u8]; fn deref(&self) -> &[u8] { &self.vec } } impl> PartialEq for Buf { fn eq(&self, other: &S) -> bool { self.vec == other.as_ref() } } impl Write for Buf { fn write(&mut self, data: &[u8]) -> io::Result { self.vec.extend(data); Ok(data.len()) } fn flush(&mut self) -> io::Result<()> { Ok(()) } } impl Read for Buf { fn read(&mut self, buf: &mut [u8]) -> io::Result { (&self.vec[self.pos..]).read(buf).map(|n| { self.pos += n; n }) } } #[derive(Debug)] pub struct AsyncIo { inner: T, bytes_until_block: usize, error: Option, blocked: bool, flushed: bool, } impl AsyncIo { pub fn new(inner: T, bytes: usize) -> AsyncIo { AsyncIo { inner: inner, bytes_until_block: bytes, error: None, flushed: false, blocked: false, } } pub fn block_in(&mut self, bytes: usize) { self.bytes_until_block = bytes; } pub fn error(&mut self, err: io::Error) { self.error = Some(err); } } impl AsyncIo { pub fn new_buf>>(buf: T, bytes: usize) -> AsyncIo { AsyncIo::new(Buf::wrap(buf.into()), bytes) } pub fn flushed(&self) -> bool { self.flushed } pub fn blocked(&self) -> bool { self.blocked } } impl Read for AsyncIo { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.blocked = false; if let Some(err) = self.error.take() { Err(err) } else if self.bytes_until_block == 0 { self.blocked = true; Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) } else { let n = cmp::min(self.bytes_until_block, buf.len()); let n = try!(self.inner.read(&mut buf[..n])); self.bytes_until_block -= n; Ok(n) } } } impl Write for AsyncIo { fn write(&mut self, data: &[u8]) -> io::Result { if let Some(err) = self.error.take() { Err(err) } else if self.bytes_until_block == 0 { Err(io::Error::new(io::ErrorKind::WouldBlock, "mock block")) } else { trace!("AsyncIo::write() block_in = {}, data.len() = {}", self.bytes_until_block, data.len()); self.flushed = false; let n = cmp::min(self.bytes_until_block, data.len()); let n = try!(self.inner.write(&data[..n])); self.bytes_until_block -= n; Ok(n) } } fn flush(&mut self) -> io::Result<()> { self.flushed = true; self.inner.flush() } } impl AsyncRead for AsyncIo { } impl AsyncWrite for AsyncIo { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(().into()) } } impl ::std::ops::Deref for AsyncIo { type Target = [u8]; fn deref(&self) -> &[u8] { &self.inner } }