feat(client): implement connection pooling for Client
Closes #830 Closes #848
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
@@ -24,7 +25,6 @@ pub use self::response::Response;
|
||||
|
||||
mod connect;
|
||||
mod dns;
|
||||
//mod pool;
|
||||
mod request;
|
||||
mod response;
|
||||
|
||||
@@ -116,6 +116,7 @@ impl<H: Send> Client<H> {
|
||||
loop_.run(Context {
|
||||
connect_timeout: connect_timeout,
|
||||
keep_alive: keep_alive,
|
||||
idle_conns: HashMap::new(),
|
||||
queue: HashMap::new(),
|
||||
}).unwrap()
|
||||
}));
|
||||
@@ -332,7 +333,7 @@ impl<H: Handler<T>, T: Transport> http::MessageHandler<T> for Message<H, T> {
|
||||
struct Context<K, H> {
|
||||
connect_timeout: Duration,
|
||||
keep_alive: bool,
|
||||
// idle: HashMap<K, Vec<Notify>>,
|
||||
idle_conns: HashMap<K, Vec<http::Control>>,
|
||||
queue: HashMap<K, Vec<Queued<H>>>,
|
||||
}
|
||||
|
||||
@@ -352,6 +353,27 @@ impl<K: http::Key, H> Context<K, H> {
|
||||
}
|
||||
queued
|
||||
}
|
||||
|
||||
fn conn_response<C>(&mut self, conn: Option<(http::Conn<K, C::Output, Message<H, C::Output>>, Option<Duration>)>, time: rotor::Time)
|
||||
-> rotor::Response<ClientFsm<C, H>, (C::Key, C::Output)>
|
||||
where C: Connect<Key=K>, H: Handler<C::Output> {
|
||||
match conn {
|
||||
Some((conn, timeout)) => {
|
||||
//TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream
|
||||
if conn.is_idle() {
|
||||
self.idle_conns.entry(conn.key().clone()).or_insert_with(Vec::new)
|
||||
.push(conn.control());
|
||||
}
|
||||
match timeout {
|
||||
Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn))
|
||||
.deadline(time + dur),
|
||||
None => rotor::Response::ok(ClientFsm::Socket(conn)),
|
||||
}
|
||||
|
||||
}
|
||||
None => rotor::Response::done()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> {
|
||||
@@ -414,14 +436,9 @@ where C: Connect,
|
||||
unreachable!("Connector can never be ready")
|
||||
},
|
||||
ClientFsm::Socket(conn) => {
|
||||
match conn.ready(events, scope) {
|
||||
Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)),
|
||||
Some((conn, Some(dur))) => {
|
||||
rotor::Response::ok(ClientFsm::Socket(conn))
|
||||
.deadline(scope.now() + dur)
|
||||
}
|
||||
None => rotor::Response::done()
|
||||
}
|
||||
let res = conn.ready(events, scope);
|
||||
let now = scope.now();
|
||||
scope.conn_response(res, now)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -461,14 +478,9 @@ where C: Connect,
|
||||
}
|
||||
}
|
||||
ClientFsm::Socket(conn) => {
|
||||
match conn.timeout(scope) {
|
||||
Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)),
|
||||
Some((conn, Some(dur))) => {
|
||||
rotor::Response::ok(ClientFsm::Socket(conn))
|
||||
.deadline(scope.now() + dur)
|
||||
}
|
||||
None => rotor::Response::done()
|
||||
}
|
||||
let res = conn.timeout(scope);
|
||||
let now = scope.now();
|
||||
scope.conn_response(res, now)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -478,13 +490,10 @@ where C: Connect,
|
||||
ClientFsm::Connector(..) => {
|
||||
self.connect(scope)
|
||||
},
|
||||
ClientFsm::Socket(conn) => match conn.wakeup(scope) {
|
||||
Some((conn, None)) => rotor::Response::ok(ClientFsm::Socket(conn)),
|
||||
Some((conn, Some(dur))) => {
|
||||
rotor::Response::ok(ClientFsm::Socket(conn))
|
||||
.deadline(scope.now() + dur)
|
||||
}
|
||||
None => rotor::Response::done()
|
||||
ClientFsm::Socket(conn) => {
|
||||
let res = conn.wakeup(scope);
|
||||
let now = scope.now();
|
||||
scope.conn_response(res, now)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -513,7 +522,41 @@ where C: Connect,
|
||||
loop {
|
||||
match rx.try_recv() {
|
||||
Ok(Notify::Connect(url, mut handler)) => {
|
||||
// TODO: check pool for sockets to this domain
|
||||
// check pool for sockets to this domain
|
||||
if let Some(key) = connector.key(&url) {
|
||||
let mut remove_idle = false;
|
||||
let mut woke_up = false;
|
||||
if let Some(mut idle) = scope.idle_conns.get_mut(&key) {
|
||||
while !idle.is_empty() {
|
||||
let ctrl = idle.remove(0);
|
||||
// err means the socket has since died
|
||||
if ctrl.ready(Next::write()).is_ok() {
|
||||
woke_up = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
remove_idle = idle.is_empty();
|
||||
}
|
||||
if remove_idle {
|
||||
scope.idle_conns.remove(&key);
|
||||
}
|
||||
|
||||
if woke_up {
|
||||
trace!("woke up idle conn for '{}'", url);
|
||||
let deadline = scope.now() + scope.connect_timeout;
|
||||
scope.queue.entry(key).or_insert_with(Vec::new).push(Queued {
|
||||
deadline: deadline,
|
||||
handler: handler,
|
||||
url: url
|
||||
});
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// this connector cannot handle this url anyways
|
||||
let _ = handler.on_error(io::Error::new(io::ErrorKind::InvalidInput, "invalid url for connector").into());
|
||||
continue;
|
||||
}
|
||||
// no exist connection, call connector
|
||||
match connector.connect(&url) {
|
||||
Ok(key) => {
|
||||
let deadline = scope.now() + scope.connect_timeout;
|
||||
|
||||
Reference in New Issue
Block a user