closer to flow control

This commit is contained in:
Oliver Gould
2017-07-12 21:04:58 +00:00
parent b9f3556070
commit 41ffd1d44f
11 changed files with 337 additions and 179 deletions

View File

@@ -2,7 +2,7 @@ use Frame;
use client::Client;
use error::{self, ConnectionError};
use frame::{self, StreamId};
use proto::{self, Peer, ReadySink, State, PeerState, WindowUpdate, FlowController};
use proto::{self, Peer, ReadySink, State, FlowController};
use server::Server;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -15,12 +15,9 @@ use futures::*;
use ordermap::OrderMap;
use fnv::FnvHasher;
use std::collections::VecDeque;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
// TODO get window size from `inner`.
/// An H2 connection
#[derive(Debug)]
pub struct Connection<T, P, B: IntoBuf = Bytes> {
@@ -30,95 +27,89 @@ pub struct Connection<T, P, B: IntoBuf = Bytes> {
/// Tracks connection-level flow control.
local_flow_controller: FlowController,
initial_local_window_size: u32,
pending_local_window_updates: VecDeque<WindowUpdate>,
remote_flow_controller: FlowController,
initial_remote_window_size: u32,
pending_remote_window_updates: VecDeque<WindowUpdate>,
blocked_remote_window_update: Option<task::Task>
pending_local_window_update: Option<frame::WindowUpdate>,
blocked_remote_window_update: Option<task::Task>,
}
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>,
initial_local_window_size: u32,
initial_remote_window_size: u32)
pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
let local_window_size = transport.local_settings().initial_window_size();
let remote_window_size = transport.remote_settings().initial_window_size();
Connection {
inner: transport,
streams: StreamMap::default(),
peer: PhantomData,
local_flow_controller: FlowController::new(initial_local_window_size),
initial_local_window_size,
pending_local_window_updates: VecDeque::default(),
local_flow_controller: FlowController::new(local_window_size),
remote_flow_controller: FlowController::new(remote_window_size),
remote_flow_controller: FlowController::new(initial_remote_window_size),
initial_remote_window_size,
pending_remote_window_updates: VecDeque::default(),
pending_local_window_update: None,
blocked_remote_window_update: None,
}
}
impl<T, P, B: IntoBuf> Connection<T, P, B> {
pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<u32, ConnectionError> {
if id.is_zero() {
return match self.local_flow_controller.take_window_update() {
Some(incr) => Ok(Async::Ready(incr)),
None => {
self.blocked_remote_window_update = Some(task::current());
Ok(Async::NotReady)
}
};
}
match self.streams.get_mut(&id).and_then(|mut s| s.take_remote_window_update()) {
Some(incr) => Ok(Async::Ready(incr)),
None => {
self.blocked_remote_window_update = Some(task::current());
Ok(Async::NotReady)
}
}
}
/// Publishes local stream window updates to the remote.
///
/// Connection window updates (StreamId=0) and stream window must be published
/// distinctly.
pub fn increment_local_window(&mut self, up: WindowUpdate) {
let added = match &up {
&WindowUpdate::Connection { increment } => {
if increment == 0 {
false
} else {
self.local_flow_controller.increment(increment);
true
}
}
&WindowUpdate::Stream { id, increment } => {
if increment == 0 {
false
} else {
match self.streams.get_mut(&id) {
Some(&mut State::Open { local: PeerState::Data(ref mut fc), .. }) |
Some(&mut State::HalfClosedRemote(PeerState::Data(ref mut fc))) => {
fc.increment(increment);
true
}
_ => false,
}
}
}
pub fn init_send_window_update(&mut self, id: StreamId, incr: u32) {
assert!(self.pending_local_window_update.is_none());
let added = if id.is_zero() {
self.remote_flow_controller.add_to_window(incr);
self.remote_flow_controller.take_window_update()
} else {
self.streams.get_mut(&id).and_then(|mut s| s.send_window_update(incr))
};
if added {
self.pending_local_window_updates.push_back(up);
if let Some(added) = added {
self.pending_local_window_update = Some(frame::WindowUpdate::new(id, added));
}
}
/// Advertises the remote's stream window updates.
///
/// Connection window updates (StreamId=0) and stream window updates are advertised
/// Connection window updates (id=0) and stream window updates are advertised
/// distinctly.
fn increment_remote_window(&mut self, id: StreamId, incr: u32) {
fn recv_window_update(&mut self, id: StreamId, incr: u32) {
if id.is_zero() {
self.remote_flow_controller.increment(incr);
} else {
match self.streams.get_mut(&id) {
Some(&mut State::Open { remote: PeerState::Data(ref mut fc), .. }) |
Some(&mut State::HalfClosedLocal(PeerState::Data(ref mut fc))) => {
fc.increment(incr);
}
_ => {}
}
return self.remote_flow_controller.add_to_window(incr);
}
if let Some(mut s) = self.streams.get_mut(&id) {
s.recv_window_update(incr);
}
unimplemented!()
}
}
@@ -127,16 +118,28 @@ impl<T, P, B> Connection<T, P, B>
P: Peer,
B: IntoBuf,
{
fn poll_send_window_update(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.pending_local_window_update.take() {
if self.inner.start_send(f.into())?.is_not_ready() {
self.pending_local_window_update = Some(f);
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
pub fn send_data(self,
id: StreamId,
data: B,
data_len: usize,
end_of_stream: bool)
-> sink::Send<Self>
{
self.send(Frame::Data {
id: id,
data: data,
end_of_stream: end_of_stream,
id,
data_len,
data,
end_of_stream,
})
}
}
@@ -201,15 +204,13 @@ impl<T, P, B> Stream for Connection<T, P, B>
};
trace!("received; frame={:?}", frame);
let frame = match frame {
Some(Headers(v)) => {
// TODO: Update stream state
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
// TODO load window size from settings.
let init_window_size = 65_535;
let init_window_size = self.inner.local_settings().initial_window_size();
let stream_initialized = try!(self.streams.entry(stream_id)
.or_insert(State::default())
@@ -231,26 +232,27 @@ impl<T, P, B> Stream for Connection<T, P, B>
end_of_stream: end_of_stream,
}
}
Some(Data(v)) => {
// TODO: Validate frame
Some(Data(v)) => {
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
match self.streams.get_mut(&stream_id) {
None => return Err(error::Reason::ProtocolError.into()),
Some(state) => try!(state.recv_data(end_of_stream)),
Some(state) => try!(state.recv_data(end_of_stream, v.len())),
}
Frame::Data {
id: stream_id,
data_len: v.len(),
data: v.into_payload(),
end_of_stream: end_of_stream,
end_of_stream,
}
}
Some(WindowUpdate(v)) => {
self.increment_remote_window(v.stream_id(), v.size_increment());
self.recv_window_update(v.stream_id(), v.size_increment());
continue;
}
Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)),
};
@@ -268,24 +270,31 @@ impl<T, P, B> Sink for Connection<T, P, B>
type SinkItem = Frame<P::Send, B>;
type SinkError = ConnectionError;
/// Sends a frame to the remote.
fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, Self::SinkError>
{
use frame::Frame::Headers;
// First ensure that the upstream can process a new item
if !try!(self.poll_ready()).is_ready() {
// First ensure that the upstream can process a new item. This ensures, for
// instance, that any pending local window updates have been sent to the remote
// before sending any other frames.
if try!(self.poll_ready()).is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
assert!(self.pending_local_window_update.is_none());
match item {
Frame::Headers { id, headers, end_of_stream } => {
// TODO load window size from settings.
let init_window_size = 65_535;
let init_window_size = self.inner.remote_settings().initial_window_size();
// Transition the stream state, creating a new entry if needed
// TODO: Response can send multiple headers frames before body
// (1xx responses).
//
// TODO: Response can send multiple headers frames before body (1xx
// responses).
//
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
let stream_initialized = try!(self.streams.entry(id)
.or_insert(State::default())
.send_headers::<P>(end_of_stream, init_window_size));
@@ -294,7 +303,6 @@ impl<T, P, B> Sink for Connection<T, P, B>
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
//
if !P::is_valid_local_stream_id(id) {
// TODO: clear state
return Err(error::User::InvalidStreamId.into());
@@ -314,25 +322,26 @@ impl<T, P, B> Sink for Connection<T, P, B>
Ok(AsyncSink::Ready)
}
Frame::Data { id, data, end_of_stream } => {
Frame::Data { id, data, data_len, end_of_stream } => {
// The stream must be initialized at this point
match self.streams.get_mut(&id) {
None => return Err(error::User::InactiveStreamId.into()),
Some(state) => try!(state.send_data(end_of_stream)),
Some(state) => try!(state.send_data(end_of_stream, data_len)),
}
let mut frame = frame::Data::new(id, data.into_buf());
let mut frame = frame::Data::from_buf(id, data.into_buf());
if end_of_stream {
frame.set_end_stream();
}
let res = try!(self.inner.start_send(frame.into()));
// poll_ready has already been called.
assert!(res.is_ready());
Ok(AsyncSink::Ready)
}
/*
Frame::Trailers { id, headers } => {
unimplemented!();
@@ -352,6 +361,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.poll_send_window_update());
self.inner.poll_complete()
}
}
@@ -362,6 +372,7 @@ impl<T, P, B> ReadySink for Connection<T, P, B>
B: IntoBuf,
{
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
try_ready!(self.poll_send_window_update());
self.inner.poll_ready()
}
}

View File

@@ -1,10 +1,23 @@
#[derive(Clone, Copy, Debug)]
pub struct WindowUnderflow;
pub const DEFAULT_INITIAL_WINDOW_SIZE: u32 = 65_535;
#[derive(Copy, Clone, Debug)]
pub struct FlowController {
/// Amount that may be claimed.
window_size: u32,
/// Amount to be removed by future increments.
underflow: u32,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
next_window_update: u32,
}
impl Default for FlowController {
fn default() -> Self {
Self::new(DEFAULT_INITIAL_WINDOW_SIZE)
}
}
impl FlowController {
@@ -12,14 +25,23 @@ impl FlowController {
FlowController {
window_size,
underflow: 0,
next_window_update: 0,
}
}
pub fn shrink(&mut self, sz: u32) {
self.underflow += sz;
pub fn window_size(&self) -> u32 {
self.window_size
}
pub fn consume(&mut self, sz: u32) -> Result<(), WindowUnderflow> {
/// Reduce future capacity of the window.
///
/// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE.
pub fn shrink_window(&mut self, decr: u32) {
self.underflow += decr;
}
/// Claim the provided amount from the window, if there is enough space.
pub fn claim_window(&mut self, sz: u32) -> Result<(), WindowUnderflow> {
if self.window_size < sz {
return Err(WindowUnderflow);
}
@@ -28,13 +50,27 @@ impl FlowController {
Ok(())
}
pub fn increment(&mut self, sz: u32) {
/// Applies a window increment immediately.
pub fn add_to_window(&mut self, sz: u32) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
self.window_size += sz - self.underflow;
let added = sz - self.underflow;
self.window_size += added;
self.next_window_update += added;
self.underflow = 0;
}
/// Obtains and clears an unadvertised window update.
pub fn take_window_update(&mut self) -> Option<u32> {
if self.next_window_update == 0 {
return None;
}
let incr = self.next_window_update;
self.next_window_update = 0;
Some(incr)
}
}

View File

@@ -93,5 +93,5 @@ pub fn from_server_handshaker<T, P, B>(transport: Settings<FramedWrite<T, B::Buf
});
// Finally, return the constructed `Connection`
connection::new(settings, 65_535, 65_535)
connection::new(settings)
}

View File

@@ -43,6 +43,14 @@ impl<T, U> Settings<T>
}
}
pub fn local_settings(&self) -> &frame::SettingSet {
&self.local
}
pub fn remote_settings(&self) -> &frame::SettingSet {
&self.local
}
/// Swap the inner transport while maintaining the current state.
pub fn swap_inner<T2, F: FnOnce(T) -> T2>(self, f: F) -> Settings<T2> {
let inner = f(self.inner);

View File

@@ -58,106 +58,178 @@ pub enum State {
Closed,
}
#[derive(Debug, Copy, Clone)]
pub enum PeerState {
Headers,
Data(FlowController),
}
impl State {
pub fn increment_local_window_size(&mut self, incr: u32) {
/// Updates the local flow controller with the given window size increment.
///
/// Returns the amount of capacity created, accounting for window size changes. The
/// caller should send the the returned window size increment to the remote.
///
/// If the remote is closed, None is returned.
pub fn send_window_update(&mut self, incr: u32) -> Option<u32> {
use self::State::*;
use self::PeerState::*;
*self = match *self {
Open { local: Data(mut local), remote } => {
local.increment(incr);
Open { local: Data(local), remote }
}
HalfClosedRemote(Data(mut local)) => {
local.increment(incr);
HalfClosedRemote(Data(local))
}
s => s,
if incr == 0 {
return None;
}
match self {
&mut Open { local: Data(ref mut fc), .. } |
&mut HalfClosedRemote(Data(ref mut fc)) => {
fc.add_to_window(incr);
fc.take_window_update()
}
_ => None,
}
}
/// Updates the remote flow controller with the given window size increment.
///
/// Returns the amount of capacity created, accounting for window size changes. The
/// caller should send the the returned window size increment to the remote.
pub fn recv_window_update(&mut self, incr: u32) {
use self::State::*;
use self::PeerState::*;
if incr == 0 {
return;
}
match self {
&mut Open { remote: Data(ref mut fc), .. } |
&mut HalfClosedLocal(Data(ref mut fc)) => fc.add_to_window(incr),
_ => {},
}
}
pub fn take_remote_window_update(&mut self) -> Option<u32> {
use self::State::*;
use self::PeerState::*;
match self {
&mut Open { remote: Data(ref mut fc), .. } |
&mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(),
_ => None,
}
}
/// Applies an update to the remote's initial window size.
///
/// Per RFC 7540 §6.9.2
///
/// > In addition to changing the flow-control window for streams that are not yet
/// > active, a SETTINGS frame can alter the initial flow-control window size for
/// > streams with active flow-control windows (that is, streams in the "open" or
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
/// > it maintains by the difference between the new value and the old value.
/// >
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
/// > flow-control window to become negative. A sender MUST track the negative
/// > flow-control window and MUST NOT send new flow-controlled frames until it
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive.
pub fn update_remote_initial_window_size(&mut self, old: u32, new: u32) {
use self::State::*;
use self::PeerState::*;
match self {
&mut Open { remote: Data(ref mut fc), .. } |
&mut HalfClosedLocal(Data(ref mut fc)) => {
if new < old {
fc.shrink_window(old - new);
} else {
fc.add_to_window(new - old);
}
}
_ => {}
}
}
/// TODO Connection doesn't have an API for local updates yet.
pub fn update_local_initial_window_size(&mut self, _old: u32, _new: u32) {
//use self::State::*;
//use self::PeerState::*;
unimplemented!()
}
/// Transition the state to represent headers being received.
///
/// Returns true if this state transition results in iniitializing the
/// stream id. `Err` is returned if this is an invalid state transition.
pub fn recv_headers<P: Peer>(&mut self, eos: bool, remote_window_size: u32) -> Result<bool, ConnectionError> {
pub fn recv_headers<P: Peer>(&mut self,
eos: bool,
remote_window_size: u32)
-> Result<bool, ConnectionError>
{
use self::State::*;
use self::PeerState::*;
match *self {
Idle => {
*self = if eos {
HalfClosedRemote(Headers)
let local = Headers;
if eos {
*self = HalfClosedRemote(local);
} else {
Open {
local: Headers,
remote: Data(FlowController::new(remote_window_size)),
}
};
*self = Open { local, remote: Data(FlowController::new(remote_window_size)) };
}
Ok(true)
}
Open { local, remote } => {
try!(remote.check_is_headers(ProtocolError.into()));
*self = if eos {
HalfClosedRemote(local)
} else {
let remote = Data(FlowController::new(remote_window_size));
Open { local, remote }
};
if !eos {
// Received non-trailers HEADERS on open remote.
return Err(ProtocolError.into());
}
*self = HalfClosedRemote(local);
Ok(false)
}
HalfClosedLocal(remote) => {
try!(remote.check_is_headers(ProtocolError.into()));
*self = if eos {
Closed
HalfClosedLocal(headers) => {
try!(headers.check_is_headers(ProtocolError.into()));
if eos {
*self = Closed;
} else {
HalfClosedLocal(Data(FlowController::new(remote_window_size)))
*self = HalfClosedLocal(Data(FlowController::new(remote_window_size)));
};
Ok(false)
}
Closed | HalfClosedRemote(..) => {
Err(ProtocolError.into())
}
_ => unimplemented!(),
}
}
pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> {
pub fn recv_data(&mut self, eos: bool, len: usize) -> Result<(), ConnectionError> {
use self::State::*;
match *self {
Open { local, remote } => {
try!(remote.check_is_data(ProtocolError.into()));
try!(remote.check_window_size(len, FlowControlError.into()));
if eos {
*self = HalfClosedRemote(local);
}
Ok(())
}
HalfClosedLocal(remote) => {
try!(remote.check_is_data(ProtocolError.into()));
try!(remote.check_window_size(len, FlowControlError.into()));
if eos {
*self = Closed;
}
Ok(())
}
Closed | HalfClosedRemote(..) => {
Err(ProtocolError.into())
}
_ => unimplemented!(),
}
}
@@ -183,6 +255,7 @@ impl State {
Ok(true)
}
Open { local, remote } => {
try!(local.check_is_headers(UnexpectedFrameType.into()));
@@ -195,6 +268,7 @@ impl State {
Ok(false)
}
HalfClosedRemote(local) => {
try!(local.check_is_headers(UnexpectedFrameType.into()));
@@ -206,67 +280,84 @@ impl State {
Ok(false)
}
Closed | HalfClosedLocal(..) => {
Err(UnexpectedFrameType.into())
}
_ => unimplemented!(),
}
}
pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> {
pub fn send_data(&mut self, eos: bool, len: usize) -> Result<(), ConnectionError> {
use self::State::*;
match *self {
Open { local, remote } => {
try!(local.check_is_data(UnexpectedFrameType.into()));
try!(local.check_window_size(len, FlowControlViolation.into()));
if eos {
*self = HalfClosedLocal(remote);
}
Ok(())
}
HalfClosedRemote(local) => {
try!(local.check_is_data(UnexpectedFrameType.into()));
try!(local.check_window_size(len, FlowControlViolation.into()));
if eos {
*self = Closed;
}
Ok(())
}
Closed | HalfClosedLocal(..) => {
Err(UnexpectedFrameType.into())
}
_ => unimplemented!(),
}
}
}
impl PeerState {
#[inline]
fn check_is_headers(&self, err: ConnectionError) -> Result<(), ConnectionError> {
use self::PeerState::*;
match *self {
Headers => Ok(()),
_ => Err(err),
}
}
#[inline]
fn check_is_data(&self, err: ConnectionError) -> Result<(), ConnectionError> {
use self::PeerState::*;
match *self {
Data { .. } => Ok(()),
_ => Err(err),
}
}
}
impl Default for State {
fn default() -> State {
State::Idle
}
}
#[derive(Debug, Copy, Clone)]
pub enum PeerState {
Headers,
/// Contains a FlowController representing the _receiver_ of this this data stream.
Data(FlowController),
}
impl PeerState {
#[inline]
fn check_is_headers(&self, err: ConnectionError) -> Result<(), ConnectionError> {
use self::PeerState::*;
match self {
&Headers => Ok(()),
_ => Err(err),
}
}
#[inline]
fn check_is_data(&self, err: ConnectionError) -> Result<(), ConnectionError> {
use self::PeerState::*;
match self {
&Data(_) => Ok(()),
_ => Err(err),
}
}
#[inline]
fn check_window_size(&self, len: usize, err: ConnectionError) -> Result<(), ConnectionError> {
use self::PeerState::*;
match self {
&Data(ref fc) if len <= fc.window_size() as usize=> Ok(()),
_ => Err(err),
}
}
}