This commit is contained in:
Oliver Gould
2017-07-10 00:46:20 +00:00
parent d7b82cd50b
commit d269029dd6
6 changed files with 227 additions and 121 deletions

View File

@@ -14,6 +14,13 @@ pub struct WindowUpdate {
} }
impl WindowUpdate { impl WindowUpdate {
pub fn new(stream_id: StreamId, increment: Increment) -> WindowUpdate {
WindowUpdate {
stream_id,
increment,
}
}
pub fn stream_id(&self) -> StreamId { pub fn stream_id(&self) -> StreamId {
self.stream_id self.stream_id
} }
@@ -22,13 +29,13 @@ impl WindowUpdate {
self.increment self.increment
} }
/// Builds a `Ping` frame from a raw frame. /// Builds a `WindowUpdate` frame from a raw frame.
pub fn load(head: Head, bytes: &[u8]) -> Result<WindowUpdate, Error> { pub fn load(head: Head, bytes: &[u8]) -> Result<WindowUpdate, Error> {
debug_assert_eq!(head.kind(), ::frame::Kind::WindowUpdate); debug_assert_eq!(head.kind(), ::frame::Kind::WindowUpdate);
Ok(WindowUpdate { Ok(WindowUpdate {
stream_id: head.stream_id(), stream_id: head.stream_id(),
// Clear the most significant bit, as that is reserved and MUST be ignored when // Clear the most significant bit, as that is reserved and MUST be ignored
// received. // when received.
increment: NetworkEndian::read_u32(bytes) & !INCREMENT_MASK, increment: NetworkEndian::read_u32(bytes) & !INCREMENT_MASK,
}) })
} }

View File

@@ -1,8 +1,8 @@
use {Frame, ConnectionError, Peer, StreamId}; use {Frame, ConnectionError, Peer, StreamId};
use client::Client; use client::Client;
use frame::{Frame as WireFrame}; use frame::{Frame as WireFrame};
use proto::{self, FlowController, ReadySink, PeerState, State, WindowUpdate};
use server::Server; use server::Server;
use proto::{self, ReadySink, State, WindowUpdate};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@@ -13,49 +13,11 @@ use futures::*;
use ordermap::OrderMap; use ordermap::OrderMap;
use fnv::FnvHasher; use fnv::FnvHasher;
use std::marker::PhantomData; use std::collections::VecDeque;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
pub struct FlowControlViolation; // TODO get window size from `inner`.
#[derive(Debug)]
struct FlowController {
window_size: u32,
underflow: u32,
}
impl FlowController {
pub fn new(window_size: u32) -> FlowController {
FlowController {
window_size,
underflow: 0,
}
}
pub fn shrink(&mut self, mut sz: u32) {
self.underflow += sz;
}
pub fn consume(&mut self, mut sz: u32) -> Result<(), FlowControlViolation> {
if sz < self.window_size {
self.underflow -= sz;
return Err(FlowControlViolation);
}
self.window_size -= sz;
Ok(())
}
pub fn increment(&mut self, mut sz: u32) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
sz -= self.underflow;
self.window_size += sz;
}
}
/// An H2 connection /// An H2 connection
#[derive(Debug)] #[derive(Debug)]
@@ -63,13 +25,24 @@ pub struct Connection<T, P> {
inner: proto::Inner<T>, inner: proto::Inner<T>,
streams: StreamMap<State>, streams: StreamMap<State>,
peer: PhantomData<P>, peer: PhantomData<P>,
/// Tracks connection-level flow control.
local_flow_controller: FlowController, local_flow_controller: FlowController,
remote_flow_controller: FlowController, initial_local_window_size: u32,
pending_local_window_updates: VecDeque<WindowUpdate>,
remote_flow_controller: FlowController,
initial_remote_window_size: u32,
pending_remote_window_updates: VecDeque<WindowUpdate>,
blocked_remote_window_update: Option<task::Task>
} }
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>; type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
pub fn new<T, P>(transport: proto::Inner<T>, initial_local_window_size: u32, initial_remote_window_size: u32) -> Connection<T, P> pub fn new<T, P>(transport: proto::Inner<T>,
initial_local_window_size: u32,
initial_remote_window_size: u32)
-> Connection<T, P>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
{ {
@@ -77,35 +50,70 @@ pub fn new<T, P>(transport: proto::Inner<T>, initial_local_window_size: u32, ini
inner: transport, inner: transport,
streams: StreamMap::default(), streams: StreamMap::default(),
peer: PhantomData, peer: PhantomData,
local_flow_controller: FlowController::new(initial_local_window_size), local_flow_controller: FlowController::new(initial_local_window_size),
initial_local_window_size,
pending_local_window_updates: VecDeque::default(),
remote_flow_controller: FlowController::new(initial_remote_window_size), remote_flow_controller: FlowController::new(initial_remote_window_size),
initial_remote_window_size,
pending_remote_window_updates: VecDeque::default(),
blocked_remote_window_update: None,
} }
} }
impl<T, P> Connection<T, P> { impl<T, P> Connection<T, P> {
/// Publishes stream window updates to the remote. /// Publishes local stream window updates to the remote.
/// ///
/// Connection window updates (StreamId=0) and stream window updates are published /// Connection window updates (StreamId=0) and stream window must be published
/// distinctly. /// distinctly.
pub fn increment_local_window(&mut self, up: WindowUpdate) { pub fn increment_local_window(&mut self, up: WindowUpdate) {
let incr = up.increment(); let added = match &up {
let flow = match up { &WindowUpdate::Connection { increment } => {
WindowUpdate::Connection { .. } => Some(&self.local_flow_controller), if increment == 0 {
WindowUpdate::Stream { id, .. } => { false
self.streams.get(&id).map(|s| s.local_flow_controller()) } else {
self.local_flow_controller.increment(increment);
true
}
}
&WindowUpdate::Stream { id, increment } => {
if increment == 0 {
false
} else {
match self.streams.get_mut(&id) {
Some(&mut State::Open { local: PeerState::Data(ref mut fc), .. }) |
Some(&mut State::HalfClosedRemote(PeerState::Data(ref mut fc))) => {
fc.increment(increment);
true
}
_ => false,
}
}
} }
}; };
if let Some(flow) = flow {
flow.increment(incr); if added {
self.pending_local_window_updates.push_back(up);
} }
unimplemented!()
} }
/// Advertises stream window updates from the remote. /// Advertises the remote's stream window updates.
/// ///
/// Connection window updates (StreamId=0) and stream window updates are advertised /// Connection window updates (StreamId=0) and stream window updates are advertised
/// distinctly. /// distinctly.
pub fn poll_remote_window(&mut self) -> Poll<WindowUpdate, ()> { fn increment_remote_window(&mut self, id: StreamId, incr: u32) {
if id.is_zero() {
self.remote_flow_controller.increment(incr);
} else {
match self.streams.get_mut(&id) {
Some(&mut State::Open { remote: PeerState::Data(ref mut fc), .. }) |
Some(&mut State::HalfClosedLocal(PeerState::Data(ref mut fc))) => {
fc.increment(incr);
}
_ => {}
}
}
unimplemented!() unimplemented!()
} }
} }
@@ -154,61 +162,70 @@ impl<T, P> Stream for Connection<T, P>
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> { fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
trace!("Connection::poll"); trace!("Connection::poll");
let frame = match try!(self.inner.poll()) { loop {
Async::Ready(f) => f, let frame = match try!(self.inner.poll()) {
Async::NotReady => { Async::Ready(f) => f,
// Because receiving new frames may depend on ensuring that the Async::NotReady => {
// write buffer is clear, `poll_complete` is called here. // Because receiving new frames may depend on ensuring that the
let _ = try!(self.poll_complete()); // write buffer is clear, `poll_complete` is called here.
return Ok(Async::NotReady); let _ = try!(self.poll_complete());
} return Ok(Async::NotReady);
}; }
};
trace!("received; frame={:?}", frame); trace!("received; frame={:?}", frame);
let frame = match frame { let frame = match frame {
Some(WireFrame::Headers(v)) => { Some(WireFrame::Headers(v)) => {
// TODO: Update stream state // TODO: Update stream state
let stream_id = v.stream_id(); let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream(); let end_of_stream = v.is_end_stream();
let stream_initialized = try!(self.streams.entry(stream_id) // TODO load window size from settings.
.or_insert(State::default()) let init_window_size = 65_535;
.recv_headers::<P>(end_of_stream));
if stream_initialized { let stream_initialized = try!(self.streams.entry(stream_id)
// TODO: Ensure available capacity for a new stream .or_insert(State::default())
// This won't be as simple as self.streams.len() as closed .recv_headers::<P>(end_of_stream, init_window_size));
// connections should not be factored.
if !P::is_valid_remote_stream_id(stream_id) { if stream_initialized {
unimplemented!(); // TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_remote_stream_id(stream_id) {
unimplemented!();
}
}
Frame::Headers {
id: stream_id,
headers: P::convert_poll_message(v),
end_of_stream: end_of_stream,
} }
} }
Some(WireFrame::Data(v)) => {
// TODO: Validate frame
Frame::Headers { let stream_id = v.stream_id();
id: stream_id, let end_of_stream = v.is_end_stream();
headers: P::convert_poll_message(v),
end_of_stream: end_of_stream, Frame::Body {
id: stream_id,
chunk: v.into_payload(),
end_of_stream: end_of_stream,
}
} }
} Some(WireFrame::WindowUpdate(v)) => {
Some(WireFrame::Data(v)) => { self.increment_remote_window(v.stream_id(), v.increment());
// TODO: Validate frame continue;
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
Frame::Body {
id: stream_id,
chunk: v.into_payload(),
end_of_stream: end_of_stream,
} }
} Some(frame) => panic!("unexpected frame; frame={:?}", frame),
Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)),
None => return Ok(Async::Ready(None)), };
};
Ok(Async::Ready(Some(frame))) return Ok(Async::Ready(Some(frame)));
}
} }
} }
@@ -229,13 +246,15 @@ impl<T, P> Sink for Connection<T, P>
match item { match item {
Frame::Headers { id, headers, end_of_stream } => { Frame::Headers { id, headers, end_of_stream } => {
// TODO load window size from settings.
let init_window_size = 65_535;
// Transition the stream state, creating a new entry if needed // Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body // TODO: Response can send multiple headers frames before body
// (1xx responses). // (1xx responses).
let stream_initialized = try!(self.streams.entry(id) let stream_initialized = try!(self.streams.entry(id)
.or_insert(State::default()) .or_insert(State::default())
.send_headers::<P>(end_of_stream)); .send_headers::<P>(end_of_stream, init_window_size));
if stream_initialized { if stream_initialized {
// TODO: Ensure available capacity for a new stream // TODO: Ensure available capacity for a new stream

40
src/proto/flow_control.rs Normal file
View File

@@ -0,0 +1,40 @@
#[derive(Clone, Copy, Debug)]
pub struct WindowUnderflow;
#[derive(Copy, Clone, Debug)]
pub struct FlowController {
window_size: u32,
underflow: u32,
}
impl FlowController {
pub fn new(window_size: u32) -> FlowController {
FlowController {
window_size,
underflow: 0,
}
}
pub fn shrink(&mut self, sz: u32) {
self.underflow += sz;
}
pub fn consume(&mut self, sz: u32) -> Result<(), WindowUnderflow> {
if self.window_size < sz {
return Err(WindowUnderflow);
}
self.window_size -= sz;
Ok(())
}
pub fn increment(&mut self, sz: u32) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
self.window_size += sz - self.underflow;
self.underflow = 0;
}
}

View File

@@ -1,4 +1,5 @@
mod connection; mod connection;
mod flow_control;
mod framed_read; mod framed_read;
mod framed_write; mod framed_write;
mod ping_pong; mod ping_pong;
@@ -7,13 +8,14 @@ mod settings;
mod state; mod state;
mod window_update; mod window_update;
pub use self::connection::{Connection}; pub use self::connection::Connection;
pub use self::flow_control::FlowController;
pub use self::framed_read::FramedRead; pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite; pub use self::framed_write::FramedWrite;
pub use self::ping_pong::PingPong; pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink; pub use self::ready::ReadySink;
pub use self::settings::Settings; pub use self::settings::Settings;
pub use self::state::State; pub use self::state::{PeerState, State};
pub use self::window_update::WindowUpdate; pub use self::window_update::WindowUpdate;
use {frame, Peer}; use {frame, Peer};

View File

@@ -1,4 +1,5 @@
use {ConnectionError, Reason, Peer}; use {ConnectionError, Reason, Peer};
use proto::FlowController;
/// Represents the state of an H2 stream /// Represents the state of an H2 stream
/// ///
@@ -40,7 +41,7 @@ use {ConnectionError, Reason, Peer};
/// ES: END_STREAM flag /// ES: END_STREAM flag
/// R: RST_STREAM frame /// R: RST_STREAM frame
/// ``` /// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq)] #[derive(Debug, Copy, Clone)]
pub enum State { pub enum State {
Idle, Idle,
ReservedLocal, ReservedLocal,
@@ -54,18 +55,35 @@ pub enum State {
Closed, Closed,
} }
#[derive(Debug, Copy, Clone, Eq, PartialEq)] #[derive(Debug, Copy, Clone)]
pub enum PeerState { pub enum PeerState {
Headers, Headers,
Data, Data(FlowController),
} }
impl State { impl State {
pub fn increment_local_window_size(&mut self, incr: u32) {
use self::State::*;
use self::PeerState::*;
*self = match *self {
Open { local: Data(mut local), remote } => {
local.increment(incr);
Open { local: Data(local), remote }
}
HalfClosedRemote(Data(mut local)) => {
local.increment(incr);
HalfClosedRemote(Data(local))
}
s => s,
}
}
/// Transition the state to represent headers being received. /// Transition the state to represent headers being received.
/// ///
/// Returns true if this state transition results in iniitializing the /// Returns true if this state transition results in iniitializing the
/// stream id. `Err` is returned if this is an invalid state transition. /// stream id. `Err` is returned if this is an invalid state transition.
pub fn recv_headers<P: Peer>(&mut self, eos: bool) -> Result<bool, ConnectionError> { pub fn recv_headers<P: Peer>(&mut self, eos: bool, remote_window_size: u32) -> Result<bool, ConnectionError> {
use self::State::*; use self::State::*;
use self::PeerState::*; use self::PeerState::*;
@@ -76,7 +94,7 @@ impl State {
} else { } else {
Open { Open {
local: Headers, local: Headers,
remote: Data, remote: Data(FlowController::new(remote_window_size)),
} }
}; };
@@ -88,7 +106,7 @@ impl State {
*self = if eos { *self = if eos {
HalfClosedRemote(local) HalfClosedRemote(local)
} else { } else {
let remote = Data; let remote = Data(FlowController::new(remote_window_size));
Open { local, remote } Open { local, remote }
}; };
@@ -100,7 +118,7 @@ impl State {
*self = if eos { *self = if eos {
Closed Closed
} else { } else {
HalfClosedLocal(Data) HalfClosedLocal(Data(FlowController::new(remote_window_size)))
}; };
Ok(false) Ok(false)
@@ -116,7 +134,7 @@ impl State {
/// ///
/// Returns true if this state transition results in initializing the stream /// Returns true if this state transition results in initializing the stream
/// id. `Err` is returned if this is an invalid state transition. /// id. `Err` is returned if this is an invalid state transition.
pub fn send_headers<P: Peer>(&mut self, eos: bool) -> Result<bool, ConnectionError> { pub fn send_headers<P: Peer>(&mut self, eos: bool, local_window_size: u32) -> Result<bool, ConnectionError> {
use self::State::*; use self::State::*;
use self::PeerState::*; use self::PeerState::*;
@@ -126,7 +144,7 @@ impl State {
HalfClosedLocal(Headers) HalfClosedLocal(Headers)
} else { } else {
Open { Open {
local: Data, local: Data(FlowController::new(local_window_size)),
remote: Headers, remote: Headers,
} }
}; };
@@ -139,7 +157,7 @@ impl State {
*self = if eos { *self = if eos {
HalfClosedLocal(remote) HalfClosedLocal(remote)
} else { } else {
let local = Data; let local = Data(FlowController::new(local_window_size));
Open { local, remote } Open { local, remote }
}; };
@@ -151,7 +169,7 @@ impl State {
*self = if eos { *self = if eos {
Closed Closed
} else { } else {
HalfClosedRemote(Data) HalfClosedRemote(Data(FlowController::new(local_window_size)))
}; };
Ok(false) Ok(false)

View File

@@ -1,16 +1,36 @@
use StreamId; use StreamId;
use frame;
#[derive(Debug)] #[derive(Debug)]
pub enum WindowUpdate { pub enum WindowUpdate {
Connection { increment: usize }, Connection { increment: u32 },
Stream { id: StreamId, increment: usize }, Stream { id: StreamId, increment: u32 },
} }
impl WindowUpdate { impl WindowUpdate {
pub fn increment(&self) -> usize { pub fn increment(&self) -> u32 {
match *self { match *self {
WindowUpdate::Connection { increment } | WindowUpdate::Connection { increment } |
WindowUpdate::Stream { increment, .. } => increment WindowUpdate::Stream { increment, .. } => increment
} }
} }
} }
impl From<WindowUpdate> for frame::WindowUpdate {
fn from(src: WindowUpdate) -> Self {
match src {
WindowUpdate::Connection { increment } => {
frame::WindowUpdate::new(StreamId::zero(), increment)
}
WindowUpdate::Stream { id, increment } => {
frame::WindowUpdate::new(id, increment)
}
}
}
}
impl From<WindowUpdate> for frame::Frame {
fn from(src: WindowUpdate) -> Self {
frame::Frame::WindowUpdate(src.into())
}
}