test(benches): re-enable pipeline and server bench (#2934)
re-enable the recently disabled pipeline and server bench using `hyper::server::conn` instead of the removed higher-level `Server` api
This commit is contained in:
		| @@ -3,87 +3,85 @@ | ||||
|  | ||||
| extern crate test; | ||||
|  | ||||
| // TODO: Reimplement hello_world_16 bench using hyper::server::conn | ||||
| // (instead of Server). | ||||
| use std::io::{Read, Write}; | ||||
| use std::net::{SocketAddr, TcpStream}; | ||||
| use std::sync::mpsc; | ||||
| use std::time::Duration; | ||||
|  | ||||
| // use std::io::{Read, Write}; | ||||
| // use std::net::TcpStream; | ||||
| // use std::sync::mpsc; | ||||
| // use std::time::Duration; | ||||
| use tokio::net::TcpListener; | ||||
| use tokio::sync::oneshot; | ||||
|  | ||||
| // use tokio::sync::oneshot; | ||||
| use hyper::server::conn::Http; | ||||
| use hyper::service::service_fn; | ||||
| use hyper::{Body, Response}; | ||||
|  | ||||
| // use hyper::service::{make_service_fn, service_fn}; | ||||
| // use hyper::{Body, Response, Server}; | ||||
| const PIPELINED_REQUESTS: usize = 16; | ||||
|  | ||||
| // const PIPELINED_REQUESTS: usize = 16; | ||||
| #[bench] | ||||
| fn hello_world_16(b: &mut test::Bencher) { | ||||
|     let _ = pretty_env_logger::try_init(); | ||||
|     let (_until_tx, until_rx) = oneshot::channel::<()>(); | ||||
|  | ||||
| // #[bench] | ||||
| // fn hello_world_16(b: &mut test::Bencher) { | ||||
| //     let _ = pretty_env_logger::try_init(); | ||||
| //     let (_until_tx, until_rx) = oneshot::channel::<()>(); | ||||
|     let addr = { | ||||
|         let (addr_tx, addr_rx) = mpsc::channel(); | ||||
|         std::thread::spawn(move || { | ||||
|             let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); | ||||
|             let rt = tokio::runtime::Builder::new_current_thread() | ||||
|                 .enable_all() | ||||
|                 .build() | ||||
|                 .expect("rt build"); | ||||
|  | ||||
| //     let addr = { | ||||
| //         let (addr_tx, addr_rx) = mpsc::channel(); | ||||
| //         std::thread::spawn(move || { | ||||
| //             let addr = "127.0.0.1:0".parse().unwrap(); | ||||
|             let listener = rt.block_on(TcpListener::bind(addr)).unwrap(); | ||||
|             let addr = listener.local_addr().unwrap(); | ||||
|  | ||||
| //             let make_svc = make_service_fn(|_| async { | ||||
| //                 Ok::<_, hyper::Error>(service_fn(|_| async { | ||||
| //                     Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!"))) | ||||
| //                 })) | ||||
| //             }); | ||||
|             rt.spawn(async move { | ||||
|                 loop { | ||||
|                     let (stream, _addr) = listener.accept().await.expect("accept"); | ||||
|  | ||||
| //             let rt = tokio::runtime::Builder::new_current_thread() | ||||
| //                 .enable_all() | ||||
| //                 .build() | ||||
| //                 .expect("rt build"); | ||||
| //             let srv = rt.block_on(async move { | ||||
| //                 Server::bind(&addr) | ||||
| //                     .http1_pipeline_flush(true) | ||||
| //                     .serve(make_svc) | ||||
| //             }); | ||||
|                     Http::new() | ||||
|                         .pipeline_flush(true) | ||||
|                         .serve_connection( | ||||
|                             stream, | ||||
|                             service_fn(|_| async { | ||||
|                                 Ok::<_, hyper::Error>(Response::new(Body::from("Hello, World!"))) | ||||
|                             }), | ||||
|                         ) | ||||
|                         .await | ||||
|                         .unwrap(); | ||||
|                 } | ||||
|             }); | ||||
|  | ||||
| //             addr_tx.send(srv.local_addr()).unwrap(); | ||||
|             addr_tx.send(addr).unwrap(); | ||||
|             rt.block_on(until_rx).ok(); | ||||
|         }); | ||||
|  | ||||
| //             let graceful = srv.with_graceful_shutdown(async { | ||||
| //                 until_rx.await.ok(); | ||||
| //             }); | ||||
|         addr_rx.recv().unwrap() | ||||
|     }; | ||||
|  | ||||
| //             rt.block_on(async { | ||||
| //                 if let Err(e) = graceful.await { | ||||
| //                     panic!("server error: {}", e); | ||||
| //                 } | ||||
| //             }); | ||||
| //         }); | ||||
|     let mut pipelined_reqs = Vec::new(); | ||||
|     for _ in 0..PIPELINED_REQUESTS { | ||||
|         pipelined_reqs.extend_from_slice(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); | ||||
|     } | ||||
|  | ||||
| //         addr_rx.recv().unwrap() | ||||
| //     }; | ||||
|     let total_bytes = { | ||||
|         let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
|         tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") | ||||
|             .unwrap(); | ||||
|         let mut buf = Vec::new(); | ||||
|         tcp.read_to_end(&mut buf).unwrap() | ||||
|     } * PIPELINED_REQUESTS; | ||||
|  | ||||
| //     let mut pipelined_reqs = Vec::new(); | ||||
| //     for _ in 0..PIPELINED_REQUESTS { | ||||
| //         pipelined_reqs.extend_from_slice(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"); | ||||
| //     } | ||||
|     let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
|     tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); | ||||
|     let mut buf = [0u8; 8192]; | ||||
|  | ||||
| //     let total_bytes = { | ||||
| //         let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
| //         tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") | ||||
| //             .unwrap(); | ||||
| //         let mut buf = Vec::new(); | ||||
| //         tcp.read_to_end(&mut buf).unwrap() | ||||
| //     } * PIPELINED_REQUESTS; | ||||
|  | ||||
| //     let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
| //     tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); | ||||
| //     let mut buf = [0u8; 8192]; | ||||
|  | ||||
| //     b.bytes = (pipelined_reqs.len() + total_bytes) as u64; | ||||
| //     b.iter(|| { | ||||
| //         tcp.write_all(&pipelined_reqs).unwrap(); | ||||
| //         let mut sum = 0; | ||||
| //         while sum < total_bytes { | ||||
| //             sum += tcp.read(&mut buf).unwrap(); | ||||
| //         } | ||||
| //         assert_eq!(sum, total_bytes); | ||||
| //     }); | ||||
| // } | ||||
|     b.bytes = (pipelined_reqs.len() + total_bytes) as u64; | ||||
|     b.iter(|| { | ||||
|         tcp.write_all(&pipelined_reqs).unwrap(); | ||||
|         let mut sum = 0; | ||||
|         while sum < total_bytes { | ||||
|             sum += tcp.read(&mut buf).unwrap(); | ||||
|         } | ||||
|         assert_eq!(sum, total_bytes); | ||||
|     }); | ||||
| } | ||||
|   | ||||
| @@ -3,133 +3,139 @@ | ||||
|  | ||||
| extern crate test; | ||||
|  | ||||
| // TODO: Reimplement bench_server using hyper::server::conn (instead | ||||
| // of removed Server). | ||||
|  | ||||
| use std::io::{Read, Write}; | ||||
| use std::net::{TcpListener, TcpStream}; | ||||
| use std::net::{SocketAddr, TcpListener, TcpStream}; | ||||
| use std::sync::mpsc; | ||||
| // use std::time::Duration; | ||||
| use std::time::Duration; | ||||
|  | ||||
| // use futures_util::{stream, StreamExt}; | ||||
| // use http_body_util::StreamBody; | ||||
| // use tokio::sync::oneshot; | ||||
| use futures_util::{stream, StreamExt}; | ||||
| use http_body_util::{BodyExt, StreamBody}; | ||||
| use tokio::sync::oneshot; | ||||
|  | ||||
| // use hyper::service::{make_service_fn, service_fn}; | ||||
| // use hyper::{Response, Server}; | ||||
| use hyper::server::conn::Http; | ||||
| use hyper::service::service_fn; | ||||
| use hyper::Response; | ||||
|  | ||||
| // macro_rules! bench_server { | ||||
| //     ($b:ident, $header:expr, $body:expr) => {{ | ||||
| //         let _ = pretty_env_logger::try_init(); | ||||
| //         let (_until_tx, until_rx) = oneshot::channel::<()>(); | ||||
| //         let addr = { | ||||
| //             let (addr_tx, addr_rx) = mpsc::channel(); | ||||
| //             std::thread::spawn(move || { | ||||
| //                 let addr = "127.0.0.1:0".parse().unwrap(); | ||||
| //                 let make_svc = make_service_fn(|_| async { | ||||
| //                     Ok::<_, hyper::Error>(service_fn(|_| async { | ||||
| //                         Ok::<_, hyper::Error>( | ||||
| //                             Response::builder() | ||||
| //                                 .header($header.0, $header.1) | ||||
| //                                 .header("content-type", "text/plain") | ||||
| //                                 .body($body()) | ||||
| //                                 .unwrap(), | ||||
| //                         ) | ||||
| //                     })) | ||||
| //                 }); | ||||
| macro_rules! bench_server { | ||||
|     ($b:ident, $header:expr, $body:expr) => {{ | ||||
|         let _ = pretty_env_logger::try_init(); | ||||
|         let (_until_tx, until_rx) = oneshot::channel::<()>(); | ||||
|  | ||||
| //                 let rt = tokio::runtime::Builder::new_current_thread() | ||||
| //                     .enable_all() | ||||
| //                     .build() | ||||
| //                     .expect("rt build"); | ||||
|         let addr = { | ||||
|             let (addr_tx, addr_rx) = mpsc::channel(); | ||||
|             std::thread::spawn(move || { | ||||
|                 let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); | ||||
|                 let rt = tokio::runtime::Builder::new_current_thread() | ||||
|                     .enable_all() | ||||
|                     .build() | ||||
|                     .expect("rt build"); | ||||
|  | ||||
| //                 let srv = rt.block_on(async move { Server::bind(&addr).serve(make_svc) }); | ||||
|                 let listener = rt.block_on(tokio::net::TcpListener::bind(addr)).unwrap(); | ||||
|                 let addr = listener.local_addr().unwrap(); | ||||
|  | ||||
| //                 addr_tx.send(srv.local_addr()).unwrap(); | ||||
|                 rt.spawn(async move { | ||||
|                     loop { | ||||
|                         let (stream, _) = listener.accept().await.expect("accept"); | ||||
|  | ||||
| //                 let graceful = srv.with_graceful_shutdown(async { | ||||
| //                     until_rx.await.ok(); | ||||
| //                 }); | ||||
| //                 rt.block_on(async move { | ||||
| //                     if let Err(e) = graceful.await { | ||||
| //                         panic!("server error: {}", e); | ||||
| //                     } | ||||
| //                 }); | ||||
| //             }); | ||||
|                         Http::new() | ||||
|                             .serve_connection( | ||||
|                                 stream, | ||||
|                                 service_fn(|_| async { | ||||
|                                     Ok::<_, hyper::Error>( | ||||
|                                         Response::builder() | ||||
|                                             .header($header.0, $header.1) | ||||
|                                             .header("content-type", "text/plain") | ||||
|                                             .body($body()) | ||||
|                                             .unwrap(), | ||||
|                                     ) | ||||
|                                 }), | ||||
|                             ) | ||||
|                             .await | ||||
|                             .unwrap(); | ||||
|                     } | ||||
|                 }); | ||||
|  | ||||
| //             addr_rx.recv().unwrap() | ||||
| //         }; | ||||
|                 addr_tx.send(addr).unwrap(); | ||||
|                 rt.block_on(until_rx).ok(); | ||||
|             }); | ||||
|  | ||||
| //         let total_bytes = { | ||||
| //             let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
| //             tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") | ||||
| //                 .unwrap(); | ||||
| //             let mut buf = Vec::new(); | ||||
| //             tcp.read_to_end(&mut buf).unwrap() | ||||
| //         }; | ||||
|             addr_rx.recv().unwrap() | ||||
|         }; | ||||
|  | ||||
| //         let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
| //         tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); | ||||
| //         let mut buf = [0u8; 8192]; | ||||
|         let total_bytes = { | ||||
|             let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
|             tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") | ||||
|                 .unwrap(); | ||||
|             let mut buf = Vec::new(); | ||||
|             tcp.read_to_end(&mut buf).unwrap() | ||||
|         }; | ||||
|  | ||||
| //         $b.bytes = 35 + total_bytes as u64; | ||||
| //         $b.iter(|| { | ||||
| //             tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") | ||||
| //                 .unwrap(); | ||||
| //             let mut sum = 0; | ||||
| //             while sum < total_bytes { | ||||
| //                 sum += tcp.read(&mut buf).unwrap(); | ||||
| //             } | ||||
| //             assert_eq!(sum, total_bytes); | ||||
| //         }); | ||||
| //     }}; | ||||
| // } | ||||
|         let mut tcp = TcpStream::connect(addr).unwrap(); | ||||
|         tcp.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); | ||||
|         let mut buf = [0u8; 8192]; | ||||
|  | ||||
| // fn body(b: &'static [u8]) -> hyper::Body { | ||||
| //     b.into() | ||||
| // } | ||||
|         $b.bytes = 35 + total_bytes as u64; | ||||
|         $b.iter(|| { | ||||
|             tcp.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") | ||||
|                 .unwrap(); | ||||
|             let mut sum = 0; | ||||
|             while sum < total_bytes { | ||||
|                 sum += tcp.read(&mut buf).unwrap(); | ||||
|             } | ||||
|             assert_eq!(sum, total_bytes); | ||||
|         }); | ||||
|     }}; | ||||
| } | ||||
|  | ||||
| // #[bench] | ||||
| // fn throughput_fixedsize_small_payload(b: &mut test::Bencher) { | ||||
| //     bench_server!(b, ("content-length", "13"), || body(b"Hello, World!")) | ||||
| // } | ||||
| fn body(b: &'static [u8]) -> hyper::Body { | ||||
|     b.into() | ||||
| } | ||||
|  | ||||
| // #[bench] | ||||
| // fn throughput_fixedsize_large_payload(b: &mut test::Bencher) { | ||||
| //     bench_server!(b, ("content-length", "1000000"), || body( | ||||
| //         &[b'x'; 1_000_000] | ||||
| //     )) | ||||
| // } | ||||
| #[bench] | ||||
| fn throughput_fixedsize_small_payload(b: &mut test::Bencher) { | ||||
|     bench_server!(b, ("content-length", "13"), || body(b"Hello, World!")) | ||||
| } | ||||
|  | ||||
| // #[bench] | ||||
| // fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { | ||||
| //     bench_server!(b, ("content-length", "1000000"), || { | ||||
| //         static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; | ||||
| //         StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) | ||||
| //     }) | ||||
| // } | ||||
| #[bench] | ||||
| fn throughput_fixedsize_large_payload(b: &mut test::Bencher) { | ||||
|     bench_server!(b, ("content-length", "1000000"), || body( | ||||
|         &[b'x'; 1_000_000] | ||||
|     )) | ||||
| } | ||||
|  | ||||
| // #[bench] | ||||
| // fn throughput_chunked_small_payload(b: &mut test::Bencher) { | ||||
| //     bench_server!(b, ("transfer-encoding", "chunked"), || body( | ||||
| //         b"Hello, World!" | ||||
| //     )) | ||||
| // } | ||||
| #[bench] | ||||
| fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { | ||||
|     bench_server!(b, ("content-length", "1000000"), move || { | ||||
|         static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; | ||||
|         BodyExt::boxed(StreamBody::new( | ||||
|             stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)), | ||||
|         )) | ||||
|     }) | ||||
| } | ||||
|  | ||||
| // #[bench] | ||||
| // fn throughput_chunked_large_payload(b: &mut test::Bencher) { | ||||
| //     bench_server!(b, ("transfer-encoding", "chunked"), || body( | ||||
| //         &[b'x'; 1_000_000] | ||||
| //     )) | ||||
| // } | ||||
| #[bench] | ||||
| fn throughput_chunked_small_payload(b: &mut test::Bencher) { | ||||
|     bench_server!(b, ("transfer-encoding", "chunked"), || body( | ||||
|         b"Hello, World!" | ||||
|     )) | ||||
| } | ||||
|  | ||||
| // #[bench] | ||||
| // fn throughput_chunked_many_chunks(b: &mut test::Bencher) { | ||||
| //     bench_server!(b, ("transfer-encoding", "chunked"), || { | ||||
| //         static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; | ||||
| //         StreamBody::new(stream::iter(S.iter()).map(|&s| Ok::<_, String>(s))) | ||||
| //     }) | ||||
| // } | ||||
| #[bench] | ||||
| fn throughput_chunked_large_payload(b: &mut test::Bencher) { | ||||
|     bench_server!(b, ("transfer-encoding", "chunked"), || body( | ||||
|         &[b'x'; 1_000_000] | ||||
|     )) | ||||
| } | ||||
|  | ||||
| #[bench] | ||||
| fn throughput_chunked_many_chunks(b: &mut test::Bencher) { | ||||
|     bench_server!(b, ("transfer-encoding", "chunked"), || { | ||||
|         static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_000] as _; | ||||
|         BodyExt::boxed(StreamBody::new( | ||||
|             stream::iter(S.iter()).map(|&s| Ok::<_, String>(s)), | ||||
|         )) | ||||
|     }) | ||||
| } | ||||
|  | ||||
| #[bench] | ||||
| fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user