test(h1): fix flaky streaming test
This commit is contained in:
60
src/mock.rs
60
src/mock.rs
@@ -1,7 +1,8 @@
|
|||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
|
|
||||||
use futures::Poll;
|
use bytes::Buf as BufTrait;
|
||||||
|
use futures::{Async, Poll};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -11,10 +12,6 @@ pub struct Buf {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Buf {
|
impl Buf {
|
||||||
pub fn new() -> Buf {
|
|
||||||
Buf::wrap(vec![])
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn wrap(vec: Vec<u8>) -> Buf {
|
pub fn wrap(vec: Vec<u8>) -> Buf {
|
||||||
Buf {
|
Buf {
|
||||||
vec: vec,
|
vec: vec,
|
||||||
@@ -63,23 +60,27 @@ impl Read for Buf {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const READ_VECS_CNT: usize = 64;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AsyncIo<T> {
|
pub struct AsyncIo<T> {
|
||||||
inner: T,
|
blocked: bool,
|
||||||
bytes_until_block: usize,
|
bytes_until_block: usize,
|
||||||
error: Option<io::Error>,
|
error: Option<io::Error>,
|
||||||
blocked: bool,
|
|
||||||
flushed: bool,
|
flushed: bool,
|
||||||
|
inner: T,
|
||||||
|
max_read_vecs: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> AsyncIo<T> {
|
impl<T> AsyncIo<T> {
|
||||||
pub fn new(inner: T, bytes: usize) -> AsyncIo<T> {
|
pub fn new(inner: T, bytes: usize) -> AsyncIo<T> {
|
||||||
AsyncIo {
|
AsyncIo {
|
||||||
inner: inner,
|
blocked: false,
|
||||||
bytes_until_block: bytes,
|
bytes_until_block: bytes,
|
||||||
error: None,
|
error: None,
|
||||||
flushed: false,
|
flushed: false,
|
||||||
blocked: false,
|
inner: inner,
|
||||||
|
max_read_vecs: READ_VECS_CNT,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,6 +91,22 @@ impl<T> AsyncIo<T> {
|
|||||||
pub fn error(&mut self, err: io::Error) {
|
pub fn error(&mut self, err: io::Error) {
|
||||||
self.error = Some(err);
|
self.error = Some(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn max_read_vecs(&mut self, cnt: usize) {
|
||||||
|
assert!(cnt <= READ_VECS_CNT);
|
||||||
|
self.max_read_vecs = cnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "tokio-proto")]
|
||||||
|
//TODO: fix proto::conn::tests to not use tokio-proto API,
|
||||||
|
//and then this cfg flag go away
|
||||||
|
pub fn flushed(&self) -> bool {
|
||||||
|
self.flushed
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn blocked(&self) -> bool {
|
||||||
|
self.blocked
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsyncIo<Buf> {
|
impl AsyncIo<Buf> {
|
||||||
@@ -103,16 +120,17 @@ impl AsyncIo<Buf> {
|
|||||||
pub fn new_eof() -> AsyncIo<Buf> {
|
pub fn new_eof() -> AsyncIo<Buf> {
|
||||||
AsyncIo::new(Buf::wrap(Vec::new().into()), 1)
|
AsyncIo::new(Buf::wrap(Vec::new().into()), 1)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "tokio-proto")]
|
impl<T: Read + Write> AsyncIo<T> {
|
||||||
//TODO: fix proto::conn::tests to not use tokio-proto API,
|
fn write_no_vecs<B: BufTrait>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||||
//and then this cfg flag go away
|
if !buf.has_remaining() {
|
||||||
pub fn flushed(&self) -> bool {
|
return Ok(Async::Ready(0));
|
||||||
self.flushed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn blocked(&self) -> bool {
|
let n = try_nb!(self.write(buf.bytes()));
|
||||||
self.blocked
|
buf.advance(n);
|
||||||
|
Ok(Async::Ready(n))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,12 +188,14 @@ impl<T: Read + Write> AsyncWrite for AsyncIo<T> {
|
|||||||
Ok(().into())
|
Ok(().into())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_buf<B: ::bytes::Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
fn write_buf<B: BufTrait>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||||
use futures::Async;
|
if self.max_read_vecs == 0 {
|
||||||
|
return self.write_no_vecs(buf);
|
||||||
|
}
|
||||||
let r = {
|
let r = {
|
||||||
static DUMMY: &[u8] = &[0];
|
static DUMMY: &[u8] = &[0];
|
||||||
let mut bufs = [From::from(DUMMY); 64];
|
let mut bufs = [From::from(DUMMY); READ_VECS_CNT];
|
||||||
let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs);
|
let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs[..self.max_read_vecs]);
|
||||||
let mut n = 0;
|
let mut n = 0;
|
||||||
let mut ret = Ok(0);
|
let mut ret = Ok(0);
|
||||||
for iovec in &bufs[..i] {
|
for iovec in &bufs[..i] {
|
||||||
|
|||||||
@@ -440,6 +440,9 @@ impl<T: Buf> Buf for BufDeque<T> {
|
|||||||
return buf.bytes();
|
return buf.bytes();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if let Some(ref buf) = self.bufs.front() {
|
||||||
|
return buf.bytes();
|
||||||
|
}
|
||||||
&[]
|
&[]
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -483,24 +486,24 @@ impl<T: Buf> Buf for BufDeque<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Move tests to their own mod
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
use std::io::Read;
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::io::Read;
|
||||||
|
use mock::AsyncIo;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
impl<T: Read> MemRead for ::mock::AsyncIo<T> {
|
impl<T: Read> MemRead for ::mock::AsyncIo<T> {
|
||||||
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
|
fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
|
||||||
let mut v = vec![0; len];
|
let mut v = vec![0; len];
|
||||||
let n = try_nb!(self.read(v.as_mut_slice()));
|
let n = try_nb!(self.read(v.as_mut_slice()));
|
||||||
Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
|
Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_iobuf_write_empty_slice() {
|
fn iobuf_write_empty_slice() {
|
||||||
use mock::{AsyncIo, Buf as MockBuf};
|
let mut mock = AsyncIo::new_buf(vec![], 256);
|
||||||
|
|
||||||
let mut mock = AsyncIo::new(MockBuf::new(), 256);
|
|
||||||
mock.error(io::Error::new(io::ErrorKind::Other, "logic error"));
|
mock.error(io::Error::new(io::ErrorKind::Other, "logic error"));
|
||||||
|
|
||||||
let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
|
let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
|
||||||
@@ -509,16 +512,28 @@ fn test_iobuf_write_empty_slice() {
|
|||||||
// so we are testing that the io_buf does not trigger a write
|
// so we are testing that the io_buf does not trigger a write
|
||||||
// when there is nothing to flush
|
// when there is nothing to flush
|
||||||
io_buf.flush().expect("should short-circuit flush");
|
io_buf.flush().expect("should short-circuit flush");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_parse_reads_until_blocked() {
|
fn parse_reads_until_blocked() {
|
||||||
use mock::{AsyncIo, Buf as MockBuf};
|
|
||||||
// missing last line ending
|
// missing last line ending
|
||||||
let raw = "HTTP/1.1 200 OK\r\n";
|
let raw = "HTTP/1.1 200 OK\r\n";
|
||||||
|
|
||||||
let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len());
|
let mock = AsyncIo::new_buf(raw, raw.len());
|
||||||
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
|
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
|
||||||
assert_eq!(buffered.parse::<::proto::ClientTransaction>().unwrap(), Async::NotReady);
|
assert_eq!(buffered.parse::<::proto::ClientTransaction>().unwrap(), Async::NotReady);
|
||||||
assert!(buffered.io.blocked());
|
assert!(buffered.io.blocked());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn write_buf_skips_empty_bufs() {
|
||||||
|
let mut mock = AsyncIo::new_buf(vec![], 1024);
|
||||||
|
mock.max_read_vecs(0); // disable vectored IO
|
||||||
|
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
|
||||||
|
|
||||||
|
buffered.buffer(Cursor::new(Vec::new()));
|
||||||
|
buffered.buffer(Cursor::new(b"hello".to_vec()));
|
||||||
|
buffered.flush().unwrap();
|
||||||
|
assert_eq!(buffered.io, b"hello");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1044,45 +1044,6 @@ mod dispatch_impl {
|
|||||||
assert_eq!(closes.load(Ordering::Relaxed), 1);
|
assert_eq!(closes.load(Ordering::Relaxed), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn client_body_mpsc() {
|
|
||||||
use futures::Sink;
|
|
||||||
let _ = pretty_env_logger::try_init();
|
|
||||||
let server = TcpListener::bind("127.0.0.1:0").unwrap();
|
|
||||||
let addr = server.local_addr().unwrap();
|
|
||||||
let mut core = Core::new().unwrap();
|
|
||||||
let handle = core.handle();
|
|
||||||
let closes = Arc::new(AtomicUsize::new(0));
|
|
||||||
|
|
||||||
let (tx1, rx1) = oneshot::channel();
|
|
||||||
|
|
||||||
thread::spawn(move || {
|
|
||||||
let mut sock = server.accept().unwrap().0;
|
|
||||||
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
|
|
||||||
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
|
|
||||||
let mut buf = [0; 4096];
|
|
||||||
sock.read(&mut buf).expect("read 1");
|
|
||||||
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
|
|
||||||
let _ = tx1.send(());
|
|
||||||
});
|
|
||||||
|
|
||||||
let uri = format!("http://{}/a", addr).parse().unwrap();
|
|
||||||
|
|
||||||
let client = Client::configure()
|
|
||||||
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
|
|
||||||
.build(&handle);
|
|
||||||
let mut req = Request::new(Method::Post, uri);
|
|
||||||
let (tx, body) = hyper::Body::pair();
|
|
||||||
req.set_body(body);
|
|
||||||
let res = client.request(req).and_then(move |res| {
|
|
||||||
assert_eq!(res.status(), hyper::StatusCode::Ok);
|
|
||||||
res.body().concat2()
|
|
||||||
});
|
|
||||||
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
|
|
||||||
let send = tx.send_all(::futures::stream::iter_ok(vec!["hello"; 2]).map(hyper::Chunk::from).map(Ok)).then(|_| Ok(()));
|
|
||||||
core.run(res.join(send).join(rx).map(|r| r.0)).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
struct DebugConnector(HttpConnector, Arc<AtomicUsize>);
|
struct DebugConnector(HttpConnector, Arc<AtomicUsize>);
|
||||||
|
|
||||||
impl Service for DebugConnector {
|
impl Service for DebugConnector {
|
||||||
|
|||||||
Reference in New Issue
Block a user