wip: Sketch out stream state refactor

introduce the StreamTransporter trait, which exposes a map containing
all active stream states.

add skeletons for StreamTracker and FlowControl.  StreamTracker drives
all state changes
This commit is contained in:
Oliver Gould
2017-07-15 18:39:45 +00:00
parent d0c55c52e9
commit 1ed4b7e56a
8 changed files with 270 additions and 141 deletions

View File

@@ -2,7 +2,7 @@ use {Frame, FrameSize};
use client::Client; use client::Client;
use error::{self, ConnectionError}; use error::{self, ConnectionError};
use frame::{self, StreamId}; use frame::{self, StreamId};
use proto::{self, Peer, ReadySink, State, FlowController, WindowSize}; use proto::{self, Peer, ReadySink, StreamState, FlowController, WindowSize};
use server::Server; use server::Server;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@@ -14,15 +14,14 @@ use futures::*;
use ordermap::OrderMap; use ordermap::OrderMap;
use fnv::FnvHasher; use fnv::FnvHasher;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
use std::marker::PhantomData; use std::marker::PhantomData;
/// An H2 connection /// An H2 connection
#[derive(Debug)] #[derive(Debug)]
pub struct Connection<T, P, B: IntoBuf = Bytes> { pub struct Connection<T, P, B: IntoBuf = Bytes> {
inner: proto::Inner<T, B::Buf>, inner: proto::Transport<T, B::Buf>,
streams: StreamMap<State>, streams: StreamMap<StreamState>,
peer: PhantomData<P>, peer: PhantomData<P>,
/// Tracks the connection-level flow control window for receiving data from the /// Tracks the connection-level flow control window for receiving data from the
@@ -41,7 +40,7 @@ pub struct Connection<T, P, B: IntoBuf = Bytes> {
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>; type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>) pub fn new<T, P, B>(transport: proto::Transport<T, B::Buf>)
-> Connection<T, P, B> -> Connection<T, P, B>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
@@ -256,7 +255,7 @@ impl<T, P, B> Stream for Connection<T, P, B>
let init_window_size = self.inner.local_settings().initial_window_size(); let init_window_size = self.inner.local_settings().initial_window_size();
let stream_initialized = try!(self.streams.entry(stream_id) let stream_initialized = try!(self.streams.entry(stream_id)
.or_insert(State::default()) .or_insert(StreamState::default())
.recv_headers::<P>(end_of_stream, init_window_size)); .recv_headers::<P>(end_of_stream, init_window_size));
if stream_initialized { if stream_initialized {
@@ -347,7 +346,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
// ACTUALLY(ver), maybe not? // ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
let stream_initialized = try!(self.streams.entry(id) let stream_initialized = try!(self.streams.entry(id)
.or_insert(State::default()) .or_insert(StreamState::default())
.send_headers::<P>(end_of_stream, init_window_size)); .send_headers::<P>(end_of_stream, init_window_size));
if stream_initialized { if stream_initialized {

View File

@@ -1,77 +1,70 @@
use proto::WindowSize; use ConnectionError;
use frame::{self, Frame};
use proto::{ReadySink, StreamMap, StreamTransporter, WindowSize};
#[derive(Clone, Copy, Debug)] use futures::*;
pub struct WindowUnderflow;
pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; #[derive(Debug)]
pub struct FlowControl<T> {
#[derive(Copy, Clone, Debug)] inner: T,
pub struct FlowController {
/// Amount that may be claimed.
window_size: WindowSize,
/// Amount to be removed by future increments.
underflow: WindowSize,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
next_window_update: WindowSize,
} }
impl Default for FlowController { impl<T, U> FlowControl<T>
fn default() -> Self { where T: Stream<Item = Frame, Error = ConnectionError>,
Self::new(DEFAULT_INITIAL_WINDOW_SIZE) T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: StreamTransporter
{
pub fn new(inner: T) -> FlowControl<T> {
FlowControl { inner }
} }
} }
impl FlowController { impl<T: StreamTransporter> StreamTransporter for FlowControl<T> {
pub fn new(window_size: WindowSize) -> FlowController { fn streams(&self) -> &StreamMap {
FlowController { self.inner.streams()
window_size,
underflow: 0,
next_window_update: 0,
}
} }
/// Reduce future capacity of the window. fn streams_mut(&mut self) -> &mut StreamMap {
/// self.inner.streams_mut()
/// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. }
pub fn shrink_window(&mut self, decr: WindowSize) { }
self.underflow += decr;
} impl<T> Stream for FlowControl<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
/// Claims the provided amount from the window, if there is enough space. T: StreamTransporter,
/// {
/// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than type Item = T::Item;
/// have been previously claimed. type Error = T::Error;
pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> {
if self.window_size < sz { fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
return Err(WindowUnderflow); self.inner.poll()
} }
}
self.window_size -= sz;
Ok(())
} impl<T, U> Sink for FlowControl<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
/// Applies a window increment immediately. T: StreamTransporter,
pub fn increment_window_size(&mut self, sz: WindowSize) { {
if sz <= self.underflow { type SinkItem = T::SinkItem;
self.underflow -= sz; type SinkError = T::SinkError;
return;
} fn start_send(&mut self, item: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(item)
let added = sz - self.underflow; }
self.window_size += added;
self.next_window_update += added; fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.underflow = 0; self.inner.poll_complete()
} }
}
/// Obtains and clears an unadvertised window update.
pub fn take_window_update(&mut self) -> Option<WindowSize> { impl<T, U> ReadySink for FlowControl<T>
if self.next_window_update == 0 { where T: Stream<Item = Frame, Error = ConnectionError>,
return None; T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
} T: ReadySink,
T: StreamTransporter,
let incr = self.next_window_update; {
self.next_window_update = 0; fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
Some(incr) self.inner.poll_ready()
} }
} }

View File

@@ -0,0 +1,83 @@
use proto::WindowSize;
#[derive(Clone, Copy, Debug)]
pub struct WindowUnderflow;
pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535;
#[derive(Copy, Clone, Debug)]
pub struct FlowController {
/// Amount that may be claimed.
window_size: WindowSize,
/// Amount to be removed by future increments.
underflow: WindowSize,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
next_window_update: WindowSize,
}
impl Default for FlowController {
fn default() -> Self {
Self::new(DEFAULT_INITIAL_WINDOW_SIZE)
}
}
impl FlowController {
pub fn new(window_size: WindowSize) -> FlowController {
FlowController {
window_size,
underflow: 0,
next_window_update: 0,
}
}
/// Reduce future capacity of the window.
///
/// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE.
pub fn shrink_window(&mut self, decr: WindowSize) {
self.underflow += decr;
}
/// Claims the provided amount from the window, if there is enough space.
///
/// Fails when `take_window_update()` hasn't returned at least `sz` more bytes than
/// have been previously claimed.
pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> {
if self.window_size < sz {
return Err(WindowUnderflow);
}
self.window_size -= sz;
Ok(())
}
/// Applies a window increment immediately.
pub fn increment_window_size(&mut self, sz: WindowSize) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
let added = sz - self.underflow;
self.window_size += added;
self.next_window_update += added;
self.underflow = 0;
}
/// Obtains and clears an unadvertised window update.
pub fn take_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 {
return None;
}
let incr = self.next_window_update;
self.next_window_update = 0;
Some(incr)
}
}
#[test]
fn test() {
let mut fc = FlowController::new(65_535);
}

View File

@@ -1,43 +1,62 @@
mod connection; mod connection;
mod flow_control; mod flow_control;
mod flow_controller;
mod framed_read; mod framed_read;
mod framed_write; mod framed_write;
mod ping_pong; mod ping_pong;
mod ready; mod ready;
mod settings; mod settings;
mod state; mod state;
mod window_update; mod stream_tracker;
pub use self::connection::Connection; pub use self::connection::Connection;
pub use self::flow_control::FlowController; pub use self::flow_control::FlowControl;
pub use self::flow_controller::FlowController;
pub use self::framed_read::FramedRead; pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite; pub use self::framed_write::FramedWrite;
pub use self::ping_pong::PingPong; pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink; pub use self::ready::ReadySink;
pub use self::settings::Settings; pub use self::settings::Settings;
pub use self::state::{PeerState, State}; pub use self::stream_tracker::StreamTracker;
pub use self::window_update::WindowUpdate; use self::state::StreamState;
use {frame, Peer}; use {frame, Peer, StreamId};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited; use tokio_io::codec::length_delimited;
use bytes::{Buf, IntoBuf}; use bytes::{Buf, IntoBuf};
type Inner<T, B> = use ordermap::OrderMap;
Settings< use fnv::FnvHasher;
PingPong< use std::hash::BuildHasherDefault;
Framed<T, B>,
B>>;
type Framed<T, B> = /// Represents
type Transport<T, B> =
Settings<
FlowControl<
StreamTracker<
PingPong<
Framer<T, B>,
B>>>>;
type Framer<T, B> =
FramedRead< FramedRead<
FramedWrite<T, B>>; FramedWrite<T, B>>;
pub type WindowSize = u32; pub type WindowSize = u32;
#[derive(Debug)]
struct StreamMap {
inner: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>
}
trait StreamTransporter {
fn streams(&self)-> &StreamMap;
fn streams_mut(&mut self) -> &mut StreamMap;
}
/// Create a full H2 transport from an I/O handle. /// Create a full H2 transport from an I/O handle.
/// ///
/// This is called as the final step of the client handshake future. /// This is called as the final step of the client handshake future.
@@ -91,8 +110,10 @@ pub fn from_server_handshaker<T, P, B>(transport: Settings<FramedWrite<T, B::Buf
// Map to `Frame` types // Map to `Frame` types
let framed = FramedRead::new(framed_read); let framed = FramedRead::new(framed_read);
// Add ping/pong responder. FlowControl::new(
PingPong::new(framed) StreamTracker::new(
PingPong::new(
framed)))
}); });
// Finally, return the constructed `Connection` // Finally, return the constructed `Connection`

View File

@@ -20,7 +20,11 @@ impl<T, U> PingPong<T, U>
pong: None, pong: None,
} }
} }
}
impl<T, U> PingPong<T, U>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
fn try_send_pong(&mut self) -> Poll<(), ConnectionError> { fn try_send_pong(&mut self) -> Poll<(), ConnectionError> {
if let Some(pong) = self.pong.take() { if let Some(pong) = self.pong.take() {
if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? { if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? {
@@ -77,8 +81,7 @@ impl<T, U> Stream for PingPong<T, U>
} }
impl<T, U> Sink for PingPong<T, U> impl<T, U> Sink for PingPong<T, U>
where T: Stream<Item = Frame, Error = ConnectionError>, where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{ {
type SinkItem = Frame<U>; type SinkItem = Frame<U>;
type SinkError = ConnectionError; type SinkError = ConnectionError;
@@ -103,8 +106,7 @@ impl<T, U> Sink for PingPong<T, U>
} }
impl<T, U> ReadySink for PingPong<T, U> impl<T, U> ReadySink for PingPong<T, U>
where T: Stream<Item = Frame, Error = ConnectionError>, where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink, T: ReadySink,
{ {
fn poll_ready(&mut self) -> Poll<(), ConnectionError> { fn poll_ready(&mut self) -> Poll<(), ConnectionError> {

View File

@@ -45,7 +45,7 @@ use proto::FlowController;
/// R: RST_STREAM frame /// R: RST_STREAM frame
/// ``` /// ```
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub enum State { pub enum StreamState {
Idle, Idle,
ReservedLocal, ReservedLocal,
ReservedRemote, ReservedRemote,
@@ -58,7 +58,7 @@ pub enum State {
Closed, Closed,
} }
impl State { impl StreamState {
/// Updates the local flow controller so that the remote may send `incr` more bytes. /// Updates the local flow controller so that the remote may send `incr` more bytes.
/// ///
/// Returns the amount of capacity created, accounting for window size changes. The /// Returns the amount of capacity created, accounting for window size changes. The
@@ -66,7 +66,7 @@ impl State {
/// ///
/// If the remote is closed, None is returned. /// If the remote is closed, None is returned.
pub fn increment_send_window_size(&mut self, incr: u32) { pub fn increment_send_window_size(&mut self, incr: u32) {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
if incr == 0 { if incr == 0 {
@@ -83,7 +83,7 @@ impl State {
/// Consumes newly-advertised capacity to inform the local endpoint it may send more /// Consumes newly-advertised capacity to inform the local endpoint it may send more
/// data. /// data.
pub fn take_send_window_update(&mut self) -> Option<u32> { pub fn take_send_window_update(&mut self) -> Option<u32> {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
match self { match self {
@@ -99,7 +99,7 @@ impl State {
/// Returns the amount of capacity created, accounting for window size changes. The /// Returns the amount of capacity created, accounting for window size changes. The
/// caller should send the the returned window size increment to the remote. /// caller should send the the returned window size increment to the remote.
pub fn increment_recv_window_size(&mut self, incr: u32) { pub fn increment_recv_window_size(&mut self, incr: u32) {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
if incr == 0 { if incr == 0 {
@@ -116,7 +116,7 @@ impl State {
/// Consumes newly-advertised capacity to inform the local endpoint it may send more /// Consumes newly-advertised capacity to inform the local endpoint it may send more
/// data. /// data.
pub fn take_recv_window_update(&mut self) -> Option<u32> { pub fn take_recv_window_update(&mut self) -> Option<u32> {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
match self { match self {
@@ -143,7 +143,7 @@ impl State {
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become /// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive. /// > positive.
pub fn update_initial_recv_window_size(&mut self, old: u32, new: u32) { pub fn update_initial_recv_window_size(&mut self, old: u32, new: u32) {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
match self { match self {
@@ -161,7 +161,7 @@ impl State {
/// TODO Connection doesn't have an API for local updates yet. /// TODO Connection doesn't have an API for local updates yet.
pub fn update_initial_send_window_size(&mut self, _old: u32, _new: u32) { pub fn update_initial_send_window_size(&mut self, _old: u32, _new: u32) {
//use self::State::*; //use self::StreamState::*;
//use self::PeerState::*; //use self::PeerState::*;
unimplemented!() unimplemented!()
} }
@@ -175,7 +175,7 @@ impl State {
initial_recv_window_size: u32) initial_recv_window_size: u32)
-> Result<bool, ConnectionError> -> Result<bool, ConnectionError>
{ {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
match *self { match *self {
@@ -218,7 +218,7 @@ impl State {
} }
pub fn recv_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { pub fn recv_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> {
use self::State::*; use self::StreamState::*;
match *self { match *self {
Open { local, mut remote } => { Open { local, mut remote } => {
@@ -256,7 +256,7 @@ impl State {
initial_window_size: u32) initial_window_size: u32)
-> Result<bool, ConnectionError> -> Result<bool, ConnectionError>
{ {
use self::State::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;
match *self { match *self {
@@ -307,7 +307,7 @@ impl State {
} }
pub fn send_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { pub fn send_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> {
use self::State::*; use self::StreamState::*;
match *self { match *self {
Open { mut local, remote } => { Open { mut local, remote } => {
@@ -337,9 +337,9 @@ impl State {
} }
} }
impl Default for State { impl Default for StreamState {
fn default() -> State { fn default() -> StreamState {
State::Idle StreamState::Idle
} }
} }

View File

@@ -0,0 +1,67 @@
use ConnectionError;
use frame::{self, Frame};
use proto::{ReadySink, StreamMap, StreamTransporter, WindowSize};
use futures::*;
#[derive(Debug)]
pub struct StreamTracker<T> {
inner: T,
}
impl<T, U> StreamTracker<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>
{
pub fn new(inner: T) -> StreamTracker<T> {
StreamTracker { inner }
}
}
impl<T> StreamTransporter for StreamTracker<T> {
fn streams(&self) -> &StreamMap {
unimplemented!()
}
fn streams_mut(&mut self) -> &mut StreamMap {
unimplemented!()
}
}
impl<T, U> Stream for StreamTracker<T>
where T: Stream<Item = Frame<U>, Error = ConnectionError>,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll()
}
}
impl<T, U> Sink for StreamTracker<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
impl<T, U> ReadySink for StreamTracker<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}

View File

@@ -1,36 +0,0 @@
use StreamId;
use frame;
#[derive(Debug)]
pub enum WindowUpdate {
Connection { increment: u32 },
Stream { id: StreamId, increment: u32 },
}
impl WindowUpdate {
pub fn increment(&self) -> u32 {
match *self {
WindowUpdate::Connection { increment } |
WindowUpdate::Stream { increment, .. } => increment
}
}
}
impl From<WindowUpdate> for frame::WindowUpdate {
fn from(src: WindowUpdate) -> Self {
match src {
WindowUpdate::Connection { increment } => {
frame::WindowUpdate::new(StreamId::zero(), increment)
}
WindowUpdate::Stream { id, increment } => {
frame::WindowUpdate::new(id, increment)
}
}
}
}
impl From<WindowUpdate> for frame::Frame {
fn from(src: WindowUpdate) -> Self {
frame::Frame::WindowUpdate(src.into())
}
}