feat(client): add max_idle_per_host configuration option
Setting this will cap the amount of idle connections kept around for a specific host. This defaults to no limit (`usize::MAX`).
This commit is contained in:
@@ -753,8 +753,7 @@ pub struct Builder {
|
|||||||
keep_alive_timeout: Option<Duration>,
|
keep_alive_timeout: Option<Duration>,
|
||||||
h1_writev: bool,
|
h1_writev: bool,
|
||||||
h1_title_case_headers: bool,
|
h1_title_case_headers: bool,
|
||||||
//TODO: make use of max_idle config
|
max_idle_per_host: usize,
|
||||||
max_idle: usize,
|
|
||||||
retry_canceled_requests: bool,
|
retry_canceled_requests: bool,
|
||||||
set_host: bool,
|
set_host: bool,
|
||||||
ver: Ver,
|
ver: Ver,
|
||||||
@@ -768,7 +767,7 @@ impl Default for Builder {
|
|||||||
keep_alive_timeout: Some(Duration::from_secs(90)),
|
keep_alive_timeout: Some(Duration::from_secs(90)),
|
||||||
h1_writev: true,
|
h1_writev: true,
|
||||||
h1_title_case_headers: false,
|
h1_title_case_headers: false,
|
||||||
max_idle: 5,
|
max_idle_per_host: ::std::usize::MAX,
|
||||||
retry_canceled_requests: true,
|
retry_canceled_requests: true,
|
||||||
set_host: true,
|
set_host: true,
|
||||||
ver: Ver::Http1,
|
ver: Ver::Http1,
|
||||||
@@ -839,6 +838,14 @@ impl Builder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets the maximum idle connection per host allowed in the pool.
|
||||||
|
///
|
||||||
|
/// Default is `usize::MAX` (no limit).
|
||||||
|
pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
|
||||||
|
self.max_idle_per_host = max_idle;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Set whether to retry requests that get disrupted before ever starting
|
/// Set whether to retry requests that get disrupted before ever starting
|
||||||
/// to write.
|
/// to write.
|
||||||
///
|
///
|
||||||
@@ -905,7 +912,12 @@ impl Builder {
|
|||||||
executor: self.exec.clone(),
|
executor: self.exec.clone(),
|
||||||
h1_writev: self.h1_writev,
|
h1_writev: self.h1_writev,
|
||||||
h1_title_case_headers: self.h1_title_case_headers,
|
h1_title_case_headers: self.h1_title_case_headers,
|
||||||
pool: Pool::new(self.keep_alive, self.keep_alive_timeout, &self.exec),
|
pool: Pool::new(
|
||||||
|
pool::Enabled(self.keep_alive),
|
||||||
|
pool::IdleTimeout(self.keep_alive_timeout),
|
||||||
|
pool::MaxIdlePerHost(self.max_idle_per_host),
|
||||||
|
&self.exec,
|
||||||
|
),
|
||||||
retry_canceled_requests: self.retry_canceled_requests,
|
retry_canceled_requests: self.retry_canceled_requests,
|
||||||
set_host: self.set_host,
|
set_host: self.set_host,
|
||||||
ver: self.ver,
|
ver: self.ver,
|
||||||
@@ -919,7 +931,7 @@ impl fmt::Debug for Builder {
|
|||||||
.field("keep_alive", &self.keep_alive)
|
.field("keep_alive", &self.keep_alive)
|
||||||
.field("keep_alive_timeout", &self.keep_alive_timeout)
|
.field("keep_alive_timeout", &self.keep_alive_timeout)
|
||||||
.field("http1_writev", &self.h1_writev)
|
.field("http1_writev", &self.h1_writev)
|
||||||
.field("max_idle", &self.max_idle)
|
.field("max_idle_per_host", &self.max_idle_per_host)
|
||||||
.field("set_host", &self.set_host)
|
.field("set_host", &self.set_host)
|
||||||
.field("version", &self.ver)
|
.field("version", &self.ver)
|
||||||
.finish()
|
.finish()
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ struct Connections<T> {
|
|||||||
// These are internal Conns sitting in the event loop in the KeepAlive
|
// These are internal Conns sitting in the event loop in the KeepAlive
|
||||||
// state, waiting to receive a new Request to send on the socket.
|
// state, waiting to receive a new Request to send on the socket.
|
||||||
idle: HashMap<Key, Vec<Idle<T>>>,
|
idle: HashMap<Key, Vec<Idle<T>>>,
|
||||||
|
max_idle_per_host: usize,
|
||||||
// These are outstanding Checkouts that are waiting for a socket to be
|
// These are outstanding Checkouts that are waiting for a socket to be
|
||||||
// able to send a Request one. This is used when "racing" for a new
|
// able to send a Request one. This is used when "racing" for a new
|
||||||
// connection.
|
// connection.
|
||||||
@@ -83,8 +84,13 @@ struct Connections<T> {
|
|||||||
// doesn't need it!
|
// doesn't need it!
|
||||||
struct WeakOpt<T>(Option<Weak<T>>);
|
struct WeakOpt<T>(Option<Weak<T>>);
|
||||||
|
|
||||||
|
// Newtypes to act as keyword arguments for `Pool::new`...
|
||||||
|
pub(super) struct Enabled(pub(super) bool);
|
||||||
|
pub(super) struct IdleTimeout(pub(super) Option<Duration>);
|
||||||
|
pub(super) struct MaxIdlePerHost(pub(super) usize);
|
||||||
|
|
||||||
impl<T> Pool<T> {
|
impl<T> Pool<T> {
|
||||||
pub fn new(enabled: bool, timeout: Option<Duration>, __exec: &Exec) -> Pool<T> {
|
pub fn new(enabled: Enabled, timeout: IdleTimeout, max_idle: MaxIdlePerHost, __exec: &Exec) -> Pool<T> {
|
||||||
Pool {
|
Pool {
|
||||||
inner: Arc::new(PoolInner {
|
inner: Arc::new(PoolInner {
|
||||||
connections: Mutex::new(Connections {
|
connections: Mutex::new(Connections {
|
||||||
@@ -92,12 +98,13 @@ impl<T> Pool<T> {
|
|||||||
idle: HashMap::new(),
|
idle: HashMap::new(),
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
idle_interval_ref: None,
|
idle_interval_ref: None,
|
||||||
|
max_idle_per_host: max_idle.0,
|
||||||
waiters: HashMap::new(),
|
waiters: HashMap::new(),
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
exec: __exec.clone(),
|
exec: __exec.clone(),
|
||||||
timeout,
|
timeout: timeout.0,
|
||||||
}),
|
}),
|
||||||
enabled,
|
enabled: enabled.0,
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -376,13 +383,23 @@ impl<T: Poolable> Connections<T> {
|
|||||||
|
|
||||||
match value {
|
match value {
|
||||||
Some(value) => {
|
Some(value) => {
|
||||||
debug!("pooling idle connection for {:?}", key);
|
// borrow-check scope...
|
||||||
self.idle.entry(key)
|
{
|
||||||
.or_insert(Vec::new())
|
let idle_list = self
|
||||||
.push(Idle {
|
.idle
|
||||||
value: value,
|
.entry(key.clone())
|
||||||
idle_at: Instant::now(),
|
.or_insert(Vec::new());
|
||||||
});
|
if self.max_idle_per_host <= idle_list.len() {
|
||||||
|
trace!("max idle per host for {:?}, dropping connection", key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("pooling idle connection for {:?}", key);
|
||||||
|
idle_list.push(Idle {
|
||||||
|
value: value,
|
||||||
|
idle_at: Instant::now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
{
|
{
|
||||||
@@ -773,7 +790,16 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn pool_no_timer<T>() -> Pool<T> {
|
fn pool_no_timer<T>() -> Pool<T> {
|
||||||
let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default);
|
pool_max_idle_no_timer(::std::usize::MAX)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
|
||||||
|
let pool = Pool::new(
|
||||||
|
super::Enabled(true),
|
||||||
|
super::IdleTimeout(Some(Duration::from_millis(100))),
|
||||||
|
super::MaxIdlePerHost(max_idle),
|
||||||
|
&Exec::Default,
|
||||||
|
);
|
||||||
pool.no_timer();
|
pool.no_timer();
|
||||||
pool
|
pool
|
||||||
}
|
}
|
||||||
@@ -826,13 +852,35 @@ mod tests {
|
|||||||
}).wait().unwrap();
|
}).wait().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_pool_max_idle_per_host() {
|
||||||
|
future::lazy(|| {
|
||||||
|
let pool = pool_max_idle_no_timer(2);
|
||||||
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
|
|
||||||
|
pool.pooled(c(key.clone()), Uniq(41));
|
||||||
|
pool.pooled(c(key.clone()), Uniq(5));
|
||||||
|
pool.pooled(c(key.clone()), Uniq(99));
|
||||||
|
|
||||||
|
// pooled and dropped 3, max_idle should only allow 2
|
||||||
|
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(2));
|
||||||
|
|
||||||
|
Ok::<(), ()>(())
|
||||||
|
}).wait().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pool_timer_removes_expired() {
|
fn test_pool_timer_removes_expired() {
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tokio_timer::Delay;
|
use tokio_timer::Delay;
|
||||||
let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap();
|
let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap();
|
||||||
let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default);
|
let pool = Pool::new(
|
||||||
|
super::Enabled(true),
|
||||||
|
super::IdleTimeout(Some(Duration::from_millis(100))),
|
||||||
|
super::MaxIdlePerHost(::std::usize::MAX),
|
||||||
|
&Exec::Default,
|
||||||
|
);
|
||||||
|
|
||||||
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
let key = (Arc::new("foo".to_string()), Ver::Http1);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user