diff --git a/benches_disabled/end_to_end.rs b/benches/end_to_end.rs similarity index 76% rename from benches_disabled/end_to_end.rs rename to benches/end_to_end.rs index 734cb826..59679e3d 100644 --- a/benches_disabled/end_to_end.rs +++ b/benches/end_to_end.rs @@ -1,15 +1,12 @@ +#![feature(async_await)] #![feature(test)] #![deny(warnings)] -extern crate futures; -extern crate hyper; -extern crate pretty_env_logger; extern crate test; -extern crate tokio; use std::net::SocketAddr; -use futures::{Future, Stream}; +use futures_util::future::join_all; use tokio::runtime::current_thread::Runtime; use hyper::{Body, Method, Request, Response, Server}; @@ -176,11 +173,12 @@ impl Opts { fn bench(self, b: &mut test::Bencher) { let _ = pretty_env_logger::try_init(); + // Create a runtime of current thread. let mut rt = Runtime::new().unwrap(); b.bytes = self.response_body.len() as u64 + self.request_body.map(|b| b.len()).unwrap_or(0) as u64; - let addr = spawn_hello(&mut rt, &self); + let addr = spawn_server(&mut rt, &self); let connector = HttpConnector::new(1); let client = hyper::Client::builder() @@ -198,40 +196,41 @@ impl Opts { .unwrap_or_else(|| Body::empty()); let mut req = Request::new(body); *req.method_mut() = self.request_method.clone(); + *req.uri_mut() = url.clone(); req }; + let send_request = |req: Request| { + let fut = client.request(req); + async { + let res = fut.await.expect("client wait"); + let mut body = res.into_body(); + while let Some(_chunk) = body.next().await {} + } + }; + if self.parallel_cnt == 1 { - b.iter(move || { - let mut req = make_request(); - *req.uri_mut() = url.clone(); - rt.block_on(client.request(req).and_then(|res| { - res.into_body().for_each(|_chunk| { - Ok(()) - }) - })).expect("client wait"); + b.iter(|| { + let req = make_request(); + rt.block_on(send_request(req)); }); + } else { b.iter(|| { let futs = (0..self.parallel_cnt) - .into_iter() .map(|_| { - let mut req = make_request(); - *req.uri_mut() = url.clone(); - client.request(req).and_then(|res| { - res.into_body().for_each(|_chunk| { - Ok(()) - }) - }).map_err(|e| panic!("client error: {}", e)) + let req = make_request(); + send_request(req) }); - let _ = rt.block_on(::futures::future::join_all(futs)); + // Await all spawned futures becoming completed. + rt.block_on(join_all(futs)); }); } } } -fn spawn_hello(rt: &mut Runtime, opts: &Opts) -> SocketAddr { - use hyper::service::{service_fn}; +fn spawn_server(rt: &mut Runtime, opts: &Opts) -> SocketAddr { + use hyper::service::{make_service_fn, service_fn}; let addr = "127.0.0.1:0".parse().unwrap(); let body = opts.response_body; @@ -239,20 +238,18 @@ fn spawn_hello(rt: &mut Runtime, opts: &Opts) -> SocketAddr { .http2_only(opts.http2) .http2_initial_stream_window_size_(opts.http2_stream_window) .http2_initial_connection_window_size_(opts.http2_conn_window) - .serve(move || { - service_fn(move |req: Request| { - req - .into_body() - .for_each(|_chunk| { - Ok(()) - }) - .map(move |_| { - Response::new(Body::from(body)) - }) - }) - }); + .serve(make_service_fn( move |_| async move { + Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { + let mut req_body = req.into_body(); + while let Some(_chunk) = req_body.next().await {} + Ok::<_, hyper::Error>(Response::new(Body::from(body))) + })) + })); let addr = srv.local_addr(); - let fut = srv.map_err(|err| panic!("server error: {}", err)); - rt.spawn(fut); + rt.spawn(async { + if let Err(err) = srv.await { + panic!("server error: {}", err); + } + }); return addr }