put each interface in its own file

This commit is contained in:
Oliver Gould
2017-07-24 15:33:48 +00:00
parent f3115d1b6f
commit 1069629aef
7 changed files with 448 additions and 108 deletions

View File

@@ -0,0 +1,23 @@
use ConnectionError;
use frame::SettingSet;
/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
/// FramedWrite).
pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>;
}
macro_rules! proxy_apply_settings {
($outer:ident) => (
impl<T: ApplySettings> ApplySettings for $outer<T> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
)
}

30
src/proto/control_flow.rs Normal file
View File

@@ -0,0 +1,30 @@
use ConnectionError;
use proto::*;
/// Exposes flow control states to "upper" layers of the transport (i.e. above
/// FlowControl).
pub trait ControlFlow {
/// Polls for the next window update from the remote.
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>;
/// Increases the local receive capacity of a stream.
///
/// This may cause a window update to be sent to the remote.
///
/// Fails if the given stream is not active.
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
}
macro_rules! proxy_control_flow {
($outer:ident) => (
impl<T: ControlFlow> ControlFlow for $outer<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
)
}

21
src/proto/control_ping.rs Normal file
View File

@@ -0,0 +1,21 @@
use ConnectionError;
use proto::*;
pub trait ControlPing {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
fn take_pong(&mut self) -> Option<PingPayload>;
}
macro_rules! proxy_control_ping {
($outer:ident) => (
impl<T: ControlPing> ControlPing for $outer<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
)
}

View File

@@ -0,0 +1,13 @@
use ConnectionError;
use frame::SettingSet;
use proto::*;
/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and
/// above---Connection).
pub trait ControlSettings {
fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>;
fn remote_push_enabled(&self) -> Option<bool>;
fn remote_max_concurrent_streams(&self) -> Option<u32>;
fn remote_initial_window_size(&self) -> WindowSize;
}

View File

@@ -1,88 +1,6 @@
use ConnectionError;
use frame::SettingSet;
use proto::*;
/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and
/// above---Connection).
pub trait ControlSettings {
fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>;
fn remote_push_enabled(&self) -> Option<bool>;
fn remote_max_concurrent_streams(&self) -> Option<u32>;
fn remote_initial_window_size(&self) -> WindowSize;
}
// macro_rules! proxy_control_settings {
// ($outer:ident) => (
// impl<T: ControlSettings> ControlSettings for $outer<T> {
// fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError> {
// self.inner.update_local_settings(set)
// }
//
// fn remote_push_enabled(&self) -> Option<bool> {
// self.inner.remote_push_enabled(set)
// }
//
// fn remote_max_concurrent_streams(&self) -> Option<u32> {
// self.inner.remote_max_concurrent_streams(set)
// }
//
// fn remote_initial_window_size(&self) -> WindowSize {
// self.inner.remote_initial_window_size(set)
// }
// }
// )
// }
/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
/// FramedWrite).
pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>;
}
macro_rules! proxy_apply_settings {
($outer:ident) => (
impl<T: ApplySettings> ApplySettings for $outer<T> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
)
}
/// Exposes flow control states to "upper" layers of the transport (i.e. above
/// FlowControl).
pub trait ControlFlow {
/// Polls for the next window update from the remote.
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>;
/// Increases the local receive capacity of a stream.
///
/// This may cause a window update to be sent to the remote.
///
/// Fails if the given stream is not active.
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
}
macro_rules! proxy_control_flow {
($outer:ident) => (
impl<T: ControlFlow> ControlFlow for $outer<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
)
}
/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
@@ -281,22 +199,3 @@ macro_rules! proxy_control_streams {
}
)
}
pub trait ControlPing {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
fn take_pong(&mut self) -> Option<PingPayload>;
}
macro_rules! proxy_control_ping {
($outer:ident) => (
impl<T: ControlPing> ControlPing for $outer<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
)
}

View File

@@ -7,9 +7,23 @@ use futures::*;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
// First, pull in the internal interfaces that support macros used throughout this module.
#[macro_use]
mod ifaces;
use self::ifaces::*;
mod apply_settings;
#[macro_use]
mod control_flow;
#[macro_use]
mod control_ping;
mod control_settings;
#[macro_use]
mod control_streams;
use self::apply_settings::ApplySettings;
use self::control_flow::ControlFlow;
use self::control_ping::ControlPing;
use self::control_settings::ControlSettings;
use self::control_streams::ControlStreams;
mod connection;
mod flow_control;
@@ -39,7 +53,7 @@ use self::stream_recv_close::StreamRecvClose;
use self::stream_recv_open::StreamRecvOpen;
use self::stream_send_close::StreamSendClose;
use self::stream_send_open::StreamSendOpen;
use self::stream_store::StreamStore;
use self::stream_store::StreamStates;
/// Represents the internals of an HTTP/2 connection.
///
@@ -59,7 +73,7 @@ use self::stream_store::StreamStore;
///
/// ### The stream transport
///
/// The states of all HTTP/2 connections are stored centrally in the `StreamStore` at the
/// The states of all HTTP/2 connections are stored centrally in the `StreamStates` at the
/// bottom of the stream transport. Several modules above this access this state via the
/// `ControlStreams` API to drive changes to the stream state. In each direction (send
/// from local to remote, and recv from remote to local), there is an Stream\*Open module
@@ -103,7 +117,7 @@ use self::stream_store::StreamStore;
/// - Ensures that the local peer's max stream concurrency is not violated.
/// - Emits StreamRefused resets to the remote.
///
/// #### `StreamStore`
/// #### `StreamStates`
///
/// - Holds the state of all local & remote active streams.
/// - Holds the cause of all reset/closed streams.
@@ -138,7 +152,7 @@ type Streams<T, P> =
FlowControl<
StreamSendClose<
StreamRecvOpen<
StreamStore<T, P>>>>>>;
StreamStates<T, P>>>>>>;
type Codec<T, B> =
FramedRead<
@@ -238,7 +252,7 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>
StreamRecvOpen::new(
initial_recv_window_size,
local_max_concurrency,
StreamStore::new(
StreamStates::new(
PingPong::new(
FramedRead::new(framed))))))))
});

340
src/proto/stream_store.rs Normal file
View File

@@ -0,0 +1,340 @@
use {ConnectionError, Peer, StreamId};
use error::Reason::{NoError, ProtocolError};
use proto::*;
use proto::state::StreamState;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
/// Holds the underlying stream state to be accessed by upper layers.
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug, Default)]
pub struct StreamStates<T, P> {
inner: T,
/// Holds active streams initiated by the local endpoint.
local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote endpoint.
remote_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote.
reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
_phantom: PhantomData<P>,
}
impl<T, P, U> StreamStates<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
pub fn new(inner: T) -> StreamStates<T, P> {
StreamStates {
inner,
local_active: OrderMap::default(),
remote_active: OrderMap::default(),
reset: OrderMap::default(),
_phantom: PhantomData,
}
}
}
impl<T, P: Peer> StreamStates<T, P> {
pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get(&id)
} else {
self.remote_active.get(&id)
}
}
pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id)
} else {
self.remote_active.get_mut(&id)
}
}
pub fn remove_active(&mut self, id: StreamId) {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.remove(&id);
} else {
self.remote_active.remove(&id);
}
}
}
impl<T, P: Peer> ControlStreams for StreamStates<T, P> {
fn local_valid_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id)
}
fn remote_valid_id(id: StreamId) -> bool {
P::is_valid_remote_stream_id(id)
}
fn local_can_open() -> bool {
P::local_can_open()
}
fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::local_valid_id(id) || !Self::local_can_open() {
return Err(ProtocolError.into());
}
if self.local_active.contains_key(&id) {
return Err(ProtocolError.into());
}
self.local_active.insert(id, StreamState::new_open_sending(sz));
Ok(())
}
fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::remote_valid_id(id) || !Self::remote_can_open() {
return Err(ProtocolError.into());
}
if self.remote_active.contains_key(&id) {
return Err(ProtocolError.into());
}
self.remote_active.insert(id, StreamState::new_open_recving(sz));
Ok(())
}
fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::local_valid_id(id) {
return Err(ProtocolError.into());
}
match self.local_active.get_mut(&id) {
Some(s) => s.open_recv_half(sz).map(|_| {}),
None => Err(ProtocolError.into()),
}
}
fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::remote_valid_id(id) {
return Err(ProtocolError.into());
}
match self.remote_active.get_mut(&id) {
Some(s) => s.open_send_half(sz).map(|_| {}),
None => Err(ProtocolError.into()),
}
}
fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_send_half())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_recv_half())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.remove_active(id);
self.reset.insert(id, cause);
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.local_active.contains_key(&id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.remote_active.contains_key(&id)
}
fn is_send_open(&mut self, id: StreamId) -> bool {
match self.get_active(id) {
Some(s) => s.is_send_open(),
None => false,
}
}
fn is_recv_open(&mut self, id: StreamId) -> bool {
match self.get_active(id) {
Some(s) => s.is_recv_open(),
None => false,
}
}
fn local_active_len(&self) -> usize {
self.local_active.len()
}
fn remote_active_len(&self) -> usize {
self.remote_active.len()
}
fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.expand_window(incr);
}
}
}
}
fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.expand_window(incr);
}
}
}
}
fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller())
}
}
fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller())
}
}
}
/// Proxy.
impl<T, P> Stream for StreamStates<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
{
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
self.inner.poll()
}
}
/// Proxy.
impl<T, P, U> Sink for StreamStates<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
/// Proxy.
impl<T, P, U> ReadySink for StreamStates<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
/// Proxy.
impl<T: ApplySettings, P> ApplySettings for StreamStates<T, P> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
/// Proxy.
impl<T: ControlPing, P> ControlPing for StreamStates<T, P> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}