Merge pull request #5 from carllerche/ver/flow-split
split FlowControl into FlowControlRecv and FlowControlSend
This commit is contained in:
@@ -3,10 +3,12 @@ use proto::*;
|
||||
|
||||
/// Exposes flow control states to "upper" layers of the transport (i.e. above
|
||||
/// FlowControl).
|
||||
pub trait ControlFlow {
|
||||
pub trait ControlFlowSend {
|
||||
/// Polls for the next window update from the remote.
|
||||
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>;
|
||||
}
|
||||
|
||||
pub trait ControlFlowRecv {
|
||||
/// Increases the local receive capacity of a stream.
|
||||
///
|
||||
/// This may cause a window update to be sent to the remote.
|
||||
@@ -15,16 +17,29 @@ pub trait ControlFlow {
|
||||
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
|
||||
}
|
||||
|
||||
macro_rules! proxy_control_flow {
|
||||
macro_rules! proxy_control_flow_send {
|
||||
($outer:ident) => (
|
||||
impl<T: ControlFlow> ControlFlow for $outer<T> {
|
||||
impl<T: ControlFlowSend> ControlFlowSend for $outer<T> {
|
||||
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
|
||||
self.inner.poll_window_update()
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
macro_rules! proxy_control_flow_recv {
|
||||
($outer:ident) => (
|
||||
impl<T: ControlFlowRecv> ControlFlowRecv for $outer<T> {
|
||||
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
|
||||
self.inner.expand_window(id, incr)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
macro_rules! proxy_control_flow {
|
||||
($outer:ident) => (
|
||||
proxy_control_flow_recv!($outer);
|
||||
proxy_control_flow_send!($outer);
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,335 +0,0 @@
|
||||
use {error, ConnectionError, FrameSize};
|
||||
use frame::{self, Frame};
|
||||
use proto::*;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FlowControl<T> {
|
||||
inner: T,
|
||||
|
||||
local_initial: WindowSize,
|
||||
remote_initial: WindowSize,
|
||||
|
||||
/// Tracks the connection-level flow control window for receiving data from the
|
||||
/// remote.
|
||||
local_connection: FlowControlState,
|
||||
|
||||
/// Tracks the onnection-level flow control window for receiving data from the remote.
|
||||
remote_connection: FlowControlState,
|
||||
|
||||
/// Holds the list of streams on which local window updates may be sent.
|
||||
// XXX It would be cool if this didn't exist.
|
||||
local_pending_streams: VecDeque<StreamId>,
|
||||
|
||||
/// If a window update can't be sent immediately, it may need to be saved to be sent
|
||||
/// later.
|
||||
local_sending: Option<frame::WindowUpdate>,
|
||||
|
||||
/// Holds the list of streams on which local window updates may be sent.
|
||||
// XXX It would be cool if this didn't exist.
|
||||
remote_pending_streams: VecDeque<StreamId>,
|
||||
|
||||
/// When `poll_window_update` is not ready, then the calling task is saved to
|
||||
/// be notified later. Access to poll_window_update must not be shared across tasks,
|
||||
/// as we only track a single task (and *not* i.e. a task per stream id).
|
||||
remote_blocked: Option<task::Task>,
|
||||
}
|
||||
|
||||
impl<T, U> FlowControl<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ControlStreams
|
||||
{
|
||||
pub fn new(local_initial: WindowSize,
|
||||
remote_initial: WindowSize,
|
||||
inner: T)
|
||||
-> FlowControl<T>
|
||||
{
|
||||
FlowControl {
|
||||
inner,
|
||||
|
||||
local_initial,
|
||||
local_connection: FlowControlState::with_initial_size(local_initial),
|
||||
local_sending: None,
|
||||
local_pending_streams: VecDeque::new(),
|
||||
|
||||
remote_initial,
|
||||
remote_connection: FlowControlState::with_initial_size(remote_initial),
|
||||
remote_blocked: None,
|
||||
remote_pending_streams: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flow control utitlities.
|
||||
impl<T: ControlStreams> FlowControl<T> {
|
||||
fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
|
||||
if id.is_zero() {
|
||||
Some(&mut self.local_connection)
|
||||
} else {
|
||||
self.inner.recv_flow_controller(id)
|
||||
}
|
||||
}
|
||||
|
||||
fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
|
||||
if id.is_zero() {
|
||||
Some(&mut self.remote_connection)
|
||||
} else {
|
||||
self.inner.send_flow_controller(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Exposes a public upward API for flow control.
|
||||
impl<T: ControlStreams> ControlFlow for FlowControl<T> {
|
||||
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
|
||||
// This biases connection window updates, which probably makese sense.
|
||||
if let Some(incr) = self.remote_connection.apply_window_update() {
|
||||
return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr)));
|
||||
}
|
||||
|
||||
// TODO this should probably account for stream priority?
|
||||
while let Some(id) = self.remote_pending_streams.pop_front() {
|
||||
if let Some(mut flow) = self.send_flow_controller(id) {
|
||||
if let Some(incr) = flow.apply_window_update() {
|
||||
return Ok(Async::Ready(WindowUpdate::new(id, incr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.remote_blocked = Some(task::current());
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
|
||||
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
|
||||
let added = match self.recv_flow_controller(id) {
|
||||
None => false,
|
||||
Some(mut fc) => {
|
||||
fc.expand_window(incr);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if added {
|
||||
if !id.is_zero() {
|
||||
self.local_pending_streams.push_back(id);
|
||||
}
|
||||
Ok(())
|
||||
} else if let Some(rst) = self.inner.get_reset(id) {
|
||||
Err(error::User::StreamReset(rst).into())
|
||||
} else {
|
||||
Err(error::User::InvalidStreamId.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> FlowControl<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ControlStreams,
|
||||
{
|
||||
/// Returns ready when there are no pending window updates to send.
|
||||
fn poll_send_local(&mut self) -> Poll<(), ConnectionError> {
|
||||
if let Some(f) = self.local_sending.take() {
|
||||
try_ready!(self.try_send(f));
|
||||
}
|
||||
|
||||
if let Some(incr) = self.local_connection.apply_window_update() {
|
||||
try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr)));
|
||||
}
|
||||
|
||||
while let Some(id) = self.local_pending_streams.pop_front() {
|
||||
if self.inner.get_reset(id).is_none() {
|
||||
let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update());
|
||||
if let Some(incr) = update {
|
||||
try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> {
|
||||
if self.inner.start_send(f.into())?.is_not_ready() {
|
||||
self.local_sending = Some(f);
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks window updates received from the remote and ensures that the remote does not
|
||||
/// violate the local peer's flow controller.
|
||||
///
|
||||
/// TODO send flow control reset when the peer violates the flow control window.
|
||||
impl<T> Stream for FlowControl<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: ControlStreams,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
use frame::Frame::*;
|
||||
trace!("poll");
|
||||
|
||||
loop {
|
||||
match try_ready!(self.inner.poll()) {
|
||||
Some(WindowUpdate(v)) => {
|
||||
if let Some(fc) = self.send_flow_controller(v.stream_id()) {
|
||||
fc.expand_window(v.size_increment());
|
||||
}
|
||||
}
|
||||
|
||||
Some(Data(v)) => {
|
||||
let sz = v.payload().len() as FrameSize;
|
||||
if self.local_connection.claim_window(sz).is_err() {
|
||||
return Err(error::Reason::FlowControlError.into())
|
||||
}
|
||||
// If this frame ends the stream, there may no longer be a flow
|
||||
// controller. That's fine.
|
||||
if let Some(fc) = self.recv_flow_controller(v.stream_id()) {
|
||||
if fc.claim_window(sz).is_err() {
|
||||
// TODO send flow control reset.
|
||||
return Err(error::Reason::FlowControlError.into())
|
||||
}
|
||||
}
|
||||
return Ok(Async::Ready(Some(Data(v))));
|
||||
}
|
||||
|
||||
v => return Ok(Async::Ready(v)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks the send flow control windows for sent frames.
|
||||
///
|
||||
/// If sending a frame would violate the remote's window, start_send fails with
|
||||
/// `FlowControlViolation`.
|
||||
///
|
||||
/// Sends pending window updates before operating on the underlying transport.
|
||||
impl<T, U> Sink for FlowControl<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
T: ControlStreams,
|
||||
U: Buf,
|
||||
{
|
||||
type SinkItem = T::SinkItem;
|
||||
type SinkError = T::SinkError;
|
||||
|
||||
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
|
||||
use frame::Frame::*;
|
||||
|
||||
debug_assert!(self.inner.get_reset(frame.stream_id()).is_none());
|
||||
|
||||
// Ensures that:
|
||||
// 1. all pending local window updates have been sent to the remote.
|
||||
// 2. the underlying transport is will accept the frame. It's important that this
|
||||
// be checked before claiming capacity from the flow controllers.
|
||||
if self.poll_ready()?.is_not_ready() {
|
||||
return Ok(AsyncSink::NotReady(frame));
|
||||
}
|
||||
|
||||
// Ensure that an outbound data frame does not violate the remote's flow control
|
||||
// window.
|
||||
if let &Data(ref v) = &frame {
|
||||
let sz = v.payload().remaining() as FrameSize;
|
||||
|
||||
// Ensure there's enough capacity on the connection before acting on the
|
||||
// stream.
|
||||
if !self.remote_connection.check_window(sz) {
|
||||
return Err(error::User::FlowControlViolation.into());
|
||||
}
|
||||
|
||||
// Ensure there's enough capacity on stream.
|
||||
{
|
||||
let mut fc = self.inner.send_flow_controller(v.stream_id())
|
||||
.expect("no remote stream for data frame");
|
||||
if fc.claim_window(sz).is_err() {
|
||||
return Err(error::User::FlowControlViolation.into())
|
||||
}
|
||||
}
|
||||
|
||||
self.remote_connection.claim_window(sz)
|
||||
.expect("remote connection flow control error");
|
||||
}
|
||||
|
||||
let res = self.inner.start_send(frame)?;
|
||||
assert!(res.is_ready());
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
|
||||
try_ready!(self.poll_send_local());
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends pending window updates before checking the underyling transport's readiness.
|
||||
impl<T, U> ReadySink for FlowControl<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
T: ControlStreams,
|
||||
U: Buf,
|
||||
{
|
||||
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
|
||||
try_ready!(self.poll_send_local());
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies an update to an endpoint's initial window size.
|
||||
///
|
||||
/// Per RFC 7540 §6.9.2:
|
||||
///
|
||||
/// > In addition to changing the flow-control window for streams that are not yet
|
||||
/// > active, a SETTINGS frame can alter the initial flow-control window size for
|
||||
/// > streams with active flow-control windows (that is, streams in the "open" or
|
||||
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
|
||||
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
|
||||
/// > it maintains by the difference between the new value and the old value.
|
||||
/// >
|
||||
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
|
||||
/// > flow-control window to become negative. A sender MUST track the negative
|
||||
/// > flow-control window and MUST NOT send new flow-controlled frames until it
|
||||
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
|
||||
/// > positive.
|
||||
impl<T> ApplySettings for FlowControl<T>
|
||||
where T: ApplySettings,
|
||||
T: ControlStreams
|
||||
{
|
||||
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
|
||||
self.inner.apply_local_settings(set)?;
|
||||
|
||||
if let Some(new_window_size) = set.initial_window_size() {
|
||||
let old_window_size = self.local_initial;
|
||||
if new_window_size == old_window_size {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.inner.update_inital_recv_window_size(old_window_size, new_window_size);
|
||||
self.local_initial = new_window_size;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
|
||||
self.inner.apply_remote_settings(set)?;
|
||||
|
||||
if let Some(new_window_size) = set.initial_window_size() {
|
||||
let old_window_size = self.remote_initial;
|
||||
if new_window_size == old_window_size {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.inner.update_inital_send_window_size(old_window_size, new_window_size);
|
||||
self.remote_initial = new_window_size;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
proxy_control_streams!(FlowControl);
|
||||
proxy_control_ping!(FlowControl);
|
||||
222
src/proto/flow_control_recv.rs
Normal file
222
src/proto/flow_control_recv.rs
Normal file
@@ -0,0 +1,222 @@
|
||||
use {error, ConnectionError, FrameSize};
|
||||
use frame::{self, Frame};
|
||||
use proto::*;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/// Tracks local flow control windows.
|
||||
#[derive(Debug)]
|
||||
pub struct FlowControlRecv<T> {
|
||||
inner: T,
|
||||
|
||||
|
||||
initial_window_size: WindowSize,
|
||||
|
||||
/// Tracks the connection-level flow control window for receiving data from the
|
||||
/// remote.
|
||||
connection: FlowControlState,
|
||||
|
||||
/// Holds the list of streams on which local window updates may be sent.
|
||||
// XXX It would be cool if this didn't exist.
|
||||
pending_streams: VecDeque<StreamId>,
|
||||
|
||||
/// If a window update can't be sent immediately, it may need to be saved to be sent
|
||||
/// later.
|
||||
sending: Option<frame::WindowUpdate>,
|
||||
}
|
||||
|
||||
impl<T, U> FlowControlRecv<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ControlStreams
|
||||
{
|
||||
pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlRecv<T> {
|
||||
FlowControlRecv {
|
||||
inner,
|
||||
initial_window_size,
|
||||
connection: FlowControlState::with_initial_size(initial_window_size),
|
||||
pending_streams: VecDeque::new(),
|
||||
sending: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Exposes a public upward API for flow control.
|
||||
impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> {
|
||||
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
|
||||
let added = match self.recv_flow_controller(id) {
|
||||
None => false,
|
||||
Some(mut fc) => {
|
||||
fc.expand_window(incr);
|
||||
true
|
||||
}
|
||||
};
|
||||
|
||||
if added {
|
||||
if !id.is_zero() {
|
||||
self.pending_streams.push_back(id);
|
||||
}
|
||||
Ok(())
|
||||
} else if let Some(rst) = self.inner.get_reset(id) {
|
||||
Err(error::User::StreamReset(rst).into())
|
||||
} else {
|
||||
Err(error::User::InvalidStreamId.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> FlowControlRecv<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ControlStreams,
|
||||
{
|
||||
/// Returns ready when there are no pending window updates to send.
|
||||
fn poll_send_local(&mut self) -> Poll<(), ConnectionError> {
|
||||
if let Some(f) = self.sending.take() {
|
||||
try_ready!(self.try_send(f));
|
||||
}
|
||||
|
||||
if let Some(incr) = self.connection.apply_window_update() {
|
||||
try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr)));
|
||||
}
|
||||
|
||||
while let Some(id) = self.pending_streams.pop_front() {
|
||||
if self.inner.get_reset(id).is_none() {
|
||||
let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update());
|
||||
if let Some(incr) = update {
|
||||
try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
|
||||
fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> {
|
||||
if self.inner.start_send(f.into())?.is_not_ready() {
|
||||
self.sending = Some(f);
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensures that the remote does not violate the local peer's flow controller.
|
||||
impl<T> Stream for FlowControlRecv<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: ControlStreams,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
trace!("poll");
|
||||
loop {
|
||||
match try_ready!(self.inner.poll()) {
|
||||
Some(Frame::Data(v)) => {
|
||||
let id = v.stream_id();
|
||||
let sz = v.payload().len() as FrameSize;
|
||||
|
||||
// Ensure there's enough capacity on the connection before acting on
|
||||
// the stream.
|
||||
if !self.connection.check_window(sz) {
|
||||
// TODO this should cause a GO_AWAY
|
||||
return Err(error::Reason::FlowControlError.into());
|
||||
}
|
||||
|
||||
let fc = self.inner.recv_flow_controller(id)
|
||||
.expect("receiving data with no flow controller");
|
||||
if fc.claim_window(sz).is_err() {
|
||||
// TODO this should cause a GO_AWAY
|
||||
return Err(error::Reason::FlowControlError.into());
|
||||
}
|
||||
|
||||
self.connection.claim_window(sz)
|
||||
.expect("local connection flow control error");
|
||||
|
||||
return Ok(Async::Ready(Some(Frame::Data(v))));
|
||||
}
|
||||
|
||||
v => return Ok(Async::Ready(v)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends pending window updates before operating on the underlying transport.
|
||||
impl<T, U> Sink for FlowControlRecv<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
T: ControlStreams,
|
||||
{
|
||||
type SinkItem = T::SinkItem;
|
||||
type SinkError = T::SinkError;
|
||||
|
||||
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
|
||||
if self.poll_send_local()?.is_not_ready() {
|
||||
return Ok(AsyncSink::NotReady(frame));
|
||||
}
|
||||
self.inner.start_send(frame)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
|
||||
try_ready!(self.poll_send_local());
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends pending window updates before checking the underyling transport's readiness.
|
||||
impl<T, U> ReadySink for FlowControlRecv<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
T: ControlStreams,
|
||||
{
|
||||
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
|
||||
try_ready!(self.poll_send_local());
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies an update to an endpoint's initial window size.
|
||||
///
|
||||
/// Per RFC 7540 §6.9.2:
|
||||
///
|
||||
/// > In addition to changing the flow-control window for streams that are not yet
|
||||
/// > active, a SETTINGS frame can alter the initial flow-control window size for
|
||||
/// > streams with active flow-control windows (that is, streams in the "open" or
|
||||
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
|
||||
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
|
||||
/// > it maintains by the difference between the new value and the old value.
|
||||
/// >
|
||||
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
|
||||
/// > flow-control window to become negative. A sender MUST track the negative
|
||||
/// > flow-control window and MUST NOT send new flow-controlled frames until it
|
||||
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
|
||||
/// > positive.
|
||||
impl<T> ApplySettings for FlowControlRecv<T>
|
||||
where T: ApplySettings,
|
||||
T: ControlStreams
|
||||
{
|
||||
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
|
||||
self.inner.apply_local_settings(set)?;
|
||||
|
||||
if let Some(new_window_size) = set.initial_window_size() {
|
||||
let old_window_size = self.initial_window_size;
|
||||
if new_window_size == old_window_size {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.inner.update_inital_recv_window_size(old_window_size, new_window_size);
|
||||
self.initial_window_size = new_window_size;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
|
||||
self.inner.apply_remote_settings(set)
|
||||
}
|
||||
}
|
||||
|
||||
proxy_control_flow_send!(FlowControlRecv);
|
||||
proxy_control_ping!(FlowControlRecv);
|
||||
proxy_control_streams!(FlowControlRecv);
|
||||
208
src/proto/flow_control_send.rs
Normal file
208
src/proto/flow_control_send.rs
Normal file
@@ -0,0 +1,208 @@
|
||||
use {error, ConnectionError, FrameSize};
|
||||
use frame::{self, Frame};
|
||||
use proto::*;
|
||||
|
||||
use std::collections::VecDeque;
|
||||
|
||||
/// Tracks remote flow control windows.
|
||||
#[derive(Debug)]
|
||||
pub struct FlowControlSend<T> {
|
||||
inner: T,
|
||||
|
||||
initial_window_size: WindowSize,
|
||||
|
||||
/// Tracks the onnection-level flow control window for receiving data from the remote.
|
||||
connection: FlowControlState,
|
||||
|
||||
/// Holds the list of streams on which local window updates may be sent.
|
||||
// XXX It would be cool if this didn't exist.
|
||||
pending_streams: VecDeque<StreamId>,
|
||||
|
||||
/// When `poll_window_update` is not ready, then the calling task is saved to
|
||||
/// be notified later. Access to poll_window_update must not be shared across tasks,
|
||||
/// as we only track a single task (and *not* i.e. a task per stream id).
|
||||
blocked: Option<task::Task>,
|
||||
}
|
||||
|
||||
impl<T, U> FlowControlSend<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ControlStreams
|
||||
{
|
||||
pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlSend<T> {
|
||||
FlowControlSend {
|
||||
inner,
|
||||
initial_window_size,
|
||||
connection: FlowControlState::with_initial_size(initial_window_size),
|
||||
pending_streams: VecDeque::new(),
|
||||
blocked: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Exposes a public upward API for flow control.
|
||||
impl<T: ControlStreams> ControlFlowSend for FlowControlSend<T> {
|
||||
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
|
||||
// This biases connection window updates, which probably makes sense.
|
||||
if let Some(incr) = self.connection.apply_window_update() {
|
||||
return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr)));
|
||||
}
|
||||
|
||||
// TODO this should probably account for stream priority?
|
||||
while let Some(id) = self.pending_streams.pop_front() {
|
||||
if let Some(mut flow) = self.send_flow_controller(id) {
|
||||
if let Some(incr) = flow.apply_window_update() {
|
||||
return Ok(Async::Ready(WindowUpdate::new(id, incr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.blocked = Some(task::current());
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies remote window updates as they are received.
|
||||
impl<T> Stream for FlowControlSend<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: ControlStreams,
|
||||
{
|
||||
type Item = T::Item;
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
trace!("poll");
|
||||
|
||||
loop {
|
||||
match try_ready!(self.inner.poll()) {
|
||||
Some(Frame::WindowUpdate(v)) => {
|
||||
let id = v.stream_id();
|
||||
let sz = v.size_increment();
|
||||
|
||||
if id.is_zero() {
|
||||
self.connection.expand_window(sz);
|
||||
} else {
|
||||
// The remote may send window updates for streams that the local
|
||||
// now considers closed. It's okay.
|
||||
if let Some(fc) = self.inner.send_flow_controller(id) {
|
||||
fc.expand_window(sz);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
f => return Ok(Async::Ready(f)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks the flow control windows for sent davta frames.
|
||||
///
|
||||
/// If sending a frame would violate the remote's window, start_send fails with
|
||||
/// `FlowControlViolation`.
|
||||
impl<T, U> Sink for FlowControlSend<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
T: ControlStreams,
|
||||
U: Buf,
|
||||
{
|
||||
type SinkItem = T::SinkItem;
|
||||
type SinkError = T::SinkError;
|
||||
|
||||
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
|
||||
debug_assert!(self.inner.get_reset(frame.stream_id()).is_none());
|
||||
|
||||
// Ensures that the underlying transport is will accept the frame. It's important
|
||||
// that this be checked before claiming capacity from the flow controllers.
|
||||
if self.poll_ready()?.is_not_ready() {
|
||||
return Ok(AsyncSink::NotReady(frame));
|
||||
}
|
||||
|
||||
// Ensure that an outbound data frame does not violate the remote's flow control
|
||||
// window.
|
||||
if let &Frame::Data(ref v) = &frame {
|
||||
let sz = v.payload().remaining() as FrameSize;
|
||||
|
||||
// Ensure there's enough capacity on the connection before acting on the
|
||||
// stream.
|
||||
if !self.connection.check_window(sz) {
|
||||
return Err(error::User::FlowControlViolation.into());
|
||||
}
|
||||
|
||||
// Ensure there's enough capacity on stream.
|
||||
let mut fc = self.inner.send_flow_controller(v.stream_id())
|
||||
.expect("no remote stream for data frame");
|
||||
if fc.claim_window(sz).is_err() {
|
||||
return Err(error::User::FlowControlViolation.into())
|
||||
}
|
||||
|
||||
self.connection.claim_window(sz)
|
||||
.expect("remote connection flow control error");
|
||||
}
|
||||
|
||||
let res = self.inner.start_send(frame)?;
|
||||
assert!(res.is_ready());
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
}
|
||||
|
||||
/// Proxy.
|
||||
impl<T, U> ReadySink for FlowControlSend<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ReadySink,
|
||||
T: ControlStreams,
|
||||
U: Buf,
|
||||
{
|
||||
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies an update to the remote endpoint's initial window size.
|
||||
///
|
||||
/// Per RFC 7540 §6.9.2:
|
||||
///
|
||||
/// > In addition to changing the flow-control window for streams that are not yet
|
||||
/// > active, a SETTINGS frame can alter the initial flow-control window size for
|
||||
/// > streams with active flow-control windows (that is, streams in the "open" or
|
||||
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
|
||||
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
|
||||
/// > it maintains by the difference between the new value and the old value.
|
||||
/// >
|
||||
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
|
||||
/// > flow-control window to become negative. A sender MUST track the negative
|
||||
/// > flow-control window and MUST NOT send new flow-controlled frames until it
|
||||
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
|
||||
/// > positive.
|
||||
impl<T> ApplySettings for FlowControlSend<T>
|
||||
where T: ApplySettings,
|
||||
T: ControlStreams
|
||||
{
|
||||
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
|
||||
self.inner.apply_local_settings(set)
|
||||
}
|
||||
|
||||
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
|
||||
self.inner.apply_remote_settings(set)?;
|
||||
|
||||
if let Some(new_window_size) = set.initial_window_size() {
|
||||
let old_window_size = self.initial_window_size;
|
||||
if new_window_size == old_window_size {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.inner.update_inital_send_window_size(old_window_size, new_window_size);
|
||||
self.initial_window_size = new_window_size;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
proxy_control_flow_recv!(FlowControlSend);
|
||||
proxy_control_ping!(FlowControlSend);
|
||||
proxy_control_streams!(FlowControlSend);
|
||||
@@ -62,13 +62,14 @@ mod control_settings;
|
||||
mod control_streams;
|
||||
|
||||
use self::apply_settings::ApplySettings;
|
||||
use self::control_flow::ControlFlow;
|
||||
use self::control_flow::{ControlFlowRecv, ControlFlowSend};
|
||||
use self::control_ping::ControlPing;
|
||||
use self::control_settings::ControlSettings;
|
||||
use self::control_streams::ControlStreams;
|
||||
|
||||
mod connection;
|
||||
mod flow_control;
|
||||
mod flow_control_recv;
|
||||
mod flow_control_send;
|
||||
mod flow_control_state;
|
||||
mod framed_read;
|
||||
mod framed_write;
|
||||
@@ -84,7 +85,8 @@ mod stream_states;
|
||||
|
||||
pub use self::connection::Connection;
|
||||
|
||||
use self::flow_control::FlowControl;
|
||||
use self::flow_control_recv::FlowControlRecv;
|
||||
use self::flow_control_send::FlowControlSend;
|
||||
use self::flow_control_state::FlowControlState;
|
||||
use self::framed_read::FramedRead;
|
||||
use self::framed_write::FramedWrite;
|
||||
@@ -130,26 +132,31 @@ use self::stream_states::StreamStates;
|
||||
/// - Ensures that frames sent from the local peer are appropriate for the stream's state.
|
||||
/// - Ensures that the remote's max stream concurrency is not violated.
|
||||
///
|
||||
/// #### `StreamRecvClose`
|
||||
/// #### `FlowControlSend`
|
||||
///
|
||||
/// - Updates the stream state for frames sent with END_STREAM.
|
||||
///
|
||||
/// #### `FlowControl`
|
||||
///
|
||||
/// - Tracks received data frames against the local stream and connection flow control
|
||||
/// windows.
|
||||
/// - Tracks sent data frames against the remote stream and connection flow control
|
||||
/// windows.
|
||||
/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE.
|
||||
/// - Exposes `ControlFlow` upwards.
|
||||
/// - Exposes `ControlFlowSend` upwards.
|
||||
/// - Tracks received window updates against the remote stream and connection flow
|
||||
/// control windows so that upper layers may poll for updates.
|
||||
/// - Sends window updates for the local stream and connection flow control windows as
|
||||
/// instructed by upper layers.
|
||||
///
|
||||
/// #### `StreamSendClose`
|
||||
///
|
||||
/// - Updates the stream state for frames receive` with END_STREAM.
|
||||
/// - Updates the stream state for frames sent with END_STREAM.
|
||||
///
|
||||
/// #### `StreamRecvClose`
|
||||
///
|
||||
/// - Updates the stream state for frames received with END_STREAM.
|
||||
///
|
||||
/// #### `FlowControlRecv`
|
||||
///
|
||||
/// - Tracks received data frames against the local stream and connection flow control
|
||||
/// windows.
|
||||
/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE.
|
||||
/// - Exposes `ControlFlowRecv` upwards.
|
||||
/// - Sends window updates for the local stream and connection flow control windows as
|
||||
/// instructed by upper layers.
|
||||
///
|
||||
/// #### `StreamRecvOpen`
|
||||
///
|
||||
@@ -190,11 +197,12 @@ type Transport<T, P, B>=
|
||||
|
||||
type Streams<T, P> =
|
||||
StreamSendOpen<
|
||||
StreamRecvClose<
|
||||
FlowControl<
|
||||
StreamSendClose<
|
||||
StreamRecvOpen<
|
||||
StreamStates<T, P>>>>>>;
|
||||
FlowControlSend<
|
||||
StreamSendClose<
|
||||
StreamRecvClose<
|
||||
FlowControlRecv<
|
||||
StreamRecvOpen<
|
||||
StreamStates<T, P>>>>>>>;
|
||||
|
||||
type Codec<T, B> =
|
||||
FramedRead<
|
||||
@@ -286,17 +294,18 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>
|
||||
StreamSendOpen::new(
|
||||
initial_send_window_size,
|
||||
remote_max_concurrency,
|
||||
StreamRecvClose::new(
|
||||
FlowControl::new(
|
||||
initial_recv_window_size,
|
||||
initial_send_window_size,
|
||||
StreamSendClose::new(
|
||||
StreamRecvOpen::new(
|
||||
FlowControlSend::new(
|
||||
initial_send_window_size,
|
||||
StreamSendClose::new(
|
||||
StreamRecvClose::new(
|
||||
FlowControlRecv::new(
|
||||
initial_recv_window_size,
|
||||
local_max_concurrency,
|
||||
StreamStates::new(
|
||||
PingPong::new(
|
||||
FramedRead::new(framed))))))))
|
||||
StreamRecvOpen::new(
|
||||
initial_recv_window_size,
|
||||
local_max_concurrency,
|
||||
StreamStates::new(
|
||||
PingPong::new(
|
||||
FramedRead::new(framed)))))))))
|
||||
});
|
||||
|
||||
connection::new(transport)
|
||||
|
||||
@@ -214,14 +214,5 @@ impl<T: AsyncRead> AsyncRead for Settings<T> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ControlFlow> ControlFlow for Settings<T> {
|
||||
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
|
||||
self.inner.poll_window_update()
|
||||
}
|
||||
|
||||
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
|
||||
self.inner.expand_window(id, incr)
|
||||
}
|
||||
}
|
||||
|
||||
proxy_control_flow!(Settings);
|
||||
proxy_control_ping!(Settings);
|
||||
|
||||
Reference in New Issue
Block a user