diff --git a/src/frame/data.rs b/src/frame/data.rs index 7bde76e..84deb62 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -65,12 +65,16 @@ impl Data { } impl Data { - pub fn from_buf(stream_id: StreamId, data: T) -> Self { + pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self { + let mut flags = DataFlag::default(); + if eos { + flags.set_end_stream(); + } Data { stream_id, data_len: data.remaining() as FrameSize, data, - flags: DataFlag::default(), + flags, pad_len: None, } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 763eb62..c1373b7 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -148,29 +148,18 @@ impl Stream for Connection trace!("poll; frame={:?}", frame); let frame = match frame { - Some(Headers(v)) => { - // TODO: Update stream state - let stream_id = v.stream_id(); - let end_of_stream = v.is_end_stream(); + Some(Headers(v)) => Frame::Headers { + id: v.stream_id(), + end_of_stream: v.is_end_stream(), + headers: P::convert_poll_message(v), + }, - Frame::Headers { - id: stream_id, - headers: P::convert_poll_message(v), - end_of_stream: end_of_stream, - } - } - - Some(Data(v)) => { - let id = v.stream_id(); - let end_of_stream = v.is_end_stream(); - - Frame::Data { - id, - end_of_stream, - data_len: v.len(), - data: v.into_payload(), - } - } + Some(Data(v)) => Frame::Data { + id: v.stream_id(), + end_of_stream: v.is_end_stream(), + data_len: v.len(), + data: v.into_payload(), + }, Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)), @@ -193,12 +182,11 @@ impl Sink for Connection fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - use frame::Frame::Headers; trace!("start_send"); - // Ensure that a pending window update is sent before doing anything further and - // ensure that the inner sink will actually receive a frame. - if self.poll_ready()? == Async::NotReady { + // Ensure the transport is ready to send a frame before we transform the external + // `Frame` into an internal `frame::Framme`. + if self.inner.poll_ready()? == Async::NotReady { return Ok(AsyncSink::NotReady(item)); } @@ -208,17 +196,13 @@ impl Sink for Connection // it's already been determined that the inner `Sink` can accept the item. // If the item is rejected, then there is a bug. let frame = P::convert_send_message(id, headers, end_of_stream); - let res = self.inner.start_send(Headers(frame))?; + let res = self.inner.start_send(frame::Frame::Headers(frame))?; assert!(res.is_ready()); Ok(AsyncSink::Ready) } Frame::Data { id, data, end_of_stream, .. } => { - let mut frame = frame::Data::from_buf(id, data.into_buf()); - if end_of_stream { - frame.set_end_stream(); - } - + let frame = frame::Data::from_buf(id, data.into_buf(), end_of_stream); let res = try!(self.inner.start_send(frame.into())); assert!(res.is_ready()); Ok(AsyncSink::Ready) diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 2a0f53f..79358dd 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -45,8 +45,8 @@ impl FlowControl inner, initial_local_window_size, initial_remote_window_size, - local_flow_controller: FlowControlState::new(initial_local_window_size), - remote_flow_controller: FlowControlState::new(initial_remote_window_size), + local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), + remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size), blocked_remote_window_update: None, sending_local_window_update: None, pending_local_window_updates: VecDeque::new(), @@ -54,6 +54,7 @@ impl FlowControl } } +// Flow control utitlities. impl FlowControl { fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { let res = if id.is_zero() { @@ -106,6 +107,7 @@ impl FlowControl { } } +/// Exposes a public upward API for flow control. impl ControlFlow for FlowControl { fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { if id.is_zero() { @@ -139,6 +141,7 @@ impl ControlFlow for FlowControl { } } +/// Proxies access to streams. impl ControlStreams for FlowControl { #[inline] fn streams(&self) -> &StreamMap { @@ -183,7 +186,7 @@ impl FlowControl /// Applies an update to an endpoint's initial window size. /// -/// Per RFC 7540 §6.9.2 +/// Per RFC 7540 §6.9.2: /// /// > In addition to changing the flow-control window for streams that are not yet /// > active, a SETTINGS frame can alter the initial flow-control window size for diff --git a/src/proto/flow_control_state.rs b/src/proto/flow_control_state.rs index 0d8c8a2..f0fbacf 100644 --- a/src/proto/flow_control_state.rs +++ b/src/proto/flow_control_state.rs @@ -18,12 +18,12 @@ pub struct FlowControlState { impl Default for FlowControlState { fn default() -> Self { - Self::new(DEFAULT_INITIAL_WINDOW_SIZE) + Self::with_initial_size(DEFAULT_INITIAL_WINDOW_SIZE) } } impl FlowControlState { - pub fn new(window_size: WindowSize) -> FlowControlState { + pub fn with_initial_size(window_size: WindowSize) -> FlowControlState { FlowControlState { window_size, underflow: 0, @@ -31,11 +31,24 @@ impl FlowControlState { } } + pub fn with_next_update(next_window_update: WindowSize) -> FlowControlState { + FlowControlState { + window_size: 0, + underflow: 0, + next_window_update, + } + } + /// Reduce future capacity of the window. /// /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. pub fn shrink_window(&mut self, decr: WindowSize) { - self.underflow += decr; + if decr < self.next_window_update { + self.next_window_update -= decr + } else { + self.underflow += decr - self.next_window_update; + self.next_window_update = 0; + } } /// Claims the provided amount from the window, if there is enough space. @@ -51,7 +64,7 @@ impl FlowControlState { Ok(()) } - /// Applies a window increment immediately. + /// Increase the _unadvertised_ window capacity. pub fn grow_window(&mut self, sz: WindowSize) { if sz <= self.underflow { self.underflow -= sz; @@ -59,12 +72,11 @@ impl FlowControlState { } let added = sz - self.underflow; - self.window_size += added; self.next_window_update += added; self.underflow = 0; } - /// Obtains and clears an unadvertised window update. + /// Obtains and applies an unadvertised window update. pub fn take_window_update(&mut self) -> Option { if self.next_window_update == 0 { return None; @@ -72,12 +84,93 @@ impl FlowControlState { let incr = self.next_window_update; self.next_window_update = 0; + self.window_size += incr; Some(incr) } } #[test] -fn test() { - let mut fc = FlowControlState::new(65_535); +fn test_with_initial_size() { + let mut fc = FlowControlState::with_initial_size(10); + fc.grow_window(8); + assert_eq!(fc.window_size, 10); + assert_eq!(fc.next_window_update, 8); + + assert_eq!(fc.take_window_update(), Some(8)); + assert_eq!(fc.window_size, 18); + assert_eq!(fc.next_window_update, 0); + + assert!(fc.claim_window(13).is_ok()); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 0); + assert!(fc.take_window_update().is_none()); +} + +#[test] +fn test_with_next_update() { + let mut fc = FlowControlState::with_next_update(10); + + fc.grow_window(8); + assert_eq!(fc.window_size, 0); + assert_eq!(fc.next_window_update, 18); + + assert_eq!(fc.take_window_update(), Some(18)); + assert_eq!(fc.window_size, 18); + assert_eq!(fc.next_window_update, 0); +} + +#[test] +fn test_grow_accumulates() { + let mut fc = FlowControlState::with_initial_size(5); + + // Updates accumulate, though the window is not made immediately available. Trying to + // claim data not returned by take_window_update results in an underflow. + + fc.grow_window(2); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 2); + + fc.grow_window(6); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 8); + + assert!(fc.claim_window(13).is_err()); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 8); + + assert_eq!(fc.take_window_update(), Some(8)); + assert_eq!(fc.window_size, 13); + assert_eq!(fc.next_window_update, 0); + + assert!(fc.claim_window(13).is_ok()); + assert_eq!(fc.window_size, 0); + assert_eq!(fc.next_window_update, 0); +} + +#[test] +fn test_shrink() { + let mut fc = FlowControlState::with_initial_size(5); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 0); + + fc.grow_window(3); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 3); + assert_eq!(fc.underflow, 0); + + fc.shrink_window(8); + assert_eq!(fc.window_size, 5); + assert_eq!(fc.next_window_update, 0); + assert_eq!(fc.underflow, 5); + + assert!(fc.claim_window(5).is_ok()); + assert_eq!(fc.window_size, 0); + assert_eq!(fc.next_window_update, 0); + assert_eq!(fc.underflow, 5); + + fc.grow_window(8); + assert_eq!(fc.window_size, 0); + assert_eq!(fc.next_window_update, 3); + assert_eq!(fc.underflow, 0); } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 8785901..6a8c2fd 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -44,7 +44,7 @@ use self::state::StreamState; /// /// All transporters below Settings must apply relevant settings before passing a frame on /// to another level. For example, if the frame writer n -type Transport = +type Transport= Settings< FlowControl< StreamTracker< @@ -98,20 +98,23 @@ impl StreamMap { } } -/// Allows settings updates to be pushed "down" the transport (i.e. below Settings). +/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to +/// FramedWrite). pub trait ApplySettings { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; } -/// Exposes settings to "upper" layers of the transport (i.e. above Settings). +/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and +/// above---Connection). pub trait ControlSettings { fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; fn local_settings(&self) -> &SettingSet; fn remote_settings(&self) -> &SettingSet; } -/// Exposes stream states to "upper" layers of the transport (i.e. above StreamTracker). +/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up +/// to Connection). pub trait ControlStreams { fn streams(&self)-> &StreamMap; fn streams_mut(&mut self) -> &mut StreamMap; diff --git a/src/proto/state.rs b/src/proto/state.rs index 472cca6..d38cd41 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -78,7 +78,7 @@ impl StreamState { if eos { *self = HalfClosedRemote(local); } else { - *self = Open { local, remote: Data(FlowControlState::new(initial_recv_window_size)) }; + *self = Open { local, remote: Data(FlowControlState::with_initial_size(initial_recv_window_size)) }; } Ok(true) } @@ -98,7 +98,7 @@ impl StreamState { if eos { *self = Closed; } else { - *self = HalfClosedLocal(Data(FlowControlState::new(initial_recv_window_size))); + *self = HalfClosedLocal(Data(FlowControlState::with_initial_size(initial_recv_window_size))); }; Ok(false) } @@ -155,7 +155,7 @@ impl StreamState { HalfClosedLocal(Headers) } else { Open { - local: Data(FlowControlState::new(initial_window_size)), + local: Data(FlowControlState::with_initial_size(initial_window_size)), remote: Headers, } }; @@ -169,7 +169,8 @@ impl StreamState { *self = if eos { HalfClosedLocal(remote) } else { - let local = Data(FlowControlState::new(initial_window_size)); + let fc = FlowControlState::with_initial_size(initial_window_size); + let local = Data(fc); Open { local, remote } }; @@ -182,7 +183,8 @@ impl StreamState { *self = if eos { Closed } else { - HalfClosedRemote(Data(FlowControlState::new(initial_window_size))) + let fc = FlowControlState::with_initial_size(initial_window_size); + HalfClosedRemote(Data(fc)) }; Ok(false)