test(benches): clean up end_to_end benchmark configuration

This commit is contained in:
Sean McArthur
2018-10-25 17:15:06 -07:00
parent b20971cb4e
commit 79f970c7ff

View File

@@ -11,144 +11,164 @@ use std::net::SocketAddr;
use futures::{Future, Stream}; use futures::{Future, Stream};
use tokio::runtime::current_thread::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio::net::TcpListener;
use hyper::{Body, Method, Request, Response, Version}; use hyper::{Body, Method, Request, Response, Server};
use hyper::client::HttpConnector; use hyper::client::HttpConnector;
use hyper::server::conn::Http;
#[bench] #[bench]
fn http1_get(b: &mut test::Bencher) { fn http1_get(b: &mut test::Bencher) {
bench_with(b, Version::HTTP_11, || { opts()
Request::new(Body::empty()) .bench(b)
});
} }
#[bench] #[bench]
fn http1_post(b: &mut test::Bencher) { fn http1_post(b: &mut test::Bencher) {
bench_with(b, Version::HTTP_11, || { opts()
let mut req = Request::new("foo bar baz quux".into()); .method(Method::POST)
*req.method_mut() = Method::POST; .request_body(b"foo bar baz quux")
req .bench(b)
});
} }
#[bench] #[bench]
fn http1_get_parallel(b: &mut test::Bencher) { fn http1_get_parallel(b: &mut test::Bencher) {
bench_parallel_with(b, Version::HTTP_11, || { opts()
Request::new(Body::empty()) .parallel(10)
}); .bench(b)
} }
#[bench] #[bench]
fn http2_get(b: &mut test::Bencher) { fn http2_get(b: &mut test::Bencher) {
bench_with(b, Version::HTTP_2, || { opts()
Request::new(Body::empty()) .http2()
}); .bench(b)
} }
#[bench] #[bench]
fn http2_post(b: &mut test::Bencher) { fn http2_post(b: &mut test::Bencher) {
bench_with(b, Version::HTTP_2, || { opts()
let mut req = Request::new("foo bar baz quux".into()); .http2()
*req.method_mut() = Method::POST; .method(Method::POST)
req .request_body(b"foo bar baz quux")
}); .bench(b)
} }
#[bench] #[bench]
fn http2_get_parallel(b: &mut test::Bencher) { fn http2_get_parallel(b: &mut test::Bencher) {
bench_parallel_with(b, Version::HTTP_2, || { opts()
Request::new(Body::empty()) .http2()
}); .parallel(10)
.bench(b)
} }
fn bench_with<F>(b: &mut test::Bencher, version: Version, make_request: F) // ==== Benchmark Options =====
where
F: Fn() -> Request<Body>,
{
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let body = b"Hello";
let addr = spawn_hello(&mut rt, body);
let connector = HttpConnector::new(1); struct Opts {
let client = hyper::Client::builder() http2: bool,
.http2_only(version == Version::HTTP_2) parallel_cnt: u32,
.build::<_, Body>(connector); request_method: Method,
request_body: Option<&'static [u8]>,
let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); response_body: &'static [u8],
b.bytes = body.len() as u64;
b.iter(move || {
let mut req = make_request();
*req.uri_mut() = url.clone();
rt.block_on(client.request(req).and_then(|res| {
res.into_body().for_each(|_chunk| {
Ok(())
})
})).expect("client wait");
});
} }
fn bench_parallel_with<F>(b: &mut test::Bencher, version: Version, make_request: F) fn opts() -> Opts {
where Opts {
F: Fn() -> Request<Body>, http2: false,
{ parallel_cnt: 1,
let _ = pretty_env_logger::try_init(); request_method: Method::GET,
let mut rt = Runtime::new().unwrap(); request_body: None,
let body = b"Hello"; response_body: b"Hello",
let addr = spawn_hello(&mut rt, body); }
}
let connector = HttpConnector::new(1); impl Opts {
let client = hyper::Client::builder() fn http2(mut self) -> Self {
.http2_only(version == Version::HTTP_2) self.http2 = true;
.build::<_, Body>(connector); self
}
let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); fn method(mut self, m: Method) -> Self {
self.request_method = m;
self
}
b.bytes = body.len() as u64; fn request_body(mut self, body: &'static [u8]) -> Self {
b.iter(move || { self.request_body = Some(body);
let futs = (0..10) self
.into_iter() }
.map(|_| {
fn parallel(mut self, cnt: u32) -> Self {
assert!(cnt > 0, "parallel count must be larger than 0");
self.parallel_cnt = cnt;
self
}
fn bench(self, b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt, self.response_body);
let connector = HttpConnector::new(1);
let client = hyper::Client::builder()
.http2_only(self.http2)
.build::<_, Body>(connector);
let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap();
let make_request = || {
let body = self
.request_body
.map(Body::from)
.unwrap_or_else(|| Body::empty());
let mut req = Request::new(body);
*req.method_mut() = self.request_method.clone();
req
};
if self.parallel_cnt == 1 {
b.iter(move || {
let mut req = make_request(); let mut req = make_request();
*req.uri_mut() = url.clone(); *req.uri_mut() = url.clone();
client.request(req).and_then(|res| { rt.block_on(client.request(req).and_then(|res| {
res.into_body().for_each(|_chunk| { res.into_body().for_each(|_chunk| {
Ok(()) Ok(())
}) })
}).map_err(|_e| ()) })).expect("client wait");
}); });
let _ = rt.block_on(::futures::future::join_all(futs)); } else {
}); b.iter(|| {
let futs = (0..self.parallel_cnt)
.into_iter()
.map(|_| {
let mut req = make_request();
*req.uri_mut() = url.clone();
client.request(req).and_then(|res| {
res.into_body().for_each(|_chunk| {
Ok(())
})
}).map_err(|e| panic!("client error: {}", e))
});
let _ = rt.block_on(::futures::future::join_all(futs));
});
}
}
} }
fn spawn_hello(rt: &mut Runtime, body: &'static [u8]) -> SocketAddr { fn spawn_hello(rt: &mut Runtime, body: &'static [u8]) -> SocketAddr {
use hyper::service::{service_fn}; use hyper::service::{service_fn};
let addr = "127.0.0.1:0".parse().unwrap(); let addr = "127.0.0.1:0".parse().unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let addr = listener.local_addr().unwrap();
let http = Http::new(); let srv = Server::bind(&addr)
.serve(move || {
let service = service_fn(move |req: Request<Body>| { service_fn(move |req: Request<Body>| {
req.into_body() req.into_body()
.concat2() .concat2()
.map(move |_| { .map(move |_| {
Response::new(Body::from(body)) Response::new(Body::from(body))
})
}) })
});
// Specifically only accept 1 connection.
let srv = listener.incoming()
.into_future()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
.and_then(move |(accepted, _inc)| {
let socket = accepted.expect("accepted socket");
http.serve_connection(socket, service)
.map_err(|_| ())
}); });
rt.spawn(srv); let addr = srv.local_addr();
let fut = srv.map_err(|err| panic!("server error: {}", err));
rt.spawn(fut);
return addr return addr
} }