diff --git a/src/client/tests.rs b/src/client/tests.rs index d496209b..7df6a7ef 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -137,7 +137,14 @@ fn checkout_win_allows_connect_future_to_be_pooled() { .map(|res| res.into_body().concat2()); let srv1 = poll_fn(|| { try_ready!(sock1.read(&mut [0u8; 512])); - try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nx")); + // Chunked is used so as to force 2 body reads. + try_ready!(sock1.write(b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + 1\r\nx\r\n\ + 0\r\n\r\n\ + ")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 2b1603e2..358cdfb6 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -180,29 +180,31 @@ where I: AsyncRead + AsyncWrite, pub fn read_body(&mut self) -> Poll, io::Error> { debug_assert!(self.can_read_body()); - trace!("Conn::read_body"); - let (reading, ret) = match self.state.reading { Reading::Body(ref mut decoder) => { match decoder.decode(&mut self.io) { Ok(Async::Ready(slice)) => { - let (reading, chunk) = if !slice.is_empty() { - return Ok(Async::Ready(Some(Chunk::from(slice)))); - } else if decoder.is_eof() { + let (reading, chunk) = if decoder.is_eof() { debug!("incoming body completed"); - (Reading::KeepAlive, None) - } else { - trace!("decode stream unexpectedly ended"); - // this should actually be unreachable: - // the decoder will return an UnexpectedEof if there were - // no bytes to read and it isn't eof yet... + (Reading::KeepAlive, if !slice.is_empty() { + Some(Chunk::from(slice)) + } else { + None + }) + } else if slice.is_empty() { + error!("decode stream unexpectedly ended"); + // This should be unreachable, since all 3 decoders + // either set eof=true or return an Err when reading + // an empty slice... (Reading::Closed, None) + } else { + return Ok(Async::Ready(Some(Chunk::from(slice)))); }; (reading, Ok(Async::Ready(chunk))) }, Ok(Async::NotReady) => return Ok(Async::NotReady), Err(e) => { - trace!("decode stream error: {}", e); + debug!("decode stream error: {}", e); (Reading::Closed, Err(e)) }, } diff --git a/tests/integration.rs b/tests/integration.rs index 7a9a1127..e593284b 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -62,6 +62,50 @@ t! { ; } +t! { + get_body_2_keeps_alive, + client: + request: + uri: "/", + ; + response: + status: 200, + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + request: + uri: "/", + ; + response: + status: 200, + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + server: + request: + uri: "/", + ; + response: + headers: { + "content-length" => 11, + }, + body: "hello world", + ; + request: + uri: "/", + ; + response: + headers: { + "content-length" => 11, + }, + body: "hello world", + ; +} + t! { get_strip_connection_header, client: diff --git a/tests/support/mod.rs b/tests/support/mod.rs index be60a959..40524655 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -2,6 +2,13 @@ pub extern crate futures; pub extern crate hyper; pub extern crate tokio; +use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}; +use std::time::Duration; + +use hyper::{Body, Client, Request, Response, Server, Version}; +use hyper::client::HttpConnector; +use hyper::service::service_fn; + pub use std::net::SocketAddr; pub use self::futures::{future, Future, Stream}; pub use self::futures::sync::oneshot; @@ -44,6 +51,16 @@ macro_rules! t { )); } + __run_test(__TestConfig { + client_version: 2, + client_msgs: c.clone(), + server_version: 2, + server_msgs: s.clone(), + parallel: true, + connections: 1, + proxy: false, + }); + __run_test(__TestConfig { client_version: 2, client_msgs: c, @@ -51,8 +68,8 @@ macro_rules! t { server_msgs: s, parallel: true, connections: 1, + proxy: true, }); - } ); ( @@ -104,6 +121,27 @@ macro_rules! t { server_msgs: s.clone(), parallel: false, connections: 1, + proxy: false, + }); + + __run_test(__TestConfig { + client_version: 2, + client_msgs: c.clone(), + server_version: 2, + server_msgs: s.clone(), + parallel: false, + connections: 1, + proxy: false, + }); + + __run_test(__TestConfig { + client_version: 1, + client_msgs: c.clone(), + server_version: 1, + server_msgs: s.clone(), + parallel: false, + connections: 1, + proxy: true, }); __run_test(__TestConfig { @@ -113,6 +151,7 @@ macro_rules! t { server_msgs: s, parallel: false, connections: 1, + proxy: true, }); } ); @@ -185,14 +224,11 @@ pub struct __TestConfig { pub parallel: bool, pub connections: usize, + pub proxy: bool, } pub fn __run_test(cfg: __TestConfig) { extern crate pretty_env_logger; - use hyper::{Body, Client, Request, Response, Version}; - use hyper::client::HttpConnector; - use std::sync::{Arc, Mutex}; - use std::time::Duration; let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().expect("new rt"); @@ -254,31 +290,39 @@ pub fn __run_test(cfg: __TestConfig) { ) .expect("serve_addr"); - let addr = serve.incoming_ref().local_addr(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let (success_tx, success_rx) = oneshot::channel(); + let mut addr = serve.incoming_ref().local_addr(); let expected_connections = cfg.connections; let server = serve .fold(0, move |cnt, connecting| { + let cnt = cnt + 1; + assert!( + cnt <= expected_connections, + "server expected {} connections, received {}", + expected_connections, + cnt + ); let fut = connecting .map_err(|never| -> hyper::Error { match never {} }) .flatten() .map_err(|e| panic!("server connection error: {}", e)); ::tokio::spawn(fut); - Ok::<_, hyper::Error>(cnt + 1) + Ok::<_, hyper::Error>(cnt) }) - .map(move |cnt| { - assert_eq!(cnt, expected_connections); - }) - .map_err(|e| panic!("serve error: {}", e)) - .select2(shutdown_rx) - .map(move |_| { - let _ = success_tx.send(()); - }) - .map_err(|_| panic!("shutdown not ok")); + .map(|_| ()) + .map_err(|e| panic!("serve error: {}", e)); rt.spawn(server); + if cfg.proxy { + let (proxy_addr, proxy) = naive_proxy(ProxyConfig { + connections: cfg.connections, + dst: addr, + version: cfg.server_version, + }); + rt.spawn(proxy); + addr = proxy_addr; + } + let make_request = Arc::new(move |client: &Client, creq: __CReq, cres: __CRes| { let uri = format!("http://{}{}", addr, creq.uri); @@ -335,12 +379,41 @@ pub fn __run_test(cfg: __TestConfig) { Box::new(client_futures.map(|_| ())) }; - let client_futures = client_futures.map(move |_| { - let _ = shutdown_tx.send(()); - }); - rt.spawn(client_futures); - rt.block_on(success_rx - .map_err(|_| "something panicked")) + let client_futures = client_futures.map(|_| ()); + rt.block_on(client_futures) .expect("shutdown succeeded"); } +struct ProxyConfig { + connections: usize, + dst: SocketAddr, + version: usize, +} + +fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) { + let client = Client::builder() + .keep_alive_timeout(Duration::from_secs(10)) + .http2_only(cfg.version == 2) + .build_http::(); + + let dst_addr = cfg.dst; + let max_connections = cfg.connections; + let counter = AtomicUsize::new(0); + + let srv = Server::bind(&([127, 0, 0, 1], 0).into()) + .serve(move || { + let prev = counter.fetch_add(1, Ordering::Relaxed); + assert!(max_connections >= prev + 1, "proxy max connections"); + let client = client.clone(); + service_fn(move |mut req| { + let uri = format!("http://{}{}", dst_addr, req.uri().path()) + .parse() + .expect("proxy new uri parse"); + *req.uri_mut() = uri; + client.request(req) + }) + + }); + let proxy_addr = srv.local_addr(); + (proxy_addr, srv.map_err(|err| panic!("proxy error: {}", err))) +}