From 9d7221e6cf98e16edce14f216efa5df720c6b554 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 2 Aug 2017 13:09:14 -0700 Subject: [PATCH] Misc streams cleanup --- src/proto/streams/flow_control.rs | 85 +++++++++++++++++ src/proto/streams/mod.rs | 5 +- src/proto/streams/recv.rs | 18 ++-- src/proto/streams/send.rs | 18 ++-- src/proto/streams/state.rs | 146 ++++++------------------------ src/proto/streams/store.rs | 15 ++- 6 files changed, 141 insertions(+), 146 deletions(-) create mode 100644 src/proto/streams/flow_control.rs diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs new file mode 100644 index 0000000..81b57f5 --- /dev/null +++ b/src/proto/streams/flow_control.rs @@ -0,0 +1,85 @@ +use ConnectionError; +use proto::*; + +#[derive(Copy, Clone, Debug)] +pub struct FlowControl { + /// Amount that may be claimed. + window_size: WindowSize, + + /// Amount to be removed by future increments. + underflow: WindowSize, + + /// The amount that has been incremented but not yet advertised (to the application or + /// the remote). + next_window_update: WindowSize, +} + +impl FlowControl { + pub fn new(window_size: WindowSize) -> FlowControl { + FlowControl { + window_size, + underflow: 0, + next_window_update: 0, + } + } + + /// Returns true iff `claim_window(sz)` would return succeed. + pub fn ensure_window(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError> + where T: Into, + { + if sz <= self.window_size { + Ok(()) + } else { + Err(err.into()) + } + } + + /// Claims the provided amount from the window, if there is enough space. + /// + /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than + /// have been previously claimed. + pub fn claim_window(&mut self, sz: WindowSize, err: T) + -> Result<(), ConnectionError> + where T: Into, + { + self.ensure_window(sz, err)?; + + self.window_size -= sz; + Ok(()) + } + + /// Increase the _unadvertised_ window capacity. + pub fn expand_window(&mut self, sz: WindowSize) { + if sz <= self.underflow { + self.underflow -= sz; + return; + } + + let added = sz - self.underflow; + self.next_window_update += added; + self.underflow = 0; + } + + /// Obtains the unadvertised window update. + /// + /// This does not apply the window update to `self`. + pub fn peek_window_update(&mut self) -> Option { + if self.next_window_update == 0 { + None + } else { + Some(self.next_window_update) + } + } + + /// Obtains and applies an unadvertised window update. + pub fn apply_window_update(&mut self) -> Option { + if self.next_window_update == 0 { + return None; + } + + let incr = self.next_window_update; + self.next_window_update = 0; + self.window_size += incr; + Some(incr) + } +} diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 02757a2..931adbc 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -1,10 +1,13 @@ +mod flow_control; mod recv; mod send; mod state; mod store; +use self::flow_control::FlowControl; use self::recv::Recv; use self::send::Send; +use self::state::State; use self::store::{Store, Entry}; use {frame, Peer, StreamId, ConnectionError}; @@ -12,7 +15,7 @@ use proto::*; use error::Reason::*; use error::User::*; -// TODO: All the VecDeques should become linked lists using the state::Stream +// TODO: All the VecDeques should become linked lists using the State // values. #[derive(Debug)] pub struct Streams

{ diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 115bffc..8dd6074 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,6 +1,6 @@ use {frame, Peer, ConnectionError}; use proto::*; -use super::{state, Config, Store}; +use super::*; use error::Reason::*; @@ -19,7 +19,7 @@ pub struct Recv

{ init_window_sz: WindowSize, /// Connection level flow control governing received data - flow_control: state::FlowControl, + flow_control: FlowControl, pending_window_updates: VecDeque, @@ -35,7 +35,7 @@ impl Recv

{ max_streams: config.max_remote_initiated, num_streams: 0, init_window_sz: config.init_remote_window_sz, - flow_control: state::FlowControl::new(config.init_remote_window_sz), + flow_control: FlowControl::new(config.init_remote_window_sz), pending_window_updates: VecDeque::new(), refused: None, _p: PhantomData, @@ -45,7 +45,7 @@ impl Recv

{ /// Update state reflecting a new, remotely opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result, ConnectionError> { + pub fn open(&mut self, id: StreamId) -> Result, ConnectionError> { assert!(self.refused.is_none()); try!(self.ensure_can_open(id)); @@ -60,17 +60,17 @@ impl Recv

{ // Increment the number of remote initiated streams self.num_streams += 1; - Ok(Some(state::Stream::default())) + Ok(Some(State::default())) } /// Transition the stream state based on receiving headers - pub fn recv_headers(&mut self, state: &mut state::Stream, eos: bool) + pub fn recv_headers(&mut self, state: &mut State, eos: bool) -> Result<(), ConnectionError> { state.recv_open(self.init_window_sz, eos) } - pub fn recv_eos(&mut self, state: &mut state::Stream) + pub fn recv_eos(&mut self, state: &mut State) -> Result<(), ConnectionError> { state.recv_close() @@ -78,7 +78,7 @@ impl Recv

{ pub fn recv_data(&mut self, frame: &frame::Data, - state: &mut state::Stream) + state: &mut State) -> Result<(), ConnectionError> { let sz = frame.payload().len(); @@ -168,7 +168,7 @@ impl Recv

{ pub fn expand_stream_window(&mut self, id: StreamId, sz: WindowSize, - state: &mut state::Stream) + state: &mut State) -> Result<(), ConnectionError> { // TODO: handle overflow diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index a0ca3a8..875fa06 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,6 +1,6 @@ use {frame, Peer, ConnectionError}; use proto::*; -use super::{state, Config, Store}; +use super::*; use error::User::*; @@ -21,7 +21,7 @@ pub struct Send

{ init_window_sz: WindowSize, /// Connection level flow control governing sent data - flow_control: state::FlowControl, + flow_control: FlowControl, /// Holds the list of streams on which local window updates may be sent. // XXX It would be cool if this didn't exist. @@ -41,7 +41,7 @@ impl Send

{ max_streams: config.max_local_initiated, num_streams: 0, init_window_sz: config.init_local_window_sz, - flow_control: state::FlowControl::new(config.init_local_window_sz), + flow_control: FlowControl::new(config.init_local_window_sz), pending_window_updates: VecDeque::new(), blocked: None, _p: PhantomData, @@ -51,7 +51,7 @@ impl Send

{ /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result { + pub fn open(&mut self, id: StreamId) -> Result { try!(self.ensure_can_open(id)); if let Some(max) = self.max_streams { @@ -63,16 +63,16 @@ impl Send

{ // Increment the number of locally initiated streams self.num_streams += 1; - Ok(state::Stream::default()) + Ok(State::default()) } - pub fn send_headers(&mut self, state: &mut state::Stream, eos: bool) + pub fn send_headers(&mut self, state: &mut State, eos: bool) -> Result<(), ConnectionError> { state.send_open(self.init_window_sz, eos) } - pub fn send_eos(&mut self, state: &mut state::Stream) + pub fn send_eos(&mut self, state: &mut State) -> Result<(), ConnectionError> { state.send_close() @@ -80,7 +80,7 @@ impl Send

{ pub fn send_data(&mut self, frame: &frame::Data, - state: &mut state::Stream) + state: &mut State) -> Result<(), ConnectionError> { let sz = frame.payload().remaining(); @@ -171,7 +171,7 @@ impl Send

{ pub fn recv_stream_window_update(&mut self, frame: frame::WindowUpdate, - state: &mut state::Stream) + state: &mut State) -> Result<(), ConnectionError> { if let Some(flow) = state.send_flow_control() { diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 9e4f7e0..369ac8c 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -3,6 +3,10 @@ use error::Reason; use error::Reason::*; use error::User::*; use proto::*; +use super::FlowControl; + +use self::Inner::*; +use self::Peer::*; /// Represents the state of an H2 stream /// @@ -44,8 +48,13 @@ use proto::*; /// ES: END_STREAM flag /// R: RST_STREAM frame /// ``` -#[derive(Debug, Copy, Clone)] -pub enum Stream { +#[derive(Debug, Clone)] +pub struct State { + inner: Inner, +} + +#[derive(Debug, Clone, Copy)] +enum Inner { Idle, // TODO: these states shouldn't count against concurrency limits: //ReservedLocal, @@ -61,34 +70,18 @@ pub enum Stream { } #[derive(Debug, Copy, Clone)] -pub enum Peer { +enum Peer { AwaitingHeaders, /// Contains a FlowControl representing the _receiver_ of this this data stream. Streaming(FlowControl), } -#[derive(Copy, Clone, Debug)] -pub struct FlowControl { - /// Amount that may be claimed. - window_size: WindowSize, - - /// Amount to be removed by future increments. - underflow: WindowSize, - - /// The amount that has been incremented but not yet advertised (to the application or - /// the remote). - next_window_update: WindowSize, -} - -impl Stream { +impl State { /// Opens the send-half of a stream if it is not already open. pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { - use self::Stream::*; - use self::Peer::*; - let local = Peer::streaming(sz); - *self = match *self { + self.inner = match self.inner { Idle => { if eos { HalfClosedLocal(AwaitingHeaders) @@ -128,12 +121,9 @@ impl Stream { /// Open the receive have of the stream, this action is taken when a HEADERS /// frame is received. pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { - use self::Stream::*; - use self::Peer::*; - let remote = Peer::streaming(sz); - *self = match *self { + self.inner = match self.inner { Idle => { if eos { HalfClosedRemote(AwaitingHeaders) @@ -172,18 +162,16 @@ impl Stream { /// Indicates that the remote side will not send more data to the local. pub fn recv_close(&mut self) -> Result<(), ConnectionError> { - use self::Stream::*; - - match *self { + match self.inner { Open { local, .. } => { // The remote side will continue to receive data. trace!("recv_close: Open => HalfClosedRemote({:?})", local); - *self = HalfClosedRemote(local); + self.inner = HalfClosedRemote(local); Ok(()) } HalfClosedLocal(..) => { trace!("recv_close: HalfClosedLocal => Closed"); - *self = Closed(None); + self.inner = Closed(None); Ok(()) } _ => Err(ProtocolError.into()), @@ -192,18 +180,16 @@ impl Stream { /// Indicates that the local side will not send more data to the local. pub fn send_close(&mut self) -> Result<(), ConnectionError> { - use self::Stream::*; - - match *self { + match self.inner { Open { remote, .. } => { // The remote side will continue to receive data. trace!("send_close: Open => HalfClosedLocal({:?})", remote); - *self = HalfClosedLocal(remote); + self.inner = HalfClosedLocal(remote); Ok(()) } HalfClosedRemote(..) => { trace!("send_close: HalfClosedRemote => Closed"); - *self = Closed(None); + self.inner = Closed(None); Ok(()) } _ => Err(ProtocolError.into()), @@ -211,18 +197,14 @@ impl Stream { } pub fn is_closed(&self) -> bool { - use self::Stream::*; - - match *self { + match self.inner { Closed(_) => true, _ => false, } } pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { - use self::Stream::*; - - match *self { + match self.inner { Open { ref mut remote, .. } | HalfClosedLocal(ref mut remote) => remote.flow_control(), _ => None, @@ -230,9 +212,7 @@ impl Stream { } pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { - use self::Stream::*; - - match *self { + match self.inner { Open { ref mut local, .. } | HalfClosedRemote(ref mut local) => local.flow_control(), _ => None, @@ -240,9 +220,9 @@ impl Stream { } } -impl Default for Stream { - fn default() -> Stream { - Stream::Idle +impl Default for State { + fn default() -> State { + State { inner: Inner::Idle } } } @@ -258,81 +238,9 @@ impl Peer { } fn flow_control(&mut self) -> Option<&mut FlowControl> { - use self::Peer::*; - match *self { Streaming(ref mut flow) => Some(flow), _ => None, } } } - -impl FlowControl { - pub fn new(window_size: WindowSize) -> FlowControl { - FlowControl { - window_size, - underflow: 0, - next_window_update: 0, - } - } - - /// Returns true iff `claim_window(sz)` would return succeed. - pub fn ensure_window(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError> - where T: Into, - { - if sz <= self.window_size { - Ok(()) - } else { - Err(err.into()) - } - } - - /// Claims the provided amount from the window, if there is enough space. - /// - /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than - /// have been previously claimed. - pub fn claim_window(&mut self, sz: WindowSize, err: T) - -> Result<(), ConnectionError> - where T: Into, - { - self.ensure_window(sz, err)?; - - self.window_size -= sz; - Ok(()) - } - - /// Increase the _unadvertised_ window capacity. - pub fn expand_window(&mut self, sz: WindowSize) { - if sz <= self.underflow { - self.underflow -= sz; - return; - } - - let added = sz - self.underflow; - self.next_window_update += added; - self.underflow = 0; - } - - /// Obtains the unadvertised window update. - /// - /// This does not apply the window update to `self`. - pub fn peek_window_update(&mut self) -> Option { - if self.next_window_update == 0 { - None - } else { - Some(self.next_window_update) - } - } - - /// Obtains and applies an unadvertised window update. - pub fn apply_window_update(&mut self) -> Option { - if self.next_window_update == 0 { - return None; - } - - let incr = self.next_window_update; - self.next_window_update = 0; - self.window_size += incr; - Some(incr) - } -} diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index cb164e0..3962529 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -1,14 +1,13 @@ extern crate slab; -use proto::*; -use super::state; +use super::*; use std::collections::{HashMap, hash_map}; /// Storage for streams #[derive(Debug)] pub struct Store { - slab: slab::Slab, + slab: slab::Slab, ids: HashMap, } @@ -19,12 +18,12 @@ pub enum Entry<'a> { pub struct OccupiedEntry<'a> { ids: hash_map::OccupiedEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab, + slab: &'a mut slab::Slab, } pub struct VacantEntry<'a> { ids: hash_map::VacantEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab, + slab: &'a mut slab::Slab, } impl Store { @@ -35,7 +34,7 @@ impl Store { } } - pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut state::Stream> { + pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut State> { if let Some(handle) = self.ids.get(id) { Some(&mut self.slab[*handle]) } else { @@ -64,13 +63,13 @@ impl Store { } impl<'a> OccupiedEntry<'a> { - pub fn into_mut(self) -> &'a mut state::Stream { + pub fn into_mut(self) -> &'a mut State { &mut self.slab[*self.ids.get()] } } impl<'a> VacantEntry<'a> { - pub fn insert(self, value: state::Stream) -> &'a mut state::Stream { + pub fn insert(self, value: State) -> &'a mut State { // Insert the value in the slab let handle = self.slab.insert(value);