expose a ControlPing api
document transports properly handle connection flow control tracking.
This commit is contained in:
		| @@ -1,7 +1,7 @@ | ||||
| use {ConnectionError, Frame, FrameSize}; | ||||
| use client::Client; | ||||
| use frame::{self, SettingSet, StreamId}; | ||||
| use proto::{self, ControlSettings, Peer, ReadySink, ControlFlow, WindowSize}; | ||||
| use proto::{self, ControlFlow, ControlPing, ControlSettings, Peer, PingPayload, ReadySink, WindowSize}; | ||||
| use server::Server; | ||||
|  | ||||
| use tokio_io::{AsyncRead, AsyncWrite}; | ||||
| @@ -58,8 +58,23 @@ impl<T, P, B> ControlFlow for Connection<T, P, B> | ||||
|         self.inner.poll_remote_window_update(id) | ||||
|     } | ||||
|  | ||||
|     fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         self.inner.grow_local_window(id, incr) | ||||
|     fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         self.inner.expand_local_window(id, incr) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, P, B> ControlPing for Connection<T, P, B> | ||||
|     where T: AsyncRead + AsyncWrite, | ||||
|           T: ControlPing, | ||||
|           P: Peer, | ||||
|           B: IntoBuf, | ||||
| { | ||||
|     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> { | ||||
|         self.inner.start_ping(body) | ||||
|     } | ||||
|  | ||||
|     fn pop_pong(&mut self) -> Option<PingPayload> { | ||||
|         self.inner.pop_pong() | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -13,14 +13,15 @@ pub struct FlowControl<T>  { | ||||
|  | ||||
|     /// Tracks the connection-level flow control window for receiving data from the | ||||
|     /// remote. | ||||
|     local_flow_controller: FlowControlState, | ||||
|     connection_local_flow_controller: FlowControlState, | ||||
|  | ||||
|     /// Tracks the onnection-level flow control window for receiving data from the remote. | ||||
|     remote_flow_controller: FlowControlState, | ||||
|     connection_remote_flow_controller: FlowControlState, | ||||
|  | ||||
|     /// Holds the list of streams on which local window updates may be sent. | ||||
|     // XXX It would be cool if this didn't exist. | ||||
|     pending_local_window_updates: VecDeque<StreamId>, | ||||
|     pending_local_connection_window_update: bool, | ||||
|  | ||||
|     /// If a window update can't be sent immediately, it may need to be saved to be sent later. | ||||
|     sending_local_window_update: Option<frame::WindowUpdate>, | ||||
| @@ -45,98 +46,31 @@ impl<T, U> FlowControl<T> | ||||
|             inner, | ||||
|             initial_local_window_size, | ||||
|             initial_remote_window_size, | ||||
|             local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), | ||||
|             remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size), | ||||
|             connection_local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), | ||||
|             connection_remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size), | ||||
|             blocked_remote_window_update: None, | ||||
|             sending_local_window_update: None, | ||||
|             pending_local_window_updates: VecDeque::new(), | ||||
|             pending_local_connection_window_update: false, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Flow control utitlities. | ||||
| impl<T: ControlStreams> FlowControl<T> { | ||||
|     fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { | ||||
|         let res = if id.is_zero() { | ||||
|             self.local_flow_controller.claim_window(len) | ||||
|         } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { | ||||
|             stream.claim_local_window(len) | ||||
|         } else { | ||||
|             // Ignore updates for non-existent streams. | ||||
|             Ok(()) | ||||
|         }; | ||||
|  | ||||
|         res.map_err(|_| error::Reason::FlowControlError.into()) | ||||
|     } | ||||
|  | ||||
|     fn claim_remote_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { | ||||
|         let res = if id.is_zero() { | ||||
|             self.local_flow_controller.claim_window(len) | ||||
|         } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { | ||||
|             stream.claim_remote_window(len) | ||||
|         } else { | ||||
|             // Ignore updates for non-existent streams. | ||||
|             Ok(()) | ||||
|         }; | ||||
|  | ||||
|         res.map_err(|_| error::Reason::FlowControlError.into()) | ||||
|     } | ||||
|  | ||||
|     /// Handles a window update received from the remote, indicating that the local may | ||||
|     /// send `incr` additional bytes. | ||||
|     /// | ||||
|     /// Connection window updates (id=0) and stream window updates are advertised | ||||
|     /// distinctly. | ||||
|     fn grow_remote_window(&mut self, id: StreamId, incr: WindowSize) { | ||||
|         if incr == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|     fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|         if id.is_zero() { | ||||
|             self.remote_flow_controller.grow_window(incr); | ||||
|         } else if let Some(mut s) = self.inner.streams_mut().get_mut(&id) { | ||||
|             s.grow_remote_window(incr); | ||||
|             Some(&mut self.connection_local_flow_controller) | ||||
|         } else { | ||||
|             // Ignore updates for non-existent streams. | ||||
|             return; | ||||
|         }; | ||||
|  | ||||
|         if let Some(task) = self.blocked_remote_window_update.take() { | ||||
|             task.notify(); | ||||
|             self.inner.streams_mut().get_mut(&id).and_then(|s| s.local_flow_controller()) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlow for FlowControl<T> { | ||||
|     fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> { | ||||
|    fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|         if id.is_zero() { | ||||
|             if let Some(sz) = self.remote_flow_controller.take_window_update() { | ||||
|                 return Ok(Async::Ready(sz)); | ||||
|             } | ||||
|         } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { | ||||
|             if let Some(sz) = stream.take_remote_window_update() { | ||||
|                 return Ok(Async::Ready(sz)); | ||||
|             } | ||||
|             Some(&mut self.connection_remote_flow_controller) | ||||
|         } else { | ||||
|             return Err(error::User::InvalidStreamId.into()); | ||||
|         } | ||||
|  | ||||
|         self.blocked_remote_window_update = Some(task::current()); | ||||
|         return Ok(Async::NotReady); | ||||
|     } | ||||
|  | ||||
|     fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if id.is_zero() { | ||||
|             self.local_flow_controller.grow_window(incr); | ||||
|             self.pending_local_window_updates.push_back(id); | ||||
|             Ok(()) | ||||
|         } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { | ||||
|             stream.grow_local_window(incr); | ||||
|             self.pending_local_window_updates.push_back(id); | ||||
|             Ok(()) | ||||
|         } else { | ||||
|             Err(error::User::InvalidStreamId.into()) | ||||
|             self.inner.streams_mut().get_mut(&id).and_then(|s| s.remote_flow_controller()) | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -154,6 +88,47 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlow for FlowControl<T> { | ||||
|     fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> { | ||||
|         if let Some(mut flow) = self.remote_flow_controller(id) { | ||||
|             if let Some(sz) = flow.apply_window_update() { | ||||
|                 return Ok(Async::Ready(sz)); | ||||
|             } | ||||
|         } else { | ||||
|             return Err(error::User::InvalidStreamId.into()); | ||||
|         } | ||||
|  | ||||
|         self.blocked_remote_window_update = Some(task::current()); | ||||
|         return Ok(Async::NotReady); | ||||
|     } | ||||
|  | ||||
|     fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         if let Some(mut fc) = self.local_flow_controller(id) { | ||||
|             fc.expand_window(incr); | ||||
|         } else { | ||||
|             return Err(error::User::InvalidStreamId.into()); | ||||
|         } | ||||
|  | ||||
|         if id.is_zero() { | ||||
|             self.pending_local_connection_window_update = true; | ||||
|         } else { | ||||
|             self.pending_local_window_updates.push_back(id); | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: ControlPing> ControlPing for FlowControl<T> { | ||||
|     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> { | ||||
|         self.inner.start_ping(body) | ||||
|     } | ||||
|  | ||||
|     fn pop_pong(&mut self) -> Option<PingPayload> { | ||||
|         self.inner.pop_pong() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, U> FlowControl<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ControlStreams, | ||||
| @@ -161,27 +136,33 @@ impl<T, U> FlowControl<T> | ||||
|     /// Returns ready when there are no pending window updates to send. | ||||
|     fn poll_send_local_window_updates(&mut self) -> Poll<(), ConnectionError> { | ||||
|         if let Some(f) = self.sending_local_window_update.take() { | ||||
|             if self.inner.start_send(f.into())?.is_not_ready() { | ||||
|                 self.sending_local_window_update = Some(f); | ||||
|                 return Ok(Async::NotReady); | ||||
|             try_ready!(self.try_send(f)); | ||||
|         } | ||||
|  | ||||
|         if self.pending_local_connection_window_update { | ||||
|             if let Some(incr) = self.connection_local_flow_controller.apply_window_update() { | ||||
|                 try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         while let Some(id) = self.pending_local_window_updates.pop_front() { | ||||
|             let update = self.inner.streams_mut().get_mut(&id) | ||||
|                 .and_then(|mut s| s.take_local_window_update()) | ||||
|                 .map(|incr| frame::WindowUpdate::new(id, incr)); | ||||
|  | ||||
|             if let Some(f) = update { | ||||
|                 if self.inner.start_send(f.into())?.is_not_ready() { | ||||
|                     self.sending_local_window_update = Some(f); | ||||
|                     return Ok(Async::NotReady); | ||||
|                 } | ||||
|             let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|             if let Some(incr) = update { | ||||
|                 try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(Async::Ready(())) | ||||
|     } | ||||
|  | ||||
|     fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { | ||||
|         if self.inner.start_send(f.into())?.is_not_ready() { | ||||
|             self.sending_local_window_update = Some(f); | ||||
|             Ok(Async::NotReady) | ||||
|         } else { | ||||
|             Ok(Async::Ready(())) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Applies an update to an endpoint's initial window size. | ||||
| @@ -219,7 +200,7 @@ impl<T> ApplySettings for FlowControl<T> | ||||
|             streams.shrink_all_local_windows(decr); | ||||
|         } else {  | ||||
|             let incr = new_window_size - old_window_size; | ||||
|             streams.grow_all_local_windows(incr); | ||||
|             streams.expand_all_local_windows(incr); | ||||
|         } | ||||
|          | ||||
|         self.initial_local_window_size = new_window_size; | ||||
| @@ -241,7 +222,7 @@ impl<T> ApplySettings for FlowControl<T> | ||||
|             streams.shrink_all_remote_windows(decr); | ||||
|         } else {  | ||||
|             let incr = new_window_size - old_window_size; | ||||
|             streams.grow_all_remote_windows(incr); | ||||
|             streams.expand_all_remote_windows(incr); | ||||
|         } | ||||
|          | ||||
|         self.initial_remote_window_size = new_window_size; | ||||
| @@ -263,11 +244,20 @@ impl<T> Stream for FlowControl<T> | ||||
|         loop { | ||||
|             match try_ready!(self.inner.poll()) { | ||||
|                 Some(WindowUpdate(v)) => { | ||||
|                     self.grow_remote_window(v.stream_id(), v.size_increment()); | ||||
|                     if let Some(fc) = self.remote_flow_controller(v.stream_id()) { | ||||
|                         fc.expand_window(v.size_increment()); | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 Some(Data(v)) => { | ||||
|                     self.claim_local_window(&v.stream_id(), v.len())?; | ||||
|                     if self.connection_local_flow_controller.claim_window(v.len()).is_err() { | ||||
|                         return Err(error::Reason::FlowControlError.into()) | ||||
|                     } | ||||
|                     if let Some(fc) = self.local_flow_controller(v.stream_id()) { | ||||
|                         if fc.claim_window(v.len()).is_err() { | ||||
|                             return Err(error::Reason::FlowControlError.into()) | ||||
|                         } | ||||
|                     } | ||||
|                     return Ok(Async::Ready(Some(Data(v)))); | ||||
|                 } | ||||
|  | ||||
| @@ -277,7 +267,6 @@ impl<T> Stream for FlowControl<T> | ||||
|     } | ||||
| } | ||||
|  | ||||
|  | ||||
| impl<T, U> Sink for FlowControl<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
| @@ -300,7 +289,14 @@ impl<T, U> Sink for FlowControl<T> | ||||
|                     return Ok(AsyncSink::NotReady(Data(v))); | ||||
|                 } | ||||
|  | ||||
|                 self.claim_remote_window(&v.stream_id(), v.len())?; | ||||
|                 if self.connection_remote_flow_controller.claim_window(v.len()).is_err() { | ||||
|                     return Err(error::User::FlowControlViolation.into()); | ||||
|                 } | ||||
|                 if let Some(fc) = self.remote_flow_controller(v.stream_id()) { | ||||
|                     if fc.claim_window(v.len()).is_err() { | ||||
|                         return Err(error::User::FlowControlViolation.into()) | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 let res = self.inner.start_send(Data(v))?; | ||||
|                 assert!(res.is_ready()); | ||||
|   | ||||
| @@ -53,7 +53,7 @@ impl FlowControlState { | ||||
|  | ||||
|     /// Claims the provided amount from the window, if there is enough space. | ||||
|     /// | ||||
|     /// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than | ||||
|     /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than | ||||
|     /// have been previously claimed. | ||||
|     pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { | ||||
|         if self.window_size < sz { | ||||
| @@ -65,7 +65,7 @@ impl FlowControlState { | ||||
|     } | ||||
|  | ||||
|     /// Increase the _unadvertised_ window capacity. | ||||
|     pub fn grow_window(&mut self, sz: WindowSize) { | ||||
|     pub fn expand_window(&mut self, sz: WindowSize) { | ||||
|         if sz <= self.underflow { | ||||
|             self.underflow -= sz; | ||||
|             return; | ||||
| @@ -77,7 +77,7 @@ impl FlowControlState { | ||||
|     } | ||||
|  | ||||
|     /// Obtains and applies an unadvertised window update. | ||||
|     pub fn take_window_update(&mut self) -> Option<WindowSize> { | ||||
|     pub fn apply_window_update(&mut self) -> Option<WindowSize> { | ||||
|         if self.next_window_update == 0 { | ||||
|             return None; | ||||
|         } | ||||
| @@ -93,29 +93,29 @@ impl FlowControlState { | ||||
| fn test_with_initial_size() { | ||||
|     let mut fc = FlowControlState::with_initial_size(10); | ||||
|  | ||||
|     fc.grow_window(8); | ||||
|     fc.expand_window(8); | ||||
|     assert_eq!(fc.window_size, 10); | ||||
|     assert_eq!(fc.next_window_update, 8); | ||||
|  | ||||
|     assert_eq!(fc.take_window_update(), Some(8)); | ||||
|     assert_eq!(fc.apply_window_update(), Some(8)); | ||||
|     assert_eq!(fc.window_size, 18); | ||||
|     assert_eq!(fc.next_window_update, 0); | ||||
|  | ||||
|     assert!(fc.claim_window(13).is_ok()); | ||||
|     assert_eq!(fc.window_size, 5); | ||||
|     assert_eq!(fc.next_window_update, 0); | ||||
|     assert!(fc.take_window_update().is_none()); | ||||
|     assert!(fc.apply_window_update().is_none()); | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| fn test_with_next_update() { | ||||
|     let mut fc = FlowControlState::with_next_update(10); | ||||
|  | ||||
|     fc.grow_window(8); | ||||
|     fc.expand_window(8); | ||||
|     assert_eq!(fc.window_size, 0); | ||||
|     assert_eq!(fc.next_window_update, 18); | ||||
|  | ||||
|     assert_eq!(fc.take_window_update(), Some(18)); | ||||
|     assert_eq!(fc.apply_window_update(), Some(18)); | ||||
|     assert_eq!(fc.window_size, 18); | ||||
|     assert_eq!(fc.next_window_update, 0); | ||||
| } | ||||
| @@ -125,13 +125,13 @@ fn test_grow_accumulates() { | ||||
|     let mut fc = FlowControlState::with_initial_size(5); | ||||
|  | ||||
|     // Updates accumulate, though the window is not made immediately available.  Trying to | ||||
|     // claim data not returned by take_window_update results in an underflow. | ||||
|     // claim data not returned by apply_window_update results in an underflow. | ||||
|  | ||||
|     fc.grow_window(2); | ||||
|     fc.expand_window(2); | ||||
|     assert_eq!(fc.window_size, 5); | ||||
|     assert_eq!(fc.next_window_update, 2); | ||||
|  | ||||
|     fc.grow_window(6); | ||||
|     fc.expand_window(6); | ||||
|     assert_eq!(fc.window_size, 5); | ||||
|     assert_eq!(fc.next_window_update, 8); | ||||
|  | ||||
| @@ -139,7 +139,7 @@ fn test_grow_accumulates() { | ||||
|     assert_eq!(fc.window_size, 5); | ||||
|     assert_eq!(fc.next_window_update, 8); | ||||
|  | ||||
|     assert_eq!(fc.take_window_update(), Some(8)); | ||||
|     assert_eq!(fc.apply_window_update(), Some(8)); | ||||
|     assert_eq!(fc.window_size, 13); | ||||
|     assert_eq!(fc.next_window_update, 0); | ||||
|  | ||||
| @@ -154,7 +154,7 @@ fn test_shrink() { | ||||
|     assert_eq!(fc.window_size, 5); | ||||
|     assert_eq!(fc.next_window_update, 0); | ||||
|  | ||||
|     fc.grow_window(3); | ||||
|     fc.expand_window(3); | ||||
|     assert_eq!(fc.window_size, 5); | ||||
|     assert_eq!(fc.next_window_update, 3); | ||||
|     assert_eq!(fc.underflow, 0); | ||||
| @@ -169,7 +169,7 @@ fn test_shrink() { | ||||
|     assert_eq!(fc.next_window_update, 0); | ||||
|     assert_eq!(fc.underflow, 5); | ||||
|  | ||||
|     fc.grow_window(8); | ||||
|     fc.expand_window(8); | ||||
|     assert_eq!(fc.window_size, 0); | ||||
|     assert_eq!(fc.next_window_update, 3); | ||||
|     assert_eq!(fc.underflow, 0); | ||||
|   | ||||
| @@ -30,20 +30,56 @@ pub use self::settings::Settings; | ||||
| pub use self::stream_tracker::StreamTracker; | ||||
| use self::state::StreamState; | ||||
|  | ||||
| /// Represents the internals of an HTTP2 connection. | ||||
| /// Represents the internals of an HTTP/2 connection. | ||||
| /// | ||||
| /// A transport consists of several layers (_transporters_) and is arranged from _top_ | ||||
| /// (near the application) to _bottom_ (near the network).  Each transporter implements a | ||||
| /// Stream of frames received from the remote, and a ReadySink of frames sent to the | ||||
| /// remote. | ||||
| /// | ||||
| /// At the top of the transport, the Settings module is responsible for: | ||||
| /// - Transmitting local settings to the remote. | ||||
| /// - Sending settings acknowledgements for all settings frames received from the remote. | ||||
| /// - Exposing settings upward to the Connection. | ||||
| /// ## Transport Layers | ||||
| /// | ||||
| /// ### `Settings` | ||||
| /// | ||||
| /// - Receives remote settings frames and applies the settings downward through the | ||||
| ///   transport (via the ApplySettings trait) before responding with acknowledgements. | ||||
| /// - Exposes ControlSettings up towards the application and transmits local settings to | ||||
| ///   the remote. | ||||
| /// | ||||
| /// ### `FlowControl` | ||||
| /// | ||||
| /// - Tracks received data frames against the local stream and connection flow control | ||||
| ///   windows. | ||||
| /// - Tracks sent data frames against the remote stream and connection flow control | ||||
| ///   windows. | ||||
| /// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE. | ||||
| /// - Exposes `ControlFlow` upwards. | ||||
| ///   - Tracks received window updates against the remote stream and connection flow | ||||
| ///     control windows so that upper layers may poll for updates. | ||||
| ///   - Sends window updates for the local stream and connection flow control windows as | ||||
| ///     instructed by upper layers. | ||||
| /// | ||||
| /// ### `StreamTracker` | ||||
| /// | ||||
| /// - Tracks the states of each stream. | ||||
| /// - **TODO** Enforces maximum concurrency. | ||||
| /// - Exposes `ControlStreams` so that upper layers may share stream state. | ||||
| /// | ||||
| /// ### `PingPong` | ||||
| /// | ||||
| /// - Acknowleges PINGs from the remote. | ||||
| /// - Exposes ControlPing that allows the application side to send ping requests to the | ||||
| ///   remote. Acknowledgements from the remoe are queued to be consumed by the | ||||
| ///   application. | ||||
| /// | ||||
| /// ### FramedRead | ||||
| /// | ||||
| /// - Decodes frames from bytes. | ||||
| /// | ||||
| /// ### FramedWrite | ||||
| /// | ||||
| /// - Encodes frames to bytes. | ||||
| /// | ||||
| /// All transporters below Settings must apply relevant settings before passing a frame on | ||||
| /// to another level.  For example, if the frame writer n | ||||
| type Transport<T, P, B>= | ||||
|     Settings< | ||||
|         FlowControl< | ||||
| @@ -75,25 +111,33 @@ impl StreamMap { | ||||
|  | ||||
|     fn shrink_all_local_windows(&mut self, decr: u32) { | ||||
|         for (_, mut s) in &mut self.inner { | ||||
|             s.shrink_local_window(decr) | ||||
|             if let Some(fc) = s.local_flow_controller() { | ||||
|                 fc.shrink_window(decr); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn grow_all_local_windows(&mut self, incr: u32) { | ||||
|     fn expand_all_local_windows(&mut self, incr: u32) { | ||||
|         for (_, mut s) in &mut self.inner { | ||||
|             s.grow_local_window(incr) | ||||
|             if let Some(fc) = s.local_flow_controller() { | ||||
|                 fc.expand_window(incr); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|      | ||||
|  | ||||
|     fn shrink_all_remote_windows(&mut self, decr: u32) { | ||||
|         for (_, mut s) in &mut self.inner { | ||||
|             s.shrink_remote_window(decr) | ||||
|             if let Some(fc) = s.remote_flow_controller() { | ||||
|                 fc.shrink_window(decr); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn grow_all_remote_windows(&mut self, incr: u32) { | ||||
|     fn expand_all_remote_windows(&mut self, incr: u32) { | ||||
|         for (_, mut s) in &mut self.inner { | ||||
|             s.grow_remote_window(incr) | ||||
|             if let Some(fc) = s.remote_flow_controller() { | ||||
|                 fc.expand_window(incr); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -120,6 +164,13 @@ pub trait ControlStreams { | ||||
|     fn streams_mut(&mut self) -> &mut StreamMap; | ||||
| } | ||||
|  | ||||
| pub type PingPayload = [u8; 8]; | ||||
|  | ||||
| pub trait ControlPing { | ||||
|     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>; | ||||
|     fn pop_pong(&mut self) -> Option<PingPayload>; | ||||
| } | ||||
|  | ||||
| /// Exposes flow control states to "upper" layers of the transport (i.e. above | ||||
| /// FlowControl). | ||||
| pub trait ControlFlow { | ||||
| @@ -131,7 +182,7 @@ pub trait ControlFlow { | ||||
|     /// Attempts to increase the receive capacity of a stream. | ||||
|     /// | ||||
|     /// Errors if the given stream is not active. | ||||
|     fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; | ||||
|     fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; | ||||
| } | ||||
|  | ||||
| /// Create a full H2 transport from an I/O handle. | ||||
|   | ||||
| @@ -1,13 +1,16 @@ | ||||
| use ConnectionError; | ||||
| use frame::{Frame, Ping, SettingSet}; | ||||
| use proto::{ApplySettings, ControlPing, PingPayload, ReadySink}; | ||||
|  | ||||
| use futures::*; | ||||
| use proto::{ApplySettings, ReadySink}; | ||||
| use std::collections::VecDeque; | ||||
|  | ||||
| /// Acknowledges ping requests from the remote. | ||||
| #[derive(Debug)] | ||||
| pub struct PingPong<T, U> { | ||||
|     inner: T, | ||||
|     pong: Option<Frame<U>>, | ||||
|     sending_pong: Option<Frame<U>>, | ||||
|     received_pongs: VecDeque<PingPayload>, | ||||
| } | ||||
|  | ||||
| impl<T, U> PingPong<T, U> | ||||
| @@ -17,7 +20,8 @@ impl<T, U> PingPong<T, U> | ||||
|     pub fn new(inner: T) -> Self { | ||||
|         PingPong { | ||||
|             inner, | ||||
|             pong: None, | ||||
|             sending_pong: None, | ||||
|             received_pongs: VecDeque::new(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -32,18 +36,37 @@ impl<T: ApplySettings, U> ApplySettings for PingPong<T, U> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, U> ControlPing for PingPong<T, U> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
| { | ||||
|     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> { | ||||
|         if self.inner.poll_ready()?.is_not_ready() { | ||||
|             return Ok(AsyncSink::NotReady(body)); | ||||
|         } | ||||
|  | ||||
|         match self.inner.start_send(Ping::ping(body).into())? { | ||||
|             AsyncSink::NotReady(_) => unreachable!(), | ||||
|             AsyncSink::Ready => Ok(AsyncSink::Ready), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn pop_pong(&mut self) -> Option<PingPayload> { | ||||
|         self.received_pongs.pop_front() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, U> PingPong<T, U> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
| { | ||||
|     fn try_send_pong(&mut self) -> Poll<(), ConnectionError> { | ||||
|         if let Some(pong) = self.pong.take() { | ||||
|         if let Some(pong) = self.sending_pong.take() { | ||||
|             if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? { | ||||
|                 // If the pong can't be sent, save it. | ||||
|                 self.pong = Some(pong); | ||||
|                 self.sending_pong = Some(pong); | ||||
|                 return Ok(Async::NotReady); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(Async::Ready(())) | ||||
|     } | ||||
| } | ||||
| @@ -78,7 +101,7 @@ impl<T, U> Stream for PingPong<T, U> | ||||
|                     // Save a pong to be sent when there is nothing more to be returned | ||||
|                     // from the stream or when frames are sent to the sink. | ||||
|                     let pong = Ping::pong(ping.into_payload()); | ||||
|                     self.pong = Some(pong.into()); | ||||
|                     self.sending_pong = Some(pong.into()); | ||||
|                 } | ||||
|  | ||||
|                 // Everything other than ping gets passed through. | ||||
|   | ||||
| @@ -110,8 +110,18 @@ impl<T: ControlFlow> ControlFlow for Settings<T> { | ||||
|         self.inner.poll_remote_window_update(id) | ||||
|     } | ||||
|  | ||||
|     fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         self.inner.grow_local_window(id, incr) | ||||
|     fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         self.inner.expand_local_window(id, incr) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: ControlPing> ControlPing for Settings<T> { | ||||
|     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> { | ||||
|         self.inner.start_ping(body) | ||||
|     } | ||||
|  | ||||
|     fn pop_pong(&mut self) -> Option<PingPayload> { | ||||
|         self.inner.pop_pong() | ||||
|     } | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -2,7 +2,7 @@ use Peer; | ||||
| use error::ConnectionError; | ||||
| use error::Reason::*; | ||||
| use error::User::*; | ||||
| use proto::{FlowControlState, WindowSize, WindowUnderflow}; | ||||
| use proto::{FlowControlState, WindowSize}; | ||||
|  | ||||
| /// Represents the state of an H2 stream | ||||
| /// | ||||
| @@ -221,130 +221,25 @@ impl StreamState { | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Updates the local flow controller so that the remote may send `incr` more bytes. | ||||
|     /// | ||||
|     /// Returns the amount of capacity created, accounting for window size changes. The | ||||
|     /// caller should send the the returned window size increment to the remote. | ||||
|     /// | ||||
|     /// If the remote is closed, None is returned. | ||||
|     pub fn grow_remote_window(&mut self, incr: WindowSize) { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         if incr == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { remote: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr), | ||||
|             _ => {}, | ||||
|         } | ||||
|     } | ||||
|   | ||||
|     pub fn claim_remote_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> { | ||||
|     pub fn local_flow_controller(&mut self) -> Option<&mut FlowControlState> { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         if decr == 0 { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { remote: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedLocal(Data(ref mut fc)) => fc.claim_window(decr), | ||||
|             _ => Ok(()), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn shrink_remote_window(&mut self, decr: WindowSize) { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         if decr == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { local: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr), | ||||
|             _ => {}, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Consumes newly-advertised capacity to inform the local endpoint it may send more | ||||
|     /// data. | ||||
|     pub fn take_remote_window_update(&mut self) -> Option<WindowSize> { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { remote: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(), | ||||
|             &mut HalfClosedRemote(Data(ref mut fc)) => Some(fc), | ||||
|             _ => None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Updates the remote flow controller so that the remote may receive `incr` | ||||
|     /// additional bytes. | ||||
|     /// | ||||
|     /// Returns the amount of capacity created, accounting for window size changes. The | ||||
|     /// caller should send the the returned window size increment to the remote. | ||||
|     pub fn grow_local_window(&mut self, incr: WindowSize) { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         if incr == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { local: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr), | ||||
|             _ => {}, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn claim_local_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         if decr == 0 { | ||||
|             return Ok(()); | ||||
|         } | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { local: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedRemote(Data(ref mut fc)) => fc.claim_window(decr), | ||||
|             _ => Ok(()), | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn shrink_local_window(&mut self, decr: WindowSize) { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         if decr == 0 { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { local: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr), | ||||
|             _ => {}, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Consumes newly-advertised capacity to inform the local endpoint it may send more | ||||
|     /// data. | ||||
|     pub fn take_local_window_update(&mut self) -> Option<WindowSize> { | ||||
|     pub fn remote_flow_controller(&mut self) -> Option<&mut FlowControlState> { | ||||
|         use self::StreamState::*; | ||||
|         use self::PeerState::*; | ||||
|  | ||||
|         match self { | ||||
|             &mut Open { local: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(), | ||||
|             &mut Open { remote: Data(ref mut fc), .. } | | ||||
|             &mut HalfClosedLocal(Data(ref mut fc)) => Some(fc), | ||||
|             _ => None, | ||||
|         } | ||||
|     } | ||||
|   | ||||
| @@ -87,6 +87,18 @@ impl<T, P> ApplySettings for StreamTracker<T, P> | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, P> ControlPing for StreamTracker<T, P> | ||||
|     where T: ControlPing | ||||
| { | ||||
|     fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> { | ||||
|         self.inner.start_ping(body) | ||||
|     } | ||||
|  | ||||
|     fn pop_pong(&mut self) -> Option<PingPayload> { | ||||
|         self.inner.pop_pong() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, P> Stream for StreamTracker<T, P> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           P: Peer, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user