diff --git a/src/client/pool.rs b/src/client/pool.rs deleted file mode 100644 index f3198d5b..00000000 --- a/src/client/pool.rs +++ /dev/null @@ -1,291 +0,0 @@ -//! Client Connection Pooling -use std::borrow::ToOwned; -use std::collections::HashMap; -use std::io::{self, Read, Write}; -use std::net::{SocketAddr, Shutdown}; -use std::sync::{Arc, Mutex}; - -use std::time::Duration; - -use net::{NetworkConnector, NetworkStream, DefaultConnector}; - -/// The `NetworkConnector` that behaves as a connection pool used by hyper's `Client`. -pub struct Pool { - connector: C, - inner: Arc::Stream>>> -} - -/// Config options for the `Pool`. -#[derive(Debug)] -pub struct Config { - /// The maximum idle connections *per host*. - pub max_idle: usize, -} - -impl Default for Config { - #[inline] - fn default() -> Config { - Config { - max_idle: 5, - } - } -} - -#[derive(Debug)] -struct PoolImpl { - conns: HashMap>>, - config: Config, -} - -type Key = (String, u16, Scheme); - -fn key>(host: &str, port: u16, scheme: T) -> Key { - (host.to_owned(), port, scheme.into()) -} - -#[derive(Clone, PartialEq, Eq, Debug, Hash)] -enum Scheme { - Http, - Https, - Other(String) -} - -impl<'a> From<&'a str> for Scheme { - fn from(s: &'a str) -> Scheme { - match s { - "http" => Scheme::Http, - "https" => Scheme::Https, - s => Scheme::Other(String::from(s)) - } - } -} - -impl Pool { - /// Creates a `Pool` with a `DefaultConnector`. - #[inline] - pub fn new(config: Config) -> Pool { - Pool::with_connector(config, DefaultConnector::default()) - } -} - -impl Pool { - /// Creates a `Pool` with a specified `NetworkConnector`. - #[inline] - pub fn with_connector(config: Config, connector: C) -> Pool { - Pool { - connector: connector, - inner: Arc::new(Mutex::new(PoolImpl { - conns: HashMap::new(), - config: config, - })) - } - } - - /// Clear all idle connections from the Pool, closing them. - #[inline] - pub fn clear_idle(&mut self) { - self.inner.lock().unwrap().conns.clear(); - } -} - -impl PoolImpl { - fn reuse(&mut self, key: Key, conn: PooledStreamInner) { - trace!("reuse {:?}", key); - let conns = self.conns.entry(key).or_insert(vec![]); - if conns.len() < self.config.max_idle { - conns.push(conn); - } - } -} - -impl, S: NetworkStream + Send> NetworkConnector for Pool { - type Stream = PooledStream; - fn connect(&self, host: &str, port: u16, scheme: &str) -> ::Result> { - let key = key(host, port, scheme); - - let inner = { - // keep the mutex locked only in this block - let mut locked = self.inner.lock().unwrap(); - let mut should_remove = false; - let inner = locked.conns.get_mut(&key).map(|vec| { - trace!("Pool had connection, using"); - should_remove = vec.len() == 1; - vec.pop().unwrap() - }); - if should_remove { - locked.conns.remove(&key); - } - inner - }; - - let inner = match inner { - Some(inner) => inner, - None => PooledStreamInner { - key: key.clone(), - stream: try!(self.connector.connect(host, port, scheme)), - previous_response_expected_no_content: false, - } - - }; - Ok(PooledStream { - inner: Some(inner), - is_closed: false, - pool: self.inner.clone(), - }) - } -} - -/// A Stream that will try to be returned to the Pool when dropped. -#[derive(Debug)] -pub struct PooledStream { - inner: Option>, - is_closed: bool, - pool: Arc>>, -} - -impl PooledStream { - /// Take the wrapped stream out of the pool completely. - pub fn into_inner(mut self) -> S { - self.inner.take().expect("PooledStream lost its inner stream").stream - } -} - -#[derive(Debug)] -struct PooledStreamInner { - key: Key, - stream: S, - previous_response_expected_no_content: bool, -} - -impl Read for PooledStream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self.inner.as_mut().unwrap().stream.read(buf) { - Ok(0) => { - // if the wrapped stream returns EOF (Ok(0)), that means the - // server has closed the stream. we must be sure this stream - // is dropped and not put back into the pool. - self.is_closed = true; - Ok(0) - }, - r => r - } - } -} - -impl Write for PooledStream { - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.as_mut().unwrap().stream.write(buf) - } - - #[inline] - fn flush(&mut self) -> io::Result<()> { - self.inner.as_mut().unwrap().stream.flush() - } -} - -impl NetworkStream for PooledStream { - #[inline] - fn peer_addr(&mut self) -> io::Result { - self.inner.as_mut().unwrap().stream.peer_addr() - } - - #[inline] - fn set_read_timeout(&self, dur: Option) -> io::Result<()> { - self.inner.as_ref().unwrap().stream.set_read_timeout(dur) - } - - #[inline] - fn set_write_timeout(&self, dur: Option) -> io::Result<()> { - self.inner.as_ref().unwrap().stream.set_write_timeout(dur) - } - - #[inline] - fn close(&mut self, how: Shutdown) -> io::Result<()> { - self.is_closed = true; - self.inner.as_mut().unwrap().stream.close(how) - } - - #[inline] - fn set_previous_response_expected_no_content(&mut self, expected: bool) { - trace!("set_previous_response_expected_no_content {}", expected); - self.inner.as_mut().unwrap().previous_response_expected_no_content = expected; - } - - #[inline] - fn previous_response_expected_no_content(&self) -> bool { - let answer = self.inner.as_ref().unwrap().previous_response_expected_no_content; - trace!("previous_response_expected_no_content {}", answer); - answer - } -} - -impl Drop for PooledStream { - fn drop(&mut self) { - trace!("PooledStream.drop, is_closed={}", self.is_closed); - if !self.is_closed { - self.inner.take().map(|inner| { - if let Ok(mut pool) = self.pool.lock() { - pool.reuse(inner.key.clone(), inner); - } - // else poisoned, give up - }); - } - } -} - -#[cfg(test)] -mod tests { - use std::net::Shutdown; - use std::io::Read; - use mock::{MockConnector}; - use net::{NetworkConnector, NetworkStream}; - - use super::{Pool, key}; - - macro_rules! mocked { - () => ({ - Pool::with_connector(Default::default(), MockConnector) - }) - } - - #[test] - fn test_connect_and_drop() { - let pool = mocked!(); - let key = key("127.0.0.1", 3000, "http"); - pool.connect("127.0.0.1", 3000, "http").unwrap(); - { - let locked = pool.inner.lock().unwrap(); - assert_eq!(locked.conns.len(), 1); - assert_eq!(locked.conns.get(&key).unwrap().len(), 1); - } - pool.connect("127.0.0.1", 3000, "http").unwrap(); //reused - { - let locked = pool.inner.lock().unwrap(); - assert_eq!(locked.conns.len(), 1); - assert_eq!(locked.conns.get(&key).unwrap().len(), 1); - } - } - - #[test] - fn test_closed() { - let pool = mocked!(); - let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap(); - stream.close(Shutdown::Both).unwrap(); - drop(stream); - let locked = pool.inner.lock().unwrap(); - assert_eq!(locked.conns.len(), 0); - } - - #[test] - fn test_eof_closes() { - let pool = mocked!(); - - let mut stream = pool.connect("127.0.0.1", 3000, "http").unwrap(); - assert_eq!(stream.read(&mut [0]).unwrap(), 0); - drop(stream); - let locked = pool.inner.lock().unwrap(); - assert_eq!(locked.conns.len(), 0); - } -}