From d0c55c52e99beb1b44ab81dfc17b3d85bb6a42ba Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 13 Jul 2017 06:43:28 +0000 Subject: [PATCH] clarify terminology and fix several obvious bugs in the process --- src/proto/connection.rs | 134 ++++++++++++++++++++++---------------- src/proto/flow_control.rs | 7 +- src/proto/settings.rs | 1 - src/proto/state.rs | 77 +++++++++++++--------- 4 files changed, 129 insertions(+), 90 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 6dbcb6f..67ce03c 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -25,13 +25,18 @@ pub struct Connection { streams: StreamMap, peer: PhantomData

, - /// Tracks connection-level flow control. - recv_flow_controller: FlowController, - send_flow_controller: FlowController, + /// Tracks the connection-level flow control window for receiving data from the + /// remote. + local_flow_controller: FlowController, + /// Tracks the onnection-level flow control window for receiving data from the remote. + remote_flow_controller: FlowController, - pending_send_window_update: Option, - blocked_recv_window_update: Option, + /// When `poll_window_update` is not ready, then the calling task is saved to be + /// notified later. Access to poll_window_update must not be shared across tasks. + blocked_window_update: Option, + + sending_window_update: Option, } type StreamMap = OrderMap>; @@ -49,76 +54,92 @@ pub fn new(transport: proto::Inner) streams: StreamMap::default(), peer: PhantomData, - recv_flow_controller: FlowController::new(recv_window_size), - send_flow_controller: FlowController::new(send_window_size), + local_flow_controller: FlowController::new(recv_window_size), + remote_flow_controller: FlowController::new(send_window_size), - pending_send_window_update: None, - blocked_recv_window_update: None, + blocked_window_update: None, + sending_window_update: None, } } impl Connection { #[inline] - fn claim_connection_recv_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { - self.recv_flow_controller.claim_window(len) + fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { + self.local_flow_controller.claim_window(len) .map_err(|_| error::Reason::FlowControlError.into()) } #[inline] - fn claim_connection_send_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { - self.send_flow_controller.claim_window(len) - .map_err(|_| error::Reason::FlowControlError.into()) + fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { + self.remote_flow_controller.claim_window(len) + .map_err(|_| error::User::FlowControlViolation.into()) } - // TODO check max frame size - - pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + /// Polls for the amount of additional data that may be sent to a remote. + /// + /// Connection and stream updates are distinct. + pub fn poll_window_update(&mut self, id: StreamId) -> Poll { let added = if id.is_zero() { - self.send_flow_controller.take_window_update() + self.remote_flow_controller.take_window_update() } else { - self.streams.get_mut(&id).and_then(|mut s| s.take_recv_window_update()) + self.streams.get_mut(&id).and_then(|s| s.take_send_window_update()) }; match added { Some(incr) => Ok(Async::Ready(incr)), None => { - self.blocked_recv_window_update = Some(task::current()); + self.blocked_window_update = Some(task::current()); Ok(Async::NotReady) } } } - /// Publishes local stream window updates to the remote. + /// Increases the amount of data that the remote endpoint may send. /// - /// Connection window updates (StreamId=0) and stream window must be published - /// distinctly. - pub fn init_send_window_update(&mut self, id: StreamId, incr: WindowSize) { - assert!(self.pending_send_window_update.is_none()); + /// Connection and stream updates are distinct. + pub fn increment_window_size(&mut self, id: StreamId, incr: WindowSize) { + assert!(self.sending_window_update.is_none()); let added = if id.is_zero() { - self.send_flow_controller.add_to_window(incr); - self.send_flow_controller.take_window_update() + self.local_flow_controller.increment_window_size(incr); + self.local_flow_controller.take_window_update() } else { - self.streams.get_mut(&id).and_then(|mut s| s.send_window_update(incr)) + self.streams.get_mut(&id).and_then(|s| { + s.increment_recv_window_size(incr); + s.take_recv_window_update() + }) }; if let Some(added) = added { - self.pending_send_window_update = Some(frame::WindowUpdate::new(id, added)); + self.sending_window_update = Some(frame::WindowUpdate::new(id, added)); } } - /// Advertises the remote's stream window updates. + /// Handles a window update received from the remote, indicating that the local may + /// send `incr` additional bytes. /// /// Connection window updates (id=0) and stream window updates are advertised /// distinctly. - fn recv_window_update(&mut self, id: StreamId, incr: WindowSize) { - if id.is_zero() { - return self.recv_flow_controller.add_to_window(incr); + fn increment_send_window_size(&mut self, id: StreamId, incr: WindowSize) { + if incr == 0 { + return; } - if let Some(mut s) = self.streams.get_mut(&id) { - s.recv_window_update(incr); + let added = if id.is_zero() { + self.remote_flow_controller.increment_window_size(incr); + true + } else if let Some(mut s) = self.streams.get_mut(&id) { + s.increment_send_window_size(incr); + true + } else { + false + }; + + if added { + if let Some(task) = self.blocked_window_update.take() { + task.notify(); + } } } } @@ -129,13 +150,14 @@ impl Connection B: IntoBuf { /// Attempts to send a window update to the remote, if one is pending. - fn poll_send_window_update(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.pending_send_window_update.take() { + fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> { + if let Some(f) = self.sending_window_update.take() { if self.inner.start_send(f.into())?.is_not_ready() { - self.pending_send_window_update = Some(f); + self.sending_window_update = Some(f); return Ok(Async::NotReady); } } + Ok(Async::Ready(())) } } @@ -213,11 +235,11 @@ impl Stream for Connection Async::Ready(f) => f, Async::NotReady => { // Receiving new frames may depend on ensuring that the write buffer - // is clear (e.g. if window updates need to be sent), so `poll_ready` + // is clear (e.g. if window updates need to be sent), so `poll_complete` // is called here. try_ready!(self.inner.poll_complete()); - // If the snder sink is ready, we attempt to poll the underlying + // If the sender sink is ready, we attempt to poll the underlying // stream once more because it, may have been made ready by flushing // the sink. try_ready!(self.inner.poll()) @@ -258,7 +280,7 @@ impl Stream for Connection let id = v.stream_id(); let end_of_stream = v.is_end_stream(); - self.claim_connection_recv_window(v.len())?; + self.claim_local_window(v.len())?; match self.streams.get_mut(&id) { None => return Err(error::Reason::ProtocolError.into()), Some(state) => state.recv_data(end_of_stream, v.len())?, @@ -273,9 +295,9 @@ impl Stream for Connection } Some(WindowUpdate(v)) => { - // When a window update is read from the remote, apply that update to - // the proper stream. - self.recv_window_update(v.stream_id(), v.size_increment()); + // When a window update is received from the remote, apply that update + // to the proper stream so that more data may be sent to the remote. + self.increment_send_window_size(v.stream_id(), v.size_increment()); // There's nothing to return yet, so continue attempting to read // additional frames. @@ -306,12 +328,12 @@ impl Sink for Connection use frame::Frame::Headers; trace!("start_send"); - // Ensure that a pending window update is sent before doing anything further. - if self.poll_send_window_update()? == Async::NotReady - || self.inner.poll_ready()? == Async::NotReady { + // Ensure that a pending window update is sent before doing anything further and + // ensure that the inner sink will actually receive a frame. + if self.poll_ready()? == Async::NotReady { return Ok(AsyncSink::NotReady(item)); } - assert!(self.pending_send_window_update.is_none()); + assert!(self.sending_window_update.is_none()); match item { Frame::Headers { id, headers, end_of_stream } => { @@ -353,12 +375,12 @@ impl Sink for Connection } Frame::Data { id, data, data_len, end_of_stream } => { - self.claim_connection_send_window(data_len)?; + try!(self.claim_remote_window(data_len)); // The stream must be initialized at this point. match self.streams.get_mut(&id) { None => return Err(error::User::InactiveStreamId.into()), - Some(state) => try!(state.send_data(end_of_stream, data_len)), + Some(mut s) => try!(s.send_data(end_of_stream, data_len)), } let mut frame = frame::Data::from_buf(id, data.into_buf()); @@ -367,10 +389,7 @@ impl Sink for Connection } let res = try!(self.inner.start_send(frame.into())); - - // poll_ready has already been called. assert!(res.is_ready()); - Ok(AsyncSink::Ready) } @@ -394,8 +413,13 @@ impl Sink for Connection fn poll_complete(&mut self) -> Poll<(), ConnectionError> { trace!("poll_complete"); + try_ready!(self.inner.poll_complete()); - self.poll_send_window_update() + + // TODO check for settings updates and update the initial window size of all + // streams. + + self.poll_sending_window_update() } } @@ -407,6 +431,6 @@ impl ReadySink for Connection fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { trace!("poll_ready"); try_ready!(self.inner.poll_ready()); - self.poll_send_window_update() + self.poll_sending_window_update() } } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 2ead575..f6655f8 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -38,7 +38,10 @@ impl FlowController { self.underflow += decr; } - /// Claim the provided amount from the window, if there is enough space. + /// Claims the provided amount from the window, if there is enough space. + /// + /// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than + /// have been previously claimed. pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { if self.window_size < sz { return Err(WindowUnderflow); @@ -49,7 +52,7 @@ impl FlowController { } /// Applies a window increment immediately. - pub fn add_to_window(&mut self, sz: WindowSize) { + pub fn increment_window_size(&mut self, sz: WindowSize) { if sz <= self.underflow { self.underflow -= sz; return; diff --git a/src/proto/settings.rs b/src/proto/settings.rs index e4f8fb2..b0a9369 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -91,7 +91,6 @@ impl Settings } else { Ok(Async::NotReady) } - } } diff --git a/src/proto/state.rs b/src/proto/state.rs index be8967c..e509733 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -59,35 +59,13 @@ pub enum State { } impl State { - /// Updates the local flow controller with the given window size increment. + /// Updates the local flow controller so that the remote may send `incr` more bytes. /// /// Returns the amount of capacity created, accounting for window size changes. The /// caller should send the the returned window size increment to the remote. /// /// If the remote is closed, None is returned. - pub fn send_window_update(&mut self, incr: u32) -> Option { - use self::State::*; - use self::PeerState::*; - - if incr == 0 { - return None; - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => { - fc.add_to_window(incr); - fc.take_window_update() - } - _ => None, - } - } - - /// Updates the remote flow controller with the given window size increment. - /// - /// Returns the amount of capacity created, accounting for window size changes. The - /// caller should send the the returned window size increment to the remote. - pub fn recv_window_update(&mut self, incr: u32) { + pub fn increment_send_window_size(&mut self, incr: u32) { use self::State::*; use self::PeerState::*; @@ -97,12 +75,14 @@ impl State { match self { &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.add_to_window(incr), + &mut HalfClosedLocal(Data(ref mut fc)) => fc.increment_window_size(incr), _ => {}, } } - - pub fn take_recv_window_update(&mut self) -> Option { + + /// Consumes newly-advertised capacity to inform the local endpoint it may send more + /// data. + pub fn take_send_window_update(&mut self) -> Option { use self::State::*; use self::PeerState::*; @@ -113,6 +93,39 @@ impl State { } } + /// Updates the remote flow controller so that the remote may receive `incr` + /// additional bytes. + /// + /// Returns the amount of capacity created, accounting for window size changes. The + /// caller should send the the returned window size increment to the remote. + pub fn increment_recv_window_size(&mut self, incr: u32) { + use self::State::*; + use self::PeerState::*; + + if incr == 0 { + return; + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.increment_window_size(incr), + _ => {}, + } + } + + /// Consumes newly-advertised capacity to inform the local endpoint it may send more + /// data. + pub fn take_recv_window_update(&mut self) -> Option { + use self::State::*; + use self::PeerState::*; + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(), + _ => None, + } + } + /// Applies an update to the remote's initial window size. /// /// Per RFC 7540 ยง6.9.2 @@ -139,7 +152,7 @@ impl State { if new < old { fc.shrink_window(old - new); } else { - fc.add_to_window(new - old); + fc.increment_window_size(new - old); } } _ => {} @@ -240,7 +253,7 @@ impl State { /// id. `Err` is returned if this is an invalid state transition. pub fn send_headers(&mut self, eos: bool, - initial_send_window_size: u32) + initial_window_size: u32) -> Result { use self::State::*; @@ -252,7 +265,7 @@ impl State { HalfClosedLocal(Headers) } else { Open { - local: Data(FlowController::new(initial_send_window_size)), + local: Data(FlowController::new(initial_window_size)), remote: Headers, } }; @@ -266,7 +279,7 @@ impl State { *self = if eos { HalfClosedLocal(remote) } else { - let local = Data(FlowController::new(initial_send_window_size)); + let local = Data(FlowController::new(initial_window_size)); Open { local, remote } }; @@ -279,7 +292,7 @@ impl State { *self = if eos { Closed } else { - HalfClosedRemote(Data(FlowController::new(initial_send_window_size))) + HalfClosedRemote(Data(FlowController::new(initial_window_size))) }; Ok(false)