From c5642739867679118aed333887866e22ca090852 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 30 Jul 2018 21:42:00 -0700 Subject: [PATCH] fix graceful shutdown to close once idle (#296) --- src/proto/go_away.rs | 2 ++ tests/h2-support/src/mock.rs | 39 ++++++++++++++++++++++++++-------- tests/h2-tests/tests/server.rs | 2 +- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/proto/go_away.rs b/src/proto/go_away.rs index 32eb90f..5b4f856 100644 --- a/src/proto/go_away.rs +++ b/src/proto/go_away.rs @@ -127,6 +127,8 @@ impl GoAway { .expect("invalid GOAWAY frame"); return Ok(Async::Ready(Some(reason))); + } else if self.should_close_now() { + return Ok(Async::Ready(self.going_away_reason())); } Ok(Async::Ready(None)) diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index ee81980..a1bf9e5 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -364,6 +364,17 @@ impl AsyncWrite for Mock { } } +impl Drop for Mock { + fn drop(&mut self) { + let mut me = self.pipe.inner.lock().unwrap(); + me.closed = true; + + if let Some(task) = me.tx_task.take() { + task.notify(); + } + } +} + // ===== impl Pipe ===== impl io::Read for Pipe { @@ -375,7 +386,12 @@ impl io::Read for Pipe { let mut me = self.inner.lock().unwrap(); + if me.tx.is_empty() { + if me.closed { + return Ok(0); + } + me.tx_task = Some(task::current()); return Err(WouldBlock.into()); } @@ -442,7 +458,7 @@ pub trait HandleFutureExt { Box::new(map); RecvFrame { inner: boxed, - frame: settings.into().into(), + frame: Some(settings.into().into()), } } @@ -460,7 +476,14 @@ pub trait HandleFutureExt { Self: IntoRecvFrame + Sized, T: Into, { - self.into_recv_frame(frame.into()) + self.into_recv_frame(Some(frame.into())) + } + + fn recv_eof(self) -> RecvFrame<::Future> + where + Self: IntoRecvFrame + Sized, + { + self.into_recv_frame(None) } fn send_frame(self, frame: T) -> SendFrameFut @@ -623,7 +646,7 @@ pub trait HandleFutureExt { pub struct RecvFrame { inner: T, - frame: Frame, + frame: Option, } impl Future for RecvFrame @@ -642,10 +665,8 @@ where Async::NotReady => return Ok(Async::NotReady), }; - let frame = frame.unwrap(); - match (frame, &self.frame) { - (Data(ref a), &Data(ref b)) => { + (Some(Data(ref a)), &Some(Data(ref b))) => { assert_eq!(a.payload().len(), b.payload().len(), "recv_frame data payload len"); assert_eq!(a, b, "recv_frame"); } @@ -712,13 +733,13 @@ where pub trait IntoRecvFrame { type Future: Future; - fn into_recv_frame(self, frame: Frame) -> RecvFrame; + fn into_recv_frame(self, frame: Option) -> RecvFrame; } impl IntoRecvFrame for Handle { type Future = ::futures::stream::StreamFuture; - fn into_recv_frame(self, frame: Frame) -> RecvFrame { + fn into_recv_frame(self, frame: Option) -> RecvFrame { RecvFrame { inner: self.into_future(), frame: frame, @@ -733,7 +754,7 @@ where { type Future = Box, Handle), Error = ()>>; - fn into_recv_frame(self, frame: Frame) -> RecvFrame { + fn into_recv_frame(self, frame: Option) -> RecvFrame { let into_fut = Box::new( self.unwrap() .and_then(|handle| handle.into_future().unwrap()), diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 1bbec43..d6f276c 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -268,7 +268,7 @@ fn graceful_shutdown() { .send_frame(frames::data(7, "").eos()) .send_frame(frames::data(3, "").eos()) .recv_frame(frames::headers(3).response(200).eos()) - .close(); //TODO: closed()? + .recv_eof(); let srv = server::handshake(io) .expect("handshake")