refactor(client): clean up client config fields
This commit is contained in:
@@ -71,7 +71,7 @@ where
|
||||
/// After setting options, the builder is used to create a `Handshake` future.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Builder {
|
||||
exec: Exec,
|
||||
pub(super) exec: Exec,
|
||||
h1_writev: bool,
|
||||
h1_title_case_headers: bool,
|
||||
h1_read_buf_exact_size: Option<usize>,
|
||||
|
||||
@@ -90,7 +90,7 @@ use http::header::{HeaderValue, HOST};
|
||||
use http::uri::Scheme;
|
||||
|
||||
use body::{Body, Payload};
|
||||
use common::{Exec, lazy as hyper_lazy, Lazy};
|
||||
use common::{lazy as hyper_lazy, Lazy};
|
||||
use self::connect::{Alpn, Connect, Connected, Destination};
|
||||
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
|
||||
|
||||
@@ -105,12 +105,14 @@ mod tests;
|
||||
|
||||
/// A Client to make outgoing HTTP requests.
|
||||
pub struct Client<C, B = Body> {
|
||||
config: Config,
|
||||
conn_builder: conn::Builder,
|
||||
connector: Arc<C>,
|
||||
executor: Exec,
|
||||
h1_writev: bool,
|
||||
h1_title_case_headers: bool,
|
||||
pool: Pool<PoolClient<B>>,
|
||||
h1_read_buf_exact_size: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct Config {
|
||||
retry_canceled_requests: bool,
|
||||
set_host: bool,
|
||||
ver: Ver,
|
||||
@@ -200,7 +202,7 @@ where C: Connect + Sync + 'static,
|
||||
debug!("CONNECT is not allowed for HTTP/1.0");
|
||||
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
|
||||
},
|
||||
other => if self.ver != Ver::Http2 {
|
||||
other => if self.config.ver != Ver::Http2 {
|
||||
error!("Request has unsupported version \"{:?}\"", other);
|
||||
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
|
||||
}
|
||||
@@ -250,7 +252,7 @@ where C: Connect + Sync + 'static,
|
||||
mut req,
|
||||
reason,
|
||||
}) => {
|
||||
if !client.retry_canceled_requests || !connection_reused {
|
||||
if !client.config.retry_canceled_requests || !connection_reused {
|
||||
// if client disabled, don't retry
|
||||
// a fresh connection means we definitely can't retry
|
||||
return Err(reason);
|
||||
@@ -267,8 +269,8 @@ where C: Connect + Sync + 'static,
|
||||
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=ClientError<B>> {
|
||||
let conn = self.connection_for(req.uri().clone(), pool_key);
|
||||
|
||||
let set_host = self.set_host;
|
||||
let executor = self.executor.clone();
|
||||
let set_host = self.config.set_host;
|
||||
let executor = self.conn_builder.exec.clone();
|
||||
conn.and_then(move |mut pooled| {
|
||||
if pooled.is_http1() {
|
||||
if set_host {
|
||||
@@ -392,7 +394,7 @@ where C: Connect + Sync + 'static,
|
||||
let checkout = self.pool.checkout(pool_key.clone());
|
||||
let connect = self.connect_to(uri, pool_key);
|
||||
|
||||
let executor = self.executor.clone();
|
||||
let executor = self.conn_builder.exec.clone();
|
||||
checkout
|
||||
// The order of the `select` is depended on below...
|
||||
.select2(connect)
|
||||
@@ -458,13 +460,11 @@ where C: Connect + Sync + 'static,
|
||||
fn connect_to(&self, uri: Uri, pool_key: PoolKey)
|
||||
-> impl Lazy<Item=Pooled<PoolClient<B>>, Error=::Error>
|
||||
{
|
||||
let executor = self.executor.clone();
|
||||
let executor = self.conn_builder.exec.clone();
|
||||
let pool = self.pool.clone();
|
||||
let h1_writev = self.h1_writev;
|
||||
let h1_title_case_headers = self.h1_title_case_headers;
|
||||
let h1_read_buf_exact_size = self.h1_read_buf_exact_size;
|
||||
let ver = self.ver;
|
||||
let is_ver_h2 = self.ver == Ver::Http2;
|
||||
let mut conn_builder = self.conn_builder.clone();
|
||||
let ver = self.config.ver;
|
||||
let is_ver_h2 = ver == Ver::Http2;
|
||||
let connector = self.connector.clone();
|
||||
let dst = Destination {
|
||||
uri,
|
||||
@@ -505,11 +505,7 @@ where C: Connect + Sync + 'static,
|
||||
connecting
|
||||
};
|
||||
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
|
||||
Either::A(conn::Builder::new()
|
||||
.exec(executor.clone())
|
||||
.h1_writev(h1_writev)
|
||||
.h1_title_case_headers(h1_title_case_headers)
|
||||
.h1_read_buf_exact_size(h1_read_buf_exact_size)
|
||||
Either::A(conn_builder
|
||||
.http2_only(is_h2)
|
||||
.handshake(io)
|
||||
.and_then(move |(tx, conn)| {
|
||||
@@ -546,15 +542,10 @@ where C: Connect + Sync + 'static,
|
||||
impl<C, B> Clone for Client<C, B> {
|
||||
fn clone(&self) -> Client<C, B> {
|
||||
Client {
|
||||
config: self.config.clone(),
|
||||
conn_builder: self.conn_builder.clone(),
|
||||
connector: self.connector.clone(),
|
||||
executor: self.executor.clone(),
|
||||
h1_writev: self.h1_writev,
|
||||
h1_read_buf_exact_size: self.h1_read_buf_exact_size,
|
||||
h1_title_case_headers: self.h1_title_case_headers,
|
||||
pool: self.pool.clone(),
|
||||
retry_canceled_requests: self.retry_canceled_requests,
|
||||
set_host: self.set_host,
|
||||
ver: self.ver,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -790,32 +781,25 @@ fn set_scheme(uri: &mut Uri, scheme: Scheme) {
|
||||
/// Builder for a Client
|
||||
#[derive(Clone)]
|
||||
pub struct Builder {
|
||||
//connect_timeout: Duration,
|
||||
exec: Exec,
|
||||
keep_alive: bool,
|
||||
keep_alive_timeout: Option<Duration>,
|
||||
h1_writev: bool,
|
||||
h1_title_case_headers: bool,
|
||||
h1_read_buf_exact_size: Option<usize>,
|
||||
max_idle_per_host: usize,
|
||||
retry_canceled_requests: bool,
|
||||
set_host: bool,
|
||||
ver: Ver,
|
||||
client_config: Config,
|
||||
conn_builder: conn::Builder,
|
||||
pool_config: pool::Config,
|
||||
}
|
||||
|
||||
impl Default for Builder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
exec: Exec::Default,
|
||||
keep_alive: true,
|
||||
keep_alive_timeout: Some(Duration::from_secs(90)),
|
||||
h1_writev: true,
|
||||
h1_title_case_headers: false,
|
||||
h1_read_buf_exact_size: None,
|
||||
max_idle_per_host: ::std::usize::MAX,
|
||||
retry_canceled_requests: true,
|
||||
set_host: true,
|
||||
ver: Ver::Http1,
|
||||
client_config: Config {
|
||||
retry_canceled_requests: true,
|
||||
set_host: true,
|
||||
ver: Ver::Http1,
|
||||
},
|
||||
conn_builder: conn::Builder::new(),
|
||||
pool_config: pool::Config {
|
||||
enabled: true,
|
||||
keep_alive_timeout: Some(Duration::from_secs(90)),
|
||||
max_idle_per_host: ::std::usize::MAX,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -826,7 +810,7 @@ impl Builder {
|
||||
/// Default is enabled.
|
||||
#[inline]
|
||||
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
|
||||
self.keep_alive = val;
|
||||
self.pool_config.enabled = val;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -840,7 +824,7 @@ impl Builder {
|
||||
where
|
||||
D: Into<Option<Duration>>,
|
||||
{
|
||||
self.keep_alive_timeout = val.into();
|
||||
self.pool_config.keep_alive_timeout = val.into();
|
||||
self
|
||||
}
|
||||
|
||||
@@ -854,7 +838,7 @@ impl Builder {
|
||||
/// Default is `true`.
|
||||
#[inline]
|
||||
pub fn http1_writev(&mut self, val: bool) -> &mut Self {
|
||||
self.h1_writev = val;
|
||||
self.conn_builder.h1_writev(val);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -863,7 +847,7 @@ impl Builder {
|
||||
/// Default is an adaptive read buffer.
|
||||
#[inline]
|
||||
pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
|
||||
self.h1_read_buf_exact_size = Some(sz);
|
||||
self.conn_builder.h1_read_buf_exact_size(Some(sz));
|
||||
self
|
||||
}
|
||||
|
||||
@@ -874,7 +858,7 @@ impl Builder {
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
|
||||
self.h1_title_case_headers = val;
|
||||
self.conn_builder.h1_title_case_headers(val);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -884,7 +868,7 @@ impl Builder {
|
||||
///
|
||||
/// Default is false.
|
||||
pub fn http2_only(&mut self, val: bool) -> &mut Self {
|
||||
self.ver = if val {
|
||||
self.client_config.ver = if val {
|
||||
Ver::Http2
|
||||
} else {
|
||||
Ver::Http1
|
||||
@@ -896,7 +880,7 @@ impl Builder {
|
||||
///
|
||||
/// Default is `usize::MAX` (no limit).
|
||||
pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
|
||||
self.max_idle_per_host = max_idle;
|
||||
self.pool_config.max_idle_per_host = max_idle;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -913,7 +897,7 @@ impl Builder {
|
||||
/// Default is `true`.
|
||||
#[inline]
|
||||
pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
|
||||
self.retry_canceled_requests = val;
|
||||
self.client_config.retry_canceled_requests = val;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -925,7 +909,7 @@ impl Builder {
|
||||
/// Default is `true`.
|
||||
#[inline]
|
||||
pub fn set_host(&mut self, val: bool) -> &mut Self {
|
||||
self.set_host = val;
|
||||
self.client_config.set_host = val;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -934,7 +918,7 @@ impl Builder {
|
||||
where
|
||||
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static,
|
||||
{
|
||||
self.exec = Exec::Executor(Arc::new(exec));
|
||||
self.conn_builder.executor(exec);
|
||||
self
|
||||
}
|
||||
|
||||
@@ -946,8 +930,8 @@ impl Builder {
|
||||
B::Data: Send,
|
||||
{
|
||||
let mut connector = HttpConnector::new(4);
|
||||
if self.keep_alive {
|
||||
connector.set_keepalive(self.keep_alive_timeout);
|
||||
if self.pool_config.enabled {
|
||||
connector.set_keepalive(self.pool_config.keep_alive_timeout);
|
||||
}
|
||||
self.build(connector)
|
||||
}
|
||||
@@ -962,20 +946,10 @@ impl Builder {
|
||||
B::Data: Send,
|
||||
{
|
||||
Client {
|
||||
config: self.client_config,
|
||||
conn_builder: self.conn_builder.clone(),
|
||||
connector: Arc::new(connector),
|
||||
executor: self.exec.clone(),
|
||||
h1_writev: self.h1_writev,
|
||||
h1_title_case_headers: self.h1_title_case_headers,
|
||||
h1_read_buf_exact_size: self.h1_read_buf_exact_size,
|
||||
pool: Pool::new(
|
||||
pool::Enabled(self.keep_alive),
|
||||
pool::IdleTimeout(self.keep_alive_timeout),
|
||||
pool::MaxIdlePerHost(self.max_idle_per_host),
|
||||
&self.exec,
|
||||
),
|
||||
retry_canceled_requests: self.retry_canceled_requests,
|
||||
set_host: self.set_host,
|
||||
ver: self.ver,
|
||||
pool: Pool::new(self.pool_config, &self.conn_builder.exec),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -983,13 +957,9 @@ impl Builder {
|
||||
impl fmt::Debug for Builder {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_struct("Builder")
|
||||
.field("keep_alive", &self.keep_alive)
|
||||
.field("keep_alive_timeout", &self.keep_alive_timeout)
|
||||
.field("http1_read_buf_exact_size", &self.h1_read_buf_exact_size)
|
||||
.field("http1_writev", &self.h1_writev)
|
||||
.field("max_idle_per_host", &self.max_idle_per_host)
|
||||
.field("set_host", &self.set_host)
|
||||
.field("version", &self.ver)
|
||||
.field("client_config", &self.client_config)
|
||||
.field("conn_builder", &self.conn_builder)
|
||||
.field("pool_config", &self.pool_config)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,33 +85,26 @@ struct PoolInner<T> {
|
||||
// doesn't need it!
|
||||
struct WeakOpt<T>(Option<Weak<T>>);
|
||||
|
||||
// Newtypes to act as keyword arguments for `Pool::new`...
|
||||
|
||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub(super) struct Enabled(pub(super) bool);
|
||||
|
||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub(super) struct IdleTimeout(pub(super) Option<Duration>);
|
||||
|
||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub(super) struct MaxIdlePerHost(pub(super) usize);
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub(super) struct Config {
|
||||
pub(super) enabled: bool,
|
||||
pub(super) keep_alive_timeout: Option<Duration>,
|
||||
pub(super) max_idle_per_host: usize,
|
||||
}
|
||||
|
||||
impl<T> Pool<T> {
|
||||
pub fn new(enabled: Enabled, timeout: IdleTimeout, max_idle: MaxIdlePerHost, __exec: &Exec) -> Pool<T> {
|
||||
let inner = if enabled.0 {
|
||||
pub fn new(config: Config, __exec: &Exec) -> Pool<T> {
|
||||
let inner = if config.enabled {
|
||||
Some(Arc::new(Mutex::new(PoolInner {
|
||||
connecting: HashSet::new(),
|
||||
idle: HashMap::new(),
|
||||
#[cfg(feature = "runtime")]
|
||||
idle_interval_ref: None,
|
||||
max_idle_per_host: max_idle.0,
|
||||
max_idle_per_host: config.max_idle_per_host,
|
||||
waiters: HashMap::new(),
|
||||
#[cfg(feature = "runtime")]
|
||||
exec: __exec.clone(),
|
||||
timeout: timeout.0,
|
||||
timeout: config.keep_alive_timeout,
|
||||
})))
|
||||
} else {
|
||||
None
|
||||
|
||||
Reference in New Issue
Block a user