From 040f391479029c520eba8f36e6482db4ed8fa0b4 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 16 Apr 2018 16:17:29 -0700 Subject: [PATCH] Reset any queued stream on receipt of remote reset (#258) Fixes #256. This PR changes `state::recv_reset` so any closed stream with queued send is immediately reset (and thus, the queue is cleared) on receipt of a `RST_STREAM` frame from the remote. This fixes the panic encountered by the test @goffrie posted in #256, where the stream is scheduled to close, receives a `RST_STREAM` frame, and sets the buffered capacity to 0, but isn't removed from the send queue, so we hit an assertion (or wrap, if debug assertions are disabled) when subtracting a sent frame's size from the buffered size. Signed-off-by: Eliza Weisman --- src/proto/streams/prioritize.rs | 7 +++++-- src/proto/streams/state.rs | 37 ++++++++++++++++++++++++--------- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index c1e0655..ef66ed6 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -593,6 +593,8 @@ impl Prioritize { // To be safe, we just always ask the stream. let is_counted = stream.is_counted(); let is_pending_reset = stream.is_pending_reset_expiration(); + trace!(" --> stream={:?}; is_pending_reset={:?};", + stream.id, is_pending_reset); let frame = match stream.pending_send.pop_front(buffer) { Some(Frame::Data(mut frame)) => { @@ -603,13 +605,14 @@ impl Prioritize { trace!( " --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \ - available={}; requested={}", + available={}; requested={}; buffered={};", frame.stream_id(), sz, frame.is_end_stream(), stream_capacity, stream.send_flow.available(), - stream.requested_send_capacity + stream.requested_send_capacity, + stream.buffered_send_data, ); // Zero length data frames always have capacity to diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 1088f3b..6dcec60 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -214,20 +214,37 @@ impl State { } /// The remote explicitly sent a RST_STREAM. + /// + /// # Arguments + /// - `reason`: the reason field of the received RST_STREAM frame. + /// - `queued`: true if this stream has frames in the pending send queue. pub fn recv_reset(&mut self, reason: Reason, queued: bool) { - match self.inner { - Closed(Cause::EndStream) if queued => { - // If the stream has a queued EOS frame, transition to peer - // reset. - trace!("recv_reset: reason={:?}; queued=true", reason); - self.inner = Closed(Cause::Proto(reason)); - }, - Closed(..) => {}, - _ => { - trace!("recv_reset; reason={:?}", reason); + // If the stream is already in a `Closed` state, do nothing, + // provided that there are no frames still in the send queue. + Closed(..) if !queued => {}, + // A notionally `Closed` stream may still have queued frames in + // the following cases: + // + // - if the cause is `Cause::Scheduled(..)` (i.e. we have not + // actually closed the stream yet). + // - if the cause is `Cause::EndStream`: we transition to this + // state when an EOS frame is *enqueued* (so that it's invalid + // to enqueue more frames), not when the EOS frame is *sent*; + // therefore, there may still be frames ahead of the EOS frame + // in the send queue. + // + // In either of these cases, we want to overwrite the stream's + // previous state with the received RST_STREAM, so that the queue + // will be cleared by `Prioritize::pop_frame`. + state => { + trace!( + "recv_reset; reason={:?}; state={:?}; queued={:?}", + reason, state, queued + ); self.inner = Closed(Cause::Proto(reason)); }, + } }