From 275b83502305bfa868965326a3d99113c1ca822b Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Mon, 24 Jul 2017 15:42:16 +0000 Subject: [PATCH] unify file/type naming --- src/proto/control_streams.rs | 4 + src/proto/mod.rs | 4 +- src/proto/{state.rs => stream_state.rs} | 0 src/proto/stream_states.rs | 22 +- src/proto/stream_store.rs | 340 ------------------------ 5 files changed, 17 insertions(+), 353 deletions(-) rename src/proto/{state.rs => stream_state.rs} (100%) delete mode 100644 src/proto/stream_store.rs diff --git a/src/proto/control_streams.rs b/src/proto/control_streams.rs index 9f1ecee..a1686b8 100644 --- a/src/proto/control_streams.rs +++ b/src/proto/control_streams.rs @@ -24,6 +24,10 @@ pub trait ControlStreams { !Self::local_can_open() } + // TODO push promise + // fn local_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; + // fn remote_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; + /// Creates a new stream in the OPEN state from the local side (i.e. as a Client). /// /// Must only be called when local_can_open returns true. diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 95eb73b..57b8387 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -38,7 +38,7 @@ mod stream_recv_close; mod stream_recv_open; mod stream_send_close; mod stream_send_open; -mod stream_store; +mod stream_states; pub use self::connection::Connection; @@ -53,7 +53,7 @@ use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; use self::stream_send_open::StreamSendOpen; -use self::stream_store::StreamStates; +use self::stream_states::StreamStates; /// Represents the internals of an HTTP/2 connection. /// diff --git a/src/proto/state.rs b/src/proto/stream_state.rs similarity index 100% rename from src/proto/state.rs rename to src/proto/stream_state.rs diff --git a/src/proto/stream_states.rs b/src/proto/stream_states.rs index 7e8a691..fa2aac5 100644 --- a/src/proto/stream_states.rs +++ b/src/proto/stream_states.rs @@ -12,7 +12,7 @@ use std::marker::PhantomData; // TODO track reserved streams // TODO constrain the size of `reset` #[derive(Debug, Default)] -pub struct StreamStore { +pub struct StreamStates { inner: T, /// Holds active streams initiated by the local endpoint. @@ -27,13 +27,13 @@ pub struct StreamStore { _phantom: PhantomData

, } -impl StreamStore +impl StreamStates where T: Stream, T: Sink, SinkError = ConnectionError>, P: Peer, { - pub fn new(inner: T) -> StreamStore { - StreamStore { + pub fn new(inner: T) -> StreamStates { + StreamStates { inner, local_active: OrderMap::default(), remote_active: OrderMap::default(), @@ -43,7 +43,7 @@ impl StreamStore } } -impl StreamStore { +impl StreamStates { pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { assert!(!id.is_zero()); if P::is_valid_local_stream_id(id) { @@ -72,7 +72,7 @@ impl StreamStore { } } -impl ControlStreams for StreamStore { +impl ControlStreams for StreamStates { fn local_valid_id(id: StreamId) -> bool { P::is_valid_local_stream_id(id) } @@ -280,7 +280,7 @@ impl ControlStreams for StreamStore { } /// Proxy. -impl Stream for StreamStore +impl Stream for StreamStates where T: Stream, { type Item = Frame; @@ -292,7 +292,7 @@ impl Stream for StreamStore } /// Proxy. -impl Sink for StreamStore +impl Sink for StreamStates where T: Sink, SinkError = ConnectionError>, { type SinkItem = Frame; @@ -308,7 +308,7 @@ impl Sink for StreamStore } /// Proxy. -impl ReadySink for StreamStore +impl ReadySink for StreamStates where T: Sink, SinkError = ConnectionError>, T: ReadySink, { @@ -318,7 +318,7 @@ impl ReadySink for StreamStore } /// Proxy. -impl ApplySettings for StreamStore { +impl ApplySettings for StreamStates { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set) } @@ -329,7 +329,7 @@ impl ApplySettings for StreamStore { } /// Proxy. -impl ControlPing for StreamStore { +impl ControlPing for StreamStates { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) } diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs deleted file mode 100644 index fa2aac5..0000000 --- a/src/proto/stream_store.rs +++ /dev/null @@ -1,340 +0,0 @@ -use {ConnectionError, Peer, StreamId}; -use error::Reason::{NoError, ProtocolError}; -use proto::*; -use proto::state::StreamState; - -use fnv::FnvHasher; -use ordermap::OrderMap; -use std::hash::BuildHasherDefault; -use std::marker::PhantomData; - -/// Holds the underlying stream state to be accessed by upper layers. -// TODO track reserved streams -// TODO constrain the size of `reset` -#[derive(Debug, Default)] -pub struct StreamStates { - inner: T, - - /// Holds active streams initiated by the local endpoint. - local_active: OrderMap>, - - /// Holds active streams initiated by the remote endpoint. - remote_active: OrderMap>, - - /// Holds active streams initiated by the remote. - reset: OrderMap>, - - _phantom: PhantomData

, -} - -impl StreamStates - where T: Stream, - T: Sink, SinkError = ConnectionError>, - P: Peer, -{ - pub fn new(inner: T) -> StreamStates { - StreamStates { - inner, - local_active: OrderMap::default(), - remote_active: OrderMap::default(), - reset: OrderMap::default(), - _phantom: PhantomData, - } - } -} - -impl StreamStates { - pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { - assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { - self.local_active.get(&id) - } else { - self.remote_active.get(&id) - } - } - - pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { - self.local_active.get_mut(&id) - } else { - self.remote_active.get_mut(&id) - } - } - - pub fn remove_active(&mut self, id: StreamId) { - assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { - self.local_active.remove(&id); - } else { - self.remote_active.remove(&id); - } - } -} - -impl ControlStreams for StreamStates { - fn local_valid_id(id: StreamId) -> bool { - P::is_valid_local_stream_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - P::is_valid_remote_stream_id(id) - } - - fn local_can_open() -> bool { - P::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::local_valid_id(id) || !Self::local_can_open() { - return Err(ProtocolError.into()); - } - if self.local_active.contains_key(&id) { - return Err(ProtocolError.into()); - } - - self.local_active.insert(id, StreamState::new_open_sending(sz)); - Ok(()) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::remote_valid_id(id) || !Self::remote_can_open() { - return Err(ProtocolError.into()); - } - if self.remote_active.contains_key(&id) { - return Err(ProtocolError.into()); - } - - self.remote_active.insert(id, StreamState::new_open_recving(sz)); - Ok(()) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::local_valid_id(id) { - return Err(ProtocolError.into()); - } - - match self.local_active.get_mut(&id) { - Some(s) => s.open_recv_half(sz).map(|_| {}), - None => Err(ProtocolError.into()), - } - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !Self::remote_valid_id(id) { - return Err(ProtocolError.into()); - } - - match self.remote_active.get_mut(&id) { - Some(s) => s.open_send_half(sz).map(|_| {}), - None => Err(ProtocolError.into()), - } - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - let fully_closed = self.get_active_mut(id) - .map(|s| s.close_send_half()) - .unwrap_or_else(|| Err(ProtocolError.into()))?; - - if fully_closed { - self.remove_active(id); - self.reset.insert(id, NoError); - } - Ok(()) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - let fully_closed = self.get_active_mut(id) - .map(|s| s.close_recv_half()) - .unwrap_or_else(|| Err(ProtocolError.into()))?; - - if fully_closed { - self.remove_active(id); - self.reset.insert(id, NoError); - } - Ok(()) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.remove_active(id); - self.reset.insert(id, cause); - } - - fn get_reset(&self, id: StreamId) -> Option { - self.reset.get(&id).map(|r| *r) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.local_active.contains_key(&id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.remote_active.contains_key(&id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - match self.get_active(id) { - Some(s) => s.is_send_open(), - None => false, - } - } - - fn is_recv_open(&mut self, id: StreamId) -> bool { - match self.get_active(id) { - Some(s) => s.is_recv_open(), - None => false, - } - } - - fn local_active_len(&self) -> usize { - self.local_active.len() - } - - fn remote_active_len(&self) -> usize { - self.remote_active.len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - if new_sz < old_sz { - let decr = old_sz - new_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.shrink_window(decr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.shrink_window(decr); - } - } - } else { - let incr = new_sz - old_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.expand_window(incr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.expand_window(incr); - } - } - } - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - if new_sz < old_sz { - let decr = old_sz - new_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.shrink_window(decr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.shrink_window(decr); - } - } - } else { - let incr = new_sz - old_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.expand_window(incr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.expand_window(incr); - } - } - } - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - if id.is_zero() { - None - } else if P::is_valid_local_stream_id(id) { - self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) - } else { - self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) - } - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - if id.is_zero() { - None - } else if P::is_valid_local_stream_id(id) { - self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller()) - } else { - self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) - } - } -} - -/// Proxy. -impl Stream for StreamStates - where T: Stream, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - self.inner.poll() - } -} - -/// Proxy. -impl Sink for StreamStates - where T: Sink, SinkError = ConnectionError>, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_complete() - } -} - -/// Proxy. -impl ReadySink for StreamStates - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() - } -} - -/// Proxy. -impl ApplySettings for StreamStates { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -/// Proxy. -impl ControlPing for StreamStates { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -}