diff --git a/src/client/mod.rs b/src/client/mod.rs index 10f1a1a0..515eef64 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -3,7 +3,7 @@ //! The HTTP `Client` uses asynchronous IO, and utilizes the `Handler` trait //! to convey when IO events are available for a given request. -use std::collections::HashMap; +use std::collections::{VecDeque, HashMap}; use std::fmt; use std::io; use std::marker::PhantomData; @@ -107,6 +107,7 @@ impl Client { keep_alive: keep_alive, idle_conns: HashMap::new(), queue: HashMap::new(), + awaiting_slot: VecDeque::new(), }).unwrap() })); @@ -328,19 +329,52 @@ impl, T: Transport> http::MessageHandler for Message { } } -struct Context { +struct Context { connect_timeout: Duration, keep_alive: bool, - idle_conns: HashMap>, - queue: HashMap>>, + idle_conns: HashMap>, + queue: HashMap>>, + awaiting_slot: VecDeque<(C::Key, C::Output)>, } -impl Context { +/// Macro for advancing state of a ClientFsm::Socket +/// +/// This was previously a method on Context, but due to eviction needs, this +/// block now needs access to the registration APIs on rotor::Scope. +macro_rules! conn_response { + ($scope:expr, $conn:expr, $time:expr) => {{ + match $conn { + Some((conn, timeout)) => { + //TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream + if conn.is_idle() { + $scope.idle_conns.entry(conn.key().clone()).or_insert_with(VecDeque::new) + .push_back(conn.control()); + } + match timeout { + Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn)) + .deadline($time + dur), + None => rotor::Response::ok(ClientFsm::Socket(conn)), + } + + } + None => { + if let Some((key, socket)) = $scope.awaiting_slot.pop_front() { + rotor_try!($scope.register(&socket, EventSet::writable(), PollOpt::level())); + rotor::Response::ok(ClientFsm::Connecting((key, socket))) + } else { + rotor::Response::done() + } + } + } + }} +} + +impl Context { fn pop_queue(&mut self, key: &K) -> Option> { let mut should_remove = false; let queued = { - self.queue.get_mut(key).map(|vec| { - let queued = vec.remove(0); + self.queue.get_mut(key).and_then(|vec| { + let queued = vec.pop_front(); if vec.is_empty() { should_remove = true; } @@ -350,32 +384,17 @@ impl Context { if should_remove { self.queue.remove(key); } + queued } - - fn conn_response(&mut self, conn: Option<(http::Conn>, Option)>, time: rotor::Time) - -> rotor::Response, (C::Key, C::Output)> - where C: Connect, H: Handler { - match conn { - Some((conn, timeout)) => { - //TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream - if conn.is_idle() { - self.idle_conns.entry(conn.key().clone()).or_insert_with(Vec::new) - .push(conn.control()); - } - match timeout { - Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn)) - .deadline(time + dur), - None => rotor::Response::ok(ClientFsm::Socket(conn)), - } - - } - None => rotor::Response::done() - } - } } -impl, T: Transport> http::MessageHandlerFactory for Context { +impl http::MessageHandlerFactory for Context + where K: http::Key, + H: Handler, + T: Transport, + C: Connect +{ type Output = Message; fn create(&mut self, seed: http::Seed) -> Option { @@ -424,7 +443,7 @@ where C: Connect, C::Key: fmt::Debug, C::Output: Transport, H: Handler { - type Context = Context; + type Context = Context; type Seed = (C::Key, C::Output); fn create(seed: Self::Seed, scope: &mut Scope) -> rotor::Response { @@ -437,7 +456,7 @@ where C: Connect, ClientFsm::Socket(conn) => { let res = conn.ready(events, scope); let now = scope.now(); - scope.conn_response(res, now) + conn_response!(scope, res, now) }, ClientFsm::Connecting(mut seed) => { if events.is_error() || events.is_hup() { @@ -480,6 +499,70 @@ where C: Connect, } } + fn spawn_error( + self, + scope: &mut Scope, + error: rotor::SpawnError + ) -> rotor::Response { + // see if there's an idle connections that can be terminated. If yes, put this seed on a + // list waiting for empty slot. + if let rotor::SpawnError::NoSlabSpace((key, socket)) = error { + if let Some(mut queued) = scope.pop_queue(&key) { + trace!("attempting to remove an idle socket"); + // Remove an idle connection. Any connection. Just make some space + // for the new request. + let mut remove_keys = Vec::new(); + let mut found_idle = false; + + // Check all idle connections regardless of origin + for (key, idle) in scope.idle_conns.iter_mut() { + while let Some(ctrl) = idle.pop_front() { + // Signal connection to close. An err here means the + // socket is already dead can should be tossed. + if ctrl.ready(Next::remove()).is_ok() { + found_idle = true; + break; + } + } + + // This list is empty, mark it for removal + if idle.is_empty() { + remove_keys.push(key.to_owned()); + } + + // if found, stop looking for an idle connection. + if found_idle { + break; + } + } + + trace!("idle conns: {:?}", scope.idle_conns); + + // Remove empty idle lists. + for key in &remove_keys { + scope.idle_conns.remove(&key); + } + + if found_idle { + // A socket should be evicted soon; put it on a queue to + // consume newly freed slot. Also need to put the Queued + // back onto front of queue. + scope.awaiting_slot.push_back((key.clone(), socket)); + scope.queue + .entry(key) + .or_insert_with(VecDeque::new) + .push_back(queued); + } else { + // Couldn't evict a socket, just run the error handler. + debug!("Error spawning state machine; slab full and no sockets idle"); + let _ = queued.handler.on_error(::Error::Full); + } + } + } + + self.connect(scope) + } + fn timeout(self, scope: &mut Scope) -> rotor::Response { trace!("timeout now = {:?}", scope.now()); match self { @@ -489,8 +572,8 @@ where C: Connect, { for (key, mut vec) in &mut scope.queue { while !vec.is_empty() && vec[0].deadline <= now { - let mut queued = vec.remove(0); - let _ = queued.handler.on_error(::Error::Timeout); + vec.pop_front() + .map(|mut queued| queued.handler.on_error(::Error::Timeout)); } if vec.is_empty() { empty_keys.push(key.clone()); @@ -511,7 +594,7 @@ where C: Connect, ClientFsm::Socket(conn) => { let res = conn.timeout(scope); let now = scope.now(); - scope.conn_response(res, now) + conn_response!(scope, res, now) } } } @@ -524,7 +607,7 @@ where C: Connect, ClientFsm::Socket(conn) => { let res = conn.wakeup(scope); let now = scope.now(); - scope.conn_response(res, now) + conn_response!(scope, res, now) }, ClientFsm::Connecting(..) => unreachable!("connecting sockets should not be woken up") } @@ -559,8 +642,7 @@ where C: Connect, let mut remove_idle = false; let mut woke_up = false; if let Some(mut idle) = scope.idle_conns.get_mut(&key) { - while !idle.is_empty() { - let ctrl = idle.remove(0); + while let Some(ctrl) = idle.pop_front() { // err means the socket has since died if ctrl.ready(Next::write()).is_ok() { woke_up = true; @@ -576,11 +658,14 @@ where C: Connect, if woke_up { trace!("woke up idle conn for '{}'", url); let deadline = scope.now() + scope.connect_timeout; - scope.queue.entry(key).or_insert_with(Vec::new).push(Queued { - deadline: deadline, - handler: handler, - url: url - }); + scope.queue + .entry(key) + .or_insert_with(VecDeque::new) + .push_back(Queued { + deadline: deadline, + handler: handler, + url: url + }); continue; } } else { @@ -592,11 +677,14 @@ where C: Connect, match connector.connect(&url) { Ok(key) => { let deadline = scope.now() + scope.connect_timeout; - scope.queue.entry(key).or_insert_with(Vec::new).push(Queued { - deadline: deadline, - handler: handler, - url: url - }); + scope.queue + .entry(key) + .or_insert_with(VecDeque::new) + .push_back(Queued { + deadline: deadline, + handler: handler, + url: url + }); } Err(e) => { let _todo = handler.on_error(e.into()); diff --git a/src/error.rs b/src/error.rs index 45c37af6..9e2f3f6b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -48,6 +48,8 @@ pub enum Error { Status, /// A timeout occurred waiting for an IO event. Timeout, + /// Event loop is full and cannot process request + Full, /// An `io::Error` that occurred while trying to read or write to a network stream. Io(IoError), /// An error from a SSL library. @@ -90,6 +92,7 @@ impl StdError for Error { Status => "Invalid Status provided", Incomplete => "Message is incomplete", Timeout => "Timeout", + Error::Full => "Event loop is full", Uri(ref e) => e.description(), Io(ref e) => e.description(), Ssl(ref e) => e.description(),