diff --git a/src/proto/connection.rs b/src/proto/connection.rs index cef47af..763eb62 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,7 +1,7 @@ use {ConnectionError, Frame, FrameSize}; use client::Client; -use frame::{self, StreamId}; -use proto::{self, Peer, ReadySink, FlowTransporter, WindowSize}; +use frame::{self, SettingSet, StreamId}; +use proto::{self, ControlSettings, Peer, ReadySink, ControlFlow, WindowSize}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -32,21 +32,33 @@ pub fn new(transport: proto::Transport) } } -impl Connection - where T: FlowTransporter, + +impl ControlSettings for Connection + where T: ControlSettings, B: IntoBuf, { - /// Polls for the amount of additional data that may be sent to a remote. - /// - /// Connection and stream updates are distinct. - pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.update_local_settings(local) + } + + fn local_settings(&self) -> &SettingSet { + self.inner.local_settings() + } + + fn remote_settings(&self) -> &SettingSet { + self.inner.remote_settings() + } +} + +impl ControlFlow for Connection + where T: ControlFlow, + B: IntoBuf, +{ + fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { self.inner.poll_remote_window_update(id) } - /// Increases the amount of data that the remote endpoint may send. - /// - /// Connection and stream updates are distinct. - pub fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { self.inner.grow_local_window(id, incr) } } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 5e785d6..2a0f53f 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -13,10 +13,10 @@ pub struct FlowControl { /// Tracks the connection-level flow control window for receiving data from the /// remote. - local_flow_controller: FlowController, + local_flow_controller: FlowControlState, /// Tracks the onnection-level flow control window for receiving data from the remote. - remote_flow_controller: FlowController, + remote_flow_controller: FlowControlState, /// Holds the list of streams on which local window updates may be sent. // XXX It would be cool if this didn't exist. @@ -34,7 +34,7 @@ pub struct FlowControl { impl FlowControl where T: Stream, T: Sink, SinkError = ConnectionError>, - T: StreamTransporter + T: ControlStreams { pub fn new(initial_local_window_size: u32, initial_remote_window_size: u32, @@ -45,8 +45,8 @@ impl FlowControl inner, initial_local_window_size, initial_remote_window_size, - local_flow_controller: FlowController::new(initial_local_window_size), - remote_flow_controller: FlowController::new(initial_remote_window_size), + local_flow_controller: FlowControlState::new(initial_local_window_size), + remote_flow_controller: FlowControlState::new(initial_remote_window_size), blocked_remote_window_update: None, sending_local_window_update: None, pending_local_window_updates: VecDeque::new(), @@ -54,7 +54,7 @@ impl FlowControl } } -impl FlowControl { +impl FlowControl { fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { let res = if id.is_zero() { self.local_flow_controller.claim_window(len) @@ -106,7 +106,7 @@ impl FlowControl { } } -impl FlowTransporter for FlowControl { +impl ControlFlow for FlowControl { fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { if id.is_zero() { if let Some(sz) = self.remote_flow_controller.take_window_update() { @@ -139,7 +139,7 @@ impl FlowTransporter for FlowControl { } } -impl StreamTransporter for FlowControl { +impl ControlStreams for FlowControl { #[inline] fn streams(&self) -> &StreamMap { self.inner.streams() @@ -153,7 +153,7 @@ impl StreamTransporter for FlowControl { impl FlowControl where T: Sink, SinkError = ConnectionError>, - T: StreamTransporter, + T: ControlStreams, { /// Returns ready when there are no pending window updates to send. fn poll_send_local_window_updates(&mut self) -> Poll<(), ConnectionError> { @@ -199,7 +199,7 @@ impl FlowControl /// > positive. impl ApplySettings for FlowControl where T: ApplySettings, - T: StreamTransporter + T: ControlStreams { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set)?; @@ -248,7 +248,7 @@ impl ApplySettings for FlowControl impl Stream for FlowControl where T: Stream, - T: StreamTransporter, + T: ControlStreams, { type Item = T::Item; type Error = T::Error; @@ -278,7 +278,7 @@ impl Stream for FlowControl impl Sink for FlowControl where T: Sink, SinkError = ConnectionError>, T: ReadySink, - T: StreamTransporter, + T: ControlStreams, { type SinkItem = T::SinkItem; type SinkError = T::SinkError; @@ -318,7 +318,7 @@ impl ReadySink for FlowControl where T: Stream, T: Sink, SinkError = ConnectionError>, T: ReadySink, - T: StreamTransporter, + T: ControlStreams, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { try_ready!(self.inner.poll_ready()); diff --git a/src/proto/flow_controller.rs b/src/proto/flow_control_state.rs similarity index 89% rename from src/proto/flow_controller.rs rename to src/proto/flow_control_state.rs index 7c47458..0d8c8a2 100644 --- a/src/proto/flow_controller.rs +++ b/src/proto/flow_control_state.rs @@ -6,7 +6,7 @@ pub struct WindowUnderflow; pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; #[derive(Copy, Clone, Debug)] -pub struct FlowController { +pub struct FlowControlState { /// Amount that may be claimed. window_size: WindowSize, /// Amount to be removed by future increments. @@ -16,15 +16,15 @@ pub struct FlowController { next_window_update: WindowSize, } -impl Default for FlowController { +impl Default for FlowControlState { fn default() -> Self { Self::new(DEFAULT_INITIAL_WINDOW_SIZE) } } -impl FlowController { - pub fn new(window_size: WindowSize) -> FlowController { - FlowController { +impl FlowControlState { + pub fn new(window_size: WindowSize) -> FlowControlState { + FlowControlState { window_size, underflow: 0, next_window_update: 0, @@ -78,6 +78,6 @@ impl FlowController { #[test] fn test() { - let mut fc = FlowController::new(65_535); + let mut fc = FlowControlState::new(65_535); } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 5a1fe79..8785901 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,4 +1,5 @@ use {frame, ConnectionError, Peer, StreamId}; +use frame::SettingSet; use bytes::{Buf, IntoBuf}; use fnv::FnvHasher; use futures::*; @@ -9,7 +10,7 @@ use tokio_io::codec::length_delimited; mod connection; mod flow_control; -mod flow_controller; +mod flow_control_state; mod framed_read; mod framed_write; mod ping_pong; @@ -20,7 +21,7 @@ mod stream_tracker; pub use self::connection::Connection; pub use self::flow_control::FlowControl; -pub use self::flow_controller::{FlowController, WindowUnderflow}; +pub use self::flow_control_state::{FlowControlState, WindowUnderflow}; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use self::ping_pong::PingPong; @@ -97,19 +98,36 @@ impl StreamMap { } } -/// Allows settings to be applied from the top of the stack to the lower levels.d +/// Allows settings updates to be pushed "down" the transport (i.e. below Settings). pub trait ApplySettings { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; } -pub trait StreamTransporter { +/// Exposes settings to "upper" layers of the transport (i.e. above Settings). +pub trait ControlSettings { + fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; + fn local_settings(&self) -> &SettingSet; + fn remote_settings(&self) -> &SettingSet; +} + +/// Exposes stream states to "upper" layers of the transport (i.e. above StreamTracker). +pub trait ControlStreams { fn streams(&self)-> &StreamMap; fn streams_mut(&mut self) -> &mut StreamMap; } -pub trait FlowTransporter { +/// Exposes flow control states to "upper" layers of the transport (i.e. above +/// FlowControl). +pub trait ControlFlow { + /// Asks the flow controller for unreported send capacity on a stream. + /// + /// Errors if the given stream is not active. fn poll_remote_window_update(&mut self, id: StreamId) -> Poll; + + /// Attempts to increase the receive capacity of a stream. + /// + /// Errors if the given stream is not active. fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; } @@ -128,10 +146,10 @@ pub fn from_io(io: T, settings: frame::SettingSet) // weird, but oh well... // // We first create a Settings directly around a framed writer - let settings = Settings::new( + let transport = Settings::new( framed_write, settings); - from_server_handshaker(settings) + from_server_handshaker(transport) } /// Create a transport prepared to handle the server handshake. @@ -145,7 +163,6 @@ pub fn server_handshaker(io: T, settings: frame::SettingSet) B: Buf, { let framed_write = FramedWrite::new(io); - Settings::new(framed_write, settings) } diff --git a/src/proto/settings.rs b/src/proto/settings.rs index fd7efb8..dfb37b9 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,8 +1,7 @@ use {StreamId, ConnectionError}; -use frame::{self, Frame}; -use proto::{ApplySettings, ReadySink, StreamMap, StreamTransporter, FlowTransporter, WindowSize}; +use frame::{self, Frame, SettingSet}; +use proto::*; -use futures::*; use tokio_io::AsyncRead; use bytes::BufMut; @@ -16,10 +15,10 @@ pub struct Settings { inner: T, // Our settings - local: frame::SettingSet, + local: SettingSet, // Peer settings - remote: frame::SettingSet, + remote: SettingSet, // Number of acks remaining to send to the peer remaining_acks: usize, @@ -34,22 +33,22 @@ pub struct Settings { impl Settings where T: Sink, SinkError = ConnectionError>, { - pub fn new(inner: T, local: frame::SettingSet) -> Settings { + pub fn new(inner: T, local: SettingSet) -> Settings { Settings { inner: inner, local: local, - remote: frame::SettingSet::default(), + remote: SettingSet::default(), remaining_acks: 0, is_local_dirty: true, received_remote: false, } } - pub fn local_settings(&self) -> &frame::SettingSet { + pub fn local_settings(&self) -> &SettingSet { &self.local } - pub fn remote_settings(&self) -> &frame::SettingSet { + pub fn remote_settings(&self) -> &SettingSet { &self.local } @@ -96,7 +95,7 @@ impl Settings } } -impl StreamTransporter for Settings { +impl ControlStreams for Settings { fn streams(&self) -> &StreamMap { self.inner.streams() } @@ -106,7 +105,7 @@ impl StreamTransporter for Settings { } } -impl FlowTransporter for Settings { +impl ControlFlow for Settings { fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { self.inner.poll_remote_window_update(id) } @@ -116,6 +115,22 @@ impl FlowTransporter for Settings { } } +impl ControlSettings for Settings{ + fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { + self.local = local; + self.is_local_dirty = true; + Ok(()) + } + + fn local_settings(&self) -> &SettingSet { + &self.local + } + + fn remote_settings(&self) -> &SettingSet { + &self.remote + } +} + impl Stream for Settings where T: Stream, T: Sink, SinkError = ConnectionError>, diff --git a/src/proto/state.rs b/src/proto/state.rs index 6ee1e19..472cca6 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -2,7 +2,7 @@ use Peer; use error::ConnectionError; use error::Reason::*; use error::User::*; -use proto::{FlowController, WindowSize, WindowUnderflow}; +use proto::{FlowControlState, WindowSize, WindowUnderflow}; /// Represents the state of an H2 stream /// @@ -78,7 +78,7 @@ impl StreamState { if eos { *self = HalfClosedRemote(local); } else { - *self = Open { local, remote: Data(FlowController::new(initial_recv_window_size)) }; + *self = Open { local, remote: Data(FlowControlState::new(initial_recv_window_size)) }; } Ok(true) } @@ -98,7 +98,7 @@ impl StreamState { if eos { *self = Closed; } else { - *self = HalfClosedLocal(Data(FlowController::new(initial_recv_window_size))); + *self = HalfClosedLocal(Data(FlowControlState::new(initial_recv_window_size))); }; Ok(false) } @@ -155,7 +155,7 @@ impl StreamState { HalfClosedLocal(Headers) } else { Open { - local: Data(FlowController::new(initial_window_size)), + local: Data(FlowControlState::new(initial_window_size)), remote: Headers, } }; @@ -169,7 +169,7 @@ impl StreamState { *self = if eos { HalfClosedLocal(remote) } else { - let local = Data(FlowController::new(initial_window_size)); + let local = Data(FlowControlState::new(initial_window_size)); Open { local, remote } }; @@ -182,7 +182,7 @@ impl StreamState { *self = if eos { Closed } else { - HalfClosedRemote(Data(FlowController::new(initial_window_size))) + HalfClosedRemote(Data(FlowControlState::new(initial_window_size))) }; Ok(false) @@ -357,8 +357,8 @@ impl Default for StreamState { #[derive(Debug, Copy, Clone)] pub enum PeerState { Headers, - /// Contains a FlowController representing the _receiver_ of this this data stream. - Data(FlowController), + /// Contains a FlowControlState representing the _receiver_ of this this data stream. + Data(FlowControlState), } impl PeerState { diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index e36c8ff..5566a37 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -41,7 +41,7 @@ impl StreamTracker } } -impl StreamTransporter for StreamTracker { +impl ControlStreams for StreamTracker { #[inline] fn streams(&self) -> &StreamMap { &self.streams