diff --git a/src/client.rs b/src/client.rs index 439be61..c522451 100644 --- a/src/client.rs +++ b/src/client.rs @@ -60,7 +60,7 @@ impl Peer for Client { id.is_server_initiated() } - fn can_create_local_stream() -> bool { + fn local_can_open() -> bool { true } diff --git a/src/lib.rs b/src/lib.rs index 557f1d9..67b7c83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,9 +85,9 @@ pub trait Peer { /// remote node. fn is_valid_remote_stream_id(id: StreamId) -> bool; - fn can_create_local_stream() -> bool; - fn can_create_remote_stream() -> bool { - !Self::can_create_local_stream() + fn local_can_open() -> bool; + fn remote_can_open() -> bool { + !Self::local_can_open() } //fn can_reserve_local_stream() -> bool; diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 763bd00..d71a072 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -332,24 +332,32 @@ impl ApplySettings for FlowControl } impl ControlStreams for FlowControl { - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) + fn local_valid_id(id: StreamId) -> bool { + T::local_valid_id(id) } - fn is_valid_remote_id(id: StreamId) -> bool { - T::is_valid_remote_id(id) + fn remote_valid_id(id: StreamId) -> bool { + T::remote_valid_id(id) } - fn can_create_local_stream() -> bool { - T::can_create_local_stream() + fn local_can_open() -> bool { + T::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_local_half(id) + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open(id, sz) } - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_remote_half(id) + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open(id, sz) + } + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_local_half(id) + } + + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 17b390a..9ed9fc0 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -31,7 +31,7 @@ use self::framed_write::FramedWrite; use self::ping_pong::{ControlPing, PingPayload, PingPong}; use self::ready::ReadySink; use self::settings::{ApplySettings, /*ControlSettings,*/ Settings}; -use self::state::{StreamState}; +use self::state::{StreamState, PeerState}; use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 730134a..887fce7 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -37,7 +37,7 @@ pub struct Settings { remaining_acks: usize, // True when the local settings must be flushed to the remote - is_valid_local_id_dirty: bool, + local_valid_id_dirty: bool, // True when we have received a settings frame from the remote. received_remote: bool, @@ -52,7 +52,7 @@ impl Settings local: local, remote: SettingSet::default(), remaining_acks: 0, - is_valid_local_id_dirty: true, + local_valid_id_dirty: true, received_remote: false, } } @@ -74,18 +74,18 @@ impl Settings local: self.local, remote: self.remote, remaining_acks: self.remaining_acks, - is_valid_local_id_dirty: self.is_valid_local_id_dirty, + local_valid_id_dirty: self.local_valid_id_dirty, received_remote: self.received_remote, } } fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { - trace!("try_send_pending; dirty={} acks={}", self.is_valid_local_id_dirty, self.remaining_acks); - if self.is_valid_local_id_dirty { + trace!("try_send_pending; dirty={} acks={}", self.local_valid_id_dirty, self.remaining_acks); + if self.local_valid_id_dirty { let frame = frame::Settings::new(self.local.clone()); try_ready!(self.try_send(frame)); - self.is_valid_local_id_dirty = false; + self.local_valid_id_dirty = false; } while self.remaining_acks > 0 { @@ -111,7 +111,7 @@ impl Settings impl ControlSettings for Settings{ fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { self.local = local; - self.is_valid_local_id_dirty = true; + self.local_valid_id_dirty = true; Ok(()) } diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index 6127a1d..90ce8a5 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -67,24 +67,32 @@ impl ReadySink for StreamRecvClose } impl ControlStreams for StreamRecvClose { - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) + fn local_valid_id(id: StreamId) -> bool { + T::local_valid_id(id) } - fn is_valid_remote_id(id: StreamId) -> bool { - T::is_valid_remote_id(id) + fn remote_valid_id(id: StreamId) -> bool { + T::remote_valid_id(id) } - fn can_create_local_stream() -> bool { - T::can_create_local_stream() + fn local_can_open() -> bool { + T::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_local_half(id) + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open(id, sz) } - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_remote_half(id) + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open(id, sz) + } + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_local_half(id) + } + + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index 2effe8c..ae32e64 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -121,23 +121,34 @@ impl Stream for StreamRecvOpen continue; } - if T::is_valid_remote_id(id) { - if !self.inner.is_local_active(id) { - if !T::can_create_remote_stream() { + if T::remote_valid_id(id) { + if !self.inner.is_remote_active(id) { + if !T::remote_can_open() { return Err(ProtocolError.into()); } if let Some(max) = self.max_concurrency { - if (max as usize) < self.inner.local_active_len() { + if (max as usize) < self.inner.remote_active_len() { return Err(RefusedStream.into()); } } - } - // If the frame ends the stream, it will be handled in - // StreamRecvClose. - return Ok(Async::Ready(Some(frame))); + self.inner.remote_open(id, self.initial_window_size)?; + } + } else { + // Receiving on local stream MUST be on active stream. + if !self.inner.is_local_active(id) && !frame.is_reset() { + return Err(ProtocolError.into()); + } } + + if let &Data(..) = &frame { + self.inner.check_can_recv_data(id)?; + } + + // If the frame ends the stream, it will be handled in + // StreamRecvClose. + return Ok(Async::Ready(Some(frame))); } } } @@ -301,24 +312,32 @@ impl ReadySink for StreamRecvOpen // } impl ControlStreams for StreamRecvOpen { - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) + fn local_valid_id(id: StreamId) -> bool { + T::local_valid_id(id) } - fn is_valid_remote_id(id: StreamId) -> bool { - T::is_valid_remote_id(id) + fn remote_valid_id(id: StreamId) -> bool { + T::remote_valid_id(id) } - fn can_create_local_stream() -> bool { - T::can_create_local_stream() + fn local_can_open() -> bool { + T::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_local_half(id) + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open(id, sz) } - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_remote_half(id) + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open(id, sz) + } + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_local_half(id) + } + + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index 6c514d7..bb0c406 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -48,7 +48,7 @@ impl Sink for StreamSendClose if let &Frame::Reset(ref rst) = &frame { self.inner.reset_stream(id, rst.reason()); } else { - self.inner.close_stream_local_half(id)?; + self.inner.close_local_half(id)?; } } @@ -81,24 +81,32 @@ impl ApplySettings for StreamSendClose { } impl ControlStreams for StreamSendClose { - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) + fn local_valid_id(id: StreamId) -> bool { + T::local_valid_id(id) } - fn is_valid_remote_id(id: StreamId) -> bool { - T::is_valid_remote_id(id) + fn remote_valid_id(id: StreamId) -> bool { + T::remote_valid_id(id) } - fn can_create_local_stream() -> bool { - T::can_create_local_stream() + fn local_can_open() -> bool { + T::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_local_half(id) + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open(id, sz) } - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_remote_half(id) + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open(id, sz) + } + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_local_half(id) + } + + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index 19e746c..75eabe5 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -99,16 +99,19 @@ impl Sink for StreamSendOpen return Err(StreamReset(reason).into()) } - if T::is_valid_local_id(id) { - if self.inner.is_local_active(id) { - } else if T::can_create_local_stream() { + if T::local_valid_id(id) { + if !self.inner.is_local_active(id) { + if !T::local_can_open() { + return Err(InvalidStreamId.into()); + } + if let Some(max) = self.max_concurrency { if (max as usize) < self.inner.local_active_len() { return Err(Rejected.into()); } } - // TODO create that shit. + self.inner.local_open(id, self.initial_window_size)?; } } else { // If the frame was part of a remote stream, it MUST already exist. @@ -118,7 +121,7 @@ impl Sink for StreamSendOpen } if let &Data(..) = &frame { - self.inner.check_can_send_data(id); + self.inner.check_can_send_data(id)?; } return self.inner.start_send(frame); @@ -141,24 +144,32 @@ impl ReadySink for StreamSendOpen } impl ControlStreams for StreamSendOpen { - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) + fn local_valid_id(id: StreamId) -> bool { + T::local_valid_id(id) } - fn is_valid_remote_id(id: StreamId) -> bool { - T::is_valid_remote_id(id) + fn remote_valid_id(id: StreamId) -> bool { + T::remote_valid_id(id) } - fn can_create_local_stream() -> bool { - T::can_create_local_stream() + fn local_can_open() -> bool { + T::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_local_half(id) + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open(id, sz) } - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_stream_remote_half(id) + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open(id, sz) + } + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_local_half(id) + } + + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index f46db7c..9508051 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -1,6 +1,7 @@ use {ConnectionError, Peer, StreamId}; use error::Reason::{NoError, ProtocolError}; use proto::*; +use proto::state::{StreamState, PeerState}; use fnv::FnvHasher; use ordermap::OrderMap; @@ -10,16 +11,23 @@ use std::marker::PhantomData; /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up /// to Connection). pub trait ControlStreams { - fn is_valid_local_id(id: StreamId) -> bool; - fn is_valid_remote_id(id: StreamId) -> bool; + fn local_valid_id(id: StreamId) -> bool; + fn remote_valid_id(id: StreamId) -> bool; - fn can_create_local_stream() -> bool; - fn can_create_remote_stream() -> bool { - !Self::can_create_local_stream() + fn local_can_open() -> bool; + fn remote_can_open() -> bool { + !Self::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; + // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn reset_stream(&mut self, id: StreamId, cause: Reason); fn get_reset(&self, id: StreamId) -> Option; @@ -139,19 +147,46 @@ impl StreamStore { } impl ControlStreams for StreamStore { - fn is_valid_local_id(id: StreamId) -> bool { + fn local_valid_id(id: StreamId) -> bool { P::is_valid_local_stream_id(id) } - fn is_valid_remote_id(id: StreamId) -> bool { + fn remote_valid_id(id: StreamId) -> bool { P::is_valid_remote_stream_id(id) } - fn can_create_local_stream() -> bool { - P::can_create_local_stream() + fn local_can_open() -> bool { + P::local_can_open() } - fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + /// Open a new stream from the local side (i.e. as a Client). + // + fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + debug_assert!(Self::local_can_open()); + assert!(Self::local_valid_id(id)); + debug_assert!(!self.local_active.contains_key(&id)); + + self.local_active.insert(id, StreamState::Open { + remote: PeerState::Data(FlowControlState::with_initial_size(sz)), + local: PeerState::Headers, + }); + Ok(()) + } + + /// Open a new stream from the remote side (i.e. as a Server). + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + debug_assert!(Self::remote_can_open()); + assert!(Self::remote_valid_id(id)); + debug_assert!(!self.remote_active.contains_key(&id)); + + self.remote_active.insert(id, StreamState::Open { + local: PeerState::Data(FlowControlState::with_initial_size(sz)), + remote: PeerState::Headers, + }); + Ok(()) + } + + fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { let fully_closed = self.get_active_mut(id) .map(|s| s.close_local()) .unwrap_or_else(|| Err(ProtocolError.into()))?; @@ -163,7 +198,7 @@ impl ControlStreams for StreamStore { Ok(()) } - fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { let fully_closed = self.get_active_mut(id) .map(|s| s.close_remote()) .unwrap_or_else(|| Err(ProtocolError.into()))?; diff --git a/src/server.rs b/src/server.rs index 96ad834..fac683a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -118,7 +118,7 @@ impl Peer for Server { id.is_client_initiated() } - fn can_create_local_stream() -> bool { + fn local_can_open() -> bool { false }