chore(tests): fixup re-enabled tests

This commit is contained in:
Sean McArthur
2019-08-19 17:55:37 -07:00
parent 821fa03c2a
commit 53a437c382
3 changed files with 62 additions and 76 deletions

View File

@@ -1478,17 +1478,16 @@ mod dispatch_impl {
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn alpn_h2() { fn alpn_h2() {
use hyper::Response; use hyper::Response;
use hyper::server::conn::Http; use hyper::server::conn::Http;
use hyper::service::service_fn; use hyper::service::service_fn;
use tokio_tcp::TcpListener; use tokio_net::tcp::TcpListener;
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let mut listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let mut connector = DebugConnector::new(); let mut connector = DebugConnector::new();
connector.alpn_h2 = true; connector.alpn_h2 = true;
@@ -1497,22 +1496,17 @@ mod dispatch_impl {
let client = Client::builder() let client = Client::builder()
.build::<_, ::hyper::Body>(connector); .build::<_, ::hyper::Body>(connector);
let mut incoming = listener.incoming(); rt.spawn(async move {
let srv = incoming let (socket, _addr) = listener.accept().await.expect("accept");
.try_next() Http::new()
.map_err(|_| unreachable!()) .http2_only(true)
.and_then(|item| { .serve_connection(socket, service_fn(|req| async move {
let socket = item.unwrap(); assert_eq!(req.headers().get("host"), None);
Http::new() Ok::<_, hyper::Error>(Response::new(Body::empty()))
.http2_only(true) }))
.serve_connection(socket, service_fn(|req| async move { .await
assert_eq!(req.headers().get("host"), None); .expect("server");
Ok::<_, hyper::Error>(Response::new(Body::empty())) });
}))
})
.map_err(|e| panic!("server error: {}", e));
rt.block_on(srv).unwrap();
assert_eq!(connects.load(Ordering::SeqCst), 0); assert_eq!(connects.load(Ordering::SeqCst), 0);
@@ -2097,7 +2091,6 @@ mod conn {
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http2_detect_conn_eof() { fn http2_detect_conn_eof() {
use futures_util::future; use futures_util::future;

View File

@@ -14,7 +14,7 @@ use std::net::{TcpStream, Shutdown, SocketAddr};
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{Arc}; use std::sync::{Arc, Mutex};
use std::net::{TcpListener as StdTcpListener}; use std::net::{TcpListener as StdTcpListener};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@@ -29,12 +29,10 @@ use futures_util::stream::StreamExt;
use futures_util::try_future::{self, TryFutureExt}; use futures_util::try_future::{self, TryFutureExt};
use futures_util::try_stream::TryStreamExt; use futures_util::try_stream::TryStreamExt;
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use tokio_net::driver::Handle;
use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream}; use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream};
use tokio::runtime::current_thread::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio::reactor::Handle;
use tokio::runtime::current_thread::Runtime;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{TcpListener, TcpStream as TkTcpStream};
use tokio_timer::Delay; use tokio_timer::Delay;
use hyper::{Body, Request, Response, StatusCode, Version}; use hyper::{Body, Request, Response, StatusCode, Version};
@@ -306,7 +304,6 @@ mod response_body_lengths {
}); });
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http2_auto_response_with_known_length() { fn http2_auto_response_with_known_length() {
use hyper::body::Payload; use hyper::body::Payload;
@@ -335,7 +332,6 @@ mod response_body_lengths {
}).unwrap(); }).unwrap();
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http2_auto_response_with_conflicting_lengths() { fn http2_auto_response_with_conflicting_lengths() {
use hyper::body::Payload; use hyper::body::Payload;
@@ -1522,7 +1518,6 @@ fn http1_response_with_http2_version() {
}).unwrap(); }).unwrap();
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn try_h2() { fn try_h2() {
let server = serve(); let server = serve();
@@ -1545,7 +1540,6 @@ fn try_h2() {
assert_eq!(server.body(), b""); assert_eq!(server.body(), b"");
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http1_only() { fn http1_only() {
let server = serve_opts() let server = serve_opts()
@@ -1564,7 +1558,6 @@ fn http1_only() {
}).unwrap_err(); }).unwrap_err();
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http2_service_error_sends_reset_reason() { fn http2_service_error_sends_reset_reason() {
use std::error::Error; use std::error::Error;
@@ -1596,7 +1589,6 @@ fn http2_service_error_sends_reset_reason() {
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http2_body_user_error_sends_reset_reason() { fn http2_body_user_error_sends_reset_reason() {
use std::error::Error; use std::error::Error;
@@ -1653,7 +1645,6 @@ impl hyper::service::Service for Svc {
} }
} }
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
#[test] #[test]
fn http2_service_poll_ready_error_sends_goaway() { fn http2_service_poll_ready_error_sends_goaway() {
use std::error::Error; use std::error::Error;
@@ -1742,7 +1733,7 @@ fn skips_content_length_and_body_for_304_responses() {
struct Serve { struct Serve {
addr: SocketAddr, addr: SocketAddr,
msg_rx: mpsc::Receiver<Msg>, msg_rx: mpsc::Receiver<Msg>,
reply_tx: spmc::Sender<Reply>, reply_tx: Mutex<spmc::Sender<Reply>>,
shutdown_signal: Option<oneshot::Sender<()>>, shutdown_signal: Option<oneshot::Sender<()>>,
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
} }
@@ -1785,44 +1776,46 @@ impl Serve {
type BoxError = Box<dyn std::error::Error + Send + Sync>; type BoxError = Box<dyn std::error::Error + Send + Sync>;
struct ReplyBuilder<'a> { struct ReplyBuilder<'a> {
tx: &'a spmc::Sender<Reply>, tx: &'a Mutex<spmc::Sender<Reply>>,
} }
impl<'a> ReplyBuilder<'a> { impl<'a> ReplyBuilder<'a> {
fn status(self, status: hyper::StatusCode) -> Self { fn status(self, status: hyper::StatusCode) -> Self {
self.tx.send(Reply::Status(status)).unwrap(); self.tx.lock().unwrap().send(Reply::Status(status)).unwrap();
self self
} }
fn version(self, version: hyper::Version) -> Self { fn version(self, version: hyper::Version) -> Self {
self.tx.send(Reply::Version(version)).unwrap(); self.tx.lock().unwrap().send(Reply::Version(version)).unwrap();
self self
} }
fn header<V: AsRef<str>>(self, name: &str, value: V) -> Self { fn header<V: AsRef<str>>(self, name: &str, value: V) -> Self {
let name = HeaderName::from_bytes(name.as_bytes()).expect("header name"); let name = HeaderName::from_bytes(name.as_bytes()).expect("header name");
let value = HeaderValue::from_str(value.as_ref()).expect("header value"); let value = HeaderValue::from_str(value.as_ref()).expect("header value");
self.tx.send(Reply::Header(name, value)).unwrap(); self.tx.lock().unwrap().send(Reply::Header(name, value)).unwrap();
self self
} }
fn body<T: AsRef<[u8]>>(self, body: T) { fn body<T: AsRef<[u8]>>(self, body: T) {
self.tx.send(Reply::Body(body.as_ref().to_vec().into())).unwrap(); self.tx.lock().unwrap().send(Reply::Body(body.as_ref().to_vec().into())).unwrap();
} }
fn body_stream(self, body: Body) { fn body_stream(self, body: Body) {
self.tx.send(Reply::Body(body)).unwrap(); self.tx.lock().unwrap().send(Reply::Body(body)).unwrap();
} }
#[allow(dead_code)] #[allow(dead_code)]
fn error<E: Into<BoxError>>(self, err: E) { fn error<E: Into<BoxError>>(self, err: E) {
self.tx.send(Reply::Error(err.into())).unwrap(); self.tx.lock().unwrap().send(Reply::Error(err.into())).unwrap();
} }
} }
impl<'a> Drop for ReplyBuilder<'a> { impl<'a> Drop for ReplyBuilder<'a> {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.tx.send(Reply::End); if let Ok(mut tx) = self.tx.lock() {
let _ = tx.send(Reply::End);
}
} }
} }
@@ -2036,7 +2029,7 @@ impl ServeOptions {
Serve { Serve {
msg_rx: msg_rx, msg_rx: msg_rx,
reply_tx: reply_tx, reply_tx: Mutex::new(reply_tx),
addr: addr, addr: addr,
shutdown_signal: Some(shutdown_tx), shutdown_signal: Some(shutdown_tx),
thread: Some(thread), thread: Some(thread),

View File

@@ -1,16 +1,19 @@
pub extern crate hyper; pub extern crate hyper;
pub extern crate tokio; pub extern crate tokio;
extern crate futures_util;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}; use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}};
use std::time::{Duration, Instant}; use std::time::{Duration/*, Instant*/};
use crate::hyper::{Body, Client, Request, Response, Server, Version}; use crate::hyper::{Body, Client, Request, Response, Server, Version};
use crate::hyper::client::HttpConnector; use crate::hyper::client::HttpConnector;
use crate::hyper::service::service_fn; use crate::hyper::service::{make_service_fn, service_fn};
pub use std::net::SocketAddr; pub use std::net::SocketAddr;
pub use self::futures::{future, Future, Stream}; pub use self::futures_util::{future, try_future, FutureExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _};
pub use self::futures_channel::oneshot; //pub use self::futures_channel::oneshot;
pub use self::hyper::{HeaderMap, StatusCode}; pub use self::hyper::{HeaderMap, StatusCode};
pub use self::tokio::runtime::current_thread::Runtime; pub use self::tokio::runtime::current_thread::Runtime;
@@ -324,10 +327,10 @@ pub fn __run_test(cfg: __TestConfig) {
let serve_handles = Arc::new(Mutex::new( let serve_handles = Arc::new(Mutex::new(
cfg.server_msgs cfg.server_msgs
)); ));
let new_service = move || { let new_service = make_service_fn(move |_| {
// Move a clone into the service_fn // Move a clone into the service_fn
let serve_handles = serve_handles.clone(); let serve_handles = serve_handles.clone();
hyper::service::service_fn(move |req: Request<Body>| { future::ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
let (sreq, sres) = serve_handles.lock() let (sreq, sres) = serve_handles.lock()
.unwrap() .unwrap()
.remove(0); .remove(0);
@@ -341,7 +344,7 @@ pub fn __run_test(cfg: __TestConfig) {
let sbody = sreq.body; let sbody = sreq.body;
req.into_body() req.into_body()
.try_concat() .try_concat()
.map(move |body| { .map_ok(move |body| {
assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); assert_eq!(body.as_ref(), sbody.as_slice(), "client body");
let mut res = Response::builder() let mut res = Response::builder()
@@ -351,8 +354,8 @@ pub fn __run_test(cfg: __TestConfig) {
*res.headers_mut() = sres.headers; *res.headers_mut() = sres.headers;
res res
}) })
}) }))
}; });
let serve = hyper::server::conn::Http::new() let serve = hyper::server::conn::Http::new()
.http2_only(cfg.server_version == 2) .http2_only(cfg.server_version == 2)
@@ -365,7 +368,7 @@ pub fn __run_test(cfg: __TestConfig) {
let mut addr = serve.incoming_ref().local_addr(); let mut addr = serve.incoming_ref().local_addr();
let expected_connections = cfg.connections; let expected_connections = cfg.connections;
let server = serve let server = serve
.fold(0, move |cnt, connecting| { .try_fold(0, move |cnt, connecting| {
let cnt = cnt + 1; let cnt = cnt + 1;
assert!( assert!(
cnt <= expected_connections, cnt <= expected_connections,
@@ -374,14 +377,14 @@ pub fn __run_test(cfg: __TestConfig) {
cnt cnt
); );
let fut = connecting let fut = connecting
.map_err(|never| -> hyper::Error { match never {} }) .then(|res| res.expect("connecting"))
.flatten() .map(|conn_res| conn_res.expect("server connection error"));
.map_err(|e| panic!("server connection error: {}", e));
crate::tokio::spawn(fut); crate::tokio::spawn(fut);
Ok::<_, hyper::Error>(cnt) future::ok::<_, hyper::Error>(cnt)
}) })
.map(|_| ()) .map(|res| {
.map_err(|e| panic!("serve error: {}", e)); let _ = res.expect("serve error");
});
rt.spawn(server); rt.spawn(server);
@@ -418,39 +421,37 @@ pub fn __run_test(cfg: __TestConfig) {
} }
res.into_body().try_concat() res.into_body().try_concat()
}) })
.map(move |body| { .map_ok(move |body| {
assert_eq!(body.as_ref(), cbody.as_slice(), "server body"); assert_eq!(body.as_ref(), cbody.as_slice(), "server body");
}) })
.map_err(|e| panic!("client error: {}", e)) .map(|res| res.expect("client error"))
}); });
let client_futures: Box<dyn Future<Item=(), Error=()> + Send> = if cfg.parallel { let client_futures: Pin<Box<dyn Future<Output = ()> + Send>> = if cfg.parallel {
let mut client_futures = vec![]; let mut client_futures = vec![];
for (creq, cres) in cfg.client_msgs { for (creq, cres) in cfg.client_msgs {
client_futures.push(make_request(&client, creq, cres)); client_futures.push(make_request(&client, creq, cres));
} }
drop(client); drop(client);
Box::new(future::join_all(client_futures).map(|_| ())) Box::pin(future::join_all(client_futures).map(|_| ()))
} else { } else {
let mut client_futures: Box<dyn Future<Item=Client<HttpConnector>, Error=()> + Send> = let mut client_futures: Pin<Box<dyn Future<Output=Client<HttpConnector>> + Send>> =
Box::new(future::ok(client)); Box::pin(future::ready(client));
for (creq, cres) in cfg.client_msgs { for (creq, cres) in cfg.client_msgs {
let mk_request = make_request.clone(); let mk_request = make_request.clone();
client_futures = Box::new( client_futures = Box::pin(
client_futures client_futures
.and_then(move |client| { .then(move |client| {
let fut = mk_request(&client, creq, cres); let fut = mk_request(&client, creq, cres);
fut.map(move |()| client) fut.map(move |()| client)
}) })
); );
} }
Box::new(client_futures.map(|_| ())) Box::pin(client_futures.map(|_| ()))
}; };
let client_futures = client_futures.map(|_| ()); rt.block_on(client_futures);
rt.block_on(client_futures)
.expect("shutdown succeeded");
} }
struct ProxyConfig { struct ProxyConfig {
@@ -459,7 +460,7 @@ struct ProxyConfig {
version: usize, version: usize,
} }
fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Item = (), Error = ()>) { fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>) {
let client = Client::builder() let client = Client::builder()
.keep_alive_timeout(Duration::from_secs(10)) .keep_alive_timeout(Duration::from_secs(10))
.http2_only(cfg.version == 2) .http2_only(cfg.version == 2)
@@ -470,19 +471,18 @@ fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Item = (), Error =
let counter = AtomicUsize::new(0); let counter = AtomicUsize::new(0);
let srv = Server::bind(&([127, 0, 0, 1], 0).into()) let srv = Server::bind(&([127, 0, 0, 1], 0).into())
.serve(move || { .serve(make_service_fn(move |_| {
let prev = counter.fetch_add(1, Ordering::Relaxed); let prev = counter.fetch_add(1, Ordering::Relaxed);
assert!(max_connections >= prev + 1, "proxy max connections"); assert!(max_connections >= prev + 1, "proxy max connections");
let client = client.clone(); let client = client.clone();
service_fn(move |mut req| { future::ok::<_, hyper::Error>(service_fn(move |mut req| {
let uri = format!("http://{}{}", dst_addr, req.uri().path()) let uri = format!("http://{}{}", dst_addr, req.uri().path())
.parse() .parse()
.expect("proxy new uri parse"); .expect("proxy new uri parse");
*req.uri_mut() = uri; *req.uri_mut() = uri;
client.request(req) client.request(req)
}) }))
}));
});
let proxy_addr = srv.local_addr(); let proxy_addr = srv.local_addr();
(proxy_addr, srv.map_err(|err| panic!("proxy error: {}", err))) (proxy_addr, srv.map(|res| res.expect("proxy error")))
} }