split FlowControl into FlowControlRecv and FlowControlSend
This commit is contained in:
		| @@ -3,10 +3,12 @@ use proto::*; | ||||
|  | ||||
| /// Exposes flow control states to "upper" layers of the transport (i.e. above | ||||
| /// FlowControl). | ||||
| pub trait ControlFlow { | ||||
| pub trait ControlFlowSend { | ||||
|     /// Polls for the next window update from the remote. | ||||
|     fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>; | ||||
| } | ||||
|  | ||||
| pub trait ControlFlowRecv { | ||||
|     /// Increases the local receive capacity of a stream. | ||||
|     /// | ||||
|     /// This may cause a window update to be sent to the remote. | ||||
| @@ -15,16 +17,29 @@ pub trait ControlFlow { | ||||
|     fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; | ||||
| } | ||||
|  | ||||
| macro_rules! proxy_control_flow { | ||||
| macro_rules! proxy_control_flow_send { | ||||
|     ($outer:ident) => ( | ||||
|         impl<T: ControlFlow> ControlFlow for $outer<T> { | ||||
|         impl<T: ControlFlowSend> ControlFlowSend for $outer<T> { | ||||
|             fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> { | ||||
|                 self.inner.poll_window_update() | ||||
|             } | ||||
|         } | ||||
|     ) | ||||
| } | ||||
|  | ||||
| macro_rules! proxy_control_flow_recv { | ||||
|     ($outer:ident) => ( | ||||
|         impl<T: ControlFlowRecv> ControlFlowRecv for $outer<T> { | ||||
|             fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|                 self.inner.expand_window(id, incr) | ||||
|             } | ||||
|         } | ||||
|     ) | ||||
| } | ||||
|  | ||||
| macro_rules! proxy_control_flow { | ||||
|     ($outer:ident) => ( | ||||
|         proxy_control_flow_recv!($outer); | ||||
|         proxy_control_flow_send!($outer); | ||||
|     ) | ||||
| } | ||||
|   | ||||
| @@ -1,335 +0,0 @@ | ||||
| use {error, ConnectionError, FrameSize}; | ||||
| use frame::{self, Frame}; | ||||
| use proto::*; | ||||
|  | ||||
| use std::collections::VecDeque; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub struct FlowControl<T>  { | ||||
|     inner: T, | ||||
|  | ||||
|     local_initial: WindowSize, | ||||
|     remote_initial: WindowSize, | ||||
|  | ||||
|     /// Tracks the connection-level flow control window for receiving data from the | ||||
|     /// remote. | ||||
|     local_connection: FlowControlState, | ||||
|  | ||||
|     /// Tracks the onnection-level flow control window for receiving data from the remote. | ||||
|     remote_connection: FlowControlState, | ||||
|  | ||||
|     /// Holds the list of streams on which local window updates may be sent. | ||||
|     // XXX It would be cool if this didn't exist. | ||||
|     local_pending_streams: VecDeque<StreamId>, | ||||
|  | ||||
|     /// If a window update can't be sent immediately, it may need to be saved to be sent | ||||
|     /// later. | ||||
|     local_sending: Option<frame::WindowUpdate>, | ||||
|  | ||||
|     /// Holds the list of streams on which local window updates may be sent. | ||||
|     // XXX It would be cool if this didn't exist. | ||||
|     remote_pending_streams: VecDeque<StreamId>, | ||||
|  | ||||
|     /// When `poll_window_update` is not ready, then the calling task is saved to | ||||
|     /// be notified later. Access to poll_window_update must not be shared across tasks, | ||||
|     /// as we only track a single task (and *not* i.e. a task per stream id). | ||||
|     remote_blocked: Option<task::Task>, | ||||
| } | ||||
|  | ||||
| impl<T, U> FlowControl<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ControlStreams | ||||
| { | ||||
|     pub fn new(local_initial: WindowSize, | ||||
|                remote_initial: WindowSize, | ||||
|                inner: T) | ||||
|         -> FlowControl<T> | ||||
|     { | ||||
|         FlowControl { | ||||
|             inner, | ||||
|  | ||||
|             local_initial, | ||||
|             local_connection: FlowControlState::with_initial_size(local_initial), | ||||
|             local_sending: None, | ||||
|             local_pending_streams: VecDeque::new(), | ||||
|  | ||||
|             remote_initial, | ||||
|             remote_connection: FlowControlState::with_initial_size(remote_initial), | ||||
|             remote_blocked: None, | ||||
|             remote_pending_streams: VecDeque::new(), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| // Flow control utitlities. | ||||
| impl<T: ControlStreams> FlowControl<T> { | ||||
|     fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|         if id.is_zero() { | ||||
|             Some(&mut self.local_connection) | ||||
|         } else { | ||||
|             self.inner.recv_flow_controller(id) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|    fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { | ||||
|         if id.is_zero() { | ||||
|             Some(&mut self.remote_connection) | ||||
|         } else { | ||||
|             self.inner.send_flow_controller(id) | ||||
|         } | ||||
|     } | ||||
| } | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlow for FlowControl<T> { | ||||
|     fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> { | ||||
|         // This biases connection window updates, which probably makese sense. | ||||
|         if let Some(incr) = self.remote_connection.apply_window_update() { | ||||
|             return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); | ||||
|         } | ||||
|  | ||||
|         // TODO this should probably account for stream priority? | ||||
|         while let Some(id) = self.remote_pending_streams.pop_front() { | ||||
|             if let Some(mut flow) = self.send_flow_controller(id) { | ||||
|                 if let Some(incr) = flow.apply_window_update() { | ||||
|                     return Ok(Async::Ready(WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         self.remote_blocked = Some(task::current()); | ||||
|         return Ok(Async::NotReady); | ||||
|     } | ||||
|  | ||||
|     fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         let added = match self.recv_flow_controller(id) { | ||||
|             None => false, | ||||
|             Some(mut fc) => { | ||||
|                 fc.expand_window(incr); | ||||
|                 true | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         if added { | ||||
|             if !id.is_zero() { | ||||
|                 self.local_pending_streams.push_back(id); | ||||
|             } | ||||
|             Ok(()) | ||||
|         } else if let Some(rst) = self.inner.get_reset(id) { | ||||
|             Err(error::User::StreamReset(rst).into()) | ||||
|         } else { | ||||
|             Err(error::User::InvalidStreamId.into()) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, U> FlowControl<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ControlStreams, | ||||
| { | ||||
|     /// Returns ready when there are no pending window updates to send. | ||||
|     fn poll_send_local(&mut self) -> Poll<(), ConnectionError> { | ||||
|         if let Some(f) = self.local_sending.take() { | ||||
|             try_ready!(self.try_send(f)); | ||||
|         } | ||||
|  | ||||
|         if let Some(incr) = self.local_connection.apply_window_update() { | ||||
|             try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); | ||||
|         } | ||||
|  | ||||
|         while let Some(id) = self.local_pending_streams.pop_front() { | ||||
|             if self.inner.get_reset(id).is_none() { | ||||
|                 let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|                 if let Some(incr) = update { | ||||
|                     try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(Async::Ready(())) | ||||
|     } | ||||
|  | ||||
|     fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { | ||||
|         if self.inner.start_send(f.into())?.is_not_ready() { | ||||
|             self.local_sending = Some(f); | ||||
|             Ok(Async::NotReady) | ||||
|         } else { | ||||
|             Ok(Async::Ready(())) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Tracks window updates received from the remote and ensures that the remote does not | ||||
| /// violate the local peer's flow controller. | ||||
| /// | ||||
| /// TODO send flow control reset when the peer violates the flow control window. | ||||
| impl<T> Stream for FlowControl<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: ControlStreams, | ||||
|  { | ||||
|     type Item = T::Item; | ||||
|     type Error = T::Error; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { | ||||
|         use frame::Frame::*; | ||||
|         trace!("poll"); | ||||
|  | ||||
|         loop { | ||||
|             match try_ready!(self.inner.poll()) { | ||||
|                 Some(WindowUpdate(v)) => { | ||||
|                     if let Some(fc) = self.send_flow_controller(v.stream_id()) { | ||||
|                         fc.expand_window(v.size_increment()); | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 Some(Data(v)) => { | ||||
|                     let sz = v.payload().len() as FrameSize; | ||||
|                     if self.local_connection.claim_window(sz).is_err() { | ||||
|                         return Err(error::Reason::FlowControlError.into()) | ||||
|                     } | ||||
|                     // If this frame ends the stream, there may no longer be a flow | ||||
|                     // controller.  That's fine. | ||||
|                     if let Some(fc) = self.recv_flow_controller(v.stream_id()) { | ||||
|                         if fc.claim_window(sz).is_err() { | ||||
|                             // TODO send flow control reset. | ||||
|                             return Err(error::Reason::FlowControlError.into()) | ||||
|                         } | ||||
|                     } | ||||
|                     return Ok(Async::Ready(Some(Data(v)))); | ||||
|                 } | ||||
|  | ||||
|                 v => return Ok(Async::Ready(v)), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Tracks the send flow control windows for sent frames. | ||||
| /// | ||||
| /// If sending a frame would violate the remote's window, start_send fails with | ||||
| /// `FlowControlViolation`. | ||||
| /// | ||||
| /// Sends pending window updates before operating on the underlying transport. | ||||
| impl<T, U> Sink for FlowControl<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
|           T: ControlStreams, | ||||
|           U: Buf, | ||||
|  { | ||||
|     type SinkItem = T::SinkItem; | ||||
|     type SinkError = T::SinkError; | ||||
|  | ||||
|     fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> { | ||||
|         use frame::Frame::*; | ||||
|  | ||||
|         debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); | ||||
|  | ||||
|         // Ensures that: | ||||
|         // 1. all pending local window updates have been sent to the remote. | ||||
|         // 2. the underlying transport is will accept the frame. It's important that this | ||||
|         //    be checked before claiming capacity from the flow controllers. | ||||
|         if self.poll_ready()?.is_not_ready() { | ||||
|             return Ok(AsyncSink::NotReady(frame)); | ||||
|         } | ||||
|  | ||||
|         // Ensure that an outbound data frame does not violate the remote's flow control | ||||
|         // window. | ||||
|         if let &Data(ref v) = &frame { | ||||
|             let sz = v.payload().remaining() as FrameSize; | ||||
|  | ||||
|             // Ensure there's enough capacity on the connection before acting on the | ||||
|             // stream. | ||||
|             if !self.remote_connection.check_window(sz) { | ||||
|                 return Err(error::User::FlowControlViolation.into()); | ||||
|             } | ||||
|  | ||||
|             // Ensure there's enough capacity on stream. | ||||
|             { | ||||
|                 let mut fc = self.inner.send_flow_controller(v.stream_id()) | ||||
|                     .expect("no remote stream for data frame"); | ||||
|                 if fc.claim_window(sz).is_err() { | ||||
|                     return Err(error::User::FlowControlViolation.into()) | ||||
|                 } | ||||
|             } | ||||
|  | ||||
|             self.remote_connection.claim_window(sz) | ||||
|                 .expect("remote connection flow control error"); | ||||
|         } | ||||
|  | ||||
|         let res = self.inner.start_send(frame)?; | ||||
|         assert!(res.is_ready()); | ||||
|         Ok(res) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), T::SinkError> { | ||||
|         try_ready!(self.poll_send_local()); | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Sends pending window updates before checking the underyling transport's readiness. | ||||
| impl<T, U> ReadySink for FlowControl<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
|           T: ControlStreams, | ||||
|           U: Buf, | ||||
| { | ||||
|     fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         try_ready!(self.poll_send_local()); | ||||
|         self.inner.poll_ready() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Applies an update to an endpoint's initial window size. | ||||
| /// | ||||
| /// Per RFC 7540 §6.9.2: | ||||
| /// | ||||
| /// > In addition to changing the flow-control window for streams that are not yet | ||||
| /// > active, a SETTINGS frame can alter the initial flow-control window size for | ||||
| /// > streams with active flow-control windows (that is, streams in the "open" or | ||||
| /// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE | ||||
| /// > changes, a receiver MUST adjust the size of all stream flow-control windows that | ||||
| /// > it maintains by the difference between the new value and the old value. | ||||
| /// > | ||||
| /// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a | ||||
| /// > flow-control window to become negative. A sender MUST track the negative | ||||
| /// > flow-control window and MUST NOT send new flow-controlled frames until it | ||||
| /// > receives WINDOW_UPDATE frames that cause the flow-control window to become | ||||
| /// > positive. | ||||
| impl<T> ApplySettings for FlowControl<T> | ||||
|     where T: ApplySettings, | ||||
|           T: ControlStreams | ||||
| { | ||||
|     fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         self.inner.apply_local_settings(set)?; | ||||
|  | ||||
|         if let Some(new_window_size) = set.initial_window_size() { | ||||
|             let old_window_size = self.local_initial; | ||||
|             if new_window_size == old_window_size { | ||||
|                 return Ok(()); | ||||
|             } | ||||
|  | ||||
|             self.inner.update_inital_recv_window_size(old_window_size, new_window_size); | ||||
|             self.local_initial = new_window_size; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         self.inner.apply_remote_settings(set)?; | ||||
|  | ||||
|         if let Some(new_window_size) = set.initial_window_size() { | ||||
|             let old_window_size = self.remote_initial; | ||||
|             if new_window_size == old_window_size { | ||||
|                 return Ok(()); | ||||
|             } | ||||
|  | ||||
|             self.inner.update_inital_send_window_size(old_window_size, new_window_size); | ||||
|             self.remote_initial = new_window_size; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| proxy_control_streams!(FlowControl); | ||||
| proxy_control_ping!(FlowControl); | ||||
							
								
								
									
										222
									
								
								src/proto/flow_control_recv.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										222
									
								
								src/proto/flow_control_recv.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,222 @@ | ||||
| use {error, ConnectionError, FrameSize}; | ||||
| use frame::{self, Frame}; | ||||
| use proto::*; | ||||
|  | ||||
| use std::collections::VecDeque; | ||||
|  | ||||
| /// Tracks local flow control windows. | ||||
| #[derive(Debug)] | ||||
| pub struct FlowControlRecv<T>  { | ||||
|     inner: T, | ||||
|  | ||||
|  | ||||
|     initial_window_size: WindowSize, | ||||
|  | ||||
|     /// Tracks the connection-level flow control window for receiving data from the | ||||
|     /// remote. | ||||
|     connection: FlowControlState, | ||||
|  | ||||
|     /// Holds the list of streams on which local window updates may be sent. | ||||
|     // XXX It would be cool if this didn't exist. | ||||
|     pending_streams: VecDeque<StreamId>, | ||||
|  | ||||
|     /// If a window update can't be sent immediately, it may need to be saved to be sent | ||||
|     /// later. | ||||
|     sending: Option<frame::WindowUpdate>, | ||||
| } | ||||
|  | ||||
| impl<T, U> FlowControlRecv<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ControlStreams | ||||
| { | ||||
|     pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlRecv<T> { | ||||
|         FlowControlRecv { | ||||
|             inner, | ||||
|             initial_window_size, | ||||
|             connection: FlowControlState::with_initial_size(initial_window_size), | ||||
|             pending_streams: VecDeque::new(), | ||||
|             sending: None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> { | ||||
|     fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         let added = match self.recv_flow_controller(id) { | ||||
|             None => false, | ||||
|             Some(mut fc) => { | ||||
|                 fc.expand_window(incr); | ||||
|                 true | ||||
|             } | ||||
|         }; | ||||
|  | ||||
|         if added { | ||||
|             if !id.is_zero() { | ||||
|                 self.pending_streams.push_back(id); | ||||
|             } | ||||
|             Ok(()) | ||||
|         } else if let Some(rst) = self.inner.get_reset(id) { | ||||
|             Err(error::User::StreamReset(rst).into()) | ||||
|         } else { | ||||
|             Err(error::User::InvalidStreamId.into()) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T, U> FlowControlRecv<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ControlStreams, | ||||
| { | ||||
|     /// Returns ready when there are no pending window updates to send. | ||||
|     fn poll_send_local(&mut self) -> Poll<(), ConnectionError> { | ||||
|         if let Some(f) = self.sending.take() { | ||||
|             try_ready!(self.try_send(f)); | ||||
|         } | ||||
|  | ||||
|         if let Some(incr) = self.connection.apply_window_update() { | ||||
|             try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); | ||||
|         } | ||||
|  | ||||
|         while let Some(id) = self.pending_streams.pop_front() { | ||||
|             if self.inner.get_reset(id).is_none() { | ||||
|                 let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); | ||||
|                 if let Some(incr) = update { | ||||
|                     try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(Async::Ready(())) | ||||
|     } | ||||
|  | ||||
|     fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { | ||||
|         if self.inner.start_send(f.into())?.is_not_ready() { | ||||
|             self.sending = Some(f); | ||||
|             Ok(Async::NotReady) | ||||
|         } else { | ||||
|             Ok(Async::Ready(())) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Ensures that the remote does not violate the local peer's flow controller. | ||||
| impl<T> Stream for FlowControlRecv<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: ControlStreams, | ||||
| { | ||||
|     type Item = T::Item; | ||||
|     type Error = T::Error; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { | ||||
|         trace!("poll"); | ||||
|         loop { | ||||
|             match try_ready!(self.inner.poll()) { | ||||
|                 Some(Frame::Data(v)) => { | ||||
|                     let id = v.stream_id(); | ||||
|                     let sz = v.payload().len() as FrameSize; | ||||
|  | ||||
|                     // Ensure there's enough capacity on the connection before acting on | ||||
|                     // the stream. | ||||
|                     if !self.connection.check_window(sz) { | ||||
|                         // TODO this should cause a GO_AWAY | ||||
|                         return Err(error::Reason::FlowControlError.into()); | ||||
|                     } | ||||
|  | ||||
|                     let fc = self.inner.recv_flow_controller(id) | ||||
|                         .expect("receiving data with no flow controller"); | ||||
|                     if fc.claim_window(sz).is_err() { | ||||
|                         // TODO this should cause a GO_AWAY | ||||
|                         return Err(error::Reason::FlowControlError.into()); | ||||
|                     } | ||||
|  | ||||
|                     self.connection.claim_window(sz) | ||||
|                         .expect("local connection flow control error"); | ||||
|  | ||||
|                     return Ok(Async::Ready(Some(Frame::Data(v)))); | ||||
|                 } | ||||
|  | ||||
|                 v => return Ok(Async::Ready(v)), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Sends pending window updates before operating on the underlying transport. | ||||
| impl<T, U> Sink for FlowControlRecv<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
|           T: ControlStreams, | ||||
|  { | ||||
|     type SinkItem = T::SinkItem; | ||||
|     type SinkError = T::SinkError; | ||||
|  | ||||
|     fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> { | ||||
|         if self.poll_send_local()?.is_not_ready() { | ||||
|             return Ok(AsyncSink::NotReady(frame)); | ||||
|         } | ||||
|         self.inner.start_send(frame) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), T::SinkError> { | ||||
|         try_ready!(self.poll_send_local()); | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Sends pending window updates before checking the underyling transport's readiness. | ||||
| impl<T, U> ReadySink for FlowControlRecv<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
|           T: ControlStreams, | ||||
| { | ||||
|     fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         try_ready!(self.poll_send_local()); | ||||
|         self.inner.poll_ready() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Applies an update to an endpoint's initial window size. | ||||
| /// | ||||
| /// Per RFC 7540 §6.9.2: | ||||
| /// | ||||
| /// > In addition to changing the flow-control window for streams that are not yet | ||||
| /// > active, a SETTINGS frame can alter the initial flow-control window size for | ||||
| /// > streams with active flow-control windows (that is, streams in the "open" or | ||||
| /// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE | ||||
| /// > changes, a receiver MUST adjust the size of all stream flow-control windows that | ||||
| /// > it maintains by the difference between the new value and the old value. | ||||
| /// > | ||||
| /// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a | ||||
| /// > flow-control window to become negative. A sender MUST track the negative | ||||
| /// > flow-control window and MUST NOT send new flow-controlled frames until it | ||||
| /// > receives WINDOW_UPDATE frames that cause the flow-control window to become | ||||
| /// > positive. | ||||
| impl<T> ApplySettings for FlowControlRecv<T> | ||||
|     where T: ApplySettings, | ||||
|           T: ControlStreams | ||||
| { | ||||
|     fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         self.inner.apply_local_settings(set)?; | ||||
|  | ||||
|         if let Some(new_window_size) = set.initial_window_size() { | ||||
|             let old_window_size = self.initial_window_size; | ||||
|             if new_window_size == old_window_size { | ||||
|                 return Ok(()); | ||||
|             } | ||||
|  | ||||
|             self.inner.update_inital_recv_window_size(old_window_size, new_window_size); | ||||
|             self.initial_window_size = new_window_size; | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         self.inner.apply_remote_settings(set) | ||||
|     } | ||||
| } | ||||
|  | ||||
| proxy_control_flow_send!(FlowControlRecv); | ||||
| proxy_control_ping!(FlowControlRecv); | ||||
| proxy_control_streams!(FlowControlRecv); | ||||
							
								
								
									
										208
									
								
								src/proto/flow_control_send.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										208
									
								
								src/proto/flow_control_send.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,208 @@ | ||||
| use {error, ConnectionError, FrameSize}; | ||||
| use frame::{self, Frame}; | ||||
| use proto::*; | ||||
|  | ||||
| use std::collections::VecDeque; | ||||
|  | ||||
| /// Tracks remote flow control windows. | ||||
| #[derive(Debug)] | ||||
| pub struct FlowControlSend<T>  { | ||||
|     inner: T, | ||||
|  | ||||
|     initial_window_size: WindowSize, | ||||
|  | ||||
|     /// Tracks the onnection-level flow control window for receiving data from the remote. | ||||
|     connection: FlowControlState, | ||||
|  | ||||
|     /// Holds the list of streams on which local window updates may be sent. | ||||
|     // XXX It would be cool if this didn't exist. | ||||
|     pending_streams: VecDeque<StreamId>, | ||||
|  | ||||
|     /// When `poll_window_update` is not ready, then the calling task is saved to | ||||
|     /// be notified later. Access to poll_window_update must not be shared across tasks, | ||||
|     /// as we only track a single task (and *not* i.e. a task per stream id). | ||||
|     blocked: Option<task::Task>, | ||||
| } | ||||
|  | ||||
| impl<T, U> FlowControlSend<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ControlStreams | ||||
| { | ||||
|     pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlSend<T> { | ||||
|         FlowControlSend { | ||||
|             inner, | ||||
|             initial_window_size, | ||||
|             connection: FlowControlState::with_initial_size(initial_window_size), | ||||
|             pending_streams: VecDeque::new(), | ||||
|             blocked: None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Exposes a public upward API for flow control. | ||||
| impl<T: ControlStreams> ControlFlowSend for FlowControlSend<T> { | ||||
|     fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> { | ||||
|         // This biases connection window updates, which probably makes sense. | ||||
|         if let Some(incr) = self.connection.apply_window_update() { | ||||
|             return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); | ||||
|         } | ||||
|  | ||||
|         // TODO this should probably account for stream priority? | ||||
|         while let Some(id) = self.pending_streams.pop_front() { | ||||
|             if let Some(mut flow) = self.send_flow_controller(id) { | ||||
|                 if let Some(incr) = flow.apply_window_update() { | ||||
|                     return Ok(Async::Ready(WindowUpdate::new(id, incr))); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         self.blocked = Some(task::current()); | ||||
|         return Ok(Async::NotReady); | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Applies remote window updates as they are received. | ||||
| impl<T> Stream for FlowControlSend<T> | ||||
|     where T: Stream<Item = Frame, Error = ConnectionError>, | ||||
|           T: ControlStreams, | ||||
| { | ||||
|     type Item = T::Item; | ||||
|     type Error = T::Error; | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> { | ||||
|         trace!("poll"); | ||||
|  | ||||
|         loop { | ||||
|             match try_ready!(self.inner.poll()) { | ||||
|                 Some(Frame::WindowUpdate(v)) => { | ||||
|                     let id = v.stream_id(); | ||||
|                     let sz = v.size_increment(); | ||||
|  | ||||
|                     if id.is_zero() { | ||||
|                         self.connection.expand_window(sz); | ||||
|                     } else { | ||||
|                         // The remote may send window updates for streams that the local | ||||
|                         // now considers closed. It's okay. | ||||
|                         if let Some(fc) = self.inner.send_flow_controller(id) { | ||||
|                             fc.expand_window(sz); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 f => return Ok(Async::Ready(f)), | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Tracks the flow control windows for sent davta frames. | ||||
| /// | ||||
| /// If sending a frame would violate the remote's window, start_send fails with | ||||
| /// `FlowControlViolation`. | ||||
| impl<T, U> Sink for FlowControlSend<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
|           T: ControlStreams, | ||||
|           U: Buf, | ||||
|  { | ||||
|     type SinkItem = T::SinkItem; | ||||
|     type SinkError = T::SinkError; | ||||
|  | ||||
|     fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> { | ||||
|         debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); | ||||
|  | ||||
|         // Ensures that the underlying transport is will accept the frame. It's important | ||||
|         //  that this be checked before claiming capacity from the flow controllers. | ||||
|         if self.poll_ready()?.is_not_ready() { | ||||
|             return Ok(AsyncSink::NotReady(frame)); | ||||
|         } | ||||
|  | ||||
|         // Ensure that an outbound data frame does not violate the remote's flow control | ||||
|         // window. | ||||
|         if let &Frame::Data(ref v) = &frame { | ||||
|             let sz = v.payload().remaining() as FrameSize; | ||||
|  | ||||
|             // Ensure there's enough capacity on the connection before acting on the | ||||
|             // stream. | ||||
|             if !self.connection.check_window(sz) { | ||||
|                 return Err(error::User::FlowControlViolation.into()); | ||||
|             } | ||||
|  | ||||
|             // Ensure there's enough capacity on stream. | ||||
|             let mut fc = self.inner.send_flow_controller(v.stream_id()) | ||||
|                 .expect("no remote stream for data frame"); | ||||
|             if fc.claim_window(sz).is_err() { | ||||
|                 return Err(error::User::FlowControlViolation.into()) | ||||
|             } | ||||
|  | ||||
|             self.connection.claim_window(sz) | ||||
|                 .expect("remote connection flow control error"); | ||||
|         } | ||||
|  | ||||
|         let res = self.inner.start_send(frame)?; | ||||
|         assert!(res.is_ready()); | ||||
|         Ok(res) | ||||
|     } | ||||
|  | ||||
|     fn poll_complete(&mut self) -> Poll<(), T::SinkError> { | ||||
|         self.inner.poll_complete() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Proxy. | ||||
| impl<T, U> ReadySink for FlowControlSend<T> | ||||
|     where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, | ||||
|           T: ReadySink, | ||||
|           T: ControlStreams, | ||||
|           U: Buf, | ||||
| { | ||||
|     fn poll_ready(&mut self) -> Poll<(), ConnectionError> { | ||||
|         self.inner.poll_ready() | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Applies an update to the remote endpoint's initial window size. | ||||
| /// | ||||
| /// Per RFC 7540 §6.9.2: | ||||
| /// | ||||
| /// > In addition to changing the flow-control window for streams that are not yet | ||||
| /// > active, a SETTINGS frame can alter the initial flow-control window size for | ||||
| /// > streams with active flow-control windows (that is, streams in the "open" or | ||||
| /// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE | ||||
| /// > changes, a receiver MUST adjust the size of all stream flow-control windows that | ||||
| /// > it maintains by the difference between the new value and the old value. | ||||
| /// > | ||||
| /// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a | ||||
| /// > flow-control window to become negative. A sender MUST track the negative | ||||
| /// > flow-control window and MUST NOT send new flow-controlled frames until it | ||||
| /// > receives WINDOW_UPDATE frames that cause the flow-control window to become | ||||
| /// > positive. | ||||
| impl<T> ApplySettings for FlowControlSend<T> | ||||
|     where T: ApplySettings, | ||||
|           T: ControlStreams | ||||
| { | ||||
|     fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         self.inner.apply_local_settings(set) | ||||
|     } | ||||
|  | ||||
|     fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { | ||||
|         self.inner.apply_remote_settings(set)?; | ||||
|  | ||||
|         if let Some(new_window_size) = set.initial_window_size() { | ||||
|             let old_window_size = self.initial_window_size; | ||||
|             if new_window_size == old_window_size { | ||||
|                 return Ok(()); | ||||
|             } | ||||
|  | ||||
|             self.inner.update_inital_send_window_size(old_window_size, new_window_size); | ||||
|             self.initial_window_size = new_window_size; | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| proxy_control_flow_recv!(FlowControlSend); | ||||
| proxy_control_ping!(FlowControlSend); | ||||
| proxy_control_streams!(FlowControlSend); | ||||
| @@ -62,13 +62,14 @@ mod control_settings; | ||||
| mod control_streams; | ||||
|  | ||||
| use self::apply_settings::ApplySettings; | ||||
| use self::control_flow::ControlFlow; | ||||
| use self::control_flow::{ControlFlowRecv, ControlFlowSend}; | ||||
| use self::control_ping::ControlPing; | ||||
| use self::control_settings::ControlSettings; | ||||
| use self::control_streams::ControlStreams; | ||||
|  | ||||
| mod connection; | ||||
| mod flow_control; | ||||
| mod flow_control_recv; | ||||
| mod flow_control_send; | ||||
| mod flow_control_state; | ||||
| mod framed_read; | ||||
| mod framed_write; | ||||
| @@ -84,7 +85,8 @@ mod stream_states; | ||||
|  | ||||
| pub use self::connection::Connection; | ||||
|  | ||||
| use self::flow_control::FlowControl; | ||||
| use self::flow_control_recv::FlowControlRecv; | ||||
| use self::flow_control_send::FlowControlSend; | ||||
| use self::flow_control_state::FlowControlState; | ||||
| use self::framed_read::FramedRead; | ||||
| use self::framed_write::FramedWrite; | ||||
| @@ -191,10 +193,11 @@ type Transport<T, P, B>= | ||||
| type Streams<T, P> = | ||||
|     StreamSendOpen< | ||||
|         StreamRecvClose< | ||||
|             FlowControl< | ||||
|                 StreamSendClose< | ||||
|                     StreamRecvOpen< | ||||
|                         StreamStates<T, P>>>>>>; | ||||
|             FlowControlSend< | ||||
|                 FlowControlRecv< | ||||
|                     StreamSendClose< | ||||
|                         StreamRecvOpen< | ||||
|                             StreamStates<T, P>>>>>>>; | ||||
|  | ||||
| type Codec<T, B> = | ||||
|     FramedRead< | ||||
| @@ -287,16 +290,17 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf> | ||||
|             initial_send_window_size, | ||||
|             remote_max_concurrency, | ||||
|             StreamRecvClose::new( | ||||
|                 FlowControl::new( | ||||
|                     initial_recv_window_size, | ||||
|                 FlowControlSend::new( | ||||
|                     initial_send_window_size, | ||||
|                     StreamSendClose::new( | ||||
|                         StreamRecvOpen::new( | ||||
|                             initial_recv_window_size, | ||||
|                             local_max_concurrency, | ||||
|                             StreamStates::new( | ||||
|                                 PingPong::new( | ||||
|                                     FramedRead::new(framed)))))))) | ||||
|                     FlowControlRecv::new( | ||||
|                         initial_recv_window_size, | ||||
|                         StreamSendClose::new( | ||||
|                             StreamRecvOpen::new( | ||||
|                                 initial_recv_window_size, | ||||
|                                 local_max_concurrency, | ||||
|                                 StreamStates::new( | ||||
|                                     PingPong::new( | ||||
|                                         FramedRead::new(framed))))))))) | ||||
|     }); | ||||
|  | ||||
|     connection::new(transport) | ||||
|   | ||||
| @@ -214,14 +214,5 @@ impl<T: AsyncRead> AsyncRead for Settings<T> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<T: ControlFlow> ControlFlow for Settings<T> { | ||||
|     fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> { | ||||
|         self.inner.poll_window_update() | ||||
|     } | ||||
|  | ||||
|     fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { | ||||
|         self.inner.expand_window(id, incr) | ||||
|     } | ||||
| } | ||||
|  | ||||
| proxy_control_flow!(Settings); | ||||
| proxy_control_ping!(Settings); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user