From e2cda1860b4e90d7432582603d503d8e7b2a643e Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 11 Sep 2017 14:46:46 -0700 Subject: [PATCH] fix Body to return errors when there is recv error --- src/proto/streams/recv.rs | 33 +++++++++++++-------------------- src/proto/streams/state.rs | 5 +++-- tests/flow_control.rs | 11 +---------- 3 files changed, 17 insertions(+), 32 deletions(-) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 1db912b..c810edf 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -565,16 +565,7 @@ where // No more data frames Ok(None.into()) }, - None => { - if stream.state.is_recv_closed() { - // No more data frames will be received - Ok(None.into()) - } else { - // Request to get notified once more data frames arrive - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - } - }, + None => self.schedule_recv(stream), } } @@ -590,16 +581,18 @@ where // we do? unimplemented!(); }, - None => { - if stream.state.is_recv_closed() { - // There will be no trailer frame - Ok(None.into()) - } else { - // Request to get notified once another frame arrives - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - } - }, + None => self.schedule_recv(stream), + } + } + + fn schedule_recv(&mut self, stream: &mut Stream) -> Poll, proto::Error> { + if stream.state.ensure_recv_open()? { + // Request to get notified once more frames arrive + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } else { + // No more frames will be received + Ok(None.into()) } } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 200e3b0..c6efdd5 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -314,14 +314,15 @@ impl State { } } - pub fn ensure_recv_open(&self) -> Result<(), proto::Error> { + pub fn ensure_recv_open(&self) -> Result { use std::io; // TODO: Is this correct? match self.inner { Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)), Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), - _ => Ok(()), + Closed(None) | HalfClosedRemote(..) => Ok(false), + _ => Ok(true), } } } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 41c3ef0..227e1a8 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -225,17 +225,8 @@ fn recv_data_overflows_connection_window() { .and_then(|resp| { assert_eq!(resp.status(), StatusCode::OK); let body = resp.into_parts().1; + // FIXME: body stream should error also body.concat2().unwrap() - /* FIXME: body stream should error also - .then(|res| { - let err = res.unwrap_err(); - assert_eq!( - err.to_string(), - "protocol error: flow-control protocol violated" - ); - Ok::<(), ()>(()) - }) - */ }); // client should see a flow control error