chore(windows): add test for keep-alive dropped in failing test

This commit is contained in:
Sean McArthur
2017-12-15 12:18:44 -08:00
parent ef4008121e
commit 5d05b284d8

View File

@@ -4,6 +4,7 @@ extern crate futures;
extern crate spmc; extern crate spmc;
extern crate pretty_env_logger; extern crate pretty_env_logger;
extern crate tokio_core; extern crate tokio_core;
extern crate tokio_io;
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::future::{self, FutureResult, Either}; use futures::future::{self, FutureResult, Either};
@@ -11,9 +12,11 @@ use futures::sync::oneshot;
use tokio_core::net::TcpListener; use tokio_core::net::TcpListener;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_io::{AsyncRead, AsyncWrite};
use std::net::{TcpStream, SocketAddr}; use std::net::{TcpStream, SocketAddr};
use std::io::{Read, Write}; use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
@@ -623,18 +626,22 @@ fn disable_keep_alive_post_request() {
tx1.send(()).unwrap(); tx1.send(()).unwrap();
// allow a little more time for TCP to notice the FIN let nread = req.read(&mut buf).expect("keep-alive reading")
req.set_read_timeout(Some(Duration::from_secs(5))).expect("set_read_timeout 2");
let nread = req.read(&mut buf).expect("keep-alive reading");
assert_eq!(nread, 0); assert_eq!(nread, 0);
}); });
let dropped = Dropped::new();
let dropped2 = dropped.clone();
let fut = listener.incoming() let fut = listener.incoming()
.into_future() .into_future()
.map_err(|_| unreachable!()) .map_err(|_| unreachable!())
.and_then(|(item, _incoming)| { .and_then(|(item, _incoming)| {
let (socket, _) = item.expect("accepted socket"); let (socket, _) = item.expect("accepted socket");
Http::<hyper::Chunk>::new().serve_connection(socket, HelloWorld) let transport = DebugStream {
stream: socket,
_debug: dropped2,
};
Http::<hyper::Chunk>::new().serve_connection(transport, HelloWorld)
.select2(rx1) .select2(rx1)
.then(|r| { .then(|r| {
match r { match r {
@@ -649,7 +656,9 @@ fn disable_keep_alive_post_request() {
}) })
}); });
assert!(!dropped.load());
core.run(fut).unwrap(); core.run(fut).unwrap();
assert!(dropped.load());
child.join().unwrap(); child.join().unwrap();
} }
@@ -918,3 +927,53 @@ fn serve_with_options(options: ServeOptions) -> Serve {
thread: Some(thread), thread: Some(thread),
} }
} }
struct DebugStream<T, D> {
stream: T,
_debug: D,
}
impl<T: Read, D> Read for DebugStream<T, D> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf)
}
}
impl<T: Write, D> Write for DebugStream<T, D> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.stream.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}
impl<T: AsyncWrite, D> AsyncWrite for DebugStream<T, D> {
fn shutdown(&mut self) -> futures::Poll<(), io::Error> {
self.stream.shutdown()
}
}
impl<T: AsyncRead, D> AsyncRead for DebugStream<T, D> {}
#[derive(Clone)]
struct Dropped(Arc<AtomicBool>);
impl Dropped {
pub fn new() -> Dropped {
Dropped(Arc::new(AtomicBool::new(false)))
}
pub fn load(&self) -> bool {
self.0.load(Ordering::SeqCst)
}
}
impl Drop for Dropped {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}