diff --git a/src/client/mod.rs b/src/client/mod.rs index ececc3e9..dec8a91d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -221,10 +221,26 @@ where C: Connect, e.into() }); - let resp = race.and_then(move |mut client| { + let resp = race.and_then(move |client| { + use proto::dispatch::ClientMsg; + let (callback, rx) = oneshot::channel(); - client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap(); - client.should_close = false; + + match client.tx.borrow_mut().start_send(ClientMsg::Request(head, body, callback)) { + Ok(_) => (), + Err(e) => match e.into_inner() { + ClientMsg::Request(_, _, callback) => { + error!("pooled connection was not ready, this is a hyper bug"); + let err = io::Error::new( + io::ErrorKind::BrokenPipe, + "pool selected dead connection", + ); + let _ = callback.send(Err(::Error::Io(err))); + }, + _ => unreachable!("ClientMsg::Request was just sent"), + } + } + rx.then(|res| { match res { Ok(Ok(res)) => Ok(res), @@ -256,8 +272,8 @@ impl fmt::Debug for Client { } struct HyperClient { - tx: RefCell<::futures::sync::mpsc::Sender>>, should_close: bool, + tx: RefCell<::futures::sync::mpsc::Sender>>, } impl Clone for HyperClient { @@ -269,6 +285,15 @@ impl Clone for HyperClient { } } +impl self::pool::Ready for HyperClient { + fn poll_ready(&mut self) -> Poll<(), ()> { + self.tx + .borrow_mut() + .poll_ready() + .map_err(|_| ()) + } +} + impl Drop for HyperClient { fn drop(&mut self) { if self.should_close { @@ -497,3 +522,4 @@ mod background { } } } + diff --git a/src/client/pool.rs b/src/client/pool.rs index f56403e3..c8ee8883 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -15,6 +15,15 @@ pub struct Pool { inner: Rc>>, } +// Before using a pooled connection, make sure the sender is not dead. +// +// This is a trait to allow the `client::pool::tests` to work for `i32`. +// +// See https://github.com/hyperium/hyper/issues/1429 +pub trait Ready { + fn poll_ready(&mut self) -> Poll<(), ()>; +} + struct PoolInner { enabled: bool, // These are internal Conns sitting in the event loop in the KeepAlive @@ -256,7 +265,7 @@ pub struct Checkout { parked: Option>>, } -impl Future for Checkout { +impl Future for Checkout { type Item = Pooled; type Error = io::Error; @@ -282,21 +291,22 @@ impl Future for Checkout { let mut should_remove = false; let entry = self.pool.inner.borrow_mut().idle.get_mut(key).and_then(|list| { trace!("Checkout::poll key found {:?}", key); - while let Some(entry) = list.pop() { + while let Some(mut entry) = list.pop() { match entry.status.get() { TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => { - debug!("found idle connection for {:?}", key); - should_remove = list.is_empty(); - return Some(entry); + if let Ok(Async::Ready(())) = entry.value.poll_ready() { + debug!("found idle connection for {:?}", key); + should_remove = list.is_empty(); + return Some(entry); + } }, - _ => { - trace!("Checkout::poll removing unacceptable pooled {:?}", key); - // every other case the Entry should just be dropped - // 1. Idle but expired - // 2. Busy (something else somehow took it?) - // 3. Disabled don't reuse of course - } + _ => {}, } + trace!("Checkout::poll removing unacceptable pooled {:?}", key); + // every other case the Entry should just be dropped + // 1. Idle but expired + // 2. Busy (something else somehow took it?) + // 3. Disabled don't reuse of course } should_remove = true; None @@ -347,10 +357,16 @@ impl Expiration { mod tests { use std::rc::Rc; use std::time::Duration; - use futures::{Async, Future}; + use futures::{Async, Future, Poll}; use futures::future; use proto::KeepAlive; - use super::Pool; + use super::{Ready, Pool}; + + impl Ready for i32 { + fn poll_ready(&mut self) -> Poll<(), ()> { + Ok(Async::Ready(())) + } + } #[test] fn test_pool_checkout_smoke() { diff --git a/tests/client.rs b/tests/client.rs index c1f4f2e9..b3552163 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -954,6 +954,60 @@ mod dispatch_impl { assert_eq!(closes.load(Ordering::Relaxed), 1); } + #[test] + fn conn_drop_prevents_pool_checkout() { + // a drop might happen for any sort of reason, and we can protect + // against a lot of them, but if the `Core` is dropped, we can't + // really catch that. So, this is case to always check. + // + // See https://github.com/hyperium/hyper/issues/1429 + + use std::error::Error; + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let (tx1, rx1) = oneshot::channel(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); + sock.read(&mut buf).expect("read 2"); + let _ = tx1.send(()); + }); + + let uri = format!("http://{}/a", addr).parse::().unwrap(); + + let client = Client::new(&handle); + let res = client.get(uri.clone()).and_then(move |res| { + assert_eq!(res.status(), hyper::StatusCode::Ok); + res.body().concat2() + }); + + core.run(res).unwrap(); + + // drop previous Core + core = Core::new().unwrap(); + let timeout = Timeout::new(Duration::from_millis(200), &core.handle()).unwrap(); + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let rx = rx.and_then(move |_| timeout.map_err(|e| e.into())); + + let res = client.get(uri); + // this does trigger an 'event loop gone' error, but before, it would + // panic internally on a `SendError`, which is what we're testing against. + let err = core.run(res.join(rx).map(|r| r.0)).unwrap_err(); + assert_eq!(err.description(), "event loop gone"); + } + + + #[test] fn client_custom_executor() { let server = TcpListener::bind("127.0.0.1:0").unwrap();