fix(http2): force notify h2 client connection when all body streams drop
This commit is contained in:
		| @@ -1,7 +1,7 @@ | |||||||
| use bytes::IntoBuf; | use bytes::IntoBuf; | ||||||
| use futures::{Async, Future, Poll, Stream}; | use futures::{Async, Future, Poll, Stream}; | ||||||
| use futures::future::{self, Either}; | use futures::future::{self, Either}; | ||||||
| use futures::sync::oneshot; | use futures::sync::mpsc; | ||||||
| use h2::client::{Builder, Handshake, SendRequest}; | use h2::client::{Builder, Handshake, SendRequest}; | ||||||
| use tokio_io::{AsyncRead, AsyncWrite}; | use tokio_io::{AsyncRead, AsyncWrite}; | ||||||
|  |  | ||||||
| @@ -11,6 +11,9 @@ use super::{PipeToSendStream, SendBuf}; | |||||||
| use ::{Body, Request, Response}; | use ::{Body, Request, Response}; | ||||||
|  |  | ||||||
| type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>; | type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>; | ||||||
|  | /// An mpsc channel is used to help notify the `Connection` task when *all* | ||||||
|  | /// other handles to it have been dropped, so that it can shutdown. | ||||||
|  | type ConnDropRef = mpsc::Sender<Never>; | ||||||
|  |  | ||||||
| pub struct Client<T, B> | pub struct Client<T, B> | ||||||
| where | where | ||||||
| @@ -23,7 +26,7 @@ where | |||||||
|  |  | ||||||
| enum State<T, B> where B: IntoBuf { | enum State<T, B> where B: IntoBuf { | ||||||
|     Handshaking(Handshake<T, B>), |     Handshaking(Handshake<T, B>), | ||||||
|     Ready(SendRequest<B>, oneshot::Sender<Never>), |     Ready(SendRequest<B>, ConnDropRef), | ||||||
| } | } | ||||||
|  |  | ||||||
| impl<T, B> Client<T, B> | impl<T, B> Client<T, B> | ||||||
| @@ -58,11 +61,17 @@ where | |||||||
|             let next = match self.state { |             let next = match self.state { | ||||||
|                 State::Handshaking(ref mut h) => { |                 State::Handshaking(ref mut h) => { | ||||||
|                     let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); |                     let (request_tx, conn) = try_ready!(h.poll().map_err(::Error::new_h2)); | ||||||
|                     // A oneshot channel is used entirely to detect when the |                     // An mpsc channel is used entirely to detect when the | ||||||
|                     // 'Client' has been dropped. This is to get around a bug |                     // 'Client' has been dropped. This is to get around a bug | ||||||
|                     // in h2 where dropping all SendRequests won't notify a |                     // in h2 where dropping all SendRequests won't notify a | ||||||
|                     // parked Connection. |                     // parked Connection. | ||||||
|                     let (tx, rx) = oneshot::channel(); |                     let (tx, rx) = mpsc::channel(0); | ||||||
|  |                     let rx = rx.into_future() | ||||||
|  |                         .map(|(msg, _)| match msg { | ||||||
|  |                             Some(never) => match never {}, | ||||||
|  |                             None => (), | ||||||
|  |                         }) | ||||||
|  |                         .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); | ||||||
|                     let fut = conn |                     let fut = conn | ||||||
|                         .inspect(|_| trace!("connection complete")) |                         .inspect(|_| trace!("connection complete")) | ||||||
|                         .map_err(|e| debug!("connection error: {}", e)) |                         .map_err(|e| debug!("connection error: {}", e)) | ||||||
| @@ -73,19 +82,19 @@ where | |||||||
|                                 // conn has finished either way |                                 // conn has finished either way | ||||||
|                                 Either::A(future::ok(())) |                                 Either::A(future::ok(())) | ||||||
|                             }, |                             }, | ||||||
|                             Err(Either::B((_, conn))) => { |                             Ok(Either::B(((), conn))) => { | ||||||
|                                 // oneshot has been dropped, hopefully polling |                                 // mpsc has been dropped, hopefully polling | ||||||
|                                 // the connection some more should start shutdown |                                 // the connection some more should start shutdown | ||||||
|                                 // and then close |                                 // and then close | ||||||
|                                 trace!("send_request dropped, starting conn shutdown"); |                                 trace!("send_request dropped, starting conn shutdown"); | ||||||
|                                 Either::B(conn) |                                 Either::B(conn) | ||||||
|                             } |                             } | ||||||
|                             Ok(Either::B((never, _))) => match never {}, |                             Err(Either::B((never, _))) => match never {}, | ||||||
|                         }); |                         }); | ||||||
|                     self.executor.execute(fut); |                     self.executor.execute(fut); | ||||||
|                     State::Ready(request_tx, tx) |                     State::Ready(request_tx, tx) | ||||||
|                 }, |                 }, | ||||||
|                 State::Ready(ref mut tx, _) => { |                 State::Ready(ref mut tx, ref conn_dropper) => { | ||||||
|                     try_ready!(tx.poll_ready().map_err(::Error::new_h2)); |                     try_ready!(tx.poll_ready().map_err(::Error::new_h2)); | ||||||
|                     match self.rx.poll() { |                     match self.rx.poll() { | ||||||
|                         Ok(Async::Ready(Some((req, mut cb)))) => { |                         Ok(Async::Ready(Some((req, mut cb)))) => { | ||||||
| @@ -107,8 +116,14 @@ where | |||||||
|                                 } |                                 } | ||||||
|                             }; |                             }; | ||||||
|                             if !eos { |                             if !eos { | ||||||
|                                 let pipe = PipeToSendStream::new(body, body_tx); |                                 let conn_drop_ref = conn_dropper.clone(); | ||||||
|                                 self.executor.execute(pipe.map_err(|e| debug!("client request body error: {}", e))); |                                 let pipe = PipeToSendStream::new(body, body_tx) | ||||||
|  |                                     .map_err(|e| debug!("client request body error: {}", e)) | ||||||
|  |                                     .then(move |x| { | ||||||
|  |                                         drop(conn_drop_ref); | ||||||
|  |                                         x | ||||||
|  |                                     }); | ||||||
|  |                                 self.executor.execute(pipe); | ||||||
|                             } |                             } | ||||||
|  |  | ||||||
|                             let fut = fut |                             let fut = fut | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user