test(h1): re-enable the proto::h1::io tests
This commit is contained in:
		
							
								
								
									
										293
									
								
								src/mock.rs
									
									
									
									
									
								
							
							
						
						
									
										293
									
								
								src/mock.rs
									
									
									
									
									
								
							| @@ -19,300 +19,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| #[cfg(feature = "runtime")] | ||||
| use crate::client::connect::{Connect, Connected, Destination}; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct MockCursor { | ||||
|     vec: Vec<u8>, | ||||
|     pos: usize, | ||||
| } | ||||
|  | ||||
| impl MockCursor { | ||||
|     pub fn wrap(vec: Vec<u8>) -> MockCursor { | ||||
|         MockCursor { | ||||
|             vec: vec, | ||||
|             pos: 0, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl ::std::ops::Deref for MockCursor { | ||||
|     type Target = [u8]; | ||||
|  | ||||
|     fn deref(&self) -> &[u8] { | ||||
|         &self.vec | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl AsRef<[u8]> for MockCursor { | ||||
|     fn as_ref(&self) -> &[u8] { | ||||
|         &self.vec | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<S: AsRef<[u8]>> PartialEq<S> for MockCursor { | ||||
|     fn eq(&self, other: &S) -> bool { | ||||
|         self.vec == other.as_ref() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Write for MockCursor { | ||||
|     fn write(&mut self, data: &[u8]) -> io::Result<usize> { | ||||
|         trace!("MockCursor::write; len={}", data.len()); | ||||
|         self.vec.extend(data); | ||||
|         Ok(data.len()) | ||||
|     } | ||||
|  | ||||
|     fn flush(&mut self) -> io::Result<()> { | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Read for MockCursor { | ||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||
|         (&self.vec[self.pos..]).read(buf).map(|n| { | ||||
|             trace!("MockCursor::read; len={}", n); | ||||
|             self.pos += n; | ||||
|             if self.pos == self.vec.len() { | ||||
|                 trace!("MockCursor::read to end, clearing"); | ||||
|                 self.pos = 0; | ||||
|                 self.vec.clear(); | ||||
|             } | ||||
|             n | ||||
|         }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| const READ_VECS_CNT: usize = 64; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct AsyncIo<T> { | ||||
|     blocked: bool, | ||||
|     bytes_until_block: usize, | ||||
|     error: Option<io::Error>, | ||||
|     flushed: bool, | ||||
|     inner: T, | ||||
|     max_read_vecs: usize, | ||||
|     num_writes: usize, | ||||
|     panic: bool, | ||||
|     park_tasks: bool, | ||||
|     task: Option<Task>, | ||||
| } | ||||
|  | ||||
| impl<T> AsyncIo<T> { | ||||
|     pub fn new(inner: T, bytes: usize) -> AsyncIo<T> { | ||||
|         AsyncIo { | ||||
|             blocked: false, | ||||
|             bytes_until_block: bytes, | ||||
|             error: None, | ||||
|             flushed: false, | ||||
|             inner: inner, | ||||
|             max_read_vecs: READ_VECS_CNT, | ||||
|             num_writes: 0, | ||||
|             panic: false, | ||||
|             park_tasks: false, | ||||
|             task: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn block_in(&mut self, bytes: usize) { | ||||
|         self.bytes_until_block = bytes; | ||||
|  | ||||
|         if let Some(task) = self.task.take() { | ||||
|             task.notify(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn error(&mut self, err: io::Error) { | ||||
|         self.error = Some(err); | ||||
|     } | ||||
|  | ||||
|     #[cfg(feature = "nightly")] | ||||
|     pub fn panic(&mut self) { | ||||
|         self.panic = true; | ||||
|     } | ||||
|  | ||||
|     pub fn max_read_vecs(&mut self, cnt: usize) { | ||||
|         assert!(cnt <= READ_VECS_CNT); | ||||
|         self.max_read_vecs = cnt; | ||||
|     } | ||||
|  | ||||
|     #[cfg(feature = "runtime")] | ||||
|     pub fn park_tasks(&mut self, enabled: bool) { | ||||
|         self.park_tasks = enabled; | ||||
|     } | ||||
|  | ||||
|     /* | ||||
|     pub fn flushed(&self) -> bool { | ||||
|         self.flushed | ||||
|     } | ||||
|     */ | ||||
|  | ||||
|     pub fn blocked(&self) -> bool { | ||||
|         self.blocked | ||||
|     } | ||||
|  | ||||
|     pub fn num_writes(&self) -> usize { | ||||
|         self.num_writes | ||||
|     } | ||||
|  | ||||
|     fn would_block(&mut self) -> io::Error { | ||||
|         self.blocked = true; | ||||
|         if self.park_tasks { | ||||
|             self.task = Some(task::current()); | ||||
|         } | ||||
|         io::ErrorKind::WouldBlock.into() | ||||
|     } | ||||
|  | ||||
| } | ||||
|  | ||||
| impl AsyncIo<MockCursor> { | ||||
|     pub fn new_buf<T: Into<Vec<u8>>>(buf: T, bytes: usize) -> AsyncIo<MockCursor> { | ||||
|         AsyncIo::new(MockCursor::wrap(buf.into()), bytes) | ||||
|     } | ||||
|  | ||||
|     /* | ||||
|     pub fn new_eof() -> AsyncIo<Buf> { | ||||
|         AsyncIo::new(Buf::wrap(Vec::new().into()), 1) | ||||
|     } | ||||
|     */ | ||||
|  | ||||
|     #[cfg(feature = "runtime")] | ||||
|     fn close(&mut self) { | ||||
|         self.block_in(1); | ||||
|         assert_eq!( | ||||
|             self.inner.vec.len(), | ||||
|             self.inner.pos, | ||||
|             "AsyncIo::close(), but cursor not consumed", | ||||
|         ); | ||||
|         self.inner.vec.truncate(0); | ||||
|         self.inner.pos = 0; | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: Read + Write> AsyncIo<T> { | ||||
|     fn write_no_vecs<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { | ||||
|         if !buf.has_remaining() { | ||||
|             return Ok(Async::Ready(0)); | ||||
|         } | ||||
|  | ||||
|         let n = try_nb!(self.write(buf.bytes())); | ||||
|         buf.advance(n); | ||||
|         Ok(Async::Ready(n)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<S: AsRef<[u8]>, T: AsRef<[u8]>> PartialEq<S> for AsyncIo<T> { | ||||
|     fn eq(&self, other: &S) -> bool { | ||||
|         self.inner.as_ref() == other.as_ref() | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl<T: Read> Read for AsyncIo<T> { | ||||
|     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { | ||||
|         assert!(!self.panic, "AsyncIo::read panic"); | ||||
|         self.blocked = false; | ||||
|         if let Some(err) = self.error.take() { | ||||
|             Err(err) | ||||
|         } else if self.bytes_until_block == 0 { | ||||
|             Err(self.would_block()) | ||||
|         } else { | ||||
|             let n = cmp::min(self.bytes_until_block, buf.len()); | ||||
|             let n = self.inner.read(&mut buf[..n])?; | ||||
|             self.bytes_until_block -= n; | ||||
|             Ok(n) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: Write> Write for AsyncIo<T> { | ||||
|     fn write(&mut self, data: &[u8]) -> io::Result<usize> { | ||||
|         assert!(!self.panic, "AsyncIo::write panic"); | ||||
|         self.num_writes += 1; | ||||
|         if let Some(err) = self.error.take() { | ||||
|             trace!("AsyncIo::write error"); | ||||
|             Err(err) | ||||
|         } else if self.bytes_until_block == 0 { | ||||
|             trace!("AsyncIo::write would block"); | ||||
|             Err(self.would_block()) | ||||
|         } else { | ||||
|             trace!("AsyncIo::write; {} bytes", data.len()); | ||||
|             self.flushed = false; | ||||
|             let n = cmp::min(self.bytes_until_block, data.len()); | ||||
|             let n = self.inner.write(&data[..n])?; | ||||
|             self.bytes_until_block -= n; | ||||
|             Ok(n) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn flush(&mut self) -> io::Result<()> { | ||||
|         self.flushed = true; | ||||
|         self.inner.flush() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: Read + Write> AsyncRead for AsyncIo<T> { | ||||
| } | ||||
|  | ||||
| impl<T: Read + Write> AsyncWrite for AsyncIo<T> { | ||||
|     fn shutdown(&mut self) -> Poll<(), io::Error> { | ||||
|         Ok(().into()) | ||||
|     } | ||||
|  | ||||
|     fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> { | ||||
|         assert!(!self.panic, "AsyncIo::write_buf panic"); | ||||
|         if self.max_read_vecs == 0 { | ||||
|             return self.write_no_vecs(buf); | ||||
|         } | ||||
|         let r = { | ||||
|             static DUMMY: &[u8] = &[0]; | ||||
|             let mut bufs = [From::from(DUMMY); READ_VECS_CNT]; | ||||
|             let i = Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]); | ||||
|             let mut n = 0; | ||||
|             let mut ret = Ok(0); | ||||
|             // each call to write() will increase our count, but we assume | ||||
|             // that if iovecs are used, its really only 1 write call. | ||||
|             let num_writes = self.num_writes; | ||||
|             for iovec in &bufs[..i] { | ||||
|                 match self.write(iovec) { | ||||
|                     Ok(num) => { | ||||
|                         n += num; | ||||
|                         ret = Ok(n); | ||||
|                     }, | ||||
|                     Err(e) => { | ||||
|                         if e.kind() == io::ErrorKind::WouldBlock { | ||||
|                             if let Ok(0) = ret { | ||||
|                                 ret = Err(e); | ||||
|                             } | ||||
|                         } else { | ||||
|                             ret = Err(e); | ||||
|                         } | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|             self.num_writes = num_writes + 1; | ||||
|             ret | ||||
|         }; | ||||
|         match r { | ||||
|             Ok(n) => { | ||||
|                 Buf::advance(buf, n); | ||||
|                 Ok(Async::Ready(n)) | ||||
|             } | ||||
|             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | ||||
|                 Ok(Async::NotReady) | ||||
|             } | ||||
|             Err(e) => Err(e), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl ::std::ops::Deref for AsyncIo<MockCursor> { | ||||
|     type Target = [u8]; | ||||
|  | ||||
|     fn deref(&self) -> &[u8] { | ||||
|         &self.inner | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[cfg(feature = "runtime")] | ||||
| pub struct Duplex { | ||||
|   | ||||
| @@ -242,6 +242,11 @@ where | ||||
|         } | ||||
|         Pin::new(&mut self.io).poll_flush(cx) | ||||
|     } | ||||
|  | ||||
|     #[cfg(test)] | ||||
|     fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a { | ||||
|         futures_util::future::poll_fn(move |cx| self.poll_flush(cx)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| // The `B` is a `Buf`, we never project a pin to it | ||||
| @@ -252,7 +257,7 @@ pub trait MemRead { | ||||
|     fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>; | ||||
| } | ||||
|  | ||||
| impl<T, B> MemRead for Buffered<T, B>  | ||||
| impl<T, B> MemRead for Buffered<T, B> | ||||
| where | ||||
|     T: AsyncRead + AsyncWrite + Unpin, | ||||
|     B: Buf, | ||||
| @@ -650,17 +655,15 @@ impl<T: Buf> Buf for BufDeque<T> { | ||||
|  | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
|     // FIXME: re-implement tests with `async/await`, this import should | ||||
|     // trigger a warning to remind us | ||||
|     use crate::Error; | ||||
|     /* | ||||
|     use super::*; | ||||
|     use std::io::Read; | ||||
|     use crate::mock::AsyncIo; | ||||
|     use std::time::Duration; | ||||
|  | ||||
|     use tokio_test::io::Builder as Mock; | ||||
|  | ||||
|     #[cfg(feature = "nightly")] | ||||
|     use test::Bencher; | ||||
|  | ||||
|     /* | ||||
|     impl<T: Read> MemRead for AsyncIo<T> { | ||||
|         fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> { | ||||
|             let mut v = vec![0; len]; | ||||
| @@ -668,33 +671,51 @@ mod tests { | ||||
|             Ok(Async::Ready(BytesMut::from(&v[..n]).freeze())) | ||||
|         } | ||||
|     } | ||||
|     */ | ||||
|  | ||||
|     #[test] | ||||
|     fn iobuf_write_empty_slice() { | ||||
|         let mut mock = AsyncIo::new_buf(vec![], 256); | ||||
|         mock.error(io::Error::new(io::ErrorKind::Other, "logic error")); | ||||
|  | ||||
|         let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|     #[tokio::test] | ||||
|     async fn iobuf_write_empty_slice() { | ||||
|         // First, let's just check that the Mock would normally return an | ||||
|         // error on an unexpected write, even if the buffer is empty... | ||||
|         let mut mock = Mock::new().build(); | ||||
|         futures_util::future::poll_fn(|cx| Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))) | ||||
|             .await | ||||
|             .expect_err("should be a broken pipe"); | ||||
|  | ||||
|         // underlying io will return the logic error upon write, | ||||
|         // so we are testing that the io_buf does not trigger a write | ||||
|         // when there is nothing to flush | ||||
|         io_buf.flush().expect("should short-circuit flush"); | ||||
|         let mock = Mock::new().build(); | ||||
|         let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|         io_buf.flush().await.expect("should short-circuit flush"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn parse_reads_until_blocked() { | ||||
|         // missing last line ending | ||||
|         let raw = "HTTP/1.1 200 OK\r\n"; | ||||
|     #[tokio::test] | ||||
|     async fn parse_reads_until_blocked() { | ||||
|         use crate::proto::h1::ClientTransaction; | ||||
|  | ||||
|         let mock = Mock::new() | ||||
|             // Split over multiple reads will read all of it | ||||
|             .read(b"HTTP/1.1 200 OK\r\n") | ||||
|             .read(b"Server: hyper\r\n") | ||||
|             // missing last line ending | ||||
|             .wait(Duration::from_secs(1)) | ||||
|             .build(); | ||||
|  | ||||
|         let mock = AsyncIo::new_buf(raw, raw.len()); | ||||
|         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|         let ctx = ParseContext { | ||||
|             cached_headers: &mut None, | ||||
|             req_method: &mut None, | ||||
|         }; | ||||
|         assert!(buffered.parse::<crate::proto::h1::ClientTransaction>(ctx).unwrap().is_not_ready()); | ||||
|         assert!(buffered.io.blocked()); | ||||
|  | ||||
|         // We expect a `parse` to be not ready, and so can't await it directly. | ||||
|         // Rather, this `poll_fn` will wrap the `Poll` result. | ||||
|         futures_util::future::poll_fn(|cx| { | ||||
|             let parse_ctx = ParseContext { | ||||
|                 cached_headers: &mut None, | ||||
|                 req_method: &mut None, | ||||
|             }; | ||||
|             assert!(buffered.parse::<ClientTransaction>(cx, parse_ctx).is_pending()); | ||||
|             Poll::Ready(()) | ||||
|         }).await; | ||||
|  | ||||
|         assert_eq!(buffered.read_buf, b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
| @@ -791,12 +812,14 @@ mod tests { | ||||
|     #[test] | ||||
|     #[should_panic] | ||||
|     fn write_buf_requires_non_empty_bufs() { | ||||
|         let mock = AsyncIo::new_buf(vec![], 1024); | ||||
|         let mock = Mock::new().build(); | ||||
|         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|  | ||||
|         buffered.buffer(Cursor::new(Vec::new())); | ||||
|     } | ||||
|  | ||||
|     /* | ||||
|     TODO: needs tokio_test::io to allow configure write_buf calls | ||||
|     #[test] | ||||
|     fn write_buf_queue() { | ||||
|         extern crate pretty_env_logger; | ||||
| @@ -817,13 +840,18 @@ mod tests { | ||||
|         assert_eq!(buffered.io.num_writes(), 1); | ||||
|         assert_eq!(buffered.write_buf.queue.bufs.len(), 0); | ||||
|     } | ||||
|     */ | ||||
|  | ||||
|     #[test] | ||||
|     fn write_buf_flatten() { | ||||
|     #[tokio::test] | ||||
|     async fn write_buf_flatten() { | ||||
|         extern crate pretty_env_logger; | ||||
|         let _ = pretty_env_logger::try_init(); | ||||
|  | ||||
|         let mock = AsyncIo::new_buf(vec![], 1024); | ||||
|         let mock = Mock::new() | ||||
|             // Just a single write | ||||
|             .write(b"hello world, it's hyper!") | ||||
|             .build(); | ||||
|  | ||||
|         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|         buffered.write_buf.set_strategy(WriteStrategy::Flatten); | ||||
|  | ||||
| @@ -833,19 +861,21 @@ mod tests { | ||||
|         buffered.buffer(Cursor::new(b"hyper!".to_vec())); | ||||
|         assert_eq!(buffered.write_buf.queue.bufs.len(), 0); | ||||
|  | ||||
|         buffered.flush().unwrap(); | ||||
|  | ||||
|         assert_eq!(buffered.io, b"hello world, it's hyper!"); | ||||
|         assert_eq!(buffered.io.num_writes(), 1); | ||||
|         buffered.flush().await.expect("flush"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn write_buf_auto_flatten() { | ||||
|     #[tokio::test] | ||||
|     async fn write_buf_auto_flatten() { | ||||
|         extern crate pretty_env_logger; | ||||
|         let _ = pretty_env_logger::try_init(); | ||||
|  | ||||
|         let mut mock = AsyncIo::new_buf(vec![], 1024); | ||||
|         mock.max_read_vecs(0); // disable vectored IO | ||||
|         let mock = Mock::new() | ||||
|             // Expects write_buf to only consume first buffer | ||||
|             .write(b"hello ") | ||||
|             // And then the Auto strategy will have flattened | ||||
|             .write(b"world, it's hyper!") | ||||
|             .build(); | ||||
|  | ||||
|         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|  | ||||
|         // we have 4 buffers, but hope to detect that vectored IO isn't | ||||
| @@ -856,20 +886,24 @@ mod tests { | ||||
|         buffered.buffer(Cursor::new(b"it's ".to_vec())); | ||||
|         buffered.buffer(Cursor::new(b"hyper!".to_vec())); | ||||
|         assert_eq!(buffered.write_buf.queue.bufs.len(), 3); | ||||
|         buffered.flush().unwrap(); | ||||
|  | ||||
|         assert_eq!(buffered.io, b"hello world, it's hyper!"); | ||||
|         assert_eq!(buffered.io.num_writes(), 2); | ||||
|         buffered.flush().await.expect("flush"); | ||||
|  | ||||
|         assert_eq!(buffered.write_buf.queue.bufs.len(), 0); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn write_buf_queue_disable_auto() { | ||||
|     #[tokio::test] | ||||
|     async fn write_buf_queue_disable_auto() { | ||||
|         extern crate pretty_env_logger; | ||||
|         let _ = pretty_env_logger::try_init(); | ||||
|  | ||||
|         let mut mock = AsyncIo::new_buf(vec![], 1024); | ||||
|         mock.max_read_vecs(0); // disable vectored IO | ||||
|         let mock = Mock::new() | ||||
|             .write(b"hello ") | ||||
|             .write(b"world, ") | ||||
|             .write(b"it's ") | ||||
|             .write(b"hyper!") | ||||
|             .build(); | ||||
|  | ||||
|         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock); | ||||
|         buffered.write_buf.set_strategy(WriteStrategy::Queue); | ||||
|  | ||||
| @@ -881,10 +915,9 @@ mod tests { | ||||
|         buffered.buffer(Cursor::new(b"it's ".to_vec())); | ||||
|         buffered.buffer(Cursor::new(b"hyper!".to_vec())); | ||||
|         assert_eq!(buffered.write_buf.queue.bufs.len(), 3); | ||||
|         buffered.flush().unwrap(); | ||||
|  | ||||
|         assert_eq!(buffered.io, b"hello world, it's hyper!"); | ||||
|         assert_eq!(buffered.io.num_writes(), 4); | ||||
|         buffered.flush().await.expect("flush"); | ||||
|  | ||||
|         assert_eq!(buffered.write_buf.queue.bufs.len(), 0); | ||||
|     } | ||||
|  | ||||
| @@ -903,5 +936,4 @@ mod tests { | ||||
|             write_buf.headers.bytes.clear(); | ||||
|         }) | ||||
|     } | ||||
|     */ | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user