From d269029dd6d398619e464ab1a0903be3e7d06d62 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 10 Jul 2017 00:46:20 +0000 Subject: [PATCH] wip --- src/frame/window_update.rs | 13 ++- src/proto/connection.rs | 223 ++++++++++++++++++++----------------- src/proto/flow_control.rs | 40 +++++++ src/proto/mod.rs | 6 +- src/proto/state.rs | 40 +++++-- src/proto/window_update.rs | 26 ++++- 6 files changed, 227 insertions(+), 121 deletions(-) create mode 100644 src/proto/flow_control.rs diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index 92ec263..c79f381 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -14,6 +14,13 @@ pub struct WindowUpdate { } impl WindowUpdate { + pub fn new(stream_id: StreamId, increment: Increment) -> WindowUpdate { + WindowUpdate { + stream_id, + increment, + } + } + pub fn stream_id(&self) -> StreamId { self.stream_id } @@ -22,13 +29,13 @@ impl WindowUpdate { self.increment } - /// Builds a `Ping` frame from a raw frame. + /// Builds a `WindowUpdate` frame from a raw frame. pub fn load(head: Head, bytes: &[u8]) -> Result { debug_assert_eq!(head.kind(), ::frame::Kind::WindowUpdate); Ok(WindowUpdate { stream_id: head.stream_id(), - // Clear the most significant bit, as that is reserved and MUST be ignored when - // received. + // Clear the most significant bit, as that is reserved and MUST be ignored + // when received. increment: NetworkEndian::read_u32(bytes) & !INCREMENT_MASK, }) } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index e8e3c9d..5b0f99d 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,8 +1,8 @@ use {Frame, ConnectionError, Peer, StreamId}; use client::Client; use frame::{Frame as WireFrame}; +use proto::{self, FlowController, ReadySink, PeerState, State, WindowUpdate}; use server::Server; -use proto::{self, ReadySink, State, WindowUpdate}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -13,49 +13,11 @@ use futures::*; use ordermap::OrderMap; use fnv::FnvHasher; -use std::marker::PhantomData; +use std::collections::VecDeque; use std::hash::BuildHasherDefault; +use std::marker::PhantomData; -pub struct FlowControlViolation; - -#[derive(Debug)] -struct FlowController { - window_size: u32, - underflow: u32, -} - -impl FlowController { - pub fn new(window_size: u32) -> FlowController { - FlowController { - window_size, - underflow: 0, - } - } - - pub fn shrink(&mut self, mut sz: u32) { - self.underflow += sz; - } - - pub fn consume(&mut self, mut sz: u32) -> Result<(), FlowControlViolation> { - if sz < self.window_size { - self.underflow -= sz; - return Err(FlowControlViolation); - } - - self.window_size -= sz; - Ok(()) - } - - pub fn increment(&mut self, mut sz: u32) { - if sz <= self.underflow { - self.underflow -= sz; - return; - } - - sz -= self.underflow; - self.window_size += sz; - } -} +// TODO get window size from `inner`. /// An H2 connection #[derive(Debug)] @@ -63,13 +25,24 @@ pub struct Connection { inner: proto::Inner, streams: StreamMap, peer: PhantomData

, + + /// Tracks connection-level flow control. local_flow_controller: FlowController, - remote_flow_controller: FlowController, + initial_local_window_size: u32, + pending_local_window_updates: VecDeque, + + remote_flow_controller: FlowController, + initial_remote_window_size: u32, + pending_remote_window_updates: VecDeque, + blocked_remote_window_update: Option } type StreamMap = OrderMap>; -pub fn new(transport: proto::Inner, initial_local_window_size: u32, initial_remote_window_size: u32) -> Connection +pub fn new(transport: proto::Inner, + initial_local_window_size: u32, + initial_remote_window_size: u32) + -> Connection where T: AsyncRead + AsyncWrite, P: Peer, { @@ -77,35 +50,70 @@ pub fn new(transport: proto::Inner, initial_local_window_size: u32, ini inner: transport, streams: StreamMap::default(), peer: PhantomData, + local_flow_controller: FlowController::new(initial_local_window_size), + initial_local_window_size, + pending_local_window_updates: VecDeque::default(), + remote_flow_controller: FlowController::new(initial_remote_window_size), + initial_remote_window_size, + pending_remote_window_updates: VecDeque::default(), + blocked_remote_window_update: None, } } impl Connection { - /// Publishes stream window updates to the remote. + /// Publishes local stream window updates to the remote. /// - /// Connection window updates (StreamId=0) and stream window updates are published + /// Connection window updates (StreamId=0) and stream window must be published /// distinctly. pub fn increment_local_window(&mut self, up: WindowUpdate) { - let incr = up.increment(); - let flow = match up { - WindowUpdate::Connection { .. } => Some(&self.local_flow_controller), - WindowUpdate::Stream { id, .. } => { - self.streams.get(&id).map(|s| s.local_flow_controller()) + let added = match &up { + &WindowUpdate::Connection { increment } => { + if increment == 0 { + false + } else { + self.local_flow_controller.increment(increment); + true + } + } + &WindowUpdate::Stream { id, increment } => { + if increment == 0 { + false + } else { + match self.streams.get_mut(&id) { + Some(&mut State::Open { local: PeerState::Data(ref mut fc), .. }) | + Some(&mut State::HalfClosedRemote(PeerState::Data(ref mut fc))) => { + fc.increment(increment); + true + } + _ => false, + } + } } }; - if let Some(flow) = flow { - flow.increment(incr); + + if added { + self.pending_local_window_updates.push_back(up); } - unimplemented!() } - /// Advertises stream window updates from the remote. + /// Advertises the remote's stream window updates. /// /// Connection window updates (StreamId=0) and stream window updates are advertised /// distinctly. - pub fn poll_remote_window(&mut self) -> Poll { + fn increment_remote_window(&mut self, id: StreamId, incr: u32) { + if id.is_zero() { + self.remote_flow_controller.increment(incr); + } else { + match self.streams.get_mut(&id) { + Some(&mut State::Open { remote: PeerState::Data(ref mut fc), .. }) | + Some(&mut State::HalfClosedLocal(PeerState::Data(ref mut fc))) => { + fc.increment(incr); + } + _ => {} + } + } unimplemented!() } } @@ -154,61 +162,70 @@ impl Stream for Connection fn poll(&mut self) -> Poll, ConnectionError> { trace!("Connection::poll"); - let frame = match try!(self.inner.poll()) { - Async::Ready(f) => f, - Async::NotReady => { - // Because receiving new frames may depend on ensuring that the - // write buffer is clear, `poll_complete` is called here. - let _ = try!(self.poll_complete()); - return Ok(Async::NotReady); - } - }; + loop { + let frame = match try!(self.inner.poll()) { + Async::Ready(f) => f, + Async::NotReady => { + // Because receiving new frames may depend on ensuring that the + // write buffer is clear, `poll_complete` is called here. + let _ = try!(self.poll_complete()); + return Ok(Async::NotReady); + } + }; - trace!("received; frame={:?}", frame); + trace!("received; frame={:?}", frame); - let frame = match frame { - Some(WireFrame::Headers(v)) => { - // TODO: Update stream state - let stream_id = v.stream_id(); - let end_of_stream = v.is_end_stream(); + let frame = match frame { + Some(WireFrame::Headers(v)) => { + // TODO: Update stream state + let stream_id = v.stream_id(); + let end_of_stream = v.is_end_stream(); - let stream_initialized = try!(self.streams.entry(stream_id) - .or_insert(State::default()) - .recv_headers::

(end_of_stream)); + // TODO load window size from settings. + let init_window_size = 65_535; - if stream_initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. + let stream_initialized = try!(self.streams.entry(stream_id) + .or_insert(State::default()) + .recv_headers::

(end_of_stream, init_window_size)); - if !P::is_valid_remote_stream_id(stream_id) { - unimplemented!(); + if stream_initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + + if !P::is_valid_remote_stream_id(stream_id) { + unimplemented!(); + } + } + + Frame::Headers { + id: stream_id, + headers: P::convert_poll_message(v), + end_of_stream: end_of_stream, } } + Some(WireFrame::Data(v)) => { + // TODO: Validate frame - Frame::Headers { - id: stream_id, - headers: P::convert_poll_message(v), - end_of_stream: end_of_stream, + let stream_id = v.stream_id(); + let end_of_stream = v.is_end_stream(); + + Frame::Body { + id: stream_id, + chunk: v.into_payload(), + end_of_stream: end_of_stream, + } } - } - Some(WireFrame::Data(v)) => { - // TODO: Validate frame - - let stream_id = v.stream_id(); - let end_of_stream = v.is_end_stream(); - - Frame::Body { - id: stream_id, - chunk: v.into_payload(), - end_of_stream: end_of_stream, + Some(WireFrame::WindowUpdate(v)) => { + self.increment_remote_window(v.stream_id(), v.increment()); + continue; } - } - Some(frame) => panic!("unexpected frame; frame={:?}", frame), - None => return Ok(Async::Ready(None)), - }; + Some(frame) => panic!("unexpected frame; frame={:?}", frame), + None => return Ok(Async::Ready(None)), + }; - Ok(Async::Ready(Some(frame))) + return Ok(Async::Ready(Some(frame))); + } } } @@ -229,13 +246,15 @@ impl Sink for Connection match item { Frame::Headers { id, headers, end_of_stream } => { + // TODO load window size from settings. + let init_window_size = 65_535; + // Transition the stream state, creating a new entry if needed - // // TODO: Response can send multiple headers frames before body // (1xx responses). let stream_initialized = try!(self.streams.entry(id) .or_insert(State::default()) - .send_headers::

(end_of_stream)); + .send_headers::

(end_of_stream, init_window_size)); if stream_initialized { // TODO: Ensure available capacity for a new stream diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs new file mode 100644 index 0000000..3ad4397 --- /dev/null +++ b/src/proto/flow_control.rs @@ -0,0 +1,40 @@ +#[derive(Clone, Copy, Debug)] +pub struct WindowUnderflow; + +#[derive(Copy, Clone, Debug)] +pub struct FlowController { + window_size: u32, + underflow: u32, +} + +impl FlowController { + pub fn new(window_size: u32) -> FlowController { + FlowController { + window_size, + underflow: 0, + } + } + + pub fn shrink(&mut self, sz: u32) { + self.underflow += sz; + } + + pub fn consume(&mut self, sz: u32) -> Result<(), WindowUnderflow> { + if self.window_size < sz { + return Err(WindowUnderflow); + } + + self.window_size -= sz; + Ok(()) + } + + pub fn increment(&mut self, sz: u32) { + if sz <= self.underflow { + self.underflow -= sz; + return; + } + + self.window_size += sz - self.underflow; + self.underflow = 0; + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 66df641..945b408 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,4 +1,5 @@ mod connection; +mod flow_control; mod framed_read; mod framed_write; mod ping_pong; @@ -7,13 +8,14 @@ mod settings; mod state; mod window_update; -pub use self::connection::{Connection}; +pub use self::connection::Connection; +pub use self::flow_control::FlowController; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; -pub use self::state::State; +pub use self::state::{PeerState, State}; pub use self::window_update::WindowUpdate; use {frame, Peer}; diff --git a/src/proto/state.rs b/src/proto/state.rs index 31700ad..45181ae 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,4 +1,5 @@ use {ConnectionError, Reason, Peer}; +use proto::FlowController; /// Represents the state of an H2 stream /// @@ -40,7 +41,7 @@ use {ConnectionError, Reason, Peer}; /// ES: END_STREAM flag /// R: RST_STREAM frame /// ``` -#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Debug, Copy, Clone)] pub enum State { Idle, ReservedLocal, @@ -54,18 +55,35 @@ pub enum State { Closed, } -#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Debug, Copy, Clone)] pub enum PeerState { Headers, - Data, + Data(FlowController), } impl State { + pub fn increment_local_window_size(&mut self, incr: u32) { + use self::State::*; + use self::PeerState::*; + + *self = match *self { + Open { local: Data(mut local), remote } => { + local.increment(incr); + Open { local: Data(local), remote } + } + HalfClosedRemote(Data(mut local)) => { + local.increment(incr); + HalfClosedRemote(Data(local)) + } + s => s, + } + } + /// Transition the state to represent headers being received. /// /// Returns true if this state transition results in iniitializing the /// stream id. `Err` is returned if this is an invalid state transition. - pub fn recv_headers(&mut self, eos: bool) -> Result { + pub fn recv_headers(&mut self, eos: bool, remote_window_size: u32) -> Result { use self::State::*; use self::PeerState::*; @@ -76,7 +94,7 @@ impl State { } else { Open { local: Headers, - remote: Data, + remote: Data(FlowController::new(remote_window_size)), } }; @@ -88,7 +106,7 @@ impl State { *self = if eos { HalfClosedRemote(local) } else { - let remote = Data; + let remote = Data(FlowController::new(remote_window_size)); Open { local, remote } }; @@ -100,7 +118,7 @@ impl State { *self = if eos { Closed } else { - HalfClosedLocal(Data) + HalfClosedLocal(Data(FlowController::new(remote_window_size))) }; Ok(false) @@ -116,7 +134,7 @@ impl State { /// /// Returns true if this state transition results in initializing the stream /// id. `Err` is returned if this is an invalid state transition. - pub fn send_headers(&mut self, eos: bool) -> Result { + pub fn send_headers(&mut self, eos: bool, local_window_size: u32) -> Result { use self::State::*; use self::PeerState::*; @@ -126,7 +144,7 @@ impl State { HalfClosedLocal(Headers) } else { Open { - local: Data, + local: Data(FlowController::new(local_window_size)), remote: Headers, } }; @@ -139,7 +157,7 @@ impl State { *self = if eos { HalfClosedLocal(remote) } else { - let local = Data; + let local = Data(FlowController::new(local_window_size)); Open { local, remote } }; @@ -151,7 +169,7 @@ impl State { *self = if eos { Closed } else { - HalfClosedRemote(Data) + HalfClosedRemote(Data(FlowController::new(local_window_size))) }; Ok(false) diff --git a/src/proto/window_update.rs b/src/proto/window_update.rs index e6ea78a..7ba4cfc 100644 --- a/src/proto/window_update.rs +++ b/src/proto/window_update.rs @@ -1,16 +1,36 @@ use StreamId; +use frame; #[derive(Debug)] pub enum WindowUpdate { - Connection { increment: usize }, - Stream { id: StreamId, increment: usize }, + Connection { increment: u32 }, + Stream { id: StreamId, increment: u32 }, } impl WindowUpdate { - pub fn increment(&self) -> usize { + pub fn increment(&self) -> u32 { match *self { WindowUpdate::Connection { increment } | WindowUpdate::Stream { increment, .. } => increment } } } + +impl From for frame::WindowUpdate { + fn from(src: WindowUpdate) -> Self { + match src { + WindowUpdate::Connection { increment } => { + frame::WindowUpdate::new(StreamId::zero(), increment) + } + WindowUpdate::Stream { id, increment } => { + frame::WindowUpdate::new(id, increment) + } + } + } +} + +impl From for frame::Frame { + fn from(src: WindowUpdate) -> Self { + frame::Frame::WindowUpdate(src.into()) + } +}