fix graceful shutdown to close once idle (#296)

This commit is contained in:
Sean McArthur
2018-07-30 21:42:00 -07:00
committed by Carl Lerche
parent fdfb873438
commit c564273986
3 changed files with 33 additions and 10 deletions

View File

@@ -127,6 +127,8 @@ impl GoAway {
.expect("invalid GOAWAY frame"); .expect("invalid GOAWAY frame");
return Ok(Async::Ready(Some(reason))); return Ok(Async::Ready(Some(reason)));
} else if self.should_close_now() {
return Ok(Async::Ready(self.going_away_reason()));
} }
Ok(Async::Ready(None)) Ok(Async::Ready(None))

View File

@@ -364,6 +364,17 @@ impl AsyncWrite for Mock {
} }
} }
impl Drop for Mock {
fn drop(&mut self) {
let mut me = self.pipe.inner.lock().unwrap();
me.closed = true;
if let Some(task) = me.tx_task.take() {
task.notify();
}
}
}
// ===== impl Pipe ===== // ===== impl Pipe =====
impl io::Read for Pipe { impl io::Read for Pipe {
@@ -375,7 +386,12 @@ impl io::Read for Pipe {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
if me.tx.is_empty() { if me.tx.is_empty() {
if me.closed {
return Ok(0);
}
me.tx_task = Some(task::current()); me.tx_task = Some(task::current());
return Err(WouldBlock.into()); return Err(WouldBlock.into());
} }
@@ -442,7 +458,7 @@ pub trait HandleFutureExt {
Box::new(map); Box::new(map);
RecvFrame { RecvFrame {
inner: boxed, inner: boxed,
frame: settings.into().into(), frame: Some(settings.into().into()),
} }
} }
@@ -460,7 +476,14 @@ pub trait HandleFutureExt {
Self: IntoRecvFrame + Sized, Self: IntoRecvFrame + Sized,
T: Into<Frame>, T: Into<Frame>,
{ {
self.into_recv_frame(frame.into()) self.into_recv_frame(Some(frame.into()))
}
fn recv_eof(self) -> RecvFrame<<Self as IntoRecvFrame>::Future>
where
Self: IntoRecvFrame + Sized,
{
self.into_recv_frame(None)
} }
fn send_frame<T>(self, frame: T) -> SendFrameFut<Self> fn send_frame<T>(self, frame: T) -> SendFrameFut<Self>
@@ -623,7 +646,7 @@ pub trait HandleFutureExt {
pub struct RecvFrame<T> { pub struct RecvFrame<T> {
inner: T, inner: T,
frame: Frame, frame: Option<Frame>,
} }
impl<T> Future for RecvFrame<T> impl<T> Future for RecvFrame<T>
@@ -642,10 +665,8 @@ where
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
}; };
let frame = frame.unwrap();
match (frame, &self.frame) { match (frame, &self.frame) {
(Data(ref a), &Data(ref b)) => { (Some(Data(ref a)), &Some(Data(ref b))) => {
assert_eq!(a.payload().len(), b.payload().len(), "recv_frame data payload len"); assert_eq!(a.payload().len(), b.payload().len(), "recv_frame data payload len");
assert_eq!(a, b, "recv_frame"); assert_eq!(a, b, "recv_frame");
} }
@@ -712,13 +733,13 @@ where
pub trait IntoRecvFrame { pub trait IntoRecvFrame {
type Future: Future; type Future: Future;
fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future>; fn into_recv_frame(self, frame: Option<Frame>) -> RecvFrame<Self::Future>;
} }
impl IntoRecvFrame for Handle { impl IntoRecvFrame for Handle {
type Future = ::futures::stream::StreamFuture<Self>; type Future = ::futures::stream::StreamFuture<Self>;
fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future> { fn into_recv_frame(self, frame: Option<Frame>) -> RecvFrame<Self::Future> {
RecvFrame { RecvFrame {
inner: self.into_future(), inner: self.into_future(),
frame: frame, frame: frame,
@@ -733,7 +754,7 @@ where
{ {
type Future = Box<Future<Item = (Option<Frame>, Handle), Error = ()>>; type Future = Box<Future<Item = (Option<Frame>, Handle), Error = ()>>;
fn into_recv_frame(self, frame: Frame) -> RecvFrame<Self::Future> { fn into_recv_frame(self, frame: Option<Frame>) -> RecvFrame<Self::Future> {
let into_fut = Box::new( let into_fut = Box::new(
self.unwrap() self.unwrap()
.and_then(|handle| handle.into_future().unwrap()), .and_then(|handle| handle.into_future().unwrap()),

View File

@@ -268,7 +268,7 @@ fn graceful_shutdown() {
.send_frame(frames::data(7, "").eos()) .send_frame(frames::data(7, "").eos())
.send_frame(frames::data(3, "").eos()) .send_frame(frames::data(3, "").eos())
.recv_frame(frames::headers(3).response(200).eos()) .recv_frame(frames::headers(3).response(200).eos())
.close(); //TODO: closed()? .recv_eof();
let srv = server::handshake(io) let srv = server::handshake(io)
.expect("handshake") .expect("handshake")