From ab3c73fd842848b38bb20721dfe8cc1e95ad7def Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 1 May 2018 12:45:22 -0700 Subject: [PATCH] fix(http2): force notify h2 client connection when all body streams drop --- src/proto/h2/client.rs | 35 +++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index c02175ad..0915eb37 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,7 +1,7 @@ use bytes::IntoBuf; use futures::{Async, Future, Poll, Stream}; use futures::future::{self, Either}; -use futures::sync::oneshot; +use futures::sync::mpsc; use h2::client::{Builder, Handshake, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -11,6 +11,9 @@ use super::{PipeToSendStream, SendBuf}; use ::{Body, Request, Response}; type ClientRx = ::client::dispatch::Receiver, Response>; +/// An mpsc channel is used to help notify the `Connection` task when *all* +/// other handles to it have been dropped, so that it can shutdown. +type ConnDropRef = mpsc::Sender; pub struct Client where @@ -23,7 +26,7 @@ where enum State where B: IntoBuf { Handshaking(Handshake), - Ready(SendRequest, oneshot::Sender), + Ready(SendRequest, ConnDropRef), } impl Client @@ -58,11 +61,17 @@ where let next = match self.state { State::Handshaking(ref mut h) => { let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); - // A oneshot channel is used entirely to detect when the + // An mpsc channel is used entirely to detect when the // 'Client' has been dropped. This is to get around a bug // in h2 where dropping all SendRequests won't notify a // parked Connection. - let (tx, rx) = oneshot::channel(); + let (tx, rx) = mpsc::channel(0); + let rx = rx.into_future() + .map(|(msg, _)| match msg { + Some(never) => match never {}, + None => (), + }) + .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); let fut = conn .inspect(|_| trace!("connection complete")) .map_err(|e| debug!("connection error: {}", e)) @@ -73,19 +82,19 @@ where // conn has finished either way Either::A(future::ok(())) }, - Err(Either::B((_, conn))) => { - // oneshot has been dropped, hopefully polling + Ok(Either::B(((), conn))) => { + // mpsc has been dropped, hopefully polling // the connection some more should start shutdown // and then close trace!("send_request dropped, starting conn shutdown"); Either::B(conn) } - Ok(Either::B((never, _))) => match never {}, + Err(Either::B((never, _))) => match never {}, }); self.executor.execute(fut); State::Ready(request_tx, tx) }, - State::Ready(ref mut tx, _) => { + State::Ready(ref mut tx, ref conn_dropper) => { try_ready!(tx.poll_ready().map_err(::Error::new_h2)); match self.rx.poll() { Ok(Async::Ready(Some((req, mut cb)))) => { @@ -107,8 +116,14 @@ where } }; if !eos { - let pipe = PipeToSendStream::new(body, body_tx); - self.executor.execute(pipe.map_err(|e| debug!("client request body error: {}", e))); + let conn_drop_ref = conn_dropper.clone(); + let pipe = PipeToSendStream::new(body, body_tx) + .map_err(|e| debug!("client request body error: {}", e)) + .then(move |x| { + drop(conn_drop_ref); + x + }); + self.executor.execute(pipe); } let fut = fut