| @@ -221,10 +221,26 @@ where C: Connect, | ||||
|                 e.into() | ||||
|             }); | ||||
|  | ||||
|         let resp = race.and_then(move |mut client| { | ||||
|         let resp = race.and_then(move |client| { | ||||
|             use proto::dispatch::ClientMsg; | ||||
|  | ||||
|             let (callback, rx) = oneshot::channel(); | ||||
|             client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap(); | ||||
|             client.should_close = false; | ||||
|  | ||||
|             match client.tx.borrow_mut().start_send(ClientMsg::Request(head, body, callback)) { | ||||
|                 Ok(_) => (), | ||||
|                 Err(e) => match e.into_inner() { | ||||
|                     ClientMsg::Request(_, _, callback) => { | ||||
|                         error!("pooled connection was not ready, this is a hyper bug"); | ||||
|                         let err = io::Error::new( | ||||
|                             io::ErrorKind::BrokenPipe, | ||||
|                             "pool selected dead connection", | ||||
|                         ); | ||||
|                         let _ = callback.send(Err(::Error::Io(err))); | ||||
|                     }, | ||||
|                     _ => unreachable!("ClientMsg::Request was just sent"), | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             rx.then(|res| { | ||||
|                 match res { | ||||
|                     Ok(Ok(res)) => Ok(res), | ||||
| @@ -256,8 +272,8 @@ impl<C, B> fmt::Debug for Client<C, B> { | ||||
| } | ||||
|  | ||||
| struct HyperClient<B> { | ||||
|     tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>, | ||||
|     should_close: bool, | ||||
|     tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>, | ||||
| } | ||||
|  | ||||
| impl<B> Clone for HyperClient<B> { | ||||
| @@ -269,6 +285,15 @@ impl<B> Clone for HyperClient<B> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<B> self::pool::Ready for HyperClient<B> { | ||||
|     fn poll_ready(&mut self) -> Poll<(), ()> { | ||||
|         self.tx | ||||
|             .borrow_mut() | ||||
|             .poll_ready() | ||||
|             .map_err(|_| ()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<B> Drop for HyperClient<B> { | ||||
|     fn drop(&mut self) { | ||||
|         if self.should_close { | ||||
| @@ -497,3 +522,4 @@ mod background { | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -15,6 +15,15 @@ pub struct Pool<T> { | ||||
|     inner: Rc<RefCell<PoolInner<T>>>, | ||||
| } | ||||
|  | ||||
| // Before using a pooled connection, make sure the sender is not dead. | ||||
| // | ||||
| // This is a trait to allow the `client::pool::tests` to work for `i32`. | ||||
| // | ||||
| // See https://github.com/hyperium/hyper/issues/1429 | ||||
| pub trait Ready { | ||||
|     fn poll_ready(&mut self) -> Poll<(), ()>; | ||||
| } | ||||
|  | ||||
| struct PoolInner<T> { | ||||
|     enabled: bool, | ||||
|     // These are internal Conns sitting in the event loop in the KeepAlive | ||||
| @@ -256,7 +265,7 @@ pub struct Checkout<T> { | ||||
|     parked: Option<relay::Receiver<Entry<T>>>, | ||||
| } | ||||
|  | ||||
| impl<T: Clone> Future for Checkout<T> { | ||||
| impl<T: Ready + Clone> Future for Checkout<T> { | ||||
|     type Item = Pooled<T>; | ||||
|     type Error = io::Error; | ||||
|  | ||||
| @@ -282,21 +291,22 @@ impl<T: Clone> Future for Checkout<T> { | ||||
|         let mut should_remove = false; | ||||
|         let entry = self.pool.inner.borrow_mut().idle.get_mut(key).and_then(|list| { | ||||
|             trace!("Checkout::poll key found {:?}", key); | ||||
|             while let Some(entry) = list.pop() { | ||||
|             while let Some(mut entry) = list.pop() { | ||||
|                 match entry.status.get() { | ||||
|                     TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => { | ||||
|                         debug!("found idle connection for {:?}", key); | ||||
|                         should_remove = list.is_empty(); | ||||
|                         return Some(entry); | ||||
|                         if let Ok(Async::Ready(())) = entry.value.poll_ready() { | ||||
|                             debug!("found idle connection for {:?}", key); | ||||
|                             should_remove = list.is_empty(); | ||||
|                             return Some(entry); | ||||
|                         } | ||||
|                     }, | ||||
|                     _ => { | ||||
|                         trace!("Checkout::poll removing unacceptable pooled {:?}", key); | ||||
|                         // every other case the Entry should just be dropped | ||||
|                         // 1. Idle but expired | ||||
|                         // 2. Busy (something else somehow took it?) | ||||
|                         // 3. Disabled don't reuse of course | ||||
|                     } | ||||
|                     _ => {}, | ||||
|                 } | ||||
|                 trace!("Checkout::poll removing unacceptable pooled {:?}", key); | ||||
|                 // every other case the Entry should just be dropped | ||||
|                 // 1. Idle but expired | ||||
|                 // 2. Busy (something else somehow took it?) | ||||
|                 // 3. Disabled don't reuse of course | ||||
|             } | ||||
|             should_remove = true; | ||||
|             None | ||||
| @@ -347,10 +357,16 @@ impl Expiration { | ||||
| mod tests { | ||||
|     use std::rc::Rc; | ||||
|     use std::time::Duration; | ||||
|     use futures::{Async, Future}; | ||||
|     use futures::{Async, Future, Poll}; | ||||
|     use futures::future; | ||||
|     use proto::KeepAlive; | ||||
|     use super::Pool; | ||||
|     use super::{Ready, Pool}; | ||||
|  | ||||
|     impl Ready for i32 { | ||||
|         fn poll_ready(&mut self) -> Poll<(), ()> { | ||||
|             Ok(Async::Ready(())) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn test_pool_checkout_smoke() { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user