diff --git a/Cargo.toml b/Cargo.toml index 5b5a21c3..f6c4a945 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,16 +48,16 @@ tokio-timer = { git = "https://github.com/tokio-rs/tokio", optional = true } want = { git = "https://github.com/seanmonstar/want", branch = "std-future" } [dev-dependencies] -#futures-timer = "0.1" num_cpus = "1.0" pretty_env_logger = "0.3" spmc = "0.2" -url = "1.0" -tokio-fs = { git = "https://github.com/tokio-rs/tokio" } -#tokio-mockstream = "1.1.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +tokio-fs = { git = "https://github.com/tokio-rs/tokio" } +tokio-test = { git = "https://github.com/tokio-rs/tokio" } +url = "1.0" + [features] default = [ diff --git a/src/mock.rs b/src/mock.rs index 20686bab..b785cdb3 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -19,300 +19,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "runtime")] use crate::client::connect::{Connect, Connected, Destination}; -#[derive(Debug)] -pub struct MockCursor { - vec: Vec, - pos: usize, -} -impl MockCursor { - pub fn wrap(vec: Vec) -> MockCursor { - MockCursor { - vec: vec, - pos: 0, - } - } -} - -impl ::std::ops::Deref for MockCursor { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - &self.vec - } -} - -impl AsRef<[u8]> for MockCursor { - fn as_ref(&self) -> &[u8] { - &self.vec - } -} - -impl> PartialEq for MockCursor { - fn eq(&self, other: &S) -> bool { - self.vec == other.as_ref() - } -} - -impl Write for MockCursor { - fn write(&mut self, data: &[u8]) -> io::Result { - trace!("MockCursor::write; len={}", data.len()); - self.vec.extend(data); - Ok(data.len()) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Read for MockCursor { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - (&self.vec[self.pos..]).read(buf).map(|n| { - trace!("MockCursor::read; len={}", n); - self.pos += n; - if self.pos == self.vec.len() { - trace!("MockCursor::read to end, clearing"); - self.pos = 0; - self.vec.clear(); - } - n - }) - } -} - -const READ_VECS_CNT: usize = 64; - -#[derive(Debug)] -pub struct AsyncIo { - blocked: bool, - bytes_until_block: usize, - error: Option, - flushed: bool, - inner: T, - max_read_vecs: usize, - num_writes: usize, - panic: bool, - park_tasks: bool, - task: Option, -} - -impl AsyncIo { - pub fn new(inner: T, bytes: usize) -> AsyncIo { - AsyncIo { - blocked: false, - bytes_until_block: bytes, - error: None, - flushed: false, - inner: inner, - max_read_vecs: READ_VECS_CNT, - num_writes: 0, - panic: false, - park_tasks: false, - task: None, - } - } - - pub fn block_in(&mut self, bytes: usize) { - self.bytes_until_block = bytes; - - if let Some(task) = self.task.take() { - task.notify(); - } - } - - pub fn error(&mut self, err: io::Error) { - self.error = Some(err); - } - - #[cfg(feature = "nightly")] - pub fn panic(&mut self) { - self.panic = true; - } - - pub fn max_read_vecs(&mut self, cnt: usize) { - assert!(cnt <= READ_VECS_CNT); - self.max_read_vecs = cnt; - } - - #[cfg(feature = "runtime")] - pub fn park_tasks(&mut self, enabled: bool) { - self.park_tasks = enabled; - } - - /* - pub fn flushed(&self) -> bool { - self.flushed - } - */ - - pub fn blocked(&self) -> bool { - self.blocked - } - - pub fn num_writes(&self) -> usize { - self.num_writes - } - - fn would_block(&mut self) -> io::Error { - self.blocked = true; - if self.park_tasks { - self.task = Some(task::current()); - } - io::ErrorKind::WouldBlock.into() - } - -} - -impl AsyncIo { - pub fn new_buf>>(buf: T, bytes: usize) -> AsyncIo { - AsyncIo::new(MockCursor::wrap(buf.into()), bytes) - } - - /* - pub fn new_eof() -> AsyncIo { - AsyncIo::new(Buf::wrap(Vec::new().into()), 1) - } - */ - - #[cfg(feature = "runtime")] - fn close(&mut self) { - self.block_in(1); - assert_eq!( - self.inner.vec.len(), - self.inner.pos, - "AsyncIo::close(), but cursor not consumed", - ); - self.inner.vec.truncate(0); - self.inner.pos = 0; - } -} - -impl AsyncIo { - fn write_no_vecs(&mut self, buf: &mut B) -> Poll { - if !buf.has_remaining() { - return Ok(Async::Ready(0)); - } - - let n = try_nb!(self.write(buf.bytes())); - buf.advance(n); - Ok(Async::Ready(n)) - } -} - -impl, T: AsRef<[u8]>> PartialEq for AsyncIo { - fn eq(&self, other: &S) -> bool { - self.inner.as_ref() == other.as_ref() - } -} - - -impl Read for AsyncIo { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - assert!(!self.panic, "AsyncIo::read panic"); - self.blocked = false; - if let Some(err) = self.error.take() { - Err(err) - } else if self.bytes_until_block == 0 { - Err(self.would_block()) - } else { - let n = cmp::min(self.bytes_until_block, buf.len()); - let n = self.inner.read(&mut buf[..n])?; - self.bytes_until_block -= n; - Ok(n) - } - } -} - -impl Write for AsyncIo { - fn write(&mut self, data: &[u8]) -> io::Result { - assert!(!self.panic, "AsyncIo::write panic"); - self.num_writes += 1; - if let Some(err) = self.error.take() { - trace!("AsyncIo::write error"); - Err(err) - } else if self.bytes_until_block == 0 { - trace!("AsyncIo::write would block"); - Err(self.would_block()) - } else { - trace!("AsyncIo::write; {} bytes", data.len()); - self.flushed = false; - let n = cmp::min(self.bytes_until_block, data.len()); - let n = self.inner.write(&data[..n])?; - self.bytes_until_block -= n; - Ok(n) - } - } - - fn flush(&mut self) -> io::Result<()> { - self.flushed = true; - self.inner.flush() - } -} - -impl AsyncRead for AsyncIo { -} - -impl AsyncWrite for AsyncIo { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) - } - - fn write_buf(&mut self, buf: &mut B) -> Poll { - assert!(!self.panic, "AsyncIo::write_buf panic"); - if self.max_read_vecs == 0 { - return self.write_no_vecs(buf); - } - let r = { - static DUMMY: &[u8] = &[0]; - let mut bufs = [From::from(DUMMY); READ_VECS_CNT]; - let i = Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]); - let mut n = 0; - let mut ret = Ok(0); - // each call to write() will increase our count, but we assume - // that if iovecs are used, its really only 1 write call. - let num_writes = self.num_writes; - for iovec in &bufs[..i] { - match self.write(iovec) { - Ok(num) => { - n += num; - ret = Ok(n); - }, - Err(e) => { - if e.kind() == io::ErrorKind::WouldBlock { - if let Ok(0) = ret { - ret = Err(e); - } - } else { - ret = Err(e); - } - break; - } - } - } - self.num_writes = num_writes + 1; - ret - }; - match r { - Ok(n) => { - Buf::advance(buf, n); - Ok(Async::Ready(n)) - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - Ok(Async::NotReady) - } - Err(e) => Err(e), - } - } -} - -impl ::std::ops::Deref for AsyncIo { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - &self.inner - } -} #[cfg(feature = "runtime")] pub struct Duplex { diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 18d3efb5..79e1b0ee 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -242,6 +242,11 @@ where } Pin::new(&mut self.io).poll_flush(cx) } + + #[cfg(test)] + fn flush<'a>(&'a mut self) -> impl std::future::Future> + 'a { + futures_util::future::poll_fn(move |cx| self.poll_flush(cx)) + } } // The `B` is a `Buf`, we never project a pin to it @@ -252,7 +257,7 @@ pub trait MemRead { fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll>; } -impl MemRead for Buffered +impl MemRead for Buffered where T: AsyncRead + AsyncWrite + Unpin, B: Buf, @@ -650,17 +655,15 @@ impl Buf for BufDeque { #[cfg(test)] mod tests { - // FIXME: re-implement tests with `async/await`, this import should - // trigger a warning to remind us - use crate::Error; - /* use super::*; - use std::io::Read; - use crate::mock::AsyncIo; + use std::time::Duration; + + use tokio_test::io::Builder as Mock; #[cfg(feature = "nightly")] use test::Bencher; + /* impl MemRead for AsyncIo { fn read_mem(&mut self, len: usize) -> Poll { let mut v = vec![0; len]; @@ -668,33 +671,51 @@ mod tests { Ok(Async::Ready(BytesMut::from(&v[..n]).freeze())) } } + */ - #[test] - fn iobuf_write_empty_slice() { - let mut mock = AsyncIo::new_buf(vec![], 256); - mock.error(io::Error::new(io::ErrorKind::Other, "logic error")); - - let mut io_buf = Buffered::<_, Cursor>>::new(mock); + #[tokio::test] + async fn iobuf_write_empty_slice() { + // First, let's just check that the Mock would normally return an + // error on an unexpected write, even if the buffer is empty... + let mut mock = Mock::new().build(); + futures_util::future::poll_fn(|cx| Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))) + .await + .expect_err("should be a broken pipe"); // underlying io will return the logic error upon write, // so we are testing that the io_buf does not trigger a write // when there is nothing to flush - io_buf.flush().expect("should short-circuit flush"); + let mock = Mock::new().build(); + let mut io_buf = Buffered::<_, Cursor>>::new(mock); + io_buf.flush().await.expect("should short-circuit flush"); } - #[test] - fn parse_reads_until_blocked() { - // missing last line ending - let raw = "HTTP/1.1 200 OK\r\n"; + #[tokio::test] + async fn parse_reads_until_blocked() { + use crate::proto::h1::ClientTransaction; + + let mock = Mock::new() + // Split over multiple reads will read all of it + .read(b"HTTP/1.1 200 OK\r\n") + .read(b"Server: hyper\r\n") + // missing last line ending + .wait(Duration::from_secs(1)) + .build(); - let mock = AsyncIo::new_buf(raw, raw.len()); let mut buffered = Buffered::<_, Cursor>>::new(mock); - let ctx = ParseContext { - cached_headers: &mut None, - req_method: &mut None, - }; - assert!(buffered.parse::(ctx).unwrap().is_not_ready()); - assert!(buffered.io.blocked()); + + // We expect a `parse` to be not ready, and so can't await it directly. + // Rather, this `poll_fn` will wrap the `Poll` result. + futures_util::future::poll_fn(|cx| { + let parse_ctx = ParseContext { + cached_headers: &mut None, + req_method: &mut None, + }; + assert!(buffered.parse::(cx, parse_ctx).is_pending()); + Poll::Ready(()) + }).await; + + assert_eq!(buffered.read_buf, b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]); } #[test] @@ -791,12 +812,14 @@ mod tests { #[test] #[should_panic] fn write_buf_requires_non_empty_bufs() { - let mock = AsyncIo::new_buf(vec![], 1024); + let mock = Mock::new().build(); let mut buffered = Buffered::<_, Cursor>>::new(mock); buffered.buffer(Cursor::new(Vec::new())); } + /* + TODO: needs tokio_test::io to allow configure write_buf calls #[test] fn write_buf_queue() { extern crate pretty_env_logger; @@ -817,13 +840,18 @@ mod tests { assert_eq!(buffered.io.num_writes(), 1); assert_eq!(buffered.write_buf.queue.bufs.len(), 0); } + */ - #[test] - fn write_buf_flatten() { + #[tokio::test] + async fn write_buf_flatten() { extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); - let mock = AsyncIo::new_buf(vec![], 1024); + let mock = Mock::new() + // Just a single write + .write(b"hello world, it's hyper!") + .build(); + let mut buffered = Buffered::<_, Cursor>>::new(mock); buffered.write_buf.set_strategy(WriteStrategy::Flatten); @@ -833,19 +861,21 @@ mod tests { buffered.buffer(Cursor::new(b"hyper!".to_vec())); assert_eq!(buffered.write_buf.queue.bufs.len(), 0); - buffered.flush().unwrap(); - - assert_eq!(buffered.io, b"hello world, it's hyper!"); - assert_eq!(buffered.io.num_writes(), 1); + buffered.flush().await.expect("flush"); } - #[test] - fn write_buf_auto_flatten() { + #[tokio::test] + async fn write_buf_auto_flatten() { extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); - let mut mock = AsyncIo::new_buf(vec![], 1024); - mock.max_read_vecs(0); // disable vectored IO + let mock = Mock::new() + // Expects write_buf to only consume first buffer + .write(b"hello ") + // And then the Auto strategy will have flattened + .write(b"world, it's hyper!") + .build(); + let mut buffered = Buffered::<_, Cursor>>::new(mock); // we have 4 buffers, but hope to detect that vectored IO isn't @@ -856,20 +886,24 @@ mod tests { buffered.buffer(Cursor::new(b"it's ".to_vec())); buffered.buffer(Cursor::new(b"hyper!".to_vec())); assert_eq!(buffered.write_buf.queue.bufs.len(), 3); - buffered.flush().unwrap(); - assert_eq!(buffered.io, b"hello world, it's hyper!"); - assert_eq!(buffered.io.num_writes(), 2); + buffered.flush().await.expect("flush"); + assert_eq!(buffered.write_buf.queue.bufs.len(), 0); } - #[test] - fn write_buf_queue_disable_auto() { + #[tokio::test] + async fn write_buf_queue_disable_auto() { extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); - let mut mock = AsyncIo::new_buf(vec![], 1024); - mock.max_read_vecs(0); // disable vectored IO + let mock = Mock::new() + .write(b"hello ") + .write(b"world, ") + .write(b"it's ") + .write(b"hyper!") + .build(); + let mut buffered = Buffered::<_, Cursor>>::new(mock); buffered.write_buf.set_strategy(WriteStrategy::Queue); @@ -881,10 +915,9 @@ mod tests { buffered.buffer(Cursor::new(b"it's ".to_vec())); buffered.buffer(Cursor::new(b"hyper!".to_vec())); assert_eq!(buffered.write_buf.queue.bufs.len(), 3); - buffered.flush().unwrap(); - assert_eq!(buffered.io, b"hello world, it's hyper!"); - assert_eq!(buffered.io.num_writes(), 4); + buffered.flush().await.expect("flush"); + assert_eq!(buffered.write_buf.queue.bufs.len(), 0); } @@ -903,5 +936,4 @@ mod tests { write_buf.headers.bytes.clear(); }) } - */ }