From 94e02ac276eb122df44ff4171620cddc53c1c381 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 24 Oct 2018 15:04:30 -0700 Subject: [PATCH] perf(http2): eagerly poll some futures in h2 dispatchers before allocating in executor --- benches/end_to_end.rs | 77 +++++++++++++++++++++++------------------- src/proto/h2/client.rs | 24 ++++++++----- src/proto/h2/server.rs | 33 ++++++++++++++---- 3 files changed, 85 insertions(+), 49 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index aedfaec2..78a3232d 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -3,6 +3,7 @@ extern crate futures; extern crate hyper; +extern crate pretty_env_logger; extern crate test; extern crate tokio; @@ -12,50 +13,60 @@ use futures::{Future, Stream}; use tokio::runtime::current_thread::Runtime; use tokio::net::TcpListener; -use hyper::{Body, Method, Request, Response}; +use hyper::{Body, Method, Request, Response, Version}; use hyper::client::HttpConnector; use hyper::server::conn::Http; - #[bench] -fn get_one_at_a_time(b: &mut test::Bencher) { - let mut rt = Runtime::new().unwrap(); - let addr = spawn_hello(&mut rt); - - let connector = HttpConnector::new(1); - let client = hyper::Client::builder() - .build::<_, Body>(connector); - - let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap(); - - b.bytes = 160 * 2 + PHRASE.len() as u64; - b.iter(move || { - rt.block_on(client.get(url.clone()) - .and_then(|res| { - res.into_body().for_each(|_chunk| { - Ok(()) - }) - })) - .expect("client wait"); +fn http1_get(b: &mut test::Bencher) { + bench_with(b, Version::HTTP_11, || { + Request::new(Body::empty()) }); } #[bench] -fn post_one_at_a_time(b: &mut test::Bencher) { +fn http1_post(b: &mut test::Bencher) { + bench_with(b, Version::HTTP_11, || { + let mut req = Request::new("foo bar baz quux".into()); + *req.method_mut() = Method::POST; + req + }); +} + +#[bench] +fn http2_get(b: &mut test::Bencher) { + bench_with(b, Version::HTTP_2, || { + Request::new(Body::empty()) + }); +} + +#[bench] +fn http2_post(b: &mut test::Bencher) { + bench_with(b, Version::HTTP_2, || { + let mut req = Request::new("foo bar baz quux".into()); + *req.method_mut() = Method::POST; + req + }); +} + +fn bench_with(b: &mut test::Bencher, version: Version, make_request: F) +where + F: Fn() -> Request, +{ let mut rt = Runtime::new().unwrap(); - let addr = spawn_hello(&mut rt); + let body = b"Hello"; + let addr = spawn_hello(&mut rt, body); let connector = HttpConnector::new(1); let client = hyper::Client::builder() + .http2_only(version == Version::HTTP_2) .build::<_, Body>(connector); - let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap(); + let url: hyper::Uri = format!("http://{}/hello", addr).parse().unwrap(); - let post = "foo bar baz quux"; - b.bytes = 180 * 2 + post.len() as u64 + PHRASE.len() as u64; + b.bytes = body.len() as u64; b.iter(move || { - let mut req = Request::new(post.into()); - *req.method_mut() = Method::POST; + let mut req = make_request(); *req.uri_mut() = url.clone(); rt.block_on(client.request(req).and_then(|res| { res.into_body().for_each(|_chunk| { @@ -65,9 +76,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) { }); } -static PHRASE: &'static [u8] = include_bytes!("../CHANGELOG.md"); //b"Hello, World!"; - -fn spawn_hello(rt: &mut Runtime) -> SocketAddr { +fn spawn_hello(rt: &mut Runtime, body: &'static [u8]) -> SocketAddr { use hyper::service::{service_fn}; let addr = "127.0.0.1:0".parse().unwrap(); let listener = TcpListener::bind(&addr).unwrap(); @@ -75,11 +84,11 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr { let http = Http::new(); - let service = service_fn(|req: Request| { + let service = service_fn(move |req: Request| { req.into_body() .concat2() - .map(|_| { - Response::new(Body::from(PHRASE)) + .map(move |_| { + Response::new(Body::from(body)) }) }); diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index b285574f..7ccceac0 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -122,14 +122,22 @@ where } }; if !eos { - let conn_drop_ref = conn_dropper.clone(); - let pipe = PipeToSendStream::new(body, body_tx) - .map_err(|e| debug!("client request body error: {}", e)) - .then(move |x| { - drop(conn_drop_ref); - x - }); - self.executor.execute(pipe)?; + let mut pipe = PipeToSendStream::new(body, body_tx) + .map_err(|e| debug!("client request body error: {}", e)); + + // eagerly see if the body pipe is ready and + // can thus skip allocating in the executor + match pipe.poll() { + Ok(Async::Ready(())) | Err(()) => (), + Ok(Async::NotReady) => { + let conn_drop_ref = conn_dropper.clone(); + let pipe = pipe.then(move |x| { + drop(conn_drop_ref); + x + }); + self.executor.execute(pipe)?; + } + } } let fut = fut diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index fdd27272..9af2073e 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -5,6 +5,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use ::headers::content_length_parse_all; use ::body::Payload; +use body::internal::FullDataArg; use ::common::exec::H2Exec; use ::headers; use ::service::Service; @@ -133,8 +134,16 @@ where let req = req.map(|stream| { ::Body::h2(stream, content_length) }); - let fut = H2Stream::new(service.call(req), respond); - exec.execute_h2stream(fut)?; + let mut fut = H2Stream::new(service.call(req), respond); + + // try to eagerly poll the future, so that we might + // not need to allocate a new task... + match fut.poll() { + Ok(Async::Ready(())) | Err(()) => (), + Ok(Async::NotReady) => { + exec.execute_h2stream(fut)?; + } + } } // no more incoming streams... @@ -193,7 +202,7 @@ where Err(e) => return Err(::Error::new_user_service(e)), }; - let (head, body) = res.into_parts(); + let (head, mut body) = res.into_parts(); let mut res = ::http::Response::from_parts(head, ()); super::strip_connection_headers(res.headers_mut(), false); @@ -204,10 +213,6 @@ where .expect("DATE is a valid HeaderName") .or_insert_with(::proto::h1::date::update_and_header_value); - // automatically set Content-Length from body... - if let Some(len) = body.content_length() { - headers::set_content_length_if_missing(res.headers_mut(), len); - } macro_rules! reply { ($eos:expr) => ({ match self.reply.send_response(res, $eos) { @@ -220,6 +225,20 @@ where } }) } + + if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 { + let mut body_tx = reply!(false); + let buf = SendBuf(Some(full)); + body_tx + .send_data(buf, true) + .map_err(::Error::new_body_write)?; + return Ok(Async::Ready(())); + } + + // automatically set Content-Length from body... + if let Some(len) = body.content_length() { + headers::set_content_length_if_missing(res.headers_mut(), len); + } if !body.is_end_stream() { let body_tx = reply!(false); H2StreamState::Body(PipeToSendStream::new(body, body_tx))