diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 5fc72fd..62f3a70 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -50,19 +50,6 @@ impl ControlSettings for Connection } } -impl ControlFlow for Connection - where T: AsyncRead + AsyncWrite, - B: IntoBuf, -{ - fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { - self.inner.poll_remote_window_update(id) - } - - fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_local_window(id, incr) - } -} - impl ControlPing for Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -82,6 +69,16 @@ impl Connection P: Peer, B: IntoBuf, { + /// Polls for the next update to a remote flow control window. + pub fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() + } + + /// Increases the capacity of a local flow control window. + pub fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) + } + pub fn send_data(self, id: StreamId, data: B, diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index a2f0840..abce977 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -7,28 +7,33 @@ use std::collections::VecDeque; #[derive(Debug)] pub struct FlowControl { inner: T, - initial_local_window_size: WindowSize, - initial_remote_window_size: WindowSize, + + local_initial: WindowSize, + remote_initial: WindowSize, /// Tracks the connection-level flow control window for receiving data from the /// remote. - connection_local_flow_controller: FlowControlState, + local_connection: FlowControlState, /// Tracks the onnection-level flow control window for receiving data from the remote. - connection_remote_flow_controller: FlowControlState, + remote_connection: FlowControlState, /// Holds the list of streams on which local window updates may be sent. // XXX It would be cool if this didn't exist. - pending_local_window_updates: VecDeque, - pending_local_connection_window_update: bool, + local_pending_streams: VecDeque, - /// If a window update can't be sent immediately, it may need to be saved to be sent later. - sending_local_window_update: Option, + /// If a window update can't be sent immediately, it may need to be saved to be sent + /// later. + local_sending: Option, - /// When `poll_remote_window_update` is not ready, then the calling task is saved to + /// Holds the list of streams on which local window updates may be sent. + // XXX It would be cool if this didn't exist. + remote_pending_streams: VecDeque, + + /// When `poll_window_update` is not ready, then the calling task is saved to /// be notified later. Access to poll_window_update must not be shared across tasks, /// as we only track a single task (and *not* i.e. a task per stream id). - blocked_remote_window_update: Option, + remote_blocked: Option, } impl FlowControl @@ -36,21 +41,23 @@ impl FlowControl T: Sink, SinkError = ConnectionError>, T: ControlStreams { - pub fn new(initial_local_window_size: WindowSize, - initial_remote_window_size: WindowSize, + pub fn new(local_initial: WindowSize, + remote_initial: WindowSize, inner: T) -> FlowControl { FlowControl { inner, - initial_local_window_size, - initial_remote_window_size, - connection_local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), - connection_remote_flow_controller: FlowControlState::with_initial_size(initial_remote_window_size), - blocked_remote_window_update: None, - sending_local_window_update: None, - pending_local_window_updates: VecDeque::new(), - pending_local_connection_window_update: false, + + local_initial, + local_connection: FlowControlState::with_initial_size(local_initial), + local_sending: None, + local_pending_streams: VecDeque::new(), + + remote_initial, + remote_connection: FlowControlState::with_initial_size(remote_initial), + remote_blocked: None, + remote_pending_streams: VecDeque::new(), } } } @@ -59,7 +66,7 @@ impl FlowControl impl FlowControl { fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { - Some(&mut self.connection_local_flow_controller) + Some(&mut self.local_connection) } else { self.inner.streams_mut().get_mut(id).and_then(|s| s.local_flow_controller()) } @@ -67,7 +74,7 @@ impl FlowControl { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { - Some(&mut self.connection_remote_flow_controller) + Some(&mut self.remote_connection) } else { self.inner.streams_mut().get_mut(id).and_then(|s| s.remote_flow_controller()) } @@ -91,25 +98,26 @@ impl ControlStreams for FlowControl { /// Exposes a public upward API for flow control. impl ControlFlow for FlowControl { - fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { - if self.stream_is_reset(id).is_some() { - return Err(error::User::StreamReset.into()); + fn poll_window_update(&mut self) -> Poll { + // This biases connection window updates, which probably makese sense. + if let Some(incr) = self.remote_connection.apply_window_update() { + return Ok(Async::Ready(WindowUpdate(StreamId::zero(), incr))); } - match self.remote_flow_controller(id) { - None => return Err(error::User::InvalidStreamId.into()), - Some(mut flow) => { - if let Some(sz) = flow.apply_window_update() { - return Ok(Async::Ready(sz)); + // TODO this should probably account for stream priority? + while let Some(id) = self.remote_pending_streams.pop_front() { + if let Some(mut flow) = self.remote_flow_controller(id) { + if let Some(incr) = flow.apply_window_update() { + return Ok(Async::Ready(WindowUpdate(id, incr))); } } } - self.blocked_remote_window_update = Some(task::current()); + self.remote_blocked = Some(task::current()); return Ok(Async::NotReady); } - fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { let added = match self.local_flow_controller(id) { None => false, Some(mut fc) => { @@ -118,11 +126,9 @@ impl ControlFlow for FlowControl { } }; - if added { - if id.is_zero() { - self.pending_local_connection_window_update = true; - } else { - self.pending_local_window_updates.push_back(id); + if added { + if !id.is_zero() { + self.local_pending_streams.push_back(id); } Ok(()) } else if self.stream_is_reset(id).is_some() { @@ -148,18 +154,16 @@ impl FlowControl T: ControlStreams, { /// Returns ready when there are no pending window updates to send. - fn poll_send_local_window_updates(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.sending_local_window_update.take() { + fn poll_send_local(&mut self) -> Poll<(), ConnectionError> { + if let Some(f) = self.local_sending.take() { try_ready!(self.try_send(f)); } - if self.pending_local_connection_window_update { - if let Some(incr) = self.connection_local_flow_controller.apply_window_update() { - try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); - } + if let Some(incr) = self.local_connection.apply_window_update() { + try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); } - while let Some(id) = self.pending_local_window_updates.pop_front() { + while let Some(id) = self.local_pending_streams.pop_front() { if self.stream_is_reset(id).is_none() { let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); if let Some(incr) = update { @@ -173,7 +177,7 @@ impl FlowControl fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { if self.inner.start_send(f.into())?.is_not_ready() { - self.sending_local_window_update = Some(f); + self.local_sending = Some(f); Ok(Async::NotReady) } else { Ok(Async::Ready(())) @@ -204,7 +208,7 @@ impl ApplySettings for FlowControl fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set)?; - let old_window_size = self.initial_local_window_size; + let old_window_size = self.local_initial; let new_window_size = set.initial_window_size(); if new_window_size == old_window_size { return Ok(()); @@ -219,14 +223,14 @@ impl ApplySettings for FlowControl streams.expand_all_local_windows(incr); } - self.initial_local_window_size = new_window_size; + self.local_initial = new_window_size; Ok(()) } fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_remote_settings(set)?; - let old_window_size = self.initial_remote_window_size; + let old_window_size = self.remote_initial; let new_window_size = set.initial_window_size(); if new_window_size == old_window_size { return Ok(()); @@ -241,7 +245,7 @@ impl ApplySettings for FlowControl streams.expand_all_remote_windows(incr); } - self.initial_remote_window_size = new_window_size; + self.remote_initial = new_window_size; Ok(()) } } @@ -267,7 +271,7 @@ impl Stream for FlowControl Some(Data(v)) => { let sz = v.payload().len() as FrameSize; - if self.connection_local_flow_controller.claim_window(sz).is_err() { + if self.local_connection.claim_window(sz).is_err() { return Err(error::Reason::FlowControlError.into()) } // If this frame ends the stream, there may no longer be a flow @@ -298,51 +302,60 @@ impl Sink for FlowControl fn start_send(&mut self, frame: Frame) -> StartSend { use frame::Frame::*; - if self.poll_send_local_window_updates()?.is_not_ready() { + debug_assert!(self.stream_is_reset(frame.stream_id()).is_none()); + + // Ensures that: + // 1. all pending local window updates have been sent to the remote. + // 2. the underlying transport is will accept the frame. It's important that this + // be checked before claiming capacity from the flow controllers. + if self.poll_ready()?.is_not_ready() { return Ok(AsyncSink::NotReady(frame)); } - match frame { - Data(v) => { - // Before claiming space, ensure that the transport will accept the frame. - if self.inner.poll_ready()?.is_not_ready() { - return Ok(AsyncSink::NotReady(Data(v))); - } + // Ensure that an outbound data frame does not violate the remote's flow control + // window. + if let &Data(ref v) = &frame { + let sz = v.payload().remaining() as FrameSize; - let sz = v.payload().remaining() as FrameSize; - if self.connection_remote_flow_controller.claim_window(sz).is_err() { - return Err(error::User::FlowControlViolation.into()); - } - if let Some(fc) = self.remote_flow_controller(v.stream_id()) { - if fc.claim_window(sz).is_err() { - return Err(error::User::FlowControlViolation.into()) - } - } - - let res = self.inner.start_send(Data(v))?; - assert!(res.is_ready()); - Ok(res) + // Ensure there's enough capacity on the connection before acting on the + // stream. + if !self.remote_connection.check_window(sz) { + return Err(error::User::FlowControlViolation.into()); } - frame => self.inner.start_send(frame), + // Ensure there's enough capacity on stream. + { + let mut fc = self.streams_mut() + .remote_flow_controller(v.stream_id()) + .expect("no remote stream for data frame"); + if fc.claim_window(sz).is_err() { + return Err(error::User::FlowControlViolation.into()) + } + } + + self.remote_connection.claim_window(sz) + .expect("remote connection flow control error"); } + + let res = self.inner.start_send(frame)?; + assert!(res.is_ready()); + Ok(res) } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - try_ready!(self.inner.poll_complete()); - self.poll_send_local_window_updates() + try_ready!(self.poll_send_local()); + self.inner.poll_complete() } } impl ReadySink for FlowControl - where T: Stream, - T: Sink, SinkError = ConnectionError>, + where T: Sink, SinkError = ConnectionError>, T: ReadySink, T: ControlStreams, U: Buf, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.inner.poll_ready()); - self.poll_send_local_window_updates() + try_ready!(self.poll_send_local()); + self.inner.poll_ready() } } diff --git a/src/proto/flow_control_state.rs b/src/proto/flow_control_state.rs index 2c3b6e5..4babb39 100644 --- a/src/proto/flow_control_state.rs +++ b/src/proto/flow_control_state.rs @@ -51,12 +51,17 @@ impl FlowControlState { } } + /// Returns true iff `claim_window(sz)` would return succeed. + pub fn check_window(&mut self, sz: WindowSize) -> bool { + sz <= self.window_size + } + /// Claims the provided amount from the window, if there is enough space. /// /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than /// have been previously claimed. pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { - if self.window_size < sz { + if !self.check_window(sz) { return Err(WindowUnderflow); } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 915d557..76db759 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -110,18 +110,30 @@ pub trait ApplySettings { fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; } +#[derive(Debug, Copy, Clone)] +pub struct WindowUpdate(pub StreamId, pub WindowSize); +impl WindowUpdate { + pub fn stream_id(&self) -> StreamId { + self.0 + } + + pub fn increment(&self) -> WindowSize { + self.1 + } +} + /// Exposes flow control states to "upper" layers of the transport (i.e. above /// FlowControl). pub trait ControlFlow { - /// Asks the flow controller for unreported send capacity on a stream. - /// - /// Errors if the given stream is not active. - fn poll_remote_window_update(&mut self, id: StreamId) -> Poll; + /// Polls for the next window update from the remote. + fn poll_window_update(&mut self) -> Poll; - /// Attempts to increase the receive capacity of a stream. + /// Increases the local receive capacity of a stream. /// - /// Errors if the given stream is not active. - fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; + /// This may cause a window update to be sent to the remote. + /// + /// Fails if the given stream is not active. + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; } /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 3f715a3..8adf216 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -353,7 +353,7 @@ mod test { impl ReadySink for Transport { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - let mut trans = self.0.borrow_mut(); + let trans = self.0.borrow(); if trans.closing || trans.start_send_blocked { Ok(Async::NotReady) } else { diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 2b267b2..355b368 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -110,12 +110,12 @@ impl ControlStreams for Settings { } impl ControlFlow for Settings { - fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { - self.inner.poll_remote_window_update(id) + fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() } - fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_local_window(id, incr) + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) } } diff --git a/src/proto/state.rs b/src/proto/state.rs index 611c81f..ab4d79d 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -90,7 +90,8 @@ impl StreamState { if eos { *self = HalfClosedRemote(local); } else { - *self = Open { local, remote: Data(FlowControlState::with_initial_size(initial_recv_window_size)) }; + let remote = Data(FlowControlState::with_initial_size(initial_recv_window_size)); + *self = Open { local, remote }; } Ok(true) } @@ -300,6 +301,14 @@ impl StreamMap { self.inner.get_mut(&id) } + pub fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.get_mut(&id).and_then(|s| s.local_flow_controller()) + } + + pub fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.get_mut(&id).and_then(|s| s.remote_flow_controller()) + } + pub fn has_stream(&mut self, id: StreamId) -> bool { self.inner.contains_key(&id) } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index 6ec8085..19c0271 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -14,6 +14,7 @@ use std::marker::PhantomData; // TODO reset_streams needs to be bounded. // TODO track reserved streams (PUSH_PROMISE). +/// Tracks a connection's streams. #[derive(Debug)] pub struct StreamTracker { inner: T, @@ -96,7 +97,7 @@ impl ControlStreams for StreamTracker { /// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. /// -/// > Indicates the maximum number of concurrent streams that the sender will allow. This +/// > Indicates the maximum number of concurrent streams that the senderg will allow. This /// > limit is directional: it applies to the number of streams that the sender permits /// > the receiver to create. Initially, there is no limit to this value. It is /// > recommended that this value be no smaller than 100, so as to not unnecessarily limit @@ -152,8 +153,8 @@ impl Stream for StreamTracker fn poll(&mut self) -> Poll, T::Error> { use frame::Frame::*; - // The local must complete refusing the remote stream before processing additional - // frames. + // Since there's only one slot for pending refused streams, it must be cleared + // before polling a frame from the transport. if let Some(id) = self.pending_refused_stream.take() { try_ready!(self.send_refusal(id)); }