From 1ed4b7e56a0d2b0d997509df7fa6ec12b7623d69 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Sat, 15 Jul 2017 18:39:45 +0000 Subject: [PATCH] wip: Sketch out stream state refactor introduce the StreamTransporter trait, which exposes a map containing all active stream states. add skeletons for StreamTracker and FlowControl. StreamTracker drives all state changes --- src/proto/connection.rs | 13 ++-- src/proto/flow_control.rs | 125 +++++++++++++++++------------------ src/proto/flow_controller.rs | 83 +++++++++++++++++++++++ src/proto/mod.rs | 47 +++++++++---- src/proto/ping_pong.rs | 10 +-- src/proto/state.rs | 30 ++++----- src/proto/stream_tracker.rs | 67 +++++++++++++++++++ src/proto/window_update.rs | 36 ---------- 8 files changed, 270 insertions(+), 141 deletions(-) create mode 100644 src/proto/flow_controller.rs create mode 100644 src/proto/stream_tracker.rs delete mode 100644 src/proto/window_update.rs diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 67ce03c..d5ee250 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -2,7 +2,7 @@ use {Frame, FrameSize}; use client::Client; use error::{self, ConnectionError}; use frame::{self, StreamId}; -use proto::{self, Peer, ReadySink, State, FlowController, WindowSize}; +use proto::{self, Peer, ReadySink, StreamState, FlowController, WindowSize}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -14,15 +14,14 @@ use futures::*; use ordermap::OrderMap; use fnv::FnvHasher; - use std::hash::BuildHasherDefault; use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub struct Connection { - inner: proto::Inner, - streams: StreamMap, + inner: proto::Transport, + streams: StreamMap, peer: PhantomData

, /// Tracks the connection-level flow control window for receiving data from the @@ -41,7 +40,7 @@ pub struct Connection { type StreamMap = OrderMap>; -pub fn new(transport: proto::Inner) +pub fn new(transport: proto::Transport) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -256,7 +255,7 @@ impl Stream for Connection let init_window_size = self.inner.local_settings().initial_window_size(); let stream_initialized = try!(self.streams.entry(stream_id) - .or_insert(State::default()) + .or_insert(StreamState::default()) .recv_headers::

(end_of_stream, init_window_size)); if stream_initialized { @@ -347,7 +346,7 @@ impl Sink for Connection // ACTUALLY(ver), maybe not? // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 let stream_initialized = try!(self.streams.entry(id) - .or_insert(State::default()) + .or_insert(StreamState::default()) .send_headers::

(end_of_stream, init_window_size)); if stream_initialized { diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index f6655f8..8b27431 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -1,77 +1,70 @@ -use proto::WindowSize; +use ConnectionError; +use frame::{self, Frame}; +use proto::{ReadySink, StreamMap, StreamTransporter, WindowSize}; -#[derive(Clone, Copy, Debug)] -pub struct WindowUnderflow; +use futures::*; -pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; - -#[derive(Copy, Clone, Debug)] -pub struct FlowController { - /// Amount that may be claimed. - window_size: WindowSize, - /// Amount to be removed by future increments. - underflow: WindowSize, - /// The amount that has been incremented but not yet advertised (to the application or - /// the remote). - next_window_update: WindowSize, +#[derive(Debug)] +pub struct FlowControl { + inner: T, } -impl Default for FlowController { - fn default() -> Self { - Self::new(DEFAULT_INITIAL_WINDOW_SIZE) +impl FlowControl + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: StreamTransporter +{ + pub fn new(inner: T) -> FlowControl { + FlowControl { inner } } } -impl FlowController { - pub fn new(window_size: WindowSize) -> FlowController { - FlowController { - window_size, - underflow: 0, - next_window_update: 0, - } +impl StreamTransporter for FlowControl { + fn streams(&self) -> &StreamMap { + self.inner.streams() } - /// Reduce future capacity of the window. - /// - /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. - pub fn shrink_window(&mut self, decr: WindowSize) { - self.underflow += decr; - } - - /// Claims the provided amount from the window, if there is enough space. - /// - /// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than - /// have been previously claimed. - pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { - if self.window_size < sz { - return Err(WindowUnderflow); - } - - self.window_size -= sz; - Ok(()) - } - - /// Applies a window increment immediately. - pub fn increment_window_size(&mut self, sz: WindowSize) { - if sz <= self.underflow { - self.underflow -= sz; - return; - } - - let added = sz - self.underflow; - self.window_size += added; - self.next_window_update += added; - self.underflow = 0; - } - - /// Obtains and clears an unadvertised window update. - pub fn take_window_update(&mut self) -> Option { - if self.next_window_update == 0 { - return None; - } - - let incr = self.next_window_update; - self.next_window_update = 0; - Some(incr) + fn streams_mut(&mut self) -> &mut StreamMap { + self.inner.streams_mut() + } +} + +impl Stream for FlowControl + where T: Stream, + T: StreamTransporter, + { + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + self.inner.poll() + } +} + + +impl Sink for FlowControl + where T: Sink, SinkError = ConnectionError>, + T: StreamTransporter, + { + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: Frame) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } +} + +impl ReadySink for FlowControl + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: StreamTransporter, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() } } diff --git a/src/proto/flow_controller.rs b/src/proto/flow_controller.rs new file mode 100644 index 0000000..1f12c9d --- /dev/null +++ b/src/proto/flow_controller.rs @@ -0,0 +1,83 @@ +use proto::WindowSize; + +#[derive(Clone, Copy, Debug)] +pub struct WindowUnderflow; + +pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; + +#[derive(Copy, Clone, Debug)] +pub struct FlowController { + /// Amount that may be claimed. + window_size: WindowSize, + /// Amount to be removed by future increments. + underflow: WindowSize, + /// The amount that has been incremented but not yet advertised (to the application or + /// the remote). + next_window_update: WindowSize, +} + +impl Default for FlowController { + fn default() -> Self { + Self::new(DEFAULT_INITIAL_WINDOW_SIZE) + } +} + +impl FlowController { + pub fn new(window_size: WindowSize) -> FlowController { + FlowController { + window_size, + underflow: 0, + next_window_update: 0, + } + } + + /// Reduce future capacity of the window. + /// + /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. + pub fn shrink_window(&mut self, decr: WindowSize) { + self.underflow += decr; + } + + /// Claims the provided amount from the window, if there is enough space. + /// + /// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than + /// have been previously claimed. + pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { + if self.window_size < sz { + return Err(WindowUnderflow); + } + + self.window_size -= sz; + Ok(()) + } + + /// Applies a window increment immediately. + pub fn increment_window_size(&mut self, sz: WindowSize) { + if sz <= self.underflow { + self.underflow -= sz; + return; + } + + let added = sz - self.underflow; + self.window_size += added; + self.next_window_update += added; + self.underflow = 0; + } + + /// Obtains and clears an unadvertised window update. + pub fn take_window_update(&mut self) -> Option { + if self.next_window_update == 0 { + return None; + } + + let incr = self.next_window_update; + self.next_window_update = 0; + Some(incr) + } +} + +#[test] +fn test() { + let mut fc = FlowController::new(65_535); + +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 5fb5c12..07dfee4 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,43 +1,62 @@ mod connection; mod flow_control; +mod flow_controller; mod framed_read; mod framed_write; mod ping_pong; mod ready; mod settings; mod state; -mod window_update; +mod stream_tracker; pub use self::connection::Connection; -pub use self::flow_control::FlowController; +pub use self::flow_control::FlowControl; +pub use self::flow_controller::FlowController; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; -pub use self::state::{PeerState, State}; -pub use self::window_update::WindowUpdate; +pub use self::stream_tracker::StreamTracker; +use self::state::StreamState; -use {frame, Peer}; +use {frame, Peer, StreamId}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; use bytes::{Buf, IntoBuf}; -type Inner = - Settings< - PingPong< - Framed, - B>>; +use ordermap::OrderMap; +use fnv::FnvHasher; +use std::hash::BuildHasherDefault; -type Framed = +/// Represents +type Transport = + Settings< + FlowControl< + StreamTracker< + PingPong< + Framer, + B>>>>; + +type Framer = FramedRead< FramedWrite>; pub type WindowSize = u32; +#[derive(Debug)] +struct StreamMap { + inner: OrderMap> +} + +trait StreamTransporter { + fn streams(&self)-> &StreamMap; + fn streams_mut(&mut self) -> &mut StreamMap; +} + /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. @@ -91,8 +110,10 @@ pub fn from_server_handshaker(transport: Settings PingPong pong: None, } } +} +impl PingPong + where T: Sink, SinkError = ConnectionError>, +{ fn try_send_pong(&mut self) -> Poll<(), ConnectionError> { if let Some(pong) = self.pong.take() { if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? { @@ -77,8 +81,7 @@ impl Stream for PingPong } impl Sink for PingPong - where T: Stream, - T: Sink, SinkError = ConnectionError>, + where T: Sink, SinkError = ConnectionError>, { type SinkItem = Frame; type SinkError = ConnectionError; @@ -103,8 +106,7 @@ impl Sink for PingPong } impl ReadySink for PingPong - where T: Stream, - T: Sink, SinkError = ConnectionError>, + where T: Sink, SinkError = ConnectionError>, T: ReadySink, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { diff --git a/src/proto/state.rs b/src/proto/state.rs index e509733..28f3ff8 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -45,7 +45,7 @@ use proto::FlowController; /// R: RST_STREAM frame /// ``` #[derive(Debug, Copy, Clone)] -pub enum State { +pub enum StreamState { Idle, ReservedLocal, ReservedRemote, @@ -58,7 +58,7 @@ pub enum State { Closed, } -impl State { +impl StreamState { /// Updates the local flow controller so that the remote may send `incr` more bytes. /// /// Returns the amount of capacity created, accounting for window size changes. The @@ -66,7 +66,7 @@ impl State { /// /// If the remote is closed, None is returned. pub fn increment_send_window_size(&mut self, incr: u32) { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; if incr == 0 { @@ -83,7 +83,7 @@ impl State { /// Consumes newly-advertised capacity to inform the local endpoint it may send more /// data. pub fn take_send_window_update(&mut self) -> Option { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; match self { @@ -99,7 +99,7 @@ impl State { /// Returns the amount of capacity created, accounting for window size changes. The /// caller should send the the returned window size increment to the remote. pub fn increment_recv_window_size(&mut self, incr: u32) { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; if incr == 0 { @@ -116,7 +116,7 @@ impl State { /// Consumes newly-advertised capacity to inform the local endpoint it may send more /// data. pub fn take_recv_window_update(&mut self) -> Option { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; match self { @@ -143,7 +143,7 @@ impl State { /// > receives WINDOW_UPDATE frames that cause the flow-control window to become /// > positive. pub fn update_initial_recv_window_size(&mut self, old: u32, new: u32) { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; match self { @@ -161,7 +161,7 @@ impl State { /// TODO Connection doesn't have an API for local updates yet. pub fn update_initial_send_window_size(&mut self, _old: u32, _new: u32) { - //use self::State::*; + //use self::StreamState::*; //use self::PeerState::*; unimplemented!() } @@ -175,7 +175,7 @@ impl State { initial_recv_window_size: u32) -> Result { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; match *self { @@ -218,7 +218,7 @@ impl State { } pub fn recv_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { - use self::State::*; + use self::StreamState::*; match *self { Open { local, mut remote } => { @@ -256,7 +256,7 @@ impl State { initial_window_size: u32) -> Result { - use self::State::*; + use self::StreamState::*; use self::PeerState::*; match *self { @@ -307,7 +307,7 @@ impl State { } pub fn send_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { - use self::State::*; + use self::StreamState::*; match *self { Open { mut local, remote } => { @@ -337,9 +337,9 @@ impl State { } } -impl Default for State { - fn default() -> State { - State::Idle +impl Default for StreamState { + fn default() -> StreamState { + StreamState::Idle } } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs new file mode 100644 index 0000000..b38a714 --- /dev/null +++ b/src/proto/stream_tracker.rs @@ -0,0 +1,67 @@ +use ConnectionError; +use frame::{self, Frame}; +use proto::{ReadySink, StreamMap, StreamTransporter, WindowSize}; + +use futures::*; + +#[derive(Debug)] +pub struct StreamTracker { + inner: T, +} + +impl StreamTracker + where T: Stream, + T: Sink, SinkError = ConnectionError> +{ + pub fn new(inner: T) -> StreamTracker { + StreamTracker { inner } + } +} + +impl StreamTransporter for StreamTracker { + fn streams(&self) -> &StreamMap { + unimplemented!() + } + + fn streams_mut(&mut self) -> &mut StreamMap { + unimplemented!() + } +} + +impl Stream for StreamTracker + where T: Stream, Error = ConnectionError>, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + self.inner.poll() + } +} + + +impl Sink for StreamTracker + where T: Sink, SinkError = ConnectionError>, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } +} + + +impl ReadySink for StreamTracker + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} diff --git a/src/proto/window_update.rs b/src/proto/window_update.rs deleted file mode 100644 index 7ba4cfc..0000000 --- a/src/proto/window_update.rs +++ /dev/null @@ -1,36 +0,0 @@ -use StreamId; -use frame; - -#[derive(Debug)] -pub enum WindowUpdate { - Connection { increment: u32 }, - Stream { id: StreamId, increment: u32 }, -} - -impl WindowUpdate { - pub fn increment(&self) -> u32 { - match *self { - WindowUpdate::Connection { increment } | - WindowUpdate::Stream { increment, .. } => increment - } - } -} - -impl From for frame::WindowUpdate { - fn from(src: WindowUpdate) -> Self { - match src { - WindowUpdate::Connection { increment } => { - frame::WindowUpdate::new(StreamId::zero(), increment) - } - WindowUpdate::Stream { id, increment } => { - frame::WindowUpdate::new(id, increment) - } - } - } -} - -impl From for frame::Frame { - fn from(src: WindowUpdate) -> Self { - frame::Frame::WindowUpdate(src.into()) - } -}