diff --git a/src/proto/connection.rs b/src/proto/connection.rs index c1373b7..3aa6564 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,7 +1,7 @@ use {ConnectionError, Frame, FrameSize}; use client::Client; use frame::{self, SettingSet, StreamId}; -use proto::{self, ControlSettings, Peer, ReadySink, ControlFlow, WindowSize}; +use proto::{self, ControlFlow, ControlPing, ControlSettings, Peer, PingPayload, ReadySink, WindowSize}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -58,8 +58,23 @@ impl ControlFlow for Connection self.inner.poll_remote_window_update(id) } - fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.grow_local_window(id, incr) + fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_local_window(id, incr) + } +} + +impl ControlPing for Connection + where T: AsyncRead + AsyncWrite, + T: ControlPing, + P: Peer, + B: IntoBuf, +{ + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn pop_pong(&mut self) -> Option { + self.inner.pop_pong() } } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 79358dd..4ea6bf8 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -13,14 +13,15 @@ pub struct FlowControl { /// Tracks the connection-level flow control window for receiving data from the /// remote. - local_flow_controller: FlowControlState, + connection_local_flow_controller: FlowControlState, /// Tracks the onnection-level flow control window for receiving data from the remote. - remote_flow_controller: FlowControlState, + connection_remote_flow_controller: FlowControlState, /// Holds the list of streams on which local window updates may be sent. // XXX It would be cool if this didn't exist. pending_local_window_updates: VecDeque, + pending_local_connection_window_update: bool, /// If a window update can't be sent immediately, it may need to be saved to be sent later. sending_local_window_update: Option, @@ -45,98 +46,31 @@ impl FlowControl inner, initial_local_window_size, initial_remote_window_size, - local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), - remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size), + connection_local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), + connection_remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size), blocked_remote_window_update: None, sending_local_window_update: None, pending_local_window_updates: VecDeque::new(), + pending_local_connection_window_update: false, } } } // Flow control utitlities. impl FlowControl { - fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { - let res = if id.is_zero() { - self.local_flow_controller.claim_window(len) - } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { - stream.claim_local_window(len) - } else { - // Ignore updates for non-existent streams. - Ok(()) - }; - - res.map_err(|_| error::Reason::FlowControlError.into()) - } - - fn claim_remote_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { - let res = if id.is_zero() { - self.local_flow_controller.claim_window(len) - } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { - stream.claim_remote_window(len) - } else { - // Ignore updates for non-existent streams. - Ok(()) - }; - - res.map_err(|_| error::Reason::FlowControlError.into()) - } - - /// Handles a window update received from the remote, indicating that the local may - /// send `incr` additional bytes. - /// - /// Connection window updates (id=0) and stream window updates are advertised - /// distinctly. - fn grow_remote_window(&mut self, id: StreamId, incr: WindowSize) { - if incr == 0 { - return; - } - + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { - self.remote_flow_controller.grow_window(incr); - } else if let Some(mut s) = self.inner.streams_mut().get_mut(&id) { - s.grow_remote_window(incr); + Some(&mut self.connection_local_flow_controller) } else { - // Ignore updates for non-existent streams. - return; - }; - - if let Some(task) = self.blocked_remote_window_update.take() { - task.notify(); + self.inner.streams_mut().get_mut(&id).and_then(|s| s.local_flow_controller()) } } -} -/// Exposes a public upward API for flow control. -impl ControlFlow for FlowControl { - fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { - if let Some(sz) = self.remote_flow_controller.take_window_update() { - return Ok(Async::Ready(sz)); - } - } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { - if let Some(sz) = stream.take_remote_window_update() { - return Ok(Async::Ready(sz)); - } + Some(&mut self.connection_remote_flow_controller) } else { - return Err(error::User::InvalidStreamId.into()); - } - - self.blocked_remote_window_update = Some(task::current()); - return Ok(Async::NotReady); - } - - fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - if id.is_zero() { - self.local_flow_controller.grow_window(incr); - self.pending_local_window_updates.push_back(id); - Ok(()) - } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { - stream.grow_local_window(incr); - self.pending_local_window_updates.push_back(id); - Ok(()) - } else { - Err(error::User::InvalidStreamId.into()) + self.inner.streams_mut().get_mut(&id).and_then(|s| s.remote_flow_controller()) } } } @@ -154,6 +88,47 @@ impl ControlStreams for FlowControl { } } +/// Exposes a public upward API for flow control. +impl ControlFlow for FlowControl { + fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + if let Some(mut flow) = self.remote_flow_controller(id) { + if let Some(sz) = flow.apply_window_update() { + return Ok(Async::Ready(sz)); + } + } else { + return Err(error::User::InvalidStreamId.into()); + } + + self.blocked_remote_window_update = Some(task::current()); + return Ok(Async::NotReady); + } + + fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + if let Some(mut fc) = self.local_flow_controller(id) { + fc.expand_window(incr); + } else { + return Err(error::User::InvalidStreamId.into()); + } + + if id.is_zero() { + self.pending_local_connection_window_update = true; + } else { + self.pending_local_window_updates.push_back(id); + } + Ok(()) + } +} + +impl ControlPing for FlowControl { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn pop_pong(&mut self) -> Option { + self.inner.pop_pong() + } +} + impl FlowControl where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -161,27 +136,33 @@ impl FlowControl /// Returns ready when there are no pending window updates to send. fn poll_send_local_window_updates(&mut self) -> Poll<(), ConnectionError> { if let Some(f) = self.sending_local_window_update.take() { - if self.inner.start_send(f.into())?.is_not_ready() { - self.sending_local_window_update = Some(f); - return Ok(Async::NotReady); + try_ready!(self.try_send(f)); + } + + if self.pending_local_connection_window_update { + if let Some(incr) = self.connection_local_flow_controller.apply_window_update() { + try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); } } while let Some(id) = self.pending_local_window_updates.pop_front() { - let update = self.inner.streams_mut().get_mut(&id) - .and_then(|mut s| s.take_local_window_update()) - .map(|incr| frame::WindowUpdate::new(id, incr)); - - if let Some(f) = update { - if self.inner.start_send(f.into())?.is_not_ready() { - self.sending_local_window_update = Some(f); - return Ok(Async::NotReady); - } + let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); + if let Some(incr) = update { + try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); } } Ok(Async::Ready(())) } + + fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { + if self.inner.start_send(f.into())?.is_not_ready() { + self.sending_local_window_update = Some(f); + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + } } /// Applies an update to an endpoint's initial window size. @@ -219,7 +200,7 @@ impl ApplySettings for FlowControl streams.shrink_all_local_windows(decr); } else { let incr = new_window_size - old_window_size; - streams.grow_all_local_windows(incr); + streams.expand_all_local_windows(incr); } self.initial_local_window_size = new_window_size; @@ -241,7 +222,7 @@ impl ApplySettings for FlowControl streams.shrink_all_remote_windows(decr); } else { let incr = new_window_size - old_window_size; - streams.grow_all_remote_windows(incr); + streams.expand_all_remote_windows(incr); } self.initial_remote_window_size = new_window_size; @@ -263,11 +244,20 @@ impl Stream for FlowControl loop { match try_ready!(self.inner.poll()) { Some(WindowUpdate(v)) => { - self.grow_remote_window(v.stream_id(), v.size_increment()); + if let Some(fc) = self.remote_flow_controller(v.stream_id()) { + fc.expand_window(v.size_increment()); + } } Some(Data(v)) => { - self.claim_local_window(&v.stream_id(), v.len())?; + if self.connection_local_flow_controller.claim_window(v.len()).is_err() { + return Err(error::Reason::FlowControlError.into()) + } + if let Some(fc) = self.local_flow_controller(v.stream_id()) { + if fc.claim_window(v.len()).is_err() { + return Err(error::Reason::FlowControlError.into()) + } + } return Ok(Async::Ready(Some(Data(v)))); } @@ -277,7 +267,6 @@ impl Stream for FlowControl } } - impl Sink for FlowControl where T: Sink, SinkError = ConnectionError>, T: ReadySink, @@ -300,7 +289,14 @@ impl Sink for FlowControl return Ok(AsyncSink::NotReady(Data(v))); } - self.claim_remote_window(&v.stream_id(), v.len())?; + if self.connection_remote_flow_controller.claim_window(v.len()).is_err() { + return Err(error::User::FlowControlViolation.into()); + } + if let Some(fc) = self.remote_flow_controller(v.stream_id()) { + if fc.claim_window(v.len()).is_err() { + return Err(error::User::FlowControlViolation.into()) + } + } let res = self.inner.start_send(Data(v))?; assert!(res.is_ready()); diff --git a/src/proto/flow_control_state.rs b/src/proto/flow_control_state.rs index f0fbacf..2c3b6e5 100644 --- a/src/proto/flow_control_state.rs +++ b/src/proto/flow_control_state.rs @@ -53,7 +53,7 @@ impl FlowControlState { /// Claims the provided amount from the window, if there is enough space. /// - /// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than + /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than /// have been previously claimed. pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { if self.window_size < sz { @@ -65,7 +65,7 @@ impl FlowControlState { } /// Increase the _unadvertised_ window capacity. - pub fn grow_window(&mut self, sz: WindowSize) { + pub fn expand_window(&mut self, sz: WindowSize) { if sz <= self.underflow { self.underflow -= sz; return; @@ -77,7 +77,7 @@ impl FlowControlState { } /// Obtains and applies an unadvertised window update. - pub fn take_window_update(&mut self) -> Option { + pub fn apply_window_update(&mut self) -> Option { if self.next_window_update == 0 { return None; } @@ -93,29 +93,29 @@ impl FlowControlState { fn test_with_initial_size() { let mut fc = FlowControlState::with_initial_size(10); - fc.grow_window(8); + fc.expand_window(8); assert_eq!(fc.window_size, 10); assert_eq!(fc.next_window_update, 8); - assert_eq!(fc.take_window_update(), Some(8)); + assert_eq!(fc.apply_window_update(), Some(8)); assert_eq!(fc.window_size, 18); assert_eq!(fc.next_window_update, 0); assert!(fc.claim_window(13).is_ok()); assert_eq!(fc.window_size, 5); assert_eq!(fc.next_window_update, 0); - assert!(fc.take_window_update().is_none()); + assert!(fc.apply_window_update().is_none()); } #[test] fn test_with_next_update() { let mut fc = FlowControlState::with_next_update(10); - fc.grow_window(8); + fc.expand_window(8); assert_eq!(fc.window_size, 0); assert_eq!(fc.next_window_update, 18); - assert_eq!(fc.take_window_update(), Some(18)); + assert_eq!(fc.apply_window_update(), Some(18)); assert_eq!(fc.window_size, 18); assert_eq!(fc.next_window_update, 0); } @@ -125,13 +125,13 @@ fn test_grow_accumulates() { let mut fc = FlowControlState::with_initial_size(5); // Updates accumulate, though the window is not made immediately available. Trying to - // claim data not returned by take_window_update results in an underflow. + // claim data not returned by apply_window_update results in an underflow. - fc.grow_window(2); + fc.expand_window(2); assert_eq!(fc.window_size, 5); assert_eq!(fc.next_window_update, 2); - fc.grow_window(6); + fc.expand_window(6); assert_eq!(fc.window_size, 5); assert_eq!(fc.next_window_update, 8); @@ -139,7 +139,7 @@ fn test_grow_accumulates() { assert_eq!(fc.window_size, 5); assert_eq!(fc.next_window_update, 8); - assert_eq!(fc.take_window_update(), Some(8)); + assert_eq!(fc.apply_window_update(), Some(8)); assert_eq!(fc.window_size, 13); assert_eq!(fc.next_window_update, 0); @@ -154,7 +154,7 @@ fn test_shrink() { assert_eq!(fc.window_size, 5); assert_eq!(fc.next_window_update, 0); - fc.grow_window(3); + fc.expand_window(3); assert_eq!(fc.window_size, 5); assert_eq!(fc.next_window_update, 3); assert_eq!(fc.underflow, 0); @@ -169,7 +169,7 @@ fn test_shrink() { assert_eq!(fc.next_window_update, 0); assert_eq!(fc.underflow, 5); - fc.grow_window(8); + fc.expand_window(8); assert_eq!(fc.window_size, 0); assert_eq!(fc.next_window_update, 3); assert_eq!(fc.underflow, 0); diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 6a8c2fd..32b4b73 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -30,20 +30,56 @@ pub use self::settings::Settings; pub use self::stream_tracker::StreamTracker; use self::state::StreamState; -/// Represents the internals of an HTTP2 connection. +/// Represents the internals of an HTTP/2 connection. /// /// A transport consists of several layers (_transporters_) and is arranged from _top_ /// (near the application) to _bottom_ (near the network). Each transporter implements a /// Stream of frames received from the remote, and a ReadySink of frames sent to the /// remote. /// -/// At the top of the transport, the Settings module is responsible for: -/// - Transmitting local settings to the remote. -/// - Sending settings acknowledgements for all settings frames received from the remote. -/// - Exposing settings upward to the Connection. +/// ## Transport Layers +/// +/// ### `Settings` +/// +/// - Receives remote settings frames and applies the settings downward through the +/// transport (via the ApplySettings trait) before responding with acknowledgements. +/// - Exposes ControlSettings up towards the application and transmits local settings to +/// the remote. +/// +/// ### `FlowControl` +/// +/// - Tracks received data frames against the local stream and connection flow control +/// windows. +/// - Tracks sent data frames against the remote stream and connection flow control +/// windows. +/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE. +/// - Exposes `ControlFlow` upwards. +/// - Tracks received window updates against the remote stream and connection flow +/// control windows so that upper layers may poll for updates. +/// - Sends window updates for the local stream and connection flow control windows as +/// instructed by upper layers. +/// +/// ### `StreamTracker` +/// +/// - Tracks the states of each stream. +/// - **TODO** Enforces maximum concurrency. +/// - Exposes `ControlStreams` so that upper layers may share stream state. +/// +/// ### `PingPong` +/// +/// - Acknowleges PINGs from the remote. +/// - Exposes ControlPing that allows the application side to send ping requests to the +/// remote. Acknowledgements from the remoe are queued to be consumed by the +/// application. +/// +/// ### FramedRead +/// +/// - Decodes frames from bytes. +/// +/// ### FramedWrite +/// +/// - Encodes frames to bytes. /// -/// All transporters below Settings must apply relevant settings before passing a frame on -/// to another level. For example, if the frame writer n type Transport= Settings< FlowControl< @@ -75,25 +111,33 @@ impl StreamMap { fn shrink_all_local_windows(&mut self, decr: u32) { for (_, mut s) in &mut self.inner { - s.shrink_local_window(decr) + if let Some(fc) = s.local_flow_controller() { + fc.shrink_window(decr); + } } } - fn grow_all_local_windows(&mut self, incr: u32) { + fn expand_all_local_windows(&mut self, incr: u32) { for (_, mut s) in &mut self.inner { - s.grow_local_window(incr) + if let Some(fc) = s.local_flow_controller() { + fc.expand_window(incr); + } } } - + fn shrink_all_remote_windows(&mut self, decr: u32) { for (_, mut s) in &mut self.inner { - s.shrink_remote_window(decr) + if let Some(fc) = s.remote_flow_controller() { + fc.shrink_window(decr); + } } } - fn grow_all_remote_windows(&mut self, incr: u32) { + fn expand_all_remote_windows(&mut self, incr: u32) { for (_, mut s) in &mut self.inner { - s.grow_remote_window(incr) + if let Some(fc) = s.remote_flow_controller() { + fc.expand_window(incr); + } } } } @@ -120,6 +164,13 @@ pub trait ControlStreams { fn streams_mut(&mut self) -> &mut StreamMap; } +pub type PingPayload = [u8; 8]; + +pub trait ControlPing { + fn start_ping(&mut self, body: PingPayload) -> StartSend; + fn pop_pong(&mut self) -> Option; +} + /// Exposes flow control states to "upper" layers of the transport (i.e. above /// FlowControl). pub trait ControlFlow { @@ -131,7 +182,7 @@ pub trait ControlFlow { /// Attempts to increase the receive capacity of a stream. /// /// Errors if the given stream is not active. - fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; + fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; } /// Create a full H2 transport from an I/O handle. diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 7405f7c..a56e001 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,13 +1,16 @@ use ConnectionError; use frame::{Frame, Ping, SettingSet}; +use proto::{ApplySettings, ControlPing, PingPayload, ReadySink}; + use futures::*; -use proto::{ApplySettings, ReadySink}; +use std::collections::VecDeque; /// Acknowledges ping requests from the remote. #[derive(Debug)] pub struct PingPong { inner: T, - pong: Option>, + sending_pong: Option>, + received_pongs: VecDeque, } impl PingPong @@ -17,7 +20,8 @@ impl PingPong pub fn new(inner: T) -> Self { PingPong { inner, - pong: None, + sending_pong: None, + received_pongs: VecDeque::new(), } } } @@ -32,18 +36,37 @@ impl ApplySettings for PingPong { } } +impl ControlPing for PingPong + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, +{ + fn start_ping(&mut self, body: PingPayload) -> StartSend { + if self.inner.poll_ready()?.is_not_ready() { + return Ok(AsyncSink::NotReady(body)); + } + + match self.inner.start_send(Ping::ping(body).into())? { + AsyncSink::NotReady(_) => unreachable!(), + AsyncSink::Ready => Ok(AsyncSink::Ready), + } + } + + fn pop_pong(&mut self) -> Option { + self.received_pongs.pop_front() + } +} + impl PingPong where T: Sink, SinkError = ConnectionError>, { fn try_send_pong(&mut self) -> Poll<(), ConnectionError> { - if let Some(pong) = self.pong.take() { + if let Some(pong) = self.sending_pong.take() { if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? { // If the pong can't be sent, save it. - self.pong = Some(pong); + self.sending_pong = Some(pong); return Ok(Async::NotReady); } } - Ok(Async::Ready(())) } } @@ -78,7 +101,7 @@ impl Stream for PingPong // Save a pong to be sent when there is nothing more to be returned // from the stream or when frames are sent to the sink. let pong = Ping::pong(ping.into_payload()); - self.pong = Some(pong.into()); + self.sending_pong = Some(pong.into()); } // Everything other than ping gets passed through. diff --git a/src/proto/settings.rs b/src/proto/settings.rs index dfb37b9..a2383c2 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -110,8 +110,18 @@ impl ControlFlow for Settings { self.inner.poll_remote_window_update(id) } - fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.grow_local_window(id, incr) + fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_local_window(id, incr) + } +} + +impl ControlPing for Settings { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn pop_pong(&mut self) -> Option { + self.inner.pop_pong() } } diff --git a/src/proto/state.rs b/src/proto/state.rs index d38cd41..7d75f0f 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -2,7 +2,7 @@ use Peer; use error::ConnectionError; use error::Reason::*; use error::User::*; -use proto::{FlowControlState, WindowSize, WindowUnderflow}; +use proto::{FlowControlState, WindowSize}; /// Represents the state of an H2 stream /// @@ -221,130 +221,25 @@ impl StreamState { } } } - - /// Updates the local flow controller so that the remote may send `incr` more bytes. - /// - /// Returns the amount of capacity created, accounting for window size changes. The - /// caller should send the the returned window size increment to the remote. - /// - /// If the remote is closed, None is returned. - pub fn grow_remote_window(&mut self, incr: WindowSize) { - use self::StreamState::*; - use self::PeerState::*; - - if incr == 0 { - return; - } - - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr), - _ => {}, - } - } - pub fn claim_remote_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> { + pub fn local_flow_controller(&mut self) -> Option<&mut FlowControlState> { use self::StreamState::*; use self::PeerState::*; - if decr == 0 { - return Ok(()); - } - - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.claim_window(decr), - _ => Ok(()), - } - } - - pub fn shrink_remote_window(&mut self, decr: WindowSize) { - use self::StreamState::*; - use self::PeerState::*; - - if decr == 0 { - return; - } - match self { &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr), - _ => {}, - } - } - - /// Consumes newly-advertised capacity to inform the local endpoint it may send more - /// data. - pub fn take_remote_window_update(&mut self) -> Option { - use self::StreamState::*; - use self::PeerState::*; - - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(), + &mut HalfClosedRemote(Data(ref mut fc)) => Some(fc), _ => None, } } - /// Updates the remote flow controller so that the remote may receive `incr` - /// additional bytes. - /// - /// Returns the amount of capacity created, accounting for window size changes. The - /// caller should send the the returned window size increment to the remote. - pub fn grow_local_window(&mut self, incr: WindowSize) { - use self::StreamState::*; - use self::PeerState::*; - - if incr == 0 { - return; - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr), - _ => {}, - } - } - - pub fn claim_local_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> { - use self::StreamState::*; - use self::PeerState::*; - - if decr == 0 { - return Ok(()); - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.claim_window(decr), - _ => Ok(()), - } - } - - pub fn shrink_local_window(&mut self, decr: WindowSize) { - use self::StreamState::*; - use self::PeerState::*; - - if decr == 0 { - return; - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr), - _ => {}, - } - } - - /// Consumes newly-advertised capacity to inform the local endpoint it may send more - /// data. - pub fn take_local_window_update(&mut self) -> Option { + pub fn remote_flow_controller(&mut self) -> Option<&mut FlowControlState> { use self::StreamState::*; use self::PeerState::*; match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(), + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => Some(fc), _ => None, } } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index 5566a37..85d05ec 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -87,6 +87,18 @@ impl ApplySettings for StreamTracker } } +impl ControlPing for StreamTracker + where T: ControlPing +{ + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn pop_pong(&mut self) -> Option { + self.inner.pop_pong() + } +} + impl Stream for StreamTracker where T: Stream, P: Peer,