unify file/type naming

This commit is contained in:
Oliver Gould
2017-07-24 15:42:16 +00:00
parent 1069629aef
commit 275b835023
5 changed files with 17 additions and 353 deletions

View File

@@ -24,6 +24,10 @@ pub trait ControlStreams {
!Self::local_can_open()
}
// TODO push promise
// fn local_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>;
// fn remote_can_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>;
/// Creates a new stream in the OPEN state from the local side (i.e. as a Client).
///
/// Must only be called when local_can_open returns true.

View File

@@ -38,7 +38,7 @@ mod stream_recv_close;
mod stream_recv_open;
mod stream_send_close;
mod stream_send_open;
mod stream_store;
mod stream_states;
pub use self::connection::Connection;
@@ -53,7 +53,7 @@ use self::stream_recv_close::StreamRecvClose;
use self::stream_recv_open::StreamRecvOpen;
use self::stream_send_close::StreamSendClose;
use self::stream_send_open::StreamSendOpen;
use self::stream_store::StreamStates;
use self::stream_states::StreamStates;
/// Represents the internals of an HTTP/2 connection.
///

View File

@@ -12,7 +12,7 @@ use std::marker::PhantomData;
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug, Default)]
pub struct StreamStore<T, P> {
pub struct StreamStates<T, P> {
inner: T,
/// Holds active streams initiated by the local endpoint.
@@ -27,13 +27,13 @@ pub struct StreamStore<T, P> {
_phantom: PhantomData<P>,
}
impl<T, P, U> StreamStore<T, P>
impl<T, P, U> StreamStates<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
pub fn new(inner: T) -> StreamStore<T, P> {
StreamStore {
pub fn new(inner: T) -> StreamStates<T, P> {
StreamStates {
inner,
local_active: OrderMap::default(),
remote_active: OrderMap::default(),
@@ -43,7 +43,7 @@ impl<T, P, U> StreamStore<T, P>
}
}
impl<T, P: Peer> StreamStore<T, P> {
impl<T, P: Peer> StreamStates<T, P> {
pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
@@ -72,7 +72,7 @@ impl<T, P: Peer> StreamStore<T, P> {
}
}
impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
impl<T, P: Peer> ControlStreams for StreamStates<T, P> {
fn local_valid_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id)
}
@@ -280,7 +280,7 @@ impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
}
/// Proxy.
impl<T, P> Stream for StreamStore<T, P>
impl<T, P> Stream for StreamStates<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
{
type Item = Frame;
@@ -292,7 +292,7 @@ impl<T, P> Stream for StreamStore<T, P>
}
/// Proxy.
impl<T, P, U> Sink for StreamStore<T, P>
impl<T, P, U> Sink for StreamStates<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = Frame<U>;
@@ -308,7 +308,7 @@ impl<T, P, U> Sink for StreamStore<T, P>
}
/// Proxy.
impl<T, P, U> ReadySink for StreamStore<T, P>
impl<T, P, U> ReadySink for StreamStates<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
@@ -318,7 +318,7 @@ impl<T, P, U> ReadySink for StreamStore<T, P>
}
/// Proxy.
impl<T: ApplySettings, P> ApplySettings for StreamStore<T, P> {
impl<T: ApplySettings, P> ApplySettings for StreamStates<T, P> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
@@ -329,7 +329,7 @@ impl<T: ApplySettings, P> ApplySettings for StreamStore<T, P> {
}
/// Proxy.
impl<T: ControlPing, P> ControlPing for StreamStore<T, P> {
impl<T: ControlPing, P> ControlPing for StreamStates<T, P> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}

View File

@@ -1,340 +0,0 @@
use {ConnectionError, Peer, StreamId};
use error::Reason::{NoError, ProtocolError};
use proto::*;
use proto::state::StreamState;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
/// Holds the underlying stream state to be accessed by upper layers.
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug, Default)]
pub struct StreamStates<T, P> {
inner: T,
/// Holds active streams initiated by the local endpoint.
local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote endpoint.
remote_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote.
reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
_phantom: PhantomData<P>,
}
impl<T, P, U> StreamStates<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
pub fn new(inner: T) -> StreamStates<T, P> {
StreamStates {
inner,
local_active: OrderMap::default(),
remote_active: OrderMap::default(),
reset: OrderMap::default(),
_phantom: PhantomData,
}
}
}
impl<T, P: Peer> StreamStates<T, P> {
pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get(&id)
} else {
self.remote_active.get(&id)
}
}
pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id)
} else {
self.remote_active.get_mut(&id)
}
}
pub fn remove_active(&mut self, id: StreamId) {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.remove(&id);
} else {
self.remote_active.remove(&id);
}
}
}
impl<T, P: Peer> ControlStreams for StreamStates<T, P> {
fn local_valid_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id)
}
fn remote_valid_id(id: StreamId) -> bool {
P::is_valid_remote_stream_id(id)
}
fn local_can_open() -> bool {
P::local_can_open()
}
fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::local_valid_id(id) || !Self::local_can_open() {
return Err(ProtocolError.into());
}
if self.local_active.contains_key(&id) {
return Err(ProtocolError.into());
}
self.local_active.insert(id, StreamState::new_open_sending(sz));
Ok(())
}
fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::remote_valid_id(id) || !Self::remote_can_open() {
return Err(ProtocolError.into());
}
if self.remote_active.contains_key(&id) {
return Err(ProtocolError.into());
}
self.remote_active.insert(id, StreamState::new_open_recving(sz));
Ok(())
}
fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::local_valid_id(id) {
return Err(ProtocolError.into());
}
match self.local_active.get_mut(&id) {
Some(s) => s.open_recv_half(sz).map(|_| {}),
None => Err(ProtocolError.into()),
}
}
fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !Self::remote_valid_id(id) {
return Err(ProtocolError.into());
}
match self.remote_active.get_mut(&id) {
Some(s) => s.open_send_half(sz).map(|_| {}),
None => Err(ProtocolError.into()),
}
}
fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_send_half())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_recv_half())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.remove_active(id);
self.reset.insert(id, cause);
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.local_active.contains_key(&id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.remote_active.contains_key(&id)
}
fn is_send_open(&mut self, id: StreamId) -> bool {
match self.get_active(id) {
Some(s) => s.is_send_open(),
None => false,
}
}
fn is_recv_open(&mut self, id: StreamId) -> bool {
match self.get_active(id) {
Some(s) => s.is_recv_open(),
None => false,
}
}
fn local_active_len(&self) -> usize {
self.local_active.len()
}
fn remote_active_len(&self) -> usize {
self.remote_active.len()
}
fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.expand_window(incr);
}
}
}
}
fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.expand_window(incr);
}
}
}
}
fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller())
}
}
fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller())
}
}
}
/// Proxy.
impl<T, P> Stream for StreamStates<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
{
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
self.inner.poll()
}
}
/// Proxy.
impl<T, P, U> Sink for StreamStates<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
/// Proxy.
impl<T, P, U> ReadySink for StreamStates<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
/// Proxy.
impl<T: ApplySettings, P> ApplySettings for StreamStates<T, P> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
/// Proxy.
impl<T: ControlPing, P> ControlPing for StreamStates<T, P> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}