diff --git a/src/proto/control_flow.rs b/src/proto/control_flow.rs index 4ae27cb..83711b9 100644 --- a/src/proto/control_flow.rs +++ b/src/proto/control_flow.rs @@ -3,10 +3,12 @@ use proto::*; /// Exposes flow control states to "upper" layers of the transport (i.e. above /// FlowControl). -pub trait ControlFlow { +pub trait ControlFlowSend { /// Polls for the next window update from the remote. fn poll_window_update(&mut self) -> Poll; +} +pub trait ControlFlowRecv { /// Increases the local receive capacity of a stream. /// /// This may cause a window update to be sent to the remote. @@ -15,16 +17,29 @@ pub trait ControlFlow { fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; } -macro_rules! proxy_control_flow { +macro_rules! proxy_control_flow_send { ($outer:ident) => ( - impl ControlFlow for $outer { + impl ControlFlowSend for $outer { fn poll_window_update(&mut self) -> Poll { self.inner.poll_window_update() } + } + ) +} +macro_rules! proxy_control_flow_recv { + ($outer:ident) => ( + impl ControlFlowRecv for $outer { fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { self.inner.expand_window(id, incr) } } ) } + +macro_rules! proxy_control_flow { + ($outer:ident) => ( + proxy_control_flow_recv!($outer); + proxy_control_flow_send!($outer); + ) +} diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs deleted file mode 100644 index 3e5c75a..0000000 --- a/src/proto/flow_control.rs +++ /dev/null @@ -1,335 +0,0 @@ -use {error, ConnectionError, FrameSize}; -use frame::{self, Frame}; -use proto::*; - -use std::collections::VecDeque; - -#[derive(Debug)] -pub struct FlowControl { - inner: T, - - local_initial: WindowSize, - remote_initial: WindowSize, - - /// Tracks the connection-level flow control window for receiving data from the - /// remote. - local_connection: FlowControlState, - - /// Tracks the onnection-level flow control window for receiving data from the remote. - remote_connection: FlowControlState, - - /// Holds the list of streams on which local window updates may be sent. - // XXX It would be cool if this didn't exist. - local_pending_streams: VecDeque, - - /// If a window update can't be sent immediately, it may need to be saved to be sent - /// later. - local_sending: Option, - - /// Holds the list of streams on which local window updates may be sent. - // XXX It would be cool if this didn't exist. - remote_pending_streams: VecDeque, - - /// When `poll_window_update` is not ready, then the calling task is saved to - /// be notified later. Access to poll_window_update must not be shared across tasks, - /// as we only track a single task (and *not* i.e. a task per stream id). - remote_blocked: Option, -} - -impl FlowControl - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams -{ - pub fn new(local_initial: WindowSize, - remote_initial: WindowSize, - inner: T) - -> FlowControl - { - FlowControl { - inner, - - local_initial, - local_connection: FlowControlState::with_initial_size(local_initial), - local_sending: None, - local_pending_streams: VecDeque::new(), - - remote_initial, - remote_connection: FlowControlState::with_initial_size(remote_initial), - remote_blocked: None, - remote_pending_streams: VecDeque::new(), - } - } -} - -// Flow control utitlities. -impl FlowControl { - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - if id.is_zero() { - Some(&mut self.local_connection) - } else { - self.inner.recv_flow_controller(id) - } - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - if id.is_zero() { - Some(&mut self.remote_connection) - } else { - self.inner.send_flow_controller(id) - } - } -} -/// Exposes a public upward API for flow control. -impl ControlFlow for FlowControl { - fn poll_window_update(&mut self) -> Poll { - // This biases connection window updates, which probably makese sense. - if let Some(incr) = self.remote_connection.apply_window_update() { - return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); - } - - // TODO this should probably account for stream priority? - while let Some(id) = self.remote_pending_streams.pop_front() { - if let Some(mut flow) = self.send_flow_controller(id) { - if let Some(incr) = flow.apply_window_update() { - return Ok(Async::Ready(WindowUpdate::new(id, incr))); - } - } - } - - self.remote_blocked = Some(task::current()); - return Ok(Async::NotReady); - } - - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - let added = match self.recv_flow_controller(id) { - None => false, - Some(mut fc) => { - fc.expand_window(incr); - true - } - }; - - if added { - if !id.is_zero() { - self.local_pending_streams.push_back(id); - } - Ok(()) - } else if let Some(rst) = self.inner.get_reset(id) { - Err(error::User::StreamReset(rst).into()) - } else { - Err(error::User::InvalidStreamId.into()) - } - } -} - -impl FlowControl - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - /// Returns ready when there are no pending window updates to send. - fn poll_send_local(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.local_sending.take() { - try_ready!(self.try_send(f)); - } - - if let Some(incr) = self.local_connection.apply_window_update() { - try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); - } - - while let Some(id) = self.local_pending_streams.pop_front() { - if self.inner.get_reset(id).is_none() { - let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); - if let Some(incr) = update { - try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); - } - } - } - - Ok(Async::Ready(())) - } - - fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { - if self.inner.start_send(f.into())?.is_not_ready() { - self.local_sending = Some(f); - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) - } - } -} - -/// Tracks window updates received from the remote and ensures that the remote does not -/// violate the local peer's flow controller. -/// -/// TODO send flow control reset when the peer violates the flow control window. -impl Stream for FlowControl - where T: Stream, - T: ControlStreams, - { - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - use frame::Frame::*; - trace!("poll"); - - loop { - match try_ready!(self.inner.poll()) { - Some(WindowUpdate(v)) => { - if let Some(fc) = self.send_flow_controller(v.stream_id()) { - fc.expand_window(v.size_increment()); - } - } - - Some(Data(v)) => { - let sz = v.payload().len() as FrameSize; - if self.local_connection.claim_window(sz).is_err() { - return Err(error::Reason::FlowControlError.into()) - } - // If this frame ends the stream, there may no longer be a flow - // controller. That's fine. - if let Some(fc) = self.recv_flow_controller(v.stream_id()) { - if fc.claim_window(sz).is_err() { - // TODO send flow control reset. - return Err(error::Reason::FlowControlError.into()) - } - } - return Ok(Async::Ready(Some(Data(v)))); - } - - v => return Ok(Async::Ready(v)), - } - } - } -} - -/// Tracks the send flow control windows for sent frames. -/// -/// If sending a frame would violate the remote's window, start_send fails with -/// `FlowControlViolation`. -/// -/// Sends pending window updates before operating on the underlying transport. -impl Sink for FlowControl - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, - U: Buf, - { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, frame: Frame) -> StartSend { - use frame::Frame::*; - - debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); - - // Ensures that: - // 1. all pending local window updates have been sent to the remote. - // 2. the underlying transport is will accept the frame. It's important that this - // be checked before claiming capacity from the flow controllers. - if self.poll_ready()?.is_not_ready() { - return Ok(AsyncSink::NotReady(frame)); - } - - // Ensure that an outbound data frame does not violate the remote's flow control - // window. - if let &Data(ref v) = &frame { - let sz = v.payload().remaining() as FrameSize; - - // Ensure there's enough capacity on the connection before acting on the - // stream. - if !self.remote_connection.check_window(sz) { - return Err(error::User::FlowControlViolation.into()); - } - - // Ensure there's enough capacity on stream. - { - let mut fc = self.inner.send_flow_controller(v.stream_id()) - .expect("no remote stream for data frame"); - if fc.claim_window(sz).is_err() { - return Err(error::User::FlowControlViolation.into()) - } - } - - self.remote_connection.claim_window(sz) - .expect("remote connection flow control error"); - } - - let res = self.inner.start_send(frame)?; - assert!(res.is_ready()); - Ok(res) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - try_ready!(self.poll_send_local()); - self.inner.poll_complete() - } -} - -/// Sends pending window updates before checking the underyling transport's readiness. -impl ReadySink for FlowControl - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, - U: Buf, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_send_local()); - self.inner.poll_ready() - } -} - -/// Applies an update to an endpoint's initial window size. -/// -/// Per RFC 7540 §6.9.2: -/// -/// > In addition to changing the flow-control window for streams that are not yet -/// > active, a SETTINGS frame can alter the initial flow-control window size for -/// > streams with active flow-control windows (that is, streams in the "open" or -/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE -/// > changes, a receiver MUST adjust the size of all stream flow-control windows that -/// > it maintains by the difference between the new value and the old value. -/// > -/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a -/// > flow-control window to become negative. A sender MUST track the negative -/// > flow-control window and MUST NOT send new flow-controlled frames until it -/// > receives WINDOW_UPDATE frames that cause the flow-control window to become -/// > positive. -impl ApplySettings for FlowControl - where T: ApplySettings, - T: ControlStreams -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set)?; - - if let Some(new_window_size) = set.initial_window_size() { - let old_window_size = self.local_initial; - if new_window_size == old_window_size { - return Ok(()); - } - - self.inner.update_inital_recv_window_size(old_window_size, new_window_size); - self.local_initial = new_window_size; - } - Ok(()) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set)?; - - if let Some(new_window_size) = set.initial_window_size() { - let old_window_size = self.remote_initial; - if new_window_size == old_window_size { - return Ok(()); - } - - self.inner.update_inital_send_window_size(old_window_size, new_window_size); - self.remote_initial = new_window_size; - } - Ok(()) - } -} - -proxy_control_streams!(FlowControl); -proxy_control_ping!(FlowControl); diff --git a/src/proto/flow_control_recv.rs b/src/proto/flow_control_recv.rs new file mode 100644 index 0000000..221534c --- /dev/null +++ b/src/proto/flow_control_recv.rs @@ -0,0 +1,222 @@ +use {error, ConnectionError, FrameSize}; +use frame::{self, Frame}; +use proto::*; + +use std::collections::VecDeque; + +/// Tracks local flow control windows. +#[derive(Debug)] +pub struct FlowControlRecv { + inner: T, + + + initial_window_size: WindowSize, + + /// Tracks the connection-level flow control window for receiving data from the + /// remote. + connection: FlowControlState, + + /// Holds the list of streams on which local window updates may be sent. + // XXX It would be cool if this didn't exist. + pending_streams: VecDeque, + + /// If a window update can't be sent immediately, it may need to be saved to be sent + /// later. + sending: Option, +} + +impl FlowControlRecv + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams +{ + pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlRecv { + FlowControlRecv { + inner, + initial_window_size, + connection: FlowControlState::with_initial_size(initial_window_size), + pending_streams: VecDeque::new(), + sending: None, + } + } +} + +/// Exposes a public upward API for flow control. +impl ControlFlowRecv for FlowControlRecv { + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + let added = match self.recv_flow_controller(id) { + None => false, + Some(mut fc) => { + fc.expand_window(incr); + true + } + }; + + if added { + if !id.is_zero() { + self.pending_streams.push_back(id); + } + Ok(()) + } else if let Some(rst) = self.inner.get_reset(id) { + Err(error::User::StreamReset(rst).into()) + } else { + Err(error::User::InvalidStreamId.into()) + } + } +} + +impl FlowControlRecv + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + /// Returns ready when there are no pending window updates to send. + fn poll_send_local(&mut self) -> Poll<(), ConnectionError> { + if let Some(f) = self.sending.take() { + try_ready!(self.try_send(f)); + } + + if let Some(incr) = self.connection.apply_window_update() { + try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); + } + + while let Some(id) = self.pending_streams.pop_front() { + if self.inner.get_reset(id).is_none() { + let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); + if let Some(incr) = update { + try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); + } + } + } + + Ok(Async::Ready(())) + } + + fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { + if self.inner.start_send(f.into())?.is_not_ready() { + self.sending = Some(f); + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + } +} + +/// Ensures that the remote does not violate the local peer's flow controller. +impl Stream for FlowControlRecv + where T: Stream, + T: ControlStreams, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + trace!("poll"); + loop { + match try_ready!(self.inner.poll()) { + Some(Frame::Data(v)) => { + let id = v.stream_id(); + let sz = v.payload().len() as FrameSize; + + // Ensure there's enough capacity on the connection before acting on + // the stream. + if !self.connection.check_window(sz) { + // TODO this should cause a GO_AWAY + return Err(error::Reason::FlowControlError.into()); + } + + let fc = self.inner.recv_flow_controller(id) + .expect("receiving data with no flow controller"); + if fc.claim_window(sz).is_err() { + // TODO this should cause a GO_AWAY + return Err(error::Reason::FlowControlError.into()); + } + + self.connection.claim_window(sz) + .expect("local connection flow control error"); + + return Ok(Async::Ready(Some(Frame::Data(v)))); + } + + v => return Ok(Async::Ready(v)), + } + } + } +} + +/// Sends pending window updates before operating on the underlying transport. +impl Sink for FlowControlRecv + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, + { + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, frame: Frame) -> StartSend { + if self.poll_send_local()?.is_not_ready() { + return Ok(AsyncSink::NotReady(frame)); + } + self.inner.start_send(frame) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + try_ready!(self.poll_send_local()); + self.inner.poll_complete() + } +} + +/// Sends pending window updates before checking the underyling transport's readiness. +impl ReadySink for FlowControlRecv + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_send_local()); + self.inner.poll_ready() + } +} + +/// Applies an update to an endpoint's initial window size. +/// +/// Per RFC 7540 §6.9.2: +/// +/// > In addition to changing the flow-control window for streams that are not yet +/// > active, a SETTINGS frame can alter the initial flow-control window size for +/// > streams with active flow-control windows (that is, streams in the "open" or +/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE +/// > changes, a receiver MUST adjust the size of all stream flow-control windows that +/// > it maintains by the difference between the new value and the old value. +/// > +/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a +/// > flow-control window to become negative. A sender MUST track the negative +/// > flow-control window and MUST NOT send new flow-controlled frames until it +/// > receives WINDOW_UPDATE frames that cause the flow-control window to become +/// > positive. +impl ApplySettings for FlowControlRecv + where T: ApplySettings, + T: ControlStreams +{ + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set)?; + + if let Some(new_window_size) = set.initial_window_size() { + let old_window_size = self.initial_window_size; + if new_window_size == old_window_size { + return Ok(()); + } + + self.inner.update_inital_recv_window_size(old_window_size, new_window_size); + self.initial_window_size = new_window_size; + } + Ok(()) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + +proxy_control_flow_send!(FlowControlRecv); +proxy_control_ping!(FlowControlRecv); +proxy_control_streams!(FlowControlRecv); diff --git a/src/proto/flow_control_send.rs b/src/proto/flow_control_send.rs new file mode 100644 index 0000000..a3135c1 --- /dev/null +++ b/src/proto/flow_control_send.rs @@ -0,0 +1,208 @@ +use {error, ConnectionError, FrameSize}; +use frame::{self, Frame}; +use proto::*; + +use std::collections::VecDeque; + +/// Tracks remote flow control windows. +#[derive(Debug)] +pub struct FlowControlSend { + inner: T, + + initial_window_size: WindowSize, + + /// Tracks the onnection-level flow control window for receiving data from the remote. + connection: FlowControlState, + + /// Holds the list of streams on which local window updates may be sent. + // XXX It would be cool if this didn't exist. + pending_streams: VecDeque, + + /// When `poll_window_update` is not ready, then the calling task is saved to + /// be notified later. Access to poll_window_update must not be shared across tasks, + /// as we only track a single task (and *not* i.e. a task per stream id). + blocked: Option, +} + +impl FlowControlSend + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams +{ + pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlSend { + FlowControlSend { + inner, + initial_window_size, + connection: FlowControlState::with_initial_size(initial_window_size), + pending_streams: VecDeque::new(), + blocked: None, + } + } +} + +/// Exposes a public upward API for flow control. +impl ControlFlowSend for FlowControlSend { + fn poll_window_update(&mut self) -> Poll { + // This biases connection window updates, which probably makes sense. + if let Some(incr) = self.connection.apply_window_update() { + return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); + } + + // TODO this should probably account for stream priority? + while let Some(id) = self.pending_streams.pop_front() { + if let Some(mut flow) = self.send_flow_controller(id) { + if let Some(incr) = flow.apply_window_update() { + return Ok(Async::Ready(WindowUpdate::new(id, incr))); + } + } + } + + self.blocked = Some(task::current()); + return Ok(Async::NotReady); + } +} + +/// Applies remote window updates as they are received. +impl Stream for FlowControlSend + where T: Stream, + T: ControlStreams, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + trace!("poll"); + + loop { + match try_ready!(self.inner.poll()) { + Some(Frame::WindowUpdate(v)) => { + let id = v.stream_id(); + let sz = v.size_increment(); + + if id.is_zero() { + self.connection.expand_window(sz); + } else { + // The remote may send window updates for streams that the local + // now considers closed. It's okay. + if let Some(fc) = self.inner.send_flow_controller(id) { + fc.expand_window(sz); + } + } + } + + f => return Ok(Async::Ready(f)), + } + } + } +} + +/// Tracks the flow control windows for sent davta frames. +/// +/// If sending a frame would violate the remote's window, start_send fails with +/// `FlowControlViolation`. +impl Sink for FlowControlSend + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, + U: Buf, + { + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, frame: Frame) -> StartSend { + debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); + + // Ensures that the underlying transport is will accept the frame. It's important + // that this be checked before claiming capacity from the flow controllers. + if self.poll_ready()?.is_not_ready() { + return Ok(AsyncSink::NotReady(frame)); + } + + // Ensure that an outbound data frame does not violate the remote's flow control + // window. + if let &Frame::Data(ref v) = &frame { + let sz = v.payload().remaining() as FrameSize; + + // Ensure there's enough capacity on the connection before acting on the + // stream. + if !self.connection.check_window(sz) { + return Err(error::User::FlowControlViolation.into()); + } + + // Ensure there's enough capacity on stream. + let mut fc = self.inner.send_flow_controller(v.stream_id()) + .expect("no remote stream for data frame"); + if fc.claim_window(sz).is_err() { + return Err(error::User::FlowControlViolation.into()) + } + + self.connection.claim_window(sz) + .expect("remote connection flow control error"); + } + + let res = self.inner.start_send(frame)?; + assert!(res.is_ready()); + Ok(res) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } +} + +/// Proxy. +impl ReadySink for FlowControlSend + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, + U: Buf, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +/// Applies an update to the remote endpoint's initial window size. +/// +/// Per RFC 7540 §6.9.2: +/// +/// > In addition to changing the flow-control window for streams that are not yet +/// > active, a SETTINGS frame can alter the initial flow-control window size for +/// > streams with active flow-control windows (that is, streams in the "open" or +/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE +/// > changes, a receiver MUST adjust the size of all stream flow-control windows that +/// > it maintains by the difference between the new value and the old value. +/// > +/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a +/// > flow-control window to become negative. A sender MUST track the negative +/// > flow-control window and MUST NOT send new flow-controlled frames until it +/// > receives WINDOW_UPDATE frames that cause the flow-control window to become +/// > positive. +impl ApplySettings for FlowControlSend + where T: ApplySettings, + T: ControlStreams +{ + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set)?; + + if let Some(new_window_size) = set.initial_window_size() { + let old_window_size = self.initial_window_size; + if new_window_size == old_window_size { + return Ok(()); + } + + self.inner.update_inital_send_window_size(old_window_size, new_window_size); + self.initial_window_size = new_window_size; + } + + Ok(()) + } +} + +proxy_control_flow_recv!(FlowControlSend); +proxy_control_ping!(FlowControlSend); +proxy_control_streams!(FlowControlSend); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index e3ed3e9..e505cb6 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -62,13 +62,14 @@ mod control_settings; mod control_streams; use self::apply_settings::ApplySettings; -use self::control_flow::ControlFlow; +use self::control_flow::{ControlFlowRecv, ControlFlowSend}; use self::control_ping::ControlPing; use self::control_settings::ControlSettings; use self::control_streams::ControlStreams; mod connection; -mod flow_control; +mod flow_control_recv; +mod flow_control_send; mod flow_control_state; mod framed_read; mod framed_write; @@ -84,7 +85,8 @@ mod stream_states; pub use self::connection::Connection; -use self::flow_control::FlowControl; +use self::flow_control_recv::FlowControlRecv; +use self::flow_control_send::FlowControlSend; use self::flow_control_state::FlowControlState; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; @@ -191,10 +193,11 @@ type Transport= type Streams = StreamSendOpen< StreamRecvClose< - FlowControl< - StreamSendClose< - StreamRecvOpen< - StreamStates>>>>>; + FlowControlSend< + FlowControlRecv< + StreamSendClose< + StreamRecvOpen< + StreamStates>>>>>>; type Codec = FramedRead< @@ -287,16 +290,17 @@ pub fn from_server_handshaker(settings: Settings initial_send_window_size, remote_max_concurrency, StreamRecvClose::new( - FlowControl::new( - initial_recv_window_size, + FlowControlSend::new( initial_send_window_size, - StreamSendClose::new( - StreamRecvOpen::new( - initial_recv_window_size, - local_max_concurrency, - StreamStates::new( - PingPong::new( - FramedRead::new(framed)))))))) + FlowControlRecv::new( + initial_recv_window_size, + StreamSendClose::new( + StreamRecvOpen::new( + initial_recv_window_size, + local_max_concurrency, + StreamStates::new( + PingPong::new( + FramedRead::new(framed))))))))) }); connection::new(transport) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index f9e8d07..1caa138 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -214,14 +214,5 @@ impl AsyncRead for Settings { } } -impl ControlFlow for Settings { - fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() - } - - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) - } -} - +proxy_control_flow!(Settings); proxy_control_ping!(Settings);