From 91aa1db2ff3c408ec9d4e1f68021e7527c9a38c6 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 25 Aug 2017 10:21:47 -0700 Subject: [PATCH] Misc protocol fixes * Verify contiuation frame stream ID * Fix sending RST_STREAM in certain cases. --- src/proto/framed_read.rs | 5 ++++ src/proto/streams/prioritize.rs | 15 ++++++++--- src/proto/streams/send.rs | 44 +++++++++++++++++++++++++++------ src/proto/streams/state.rs | 19 +++++++------- src/proto/streams/store.rs | 27 +++++++++++++------- src/proto/streams/streams.rs | 1 + 6 files changed, 83 insertions(+), 28 deletions(-) diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 32376e7..8eef567 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -133,6 +133,11 @@ impl FramedRead { match partial.frame { Continuable::Headers(mut frame) => { + // The stream identifiers must match + if frame.stream_id() != head.stream_id() { + return Err(ProtocolError.into()); + } + frame.load_hpack(partial.buf, &mut self.hpack)?; frame.into() } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 0a8ce2b..58f5fdf 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,4 +1,5 @@ use super::*; +use super::store::Resolve; use bytes::buf::Take; use futures::Sink; @@ -179,13 +180,23 @@ impl Prioritize { // Update the connection's window self.flow.inc_window(inc)?; + + self.assign_connection_capacity(inc, store); + Ok(()) + } + + pub fn assign_connection_capacity(&mut self, + inc: WindowSize, + store: &mut R) + where R: Resolve + { self.flow.assign_capacity(inc); // Assign newly acquired capacity to streams pending capacity. while self.flow.available() > 0 { let mut stream = match self.pending_capacity.pop(store) { Some(stream) => stream, - None => return Ok(()), + None => return, }; // Try to assign capacity to the stream. This will also re-queue the @@ -193,8 +204,6 @@ impl Prioritize // the capacity request. self.try_assign_capacity(&mut stream); } - - Ok(()) } /// Request capacity to send data diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 3eeabbd..d893bb8 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -101,7 +101,10 @@ impl Send where B: Buf { Ok(()) } - pub fn send_reset(&mut self, reason: Reason, + /// This is called by the user to send a reset and should not be called + /// by internal state transitions. Use `reset_stream` for that. + pub fn send_reset(&mut self, + reason: Reason, stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> @@ -111,17 +114,44 @@ impl Send where B: Buf { return Err(InactiveStreamId.into()) } - stream.state.send_reset(reason)?; + self.reset_stream(reason, stream, task); + Ok(()) + } + + fn reset_stream(&mut self, + reason: Reason, + stream: &mut store::Ptr, + task: &mut Option) + { + if stream.state.is_reset() { + // Don't double reset + return; + } + + // If closed AND the send queue is flushed, then the stream cannot be + // reset either + if stream.state.is_closed() && stream.pending_send.is_empty() { + return; + } + + // Transition the state + stream.state.set_reset(reason); + + // Clear all pending outbound frames + self.prioritize.clear_queue(stream); + + // Reclaim all capacity assigned to the stream and re-assign it to the + // connection + let available = stream.send_flow.available(); + stream.send_flow.claim_capacity(available); let frame = frame::Reset::new(stream.id, reason); - // TODO: This could impact connection level flow control. - self.prioritize.clear_queue(stream); - trace!("send_reset -- queueing; frame={:?}", frame); self.prioritize.queue_frame(frame.into(), stream, task); - Ok(()) + // Re-assign all capacity to the connection + self.prioritize.assign_connection_capacity(available, stream); } pub fn send_data(&mut self, @@ -210,7 +240,7 @@ impl Send where B: Buf { { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { debug!("recv_stream_window_update !!; err={:?}", e); - self.send_reset(FlowControlError.into(), stream, task)?; + self.reset_stream(FlowControlError.into(), stream, task); } Ok(()) diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index e72556c..98783a4 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -241,16 +241,17 @@ impl State { } } - /// Indicates that the local side will not send more data to the local. - pub fn send_reset(&mut self, reason: Reason) -> Result<(), ConnectionError> { + /// Set the stream state to reset + pub fn set_reset(&mut self, reason: Reason) { + debug_assert!(!self.is_reset()); + self.inner = Closed(Some(Cause::Proto(reason))); + } + + /// Returns true if the stream is already reset. + pub fn is_reset(&self) -> bool { match self.inner { - Idle => Err(ProtocolError.into()), - Closed(..) => Ok(()), - _ => { - trace!("send_reset: => Closed"); - self.inner = Closed(Some(Cause::Proto(reason))); - Ok(()) - } + Closed(Some(_)) => true, + _ => false, } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 82e7f86..0954c1f 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -62,6 +62,10 @@ pub(super) struct VacantEntry<'a, B: 'a> { slab: &'a mut slab::Slab>, } +pub(super) trait Resolve { + fn resolve(&mut self, key: Key) -> Ptr; +} + // ===== impl Store ===== impl Store { @@ -72,13 +76,6 @@ impl Store { } } - pub fn resolve(&mut self, key: Key) -> Ptr { - Ptr { - key: key, - slab: &mut self.slab, - } - } - pub fn find_mut(&mut self, id: &StreamId) -> Option> { if let Some(&key) = self.ids.get(id) { Some(Ptr { @@ -132,6 +129,15 @@ impl Store { } } +impl Resolve for Store { + fn resolve(&mut self, key: Key) -> Ptr { + Ptr { + key: key, + slab: &mut self.slab, + } + } +} + impl ops::Index for Store { type Output = Stream; @@ -209,7 +215,8 @@ impl Queue true } - pub fn pop<'a>(&mut self, store: &'a mut Store) -> Option> + pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> + where R: Resolve { if let Some(mut idxs) = self.indices { let mut stream = store.resolve(idxs.head); @@ -238,8 +245,10 @@ impl<'a, B: 'a> Ptr<'a, B> { pub fn key(&self) -> Key { self.key } +} - pub fn resolve(&mut self, key: Key) -> Ptr { +impl<'a, B: 'a> Resolve for Ptr<'a, B> { + fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, slab: &mut *self.slab, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index ab2baa8..a2d3cf3 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,6 +1,7 @@ use {client, server, HeaderMap}; use proto::*; use super::*; +use super::store::Resolve; use std::sync::{Arc, Mutex};