From 2be2523162c0401069194b575b36ecfc3a09e71b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 28 Nov 2017 13:42:22 -0800 Subject: [PATCH] notify stream refs when the connection receives EOF (#176) --- src/proto/connection.rs | 2 +- src/proto/streams/recv.rs | 5 +++++ src/proto/streams/state.rs | 10 +++++++++ src/proto/streams/streams.rs | 17 ++++++++++++++ tests/client_request.rs | 43 ++++++++++++++++++++++++++++++++++++ 5 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 11cfb04..6db375a 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -277,8 +277,8 @@ where // TODO: handle }, None => { - // TODO: Is this correct? trace!("codec closed"); + self.streams.recv_eof(); return Ok(Async::Ready(())); }, } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index b210ad6..aa9745e 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -497,6 +497,11 @@ impl Recv { stream.notify_recv(); } + pub fn recv_eof(&mut self, stream: &mut Stream) { + stream.state.recv_eof(); + stream.notify_recv(); + } + fn next_stream_id(&self) -> Result { if let Ok(id) = self.next_stream_id { Ok(id) diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index ae50427..1c4a28e 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -223,6 +223,16 @@ impl State { } } + pub fn recv_eof(&mut self) { + match self.inner { + Closed(..) => {}, + s => { + trace!("recv_eof; state={:?}", s); + self.inner = Closed(Some(Cause::Io)); + } + } + } + /// Indicates that the local side will not send more data to the local. pub fn send_close(&mut self) { match self.inner { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index ad15047..3dd449d 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -272,6 +272,23 @@ where actions.conn_error = Some(err); } + pub fn recv_eof(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let actions = &mut me.actions; + let counts = &mut me.counts; + + me.store + .for_each(|stream| { + counts.transition(stream, |_, stream| { + actions.recv.recv_eof(stream); + Ok::<_, ()>(()) + }) + }) + .expect("recv_eof"); + } + pub fn last_processed_id(&self) -> StreamId { self.inner.lock().unwrap().actions.recv.last_processed_id() } diff --git a/tests/client_request.rs b/tests/client_request.rs index 9c445fb..f2cb613 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -319,6 +319,49 @@ fn request_with_connection_headers() { client.join(srv).wait().expect("wait"); } +#[test] +fn connection_close_notifies_response_future() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + // don't send any response, just close + .close(); + + let client = Client::handshake(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = client + .send_request(request, true) + .expect("send_request1") + .0 + .then(|res| { + let err = res.expect_err("response"); + assert_eq!( + err.to_string(), + "broken pipe" + ); + Ok(()) + }); + + conn.expect("conn").join(req) + }); + + client.join(srv).wait().expect("wait"); +} + #[test] fn sending_request_on_closed_connection() { let _ = ::env_logger::init();