refactor(client): minimize the amount of instantiated code (#2391)

* Halve the amount of conn_task instantiations
* Extract non-generic parts of client::handshake
* Extract the non-generic parts of call_async
This commit is contained in:
Markus Westerlind
2021-01-13 01:18:22 +01:00
committed by GitHub
parent 446dd2f0c7
commit a15f3f7f0f
2 changed files with 87 additions and 72 deletions

View File

@@ -272,54 +272,60 @@ where
} }
} }
fn get_host_port<'u>(config: &Config, dst: &'u Uri) -> Result<(&'u str, u16), ConnectError> {
trace!(
"Http::connect; scheme={:?}, host={:?}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);
if config.enforce_http {
if dst.scheme() != Some(&Scheme::HTTP) {
return Err(ConnectError {
msg: INVALID_NOT_HTTP.into(),
cause: None,
});
}
} else if dst.scheme().is_none() {
return Err(ConnectError {
msg: INVALID_MISSING_SCHEME.into(),
cause: None,
});
}
let host = match dst.host() {
Some(s) => s,
None => {
return Err(ConnectError {
msg: INVALID_MISSING_HOST.into(),
cause: None,
})
}
};
let port = match dst.port() {
Some(port) => port.as_u16(),
None => {
if dst.scheme() == Some(&Scheme::HTTPS) {
443
} else {
80
}
}
};
Ok((host, port))
}
impl<R> HttpConnector<R> impl<R> HttpConnector<R>
where where
R: Resolve, R: Resolve,
{ {
async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> { async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
trace!(
"Http::connect; scheme={:?}, host={:?}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);
if self.config.enforce_http {
if dst.scheme() != Some(&Scheme::HTTP) {
return Err(ConnectError {
msg: INVALID_NOT_HTTP.into(),
cause: None,
});
}
} else if dst.scheme().is_none() {
return Err(ConnectError {
msg: INVALID_MISSING_SCHEME.into(),
cause: None,
});
}
let host = match dst.host() {
Some(s) => s,
None => {
return Err(ConnectError {
msg: INVALID_MISSING_HOST.into(),
cause: None,
})
}
};
let port = match dst.port() {
Some(port) => port.as_u16(),
None => {
if dst.scheme() == Some(&Scheme::HTTPS) {
443
} else {
80
}
}
};
let config = &self.config; let config = &self.config;
let (host, port) = get_host_port(config, &dst)?;
// If the host is already an IP addr (v4 or v6), // If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away. // skip resolving the dns and start connecting right away.
let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) { let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) {
@@ -328,10 +334,12 @@ where
let addrs = resolve(&mut self.resolver, dns::Name::new(host.into())) let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
.await .await
.map_err(ConnectError::dns)?; .map_err(ConnectError::dns)?;
let addrs = addrs.map(|mut addr| { let addrs = addrs
addr.set_port(port); .map(|mut addr| {
addr addr.set_port(port);
}).collect(); addr
})
.collect();
dns::SocketAddrs::new(addrs) dns::SocketAddrs::new(addrs)
}; };

View File

@@ -63,6 +63,32 @@ impl Default for Config {
} }
} }
fn new_builder(config: &Config) -> Builder {
let mut builder = Builder::default();
builder
.initial_window_size(config.initial_stream_window_size)
.initial_connection_window_size(config.initial_conn_window_size)
.max_frame_size(config.max_frame_size)
.enable_push(false);
builder
}
fn new_ping_config(config: &Config) -> ping::Config {
ping::Config {
bdp_initial_window: if config.adaptive_window {
Some(config.initial_stream_window_size)
} else {
None
},
#[cfg(feature = "runtime")]
keep_alive_interval: config.keep_alive_interval,
#[cfg(feature = "runtime")]
keep_alive_timeout: config.keep_alive_timeout,
#[cfg(feature = "runtime")]
keep_alive_while_idle: config.keep_alive_while_idle,
}
}
pub(crate) async fn handshake<T, B>( pub(crate) async fn handshake<T, B>(
io: T, io: T,
req_rx: ClientRx<B>, req_rx: ClientRx<B>,
@@ -74,11 +100,7 @@ where
B: HttpBody, B: HttpBody,
B::Data: Send + 'static, B::Data: Send + 'static,
{ {
let (h2_tx, mut conn) = Builder::default() let (h2_tx, mut conn) = new_builder(config)
.initial_window_size(config.initial_stream_window_size)
.initial_connection_window_size(config.initial_conn_window_size)
.max_frame_size(config.max_frame_size)
.enable_push(false)
.handshake::<_, SendBuf<B::Data>>(io) .handshake::<_, SendBuf<B::Data>>(io)
.await .await
.map_err(crate::Error::new_h2)?; .map_err(crate::Error::new_h2)?;
@@ -96,21 +118,9 @@ where
} }
}); });
let ping_config = ping::Config { let ping_config = new_ping_config(&config);
bdp_initial_window: if config.adaptive_window {
Some(config.initial_stream_window_size)
} else {
None
},
#[cfg(feature = "runtime")]
keep_alive_interval: config.keep_alive_interval,
#[cfg(feature = "runtime")]
keep_alive_timeout: config.keep_alive_timeout,
#[cfg(feature = "runtime")]
keep_alive_while_idle: config.keep_alive_while_idle,
};
let ping = if ping_config.is_enabled() { let (conn, ping) = if ping_config.is_enabled() {
let pp = conn.ping_pong().expect("conn.ping_pong"); let pp = conn.ping_pong().expect("conn.ping_pong");
let (recorder, mut ponger) = ping::channel(pp, ping_config); let (recorder, mut ponger) = ping::channel(pp, ping_config);
@@ -130,16 +140,13 @@ where
Pin::new(&mut conn).poll(cx) Pin::new(&mut conn).poll(cx)
}); });
let conn = conn.map_err(|e| debug!("connection error: {}", e)); (Either::Left(conn), recorder)
exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));
recorder
} else { } else {
let conn = conn.map_err(|e| debug!("connection error: {}", e)); (Either::Right(conn), ping::disabled())
exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));
ping::disabled()
}; };
let conn = conn.map_err(|e| debug!("connection error: {}", e));
exec.execute(conn_task(conn, conn_drop_rx, cancel_tx));
Ok(ClientTask { Ok(ClientTask {
ping, ping,