From db86e554d51a4675da267659f956b642a6ab013d Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Wed, 17 Jul 2019 06:54:41 +0800 Subject: [PATCH] test(benches): update server benchmark to async/await --- Cargo.toml | 8 +-- {benches_disabled => benches}/server.rs | 68 +++++++++++++------------ 2 files changed, 40 insertions(+), 36 deletions(-) rename {benches_disabled => benches}/server.rs (76%) diff --git a/Cargo.toml b/Cargo.toml index a35a9540..d1158d2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,10 +156,10 @@ name = "pipeline" path = "benches/pipeline.rs" required-features = ["runtime"] -#[[bench]] -#name = "server" -#path = "benches/server.rs" -#required-features = ["runtime"] +[[bench]] +name = "server" +path = "benches/server.rs" +required-features = ["runtime"] #[[test]] diff --git a/benches_disabled/server.rs b/benches/server.rs similarity index 76% rename from benches_disabled/server.rs rename to benches/server.rs index dab94253..cbc63a6a 100644 --- a/benches_disabled/server.rs +++ b/benches/server.rs @@ -1,21 +1,20 @@ +#![feature(async_await)] #![feature(test)] -#![deny(warnings)] +//#![deny(warnings)] -extern crate futures; -extern crate hyper; -extern crate pretty_env_logger; extern crate test; -extern crate tokio; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; use std::sync::mpsc; +use std::time::Duration; -use futures::{stream, Future, Stream}; -use futures::sync::oneshot; +use futures_util::{stream, StreamExt}; +use tokio::runtime::current_thread; +use tokio::sync::oneshot; use hyper::{Body, Response, Server}; -use hyper::service::service_fn_ok; +use hyper::service::{make_service_fn, service_fn}; macro_rules! bench_server { ($b:ident, $header:expr, $body:expr) => ({ @@ -23,27 +22,33 @@ macro_rules! bench_server { let (_until_tx, until_rx) = oneshot::channel::<()>(); let addr = { let (addr_tx, addr_rx) = mpsc::channel(); - ::std::thread::spawn(move || { + std::thread::spawn(move || { let addr = "127.0.0.1:0".parse().unwrap(); + let make_svc = make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(|_| async { + Ok::<_, hyper::Error>(Response::builder() + .header($header.0, $header.1) + .header("content-type", "text/plain") + .body($body()) + .unwrap() + ) + })) + }); let srv = Server::bind(&addr) - .serve(|| { - let header = $header; - let body = $body; - service_fn_ok(move |_| { - Response::builder() - .header(header.0, header.1) - .header("content-type", "text/plain") - .body(body()) - .unwrap() - }) - }); + .serve(make_svc); + addr_tx.send(srv.local_addr()).unwrap(); - let fut = srv - .map_err(|e| panic!("server error: {}", e)) - .select(until_rx.then(|_| Ok(()))) - .then(|_| Ok(())); - let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); - rt.spawn(fut); + + let graceful = srv + .with_graceful_shutdown(async { + until_rx.await.ok(); + }); + let mut rt = current_thread::Runtime::new().unwrap(); + rt.spawn(async { + if let Err(e) = graceful.await { + panic!("server error: {}", e); + } + }); rt.run().unwrap(); }); @@ -58,7 +63,7 @@ macro_rules! bench_server { }; let mut tcp = TcpStream::connect(addr).unwrap(); - tcp.set_read_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); + tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); let mut buf = [0u8; 8192]; $b.bytes = 35 + total_bytes as u64; @@ -91,7 +96,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) { fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { bench_server!(b, ("content-length", "1000000"), || { static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; - Body::wrap_stream(stream::iter_ok::<_, String>(S.iter()).map(|&s| s)) + Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) }) } @@ -109,7 +114,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) { fn throughput_chunked_many_chunks(b: &mut test::Bencher) { bench_server!(b, ("transfer-encoding", "chunked"), || { static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; - Body::wrap_stream(stream::iter_ok::<_, String>(S.iter()).map(|&s| s)) + Body::wrap_stream(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) }) } @@ -118,7 +123,7 @@ fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) { let (tx, rx) = mpsc::channel(); let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = listener.local_addr().unwrap(); - ::std::thread::spawn(move || { + std::thread::spawn(move || { let mut sock = listener.accept().unwrap().0; let mut buf = [0u8; 8192]; @@ -160,7 +165,7 @@ fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) { Date: Fri, 12 May 2017 18:21:45 GMT\r\n\ \r\n\ "; - ::std::thread::spawn(move || { + std::thread::spawn(move || { let mut sock = listener.accept().unwrap().0; let mut buf = [0u8; 8192]; @@ -190,4 +195,3 @@ fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) { }); tx.send(()).unwrap(); } -