diff --git a/src/proto/apply_settings.rs b/src/proto/apply_settings.rs new file mode 100644 index 0000000..990443f --- /dev/null +++ b/src/proto/apply_settings.rs @@ -0,0 +1,23 @@ +use ConnectionError; +use frame::SettingSet; + +/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to +/// FramedWrite). +pub trait ApplySettings { + fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; + fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; +} + +macro_rules! proxy_apply_settings { + ($outer:ident) => ( + impl ApplySettings for $outer { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } + } + ) +} diff --git a/src/proto/control_flow.rs b/src/proto/control_flow.rs new file mode 100644 index 0000000..4ae27cb --- /dev/null +++ b/src/proto/control_flow.rs @@ -0,0 +1,30 @@ +use ConnectionError; +use proto::*; + +/// Exposes flow control states to "upper" layers of the transport (i.e. above +/// FlowControl). +pub trait ControlFlow { + /// Polls for the next window update from the remote. + fn poll_window_update(&mut self) -> Poll; + + /// Increases the local receive capacity of a stream. + /// + /// This may cause a window update to be sent to the remote. + /// + /// Fails if the given stream is not active. + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; +} + +macro_rules! proxy_control_flow { + ($outer:ident) => ( + impl ControlFlow for $outer { + fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() + } + + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) + } + } + ) +} diff --git a/src/proto/control_ping.rs b/src/proto/control_ping.rs new file mode 100644 index 0000000..e605d27 --- /dev/null +++ b/src/proto/control_ping.rs @@ -0,0 +1,21 @@ +use ConnectionError; +use proto::*; + +pub trait ControlPing { + fn start_ping(&mut self, body: PingPayload) -> StartSend; + fn take_pong(&mut self) -> Option; +} + +macro_rules! proxy_control_ping { + ($outer:ident) => ( + impl ControlPing for $outer { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } + } + ) +} diff --git a/src/proto/control_settings.rs b/src/proto/control_settings.rs new file mode 100644 index 0000000..e5e399e --- /dev/null +++ b/src/proto/control_settings.rs @@ -0,0 +1,13 @@ +use ConnectionError; +use frame::SettingSet; +use proto::*; + +/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and +/// above---Connection). +pub trait ControlSettings { + fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>; + + fn remote_push_enabled(&self) -> Option; + fn remote_max_concurrent_streams(&self) -> Option; + fn remote_initial_window_size(&self) -> WindowSize; +} diff --git a/src/proto/ifaces.rs b/src/proto/control_streams.rs similarity index 68% rename from src/proto/ifaces.rs rename to src/proto/control_streams.rs index 3189322..9f1ecee 100644 --- a/src/proto/ifaces.rs +++ b/src/proto/control_streams.rs @@ -1,88 +1,6 @@ use ConnectionError; -use frame::SettingSet; use proto::*; -/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and -/// above---Connection). -pub trait ControlSettings { - fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>; - - fn remote_push_enabled(&self) -> Option; - fn remote_max_concurrent_streams(&self) -> Option; - fn remote_initial_window_size(&self) -> WindowSize; -} - -// macro_rules! proxy_control_settings { -// ($outer:ident) => ( -// impl ControlSettings for $outer { -// fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError> { -// self.inner.update_local_settings(set) -// } -// -// fn remote_push_enabled(&self) -> Option { -// self.inner.remote_push_enabled(set) -// } -// -// fn remote_max_concurrent_streams(&self) -> Option { -// self.inner.remote_max_concurrent_streams(set) -// } -// -// fn remote_initial_window_size(&self) -> WindowSize { -// self.inner.remote_initial_window_size(set) -// } -// } -// ) -// } - -/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to -/// FramedWrite). -pub trait ApplySettings { - fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; - fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; -} - -macro_rules! proxy_apply_settings { - ($outer:ident) => ( - impl ApplySettings for $outer { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } - } - ) -} - -/// Exposes flow control states to "upper" layers of the transport (i.e. above -/// FlowControl). -pub trait ControlFlow { - /// Polls for the next window update from the remote. - fn poll_window_update(&mut self) -> Poll; - - /// Increases the local receive capacity of a stream. - /// - /// This may cause a window update to be sent to the remote. - /// - /// Fails if the given stream is not active. - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; -} - -macro_rules! proxy_control_flow { - ($outer:ident) => ( - impl ControlFlow for $outer { - fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() - } - - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) - } - } - ) -} - /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up /// to Connection). pub trait ControlStreams { @@ -281,22 +199,3 @@ macro_rules! proxy_control_streams { } ) } - -pub trait ControlPing { - fn start_ping(&mut self, body: PingPayload) -> StartSend; - fn take_pong(&mut self) -> Option; -} - -macro_rules! proxy_control_ping { - ($outer:ident) => ( - impl ControlPing for $outer { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } - } - ) -} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 1fe7cf3..95eb73b 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -7,9 +7,23 @@ use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +// First, pull in the internal interfaces that support macros used throughout this module. + #[macro_use] -mod ifaces; -use self::ifaces::*; +mod apply_settings; +#[macro_use] +mod control_flow; +#[macro_use] +mod control_ping; +mod control_settings; +#[macro_use] +mod control_streams; + +use self::apply_settings::ApplySettings; +use self::control_flow::ControlFlow; +use self::control_ping::ControlPing; +use self::control_settings::ControlSettings; +use self::control_streams::ControlStreams; mod connection; mod flow_control; @@ -39,7 +53,7 @@ use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; use self::stream_send_open::StreamSendOpen; -use self::stream_store::StreamStore; +use self::stream_store::StreamStates; /// Represents the internals of an HTTP/2 connection. /// @@ -59,7 +73,7 @@ use self::stream_store::StreamStore; /// /// ### The stream transport /// -/// The states of all HTTP/2 connections are stored centrally in the `StreamStore` at the +/// The states of all HTTP/2 connections are stored centrally in the `StreamStates` at the /// bottom of the stream transport. Several modules above this access this state via the /// `ControlStreams` API to drive changes to the stream state. In each direction (send /// from local to remote, and recv from remote to local), there is an Stream\*Open module @@ -103,7 +117,7 @@ use self::stream_store::StreamStore; /// - Ensures that the local peer's max stream concurrency is not violated. /// - Emits StreamRefused resets to the remote. /// -/// #### `StreamStore` +/// #### `StreamStates` /// /// - Holds the state of all local & remote active streams. /// - Holds the cause of all reset/closed streams. @@ -138,7 +152,7 @@ type Streams = FlowControl< StreamSendClose< StreamRecvOpen< - StreamStore>>>>>; + StreamStates>>>>>; type Codec = FramedRead< @@ -238,7 +252,7 @@ pub fn from_server_handshaker(settings: Settings StreamRecvOpen::new( initial_recv_window_size, local_max_concurrency, - StreamStore::new( + StreamStates::new( PingPong::new( FramedRead::new(framed)))))))) }); diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs new file mode 100644 index 0000000..fa2aac5 --- /dev/null +++ b/src/proto/stream_store.rs @@ -0,0 +1,340 @@ +use {ConnectionError, Peer, StreamId}; +use error::Reason::{NoError, ProtocolError}; +use proto::*; +use proto::state::StreamState; + +use fnv::FnvHasher; +use ordermap::OrderMap; +use std::hash::BuildHasherDefault; +use std::marker::PhantomData; + +/// Holds the underlying stream state to be accessed by upper layers. +// TODO track reserved streams +// TODO constrain the size of `reset` +#[derive(Debug, Default)] +pub struct StreamStates { + inner: T, + + /// Holds active streams initiated by the local endpoint. + local_active: OrderMap>, + + /// Holds active streams initiated by the remote endpoint. + remote_active: OrderMap>, + + /// Holds active streams initiated by the remote. + reset: OrderMap>, + + _phantom: PhantomData

, +} + +impl StreamStates + where T: Stream, + T: Sink, SinkError = ConnectionError>, + P: Peer, +{ + pub fn new(inner: T) -> StreamStates { + StreamStates { + inner, + local_active: OrderMap::default(), + remote_active: OrderMap::default(), + reset: OrderMap::default(), + _phantom: PhantomData, + } + } +} + +impl StreamStates { + pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.get(&id) + } else { + self.remote_active.get(&id) + } + } + + pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(&id) + } else { + self.remote_active.get_mut(&id) + } + } + + pub fn remove_active(&mut self, id: StreamId) { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.remove(&id); + } else { + self.remote_active.remove(&id); + } + } +} + +impl ControlStreams for StreamStates { + fn local_valid_id(id: StreamId) -> bool { + P::is_valid_local_stream_id(id) + } + + fn remote_valid_id(id: StreamId) -> bool { + P::is_valid_remote_stream_id(id) + } + + fn local_can_open() -> bool { + P::local_can_open() + } + + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !Self::local_valid_id(id) || !Self::local_can_open() { + return Err(ProtocolError.into()); + } + if self.local_active.contains_key(&id) { + return Err(ProtocolError.into()); + } + + self.local_active.insert(id, StreamState::new_open_sending(sz)); + Ok(()) + } + + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !Self::remote_valid_id(id) || !Self::remote_can_open() { + return Err(ProtocolError.into()); + } + if self.remote_active.contains_key(&id) { + return Err(ProtocolError.into()); + } + + self.remote_active.insert(id, StreamState::new_open_recving(sz)); + Ok(()) + } + + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !Self::local_valid_id(id) { + return Err(ProtocolError.into()); + } + + match self.local_active.get_mut(&id) { + Some(s) => s.open_recv_half(sz).map(|_| {}), + None => Err(ProtocolError.into()), + } + } + + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + if !Self::remote_valid_id(id) { + return Err(ProtocolError.into()); + } + + match self.remote_active.get_mut(&id) { + Some(s) => s.open_send_half(sz).map(|_| {}), + None => Err(ProtocolError.into()), + } + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + let fully_closed = self.get_active_mut(id) + .map(|s| s.close_send_half()) + .unwrap_or_else(|| Err(ProtocolError.into()))?; + + if fully_closed { + self.remove_active(id); + self.reset.insert(id, NoError); + } + Ok(()) + } + + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + let fully_closed = self.get_active_mut(id) + .map(|s| s.close_recv_half()) + .unwrap_or_else(|| Err(ProtocolError.into()))?; + + if fully_closed { + self.remove_active(id); + self.reset.insert(id, NoError); + } + Ok(()) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.remove_active(id); + self.reset.insert(id, cause); + } + + fn get_reset(&self, id: StreamId) -> Option { + self.reset.get(&id).map(|r| *r) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.local_active.contains_key(&id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.remote_active.contains_key(&id) + } + + fn is_send_open(&mut self, id: StreamId) -> bool { + match self.get_active(id) { + Some(s) => s.is_send_open(), + None => false, + } + } + + fn is_recv_open(&mut self, id: StreamId) -> bool { + match self.get_active(id) { + Some(s) => s.is_recv_open(), + None => false, + } + } + + fn local_active_len(&self) -> usize { + self.local_active.len() + } + + fn remote_active_len(&self) -> usize { + self.remote_active.len() + } + + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { + if new_sz < old_sz { + let decr = old_sz - new_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.recv_flow_controller() { + fc.shrink_window(decr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.recv_flow_controller() { + fc.shrink_window(decr); + } + } + } else { + let incr = new_sz - old_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.recv_flow_controller() { + fc.expand_window(incr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.recv_flow_controller() { + fc.expand_window(incr); + } + } + } + } + + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { + if new_sz < old_sz { + let decr = old_sz - new_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.send_flow_controller() { + fc.shrink_window(decr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.send_flow_controller() { + fc.shrink_window(decr); + } + } + } else { + let incr = new_sz - old_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.send_flow_controller() { + fc.expand_window(incr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.send_flow_controller() { + fc.expand_window(incr); + } + } + } + } + + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + if id.is_zero() { + None + } else if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) + } else { + self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) + } + } + + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + if id.is_zero() { + None + } else if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller()) + } else { + self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) + } + } +} + +/// Proxy. +impl Stream for StreamStates + where T: Stream, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +/// Proxy. +impl Sink for StreamStates + where T: Sink, SinkError = ConnectionError>, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} + +/// Proxy. +impl ReadySink for StreamStates + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +/// Proxy. +impl ApplySettings for StreamStates { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + +/// Proxy. +impl ControlPing for StreamStates { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +}