From 23090c9fed169985a591b864541a2ba02e87c016 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 27 Mar 2018 21:20:16 -0700 Subject: [PATCH] recv_reset resets closed streams with queued EOS frames (#247) --- src/proto/streams/prioritize.rs | 3 +- src/proto/streams/recv.rs | 2 +- src/proto/streams/state.rs | 9 ++++- tests/stream_states.rs | 62 +++++++++++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 3 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index c543668..c1e0655 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -575,7 +575,8 @@ impl Prioritize { loop { match self.pending_send.pop(store) { Some(mut stream) => { - trace!("pop_frame; stream={:?}", stream.id); + trace!("pop_frame; stream={:?}; stream.state={:?}", + stream.id, stream.state); // If the stream receives a RESET from the peer, it may have // had data buffered to be sent, but all the frames are cleared diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 2dc587c..6e2d524 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -590,7 +590,7 @@ impl Recv { /// Handle remote sending an explicit RST_STREAM. pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { // Notify the stream - stream.state.recv_reset(frame.reason()); + stream.state.recv_reset(frame.reason(), stream.is_pending_send); stream.notify_send(); stream.notify_recv(); diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 94f093d..1088f3b 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -214,8 +214,15 @@ impl State { } /// The remote explicitly sent a RST_STREAM. - pub fn recv_reset(&mut self, reason: Reason) { + pub fn recv_reset(&mut self, reason: Reason, queued: bool) { + match self.inner { + Closed(Cause::EndStream) if queued => { + // If the stream has a queued EOS frame, transition to peer + // reset. + trace!("recv_reset: reason={:?}; queued=true", reason); + self.inner = Closed(Cause::Proto(reason)); + }, Closed(..) => {}, _ => { trace!("recv_reset; reason={:?}", reason); diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 5254fa9..d7ed610 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -808,3 +808,65 @@ fn send_data_after_headers_eos() { fn exceed_max_streams() { } */ + + +#[test] +fn rst_while_closing() { + // Test to reproduce panic in issue #246 --- receipt of a RST_STREAM frame + // on a stream in the Half Closed (remote) state with a queued EOS causes + // a panic. + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + ) + .send_frame(frames::headers(1).response(200)) + .send_frame(frames::headers(1).eos()) + // Idling for a moment here is necessary to ensure that the client + // enqueues its TRAILERS frame *before* we send the RST_STREAM frame + // which causes the panic. + .idle_ms(1) + // Send the RST_STREAM frame which causes the client to panic. + .send_frame(frames::reset(1).cancel()) + .ping_pong([1; 8]) + .close(); + ; + + let client = client::handshake(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // The request should be left streaming. + let (resp, mut stream) = client.send_request(request, false) + .expect("send_request"); + let req = resp + // on receipt of an EOS response from the server, transition + // the stream Open => Half Closed (remote). + .expect("response") + .and_then(move |resp| { + assert_eq!(resp.status(), StatusCode::OK); + // Enqueue trailers frame. + let _ = stream.send_trailers(HeaderMap::new()); + Ok(()) + }) + .map_err(|()| -> Error { + unreachable!() + }); + + conn.drive(req) + .and_then(|(conn, _)| conn.expect("client")) + }); + + + client.join(srv).wait().expect("wait"); +}