diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 7f20f323..83384b84 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -242,7 +242,7 @@ impl Http1Transaction for Server { let (ret, mut is_last) = if is_upgrade { (Ok(()), true) } else if msg.head.subject.is_informational() { - error!("response with 1xx status code not supported"); + warn!("response with 1xx status code not supported"); *msg.head = MessageHead::default(); msg.head.subject = StatusCode::INTERNAL_SERVER_ERROR; msg.body = None; diff --git a/tests/server.rs b/tests/server.rs index f5c13fab..6cb5054e 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -29,18 +29,12 @@ use tokio::runtime::current_thread::Runtime; use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; - use hyper::{Body, Request, Response, StatusCode, Version}; use hyper::client::Client; use hyper::server::conn::Http; use hyper::server::Server; use hyper::service::{service_fn, service_fn_ok, Service}; -fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result { - let std_listener = StdTcpListener::bind(addr).unwrap(); - TcpListener::from_std(std_listener, &Handle::default()) -} - #[test] fn get_should_ignore_body() { @@ -77,40 +71,6 @@ fn get_with_body() { assert_eq!(server.body(), b"I'm a good request."); } -#[test] -fn get_implicitly_empty() { - // See https://github.com/hyperium/hyper/issues/1373 - let mut rt = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let addr = listener.local_addr().unwrap(); - - thread::spawn(move || { - let mut tcp = connect(&addr); - tcp.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - \r\n\ - ").unwrap(); - }); - - let fut = listener.incoming() - .into_future() - .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new().serve_connection(socket, service_fn(|req: Request| { - req.into_body() - .concat2() - .map(|buf| { - assert!(buf.is_empty()); - Response::new(Body::empty()) - }) - })) - }); - - rt.block_on(fut).unwrap(); -} - mod response_body_lengths { use super::*; @@ -416,17 +376,9 @@ fn get_chunked_response_with_ka() { \r\n\ ").expect("writing 1"); - let mut buf = [0; 1024 * 4]; - let mut ntotal = 0; - loop { - let n = req.read(&mut buf[ntotal..]).expect("reading 1"); - ntotal = ntotal + n; - assert!(ntotal < buf.len()); - if &buf[ntotal - foo_bar_chunk.len()..ntotal] == foo_bar_chunk { - break; - } - } - + read_until(&mut req, |buf| { + buf.ends_with(foo_bar_chunk) + }).expect("reading 1"); // try again! @@ -441,16 +393,10 @@ fn get_chunked_response_with_ka() { \r\n\ ").expect("writing 2"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 2"); - assert!(n > 0, "n = {}", n); - if n < buf.len() && n > 0 { - if &buf[n - quux.len()..n] == quux { - break; - } - } - } + + read_until(&mut req, |buf| { + buf.ends_with(quux) + }).expect("reading 2"); } #[test] @@ -478,7 +424,6 @@ fn post_with_chunked_body() { #[test] fn post_with_incomplete_body() { - extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); let server = serve(); let mut req = connect(server.addr()); @@ -499,7 +444,6 @@ fn post_with_incomplete_body() { #[test] fn head_response_can_send_content_length() { - extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); let server = serve(); server.reply() @@ -527,7 +471,6 @@ fn head_response_can_send_content_length() { #[test] fn head_response_doesnt_send_body() { - extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); let foo_bar = b"foo bar baz"; let server = serve(); @@ -556,7 +499,6 @@ fn head_response_doesnt_send_body() { #[test] fn response_does_not_set_chunked_if_body_not_allowed() { - extern crate pretty_env_logger; let _ = pretty_env_logger::try_init(); let server = serve(); server.reply() @@ -598,15 +540,9 @@ fn keep_alive() { \r\n\ ").expect("writing 1"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 1"); - if n < buf.len() { - if &buf[n - foo_bar.len()..n] == foo_bar { - break; - } - } - } + read_until(&mut req, |buf| { + buf.ends_with(foo_bar) + }).expect("reading 1"); // try again! @@ -621,16 +557,9 @@ fn keep_alive() { \r\n\ ").expect("writing 2"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 2"); - assert!(n > 0, "n = {}", n); - if n < buf.len() { - if &buf[n - quux.len()..n] == quux { - break; - } - } - } + read_until(&mut req, |buf| { + buf.ends_with(quux) + }).expect("reading 2"); } #[test] @@ -649,19 +578,18 @@ fn http_10_keep_alive() { \r\n\ ").expect("writing 1"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 1"); - if n < buf.len() { - if &buf[n - foo_bar.len()..n] == foo_bar { - break; - } - } - } // Connection: keep-alive header should be added when downgrading to a 1.0 response - let response = String::from_utf8(buf.to_vec()).unwrap(); - response.contains("Connection: keep-alive\r\n"); + let res = read_until(&mut req, |buf| { + buf.ends_with(foo_bar) + }).expect("reading 1"); + + let sres = s(&res); + assert!( + sres.contains("connection: keep-alive\r\n"), + "HTTP/1.0 response should have sent keep-alive: {:?}", + sres, + ); // try again! @@ -675,16 +603,10 @@ fn http_10_keep_alive() { \r\n\ ").expect("writing 2"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 2"); - assert!(n > 0, "n = {}", n); - if n < buf.len() { - if &buf[n - quux.len()..n] == quux { - break; - } - } - } + + read_until(&mut req, |buf| { + buf.ends_with(quux) + }).expect("reading 2"); } #[test] @@ -710,53 +632,26 @@ fn http_10_close_on_no_ka() { ", ).expect("writing 1"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 1"); - if n < buf.len() { - if &buf[n - foo_bar.len()..n] == foo_bar { - break; - } else { - } - } - } + // server isn't keeping-alive, so the socket should be closed after + // writing the response. thus, read_to_end should succeed. + let mut buf = Vec::new(); + req.read_to_end(&mut buf).expect("reading 1"); - // try again! - - let quux = b"zar quux"; - server - .reply() - .header("content-length", quux.len().to_string()) - .body(quux); - - // the write can possibly succeed, since it fills the kernel buffer on the first write - let _ = req.write_all( - b"\ - GET /quux HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ", + assert!(buf.ends_with(foo_bar)); + let sbuf = s(&buf); + assert!( + !sbuf.contains("connection: keep-alive\r\n"), + "HTTP/1.0 response shouldn't have sent keep-alive: {:?}", + sbuf, ); - - let mut buf = [0; 1024 * 8]; - match req.read(&mut buf[..]) { - // Ok(0) means EOF, so a proper shutdown - // Err(_) could mean ConnReset or something, also fine - Ok(0) | Err(_) => {} - Ok(n) => { - panic!("read {} bytes on a disabled keep-alive socket", n); - } - } } #[test] fn disable_keep_alive() { let foo_bar = b"foo bar baz"; - let server = serve_with_options(ServeOptions { - keep_alive: false, - .. Default::default() - }); + let server = serve_opts() + .keep_alive(false) + .serve(); server.reply() .header("content-length", foo_bar.len().to_string()) .body(foo_bar); @@ -768,42 +663,12 @@ fn disable_keep_alive() { \r\n\ ").expect("writing 1"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 1"); - if n < buf.len() { - if &buf[n - foo_bar.len()..n] == foo_bar { - break; - } else { - } - } - } - // try again! - - let quux = b"zar quux"; - server.reply() - .header("content-length", quux.len().to_string()) - .body(quux); - - // the write can possibly succeed, since it fills the kernel buffer on the first write - let _ = req.write_all(b"\ - GET /quux HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - "); - - let mut buf = [0; 1024 * 8]; - match req.read(&mut buf[..]) { - // Ok(0) means EOF, so a proper shutdown - // Err(_) could mean ConnReset or something, also fine - Ok(0) | - Err(_) => {} - Ok(n) => { - panic!("read {} bytes on a disabled keep-alive socket", n); - } - } + // server isn't keeping-alive, so the socket should be closed after + // writing the response. thus, read_to_end should succeed. + let mut buf = Vec::new(); + req.read_to_end(&mut buf).expect("reading 1"); + assert!(buf.ends_with(foo_bar)); } #[test] @@ -822,44 +687,17 @@ fn header_connection_close() { \r\n\ ").expect("writing 1"); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf[..]).expect("reading 1"); - if n < buf.len() { - if &buf[n - foo_bar.len()..n] == foo_bar { - break; - } else { - } - } - } - - // try again! - // but since the server responded with connection: close, the internal - // state should have noticed and shutdown - - let quux = b"zar quux"; - server.reply() - .header("content-length", quux.len().to_string()) - .body(quux); - - // the write can possibly succeed, since it fills the kernel buffer on the first write - let _ = req.write_all(b"\ - GET /quux HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - "); - - let mut buf = [0; 1024 * 8]; - match req.read(&mut buf[..]) { - // Ok(0) means EOF, so a proper shutdown - // Err(_) could mean ConnReset or something, also fine - Ok(0) | - Err(_) => {} - Ok(n) => { - panic!("read {} bytes on a disabled keep-alive socket", n); - } - } + // server isn't keeping-alive, so the socket should be closed after + // writing the response. thus, read_to_end should succeed. + let mut buf = Vec::new(); + req.read_to_end(&mut buf).expect("reading 1"); + assert!(buf.ends_with(foo_bar)); + let sbuf = s(&buf); + assert!( + sbuf.contains("connection: close\r\n"), + "response should have sent close: {:?}", + sbuf, + ); } #[test] @@ -936,10 +774,9 @@ fn pipeline_disabled() { #[test] fn pipeline_enabled() { - let server = serve_with_options(ServeOptions { - pipeline: true, - .. Default::default() - }); + let server = serve_opts() + .pipeline(true) + .serve(); let mut req = connect(server.addr()); server.reply() .header("content-length", "12") @@ -1062,20 +899,14 @@ fn disable_keep_alive_post_request() { \r\n\ ").unwrap(); - let mut buf = [0; 1024 * 8]; - loop { - let n = req.read(&mut buf).expect("reading 1"); - if &buf[n - HELLO.len()..n] == HELLO.as_bytes() { - break; - } - if n == 0 { - panic!("unexpected eof"); - } - } + read_until(&mut req, |buf| { + buf.ends_with(HELLO.as_bytes()) + }).expect("reading 1"); + // Connection should get closed *after* tx is sent on tx1.send(()).unwrap(); - let nread = req.read(&mut buf).expect("keep-alive reading"); + let nread = req.read(&mut [0u8; 1024]).expect("keep-alive reading"); assert_eq!(nread, 0); }); @@ -1135,7 +966,7 @@ fn empty_parse_eof_does_not_return_error() { Http::new().serve_connection(socket, HelloWorld) }); - rt.block_on(fut).unwrap(); + rt.block_on(fut).expect("empty parse eof is ok"); } #[test] @@ -1157,7 +988,7 @@ fn nonempty_parse_eof_returns_error() { Http::new().serve_connection(socket, HelloWorld) }); - rt.block_on(fut).unwrap_err(); + rt.block_on(fut).expect_err("partial parse eof is error"); } #[test] @@ -1190,7 +1021,7 @@ fn returning_1xx_response_is_error() { })) }); - rt.block_on(fut).unwrap_err(); + rt.block_on(fut).expect_err("1xx status code should error"); } #[test] @@ -1485,7 +1316,7 @@ fn parse_errors_send_4xx_response() { .serve_connection(socket, HelloWorld) }); - rt.block_on(fut).unwrap_err(); + rt.block_on(fut).expect_err("HTTP parse error"); } #[test] @@ -1513,7 +1344,7 @@ fn illegal_request_length_returns_400_response() { .serve_connection(socket, HelloWorld) }); - rt.block_on(fut).unwrap_err(); + rt.block_on(fut).expect_err("illegal Content-Length should error"); } #[test] @@ -1558,56 +1389,33 @@ fn max_buf_size() { .serve_connection(socket, HelloWorld) }); - rt.block_on(fut).unwrap_err(); + rt.block_on(fut).expect_err("should TooLarge error"); } #[test] fn streaming_body() { let _ = pretty_env_logger::try_init(); - let mut rt = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); - let addr = listener.local_addr().unwrap(); - let (tx, rx) = oneshot::channel(); - thread::spawn(move || { - let mut tcp = connect(&addr); - tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); - let mut buf = [0; 8192]; - let mut sum = tcp.read(&mut buf).expect("read 1"); + // disable keep-alive so we can use read_to_end + let server = serve_opts() + .keep_alive(false) + .serve(); - let expected = "HTTP/1.1 200 "; - assert_eq!(s(&buf[..expected.len()]), expected); + static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; + let b = ::futures::stream::iter_ok::<_, String>(S.into_iter()) + .map(|&s| s); + let b = hyper::Body::wrap_stream(b); + server + .reply() + .body_stream(b); - loop { - let n = tcp.read(&mut buf).expect("read loop"); - sum += n; - if n == 0 { - break; - } - } - assert_eq!(sum, 100_789); - let _ = tx.send(()); - }); + let mut tcp = connect(server.addr()); + tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); + let mut buf = Vec::new(); + tcp.read_to_end(&mut buf).expect("read 1"); - let rx = rx.map_err(|_| panic!("thread panicked")); - - let fut = listener.incoming() - .into_future() - .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new() - .keep_alive(false) - .serve_connection(socket, service_fn(|_| { - static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; - let b = ::futures::stream::iter_ok::<_, String>(S.into_iter()) - .map(|&s| s); - let b = hyper::Body::wrap_stream(b); - Ok::<_, hyper::Error>(Response::new(b)) - })) - }); - - rt.block_on(fut.join(rx)).unwrap(); + assert!(buf.starts_with(b"HTTP/1.1 200 OK\r\n"), "response is 200 OK"); + assert_eq!(buf.len(), 100_789, "full streamed body read"); } #[test] @@ -1653,10 +1461,9 @@ fn try_h2() { #[test] fn http1_only() { - let server = serve_with_options(ServeOptions { - http1_only: true, - .. Default::default() - }); + let server = serve_opts() + .http1_only() + .serve(); let addr_str = format!("http://{}", server.addr()); let mut rt = Runtime::new().expect("runtime new"); @@ -1767,7 +1574,6 @@ impl Drop for Serve { struct TestService { tx: Arc>>, reply: spmc::Receiver, - _timeout: Option, } #[derive(Debug)] @@ -1853,7 +1659,11 @@ fn connect(addr: &SocketAddr) -> TcpStream { } fn serve() -> Serve { - serve_with_options(Default::default()) + serve_opts().serve() +} + +fn serve_opts() -> ServeOptions { + ServeOptions::default() } #[derive(Clone, Copy)] @@ -1861,7 +1671,6 @@ struct ServeOptions { keep_alive: bool, http1_only: bool, pipeline: bool, - timeout: Option, } impl Default for ServeOptions { @@ -1870,78 +1679,76 @@ impl Default for ServeOptions { keep_alive: true, http1_only: false, pipeline: false, - timeout: None, } } } -fn serve_with_options(options: ServeOptions) -> Serve { - let _ = pretty_env_logger::try_init(); +impl ServeOptions { + fn http1_only(mut self) -> Self { + self.http1_only = true; + self + } - let (addr_tx, addr_rx) = mpsc::channel(); - let (msg_tx, msg_rx) = mpsc::channel(); - let (reply_tx, reply_rx) = spmc::channel(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let shutdown_rx = shutdown_rx.then(|_| Ok(())); + fn keep_alive(mut self, enabled: bool) -> Self { + self.keep_alive = enabled; + self + } - let addr = ([127, 0, 0, 1], 0).into(); + fn pipeline(mut self, enabled: bool) -> Self { + self.pipeline = enabled; + self + } - let thread_name = format!( - "test-server-{}", - thread::current() - .name() - .unwrap_or("") - ); - let thread = thread::Builder::new().name(thread_name).spawn(move || { - let server = Server::bind(&addr) - .http1_only(options.http1_only) - .http1_keepalive(options.keep_alive) - .http1_pipeline_flush(options.pipeline) - .serve(move || { - let ts = TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), - _timeout: options.timeout, - reply: reply_rx.clone(), - }; - service_fn(move |req| ts.call(req)) - }); + fn serve(self) -> Serve { + let _ = pretty_env_logger::try_init(); + let options = self; - /* - let serve = Http::new() - .http1_only(options.http1_only) - .keep_alive(options.keep_alive) - .pipeline_flush(options.pipeline) - .serve_addr(&addr, move || { - let ts = TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), - _timeout: options.timeout, - reply: reply_rx.clone(), - }; - service_fn(move |req| ts.call(req)) - }) - .expect("bind to address"); - */ + let (addr_tx, addr_rx) = mpsc::channel(); + let (msg_tx, msg_rx) = mpsc::channel(); + let (reply_tx, reply_rx) = spmc::channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); - addr_tx.send( - server.local_addr() - ).expect("server addr tx"); + let addr = ([127, 0, 0, 1], 0).into(); - let fut = server - .select(shutdown_rx) - .then(|_| Ok::<(), ()>(())); + let thread_name = format!( + "test-server-{}", + thread::current() + .name() + .unwrap_or("") + ); + let thread = thread::Builder::new().name(thread_name).spawn(move || { + let server = Server::bind(&addr) + .http1_only(options.http1_only) + .http1_keepalive(options.keep_alive) + .http1_pipeline_flush(options.pipeline) + .serve(move || { + let ts = TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + reply: reply_rx.clone(), + }; + service_fn(move |req| ts.call(req)) + }); - let mut rt = Runtime::new().expect("rt new"); - rt.block_on(fut).unwrap(); - }).expect("thread spawn"); + addr_tx.send( + server.local_addr() + ).expect("server addr tx"); - let addr = addr_rx.recv().expect("server addr rx"); + let fut = server + .with_graceful_shutdown(shutdown_rx); - Serve { - msg_rx: msg_rx, - reply_tx: reply_tx, - addr: addr, - shutdown_signal: Some(shutdown_tx), - thread: Some(thread), + let mut rt = Runtime::new().expect("rt new"); + rt.block_on(fut).unwrap(); + }).expect("thread spawn"); + + let addr = addr_rx.recv().expect("server addr rx"); + + Serve { + msg_rx: msg_rx, + reply_tx: reply_tx, + addr: addr, + shutdown_signal: Some(shutdown_tx), + thread: Some(thread), + } } } @@ -1955,6 +1762,36 @@ fn has_header(msg: &str, name: &str) -> bool { msg[..n].contains(name) } +fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result { + let std_listener = StdTcpListener::bind(addr).unwrap(); + TcpListener::from_std(std_listener, &Handle::default()) +} + +fn read_until(io: &mut R, func: F) -> io::Result> +where + R: Read, + F: Fn(&[u8]) -> bool, +{ + let mut buf = vec![0; 8192]; + let mut pos = 0; + loop { + let n = io.read(&mut buf[pos..])?; + pos += n; + if func(&buf[..pos]) { + break; + } + + if pos == buf.len() { + return Err(io::Error::new( + io::ErrorKind::Other, + "read_until buffer filled" + )); + } + } + buf.truncate(pos); + Ok(buf) +} + struct DebugStream { stream: T, _debug: D,