From 79d3aee1dc909e606ea7b3909361b375ef0e8d2e Mon Sep 17 00:00:00 2001
From: Oliver Gould
Date: Mon, 17 Jul 2017 22:18:03 +0000
Subject: [PATCH] refuse streams that would violate max concurrency settings.
 improve ping control API

---
 src/error.rs                |   4 +
 src/frame/data.rs           |   1 +
 src/proto/connection.rs     |  16 ++--
 src/proto/flow_control.rs   |  18 +++--
 src/proto/mod.rs            |  18 +++--
 src/proto/ping_pong.rs      |  80 +++++++++++++------
 src/proto/settings.rs       |   6 +-
 src/proto/state.rs          |  20 ++++-
 src/proto/stream_tracker.rs | 153 ++++++++++++++++++++++++++++--------
 9 files changed, 234 insertions(+), 82 deletions(-)

diff --git a/src/error.rs b/src/error.rs
index 91993d6..d5a3429 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -84,6 +84,9 @@ pub enum User {
     /// The stream state has been reset.
     StreamReset,
 
+    /// The application attempted to initiate too many streams to remote.
+    MaxConcurrencyExceeded,
+
     // TODO: reserve additional variants
 }
 
@@ -124,6 +127,7 @@ macro_rules! user_desc {
             FlowControlViolation => concat!($prefix, "flow control violation"),
             StreamReset => concat!($prefix, "frame sent on reset stream"),
             Corrupt => concat!($prefix, "connection state corrupt"),
+            MaxConcurrencyExceeded => concat!($prefix, "stream would exceed remote max concurrency"),
         }
     });
 }
diff --git a/src/frame/data.rs b/src/frame/data.rs
index 7e63748..9f71998 100644
--- a/src/frame/data.rs
+++ b/src/frame/data.rs
@@ -63,6 +63,7 @@ impl<T> Data<T> {
 
 impl<T: Buf> Data<T> {
     pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self {
+        // TODO ensure that data.remaining() < MAX_FRAME_SIZE
         let mut flags = DataFlag::default();
         if eos {
             flags.set_end_stream();
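Note: the TODO added to Data::from_buf above asks for a payload-length guard when a DATA
frame is built. A standalone sketch of such a guard, assuming the HTTP/2 default
SETTINGS_MAX_FRAME_SIZE of 16,384 octets; the constant and function names are illustrative
and not part of this patch:

    // Sketch only: models the check the TODO describes, not the crate's code.
    const DEFAULT_MAX_FRAME_SIZE: usize = 16_384; // 2^14 octets, the RFC 7540 default

    /// Rejects payloads that cannot be carried in a single DATA frame.
    fn check_data_len(remaining: usize) -> Result<(), String> {
        if remaining > DEFAULT_MAX_FRAME_SIZE {
            return Err(format!(
                "payload of {} octets exceeds the {}-octet frame limit",
                remaining, DEFAULT_MAX_FRAME_SIZE
            ));
        }
        Ok(())
    }

    fn main() {
        assert!(check_data_len(1_024).is_ok());
        assert!(check_data_len(65_535).is_err());
    }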
diff --git a/src/proto/connection.rs b/src/proto/connection.rs
index f23312b..5fc72fd 100644
--- a/src/proto/connection.rs
+++ b/src/proto/connection.rs
@@ -72,19 +72,19 @@ impl<T, P, B> ControlPing for Connection<T, P, B>
         self.inner.start_ping(body)
     }
 
-    fn pop_pong(&mut self) -> Option<PingPayload> {
-        self.inner.pop_pong()
+    fn take_pong(&mut self) -> Option<PingPayload> {
+        self.inner.take_pong()
     }
 }
 
-// Note: this is bytes-specific for now so that we can know the payload's length.
-impl<T, P> Connection<T, P, Bytes>
+impl<T, P, B> Connection<T, P, B>
     where T: AsyncRead + AsyncWrite,
           P: Peer,
+          B: IntoBuf,
 {
     pub fn send_data(self,
                      id: StreamId,
-                     data: Bytes,
+                     data: B,
                      end_of_stream: bool)
         -> sink::Send<Self>
     {
@@ -151,6 +151,8 @@ impl<T, P, B> Stream for Connection<T, P, B>
         loop {
             let frame = match try!(self.inner.poll()) {
                 Async::Ready(f) => f,
+
+                // XXX is this necessary?
                 Async::NotReady => {
                     // Receiving new frames may depend on ensuring that the write buffer
                     // is clear (e.g. if window updates need to be sent), so `poll_complete`
@@ -217,7 +219,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
         match item {
             Frame::Headers { id, headers, end_of_stream } => {
-                if self.inner.stream_is_reset(id) {
+                if self.inner.stream_is_reset(id).is_some() {
                     return Err(error::User::StreamReset.into());
                 }
 
@@ -231,7 +233,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
             }
 
             Frame::Data { id, data, end_of_stream } => {
-                if self.inner.stream_is_reset(id) {
+                if self.inner.stream_is_reset(id).is_some() {
                     return Err(error::User::StreamReset.into());
                 }
diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs
index 913fe71..a2f0840 100644
--- a/src/proto/flow_control.rs
+++ b/src/proto/flow_control.rs
@@ -61,7 +61,7 @@ impl<T: ControlStreams> FlowControl<T> {
         if id.is_zero() {
             Some(&mut self.connection_local_flow_controller)
         } else {
-            self.inner.streams_mut().get_mut(&id).and_then(|s| s.local_flow_controller())
+            self.inner.streams_mut().get_mut(id).and_then(|s| s.local_flow_controller())
         }
     }
 
@@ -69,7 +69,7 @@ impl<T: ControlStreams> FlowControl<T> {
         if id.is_zero() {
             Some(&mut self.connection_remote_flow_controller)
         } else {
-            self.inner.streams_mut().get_mut(&id).and_then(|s| s.remote_flow_controller())
+            self.inner.streams_mut().get_mut(id).and_then(|s| s.remote_flow_controller())
         }
     }
 }
@@ -84,7 +84,7 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
         self.inner.streams_mut()
     }
 
-    fn stream_is_reset(&self, id: StreamId) -> bool {
+    fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
         self.inner.stream_is_reset(id)
     }
 }
@@ -92,7 +92,7 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
 /// Exposes a public upward API for flow control.
 impl<T: ControlStreams> ControlFlow for FlowControl<T> {
     fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
-        if self.stream_is_reset(id) {
+        if self.stream_is_reset(id).is_some() {
             return Err(error::User::StreamReset.into());
         }
 
@@ -125,7 +125,7 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
                 self.pending_local_window_updates.push_back(id);
             }
             Ok(())
-        } else if self.stream_is_reset(id) {
+        } else if self.stream_is_reset(id).is_some() {
             Err(error::User::StreamReset.into())
         } else {
             Err(error::User::InvalidStreamId.into())
@@ -138,8 +138,8 @@ impl<T: ControlPing> ControlPing for FlowControl<T> {
         self.inner.start_ping(body)
     }
 
-    fn pop_pong(&mut self) -> Option<PingPayload> {
-        self.inner.pop_pong()
+    fn take_pong(&mut self) -> Option<PingPayload> {
+        self.inner.take_pong()
     }
 }
 
@@ -160,7 +160,7 @@ impl<T> FlowControl<T>
         }
 
         while let Some(id) = self.pending_local_window_updates.pop_front() {
-            if !self.stream_is_reset(id) {
+            if self.stream_is_reset(id).is_none() {
                 let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update());
                 if let Some(incr) = update {
                     try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
@@ -270,6 +270,8 @@ impl<T> Stream for FlowControl<T>
             if self.connection_local_flow_controller.claim_window(sz).is_err() {
                 return Err(error::Reason::FlowControlError.into())
             }
+            // If this frame ends the stream, there may no longer be a flow
+            // controller. That's fine.
             if let Some(fc) = self.local_flow_controller(v.stream_id()) {
                 if fc.claim_window(sz).is_err() {
                     return Err(error::Reason::FlowControlError.into())
diff --git a/src/proto/mod.rs b/src/proto/mod.rs
index c59db0a..915d557 100644
--- a/src/proto/mod.rs
+++ b/src/proto/mod.rs
@@ -1,4 +1,5 @@
 use {frame, ConnectionError, Peer, StreamId};
+use error::Reason;
 use frame::SettingSet;
 
 use bytes::{Buf, IntoBuf};
@@ -60,8 +61,8 @@ use self::state::{StreamMap, StreamState};
 ///
 /// ### `StreamTracker`
 ///
-/// - Tracks the states of each stream.
-/// - **TODO** Enforces maximum concurrency.
+/// - Tracks all active streams.
+/// - Tracks all reset streams.
 /// - Exposes `ControlStreams` so that upper layers may share stream state.
 ///
 /// ### `PingPong`
@@ -126,16 +127,21 @@ pub trait ControlFlow {
 /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
 /// to Connection).
 pub trait ControlStreams {
+    /// Accesses the map of all active streams.
     fn streams(&self) -> &StreamMap;
+
+    /// Mutably accesses the map of all active streams.
     fn streams_mut(&mut self) -> &mut StreamMap;
-    fn stream_is_reset(&self, id: StreamId) -> bool;
+
+    /// Checks whether a stream has been reset.
+    fn stream_is_reset(&self, id: StreamId) -> Option<Reason>;
 }
 
 pub type PingPayload = [u8; 8];
 
 pub trait ControlPing {
     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
-    fn pop_pong(&mut self) -> Option<PingPayload>;
+    fn take_pong(&mut self) -> Option<PingPayload>;
 }
 
 /// Create a full H2 transport from an I/O handle.
@@ -203,7 +209,9 @@ pub fn from_server_handshaker(settings: Settings
                     initial_remote_window_size,
                     local_max_concurrency,
                     remote_max_concurrency,
-                    PingPong::new(FramedRead::new(framer))
+                    PingPong::new(
+                        FramedRead::new(framer)
+                    )
                 )
             )
         });
diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs
index a56e001..3f715a3 100644
--- a/src/proto/ping_pong.rs
+++ b/src/proto/ping_pong.rs
@@ -3,14 +3,15 @@ use frame::{Frame, Ping, SettingSet};
 use proto::{ApplySettings, ControlPing, PingPayload, ReadySink};
 
 use futures::*;
-use std::collections::VecDeque;
 
 /// Acknowledges ping requests from the remote.
 #[derive(Debug)]
 pub struct PingPong<T> {
     inner: T,
     sending_pong: Option<Frame>,
-    received_pongs: VecDeque<PingPayload>,
+    received_pong: Option<PingPayload>,
+    blocked_ping: Option<task::Task>,
+    expecting_pong: bool,
 }
 
 impl<T> PingPong<T>
@@ -21,7 +22,9 @@ impl<T> PingPong<T>
         PingPong {
             inner,
             sending_pong: None,
-            received_pongs: VecDeque::new(),
+            received_pong: None,
+            expecting_pong: false,
+            blocked_ping: None,
         }
     }
 }
@@ -45,14 +48,35 @@ impl<T> ControlPing for PingPong<T>
             return Ok(AsyncSink::NotReady(body));
         }
 
+        // Only allow one in-flight ping.
+        if self.expecting_pong || self.received_pong.is_some() {
+            self.blocked_ping = Some(task::current());
+            return Ok(AsyncSink::NotReady(body))
+        }
+
         match self.inner.start_send(Ping::ping(body).into())? {
-            AsyncSink::NotReady(_) => unreachable!(),
-            AsyncSink::Ready => Ok(AsyncSink::Ready),
+            AsyncSink::NotReady(_) => {
+                // By virtue of calling inner.poll_ready(), this must not happen.
+                unreachable!()
+            }
+            AsyncSink::Ready => {
+                self.expecting_pong = true;
+                Ok(AsyncSink::Ready)
+            }
         }
     }
 
-    fn pop_pong(&mut self) -> Option<PingPayload> {
-        self.received_pongs.pop_front()
+    fn take_pong(&mut self) -> Option<PingPayload> {
+        match self.received_pong.take() {
+            None => None,
+            Some(p) => {
+                self.expecting_pong = false;
+                if let Some(task) = self.blocked_ping.take() {
+                    task.notify();
+                }
+                Some(p)
+            }
+        }
     }
 }
@@ -94,20 +118,20 @@ impl<T> Stream for PingPong<T>
             match self.inner.poll()? {
                 Async::Ready(Some(Frame::Ping(ping))) => {
                     if ping.is_ack() {
-                        // If we received an ACK, pass it on (nothing to be done here).
-                        return Ok(Async::Ready(Some(ping.into())));
+                        // Save acknowledgements to be returned from take_pong().
+                        self.received_pong = Some(ping.into_payload());
+                        if let Some(task) = self.blocked_ping.take() {
+                            task.notify();
+                        }
+                    } else {
+                        // Save the ping's payload to be sent as an acknowledgement.
+                        let pong = Ping::pong(ping.into_payload());
+                        self.sending_pong = Some(pong.into());
                     }
-
-                    // Save a pong to be sent when there is nothing more to be returned
-                    // from the stream or when frames are sent to the sink.
-                    let pong = Ping::pong(ping.into_payload());
-                    self.sending_pong = Some(pong.into());
                 }
 
                 // Everything other than ping gets passed through.
-                f => {
-                    return Ok(f);
-                }
+                f => return Ok(f),
             }
         }
     }
@@ -151,6 +175,7 @@ impl<T> ReadySink for PingPong<T>
 #[cfg(test)]
 mod test {
     use super::*;
+    use proto::ControlPing;
     use std::cell::RefCell;
     use std::collections::VecDeque;
     use std::rc::Rc;
@@ -253,12 +278,10 @@ mod test {
             trans.from_socket.push_back(pong.into());
         }
 
-        match ping_pong.poll().unwrap() {
-            Async::Ready(Some(Frame::Ping(pong))) => {
-                assert!(pong.is_ack());
-                assert_eq!(&pong.into_payload(), b"buoyant!");
-            }
-            f => panic!("unexpected frame: {:?}", f),
+        assert!(ping_pong.poll().unwrap().is_not_ready());
+        match ping_pong.take_pong() {
+            Some(pong) => assert_eq!(&pong, b"buoyant!"),
+            None => panic!("no pong received"),
         }
 
         {
@@ -327,4 +350,15 @@ mod test {
             self.poll_complete()
         }
     }
+
+    impl ReadySink for Transport {
+        fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
+            let mut trans = self.0.borrow_mut();
+            if trans.closing || trans.start_send_blocked {
+                Ok(Async::NotReady)
+            } else {
+                Ok(Async::Ready(()))
+            }
+        }
+    }
 }
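Note: the ping_pong.rs change replaces the pop_pong() queue with a single-slot take_pong()
API. At most one locally-initiated ping may be in flight: a second start_ping() is rejected
(and the caller's task parked) until the acknowledgement is consumed, and take_pong() both
yields the payload and reopens the slot. The discipline can be modeled in isolation; this is
a simplified standalone sketch without the crate's Sink/Stream plumbing or task parking, not
the h2 types themselves:

    type PingPayload = [u8; 8];

    /// Standalone model of the one-in-flight ping slot; the field names mirror the
    /// fields added to PingPong, but this type is illustrative only.
    #[derive(Default)]
    struct PingSlot {
        expecting_pong: bool,               // a PING was sent and its ack not yet consumed
        received_pong: Option<PingPayload>, // ack payload waiting to be taken
    }

    impl PingSlot {
        /// Returns false when a ping is already outstanding (the real code parks the task).
        fn start_ping(&mut self) -> bool {
            if self.expecting_pong || self.received_pong.is_some() {
                return false;
            }
            self.expecting_pong = true;
            true
        }

        /// Called when a PING frame with the ACK flag arrives (the real code also
        /// notifies a task blocked in start_ping).
        fn recv_pong(&mut self, body: PingPayload) {
            self.received_pong = Some(body);
        }

        /// Yields the saved acknowledgement and frees the slot for the next ping.
        fn take_pong(&mut self) -> Option<PingPayload> {
            let p = self.received_pong.take();
            if p.is_some() {
                self.expecting_pong = false;
            }
            p
        }
    }

    fn main() {
        let mut slot = PingSlot::default();
        assert!(slot.start_ping());
        assert!(!slot.start_ping()); // only one ping may be outstanding
        slot.recv_pong(*b"buoyant!");
        assert_eq!(slot.take_pong(), Some(*b"buoyant!"));
        assert!(slot.start_ping()); // the slot is free again
    }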
diff --git a/src/proto/settings.rs b/src/proto/settings.rs
index d5ae257..2b267b2 100644
--- a/src/proto/settings.rs
+++ b/src/proto/settings.rs
@@ -104,7 +104,7 @@ impl<T> ControlStreams for Settings<T> {
         self.inner.streams_mut()
     }
 
-    fn stream_is_reset(&self, id: StreamId) -> bool {
+    fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
         self.inner.stream_is_reset(id)
     }
 }
@@ -124,8 +124,8 @@ impl<T> ControlPing for Settings<T> {
         self.inner.start_ping(body)
     }
 
-    fn pop_pong(&mut self) -> Option<PingPayload> {
-        self.inner.pop_pong()
+    fn take_pong(&mut self) -> Option<PingPayload> {
+        self.inner.take_pong()
     }
 }
diff --git a/src/proto/state.rs b/src/proto/state.rs
index f7afb0a..611c81f 100644
--- a/src/proto/state.rs
+++ b/src/proto/state.rs
@@ -296,16 +296,28 @@ pub struct StreamMap {
 }
 
 impl StreamMap {
-    pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut StreamState> {
-        self.inner.get_mut(id)
+    pub fn get_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
+        self.inner.get_mut(&id)
+    }
+
+    pub fn has_stream(&mut self, id: StreamId) -> bool {
+        self.inner.contains_key(&id)
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    pub fn len(&self) -> usize {
+        self.inner.len()
     }
 
     pub fn entry(&mut self, id: StreamId) -> Entry<StreamId, StreamState> {
         self.inner.entry(id)
     }
 
-    pub fn remove(&mut self, id: &StreamId) -> Option<StreamState> {
-        self.inner.remove(id)
+    pub fn remove(&mut self, id: StreamId) -> Option<StreamState> {
+        self.inner.remove(&id)
     }
 
     pub fn shrink_all_local_windows(&mut self, decr: u32) {
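Note: the stream_tracker.rs changes that follow hinge on one admission rule: a HEADERS frame
may open a new stream only while the number of active streams is below the applicable
SETTINGS_MAX_CONCURRENT_STREAMS value, and frames for streams that are already active are
never refused by this check. On receipt, a violating stream is refused with RST_STREAM
(REFUSED_STREAM); on send, the application sees User::MaxConcurrencyExceeded. A minimal
standalone sketch of the rule itself; the function is illustrative, not part of the patch:

    /// Whether a HEADERS frame may be admitted under a SETTINGS_MAX_CONCURRENT_STREAMS
    /// value of `max` (None means the peer advertised no limit). `already_active` says
    /// whether the stream is already counted in `active_len`.
    fn admit_stream(already_active: bool, active_len: usize, max: Option<u32>) -> bool {
        if already_active {
            // Subsequent HEADERS (e.g. trailers) on an active stream never trip the limit.
            return true;
        }
        match max {
            None => true,                           // no limit advertised
            Some(max) => active_len < max as usize, // opening one more must not exceed max
        }
    }

    fn main() {
        assert!(admit_stream(false, 99, Some(100)));   // the 100th stream is allowed
        assert!(!admit_stream(false, 100, Some(100))); // the 101st is refused
        assert!(!admit_stream(false, 0, Some(0)));     // a limit of zero refuses new streams
        assert!(admit_stream(true, 100, Some(100)));   // active streams are unaffected
        assert!(admit_stream(false, 10_000, None));    // no setting, no limit
    }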
diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs
index cb0f62d..41a452a 100644
--- a/src/proto/stream_tracker.rs
+++ b/src/proto/stream_tracker.rs
@@ -9,17 +9,26 @@ use ordermap::OrderMap;
 use std::hash::BuildHasherDefault;
 use std::marker::PhantomData;
 
+// TODO enforce local_max_concurrency.
+// TODO enforce remote_max_concurrency.
+// TODO reset_streams needs to be bounded.
+// TODO track reserved streams (PUSH_PROMISE)
+
 #[derive(Debug)]
 pub struct StreamTracker<T, P> {
     inner: T,
     peer: PhantomData<P>,
+    active_streams: StreamMap,
     // TODO reserved_streams: HashSet<StreamId>
     reset_streams: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
+    local_max_concurrency: Option<u32>,
     remote_max_concurrency: Option<u32>,
     initial_local_window_size: WindowSize,
     initial_remote_window_size: WindowSize,
+
+    pending_refused_stream: Option<StreamId>,
 }
 
 impl<T, P> StreamTracker<T, P>
@@ -37,8 +46,11 @@ impl<T, P> StreamTracker<T, P>
         StreamTracker {
             inner,
             peer: PhantomData,
+            active_streams: StreamMap::default(),
             reset_streams: OrderMap::default(),
+            pending_refused_stream: None,
+            local_max_concurrency,
             remote_max_concurrency,
             initial_local_window_size,
             initial_remote_window_size,
         }
     }
 }
@@ -47,28 +59,48 @@ impl<T, P> StreamTracker<T, P>
 
+impl<T, P> StreamTracker<T, P>
+    where T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
+          P: Peer
+{
+    fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
+        debug_assert!(self.pending_refused_stream.is_none());
+
+        let f = frame::Reset::new(id, Reason::RefusedStream);
+        match self.inner.start_send(f.into())? {
+            AsyncSink::Ready => {
+                self.reset_streams.insert(id, Reason::RefusedStream);
+                Ok(Async::Ready(()))
+            }
+            AsyncSink::NotReady(_) => {
+                self.pending_refused_stream = Some(id);
+                Ok(Async::NotReady)
+            }
+        }
+    }
+}
+
 impl<T, P> ControlStreams for StreamTracker<T, P> {
-    #[inline]
     fn streams(&self) -> &StreamMap {
         &self.active_streams
     }
 
-    #[inline]
     fn streams_mut(&mut self) -> &mut StreamMap {
         &mut self.active_streams
     }
 
-    fn stream_is_reset(&self, id: StreamId) -> bool {
-        self.reset_streams.contains_key(&id)
+    fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
+        self.reset_streams.get(&id).map(|r| *r)
     }
 }
 
 /// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
 ///
 /// > Indicates the maximum number of concurrent streams that the sender will allow. This
-/// > limit is directional: it applies to the number of streams that the sender permits the
-/// > receiver to create. Initially, there is no limit to this value. It is recommended that
-/// > this value be no smaller than 100, so as to not unnecessarily limit parallelism.
+/// > limit is directional: it applies to the number of streams that the sender permits
+/// > the receiver to create. Initially, there is no limit to this value. It is
+/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
+/// > parallelism.
 /// >
 /// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
 /// > endpoints. A zero value does prevent the creation of new streams; however, this can
@@ -104,13 +136,14 @@ impl<T, P> ControlPing for StreamTracker<T, P>
         self.inner.start_ping(body)
     }
 
-    fn pop_pong(&mut self) -> Option<PingPayload> {
-        self.inner.pop_pong()
+    fn take_pong(&mut self) -> Option<PingPayload> {
+        self.inner.take_pong()
    }
 }
 
-impl<T, P> Stream for StreamTracker<T, P>
+impl<T, P> Stream for StreamTracker<T, P>
     where T: Stream<Item = Frame, Error = ConnectionError>,
+          T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
           P: Peer,
 {
     type Item = T::Item;
@@ -119,6 +152,12 @@ impl<T, P> Stream for StreamTracker<T, P>
     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
         use frame::Frame::*;
 
+        // The local endpoint must finish refusing the remote stream before
+        // processing additional frames.
+        if let Some(id) = self.pending_refused_stream.take() {
+            try_ready!(self.send_refusal(id));
+        }
+
         loop {
             match try_ready!(self.inner.poll()) {
                 Some(Headers(v)) => {
@@ -129,6 +168,23 @@ impl<T, P> Stream for StreamTracker<T, P>
                         continue;
                     }
 
+                    // Ensure that receiving this frame will not violate the local max
+                    // stream concurrency setting. If it would, the stream must be
+                    // refused before any additional frames are processed.
+                    if let Some(max) = self.local_max_concurrency {
+                        let max = max as usize;
+                        if !self.active_streams.has_stream(id)
+                            && self.active_streams.len() >= max {
+                            // This frame would violate our local max concurrency, so
+                            // refuse the stream.
+                            try_ready!(self.send_refusal(id));
+
+                            // Try to process another frame (hopefully for an active
+                            // stream).
+                            continue;
+                        }
+                    }
+
                     let is_closed = {
                         let stream = self.active_streams.entry(id)
                             .or_insert_with(|| StreamState::default());
@@ -137,10 +193,6 @@ impl<T, P> Stream for StreamTracker<T, P>
                         stream.recv_headers::<P>(eos, self.initial_local_window_size)?;
 
                         if initialized {
-                            // TODO: Ensure available capacity for a new stream
-                            // This won't be as simple as self.streams.len() as closed
-                            // connections should not be factored.
-
                             if !P::is_valid_remote_stream_id(id) {
                                 return Err(Reason::ProtocolError.into());
                             }
@@ -150,7 +202,7 @@ impl<T, P> Stream for StreamTracker<T, P>
                     };
 
                     if is_closed {
-                        self.active_streams.remove(&id);
+                        self.active_streams.remove(id);
                         self.reset_streams.insert(id, Reason::NoError);
                     }
 
@@ -165,7 +217,7 @@ impl<T, P> Stream for StreamTracker<T, P>
                     }
 
                     let is_closed = {
-                        let stream = match self.active_streams.get_mut(&id) {
+                        let stream = match self.active_streams.get_mut(id) {
                             None => return Err(Reason::ProtocolError.into()),
                             Some(s) => s,
                         };
@@ -174,7 +226,7 @@ impl<T, P> Stream for StreamTracker<T, P>
                     };
 
                     if is_closed {
-                        self.active_streams.remove(&id);
+                        self.active_streams.remove(id);
                         self.reset_streams.insert(id, Reason::NoError);
                     }
 
@@ -187,7 +239,7 @@ impl<T, P> Stream for StreamTracker<T, P>
                     // Set or update the reset reason.
                     self.reset_streams.insert(id, v.reason());
 
-                    if self.active_streams.remove(&id).is_some() {
+                    if self.active_streams.remove(id).is_some() {
                         return Ok(Async::Ready(Some(Reset(v))));
                     }
                 }
@@ -210,7 +262,6 @@ impl<T, P> Stream for StreamTracker<T, P>
     }
 }
 
-
 impl<T, P> Sink for StreamTracker<T, P>
     where T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
           P: Peer,
@@ -222,10 +273,18 @@ impl<T, P> Sink for StreamTracker<T, P>
         use frame::Frame::*;
 
         // Must be enforced through higher levels.
-        debug_assert!(!self.stream_is_reset(item.stream_id()));
+        debug_assert!(self.stream_is_reset(item.stream_id()).is_none());
 
-        match &item {
-            &Headers(ref v) => {
+        // The local endpoint must finish refusing the remote stream before sending
+        // any other frames.
+        if let Some(id) = self.pending_refused_stream.take() {
+            if self.send_refusal(id)?.is_not_ready() {
+                return Ok(AsyncSink::NotReady(item));
+            }
+        }
+
+        match item {
+            Headers(v) => {
                 let id = v.stream_id();
                 let eos = v.is_end_stream();
@@ -237,6 +296,24 @@ impl<T, P> Sink for StreamTracker<T, P>
                 // ACTUALLY(ver), maybe not?
                 // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
 
+                // Ensure that sending this frame would not violate the remote's max
+                // stream concurrency setting.
+                if let Some(max) = self.remote_max_concurrency {
+                    let max = max as usize;
+                    if !self.active_streams.has_stream(id)
+                        && self.active_streams.len() >= max {
+                        // This frame would violate the remote's max concurrency, so
+                        // refuse the stream on the wire.
+                        if self.send_refusal(id)?.is_not_ready() {
+                            return Ok(AsyncSink::NotReady(Headers(v)));
+                        }
+
+                        // Surface the violation to the application.
+                        return Err(User::MaxConcurrencyExceeded.into())
+                    }
+                }
+
                 let is_closed = {
                     let stream = self.active_streams.entry(id)
                         .or_insert_with(|| StreamState::default());
@@ -258,32 +335,40 @@ impl<T, P> Sink for StreamTracker<T, P>
                 };
 
                 if is_closed {
-                    self.active_streams.remove(&id);
+                    self.active_streams.remove(id);
                     self.reset_streams.insert(id, Reason::NoError);
                 }
+
+                self.inner.start_send(Headers(v))
             }
 
-            &Data(ref v) => {
-                match self.active_streams.get_mut(&v.stream_id()) {
+            Data(v) => {
+                match self.active_streams.get_mut(v.stream_id()) {
                     None => return Err(User::InactiveStreamId.into()),
-                    Some(stream) => stream.send_data(v.is_end_stream())?,
+                    Some(stream) => {
+                        stream.send_data(v.is_end_stream())?;
+                        self.inner.start_send(Data(v))
+                    }
                 }
             }
-
-            &Reset(ref v) => {
+            Reset(v) => {
                 let id = v.stream_id();
-                self.active_streams.remove(&id);
+                self.active_streams.remove(id);
                 self.reset_streams.insert(id, v.reason());
+                self.inner.start_send(Reset(v))
             }
 
-            _ => {}
+            frame => self.inner.start_send(frame),
         }
-
-        self.inner.start_send(item)
     }
 
     fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
+        if let Some(id) = self.pending_refused_stream.take() {
+            try_ready!(self.send_refusal(id));
+        }
+
         self.inner.poll_complete()
     }
 }
@@ -296,6 +381,10 @@ impl<T, P> ReadySink for StreamTracker<T, P>
           P: Peer,
 {
     fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
+        if let Some(id) = self.pending_refused_stream.take() {
+            try_ready!(self.send_refusal(id));
+        }
+
         self.inner.poll_ready()
     }
 }
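Note: two behaviors fall out of the StreamTracker changes above. First, a refusal may not
complete immediately: send_refusal() stashes the stream id in pending_refused_stream when the
underlying sink is not ready, and each entry point (poll, start_send, poll_complete,
poll_ready) retries that pending reset before doing anything else. Second, a peer that
receives RST_STREAM with REFUSED_STREAM knows the request was rejected before any application
processing, so the request is safe to retry on another stream (RFC 7540, section 8.1.4). A
small standalone sketch of the retry decision a client might make on reset codes; the enum
here is illustrative and not the crate's Reason type:

    /// Subset of HTTP/2 reset codes, for illustration only.
    #[derive(Debug, PartialEq)]
    enum ResetCode {
        NoError,
        RefusedStream,
        Cancel,
        EnhanceYourCalm,
    }

    /// REFUSED_STREAM guarantees the peer did not process the request, so it is the
    /// one reset code that is always safe to retry automatically.
    fn safe_to_retry(code: &ResetCode) -> bool {
        *code == ResetCode::RefusedStream
    }

    fn main() {
        assert!(safe_to_retry(&ResetCode::RefusedStream));
        assert!(!safe_to_retry(&ResetCode::Cancel));
        assert!(!safe_to_retry(&ResetCode::EnhanceYourCalm));
        assert!(!safe_to_retry(&ResetCode::NoError));
    }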