clarify terminology and fix several obvious bugs in the process

This commit is contained in:
Oliver Gould
2017-07-13 06:43:28 +00:00
parent cc97653fd7
commit d0c55c52e9
4 changed files with 129 additions and 90 deletions

View File

@@ -25,13 +25,18 @@ pub struct Connection<T, P, B: IntoBuf = Bytes> {
streams: StreamMap<State>, streams: StreamMap<State>,
peer: PhantomData<P>, peer: PhantomData<P>,
/// Tracks connection-level flow control. /// Tracks the connection-level flow control window for receiving data from the
recv_flow_controller: FlowController, /// remote.
send_flow_controller: FlowController, local_flow_controller: FlowController,
/// Tracks the onnection-level flow control window for receiving data from the remote.
remote_flow_controller: FlowController,
pending_send_window_update: Option<frame::WindowUpdate>, /// When `poll_window_update` is not ready, then the calling task is saved to be
blocked_recv_window_update: Option<task::Task>, /// notified later. Access to poll_window_update must not be shared across tasks.
blocked_window_update: Option<task::Task>,
sending_window_update: Option<frame::WindowUpdate>,
} }
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>; type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
@@ -49,76 +54,92 @@ pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>)
streams: StreamMap::default(), streams: StreamMap::default(),
peer: PhantomData, peer: PhantomData,
recv_flow_controller: FlowController::new(recv_window_size), local_flow_controller: FlowController::new(recv_window_size),
send_flow_controller: FlowController::new(send_window_size), remote_flow_controller: FlowController::new(send_window_size),
pending_send_window_update: None, blocked_window_update: None,
blocked_recv_window_update: None, sending_window_update: None,
} }
} }
impl<T, P, B: IntoBuf> Connection<T, P, B> { impl<T, P, B: IntoBuf> Connection<T, P, B> {
#[inline] #[inline]
fn claim_connection_recv_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
self.recv_flow_controller.claim_window(len) self.local_flow_controller.claim_window(len)
.map_err(|_| error::Reason::FlowControlError.into()) .map_err(|_| error::Reason::FlowControlError.into())
} }
#[inline] #[inline]
fn claim_connection_send_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
self.send_flow_controller.claim_window(len) self.remote_flow_controller.claim_window(len)
.map_err(|_| error::Reason::FlowControlError.into()) .map_err(|_| error::User::FlowControlViolation.into())
} }
// TODO check max frame size /// Polls for the amount of additional data that may be sent to a remote.
///
pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> { /// Connection and stream updates are distinct.
pub fn poll_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
let added = if id.is_zero() { let added = if id.is_zero() {
self.send_flow_controller.take_window_update() self.remote_flow_controller.take_window_update()
} else { } else {
self.streams.get_mut(&id).and_then(|mut s| s.take_recv_window_update()) self.streams.get_mut(&id).and_then(|s| s.take_send_window_update())
}; };
match added { match added {
Some(incr) => Ok(Async::Ready(incr)), Some(incr) => Ok(Async::Ready(incr)),
None => { None => {
self.blocked_recv_window_update = Some(task::current()); self.blocked_window_update = Some(task::current());
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
} }
/// Publishes local stream window updates to the remote. /// Increases the amount of data that the remote endpoint may send.
/// ///
/// Connection window updates (StreamId=0) and stream window must be published /// Connection and stream updates are distinct.
/// distinctly. pub fn increment_window_size(&mut self, id: StreamId, incr: WindowSize) {
pub fn init_send_window_update(&mut self, id: StreamId, incr: WindowSize) { assert!(self.sending_window_update.is_none());
assert!(self.pending_send_window_update.is_none());
let added = if id.is_zero() { let added = if id.is_zero() {
self.send_flow_controller.add_to_window(incr); self.local_flow_controller.increment_window_size(incr);
self.send_flow_controller.take_window_update() self.local_flow_controller.take_window_update()
} else { } else {
self.streams.get_mut(&id).and_then(|mut s| s.send_window_update(incr)) self.streams.get_mut(&id).and_then(|s| {
s.increment_recv_window_size(incr);
s.take_recv_window_update()
})
}; };
if let Some(added) = added { if let Some(added) = added {
self.pending_send_window_update = Some(frame::WindowUpdate::new(id, added)); self.sending_window_update = Some(frame::WindowUpdate::new(id, added));
} }
} }
/// Advertises the remote's stream window updates. /// Handles a window update received from the remote, indicating that the local may
/// send `incr` additional bytes.
/// ///
/// Connection window updates (id=0) and stream window updates are advertised /// Connection window updates (id=0) and stream window updates are advertised
/// distinctly. /// distinctly.
fn recv_window_update(&mut self, id: StreamId, incr: WindowSize) { fn increment_send_window_size(&mut self, id: StreamId, incr: WindowSize) {
if id.is_zero() { if incr == 0 {
return self.recv_flow_controller.add_to_window(incr); return;
} }
if let Some(mut s) = self.streams.get_mut(&id) { let added = if id.is_zero() {
s.recv_window_update(incr); self.remote_flow_controller.increment_window_size(incr);
true
} else if let Some(mut s) = self.streams.get_mut(&id) {
s.increment_send_window_size(incr);
true
} else {
false
};
if added {
if let Some(task) = self.blocked_window_update.take() {
task.notify();
}
} }
} }
} }
@@ -129,13 +150,14 @@ impl<T, P, B> Connection<T, P, B>
B: IntoBuf B: IntoBuf
{ {
/// Attempts to send a window update to the remote, if one is pending. /// Attempts to send a window update to the remote, if one is pending.
fn poll_send_window_update(&mut self) -> Poll<(), ConnectionError> { fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.pending_send_window_update.take() { if let Some(f) = self.sending_window_update.take() {
if self.inner.start_send(f.into())?.is_not_ready() { if self.inner.start_send(f.into())?.is_not_ready() {
self.pending_send_window_update = Some(f); self.sending_window_update = Some(f);
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
} }
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
} }
@@ -213,11 +235,11 @@ impl<T, P, B> Stream for Connection<T, P, B>
Async::Ready(f) => f, Async::Ready(f) => f,
Async::NotReady => { Async::NotReady => {
// Receiving new frames may depend on ensuring that the write buffer // Receiving new frames may depend on ensuring that the write buffer
// is clear (e.g. if window updates need to be sent), so `poll_ready` // is clear (e.g. if window updates need to be sent), so `poll_complete`
// is called here. // is called here.
try_ready!(self.inner.poll_complete()); try_ready!(self.inner.poll_complete());
// If the snder sink is ready, we attempt to poll the underlying // If the sender sink is ready, we attempt to poll the underlying
// stream once more because it, may have been made ready by flushing // stream once more because it, may have been made ready by flushing
// the sink. // the sink.
try_ready!(self.inner.poll()) try_ready!(self.inner.poll())
@@ -258,7 +280,7 @@ impl<T, P, B> Stream for Connection<T, P, B>
let id = v.stream_id(); let id = v.stream_id();
let end_of_stream = v.is_end_stream(); let end_of_stream = v.is_end_stream();
self.claim_connection_recv_window(v.len())?; self.claim_local_window(v.len())?;
match self.streams.get_mut(&id) { match self.streams.get_mut(&id) {
None => return Err(error::Reason::ProtocolError.into()), None => return Err(error::Reason::ProtocolError.into()),
Some(state) => state.recv_data(end_of_stream, v.len())?, Some(state) => state.recv_data(end_of_stream, v.len())?,
@@ -273,9 +295,9 @@ impl<T, P, B> Stream for Connection<T, P, B>
} }
Some(WindowUpdate(v)) => { Some(WindowUpdate(v)) => {
// When a window update is read from the remote, apply that update to // When a window update is received from the remote, apply that update
// the proper stream. // to the proper stream so that more data may be sent to the remote.
self.recv_window_update(v.stream_id(), v.size_increment()); self.increment_send_window_size(v.stream_id(), v.size_increment());
// There's nothing to return yet, so continue attempting to read // There's nothing to return yet, so continue attempting to read
// additional frames. // additional frames.
@@ -306,12 +328,12 @@ impl<T, P, B> Sink for Connection<T, P, B>
use frame::Frame::Headers; use frame::Frame::Headers;
trace!("start_send"); trace!("start_send");
// Ensure that a pending window update is sent before doing anything further. // Ensure that a pending window update is sent before doing anything further and
if self.poll_send_window_update()? == Async::NotReady // ensure that the inner sink will actually receive a frame.
|| self.inner.poll_ready()? == Async::NotReady { if self.poll_ready()? == Async::NotReady {
return Ok(AsyncSink::NotReady(item)); return Ok(AsyncSink::NotReady(item));
} }
assert!(self.pending_send_window_update.is_none()); assert!(self.sending_window_update.is_none());
match item { match item {
Frame::Headers { id, headers, end_of_stream } => { Frame::Headers { id, headers, end_of_stream } => {
@@ -353,12 +375,12 @@ impl<T, P, B> Sink for Connection<T, P, B>
} }
Frame::Data { id, data, data_len, end_of_stream } => { Frame::Data { id, data, data_len, end_of_stream } => {
self.claim_connection_send_window(data_len)?; try!(self.claim_remote_window(data_len));
// The stream must be initialized at this point. // The stream must be initialized at this point.
match self.streams.get_mut(&id) { match self.streams.get_mut(&id) {
None => return Err(error::User::InactiveStreamId.into()), None => return Err(error::User::InactiveStreamId.into()),
Some(state) => try!(state.send_data(end_of_stream, data_len)), Some(mut s) => try!(s.send_data(end_of_stream, data_len)),
} }
let mut frame = frame::Data::from_buf(id, data.into_buf()); let mut frame = frame::Data::from_buf(id, data.into_buf());
@@ -367,10 +389,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
} }
let res = try!(self.inner.start_send(frame.into())); let res = try!(self.inner.start_send(frame.into()));
// poll_ready has already been called.
assert!(res.is_ready()); assert!(res.is_ready());
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)
} }
@@ -394,8 +413,13 @@ impl<T, P, B> Sink for Connection<T, P, B>
fn poll_complete(&mut self) -> Poll<(), ConnectionError> { fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
trace!("poll_complete"); trace!("poll_complete");
try_ready!(self.inner.poll_complete()); try_ready!(self.inner.poll_complete());
self.poll_send_window_update()
// TODO check for settings updates and update the initial window size of all
// streams.
self.poll_sending_window_update()
} }
} }
@@ -407,6 +431,6 @@ impl<T, P, B> ReadySink for Connection<T, P, B>
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
trace!("poll_ready"); trace!("poll_ready");
try_ready!(self.inner.poll_ready()); try_ready!(self.inner.poll_ready());
self.poll_send_window_update() self.poll_sending_window_update()
} }
} }

View File

@@ -38,7 +38,10 @@ impl FlowController {
self.underflow += decr; self.underflow += decr;
} }
/// Claim the provided amount from the window, if there is enough space. /// Claims the provided amount from the window, if there is enough space.
///
/// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than
/// have been previously claimed.
pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> {
if self.window_size < sz { if self.window_size < sz {
return Err(WindowUnderflow); return Err(WindowUnderflow);
@@ -49,7 +52,7 @@ impl FlowController {
} }
/// Applies a window increment immediately. /// Applies a window increment immediately.
pub fn add_to_window(&mut self, sz: WindowSize) { pub fn increment_window_size(&mut self, sz: WindowSize) {
if sz <= self.underflow { if sz <= self.underflow {
self.underflow -= sz; self.underflow -= sz;
return; return;

View File

@@ -91,7 +91,6 @@ impl<T, U> Settings<T>
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
} }

View File

@@ -59,35 +59,13 @@ pub enum State {
} }
impl State { impl State {
/// Updates the local flow controller with the given window size increment. /// Updates the local flow controller so that the remote may send `incr` more bytes.
/// ///
/// Returns the amount of capacity created, accounting for window size changes. The /// Returns the amount of capacity created, accounting for window size changes. The
/// caller should send the the returned window size increment to the remote. /// caller should send the the returned window size increment to the remote.
/// ///
/// If the remote is closed, None is returned. /// If the remote is closed, None is returned.
pub fn send_window_update(&mut self, incr: u32) -> Option<u32> { pub fn increment_send_window_size(&mut self, incr: u32) {
use self::State::*;
use self::PeerState::*;
if incr == 0 {
return None;
}
match self {
&mut Open { local: Data(ref mut fc), .. } |
&mut HalfClosedRemote(Data(ref mut fc)) => {
fc.add_to_window(incr);
fc.take_window_update()
}
_ => None,
}
}
/// Updates the remote flow controller with the given window size increment.
///
/// Returns the amount of capacity created, accounting for window size changes. The
/// caller should send the the returned window size increment to the remote.
pub fn recv_window_update(&mut self, incr: u32) {
use self::State::*; use self::State::*;
use self::PeerState::*; use self::PeerState::*;
@@ -97,12 +75,14 @@ impl State {
match self { match self {
&mut Open { remote: Data(ref mut fc), .. } | &mut Open { remote: Data(ref mut fc), .. } |
&mut HalfClosedLocal(Data(ref mut fc)) => fc.add_to_window(incr), &mut HalfClosedLocal(Data(ref mut fc)) => fc.increment_window_size(incr),
_ => {}, _ => {},
} }
} }
pub fn take_recv_window_update(&mut self) -> Option<u32> { /// Consumes newly-advertised capacity to inform the local endpoint it may send more
/// data.
pub fn take_send_window_update(&mut self) -> Option<u32> {
use self::State::*; use self::State::*;
use self::PeerState::*; use self::PeerState::*;
@@ -113,6 +93,39 @@ impl State {
} }
} }
/// Updates the remote flow controller so that the remote may receive `incr`
/// additional bytes.
///
/// Returns the amount of capacity created, accounting for window size changes. The
/// caller should send the the returned window size increment to the remote.
pub fn increment_recv_window_size(&mut self, incr: u32) {
use self::State::*;
use self::PeerState::*;
if incr == 0 {
return;
}
match self {
&mut Open { local: Data(ref mut fc), .. } |
&mut HalfClosedRemote(Data(ref mut fc)) => fc.increment_window_size(incr),
_ => {},
}
}
/// Consumes newly-advertised capacity to inform the local endpoint it may send more
/// data.
pub fn take_recv_window_update(&mut self) -> Option<u32> {
use self::State::*;
use self::PeerState::*;
match self {
&mut Open { local: Data(ref mut fc), .. } |
&mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(),
_ => None,
}
}
/// Applies an update to the remote's initial window size. /// Applies an update to the remote's initial window size.
/// ///
/// Per RFC 7540 §6.9.2 /// Per RFC 7540 §6.9.2
@@ -139,7 +152,7 @@ impl State {
if new < old { if new < old {
fc.shrink_window(old - new); fc.shrink_window(old - new);
} else { } else {
fc.add_to_window(new - old); fc.increment_window_size(new - old);
} }
} }
_ => {} _ => {}
@@ -240,7 +253,7 @@ impl State {
/// id. `Err` is returned if this is an invalid state transition. /// id. `Err` is returned if this is an invalid state transition.
pub fn send_headers<P: Peer>(&mut self, pub fn send_headers<P: Peer>(&mut self,
eos: bool, eos: bool,
initial_send_window_size: u32) initial_window_size: u32)
-> Result<bool, ConnectionError> -> Result<bool, ConnectionError>
{ {
use self::State::*; use self::State::*;
@@ -252,7 +265,7 @@ impl State {
HalfClosedLocal(Headers) HalfClosedLocal(Headers)
} else { } else {
Open { Open {
local: Data(FlowController::new(initial_send_window_size)), local: Data(FlowController::new(initial_window_size)),
remote: Headers, remote: Headers,
} }
}; };
@@ -266,7 +279,7 @@ impl State {
*self = if eos { *self = if eos {
HalfClosedLocal(remote) HalfClosedLocal(remote)
} else { } else {
let local = Data(FlowController::new(initial_send_window_size)); let local = Data(FlowController::new(initial_window_size));
Open { local, remote } Open { local, remote }
}; };
@@ -279,7 +292,7 @@ impl State {
*self = if eos { *self = if eos {
Closed Closed
} else { } else {
HalfClosedRemote(Data(FlowController::new(initial_send_window_size))) HalfClosedRemote(Data(FlowController::new(initial_window_size)))
}; };
Ok(false) Ok(false)