fix(client): properly close idle connections after timeout

Additionally fixes if there were idle connections when a `Client` is
dropped.

Only fixes with the no-proto dispatcher, as changing internals for the
tokio-proto dispatcher would be much harder, and it will replace it very
soon.

Closes #1397
This commit is contained in:
Sean McArthur
2017-12-13 16:29:25 -08:00
parent 16aa92cf03
commit 139dc7ab2b
4 changed files with 125 additions and 13 deletions

View File

@@ -249,8 +249,12 @@ where C: Connect,
let pool_key = Rc::new(domain.to_string()); let pool_key = Rc::new(domain.to_string());
self.connector.connect(url) self.connector.connect(url)
.map(move |io| { .map(move |io| {
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(0);
let pooled = pool.pooled(pool_key, RefCell::new(tx)); let tx = HyperClient {
tx: RefCell::new(tx),
should_close: true,
};
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone()); let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn); let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err))); handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
@@ -269,9 +273,10 @@ where C: Connect,
e.into() e.into()
}); });
let resp = race.and_then(move |client| { let resp = race.and_then(move |mut client| {
let (callback, rx) = oneshot::channel(); let (callback, rx) = oneshot::channel();
client.borrow_mut().start_send((head, body, callback)).unwrap(); client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap();
client.should_close = false;
rx.then(|res| { rx.then(|res| {
match res { match res {
Ok(Ok(res)) => Ok(res), Ok(Ok(res)) => Ok(res),
@@ -309,7 +314,29 @@ impl<C, B> fmt::Debug for Client<C, B> {
} }
type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>; type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
type HyperClient<B> = RefCell<::futures::sync::mpsc::Sender<(RequestHead, Option<B>, ::futures::sync::oneshot::Sender<::Result<::Response>>)>>;
struct HyperClient<B> {
tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>,
should_close: bool,
}
impl<B> Clone for HyperClient<B> {
fn clone(&self) -> HyperClient<B> {
HyperClient {
tx: self.tx.clone(),
should_close: self.should_close,
}
}
}
impl<B> Drop for HyperClient<B> {
fn drop(&mut self) {
if self.should_close {
self.should_close = false;
let _ = self.tx.borrow_mut().try_send(proto::dispatch::ClientMsg::Close);
}
}
}
enum Dispatch<B> { enum Dispatch<B> {
Proto(Pool<ProtoClient<B>>), Proto(Pool<ProtoClient<B>>),

View File

@@ -511,6 +511,11 @@ where I: AsyncRead + AsyncWrite,
} }
pub fn close_and_shutdown(&mut self) -> Poll<(), io::Error> {
try_ready!(self.flush());
self.shutdown()
}
pub fn shutdown(&mut self) -> Poll<(), io::Error> { pub fn shutdown(&mut self) -> Poll<(), io::Error> {
match self.io.io_mut().shutdown() { match self.io.io_mut().shutdown() {
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
@@ -625,8 +630,7 @@ where I: AsyncRead + AsyncWrite,
#[inline] #[inline]
fn close(&mut self) -> Poll<(), Self::SinkError> { fn close(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.poll_complete()); self.close_and_shutdown()
self.shutdown()
} }
} }

View File

@@ -13,6 +13,7 @@ pub struct Dispatcher<D, Bs, I, B, T, K> {
dispatch: D, dispatch: D,
body_tx: Option<super::body::BodySender>, body_tx: Option<super::body::BodySender>,
body_rx: Option<Bs>, body_rx: Option<Bs>,
is_closing: bool,
} }
pub trait Dispatch { pub trait Dispatch {
@@ -34,7 +35,12 @@ pub struct Client<B> {
rx: ClientRx<B>, rx: ClientRx<B>,
} }
type ClientRx<B> = mpsc::Receiver<(RequestHead, Option<B>, oneshot::Sender<::Result<::Response>>)>; pub enum ClientMsg<B> {
Request(RequestHead, Option<B>, oneshot::Sender<::Result<::Response>>),
Close,
}
type ClientRx<B> = mpsc::Receiver<ClientMsg<B>>;
impl<D, Bs, I, B, T, K> Dispatcher<D, Bs, I, B, T, K> impl<D, Bs, I, B, T, K> Dispatcher<D, Bs, I, B, T, K>
where where
@@ -51,6 +57,7 @@ where
dispatch: dispatch, dispatch: dispatch,
body_tx: None, body_tx: None,
body_rx: None, body_rx: None,
is_closing: false,
} }
} }
@@ -60,7 +67,9 @@ where
fn poll_read(&mut self) -> Poll<(), ::Error> { fn poll_read(&mut self) -> Poll<(), ::Error> {
loop { loop {
if self.conn.can_read_head() { if self.is_closing {
return Ok(Async::Ready(()));
} else if self.conn.can_read_head() {
match self.conn.read_head() { match self.conn.read_head() {
Ok(Async::Ready(Some((head, has_body)))) => { Ok(Async::Ready(Some((head, has_body)))) => {
let body = if has_body { let body = if has_body {
@@ -149,12 +158,16 @@ where
fn poll_write(&mut self) -> Poll<(), ::Error> { fn poll_write(&mut self) -> Poll<(), ::Error> {
loop { loop {
if self.body_rx.is_none() && self.dispatch.should_poll() { if self.is_closing {
return Ok(Async::Ready(()));
} else if self.body_rx.is_none() && self.dispatch.should_poll() {
if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) { if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) {
self.conn.write_head(head, body.is_some()); self.conn.write_head(head, body.is_some());
self.body_rx = body; self.body_rx = body;
} else { } else {
self.conn.close_write(); self.is_closing = true;
//self.conn.close_read();
//self.conn.close_write();
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
} else if self.conn.has_queued_body() { } else if self.conn.has_queued_body() {
@@ -190,6 +203,16 @@ where
}) })
} }
fn poll_close(&mut self) -> Poll<(), ::Error> {
debug_assert!(self.is_closing);
try_ready!(self.conn.close_and_shutdown());
self.conn.close_read();
self.conn.close_write();
self.is_closing = false;
Ok(Async::Ready(()))
}
fn is_done(&self) -> bool { fn is_done(&self) -> bool {
let read_done = self.conn.is_read_closed(); let read_done = self.conn.is_read_closed();
@@ -224,6 +247,10 @@ where
self.poll_write()?; self.poll_write()?;
self.poll_flush()?; self.poll_flush()?;
if self.is_closing {
self.poll_close()?;
}
if self.is_done() { if self.is_done() {
try_ready!(self.conn.shutdown()); try_ready!(self.conn.shutdown());
trace!("Dispatch::poll done"); trace!("Dispatch::poll done");
@@ -285,6 +312,7 @@ where
// ===== impl Client ===== // ===== impl Client =====
impl<B> Client<B> { impl<B> Client<B> {
pub fn new(rx: ClientRx<B>) -> Client<B> { pub fn new(rx: ClientRx<B>) -> Client<B> {
Client { Client {
@@ -305,11 +333,13 @@ where
fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> { fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Option<Self::PollBody>)>, ::Error> {
match self.rx.poll() { match self.rx.poll() {
Ok(Async::Ready(Some((head, body, cb)))) => { Ok(Async::Ready(Some(ClientMsg::Request(head, body, cb)))) => {
self.callback = Some(cb); self.callback = Some(cb);
Ok(Async::Ready(Some((head, body)))) Ok(Async::Ready(Some((head, body))))
}, },
Ok(Async::Ready(Some(ClientMsg::Close))) |
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
trace!("client tx closed");
// user has dropped sender handle // user has dropped sender handle
Ok(Async::Ready(None)) Ok(Async::Ready(None))
}, },

View File

@@ -607,7 +607,7 @@ mod dispatch_impl {
} }
#[test] #[test]
fn drop_client_closes_connection() { fn dropped_client_closes_connection() {
// https://github.com/hyperium/hyper/issues/1353 // https://github.com/hyperium/hyper/issues/1353
let _ = pretty_env_logger::init(); let _ = pretty_env_logger::init();
@@ -653,6 +653,57 @@ mod dispatch_impl {
assert_eq!(closes.load(Ordering::Relaxed), 1); assert_eq!(closes.load(Ordering::Relaxed), 1);
} }
#[test]
fn drop_client_closes_idle_connections() {
let _ = pretty_env_logger::init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let closes = Arc::new(AtomicUsize::new(0));
let (tx1, rx1) = oneshot::channel();
let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
let body =[b'x'; 64];
write!(sock, "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len()).expect("write head");
let _ = sock.write_all(&body);
let _ = tx1.send(());
// prevent this thread from closing until end of test, so the connection
// stays open and idle until Client is dropped
let _ = client_drop_rx.wait();
});
let uri = format!("http://{}/a", addr).parse().unwrap();
let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle);
let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok);
res.body().concat2()
});
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
core.run(res.join(rx).map(|r| r.0)).unwrap();
// not closed yet, just idle
assert_eq!(closes.load(Ordering::Relaxed), 0);
drop(client);
core.run(Timeout::new(Duration::from_millis(100), &handle).unwrap()).unwrap();
assert_eq!(closes.load(Ordering::Relaxed), 1);
}
#[test] #[test]
fn no_keep_alive_closes_connection() { fn no_keep_alive_closes_connection() {
// https://github.com/hyperium/hyper/issues/1383 // https://github.com/hyperium/hyper/issues/1383