fix(client): Evict idle connections when full
In the scenario where a request is started on the `Client`, the client has a full slab, and sockets for a *different* domain are idling in keep-alive, the new request would previously cause the client to panic!. This patch adds a `spawn_error` handler which attempts to evict an idle connection to make space for the new request. If space cannot be made, the error handler is run (passed `Error::Full`) and the `Handler` is dropped. This is a breaking change because of the new variant of `Error`. Some inefficient use of `Vec` in the client was replaced with `VecDeque` to support push/pop from either end.
This commit is contained in:
		| @@ -3,7 +3,7 @@ | ||||
| //! The HTTP `Client` uses asynchronous IO, and utilizes the `Handler` trait | ||||
| //! to convey when IO events are available for a given request. | ||||
|  | ||||
| use std::collections::HashMap; | ||||
| use std::collections::{VecDeque, HashMap}; | ||||
| use std::fmt; | ||||
| use std::io; | ||||
| use std::marker::PhantomData; | ||||
| @@ -107,6 +107,7 @@ impl<H: Send> Client<H> { | ||||
|                 keep_alive: keep_alive, | ||||
|                 idle_conns: HashMap::new(), | ||||
|                 queue: HashMap::new(), | ||||
|                 awaiting_slot: VecDeque::new(), | ||||
|             }).unwrap() | ||||
|         })); | ||||
|  | ||||
| @@ -328,19 +329,52 @@ impl<H: Handler<T>, T: Transport> http::MessageHandler<T> for Message<H, T> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct Context<K, H> { | ||||
| struct Context<K, H, C: Connect> { | ||||
|     connect_timeout: Duration, | ||||
|     keep_alive: bool, | ||||
|     idle_conns: HashMap<K, Vec<http::Control>>, | ||||
|     queue: HashMap<K, Vec<Queued<H>>>, | ||||
|     idle_conns: HashMap<K, VecDeque<http::Control>>, | ||||
|     queue: HashMap<K, VecDeque<Queued<H>>>, | ||||
|     awaiting_slot: VecDeque<(C::Key, C::Output)>, | ||||
| } | ||||
|  | ||||
| impl<K: http::Key, H> Context<K, H> { | ||||
| /// Macro for advancing state of a ClientFsm::Socket | ||||
| /// | ||||
| /// This was previously a method on Context, but due to eviction needs, this | ||||
| /// block now needs access to the registration APIs on rotor::Scope. | ||||
| macro_rules! conn_response { | ||||
|     ($scope:expr, $conn:expr, $time:expr) => {{ | ||||
|         match $conn { | ||||
|             Some((conn, timeout)) => { | ||||
|                 //TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream | ||||
|                 if conn.is_idle() { | ||||
|                     $scope.idle_conns.entry(conn.key().clone()).or_insert_with(VecDeque::new) | ||||
|                         .push_back(conn.control()); | ||||
|                 } | ||||
|                 match timeout { | ||||
|                     Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn)) | ||||
|                         .deadline($time + dur), | ||||
|                     None => rotor::Response::ok(ClientFsm::Socket(conn)), | ||||
|                 } | ||||
|  | ||||
|             } | ||||
|             None => { | ||||
|                 if let Some((key, socket)) = $scope.awaiting_slot.pop_front() { | ||||
|                     rotor_try!($scope.register(&socket, EventSet::writable(), PollOpt::level())); | ||||
|                     rotor::Response::ok(ClientFsm::Connecting((key, socket))) | ||||
|                 } else { | ||||
|                     rotor::Response::done() | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     }} | ||||
| } | ||||
|  | ||||
| impl<K: http::Key, H, C: Connect> Context<K, H, C> { | ||||
|     fn pop_queue(&mut self, key: &K) -> Option<Queued<H>> { | ||||
|         let mut should_remove = false; | ||||
|         let queued = { | ||||
|             self.queue.get_mut(key).map(|vec| { | ||||
|                 let queued = vec.remove(0); | ||||
|             self.queue.get_mut(key).and_then(|vec| { | ||||
|                 let queued = vec.pop_front(); | ||||
|                 if vec.is_empty() { | ||||
|                     should_remove = true; | ||||
|                 } | ||||
| @@ -350,32 +384,17 @@ impl<K: http::Key, H> Context<K, H> { | ||||
|         if should_remove { | ||||
|             self.queue.remove(key); | ||||
|         } | ||||
|  | ||||
|         queued | ||||
|     } | ||||
|  | ||||
|     fn conn_response<C>(&mut self, conn: Option<(http::Conn<K, C::Output, Message<H, C::Output>>, Option<Duration>)>, time: rotor::Time) | ||||
|     -> rotor::Response<ClientFsm<C, H>, (C::Key, C::Output)> | ||||
|     where C: Connect<Key=K>, H: Handler<C::Output> { | ||||
|         match conn { | ||||
|             Some((conn, timeout)) => { | ||||
|                 //TODO: HTTP2: a connection doesn't need to be idle to be used for a second stream | ||||
|                 if conn.is_idle() { | ||||
|                     self.idle_conns.entry(conn.key().clone()).or_insert_with(Vec::new) | ||||
|                         .push(conn.control()); | ||||
|                 } | ||||
|                 match timeout { | ||||
|                     Some(dur) => rotor::Response::ok(ClientFsm::Socket(conn)) | ||||
|                         .deadline(time + dur), | ||||
|                     None => rotor::Response::ok(ClientFsm::Socket(conn)), | ||||
|                 } | ||||
|  | ||||
|             } | ||||
|             None => rotor::Response::done() | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<K: http::Key, H: Handler<T>, T: Transport> http::MessageHandlerFactory<K, T> for Context<K, H> { | ||||
| impl<K, H, T, C> http::MessageHandlerFactory<K, T> for Context<K, H, C> | ||||
|     where K: http::Key, | ||||
|           H: Handler<T>, | ||||
|           T: Transport, | ||||
|           C: Connect | ||||
| { | ||||
|     type Output = Message<H, T>; | ||||
|  | ||||
|     fn create(&mut self, seed: http::Seed<K>) -> Option<Self::Output> { | ||||
| @@ -424,7 +443,7 @@ where C: Connect, | ||||
|       C::Key: fmt::Debug, | ||||
|       C::Output: Transport, | ||||
|       H: Handler<C::Output> { | ||||
|     type Context = Context<C::Key, H>; | ||||
|     type Context = Context<C::Key, H, C>; | ||||
|     type Seed = (C::Key, C::Output); | ||||
|  | ||||
|     fn create(seed: Self::Seed, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, rotor::Void> { | ||||
| @@ -437,7 +456,7 @@ where C: Connect, | ||||
|             ClientFsm::Socket(conn) => { | ||||
|                 let res = conn.ready(events, scope); | ||||
|                 let now = scope.now(); | ||||
|                 scope.conn_response(res, now) | ||||
|                 conn_response!(scope, res, now) | ||||
|             }, | ||||
|             ClientFsm::Connecting(mut seed) => { | ||||
|                 if events.is_error() || events.is_hup() { | ||||
| @@ -480,6 +499,70 @@ where C: Connect, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn spawn_error( | ||||
|         self, | ||||
|         scope: &mut Scope<Self::Context>, | ||||
|         error: rotor::SpawnError<Self::Seed> | ||||
|     ) -> rotor::Response<Self, Self::Seed> { | ||||
|         // see if there's an idle connections that can be terminated. If yes, put this seed on a | ||||
|         // list waiting for empty slot. | ||||
|         if let rotor::SpawnError::NoSlabSpace((key, socket)) = error { | ||||
|             if let Some(mut queued) = scope.pop_queue(&key) { | ||||
|                 trace!("attempting to remove an idle socket"); | ||||
|                 // Remove an idle connection. Any connection. Just make some space | ||||
|                 // for the new request. | ||||
|                 let mut remove_keys = Vec::new(); | ||||
|                 let mut found_idle = false; | ||||
|  | ||||
|                 // Check all idle connections regardless of origin | ||||
|                 for (key, idle) in scope.idle_conns.iter_mut() { | ||||
|                     while let Some(ctrl) = idle.pop_front() { | ||||
|                         // Signal connection to close. An err here means the | ||||
|                         // socket is already dead can should be tossed. | ||||
|                         if ctrl.ready(Next::remove()).is_ok() { | ||||
|                             found_idle = true; | ||||
|                             break; | ||||
|                         } | ||||
|                     } | ||||
|  | ||||
|                     // This list is empty, mark it for removal | ||||
|                     if idle.is_empty() { | ||||
|                         remove_keys.push(key.to_owned()); | ||||
|                     } | ||||
|  | ||||
|                     // if found, stop looking for an idle connection. | ||||
|                     if found_idle { | ||||
|                         break; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 trace!("idle conns: {:?}", scope.idle_conns); | ||||
|  | ||||
|                 // Remove empty idle lists. | ||||
|                 for key in &remove_keys { | ||||
|                     scope.idle_conns.remove(&key); | ||||
|                 } | ||||
|  | ||||
|                 if found_idle { | ||||
|                     // A socket should be evicted soon; put it on a queue to | ||||
|                     // consume newly freed slot. Also need to put the Queued<H> | ||||
|                     // back onto front of queue. | ||||
|                     scope.awaiting_slot.push_back((key.clone(), socket)); | ||||
|                     scope.queue | ||||
|                         .entry(key) | ||||
|                         .or_insert_with(VecDeque::new) | ||||
|                         .push_back(queued); | ||||
|                 } else { | ||||
|                     // Couldn't evict a socket, just run the error handler. | ||||
|                     debug!("Error spawning state machine; slab full and no sockets idle"); | ||||
|                     let _ = queued.handler.on_error(::Error::Full); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         self.connect(scope) | ||||
|     } | ||||
|  | ||||
|     fn timeout(self, scope: &mut Scope<Self::Context>) -> rotor::Response<Self, Self::Seed> { | ||||
|         trace!("timeout now = {:?}", scope.now()); | ||||
|         match self { | ||||
| @@ -489,8 +572,8 @@ where C: Connect, | ||||
|                 { | ||||
|                     for (key, mut vec) in &mut scope.queue { | ||||
|                         while !vec.is_empty() && vec[0].deadline <= now { | ||||
|                             let mut queued = vec.remove(0); | ||||
|                             let _ = queued.handler.on_error(::Error::Timeout); | ||||
|                             vec.pop_front() | ||||
|                                .map(|mut queued| queued.handler.on_error(::Error::Timeout)); | ||||
|                         } | ||||
|                         if vec.is_empty() { | ||||
|                             empty_keys.push(key.clone()); | ||||
| @@ -511,7 +594,7 @@ where C: Connect, | ||||
|             ClientFsm::Socket(conn) => { | ||||
|                 let res = conn.timeout(scope); | ||||
|                 let now = scope.now(); | ||||
|                 scope.conn_response(res, now) | ||||
|                 conn_response!(scope, res, now) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| @@ -524,7 +607,7 @@ where C: Connect, | ||||
|             ClientFsm::Socket(conn) => { | ||||
|                 let res = conn.wakeup(scope); | ||||
|                 let now = scope.now(); | ||||
|                 scope.conn_response(res, now) | ||||
|                 conn_response!(scope, res, now) | ||||
|             }, | ||||
|             ClientFsm::Connecting(..) => unreachable!("connecting sockets should not be woken up") | ||||
|         } | ||||
| @@ -559,8 +642,7 @@ where C: Connect, | ||||
|                                 let mut remove_idle = false; | ||||
|                                 let mut woke_up = false; | ||||
|                                 if let Some(mut idle) = scope.idle_conns.get_mut(&key) { | ||||
|                                     while !idle.is_empty() { | ||||
|                                         let ctrl = idle.remove(0); | ||||
|                                     while let Some(ctrl) = idle.pop_front() { | ||||
|                                         // err means the socket has since died | ||||
|                                         if ctrl.ready(Next::write()).is_ok() { | ||||
|                                             woke_up = true; | ||||
| @@ -576,11 +658,14 @@ where C: Connect, | ||||
|                                 if woke_up { | ||||
|                                     trace!("woke up idle conn for '{}'", url); | ||||
|                                     let deadline = scope.now() + scope.connect_timeout; | ||||
|                                     scope.queue.entry(key).or_insert_with(Vec::new).push(Queued { | ||||
|                                         deadline: deadline, | ||||
|                                         handler: handler, | ||||
|                                         url: url | ||||
|                                     }); | ||||
|                                     scope.queue | ||||
|                                         .entry(key) | ||||
|                                         .or_insert_with(VecDeque::new) | ||||
|                                         .push_back(Queued { | ||||
|                                             deadline: deadline, | ||||
|                                             handler: handler, | ||||
|                                             url: url | ||||
|                                         }); | ||||
|                                     continue; | ||||
|                                 } | ||||
|                             } else { | ||||
| @@ -592,11 +677,14 @@ where C: Connect, | ||||
|                             match connector.connect(&url) { | ||||
|                                 Ok(key) => { | ||||
|                                     let deadline = scope.now() + scope.connect_timeout; | ||||
|                                     scope.queue.entry(key).or_insert_with(Vec::new).push(Queued { | ||||
|                                         deadline: deadline, | ||||
|                                         handler: handler, | ||||
|                                         url: url | ||||
|                                     }); | ||||
|                                     scope.queue | ||||
|                                         .entry(key) | ||||
|                                         .or_insert_with(VecDeque::new) | ||||
|                                         .push_back(Queued { | ||||
|                                             deadline: deadline, | ||||
|                                             handler: handler, | ||||
|                                             url: url | ||||
|                                         }); | ||||
|                                 } | ||||
|                                 Err(e) => { | ||||
|                                     let _todo = handler.on_error(e.into()); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user