test flow control state

This commit is contained in:
Oliver Gould
2017-07-16 20:11:36 +00:00
parent 06d9978c53
commit b0fd2bfac0
6 changed files with 143 additions and 54 deletions

View File

@@ -65,12 +65,16 @@ impl<T> Data<T> {
} }
impl<T: Buf> Data<T> { impl<T: Buf> Data<T> {
pub fn from_buf(stream_id: StreamId, data: T) -> Self { pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self {
let mut flags = DataFlag::default();
if eos {
flags.set_end_stream();
}
Data { Data {
stream_id, stream_id,
data_len: data.remaining() as FrameSize, data_len: data.remaining() as FrameSize,
data, data,
flags: DataFlag::default(), flags,
pad_len: None, pad_len: None,
} }
} }

View File

@@ -148,29 +148,18 @@ impl<T, P, B> Stream for Connection<T, P, B>
trace!("poll; frame={:?}", frame); trace!("poll; frame={:?}", frame);
let frame = match frame { let frame = match frame {
Some(Headers(v)) => { Some(Headers(v)) => Frame::Headers {
// TODO: Update stream state id: v.stream_id(),
let stream_id = v.stream_id(); end_of_stream: v.is_end_stream(),
let end_of_stream = v.is_end_stream();
Frame::Headers {
id: stream_id,
headers: P::convert_poll_message(v), headers: P::convert_poll_message(v),
end_of_stream: end_of_stream, },
}
}
Some(Data(v)) => { Some(Data(v)) => Frame::Data {
let id = v.stream_id(); id: v.stream_id(),
let end_of_stream = v.is_end_stream(); end_of_stream: v.is_end_stream(),
Frame::Data {
id,
end_of_stream,
data_len: v.len(), data_len: v.len(),
data: v.into_payload(), data: v.into_payload(),
} },
}
Some(frame) => panic!("unexpected frame; frame={:?}", frame), Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)), None => return Ok(Async::Ready(None)),
@@ -193,12 +182,11 @@ impl<T, P, B> Sink for Connection<T, P, B>
fn start_send(&mut self, item: Self::SinkItem) fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, Self::SinkError> -> StartSend<Self::SinkItem, Self::SinkError>
{ {
use frame::Frame::Headers;
trace!("start_send"); trace!("start_send");
// Ensure that a pending window update is sent before doing anything further and // Ensure the transport is ready to send a frame before we transform the external
// ensure that the inner sink will actually receive a frame. // `Frame` into an internal `frame::Framme`.
if self.poll_ready()? == Async::NotReady { if self.inner.poll_ready()? == Async::NotReady {
return Ok(AsyncSink::NotReady(item)); return Ok(AsyncSink::NotReady(item));
} }
@@ -208,17 +196,13 @@ impl<T, P, B> Sink for Connection<T, P, B>
// it's already been determined that the inner `Sink` can accept the item. // it's already been determined that the inner `Sink` can accept the item.
// If the item is rejected, then there is a bug. // If the item is rejected, then there is a bug.
let frame = P::convert_send_message(id, headers, end_of_stream); let frame = P::convert_send_message(id, headers, end_of_stream);
let res = self.inner.start_send(Headers(frame))?; let res = self.inner.start_send(frame::Frame::Headers(frame))?;
assert!(res.is_ready()); assert!(res.is_ready());
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)
} }
Frame::Data { id, data, end_of_stream, .. } => { Frame::Data { id, data, end_of_stream, .. } => {
let mut frame = frame::Data::from_buf(id, data.into_buf()); let frame = frame::Data::from_buf(id, data.into_buf(), end_of_stream);
if end_of_stream {
frame.set_end_stream();
}
let res = try!(self.inner.start_send(frame.into())); let res = try!(self.inner.start_send(frame.into()));
assert!(res.is_ready()); assert!(res.is_ready());
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)

View File

@@ -45,8 +45,8 @@ impl<T, U> FlowControl<T>
inner, inner,
initial_local_window_size, initial_local_window_size,
initial_remote_window_size, initial_remote_window_size,
local_flow_controller: FlowControlState::new(initial_local_window_size), local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size),
remote_flow_controller: FlowControlState::new(initial_remote_window_size), remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size),
blocked_remote_window_update: None, blocked_remote_window_update: None,
sending_local_window_update: None, sending_local_window_update: None,
pending_local_window_updates: VecDeque::new(), pending_local_window_updates: VecDeque::new(),
@@ -54,6 +54,7 @@ impl<T, U> FlowControl<T>
} }
} }
// Flow control utitlities.
impl<T: ControlStreams> FlowControl<T> { impl<T: ControlStreams> FlowControl<T> {
fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> {
let res = if id.is_zero() { let res = if id.is_zero() {
@@ -106,6 +107,7 @@ impl<T: ControlStreams> FlowControl<T> {
} }
} }
/// Exposes a public upward API for flow control.
impl<T: ControlStreams> ControlFlow for FlowControl<T> { impl<T: ControlStreams> ControlFlow for FlowControl<T> {
fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> { fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
if id.is_zero() { if id.is_zero() {
@@ -139,6 +141,7 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
} }
} }
/// Proxies access to streams.
impl<T: ControlStreams> ControlStreams for FlowControl<T> { impl<T: ControlStreams> ControlStreams for FlowControl<T> {
#[inline] #[inline]
fn streams(&self) -> &StreamMap { fn streams(&self) -> &StreamMap {
@@ -183,7 +186,7 @@ impl<T, U> FlowControl<T>
/// Applies an update to an endpoint's initial window size. /// Applies an update to an endpoint's initial window size.
/// ///
/// Per RFC 7540 §6.9.2 /// Per RFC 7540 §6.9.2:
/// ///
/// > In addition to changing the flow-control window for streams that are not yet /// > In addition to changing the flow-control window for streams that are not yet
/// > active, a SETTINGS frame can alter the initial flow-control window size for /// > active, a SETTINGS frame can alter the initial flow-control window size for

View File

@@ -18,12 +18,12 @@ pub struct FlowControlState {
impl Default for FlowControlState { impl Default for FlowControlState {
fn default() -> Self { fn default() -> Self {
Self::new(DEFAULT_INITIAL_WINDOW_SIZE) Self::with_initial_size(DEFAULT_INITIAL_WINDOW_SIZE)
} }
} }
impl FlowControlState { impl FlowControlState {
pub fn new(window_size: WindowSize) -> FlowControlState { pub fn with_initial_size(window_size: WindowSize) -> FlowControlState {
FlowControlState { FlowControlState {
window_size, window_size,
underflow: 0, underflow: 0,
@@ -31,11 +31,24 @@ impl FlowControlState {
} }
} }
pub fn with_next_update(next_window_update: WindowSize) -> FlowControlState {
FlowControlState {
window_size: 0,
underflow: 0,
next_window_update,
}
}
/// Reduce future capacity of the window. /// Reduce future capacity of the window.
/// ///
/// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE.
pub fn shrink_window(&mut self, decr: WindowSize) { pub fn shrink_window(&mut self, decr: WindowSize) {
self.underflow += decr; if decr < self.next_window_update {
self.next_window_update -= decr
} else {
self.underflow += decr - self.next_window_update;
self.next_window_update = 0;
}
} }
/// Claims the provided amount from the window, if there is enough space. /// Claims the provided amount from the window, if there is enough space.
@@ -51,7 +64,7 @@ impl FlowControlState {
Ok(()) Ok(())
} }
/// Applies a window increment immediately. /// Increase the _unadvertised_ window capacity.
pub fn grow_window(&mut self, sz: WindowSize) { pub fn grow_window(&mut self, sz: WindowSize) {
if sz <= self.underflow { if sz <= self.underflow {
self.underflow -= sz; self.underflow -= sz;
@@ -59,12 +72,11 @@ impl FlowControlState {
} }
let added = sz - self.underflow; let added = sz - self.underflow;
self.window_size += added;
self.next_window_update += added; self.next_window_update += added;
self.underflow = 0; self.underflow = 0;
} }
/// Obtains and clears an unadvertised window update. /// Obtains and applies an unadvertised window update.
pub fn take_window_update(&mut self) -> Option<WindowSize> { pub fn take_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 { if self.next_window_update == 0 {
return None; return None;
@@ -72,12 +84,93 @@ impl FlowControlState {
let incr = self.next_window_update; let incr = self.next_window_update;
self.next_window_update = 0; self.next_window_update = 0;
self.window_size += incr;
Some(incr) Some(incr)
} }
} }
#[test] #[test]
fn test() { fn test_with_initial_size() {
let mut fc = FlowControlState::new(65_535); let mut fc = FlowControlState::with_initial_size(10);
fc.grow_window(8);
assert_eq!(fc.window_size, 10);
assert_eq!(fc.next_window_update, 8);
assert_eq!(fc.take_window_update(), Some(8));
assert_eq!(fc.window_size, 18);
assert_eq!(fc.next_window_update, 0);
assert!(fc.claim_window(13).is_ok());
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 0);
assert!(fc.take_window_update().is_none());
}
#[test]
fn test_with_next_update() {
let mut fc = FlowControlState::with_next_update(10);
fc.grow_window(8);
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 18);
assert_eq!(fc.take_window_update(), Some(18));
assert_eq!(fc.window_size, 18);
assert_eq!(fc.next_window_update, 0);
}
#[test]
fn test_grow_accumulates() {
let mut fc = FlowControlState::with_initial_size(5);
// Updates accumulate, though the window is not made immediately available. Trying to
// claim data not returned by take_window_update results in an underflow.
fc.grow_window(2);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 2);
fc.grow_window(6);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 8);
assert!(fc.claim_window(13).is_err());
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 8);
assert_eq!(fc.take_window_update(), Some(8));
assert_eq!(fc.window_size, 13);
assert_eq!(fc.next_window_update, 0);
assert!(fc.claim_window(13).is_ok());
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 0);
}
#[test]
fn test_shrink() {
let mut fc = FlowControlState::with_initial_size(5);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 0);
fc.grow_window(3);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 3);
assert_eq!(fc.underflow, 0);
fc.shrink_window(8);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 0);
assert_eq!(fc.underflow, 5);
assert!(fc.claim_window(5).is_ok());
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 0);
assert_eq!(fc.underflow, 5);
fc.grow_window(8);
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 3);
assert_eq!(fc.underflow, 0);
} }

View File

@@ -44,7 +44,7 @@ use self::state::StreamState;
/// ///
/// All transporters below Settings must apply relevant settings before passing a frame on /// All transporters below Settings must apply relevant settings before passing a frame on
/// to another level. For example, if the frame writer n /// to another level. For example, if the frame writer n
type Transport<T, P, B> = type Transport<T, P, B>=
Settings< Settings<
FlowControl< FlowControl<
StreamTracker< StreamTracker<
@@ -98,20 +98,23 @@ impl StreamMap {
} }
} }
/// Allows settings updates to be pushed "down" the transport (i.e. below Settings). /// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
/// FramedWrite).
pub trait ApplySettings { pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
} }
/// Exposes settings to "upper" layers of the transport (i.e. above Settings). /// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and
/// above---Connection).
pub trait ControlSettings { pub trait ControlSettings {
fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>;
fn local_settings(&self) -> &SettingSet; fn local_settings(&self) -> &SettingSet;
fn remote_settings(&self) -> &SettingSet; fn remote_settings(&self) -> &SettingSet;
} }
/// Exposes stream states to "upper" layers of the transport (i.e. above StreamTracker). /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams { pub trait ControlStreams {
fn streams(&self)-> &StreamMap; fn streams(&self)-> &StreamMap;
fn streams_mut(&mut self) -> &mut StreamMap; fn streams_mut(&mut self) -> &mut StreamMap;

View File

@@ -78,7 +78,7 @@ impl StreamState {
if eos { if eos {
*self = HalfClosedRemote(local); *self = HalfClosedRemote(local);
} else { } else {
*self = Open { local, remote: Data(FlowControlState::new(initial_recv_window_size)) }; *self = Open { local, remote: Data(FlowControlState::with_initial_size(initial_recv_window_size)) };
} }
Ok(true) Ok(true)
} }
@@ -98,7 +98,7 @@ impl StreamState {
if eos { if eos {
*self = Closed; *self = Closed;
} else { } else {
*self = HalfClosedLocal(Data(FlowControlState::new(initial_recv_window_size))); *self = HalfClosedLocal(Data(FlowControlState::with_initial_size(initial_recv_window_size)));
}; };
Ok(false) Ok(false)
} }
@@ -155,7 +155,7 @@ impl StreamState {
HalfClosedLocal(Headers) HalfClosedLocal(Headers)
} else { } else {
Open { Open {
local: Data(FlowControlState::new(initial_window_size)), local: Data(FlowControlState::with_initial_size(initial_window_size)),
remote: Headers, remote: Headers,
} }
}; };
@@ -169,7 +169,8 @@ impl StreamState {
*self = if eos { *self = if eos {
HalfClosedLocal(remote) HalfClosedLocal(remote)
} else { } else {
let local = Data(FlowControlState::new(initial_window_size)); let fc = FlowControlState::with_initial_size(initial_window_size);
let local = Data(fc);
Open { local, remote } Open { local, remote }
}; };
@@ -182,7 +183,8 @@ impl StreamState {
*self = if eos { *self = if eos {
Closed Closed
} else { } else {
HalfClosedRemote(Data(FlowControlState::new(initial_window_size))) let fc = FlowControlState::with_initial_size(initial_window_size);
HalfClosedRemote(Data(fc))
}; };
Ok(false) Ok(false)