diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 32b4b73..eb1b7f4 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,10 +1,8 @@ use {frame, ConnectionError, Peer, StreamId}; use frame::SettingSet; + use bytes::{Buf, IntoBuf}; -use fnv::FnvHasher; use futures::*; -use ordermap::{Entry, OrderMap}; -use std::hash::BuildHasherDefault; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; @@ -28,7 +26,8 @@ pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; pub use self::stream_tracker::StreamTracker; -use self::state::StreamState; + +use self::state::{StreamMap, StreamState}; /// Represents the internals of an HTTP/2 connection. /// @@ -95,60 +94,6 @@ type Framer = pub type WindowSize = u32; -#[derive(Debug, Default)] -pub struct StreamMap { - inner: OrderMap> -} - -impl StreamMap { - fn get_mut(&mut self, id: &StreamId) -> Option<&mut StreamState> { - self.inner.get_mut(id) - } - - fn entry(&mut self, id: StreamId) -> Entry> { - self.inner.entry(id) - } - - fn shrink_all_local_windows(&mut self, decr: u32) { - for (_, mut s) in &mut self.inner { - if let Some(fc) = s.local_flow_controller() { - fc.shrink_window(decr); - } - } - } - - fn expand_all_local_windows(&mut self, incr: u32) { - for (_, mut s) in &mut self.inner { - if let Some(fc) = s.local_flow_controller() { - fc.expand_window(incr); - } - } - } - - fn shrink_all_remote_windows(&mut self, decr: u32) { - for (_, mut s) in &mut self.inner { - if let Some(fc) = s.remote_flow_controller() { - fc.shrink_window(decr); - } - } - } - - fn expand_all_remote_windows(&mut self, incr: u32) { - for (_, mut s) in &mut self.inner { - if let Some(fc) = s.remote_flow_controller() { - fc.expand_window(incr); - } - } - } -} - -/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to -/// FramedWrite). -pub trait ApplySettings { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; -} - /// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and /// above---Connection). pub trait ControlSettings { @@ -157,18 +102,11 @@ pub trait ControlSettings { fn remote_settings(&self) -> &SettingSet; } -/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up -/// to Connection). -pub trait ControlStreams { - fn streams(&self)-> &StreamMap; - fn streams_mut(&mut self) -> &mut StreamMap; -} - -pub type PingPayload = [u8; 8]; - -pub trait ControlPing { - fn start_ping(&mut self, body: PingPayload) -> StartSend; - fn pop_pong(&mut self) -> Option; +/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to +/// FramedWrite). +pub trait ApplySettings { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; } /// Exposes flow control states to "upper" layers of the transport (i.e. above @@ -185,6 +123,20 @@ pub trait ControlFlow { fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; } +/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up +/// to Connection). +pub trait ControlStreams { + fn streams(&self)-> &StreamMap; + fn streams_mut(&mut self) -> &mut StreamMap; +} + +pub type PingPayload = [u8; 8]; + +pub trait ControlPing { + fn start_ping(&mut self, body: PingPayload) -> StartSend; + fn pop_pong(&mut self) -> Option; +} + /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. diff --git a/src/proto/state.rs b/src/proto/state.rs index 7d75f0f..ffcd7ef 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,9 +1,13 @@ -use Peer; +use {Peer, StreamId}; use error::ConnectionError; use error::Reason::*; use error::User::*; use proto::{FlowControlState, WindowSize}; +use fnv::FnvHasher; +use ordermap::{Entry, OrderMap}; +use std::hash::BuildHasherDefault; + /// Represents the state of an H2 stream /// /// ```not_rust @@ -277,3 +281,50 @@ impl PeerState { } } } + +#[derive(Debug, Default)] +pub struct StreamMap { + inner: OrderMap> +} + +impl StreamMap { + pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut StreamState> { + self.inner.get_mut(id) + } + + pub fn entry(&mut self, id: StreamId) -> Entry> { + self.inner.entry(id) + } + + pub fn shrink_all_local_windows(&mut self, decr: u32) { + for (_, mut s) in &mut self.inner { + if let Some(fc) = s.local_flow_controller() { + fc.shrink_window(decr); + } + } + } + + pub fn expand_all_local_windows(&mut self, incr: u32) { + for (_, mut s) in &mut self.inner { + if let Some(fc) = s.local_flow_controller() { + fc.expand_window(incr); + } + } + } + + pub fn shrink_all_remote_windows(&mut self, decr: u32) { + for (_, mut s) in &mut self.inner { + if let Some(fc) = s.remote_flow_controller() { + fc.shrink_window(decr); + } + } + } + + pub fn expand_all_remote_windows(&mut self, incr: u32) { + for (_, mut s) in &mut self.inner { + if let Some(fc) = s.remote_flow_controller() { + fc.expand_window(incr); + } + } + } +} diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index c7842e3..20cd3d8 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -134,13 +134,15 @@ impl Stream for StreamTracker Some(Data(v)) => { match self.streams.get_mut(&v.stream_id()) { - None => return Err(Reason::ProtocolError.into()), - Some(state) => state.recv_data(v.is_end_stream())?, + None => Err(Reason::ProtocolError.into()), + Some(state) => { + state.recv_data(v.is_end_stream())?; + Ok(Async::Ready(Some(Data(v)))) + } } - Ok(Async::Ready(Some(Data(v)))) } - f => Ok(Async::Ready(f)) + f => Ok(Async::Ready(f)), } } }