From 59c92e1089958859ab7f65655f30ca0c825d56fa Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 15 Jul 2017 21:01:11 +0000 Subject: [PATCH] wire up remote settings application --- src/frame/settings.rs | 4 ++ src/proto/connection.rs | 8 ++-- src/proto/flow_control.rs | 85 ++++++++++++++++++++++++++++++++-- src/proto/flow_controller.rs | 2 +- src/proto/framed_read.rs | 12 ++++- src/proto/framed_write.rs | 12 ++++- src/proto/mod.rs | 84 ++++++++++++++++++++++++++-------- src/proto/ping_pong.rs | 14 +++++- src/proto/settings.rs | 19 ++++++-- src/proto/state.rs | 88 ++++++++++++++++-------------------- src/proto/stream_tracker.rs | 52 +++++++++++++++++++-- 11 files changed, 292 insertions(+), 88 deletions(-) diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 70c5d9c..220a150 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -23,6 +23,10 @@ impl SettingSet { pub fn initial_window_size(&self) -> u32 { self.initial_window_size.unwrap_or(65_535) } + + pub fn max_concurrent_streams(&self) -> Option { + self.max_concurrent_streams + } } /// An enum that lists all valid settings that can be sent in a SETTINGS diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d5ee250..b6d5a40 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -101,11 +101,11 @@ impl Connection { assert!(self.sending_window_update.is_none()); let added = if id.is_zero() { - self.local_flow_controller.increment_window_size(incr); + self.local_flow_controller.grow_window(incr); self.local_flow_controller.take_window_update() } else { self.streams.get_mut(&id).and_then(|s| { - s.increment_recv_window_size(incr); + s.grow_recv_window(incr); s.take_recv_window_update() }) }; @@ -126,10 +126,10 @@ impl Connection { } let added = if id.is_zero() { - self.remote_flow_controller.increment_window_size(incr); + self.remote_flow_controller.grow_window(incr); true } else if let Some(mut s) = self.streams.get_mut(&id) { - s.increment_send_window_size(incr); + s.grow_send_window(incr); true } else { false diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 8b27431..012cb61 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -1,12 +1,14 @@ use ConnectionError; use frame::{self, Frame}; -use proto::{ReadySink, StreamMap, StreamTransporter, WindowSize}; +use proto::{ReadySink, StreamMap, ConnectionTransporter, StreamTransporter}; use futures::*; #[derive(Debug)] pub struct FlowControl { inner: T, + initial_local_window_size: u32, + initial_remote_window_size: u32, } impl FlowControl @@ -14,8 +16,85 @@ impl FlowControl T: Sink, SinkError = ConnectionError>, T: StreamTransporter { - pub fn new(inner: T) -> FlowControl { - FlowControl { inner } + pub fn new(initial_local_window_size: u32, + initial_remote_window_size: u32, + inner: T) + -> FlowControl + { + FlowControl { + inner, + initial_local_window_size, + initial_remote_window_size, + } + } +} + +/// Applies an update to an endpoint's initial window size. +/// +/// Per RFC 7540 §6.9.2 +/// +/// > In addition to changing the flow-control window for streams that are not yet +/// > active, a SETTINGS frame can alter the initial flow-control window size for +/// > streams with active flow-control windows (that is, streams in the "open" or +/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE +/// > changes, a receiver MUST adjust the size of all stream flow-control windows that +/// > it maintains by the difference between the new value and the old value. +/// > +/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a +/// > flow-control window to become negative. A sender MUST track the negative +/// > flow-control window and MUST NOT send new flow-controlled frames until it +/// > receives WINDOW_UPDATE frames that cause the flow-control window to become +/// > positive. +impl ConnectionTransporter for FlowControl + where T: ConnectionTransporter, + T: StreamTransporter +{ + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set)?; + + let old_window_size = self.initial_local_window_size; + let new_window_size = set.initial_window_size(); + if new_window_size == old_window_size { + return Ok(()); + } + + { + let mut streams = self.streams_mut(); + if new_window_size < old_window_size { + let decr = old_window_size - new_window_size; + streams.shrink_local_window(decr); + } else { + let incr = new_window_size - old_window_size; + streams.grow_local_window(incr); + } + } + + self.initial_local_window_size = new_window_size; + Ok(()) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set)?; + + let old_window_size = self.initial_remote_window_size; + let new_window_size = set.initial_window_size(); + if new_window_size == old_window_size { + return Ok(()); + } + + { + let mut streams = self.streams_mut(); + if new_window_size < old_window_size { + let decr = old_window_size - new_window_size; + streams.shrink_remote_window(decr); + } else { + let incr = new_window_size - old_window_size; + streams.grow_remote_window(incr); + } + } + + self.initial_remote_window_size = new_window_size; + Ok(()) } } diff --git a/src/proto/flow_controller.rs b/src/proto/flow_controller.rs index 1f12c9d..7c47458 100644 --- a/src/proto/flow_controller.rs +++ b/src/proto/flow_controller.rs @@ -52,7 +52,7 @@ impl FlowController { } /// Applies a window increment immediately. - pub fn increment_window_size(&mut self, sz: WindowSize) { + pub fn grow_window(&mut self, sz: WindowSize) { if sz <= self.underflow { self.underflow -= sz; return; diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index a94a2be..0c34ee4 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -1,7 +1,7 @@ use {hpack, ConnectionError}; use frame::{self, Frame, Kind}; use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; -use proto::ReadySink; +use proto::{ConnectionTransporter, ReadySink}; use futures::*; @@ -103,6 +103,16 @@ impl FramedRead { } } +impl ConnectionTransporter for FramedRead { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.get_mut().apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.get_mut().apply_remote_settings(set) + } +} + impl Stream for FramedRead where T: AsyncRead, { diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 307f660..1a0def8 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,6 +1,6 @@ use {hpack, ConnectionError, FrameSize}; use frame::{self, Frame}; -use proto::ReadySink; +use proto::{ConnectionTransporter, ReadySink}; use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; @@ -78,6 +78,16 @@ impl FramedWrite } } +impl ConnectionTransporter for FramedWrite { + fn apply_local_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> { + Ok(()) + } + + fn apply_remote_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> { + Ok(()) + } +} + impl Sink for FramedWrite where T: AsyncWrite, B: Buf, diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 07dfee4..6e2e06f 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -20,7 +20,7 @@ pub use self::settings::Settings; pub use self::stream_tracker::StreamTracker; use self::state::StreamState; -use {frame, Peer, StreamId}; +use {frame, ConnectionError, Peer, StreamId}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; @@ -31,7 +31,20 @@ use ordermap::OrderMap; use fnv::FnvHasher; use std::hash::BuildHasherDefault; -/// Represents +/// Represents the internals of an HTTP2 connection. +/// +/// A transport consists of several layers (_transporters_) and is arranged from _top_ +/// (near the application) to _bottom_ (near the network). Each transporter implements a +/// Stream of frames received from the remote, and a ReadySink of frames sent to the +/// remote. +/// +/// At the top of the transport, the Settings module is responsible for: +/// - Transmitting local settings to the remote. +/// - Sending settings acknowledgements for all settings frames received from the remote. +/// - Exposing settings upward to the Connection. +/// +/// All transporters below Settings must apply relevant settings before passing a frame on +/// to another level. For example, if the frame writer n type Transport = Settings< FlowControl< @@ -44,15 +57,46 @@ type Framer = FramedRead< FramedWrite>; - pub type WindowSize = u32; -#[derive(Debug)] -struct StreamMap { +#[derive(Debug, Default)] +pub struct StreamMap { inner: OrderMap> } -trait StreamTransporter { +impl StreamMap { + fn shrink_local_window(&mut self, decr: u32) { + for (_, mut s) in &mut self.inner { + s.shrink_recv_window(decr) + } + } + + fn grow_local_window(&mut self, incr: u32) { + for (_, mut s) in &mut self.inner { + s.grow_recv_window(incr) + } + } + + fn shrink_remote_window(&mut self, decr: u32) { + for (_, mut s) in &mut self.inner { + s.shrink_send_window(decr) + } + } + + fn grow_remote_window(&mut self, incr: u32) { + for (_, mut s) in &mut self.inner { + s.grow_send_window(incr) + } + } +} + +/// Allows settings to be applied from the top of the stack to the lower levels.d +pub trait ConnectionTransporter { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; +} + +pub trait StreamTransporter { fn streams(&self)-> &StreamMap; fn streams_mut(&mut self) -> &mut StreamMap; } @@ -70,6 +114,8 @@ pub fn from_io(io: T, settings: frame::SettingSet) // To avoid code duplication, we're going to go this route. It is a bit // weird, but oh well... + // + // We first create a Settings directly around a framed writer let settings = Settings::new( framed_write, settings); @@ -92,30 +138,32 @@ pub fn server_handshaker(io: T, settings: frame::SettingSet) } /// Create a full H2 transport from the server handshaker -pub fn from_server_handshaker(transport: Settings>) +pub fn from_server_handshaker(settings: Settings>) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { - let settings = transport.swap_inner(|io| { - // Delimit the frames - let framed_read = length_delimited::Builder::new() + let initial_local_window_size = settings.local_settings().initial_window_size(); + let initial_remote_window_size = settings.remote_settings().initial_window_size(); + let local_max_concurrency = settings.local_settings().max_concurrent_streams(); + let remote_max_concurrency = settings.remote_settings().max_concurrent_streams(); + + // Replace Settings' writer with a full transport. + let transport = settings.swap_inner(|io| { + // Delimit the frames. + let framer = length_delimited::Builder::new() .big_endian() .length_field_length(3) .length_adjustment(9) .num_skip(0) // Don't skip the header .new_read(io); - // Map to `Frame` types - let framed = FramedRead::new(framed_read); - - FlowControl::new( - StreamTracker::new( + FlowControl::new(initial_local_window_size, initial_remote_window_size, + StreamTracker::new(local_max_concurrency, remote_max_concurrency, PingPong::new( - framed))) + FramedRead::new(framer)))) }); - // Finally, return the constructed `Connection` - connection::new(settings) + connection::new(transport) } diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 113b86d..d527225 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,7 +1,7 @@ use ConnectionError; -use frame::{Frame, Ping}; +use frame::{Frame, Ping, SettingSet}; use futures::*; -use proto::ReadySink; +use proto::{ConnectionTransporter, ReadySink}; /// Acknowledges ping requests from the remote. #[derive(Debug)] @@ -22,6 +22,16 @@ impl PingPong } } +impl ConnectionTransporter for PingPong { + fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + impl PingPong where T: Sink, SinkError = ConnectionError>, { diff --git a/src/proto/settings.rs b/src/proto/settings.rs index b0a9369..f62dfd4 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,6 +1,6 @@ use ConnectionError; use frame::{self, Frame}; -use proto::ReadySink; +use proto::{ConnectionTransporter, ReadySink, StreamMap, StreamTransporter}; use futures::*; use tokio_io::AsyncRead; @@ -94,9 +94,20 @@ impl Settings } } +impl StreamTransporter for Settings { + fn streams(&self) -> &StreamMap { + self.inner.streams() + } + + fn streams_mut(&mut self) -> &mut StreamMap { + self.inner.streams_mut() + } +} + impl Stream for Settings where T: Stream, T: Sink, SinkError = ConnectionError>, + T: ConnectionTransporter, { type Item = Frame; type Error = ConnectionError; @@ -112,8 +123,10 @@ impl Stream for Settings // Received new settings, queue an ACK self.remaining_acks += 1; - // Save off the settings - self.remote = v.into_set(); + // Apply the settings before saving them. + let settings = v.into_set(); + self.inner.apply_remote_settings(&settings)?; + self.remote = settings; let _ = try!(self.try_send_pending()); } diff --git a/src/proto/state.rs b/src/proto/state.rs index 28f3ff8..ccb8362 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -47,8 +47,9 @@ use proto::FlowController; #[derive(Debug, Copy, Clone)] pub enum StreamState { Idle, - ReservedLocal, - ReservedRemote, + // TODO: these states shouldn't count against concurrency limits: + //ReservedLocal, + //ReservedRemote, Open { local: PeerState, remote: PeerState, @@ -65,7 +66,7 @@ impl StreamState { /// caller should send the the returned window size increment to the remote. /// /// If the remote is closed, None is returned. - pub fn increment_send_window_size(&mut self, incr: u32) { + pub fn grow_send_window(&mut self, incr: u32) { use self::StreamState::*; use self::PeerState::*; @@ -75,11 +76,27 @@ impl StreamState { match self { &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.increment_window_size(incr), + &mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr), _ => {}, } } + pub fn shrink_send_window(&mut self, decr: u32) { + use self::StreamState::*; + use self::PeerState::*; + + if decr == 0 { + return; + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr), + _ => {}, + } + } + + /// Consumes newly-advertised capacity to inform the local endpoint it may send more /// data. pub fn take_send_window_update(&mut self) -> Option { @@ -98,7 +115,7 @@ impl StreamState { /// /// Returns the amount of capacity created, accounting for window size changes. The /// caller should send the the returned window size increment to the remote. - pub fn increment_recv_window_size(&mut self, incr: u32) { + pub fn grow_recv_window(&mut self, incr: u32) { use self::StreamState::*; use self::PeerState::*; @@ -108,7 +125,22 @@ impl StreamState { match self { &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.increment_window_size(incr), + &mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr), + _ => {}, + } + } + + pub fn shrink_recv_window(&mut self, decr: u32) { + use self::StreamState::*; + use self::PeerState::*; + + if decr == 0 { + return; + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr), _ => {}, } } @@ -126,46 +158,6 @@ impl StreamState { } } - /// Applies an update to the remote's initial window size. - /// - /// Per RFC 7540 §6.9.2 - /// - /// > In addition to changing the flow-control window for streams that are not yet - /// > active, a SETTINGS frame can alter the initial flow-control window size for - /// > streams with active flow-control windows (that is, streams in the "open" or - /// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE - /// > changes, a receiver MUST adjust the size of all stream flow-control windows that - /// > it maintains by the difference between the new value and the old value. - /// > - /// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a - /// > flow-control window to become negative. A sender MUST track the negative - /// > flow-control window and MUST NOT send new flow-controlled frames until it - /// > receives WINDOW_UPDATE frames that cause the flow-control window to become - /// > positive. - pub fn update_initial_recv_window_size(&mut self, old: u32, new: u32) { - use self::StreamState::*; - use self::PeerState::*; - - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => { - if new < old { - fc.shrink_window(old - new); - } else { - fc.increment_window_size(new - old); - } - } - _ => {} - } - } - - /// TODO Connection doesn't have an API for local updates yet. - pub fn update_initial_send_window_size(&mut self, _old: u32, _new: u32) { - //use self::StreamState::*; - //use self::PeerState::*; - unimplemented!() - } - /// Transition the state to represent headers being received. /// /// Returns true if this state transition results in iniitializing the @@ -212,8 +204,6 @@ impl StreamState { Closed | HalfClosedRemote(..) => { Err(ProtocolError.into()) } - - _ => unimplemented!(), } } @@ -301,8 +291,6 @@ impl StreamState { Closed | HalfClosedLocal(..) => { Err(UnexpectedFrameType.into()) } - - _ => unimplemented!(), } } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index b38a714..b962e6f 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -1,30 +1,72 @@ use ConnectionError; use frame::{self, Frame}; -use proto::{ReadySink, StreamMap, StreamTransporter, WindowSize}; +use proto::{ReadySink, StreamMap, ConnectionTransporter, StreamTransporter}; use futures::*; #[derive(Debug)] pub struct StreamTracker { inner: T, + streams: StreamMap, + local_max_concurrency: Option, + remote_max_concurrency: Option, } impl StreamTracker where T: Stream, T: Sink, SinkError = ConnectionError> { - pub fn new(inner: T) -> StreamTracker { - StreamTracker { inner } + pub fn new(local_max_concurrency: Option, + remote_max_concurrency: Option, + inner: T) + -> StreamTracker + { + StreamTracker { + inner, + streams: StreamMap::default(), + local_max_concurrency, + remote_max_concurrency, + } } } impl StreamTransporter for StreamTracker { fn streams(&self) -> &StreamMap { - unimplemented!() + &self.streams } fn streams_mut(&mut self) -> &mut StreamMap { - unimplemented!() + &mut self.streams + } +} + +/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. +/// +/// > Indicates the maximum number of concurrent streams that the sender will allow. This +/// > limit is directional: it applies to the number of streams that the sender permits the +/// > receiver to create. Initially, there is no limit to this value. It is recommended that +/// > this value be no smaller than 100, so as to not unnecessarily limit parallelism. +/// > +/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by +/// > endpoints. A zero value does prevent the creation of new streams; however, this can +/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only +/// > set a zero value for short durations; if a server does not wish to accept requests, +/// > closing the connection is more appropriate. +/// +/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a +/// > value that is below the current number of open streams can either close streams that +/// > exceed the new value or allow streams to complete. +/// +/// This module does NOT close streams when the setting changes. +impl ConnectionTransporter for StreamTracker { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.local_max_concurrency = set.max_concurrent_streams(); + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.remote_max_concurrency = set.max_concurrent_streams(); + self.inner.apply_remote_settings(set) } }