diff --git a/tests/server.rs b/tests/server.rs index c15ec3ee..96206017 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -4,6 +4,7 @@ extern crate futures; extern crate spmc; extern crate pretty_env_logger; extern crate tokio_core; +extern crate tokio_io; use futures::{Future, Stream}; use futures::future::{self, FutureResult, Either}; @@ -11,9 +12,11 @@ use futures::sync::oneshot; use tokio_core::net::TcpListener; use tokio_core::reactor::Core; +use tokio_io::{AsyncRead, AsyncWrite}; use std::net::{TcpStream, SocketAddr}; -use std::io::{Read, Write}; +use std::io::{self, Read, Write}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::sync::{Arc, Mutex}; use std::thread; @@ -623,18 +626,22 @@ fn disable_keep_alive_post_request() { tx1.send(()).unwrap(); - // allow a little more time for TCP to notice the FIN - req.set_read_timeout(Some(Duration::from_secs(5))).expect("set_read_timeout 2"); - let nread = req.read(&mut buf).expect("keep-alive reading"); + let nread = req.read(&mut buf).expect("keep-alive reading") assert_eq!(nread, 0); }); + let dropped = Dropped::new(); + let dropped2 = dropped.clone(); let fut = listener.incoming() .into_future() .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let (socket, _) = item.expect("accepted socket"); - Http::::new().serve_connection(socket, HelloWorld) + let transport = DebugStream { + stream: socket, + _debug: dropped2, + }; + Http::::new().serve_connection(transport, HelloWorld) .select2(rx1) .then(|r| { match r { @@ -649,7 +656,9 @@ fn disable_keep_alive_post_request() { }) }); + assert!(!dropped.load()); core.run(fut).unwrap(); + assert!(dropped.load()); child.join().unwrap(); } @@ -918,3 +927,53 @@ fn serve_with_options(options: ServeOptions) -> Serve { thread: Some(thread), } } + +struct DebugStream { + stream: T, + _debug: D, +} + +impl Read for DebugStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.stream.read(buf) + } +} + +impl Write for DebugStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.stream.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.stream.flush() + } +} + + +impl AsyncWrite for DebugStream { + fn shutdown(&mut self) -> futures::Poll<(), io::Error> { + self.stream.shutdown() + } +} + + +impl AsyncRead for DebugStream {} + +#[derive(Clone)] +struct Dropped(Arc); + +impl Dropped { + pub fn new() -> Dropped { + Dropped(Arc::new(AtomicBool::new(false))) + } + + pub fn load(&self) -> bool { + self.0.load(Ordering::SeqCst) + } +} + +impl Drop for Dropped { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } +}