From 63a8f26583432c0035667351f6654fa459844d30 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Tue, 16 Jul 2019 22:55:38 +0800 Subject: [PATCH] test(benches): update pipeline benchmark to async/await --- Cargo.toml | 8 ++-- {benches_disabled => benches}/pipeline.rs | 48 +++++++++++++---------- 2 files changed, 32 insertions(+), 24 deletions(-) rename {benches_disabled => benches}/pipeline.rs (63%) diff --git a/Cargo.toml b/Cargo.toml index 3aa3ba36..a35a9540 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,10 +151,10 @@ required-features = ["runtime"] #path = "benches/end_to_end.rs" #required-features = ["runtime"] -#[[bench]] -#name = "pipeline" -#path = "benches/pipeline.rs" -#required-features = ["runtime"] +[[bench]] +name = "pipeline" +path = "benches/pipeline.rs" +required-features = ["runtime"] #[[bench]] #name = "server" diff --git a/benches_disabled/pipeline.rs b/benches/pipeline.rs similarity index 63% rename from benches_disabled/pipeline.rs rename to benches/pipeline.rs index dd27f55f..ef27e753 100644 --- a/benches_disabled/pipeline.rs +++ b/benches/pipeline.rs @@ -1,21 +1,19 @@ +#![feature(async_await)] #![feature(test)] #![deny(warnings)] -extern crate futures; -extern crate hyper; -extern crate pretty_env_logger; extern crate test; -extern crate tokio; use std::io::{Read, Write}; use std::net::{TcpStream}; use std::sync::mpsc; +use std::time::Duration; -use futures::Future; -use futures::sync::oneshot; +use tokio::runtime::current_thread; +use tokio::sync::oneshot; use hyper::{Body, Response, Server}; -use hyper::service::service_fn_ok; +use hyper::service::{make_service_fn, service_fn}; const PIPELINED_REQUESTS: usize = 16; @@ -23,24 +21,34 @@ const PIPELINED_REQUESTS: usize = 16; fn hello_world(b: &mut test::Bencher) { let _ = pretty_env_logger::try_init(); let (_until_tx, until_rx) = oneshot::channel::<()>(); + let addr = { let (addr_tx, addr_rx) = mpsc::channel(); - ::std::thread::spawn(move || { + std::thread::spawn(move || { let addr = "127.0.0.1:0".parse().unwrap(); + + let make_svc = make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(|_| async { + Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!"))) + })) + }); let srv = Server::bind(&addr) .http1_pipeline_flush(true) - .serve(|| { - service_fn_ok(move |_| { - Response::new(Body::from("Hello, World!")) - }) - }); + .serve(make_svc); + addr_tx.send(srv.local_addr()).unwrap(); - let fut = srv - .map_err(|e| panic!("server error: {}", e)) - .select(until_rx.then(|_| Ok(()))) - .then(|_| Ok(())); - let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap(); - rt.spawn(fut); + + let graceful = srv + .with_graceful_shutdown(async { + until_rx.await.ok(); + }); + + let mut rt = current_thread::Runtime::new().unwrap(); + rt.spawn(async { + if let Err(e) = graceful.await { + panic!("server error: {}", e); + } + }); rt.run().unwrap(); }); @@ -60,7 +68,7 @@ fn hello_world(b: &mut test::Bencher) { } * PIPELINED_REQUESTS; let mut tcp = TcpStream::connect(addr).unwrap(); - tcp.set_read_timeout(Some(::std::time::Duration::from_secs(3))).unwrap(); + tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); let mut buf = [0u8; 8192]; b.bytes = (pipelined_reqs.len() + total_bytes) as u64;