diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 28232cb..f3128bb 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -3,8 +3,6 @@ use client::Client; use error; use frame::{self, StreamId}; use proto::*; -use proto::ping_pong::{ControlPing, PingPayload}; -use proto::settings::ControlSettings; use server::Server; use bytes::{Bytes, IntoBuf}; diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 9128e57..726c73b 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -4,20 +4,6 @@ use proto::*; use std::collections::VecDeque; -/// Exposes flow control states to "upper" layers of the transport (i.e. above -/// FlowControl). -pub trait ControlFlow { - /// Polls for the next window update from the remote. - fn poll_window_update(&mut self) -> Poll; - - /// Increases the local receive capacity of a stream. - /// - /// This may cause a window update to be sent to the remote. - /// - /// Fails if the given stream is not active. - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; -} - #[derive(Debug)] pub struct FlowControl { inner: T, @@ -345,100 +331,5 @@ impl ApplySettings for FlowControl } } -/// Proxy. -impl ControlStreams for FlowControl { - fn local_valid_id(id: StreamId) -> bool { - T::local_valid_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - T::remote_valid_id(id) - } - - fn local_can_open() -> bool { - T::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open(id, sz) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open_recv_half(id, sz) - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open_send_half(id, sz) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open(id, sz) - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_send_half(id) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_recv_half(id) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.inner.reset_stream(id, cause) - } - - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.inner.is_local_active(id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.inner.is_remote_active(id) - } - - fn local_active_len(&self) -> usize { - self.inner.local_active_len() - } - - fn remote_active_len(&self) -> usize { - self.inner.remote_active_len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_recv_window_size(old_sz, new_sz) - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_send_window_size(old_sz, new_sz) - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.recv_flow_controller(id) - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.send_flow_controller(id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - self.inner.is_send_open(id) - } - - fn is_recv_open(&mut self, id: StreamId) -> bool { - self.inner.is_recv_open(id) - } -} - -/// Proxy. -impl ControlPing for FlowControl { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} +proxy_control_streams!(FlowControl); +proxy_control_ping!(FlowControl); diff --git a/src/proto/ifaces.rs b/src/proto/ifaces.rs new file mode 100644 index 0000000..3189322 --- /dev/null +++ b/src/proto/ifaces.rs @@ -0,0 +1,302 @@ +use ConnectionError; +use frame::SettingSet; +use proto::*; + +/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and +/// above---Connection). +pub trait ControlSettings { + fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>; + + fn remote_push_enabled(&self) -> Option; + fn remote_max_concurrent_streams(&self) -> Option; + fn remote_initial_window_size(&self) -> WindowSize; +} + +// macro_rules! proxy_control_settings { +// ($outer:ident) => ( +// impl ControlSettings for $outer { +// fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError> { +// self.inner.update_local_settings(set) +// } +// +// fn remote_push_enabled(&self) -> Option { +// self.inner.remote_push_enabled(set) +// } +// +// fn remote_max_concurrent_streams(&self) -> Option { +// self.inner.remote_max_concurrent_streams(set) +// } +// +// fn remote_initial_window_size(&self) -> WindowSize { +// self.inner.remote_initial_window_size(set) +// } +// } +// ) +// } + +/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to +/// FramedWrite). +pub trait ApplySettings { + fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; + fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; +} + +macro_rules! proxy_apply_settings { + ($outer:ident) => ( + impl ApplySettings for $outer { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } + } + ) +} + +/// Exposes flow control states to "upper" layers of the transport (i.e. above +/// FlowControl). +pub trait ControlFlow { + /// Polls for the next window update from the remote. + fn poll_window_update(&mut self) -> Poll; + + /// Increases the local receive capacity of a stream. + /// + /// This may cause a window update to be sent to the remote. + /// + /// Fails if the given stream is not active. + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; +} + +macro_rules! proxy_control_flow { + ($outer:ident) => ( + impl ControlFlow for $outer { + fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() + } + + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) + } + } + ) +} + +/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up +/// to Connection). +pub trait ControlStreams { + /// Determines whether the given stream could theoretically be opened by the local + /// side of this connection. + fn local_valid_id(id: StreamId) -> bool; + + /// Determines whether the given stream could theoretically be opened by the remote + /// side of this connection. + fn remote_valid_id(id: StreamId) -> bool; + + /// Indicates whether this local endpoint may open streams (with HEADERS). + /// + /// Implies that this endpoint is a client. + fn local_can_open() -> bool; + + /// Indicates whether this remote endpoint may open streams (with HEADERS). + /// + /// Implies that this endpoint is a server. + fn remote_can_open() -> bool { + !Self::local_can_open() + } + + /// Creates a new stream in the OPEN state from the local side (i.e. as a Client). + /// + /// Must only be called when local_can_open returns true. + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + /// Create a new stream in the OPEN state from the remote side (i.e. as a Server). + /// + /// Must only be called when remote_can_open returns true. + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + /// Prepare the receive side of a local stream to receive data from the remote. + /// + /// Typically called when a client receives a response header. + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + /// Prepare the send side of a remote stream to receive data from the local endpoint. + /// + /// Typically called when a server sends a response header. + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + // TODO push promise + // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; + // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; + + /// Closes the send half of a stream. + /// + /// Fails with a ProtocolError if send half of the stream was not open. + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + + /// Closes the recv half of a stream. + /// + /// Fails with a ProtocolError if recv half of the stream was not open. + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + + /// Resets the given stream. + /// + /// If the stream was already reset, the stored cause is updated. + fn reset_stream(&mut self, id: StreamId, cause: Reason); + + /// Get the reason the stream was reset, if it was reset. + fn get_reset(&self, id: StreamId) -> Option; + + /// Returns true if the given stream was opened by the local peer and is not yet + /// closed. + fn is_local_active(&self, id: StreamId) -> bool; + + /// Returns true if the given stream was opened by the remote peer and is not yet + /// closed. + fn is_remote_active(&self, id: StreamId) -> bool; + + /// Returns true if the given stream was opened and is not yet closed. + fn is_active(&self, id: StreamId) -> bool { + if Self::local_valid_id(id) { + self.is_local_active(id) + } else { + self.is_remote_active(id) + } + } + + /// Returns the number of open streams initiated by the local peer. + fn local_active_len(&self) -> usize; + + /// Returns the number of open streams initiated by the remote peer. + fn remote_active_len(&self) -> usize; + + /// Returns true iff the recv half of the given stream is open. + fn is_recv_open(&mut self, id: StreamId) -> bool; + + /// Returns true iff the send half of the given stream is open. + fn is_send_open(&mut self, id: StreamId) -> bool; + + /// If the given stream ID is active and able to recv data, get its mutable recv flow + /// control state. + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + + /// If the given stream ID is active and able to send data, get its mutable send flow + /// control state. + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + + /// Updates the initial window size for the local peer. + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); + + /// Updates the initial window size for the remote peer. + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); +} + +macro_rules! proxy_control_streams { + ($outer:ident) => ( + impl ControlStreams for $outer { + fn local_valid_id(id: StreamId) -> bool { + T::local_valid_id(id) + } + + fn remote_valid_id(id: StreamId) -> bool { + T::remote_valid_id(id) + } + + fn local_can_open() -> bool { + T::local_can_open() + } + + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open(id, sz) + } + + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open(id, sz) + } + + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open_recv_half(id, sz) + } + + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open_send_half(id, sz) + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_send_half(id) + } + + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_recv_half(id) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.inner.reset_stream(id, cause) + } + + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.inner.is_local_active(id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.inner.is_remote_active(id) + } + + fn local_active_len(&self) -> usize { + self.inner.local_active_len() + } + + fn remote_active_len(&self) -> usize { + self.inner.remote_active_len() + } + + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { + self.inner.update_inital_recv_window_size(old_sz, new_sz) + } + + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { + self.inner.update_inital_send_window_size(old_sz, new_sz) + } + + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.recv_flow_controller(id) + } + + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.send_flow_controller(id) + } + + fn is_send_open(&mut self, id: StreamId) -> bool { + self.inner.is_send_open(id) + } + + fn is_recv_open(&mut self, id: StreamId) -> bool { + self.inner.is_recv_open(id) + } + } + ) +} + +pub trait ControlPing { + fn start_ping(&mut self, body: PingPayload) -> StartSend; + fn take_pong(&mut self) -> Option; +} + +macro_rules! proxy_control_ping { + ($outer:ident) => ( + impl ControlPing for $outer { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } + } + ) +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 53e2ef4..1fe7cf3 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -7,6 +7,10 @@ use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +#[macro_use] +mod ifaces; +use self::ifaces::*; + mod connection; mod flow_control; mod flow_control_state; @@ -24,18 +28,18 @@ mod stream_store; pub use self::connection::Connection; -use self::flow_control::{ControlFlow, FlowControl}; +use self::flow_control::{FlowControl}; use self::flow_control_state::{FlowControlState}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; -use self::ping_pong::{ControlPing, PingPayload, PingPong}; +use self::ping_pong::{PingPong}; use self::ready::ReadySink; -use self::settings::{ApplySettings, ControlSettings, Settings}; +use self::settings::Settings; use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; use self::stream_send_open::StreamSendOpen; -use self::stream_store::{ControlStreams, StreamStore}; +use self::stream_store::StreamStore; /// Represents the internals of an HTTP/2 connection. /// @@ -140,6 +144,8 @@ type Codec = FramedRead< FramedWrite>; +pub type PingPayload = [u8; 8]; + pub type WindowSize = u32; #[derive(Debug, Copy, Clone)] diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index ece90ff..f67642f 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,15 +1,6 @@ use ConnectionError; use frame::{Frame, Ping, SettingSet}; -use proto::{ApplySettings, ReadySink}; - -use futures::*; - -pub type PingPayload = [u8; 8]; - -pub trait ControlPing { - fn start_ping(&mut self, body: PingPayload) -> StartSend; - fn take_pong(&mut self) -> Option; -} +use proto::*; /// Acknowledges ping requests from the remote. #[derive(Debug)] diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 66cc98f..f9e8d07 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -7,23 +7,6 @@ use bytes::BufMut; use std::io; -/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and -/// above---Connection). -pub trait ControlSettings { - fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; - - fn remote_push_enabled(&self) -> Option; - fn remote_max_concurrent_streams(&self) -> Option; - fn remote_initial_window_size(&self) -> WindowSize; -} - -/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to -/// FramedWrite). -pub trait ApplySettings { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; -} - #[derive(Debug)] pub struct Settings { // Upstream transport @@ -241,12 +224,4 @@ impl ControlFlow for Settings { } } -impl ControlPing for Settings { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} +proxy_control_ping!(Settings); diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index 95baae6..41839c8 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -79,122 +79,7 @@ impl ReadySink for StreamRecvClose } } -// Proxy. -impl ControlStreams for StreamRecvClose { - fn local_valid_id(id: StreamId) -> bool { - T::local_valid_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - T::remote_valid_id(id) - } - - fn local_can_open() -> bool { - T::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open(id, sz) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open(id, sz) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open_recv_half(id, sz) - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open_send_half(id, sz) - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_send_half(id) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_recv_half(id) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.inner.reset_stream(id, cause) - } - - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.inner.is_local_active(id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.inner.is_remote_active(id) - } - - fn local_active_len(&self) -> usize { - self.inner.local_active_len() - } - - fn remote_active_len(&self) -> usize { - self.inner.remote_active_len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_recv_window_size(old_sz, new_sz) - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_send_window_size(old_sz, new_sz) - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.recv_flow_controller(id) - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.send_flow_controller(id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - self.inner.is_send_open(id) - } - - fn is_recv_open(&mut self, id: StreamId) -> bool { - self.inner.is_recv_open(id) - } -} - -// Proxy. -impl ApplySettings for StreamRecvClose { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -// Proxy. -impl ControlFlow for StreamRecvClose { - fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() - } - - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) - } -} - -// Proxy. -impl ControlPing for StreamRecvClose { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} +proxy_apply_settings!(StreamRecvClose); +proxy_control_flow!(StreamRecvClose); +proxy_control_streams!(StreamRecvClose); +proxy_control_ping!(StreamRecvClose); diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index 74f70d7..dbc6da1 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -212,100 +212,6 @@ impl ReadySink for StreamRecvOpen } } -/// Proxy. -impl ControlStreams for StreamRecvOpen { - fn local_valid_id(id: StreamId) -> bool { - T::local_valid_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - T::remote_valid_id(id) - } - - fn local_can_open() -> bool { - T::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open(id, sz) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open(id, sz) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open_recv_half(id, sz) - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open_send_half(id, sz) - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_send_half(id) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_recv_half(id) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.inner.reset_stream(id, cause) - } - - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.inner.is_local_active(id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.inner.is_remote_active(id) - } - - fn local_active_len(&self) -> usize { - self.inner.local_active_len() - } - - fn remote_active_len(&self) -> usize { - self.inner.remote_active_len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_recv_window_size(old_sz, new_sz) - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_send_window_size(old_sz, new_sz) - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.recv_flow_controller(id) - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.send_flow_controller(id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - self.inner.is_send_open(id) - } - - fn is_recv_open(&mut self, id: StreamId) -> bool { - self.inner.is_recv_open(id) - } -} - -/// Proxy. -impl ControlPing for StreamRecvOpen { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} +proxy_control_flow!(StreamRecvOpen); +proxy_control_streams!(StreamRecvOpen); +proxy_control_ping!(StreamRecvOpen); diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index cf00e7f..bc75757 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -74,111 +74,7 @@ impl ReadySink for StreamSendClose } } -/// Proxy. -impl ApplySettings for StreamSendClose { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -/// Proxy. -impl ControlStreams for StreamSendClose { - fn local_valid_id(id: StreamId) -> bool { - T::local_valid_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - T::remote_valid_id(id) - } - - fn local_can_open() -> bool { - T::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open(id, sz) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open(id, sz) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open_recv_half(id, sz) - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open_send_half(id, sz) - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_send_half(id) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_recv_half(id) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.inner.reset_stream(id, cause) - } - - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.inner.is_local_active(id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.inner.is_remote_active(id) - } - - fn local_active_len(&self) -> usize { - self.inner.local_active_len() - } - - fn remote_active_len(&self) -> usize { - self.inner.remote_active_len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_recv_window_size(old_sz, new_sz) - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_send_window_size(old_sz, new_sz) - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.recv_flow_controller(id) - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.send_flow_controller(id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - self.inner.is_send_open(id) - } - - fn is_recv_open(&mut self, id: StreamId) -> bool { - self.inner.is_recv_open(id) - } -} - -/// Proxy. -impl ControlPing for StreamSendClose { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} +proxy_apply_settings!(StreamSendClose); +proxy_control_flow!(StreamSendClose); +proxy_control_streams!(StreamSendClose); +proxy_control_ping!(StreamSendClose); diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index 13be1a0..d4046d5 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -156,111 +156,6 @@ impl ReadySink for StreamSendOpen } } -/// Proxy. -impl ControlStreams for StreamSendOpen { - fn local_valid_id(id: StreamId) -> bool { - T::local_valid_id(id) - } - - fn remote_valid_id(id: StreamId) -> bool { - T::remote_valid_id(id) - } - - fn local_can_open() -> bool { - T::local_can_open() - } - - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open(id, sz) - } - - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open(id, sz) - } - - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.local_open_recv_half(id, sz) - } - - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - self.inner.remote_open_send_half(id, sz) - } - - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_send_half(id) - } - - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_recv_half(id) - } - - fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.inner.reset_stream(id, cause) - } - - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) - } - - fn is_local_active(&self, id: StreamId) -> bool { - self.inner.is_local_active(id) - } - - fn is_remote_active(&self, id: StreamId) -> bool { - self.inner.is_remote_active(id) - } - - fn local_active_len(&self) -> usize { - self.inner.local_active_len() - } - - fn remote_active_len(&self) -> usize { - self.inner.remote_active_len() - } - - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_recv_window_size(old_sz, new_sz) - } - - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - self.inner.update_inital_send_window_size(old_sz, new_sz) - } - - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.recv_flow_controller(id) - } - - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.send_flow_controller(id) - } - - fn is_send_open(&mut self, id: StreamId) -> bool { - self.inner.is_send_open(id) - } - - fn is_recv_open(&mut self, id: StreamId) -> bool { - self.inner.is_recv_open(id) - } -} - -/// Proxy. -impl ControlFlow for StreamSendOpen { - fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() - } - - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) - } -} - -/// Proxy. -impl ControlPing for StreamSendOpen { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} +proxy_control_flow!(StreamSendOpen); +proxy_control_streams!(StreamSendOpen); +proxy_control_ping!(StreamSendOpen); diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index fc93d6e..7e8a691 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -8,115 +8,6 @@ use ordermap::OrderMap; use std::hash::BuildHasherDefault; use std::marker::PhantomData; -/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up -/// to Connection). -pub trait ControlStreams { - /// Determines whether the given stream could theoretically be opened by the local - /// side of this connection. - fn local_valid_id(id: StreamId) -> bool; - - /// Determines whether the given stream could theoretically be opened by the remote - /// side of this connection. - fn remote_valid_id(id: StreamId) -> bool; - - /// Indicates whether this local endpoint may open streams (with HEADERS). - /// - /// Implies that this endpoint is a client. - fn local_can_open() -> bool; - - /// Indicates whether this remote endpoint may open streams (with HEADERS). - /// - /// Implies that this endpoint is a server. - fn remote_can_open() -> bool { - !Self::local_can_open() - } - - /// Creates a new stream in the OPEN state from the local side (i.e. as a Client). - /// - /// Must only be called when local_can_open returns true. - fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - /// Create a new stream in the OPEN state from the remote side (i.e. as a Server). - /// - /// Must only be called when remote_can_open returns true. - fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - /// Prepare the receive side of a local stream to receive data from the remote. - /// - /// Typically called when a client receives a response header. - fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - /// Prepare the send side of a remote stream to receive data from the local endpoint. - /// - /// Typically called when a server sends a response header. - fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; - - // TODO push promise - // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Closes the send half of a stream. - /// - /// Fails with a ProtocolError if send half of the stream was not open. - fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Closes the recv half of a stream. - /// - /// Fails with a ProtocolError if recv half of the stream was not open. - fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; - - /// Resets the given stream. - /// - /// If the stream was already reset, the stored cause is updated. - fn reset_stream(&mut self, id: StreamId, cause: Reason); - - /// Get the reason the stream was reset, if it was reset. - fn get_reset(&self, id: StreamId) -> Option; - - /// Returns true if the given stream was opened by the local peer and is not yet - /// closed. - fn is_local_active(&self, id: StreamId) -> bool; - - /// Returns true if the given stream was opened by the remote peer and is not yet - /// closed. - fn is_remote_active(&self, id: StreamId) -> bool; - - /// Returns true if the given stream was opened and is not yet closed. - fn is_active(&self, id: StreamId) -> bool { - if Self::local_valid_id(id) { - self.is_local_active(id) - } else { - self.is_remote_active(id) - } - } - - /// Returns the number of open streams initiated by the local peer. - fn local_active_len(&self) -> usize; - - /// Returns the number of open streams initiated by the remote peer. - fn remote_active_len(&self) -> usize; - - /// Returns true iff the recv half of the given stream is open. - fn is_recv_open(&mut self, id: StreamId) -> bool; - - /// Returns true iff the send half of the given stream is open. - fn is_send_open(&mut self, id: StreamId) -> bool; - - /// If the given stream ID is active and able to recv data, get its mutable recv flow - /// control state. - fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - - /// If the given stream ID is active and able to send data, get its mutable send flow - /// control state. - fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - - /// Updates the initial window size for the local peer. - fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); - - /// Updates the initial window size for the remote peer. - fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); -} - /// Holds the underlying stream state to be accessed by upper layers. // TODO track reserved streams // TODO constrain the size of `reset`