From 845343542270de9a4393708dc6eb6b6773535b87 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 21 Jul 2017 01:30:39 +0000 Subject: [PATCH] wip: improve split stream tracking --- src/error.rs | 4 +- src/frame/mod.rs | 12 +- src/proto/flow_control.rs | 19 +- src/proto/flow_control_state.rs | 24 +-- src/proto/state.rs | 318 +++++++++++++++++--------------- src/proto/stream_recv_close.rs | 20 +- src/proto/stream_recv_open.rs | 49 ++++- src/proto/stream_send_close.rs | 33 +++- src/proto/stream_send_open.rs | 59 +++--- src/proto/stream_store.rs | 92 +++++++-- 10 files changed, 408 insertions(+), 222 deletions(-) diff --git a/src/error.rs b/src/error.rs index d04cde0..b5294b3 100644 --- a/src/error.rs +++ b/src/error.rs @@ -85,7 +85,7 @@ pub enum User { StreamReset(Reason), /// The application attempted to initiate too many streams to remote. - MaxConcurrencyExceeded, + Rejected, // TODO: reserve additional variants } @@ -127,7 +127,7 @@ macro_rules! user_desc { FlowControlViolation => concat!($prefix, "flow control violation"), StreamReset(_) => concat!($prefix, "frame sent on reset stream"), Corrupt => concat!($prefix, "connection state corrupt"), - MaxConcurrencyExceeded => concat!($prefix, "stream would exceed remote max concurrency"), + Rejected => concat!($prefix, "stream would exceed remote max concurrency"), } }); } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index af65434..49a00df 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -87,12 +87,22 @@ impl Frame { match self { &Headers(ref v) => v.is_end_stream(), &Data(ref v) => v.is_end_stream(), - &Reset(_) => true, &PushPromise(_) | &WindowUpdate(_) | &Ping(_) | &Settings(_) => false, + + &Reset(_) => true, + } + } + + pub fn is_reset(&self) -> bool { + use self::Frame::*; + + match self { + &Reset(_) => true, + _ => false, } } } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index ff21847..763bd00 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -344,14 +344,21 @@ impl ControlStreams for FlowControl { T::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_local_half(id) + } + + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { self.inner.reset_stream(id, cause) } + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } fn is_local_active(&self, id: StreamId) -> bool { self.inner.is_local_active(id) } @@ -383,6 +390,14 @@ impl ControlStreams for FlowControl { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { self.inner.remote_flow_controller(id) } + + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_send_data(id) + } + + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_recv_data(id) + } } impl ControlPing for FlowControl { diff --git a/src/proto/flow_control_state.rs b/src/proto/flow_control_state.rs index 2c5c426..3b731a1 100644 --- a/src/proto/flow_control_state.rs +++ b/src/proto/flow_control_state.rs @@ -112,18 +112,18 @@ fn test_with_initial_size() { assert!(fc.apply_window_update().is_none()); } -#[test] -fn test_with_next_update() { - let mut fc = FlowControlState::with_next_update(10); - - fc.expand_window(8); - assert_eq!(fc.window_size, 0); - assert_eq!(fc.next_window_update, 18); - - assert_eq!(fc.apply_window_update(), Some(18)); - assert_eq!(fc.window_size, 18); - assert_eq!(fc.next_window_update, 0); -} +// #[test] +// fn test_with_next_update() { +// let mut fc = FlowControlState::with_next_update(10); +// +// fc.expand_window(8); +// assert_eq!(fc.window_size, 0); +// assert_eq!(fc.next_window_update, 18); +// +// assert_eq!(fc.apply_window_update(), Some(18)); +// assert_eq!(fc.window_size, 18); +// assert_eq!(fc.next_window_update, 0); +// } #[test] fn test_grow_accumulates() { diff --git a/src/proto/state.rs b/src/proto/state.rs index fa79940..a0167a3 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -59,177 +59,197 @@ pub enum StreamState { } impl StreamState { - pub fn is_closed(&self) -> bool { + // /// Transition the state to represent headers being received. + // /// + // /// Returns true if this state transition results in iniitializing the + // /// stream id. `Err` is returned if this is an invalid state transition. + // pub fn recv_headers

(&mut self, eos: bool, initial_window_size: WindowSize) + // -> Result + // where P: Peer + // { + // use self::StreamState::*; + // use self::PeerState::*; + + // match *self { + // Idle => { + // let local = Headers; + // if eos { + // *self = HalfClosedRemote(local); + // } else { + // let remote = Data(FlowControlState::with_initial_size(initial_window_size)); + // *self = Open { local, remote }; + // } + // Ok(true) + // } + + // Open { local, remote } => { + // try!(remote.check_is_headers(ProtocolError.into())); + // if !eos { + // // Received non-trailers HEADERS on open remote. + // return Err(ProtocolError.into()); + // } + // *self = HalfClosedRemote(local); + // Ok(false) + // } + + // HalfClosedLocal(headers) => { + // try!(headers.check_is_headers(ProtocolError.into())); + // if eos { + // *self = Closed; + // } else { + // let remote = FlowControlState::with_initial_size(initial_window_size); + // *self = HalfClosedLocal(Data(remote)); + // }; + // Ok(false) + // } + + // Closed | HalfClosedRemote(..) => { + // Err(ProtocolError.into()) + // } + // } + // } + + // /// Transition the state to represent headers being sent. + // /// + // /// Returns true if this state transition results in initializing the stream + // /// id. `Err` is returned if this is an invalid state transition. + // pub fn send_headers(&mut self, + // eos: bool, + // initial_window_size: WindowSize) + // -> Result + // { + // use self::StreamState::*; + // use self::PeerState::*; + + // match *self { + // Idle => { + // *self = if eos { + // HalfClosedLocal(Headers) + // } else { + // Open { + // local: Data(FlowControlState::with_initial_size(initial_window_size)), + // remote: Headers, + // } + // }; + + // Ok(true) + // } + + // Open { local, remote } => { + // try!(local.check_is_headers(UnexpectedFrameType.into())); + + // *self = if eos { + // HalfClosedLocal(remote) + // } else { + // let fc = FlowControlState::with_initial_size(initial_window_size); + // let local = Data(fc); + // Open { local, remote } + // }; + + // Ok(false) + // } + + // HalfClosedRemote(local) => { + // try!(local.check_is_headers(UnexpectedFrameType.into())); + + // *self = if eos { + // Closed + // } else { + // let fc = FlowControlState::with_initial_size(initial_window_size); + // HalfClosedRemote(Data(fc)) + // }; + + // Ok(false) + // } + + // Closed | HalfClosedLocal(..) => { + // Err(UnexpectedFrameType.into()) + // } + // } + // } + + pub fn check_can_send_data(&self) -> Result<(), ConnectionError> { use self::StreamState::*; match self { - &Closed => true, - _ => false, + &Open { ref remote, .. } => { + try!(remote.check_is_data(UnexpectedFrameType.into())); + Ok(()) + } + + &HalfClosedLocal(ref remote) => { + try!(remote.check_is_data(UnexpectedFrameType.into())); + Ok(()) + } + + &Idle | &Closed | &HalfClosedRemote(..) => { + Err(UnexpectedFrameType.into()) + } } } - /// Transition the state to represent headers being received. - /// - /// Returns true if this state transition results in iniitializing the - /// stream id. `Err` is returned if this is an invalid state transition. - pub fn recv_headers

(&mut self, eos: bool, initial_window_size: WindowSize) - -> Result - where P: Peer - { + + pub fn check_can_recv_data(&self) -> Result<(), ConnectionError> { + use self::StreamState::*; + + match self { + &Open { ref local, .. } => { + try!(local.check_is_data(ProtocolError.into())); + Ok(()) + } + + &HalfClosedRemote(ref local) => { + try!(local.check_is_data(ProtocolError.into())); + Ok(()) + } + + &Idle | &Closed | &HalfClosedLocal(..) => { + Err(ProtocolError.into()) + } + } + } + + /// Returns true iff the stream is fully closed. + pub fn close_local(&mut self) -> Result { use self::StreamState::*; - use self::PeerState::*; match *self { - Idle => { - let local = Headers; - if eos { - *self = HalfClosedRemote(local); - } else { - let remote = Data(FlowControlState::with_initial_size(initial_window_size)); - *self = Open { local, remote }; - } + Open { remote, .. } => { + *self = HalfClosedLocal(remote); + Ok(false) + } + + HalfClosedLocal(remote) => { + *self = Closed; Ok(true) } - Open { local, remote } => { - try!(remote.check_is_headers(ProtocolError.into())); - if !eos { - // Received non-trailers HEADERS on open remote. - return Err(ProtocolError.into()); - } + Idle | Closed | HalfClosedRemote(..) => { + Err(ProtocolError.into()) + } + } + } + + /// Returns true iff the stream is fully closed. + pub fn close_remote(&mut self) -> Result { + use self::StreamState::*; + + match *self { + Open { local, .. } => { *self = HalfClosedRemote(local); Ok(false) } - HalfClosedLocal(headers) => { - try!(headers.check_is_headers(ProtocolError.into())); - if eos { - *self = Closed; - } else { - let remote = FlowControlState::with_initial_size(initial_window_size); - *self = HalfClosedLocal(Data(remote)); - }; - Ok(false) - } - - Closed | HalfClosedRemote(..) => { - Err(ProtocolError.into()) - } - } - } - - pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> { - use self::StreamState::*; - - match *self { - Open { local, remote } => { - try!(remote.check_is_data(ProtocolError.into())); - if eos { - *self = HalfClosedRemote(local); - } - Ok(()) - } - - HalfClosedLocal(remote) => { - try!(remote.check_is_data(ProtocolError.into())); - if eos { - *self = Closed; - } - Ok(()) - } - - Closed | HalfClosedRemote(..) => { - Err(ProtocolError.into()) - } - - _ => unimplemented!(), - } - } - - /// Transition the state to represent headers being sent. - /// - /// Returns true if this state transition results in initializing the stream - /// id. `Err` is returned if this is an invalid state transition. - pub fn send_headers(&mut self, - eos: bool, - initial_window_size: WindowSize) - -> Result - { - use self::StreamState::*; - use self::PeerState::*; - - match *self { - Idle => { - *self = if eos { - HalfClosedLocal(Headers) - } else { - Open { - local: Data(FlowControlState::with_initial_size(initial_window_size)), - remote: Headers, - } - }; - + HalfClosedRemote(local) => { + *self = Closed; Ok(true) } - Open { local, remote } => { - try!(local.check_is_headers(UnexpectedFrameType.into())); - - *self = if eos { - HalfClosedLocal(remote) - } else { - let fc = FlowControlState::with_initial_size(initial_window_size); - let local = Data(fc); - Open { local, remote } - }; - - Ok(false) - } - - HalfClosedRemote(local) => { - try!(local.check_is_headers(UnexpectedFrameType.into())); - - *self = if eos { - Closed - } else { - let fc = FlowControlState::with_initial_size(initial_window_size); - HalfClosedRemote(Data(fc)) - }; - - Ok(false) - } - - Closed | HalfClosedLocal(..) => { - Err(UnexpectedFrameType.into()) - } - } - } - - pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> { - use self::StreamState::*; - - match *self { - Open { local, remote } => { - try!(local.check_is_data(UnexpectedFrameType.into())); - if eos { - *self = HalfClosedLocal(remote); - } - Ok(()) - } - - HalfClosedRemote(local) => { - try!(local.check_is_data(UnexpectedFrameType.into())); - if eos { - *self = Closed; - } - Ok(()) - } - Idle | Closed | HalfClosedLocal(..) => { - Err(UnexpectedFrameType.into()) + Err(ProtocolError.into()) } } } - + pub fn local_flow_controller(&mut self) -> Option<&mut FlowControlState> { use self::StreamState::*; use self::PeerState::*; diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index b84bfe8..6127a1d 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -79,14 +79,22 @@ impl ControlStreams for StreamRecvClose { T::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_local_half(id) + } + + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { self.inner.reset_stream(id, cause) } + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + fn is_local_active(&self, id: StreamId) -> bool { self.inner.is_local_active(id) } @@ -118,6 +126,14 @@ impl ControlStreams for StreamRecvClose { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { self.inner.remote_flow_controller(id) } + + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_send_data(id) + } + + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_recv_data(id) + } } impl ApplySettings for StreamRecvClose { diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index 5db91ea..2effe8c 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -1,4 +1,5 @@ use ConnectionError; +use error::Reason::{ProtocolError, RefusedStream}; use frame::{Frame, StreamId}; use proto::*; @@ -35,7 +36,7 @@ impl StreamRecvOpen where T: Sink, SinkError = ConnectionError>, T: ControlStreams, { - fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> { + fn send_refuse(&mut self, id: StreamId) -> Poll<(), ConnectionError> { debug_assert!(self.pending_refuse.is_none()); let f = frame::Reset::new(id, Reason::RefusedStream); @@ -99,7 +100,7 @@ impl Stream for StreamRecvOpen // Since there's only one slot for pending refused streams, it must be cleared // before polling a frame from the transport. if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refusal(id)); + try_ready!(self.send_refuse(id)); } loop { @@ -115,12 +116,27 @@ impl Stream for StreamRecvOpen if self.inner.get_reset(id).is_some() { // For now, just ignore frames on reset streams. + debug!("ignoring received frame on reset stream"); // TODO tell the remote to knock it off? continue; } if T::is_valid_remote_id(id) { - unimplemented!() + if !self.inner.is_local_active(id) { + if !T::can_create_remote_stream() { + return Err(ProtocolError.into()); + } + + if let Some(max) = self.max_concurrency { + if (max as usize) < self.inner.local_active_len() { + return Err(RefusedStream.into()); + } + } + } + + // If the frame ends the stream, it will be handled in + // StreamRecvClose. + return Ok(Async::Ready(Some(frame))); } } } @@ -139,7 +155,7 @@ impl Sink for StreamRecvOpen // The local must complete refusing the remote stream before sending any other // frames. if let Some(id) = self.pending_refuse.take() { - if self.send_refusal(id)?.is_not_ready() { + if self.send_refuse(id)?.is_not_ready() { return Ok(AsyncSink::NotReady(frame)); } } @@ -157,7 +173,7 @@ impl Sink for StreamRecvOpen fn poll_complete(&mut self) -> Poll<(), T::SinkError> { if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refusal(id)); + try_ready!(self.send_refuse(id)); } self.inner.poll_complete() @@ -173,7 +189,7 @@ impl ReadySink for StreamRecvOpen { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refusal(id)); + try_ready!(self.send_refuse(id)); } self.inner.poll_ready() @@ -284,7 +300,6 @@ impl ReadySink for StreamRecvOpen // return Ok(Async::Ready(None)); // } - impl ControlStreams for StreamRecvOpen { fn is_valid_local_id(id: StreamId) -> bool { T::is_valid_local_id(id) @@ -298,14 +313,22 @@ impl ControlStreams for StreamRecvOpen { T::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_local_half(id) + } + + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { self.inner.reset_stream(id, cause) } + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + fn is_local_active(&self, id: StreamId) -> bool { self.inner.is_local_active(id) } @@ -337,6 +360,14 @@ impl ControlStreams for StreamRecvOpen { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { self.inner.remote_flow_controller(id) } + + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_send_data(id) + } + + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_recv_data(id) + } } impl ControlPing for StreamRecvOpen { diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index 1decb24..6c514d7 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -42,8 +42,17 @@ impl Sink for StreamSendClose type SinkItem = Frame; type SinkError = ConnectionError; - fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { - self.inner.start_send(item) + fn start_send(&mut self, frame: Self::SinkItem) -> StartSend, ConnectionError> { + if frame.is_end_stream() { + let id = frame.stream_id(); + if let &Frame::Reset(ref rst) = &frame { + self.inner.reset_stream(id, rst.reason()); + } else { + self.inner.close_stream_local_half(id)?; + } + } + + self.inner.start_send(frame) } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { @@ -84,14 +93,22 @@ impl ControlStreams for StreamSendClose { T::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_local_half(id) + } + + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { self.inner.reset_stream(id, cause) } + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + fn is_local_active(&self, id: StreamId) -> bool { self.inner.is_local_active(id) } @@ -123,6 +140,14 @@ impl ControlStreams for StreamSendClose { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { self.inner.remote_flow_controller(id) } + + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_send_data(id) + } + + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_recv_data(id) + } } impl ControlPing for StreamSendClose { diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index 5f1dcba..19e746c 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -1,5 +1,5 @@ use ConnectionError; -use error::User::{InvalidStreamId, StreamReset}; +use error::User::{InvalidStreamId, StreamReset, Rejected}; use frame::{Frame, SettingSet}; use proto::*; @@ -101,34 +101,27 @@ impl Sink for StreamSendOpen if T::is_valid_local_id(id) { if self.inner.is_local_active(id) { - // If the frame ends thestream, it will be handled in stream_recv. - return self.inner.start_send(frame); - } - - if T::can_create_local_stream() { - let has_capacity = match self.max_concurrency { - None => true, - Some(max) => self.inner.local_active_len() < (max as usize), - }; - if has_capacity { - // create that shit. - unimplemented!(); + } else if T::can_create_local_stream() { + if let Some(max) = self.max_concurrency { + if (max as usize) < self.inner.local_active_len() { + return Err(Rejected.into()); + } } + + // TODO create that shit. } } else { - if self.inner.is_remote_active(id) { - // If the frame was part of a remote stream, it MUST already exist. If the - // frame ends thestream, it will be handled in stream_recv. - return self.inner.start_send(frame); - } - - if let Reset(rst) = frame { - return self.inner.start_send(Reset(rst)); + // If the frame was part of a remote stream, it MUST already exist. + if !self.inner.is_remote_active(id) && !frame.is_reset() { + return Err(InvalidStreamId.into()); } } - // Tried to send a frame on a stream - return Err(InvalidStreamId.into()); + if let &Data(..) = &frame { + self.inner.check_can_send_data(id); + } + + return self.inner.start_send(frame); } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { @@ -160,14 +153,22 @@ impl ControlStreams for StreamSendOpen { T::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option { - self.inner.get_reset(id) + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_local_half(id) + } + + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_stream_remote_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { self.inner.reset_stream(id, cause) } + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + fn is_local_active(&self, id: StreamId) -> bool { self.inner.is_local_active(id) } @@ -199,6 +200,14 @@ impl ControlStreams for StreamSendOpen { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { self.inner.remote_flow_controller(id) } + + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_send_data(id) + } + + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.check_can_recv_data(id) + } } impl ControlFlow for StreamSendOpen { diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index ff8345c..f46db7c 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -1,5 +1,5 @@ use {ConnectionError, Peer, StreamId}; -use error::Reason; +use error::Reason::{NoError, ProtocolError}; use proto::*; use fnv::FnvHasher; @@ -18,8 +18,10 @@ pub trait ControlStreams { !Self::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option; + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; fn reset_stream(&mut self, id: StreamId, cause: Reason); + fn get_reset(&self, id: StreamId) -> Option; fn is_local_active(&self, id: StreamId) -> bool; fn is_remote_active(&self, id: StreamId) -> bool; @@ -33,13 +35,8 @@ pub trait ControlStreams { fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); - // fn get_active(&self, id: StreamId) -> Option<&StreamState> { - // self.streams(id).get_active(id) - // } - - // fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - // self.streams_mut(id).get_active_mut(id) - // } + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; } /// Holds the underlying stream state to be accessed by upper layers. @@ -112,6 +109,35 @@ impl ReadySink for StreamStore } } +impl StreamStore { + pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.get(&id) + } else { + self.remote_active.get(&id) + } + } + + pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(&id) + } else { + self.remote_active.get_mut(&id) + } + } + + pub fn remove_active(&mut self, id: StreamId) { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.remove(&id); + } else { + self.remote_active.remove(&id); + } + } +} + impl ControlStreams for StreamStore { fn is_valid_local_id(id: StreamId) -> bool { P::is_valid_local_stream_id(id) @@ -125,19 +151,39 @@ impl ControlStreams for StreamStore { P::can_create_local_stream() } - fn get_reset(&self, id: StreamId) -> Option { - self.reset.get(&id).map(|r| *r) + fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + let fully_closed = self.get_active_mut(id) + .map(|s| s.close_local()) + .unwrap_or_else(|| Err(ProtocolError.into()))?; + + if fully_closed { + self.remove_active(id); + self.reset.insert(id, NoError); + } + Ok(()) + } + + fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + let fully_closed = self.get_active_mut(id) + .map(|s| s.close_remote()) + .unwrap_or_else(|| Err(ProtocolError.into()))?; + + if fully_closed { + self.remove_active(id); + self.reset.insert(id, NoError); + } + Ok(()) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { - if P::is_valid_local_stream_id(id) { - self.local_active.remove(&id); - } else { - self.remote_active.remove(&id); - } + self.remove_active(id); self.reset.insert(id, cause); } + fn get_reset(&self, id: StreamId) -> Option { + self.reset.get(&id).map(|r| *r) + } + fn is_local_active(&self, id: StreamId) -> bool { self.local_active.contains_key(&id) } @@ -237,6 +283,20 @@ impl ControlStreams for StreamStore { self.remote_active.get_mut(&id).and_then(|s| s.remote_flow_controller()) } } + + fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + if let Some(s) = self.get_active(id) { + return s.check_can_send_data(); + } + Err(ProtocolError.into()) + } + + fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { + if let Some(s) = self.get_active(id) { + return s.check_can_recv_data(); + } + Err(ProtocolError.into()) + } } impl ApplySettings for StreamStore {