diff --git a/benches/pipeline.rs b/benches/pipeline.rs index f5b47d46..772e5480 100644 --- a/benches/pipeline.rs +++ b/benches/pipeline.rs @@ -3,87 +3,85 @@ extern crate test; -// TODO: Reimplement hello_world_16 bench using hyper::server::conn -// (instead of Server). +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::sync::mpsc; +use std::time::Duration; -// use std::io::{Read, Write}; -// use std::net::TcpStream; -// use std::sync::mpsc; -// use std::time::Duration; +use tokio::net::TcpListener; +use tokio::sync::oneshot; -// use tokio::sync::oneshot; +use hyper::server::conn::Http; +use hyper::service::service_fn; +use hyper::{Body, Response}; -// use hyper::service::{make_service_fn, service_fn}; -// use hyper::{Body, Response, Server}; +const PIPELINED_REQUESTS: usize = 16; -// const PIPELINED_REQUESTS: usize = 16; +#[bench] +fn hello_world_16(b: &mut test::Bencher) { + let _ = pretty_env_logger::try_init(); + let (_until_tx, until_rx) = oneshot::channel::<()>(); -// #[bench] -// fn hello_world_16(b: &mut test::Bencher) { -// let _ = pretty_env_logger::try_init(); -// let (_until_tx, until_rx) = oneshot::channel::<()>(); + let addr = { + let (addr_tx, addr_rx) = mpsc::channel(); + std::thread::spawn(move || { + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("rt build"); -// let addr = { -// let (addr_tx, addr_rx) = mpsc::channel(); -// std::thread::spawn(move || { -// let addr = "127.0.0.1:0".parse().unwrap(); + let listener = rt.block_on(TcpListener::bind(addr)).unwrap(); + let addr = listener.local_addr().unwrap(); -// let make_svc = make_service_fn(|_| async { -// Ok::<_, hyper::Error>(service_fn(|_| async { -// Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!"))) -// })) -// }); + rt.spawn(async move { + loop { + let (stream, _addr) = listener.accept().await.expect("accept"); -// let rt = tokio::runtime::Builder::new_current_thread() -// .enable_all() -// .build() -// .expect("rt build"); -// let srv = rt.block_on(async move { -// Server::bind(&addr) -// .http1_pipeline_flush(true) -// .serve(make_svc) -// }); + Http::new() + .pipeline_flush(true) + .serve_connection( + stream, + service_fn(|_| async { + Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!"))) + }), + ) + .await + .unwrap(); + } + }); -// addr_tx.send(srv.local_addr()).unwrap(); + addr_tx.send(addr).unwrap(); + rt.block_on(until_rx).ok(); + }); -// let graceful = srv.with_graceful_shutdown(async { -// until_rx.await.ok(); -// }); + addr_rx.recv().unwrap() + }; -// rt.block_on(async { -// if let Err(e) = graceful.await { -// panic!("server error: {}", e); -// } -// }); -// }); + let mut pipelined_reqs = Vec::new(); + for _ in 0..PIPELINED_REQUESTS { + pipelined_reqs.extend_from_slice(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); + } -// addr_rx.recv().unwrap() -// }; + let total_bytes = { + let mut tcp = TcpStream::connect(addr).unwrap(); + tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") + .unwrap(); + let mut buf = Vec::new(); + tcp.read_to_end(&mut buf).unwrap() + } * PIPELINED_REQUESTS; -// let mut pipelined_reqs = Vec::new(); -// for _ in 0..PIPELINED_REQUESTS { -// pipelined_reqs.extend_from_slice(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); -// } + let mut tcp = TcpStream::connect(addr).unwrap(); + tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); + let mut buf = [0u8; 8192]; -// let total_bytes = { -// let mut tcp = TcpStream::connect(addr).unwrap(); -// tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") -// .unwrap(); -// let mut buf = Vec::new(); -// tcp.read_to_end(&mut buf).unwrap() -// } * PIPELINED_REQUESTS; - -// let mut tcp = TcpStream::connect(addr).unwrap(); -// tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); -// let mut buf = [0u8; 8192]; - -// b.bytes = (pipelined_reqs.len() + total_bytes) as u64; -// b.iter(|| { -// tcp.write_all(&pipelined_reqs).unwrap(); -// let mut sum = 0; -// while sum < total_bytes { -// sum += tcp.read(&mut buf).unwrap(); -// } -// assert_eq!(sum, total_bytes); -// }); -// } + b.bytes = (pipelined_reqs.len() + total_bytes) as u64; + b.iter(|| { + tcp.write_all(&pipelined_reqs).unwrap(); + let mut sum = 0; + while sum < total_bytes { + sum += tcp.read(&mut buf).unwrap(); + } + assert_eq!(sum, total_bytes); + }); +} diff --git a/benches/server.rs b/benches/server.rs index d610ad99..9b87c85d 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -3,133 +3,139 @@ extern crate test; -// TODO: Reimplement bench_server using hyper::server::conn (instead -// of removed Server). - use std::io::{Read, Write}; -use std::net::{TcpListener, TcpStream}; +use std::net::{SocketAddr, TcpListener, TcpStream}; use std::sync::mpsc; -// use std::time::Duration; +use std::time::Duration; -// use futures_util::{stream, StreamExt}; -// use http_body_util::StreamBody; -// use tokio::sync::oneshot; +use futures_util::{stream, StreamExt}; +use http_body_util::{BodyExt, StreamBody}; +use tokio::sync::oneshot; -// use hyper::service::{make_service_fn, service_fn}; -// use hyper::{Response, Server}; +use hyper::server::conn::Http; +use hyper::service::service_fn; +use hyper::Response; -// macro_rules! bench_server { -// ($b:ident, $header:expr, $body:expr) => {{ -// let _ = pretty_env_logger::try_init(); -// let (_until_tx, until_rx) = oneshot::channel::<()>(); -// let addr = { -// let (addr_tx, addr_rx) = mpsc::channel(); -// std::thread::spawn(move || { -// let addr = "127.0.0.1:0".parse().unwrap(); -// let make_svc = make_service_fn(|_| async { -// Ok::<_, hyper::Error>(service_fn(|_| async { -// Ok::<_, hyper::Error>( -// Response::builder() -// .header($header.0, $header.1) -// .header("content-type", "text/plain") -// .body($body()) -// .unwrap(), -// ) -// })) -// }); +macro_rules! bench_server { + ($b:ident, $header:expr, $body:expr) => {{ + let _ = pretty_env_logger::try_init(); + let (_until_tx, until_rx) = oneshot::channel::<()>(); -// let rt = tokio::runtime::Builder::new_current_thread() -// .enable_all() -// .build() -// .expect("rt build"); + let addr = { + let (addr_tx, addr_rx) = mpsc::channel(); + std::thread::spawn(move || { + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("rt build"); -// let srv = rt.block_on(async move { Server::bind(&addr).serve(make_svc) }); + let listener = rt.block_on(tokio::net::TcpListener::bind(addr)).unwrap(); + let addr = listener.local_addr().unwrap(); -// addr_tx.send(srv.local_addr()).unwrap(); + rt.spawn(async move { + loop { + let (stream, _) = listener.accept().await.expect("accept"); -// let graceful = srv.with_graceful_shutdown(async { -// until_rx.await.ok(); -// }); -// rt.block_on(async move { -// if let Err(e) = graceful.await { -// panic!("server error: {}", e); -// } -// }); -// }); + Http::new() + .serve_connection( + stream, + service_fn(|_| async { + Ok::<_, hyper::Error>( + Response::builder() + .header($header.0, $header.1) + .header("content-type", "text/plain") + .body($body()) + .unwrap(), + ) + }), + ) + .await + .unwrap(); + } + }); -// addr_rx.recv().unwrap() -// }; + addr_tx.send(addr).unwrap(); + rt.block_on(until_rx).ok(); + }); -// let total_bytes = { -// let mut tcp = TcpStream::connect(addr).unwrap(); -// tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") -// .unwrap(); -// let mut buf = Vec::new(); -// tcp.read_to_end(&mut buf).unwrap() -// }; + addr_rx.recv().unwrap() + }; -// let mut tcp = TcpStream::connect(addr).unwrap(); -// tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); -// let mut buf = [0u8; 8192]; + let total_bytes = { + let mut tcp = TcpStream::connect(addr).unwrap(); + tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") + .unwrap(); + let mut buf = Vec::new(); + tcp.read_to_end(&mut buf).unwrap() + }; -// $b.bytes = 35 + total_bytes as u64; -// $b.iter(|| { -// tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") -// .unwrap(); -// let mut sum = 0; -// while sum < total_bytes { -// sum += tcp.read(&mut buf).unwrap(); -// } -// assert_eq!(sum, total_bytes); -// }); -// }}; -// } + let mut tcp = TcpStream::connect(addr).unwrap(); + tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); + let mut buf = [0u8; 8192]; -// fn body(b: &'static [u8]) -> hyper::Body { -// b.into() -// } + $b.bytes = 35 + total_bytes as u64; + $b.iter(|| { + tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + .unwrap(); + let mut sum = 0; + while sum < total_bytes { + sum += tcp.read(&mut buf).unwrap(); + } + assert_eq!(sum, total_bytes); + }); + }}; +} -// #[bench] -// fn throughput_fixedsize_small_payload(b: &mut test::Bencher) { -// bench_server!(b, ("content-length", "13"), || body(b"Hello, World!")) -// } +fn body(b: &'static [u8]) -> hyper::Body { + b.into() +} -// #[bench] -// fn throughput_fixedsize_large_payload(b: &mut test::Bencher) { -// bench_server!(b, ("content-length", "1000000"), || body( -// &[b'x'; 1_000_000] -// )) -// } +#[bench] +fn throughput_fixedsize_small_payload(b: &mut test::Bencher) { + bench_server!(b, ("content-length", "13"), || body(b"Hello, World!")) +} -// #[bench] -// fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { -// bench_server!(b, ("content-length", "1000000"), || { -// static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; -// StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) -// }) -// } +#[bench] +fn throughput_fixedsize_large_payload(b: &mut test::Bencher) { + bench_server!(b, ("content-length", "1000000"), || body( + &[b'x'; 1_000_000] + )) +} -// #[bench] -// fn throughput_chunked_small_payload(b: &mut test::Bencher) { -// bench_server!(b, ("transfer-encoding", "chunked"), || body( -// b"Hello, World!" -// )) -// } +#[bench] +fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { + bench_server!(b, ("content-length", "1000000"), move || { + static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; + BodyExt::boxed(StreamBody::new( + stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)), + )) + }) +} -// #[bench] -// fn throughput_chunked_large_payload(b: &mut test::Bencher) { -// bench_server!(b, ("transfer-encoding", "chunked"), || body( -// &[b'x'; 1_000_000] -// )) -// } +#[bench] +fn throughput_chunked_small_payload(b: &mut test::Bencher) { + bench_server!(b, ("transfer-encoding", "chunked"), || body( + b"Hello, World!" + )) +} -// #[bench] -// fn throughput_chunked_many_chunks(b: &mut test::Bencher) { -// bench_server!(b, ("transfer-encoding", "chunked"), || { -// static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; -// StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) -// }) -// } +#[bench] +fn throughput_chunked_large_payload(b: &mut test::Bencher) { + bench_server!(b, ("transfer-encoding", "chunked"), || body( + &[b'x'; 1_000_000] + )) +} + +#[bench] +fn throughput_chunked_many_chunks(b: &mut test::Bencher) { + bench_server!(b, ("transfer-encoding", "chunked"), || { + static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; + BodyExt::boxed(StreamBody::new( + stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)), + )) + }) +} #[bench] fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) {