test(server): server test cleanup

This commit is contained in:
Sean McArthur
2018-10-26 12:50:27 -07:00
parent 576addd531
commit 5ca2905c84
2 changed files with 183 additions and 346 deletions

View File

@@ -242,7 +242,7 @@ impl Http1Transaction for Server {
let (ret, mut is_last) = if is_upgrade { let (ret, mut is_last) = if is_upgrade {
(Ok(()), true) (Ok(()), true)
} else if msg.head.subject.is_informational() { } else if msg.head.subject.is_informational() {
error!("response with 1xx status code not supported"); warn!("response with 1xx status code not supported");
*msg.head = MessageHead::default(); *msg.head = MessageHead::default();
msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR;
msg.body = None; msg.body = None;

View File

@@ -29,18 +29,12 @@ use tokio::runtime::current_thread::Runtime;
use tokio::reactor::Handle; use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use hyper::{Body, Request, Response, StatusCode, Version}; use hyper::{Body, Request, Response, StatusCode, Version};
use hyper::client::Client; use hyper::client::Client;
use hyper::server::conn::Http; use hyper::server::conn::Http;
use hyper::server::Server; use hyper::server::Server;
use hyper::service::{service_fn, service_fn_ok, Service}; use hyper::service::{service_fn, service_fn_ok, Service};
fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result<TcpListener> {
let std_listener = StdTcpListener::bind(addr).unwrap();
TcpListener::from_std(std_listener, &Handle::default())
}
#[test] #[test]
fn get_should_ignore_body() { fn get_should_ignore_body() {
@@ -77,40 +71,6 @@ fn get_with_body() {
assert_eq!(server.body(), b"I'm a good request."); assert_eq!(server.body(), b"I'm a good request.");
} }
#[test]
fn get_implicitly_empty() {
// See https://github.com/hyperium/hyper/issues/1373
let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
thread::spawn(move || {
let mut tcp = connect(&addr);
tcp.write_all(b"\
GET / HTTP/1.1\r\n\
Host: example.domain\r\n\
\r\n\
").unwrap();
});
let fut = listener.incoming()
.into_future()
.map_err(|_| unreachable!())
.and_then(|(item, _incoming)| {
let socket = item.unwrap();
Http::new().serve_connection(socket, service_fn(|req: Request<Body>| {
req.into_body()
.concat2()
.map(|buf| {
assert!(buf.is_empty());
Response::new(Body::empty())
})
}))
});
rt.block_on(fut).unwrap();
}
mod response_body_lengths { mod response_body_lengths {
use super::*; use super::*;
@@ -416,17 +376,9 @@ fn get_chunked_response_with_ka() {
\r\n\ \r\n\
").expect("writing 1"); ").expect("writing 1");
let mut buf = [0; 1024 * 4]; read_until(&mut req, |buf| {
let mut ntotal = 0; buf.ends_with(foo_bar_chunk)
loop { }).expect("reading 1");
let n = req.read(&mut buf[ntotal..]).expect("reading 1");
ntotal = ntotal + n;
assert!(ntotal < buf.len());
if &buf[ntotal - foo_bar_chunk.len()..ntotal] == foo_bar_chunk {
break;
}
}
// try again! // try again!
@@ -441,16 +393,10 @@ fn get_chunked_response_with_ka() {
\r\n\ \r\n\
").expect("writing 2"); ").expect("writing 2");
let mut buf = [0; 1024 * 8];
loop { read_until(&mut req, |buf| {
let n = req.read(&mut buf[..]).expect("reading 2"); buf.ends_with(quux)
assert!(n > 0, "n = {}", n); }).expect("reading 2");
if n < buf.len() && n > 0 {
if &buf[n - quux.len()..n] == quux {
break;
}
}
}
} }
#[test] #[test]
@@ -478,7 +424,6 @@ fn post_with_chunked_body() {
#[test] #[test]
fn post_with_incomplete_body() { fn post_with_incomplete_body() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = serve(); let server = serve();
let mut req = connect(server.addr()); let mut req = connect(server.addr());
@@ -499,7 +444,6 @@ fn post_with_incomplete_body() {
#[test] #[test]
fn head_response_can_send_content_length() { fn head_response_can_send_content_length() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = serve(); let server = serve();
server.reply() server.reply()
@@ -527,7 +471,6 @@ fn head_response_can_send_content_length() {
#[test] #[test]
fn head_response_doesnt_send_body() { fn head_response_doesnt_send_body() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let foo_bar = b"foo bar baz"; let foo_bar = b"foo bar baz";
let server = serve(); let server = serve();
@@ -556,7 +499,6 @@ fn head_response_doesnt_send_body() {
#[test] #[test]
fn response_does_not_set_chunked_if_body_not_allowed() { fn response_does_not_set_chunked_if_body_not_allowed() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = serve(); let server = serve();
server.reply() server.reply()
@@ -598,15 +540,9 @@ fn keep_alive() {
\r\n\ \r\n\
").expect("writing 1"); ").expect("writing 1");
let mut buf = [0; 1024 * 8]; read_until(&mut req, |buf| {
loop { buf.ends_with(foo_bar)
let n = req.read(&mut buf[..]).expect("reading 1"); }).expect("reading 1");
if n < buf.len() {
if &buf[n - foo_bar.len()..n] == foo_bar {
break;
}
}
}
// try again! // try again!
@@ -621,16 +557,9 @@ fn keep_alive() {
\r\n\ \r\n\
").expect("writing 2"); ").expect("writing 2");
let mut buf = [0; 1024 * 8]; read_until(&mut req, |buf| {
loop { buf.ends_with(quux)
let n = req.read(&mut buf[..]).expect("reading 2"); }).expect("reading 2");
assert!(n > 0, "n = {}", n);
if n < buf.len() {
if &buf[n - quux.len()..n] == quux {
break;
}
}
}
} }
#[test] #[test]
@@ -649,19 +578,18 @@ fn http_10_keep_alive() {
\r\n\ \r\n\
").expect("writing 1"); ").expect("writing 1");
let mut buf = [0; 1024 * 8];
loop {
let n = req.read(&mut buf[..]).expect("reading 1");
if n < buf.len() {
if &buf[n - foo_bar.len()..n] == foo_bar {
break;
}
}
}
// Connection: keep-alive header should be added when downgrading to a 1.0 response // Connection: keep-alive header should be added when downgrading to a 1.0 response
let response = String::from_utf8(buf.to_vec()).unwrap(); let res = read_until(&mut req, |buf| {
response.contains("Connection: keep-alive\r\n"); buf.ends_with(foo_bar)
}).expect("reading 1");
let sres = s(&res);
assert!(
sres.contains("connection: keep-alive\r\n"),
"HTTP/1.0 response should have sent keep-alive: {:?}",
sres,
);
// try again! // try again!
@@ -675,16 +603,10 @@ fn http_10_keep_alive() {
\r\n\ \r\n\
").expect("writing 2"); ").expect("writing 2");
let mut buf = [0; 1024 * 8];
loop { read_until(&mut req, |buf| {
let n = req.read(&mut buf[..]).expect("reading 2"); buf.ends_with(quux)
assert!(n > 0, "n = {}", n); }).expect("reading 2");
if n < buf.len() {
if &buf[n - quux.len()..n] == quux {
break;
}
}
}
} }
#[test] #[test]
@@ -710,53 +632,26 @@ fn http_10_close_on_no_ka() {
", ",
).expect("writing 1"); ).expect("writing 1");
let mut buf = [0; 1024 * 8]; // server isn't keeping-alive, so the socket should be closed after
loop { // writing the response. thus, read_to_end should succeed.
let n = req.read(&mut buf[..]).expect("reading 1"); let mut buf = Vec::new();
if n < buf.len() { req.read_to_end(&mut buf).expect("reading 1");
if &buf[n - foo_bar.len()..n] == foo_bar {
break;
} else {
}
}
}
// try again! assert!(buf.ends_with(foo_bar));
let sbuf = s(&buf);
let quux = b"zar quux"; assert!(
server !sbuf.contains("connection: keep-alive\r\n"),
.reply() "HTTP/1.0 response shouldn't have sent keep-alive: {:?}",
.header("content-length", quux.len().to_string()) sbuf,
.body(quux);
// the write can possibly succeed, since it fills the kernel buffer on the first write
let _ = req.write_all(
b"\
GET /quux HTTP/1.1\r\n\
Host: example.domain\r\n\
Connection: close\r\n\
\r\n\
",
); );
let mut buf = [0; 1024 * 8];
match req.read(&mut buf[..]) {
// Ok(0) means EOF, so a proper shutdown
// Err(_) could mean ConnReset or something, also fine
Ok(0) | Err(_) => {}
Ok(n) => {
panic!("read {} bytes on a disabled keep-alive socket", n);
}
}
} }
#[test] #[test]
fn disable_keep_alive() { fn disable_keep_alive() {
let foo_bar = b"foo bar baz"; let foo_bar = b"foo bar baz";
let server = serve_with_options(ServeOptions { let server = serve_opts()
keep_alive: false, .keep_alive(false)
.. Default::default() .serve();
});
server.reply() server.reply()
.header("content-length", foo_bar.len().to_string()) .header("content-length", foo_bar.len().to_string())
.body(foo_bar); .body(foo_bar);
@@ -768,42 +663,12 @@ fn disable_keep_alive() {
\r\n\ \r\n\
").expect("writing 1"); ").expect("writing 1");
let mut buf = [0; 1024 * 8];
loop {
let n = req.read(&mut buf[..]).expect("reading 1");
if n < buf.len() {
if &buf[n - foo_bar.len()..n] == foo_bar {
break;
} else {
}
}
}
// try again! // server isn't keeping-alive, so the socket should be closed after
// writing the response. thus, read_to_end should succeed.
let quux = b"zar quux"; let mut buf = Vec::new();
server.reply() req.read_to_end(&mut buf).expect("reading 1");
.header("content-length", quux.len().to_string()) assert!(buf.ends_with(foo_bar));
.body(quux);
// the write can possibly succeed, since it fills the kernel buffer on the first write
let _ = req.write_all(b"\
GET /quux HTTP/1.1\r\n\
Host: example.domain\r\n\
Connection: close\r\n\
\r\n\
");
let mut buf = [0; 1024 * 8];
match req.read(&mut buf[..]) {
// Ok(0) means EOF, so a proper shutdown
// Err(_) could mean ConnReset or something, also fine
Ok(0) |
Err(_) => {}
Ok(n) => {
panic!("read {} bytes on a disabled keep-alive socket", n);
}
}
} }
#[test] #[test]
@@ -822,44 +687,17 @@ fn header_connection_close() {
\r\n\ \r\n\
").expect("writing 1"); ").expect("writing 1");
let mut buf = [0; 1024 * 8]; // server isn't keeping-alive, so the socket should be closed after
loop { // writing the response. thus, read_to_end should succeed.
let n = req.read(&mut buf[..]).expect("reading 1"); let mut buf = Vec::new();
if n < buf.len() { req.read_to_end(&mut buf).expect("reading 1");
if &buf[n - foo_bar.len()..n] == foo_bar { assert!(buf.ends_with(foo_bar));
break; let sbuf = s(&buf);
} else { assert!(
} sbuf.contains("connection: close\r\n"),
} "response should have sent close: {:?}",
} sbuf,
);
// try again!
// but since the server responded with connection: close, the internal
// state should have noticed and shutdown
let quux = b"zar quux";
server.reply()
.header("content-length", quux.len().to_string())
.body(quux);
// the write can possibly succeed, since it fills the kernel buffer on the first write
let _ = req.write_all(b"\
GET /quux HTTP/1.1\r\n\
Host: example.domain\r\n\
Connection: close\r\n\
\r\n\
");
let mut buf = [0; 1024 * 8];
match req.read(&mut buf[..]) {
// Ok(0) means EOF, so a proper shutdown
// Err(_) could mean ConnReset or something, also fine
Ok(0) |
Err(_) => {}
Ok(n) => {
panic!("read {} bytes on a disabled keep-alive socket", n);
}
}
} }
#[test] #[test]
@@ -936,10 +774,9 @@ fn pipeline_disabled() {
#[test] #[test]
fn pipeline_enabled() { fn pipeline_enabled() {
let server = serve_with_options(ServeOptions { let server = serve_opts()
pipeline: true, .pipeline(true)
.. Default::default() .serve();
});
let mut req = connect(server.addr()); let mut req = connect(server.addr());
server.reply() server.reply()
.header("content-length", "12") .header("content-length", "12")
@@ -1062,20 +899,14 @@ fn disable_keep_alive_post_request() {
\r\n\ \r\n\
").unwrap(); ").unwrap();
let mut buf = [0; 1024 * 8]; read_until(&mut req, |buf| {
loop { buf.ends_with(HELLO.as_bytes())
let n = req.read(&mut buf).expect("reading 1"); }).expect("reading 1");
if &buf[n - HELLO.len()..n] == HELLO.as_bytes() {
break;
}
if n == 0 {
panic!("unexpected eof");
}
}
// Connection should get closed *after* tx is sent on
tx1.send(()).unwrap(); tx1.send(()).unwrap();
let nread = req.read(&mut buf).expect("keep-alive reading"); let nread = req.read(&mut [0u8; 1024]).expect("keep-alive reading");
assert_eq!(nread, 0); assert_eq!(nread, 0);
}); });
@@ -1135,7 +966,7 @@ fn empty_parse_eof_does_not_return_error() {
Http::new().serve_connection(socket, HelloWorld) Http::new().serve_connection(socket, HelloWorld)
}); });
rt.block_on(fut).unwrap(); rt.block_on(fut).expect("empty parse eof is ok");
} }
#[test] #[test]
@@ -1157,7 +988,7 @@ fn nonempty_parse_eof_returns_error() {
Http::new().serve_connection(socket, HelloWorld) Http::new().serve_connection(socket, HelloWorld)
}); });
rt.block_on(fut).unwrap_err(); rt.block_on(fut).expect_err("partial parse eof is error");
} }
#[test] #[test]
@@ -1190,7 +1021,7 @@ fn returning_1xx_response_is_error() {
})) }))
}); });
rt.block_on(fut).unwrap_err(); rt.block_on(fut).expect_err("1xx status code should error");
} }
#[test] #[test]
@@ -1485,7 +1316,7 @@ fn parse_errors_send_4xx_response() {
.serve_connection(socket, HelloWorld) .serve_connection(socket, HelloWorld)
}); });
rt.block_on(fut).unwrap_err(); rt.block_on(fut).expect_err("HTTP parse error");
} }
#[test] #[test]
@@ -1513,7 +1344,7 @@ fn illegal_request_length_returns_400_response() {
.serve_connection(socket, HelloWorld) .serve_connection(socket, HelloWorld)
}); });
rt.block_on(fut).unwrap_err(); rt.block_on(fut).expect_err("illegal Content-Length should error");
} }
#[test] #[test]
@@ -1558,56 +1389,33 @@ fn max_buf_size() {
.serve_connection(socket, HelloWorld) .serve_connection(socket, HelloWorld)
}); });
rt.block_on(fut).unwrap_err(); rt.block_on(fut).expect_err("should TooLarge error");
} }
#[test] #[test]
fn streaming_body() { fn streaming_body() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
let (tx, rx) = oneshot::channel(); // disable keep-alive so we can use read_to_end
thread::spawn(move || { let server = serve_opts()
let mut tcp = connect(&addr); .keep_alive(false)
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); .serve();
let mut buf = [0; 8192];
let mut sum = tcp.read(&mut buf).expect("read 1");
let expected = "HTTP/1.1 200 "; static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _;
assert_eq!(s(&buf[..expected.len()]), expected); let b = ::futures::stream::iter_ok::<_, String>(S.into_iter())
.map(|&s| s);
let b = hyper::Body::wrap_stream(b);
server
.reply()
.body_stream(b);
loop { let mut tcp = connect(server.addr());
let n = tcp.read(&mut buf).expect("read loop"); tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
sum += n; let mut buf = Vec::new();
if n == 0 { tcp.read_to_end(&mut buf).expect("read 1");
break;
}
}
assert_eq!(sum, 100_789);
let _ = tx.send(());
});
let rx = rx.map_err(|_| panic!("thread panicked")); assert!(buf.starts_with(b"HTTP/1.1 200 OK\r\n"), "response is 200 OK");
assert_eq!(buf.len(), 100_789, "full streamed body read");
let fut = listener.incoming()
.into_future()
.map_err(|_| unreachable!())
.and_then(|(item, _incoming)| {
let socket = item.unwrap();
Http::new()
.keep_alive(false)
.serve_connection(socket, service_fn(|_| {
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _;
let b = ::futures::stream::iter_ok::<_, String>(S.into_iter())
.map(|&s| s);
let b = hyper::Body::wrap_stream(b);
Ok::<_, hyper::Error>(Response::new(b))
}))
});
rt.block_on(fut.join(rx)).unwrap();
} }
#[test] #[test]
@@ -1653,10 +1461,9 @@ fn try_h2() {
#[test] #[test]
fn http1_only() { fn http1_only() {
let server = serve_with_options(ServeOptions { let server = serve_opts()
http1_only: true, .http1_only()
.. Default::default() .serve();
});
let addr_str = format!("http://{}", server.addr()); let addr_str = format!("http://{}", server.addr());
let mut rt = Runtime::new().expect("runtime new"); let mut rt = Runtime::new().expect("runtime new");
@@ -1767,7 +1574,6 @@ impl Drop for Serve {
struct TestService { struct TestService {
tx: Arc<Mutex<mpsc::Sender<Msg>>>, tx: Arc<Mutex<mpsc::Sender<Msg>>>,
reply: spmc::Receiver<Reply>, reply: spmc::Receiver<Reply>,
_timeout: Option<Duration>,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -1853,7 +1659,11 @@ fn connect(addr: &SocketAddr) -> TcpStream {
} }
fn serve() -> Serve { fn serve() -> Serve {
serve_with_options(Default::default()) serve_opts().serve()
}
fn serve_opts() -> ServeOptions {
ServeOptions::default()
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
@@ -1861,7 +1671,6 @@ struct ServeOptions {
keep_alive: bool, keep_alive: bool,
http1_only: bool, http1_only: bool,
pipeline: bool, pipeline: bool,
timeout: Option<Duration>,
} }
impl Default for ServeOptions { impl Default for ServeOptions {
@@ -1870,78 +1679,76 @@ impl Default for ServeOptions {
keep_alive: true, keep_alive: true,
http1_only: false, http1_only: false,
pipeline: false, pipeline: false,
timeout: None,
} }
} }
} }
fn serve_with_options(options: ServeOptions) -> Serve { impl ServeOptions {
let _ = pretty_env_logger::try_init(); fn http1_only(mut self) -> Self {
self.http1_only = true;
self
}
let (addr_tx, addr_rx) = mpsc::channel(); fn keep_alive(mut self, enabled: bool) -> Self {
let (msg_tx, msg_rx) = mpsc::channel(); self.keep_alive = enabled;
let (reply_tx, reply_rx) = spmc::channel(); self
let (shutdown_tx, shutdown_rx) = oneshot::channel(); }
let shutdown_rx = shutdown_rx.then(|_| Ok(()));
let addr = ([127, 0, 0, 1], 0).into(); fn pipeline(mut self, enabled: bool) -> Self {
self.pipeline = enabled;
self
}
let thread_name = format!( fn serve(self) -> Serve {
"test-server-{}", let _ = pretty_env_logger::try_init();
thread::current() let options = self;
.name()
.unwrap_or("<unknown test case name>")
);
let thread = thread::Builder::new().name(thread_name).spawn(move || {
let server = Server::bind(&addr)
.http1_only(options.http1_only)
.http1_keepalive(options.keep_alive)
.http1_pipeline_flush(options.pipeline)
.serve(move || {
let ts = TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())),
_timeout: options.timeout,
reply: reply_rx.clone(),
};
service_fn(move |req| ts.call(req))
});
/* let (addr_tx, addr_rx) = mpsc::channel();
let serve = Http::new() let (msg_tx, msg_rx) = mpsc::channel();
.http1_only(options.http1_only) let (reply_tx, reply_rx) = spmc::channel();
.keep_alive(options.keep_alive) let (shutdown_tx, shutdown_rx) = oneshot::channel();
.pipeline_flush(options.pipeline)
.serve_addr(&addr, move || {
let ts = TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())),
_timeout: options.timeout,
reply: reply_rx.clone(),
};
service_fn(move |req| ts.call(req))
})
.expect("bind to address");
*/
addr_tx.send( let addr = ([127, 0, 0, 1], 0).into();
server.local_addr()
).expect("server addr tx");
let fut = server let thread_name = format!(
.select(shutdown_rx) "test-server-{}",
.then(|_| Ok::<(), ()>(())); thread::current()
.name()
.unwrap_or("<unknown test case name>")
);
let thread = thread::Builder::new().name(thread_name).spawn(move || {
let server = Server::bind(&addr)
.http1_only(options.http1_only)
.http1_keepalive(options.keep_alive)
.http1_pipeline_flush(options.pipeline)
.serve(move || {
let ts = TestService {
tx: Arc::new(Mutex::new(msg_tx.clone())),
reply: reply_rx.clone(),
};
service_fn(move |req| ts.call(req))
});
let mut rt = Runtime::new().expect("rt new"); addr_tx.send(
rt.block_on(fut).unwrap(); server.local_addr()
}).expect("thread spawn"); ).expect("server addr tx");
let addr = addr_rx.recv().expect("server addr rx"); let fut = server
.with_graceful_shutdown(shutdown_rx);
Serve { let mut rt = Runtime::new().expect("rt new");
msg_rx: msg_rx, rt.block_on(fut).unwrap();
reply_tx: reply_tx, }).expect("thread spawn");
addr: addr,
shutdown_signal: Some(shutdown_tx), let addr = addr_rx.recv().expect("server addr rx");
thread: Some(thread),
Serve {
msg_rx: msg_rx,
reply_tx: reply_tx,
addr: addr,
shutdown_signal: Some(shutdown_tx),
thread: Some(thread),
}
} }
} }
@@ -1955,6 +1762,36 @@ fn has_header(msg: &str, name: &str) -> bool {
msg[..n].contains(name) msg[..n].contains(name)
} }
fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result<TcpListener> {
let std_listener = StdTcpListener::bind(addr).unwrap();
TcpListener::from_std(std_listener, &Handle::default())
}
fn read_until<R, F>(io: &mut R, func: F) -> io::Result<Vec<u8>>
where
R: Read,
F: Fn(&[u8]) -> bool,
{
let mut buf = vec![0; 8192];
let mut pos = 0;
loop {
let n = io.read(&mut buf[pos..])?;
pos += n;
if func(&buf[..pos]) {
break;
}
if pos == buf.len() {
return Err(io::Error::new(
io::ErrorKind::Other,
"read_until buffer filled"
));
}
}
buf.truncate(pos);
Ok(buf)
}
struct DebugStream<T, D> { struct DebugStream<T, D> {
stream: T, stream: T,
_debug: D, _debug: D,