From 8b878a805ac6a95a758b6ef08f6410449c65648a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 23 Oct 2019 15:48:47 -0700 Subject: [PATCH] perf(client): change HttpConnecting to hold Arc instead of inlined fields --- Cargo.toml | 5 +++ benches/connect.rs | 34 ++++++++++++++++ src/client/connect/http.rs | 82 +++++++++++++++----------------------- 3 files changed, 71 insertions(+), 50 deletions(-) create mode 100644 benches/connect.rs diff --git a/Cargo.toml b/Cargo.toml index 8ea63cfa..f68b0e58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -170,6 +170,11 @@ path = "examples/web_api.rs" required-features = ["runtime", "unstable-stream"] +[[bench]] +name = "connect" +path = "benches/connect.rs" +required-features = ["runtime"] + [[bench]] name = "end_to_end" path = "benches/end_to_end.rs" diff --git a/benches/connect.rs b/benches/connect.rs new file mode 100644 index 00000000..323ca0b8 --- /dev/null +++ b/benches/connect.rs @@ -0,0 +1,34 @@ +#![feature(test)] +#![deny(warnings)] + +extern crate test; + +use tokio::net::TcpListener; +use tokio::runtime::current_thread::Runtime; +use hyper::client::connect::{Destination, HttpConnector}; +use hyper::service::Service; +use http::Uri; + +#[bench] +fn http_connector(b: &mut test::Bencher) { + let _ = pretty_env_logger::try_init(); + let mut rt = Runtime::new().unwrap(); + let mut listener = rt.block_on(TcpListener::bind("127.0.0.1:0")).expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + let uri: Uri = format!("http://{}/", addr).parse().expect("uri parse"); + let dst = Destination::try_from_uri(uri).expect("destination"); + let mut connector = HttpConnector::new(); + + rt.spawn(async move { + loop { + let _ = listener.accept().await; + } + }); + + + b.iter(|| { + rt.block_on(async { + connector.call(dst.clone()).await.expect("connect"); + }); + }); +} diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index 1ed5ef8c..f85802d9 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -228,6 +228,16 @@ impl HttpConnector { } } +impl HttpConnector { + fn invalid_url(&self, err: InvalidUrl) -> HttpConnecting { + HttpConnecting { + config: self.config.clone(), + state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), + port: 0, + } + } +} + // R: Debug required for now to allow adding it to debug output later... impl fmt::Debug for HttpConnector { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -263,15 +273,15 @@ where if self.config.enforce_http { if dst.uri.scheme_part() != Some(&Scheme::HTTP) { - return invalid_url(InvalidUrl::NotHttp); + return self.invalid_url(InvalidUrl::NotHttp); } } else if dst.uri.scheme_part().is_none() { - return invalid_url(InvalidUrl::MissingScheme); + return self.invalid_url(InvalidUrl::MissingScheme); } let host = match dst.uri.host() { Some(s) => s, - None => return invalid_url(InvalidUrl::MissingAuthority), + None => return self.invalid_url(InvalidUrl::MissingAuthority), }; let port = match dst.uri.port_part() { Some(port) => port.as_u16(), @@ -279,16 +289,9 @@ where }; HttpConnecting { - state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address), - handle: self.config.handle.clone(), - connect_timeout: self.config.connect_timeout, - happy_eyeballs_timeout: self.config.happy_eyeballs_timeout, - keep_alive_timeout: self.config.keep_alive_timeout, - nodelay: self.config.nodelay, + config: self.config.clone(), + state: State::Lazy(self.resolver.clone(), host.into()), port, - reuse_address: self.config.reuse_address, - send_buffer_size: self.config.send_buffer_size, - recv_buffer_size: self.config.recv_buffer_size, } } } @@ -321,21 +324,6 @@ impl HttpInfo { } } -fn invalid_url(err: InvalidUrl) -> HttpConnecting { - HttpConnecting { - state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), - handle: None, - keep_alive_timeout: None, - nodelay: false, - port: 0, - connect_timeout: None, - happy_eyeballs_timeout: None, - reuse_address: false, - send_buffer_size: None, - recv_buffer_size: None, - } -} - #[derive(Debug, Clone, Copy)] enum InvalidUrl { MissingScheme, @@ -362,23 +350,16 @@ impl StdError for InvalidUrl { #[must_use = "futures do nothing unless polled"] #[pin_project] pub struct HttpConnecting { + config: Arc, #[pin] state: State, - handle: Option, - connect_timeout: Option, - happy_eyeballs_timeout: Option, - keep_alive_timeout: Option, - nodelay: bool, port: u16, - reuse_address: bool, - send_buffer_size: Option, - recv_buffer_size: Option, } #[pin_project] enum State { - Lazy(R, String, Option), - Resolving(#[pin] R::Future, Option), + Lazy(R, String), + Resolving(#[pin] R::Future), Connecting(ConnectingTcp), Error(Option), } @@ -389,22 +370,23 @@ impl Future for HttpConnecting { #[project] fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let mut me = self.project(); + let config: &Config = &me.config; loop { let state; #[project] match me.state.as_mut().project() { - State::Lazy(ref resolver, ref mut host, ref local_addr) => { + State::Lazy(ref resolver, ref mut host) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, *me.port) { state = State::Connecting(ConnectingTcp::new( - **local_addr, addrs, *me.connect_timeout, *me.happy_eyeballs_timeout, *me.reuse_address)); + config.local_address, addrs, config.connect_timeout, config.happy_eyeballs_timeout, config.reuse_address)); } else { let name = dns::Name::new(mem::replace(host, String::new())); - state = State::Resolving(resolver.resolve(name), **local_addr); + state = State::Resolving(resolver.resolve(name)); } }, - State::Resolving(future, local_addr) => { + State::Resolving(future) => { let addrs = ready!(future.poll(cx))?; let port = *me.port; let addrs = addrs @@ -412,24 +394,24 @@ impl Future for HttpConnecting { .collect(); let addrs = dns::IpAddrs::new(addrs); state = State::Connecting(ConnectingTcp::new( - *local_addr, addrs, *me.connect_timeout, *me.happy_eyeballs_timeout, *me.reuse_address)); + config.local_address, addrs, config.connect_timeout, config.happy_eyeballs_timeout, config.reuse_address)); }, State::Connecting(ref mut c) => { - let sock = ready!(c.poll(cx, &me.handle))?; + let sock = ready!(c.poll(cx, &config.handle))?; - if let Some(dur) = me.keep_alive_timeout { - sock.set_keepalive(Some(*dur))?; + if let Some(dur) = config.keep_alive_timeout { + sock.set_keepalive(Some(dur))?; } - if let Some(size) = me.send_buffer_size { - sock.set_send_buffer_size(*size)?; + if let Some(size) = config.send_buffer_size { + sock.set_send_buffer_size(size)?; } - if let Some(size) = me.recv_buffer_size { - sock.set_recv_buffer_size(*size)?; + if let Some(size) = config.recv_buffer_size { + sock.set_recv_buffer_size(size)?; } - sock.set_nodelay(*me.nodelay)?; + sock.set_nodelay(config.nodelay)?; let extra = HttpInfo { remote_addr: sock.peer_addr()?,