fix(client): fix polling dispatch channel after it has closed

This commit is contained in:
Sean McArthur
2019-10-30 13:59:24 -07:00
parent de5dcd7865
commit 039281b89c

View File

@@ -37,6 +37,7 @@ pub struct Server<S: HttpService<B>, B> {
pub struct Client<B> { pub struct Client<B> {
callback: Option<crate::client::dispatch::Callback<Request<B>, Response<Body>>>, callback: Option<crate::client::dispatch::Callback<Request<B>, Response<Body>>>,
rx: ClientRx<B>, rx: ClientRx<B>,
rx_closed: bool,
} }
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>; type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
@@ -490,7 +491,8 @@ impl<B> Client<B> {
pub fn new(rx: ClientRx<B>) -> Client<B> { pub fn new(rx: ClientRx<B>) -> Client<B> {
Client { Client {
callback: None, callback: None,
rx: rx, rx,
rx_closed: false,
} }
} }
} }
@@ -505,6 +507,7 @@ where
type RecvItem = ResponseHead; type RecvItem = ResponseHead;
fn poll_msg(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>> { fn poll_msg(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>> {
debug_assert!(!self.rx_closed);
match self.rx.poll_next(cx) { match self.rx.poll_next(cx) {
Poll::Ready(Some((req, mut cb))) => { Poll::Ready(Some((req, mut cb))) => {
// check that future hasn't been canceled already // check that future hasn't been canceled already
@@ -526,8 +529,9 @@ where
} }
}, },
Poll::Ready(None) => { Poll::Ready(None) => {
trace!("client tx closed");
// user has dropped sender handle // user has dropped sender handle
trace!("client tx closed");
self.rx_closed = true;
Poll::Ready(None) Poll::Ready(None)
}, },
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
@@ -555,7 +559,7 @@ where
if let Some(cb) = self.callback.take() { if let Some(cb) = self.callback.take() {
let _ = cb.send(Err((err, None))); let _ = cb.send(Err((err, None)));
Ok(()) Ok(())
} else { } else if !self.rx_closed {
self.rx.close(); self.rx.close();
if let Some((req, cb)) = self.rx.try_recv() { if let Some((req, cb)) = self.rx.try_recv() {
trace!("canceling queued request with connection error: {}", err); trace!("canceling queued request with connection error: {}", err);
@@ -566,6 +570,8 @@ where
} else { } else {
Err(err) Err(err)
} }
} else {
Err(err)
} }
} }
} }