wip: improve split stream tracking

This commit is contained in:
Oliver Gould
2017-07-21 01:30:39 +00:00
parent 44edd6a4d4
commit 8453435422
10 changed files with 408 additions and 222 deletions

View File

@@ -85,7 +85,7 @@ pub enum User {
StreamReset(Reason), StreamReset(Reason),
/// The application attempted to initiate too many streams to remote. /// The application attempted to initiate too many streams to remote.
MaxConcurrencyExceeded, Rejected,
// TODO: reserve additional variants // TODO: reserve additional variants
} }
@@ -127,7 +127,7 @@ macro_rules! user_desc {
FlowControlViolation => concat!($prefix, "flow control violation"), FlowControlViolation => concat!($prefix, "flow control violation"),
StreamReset(_) => concat!($prefix, "frame sent on reset stream"), StreamReset(_) => concat!($prefix, "frame sent on reset stream"),
Corrupt => concat!($prefix, "connection state corrupt"), Corrupt => concat!($prefix, "connection state corrupt"),
MaxConcurrencyExceeded => concat!($prefix, "stream would exceed remote max concurrency"), Rejected => concat!($prefix, "stream would exceed remote max concurrency"),
} }
}); });
} }

View File

@@ -87,12 +87,22 @@ impl<T> Frame<T> {
match self { match self {
&Headers(ref v) => v.is_end_stream(), &Headers(ref v) => v.is_end_stream(),
&Data(ref v) => v.is_end_stream(), &Data(ref v) => v.is_end_stream(),
&Reset(_) => true,
&PushPromise(_) | &PushPromise(_) |
&WindowUpdate(_) | &WindowUpdate(_) |
&Ping(_) | &Ping(_) |
&Settings(_) => false, &Settings(_) => false,
&Reset(_) => true,
}
}
pub fn is_reset(&self) -> bool {
use self::Frame::*;
match self {
&Reset(_) => true,
_ => false,
} }
} }
} }

View File

@@ -344,14 +344,21 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
T::can_create_local_stream() T::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> { fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.get_reset(id) self.inner.close_stream_local_half(id)
}
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_stream_remote_half(id)
} }
fn reset_stream(&mut self, id: StreamId, cause: Reason) { fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause) self.inner.reset_stream(id, cause)
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn is_local_active(&self, id: StreamId) -> bool { fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id) self.inner.is_local_active(id)
} }
@@ -383,6 +390,14 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id) self.inner.remote_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_send_data(id)
}
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_recv_data(id)
}
} }
impl<T: ControlPing> ControlPing for FlowControl<T> { impl<T: ControlPing> ControlPing for FlowControl<T> {

View File

@@ -112,18 +112,18 @@ fn test_with_initial_size() {
assert!(fc.apply_window_update().is_none()); assert!(fc.apply_window_update().is_none());
} }
#[test] // #[test]
fn test_with_next_update() { // fn test_with_next_update() {
let mut fc = FlowControlState::with_next_update(10); // let mut fc = FlowControlState::with_next_update(10);
//
fc.expand_window(8); // fc.expand_window(8);
assert_eq!(fc.window_size, 0); // assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 18); // assert_eq!(fc.next_window_update, 18);
//
assert_eq!(fc.apply_window_update(), Some(18)); // assert_eq!(fc.apply_window_update(), Some(18));
assert_eq!(fc.window_size, 18); // assert_eq!(fc.window_size, 18);
assert_eq!(fc.next_window_update, 0); // assert_eq!(fc.next_window_update, 0);
} // }
#[test] #[test]
fn test_grow_accumulates() { fn test_grow_accumulates() {

View File

@@ -59,177 +59,197 @@ pub enum StreamState {
} }
impl StreamState { impl StreamState {
pub fn is_closed(&self) -> bool { // /// Transition the state to represent headers being received.
// ///
// /// Returns true if this state transition results in iniitializing the
// /// stream id. `Err` is returned if this is an invalid state transition.
// pub fn recv_headers<P>(&mut self, eos: bool, initial_window_size: WindowSize)
// -> Result<bool, ConnectionError>
// where P: Peer
// {
// use self::StreamState::*;
// use self::PeerState::*;
// match *self {
// Idle => {
// let local = Headers;
// if eos {
// *self = HalfClosedRemote(local);
// } else {
// let remote = Data(FlowControlState::with_initial_size(initial_window_size));
// *self = Open { local, remote };
// }
// Ok(true)
// }
// Open { local, remote } => {
// try!(remote.check_is_headers(ProtocolError.into()));
// if !eos {
// // Received non-trailers HEADERS on open remote.
// return Err(ProtocolError.into());
// }
// *self = HalfClosedRemote(local);
// Ok(false)
// }
// HalfClosedLocal(headers) => {
// try!(headers.check_is_headers(ProtocolError.into()));
// if eos {
// *self = Closed;
// } else {
// let remote = FlowControlState::with_initial_size(initial_window_size);
// *self = HalfClosedLocal(Data(remote));
// };
// Ok(false)
// }
// Closed | HalfClosedRemote(..) => {
// Err(ProtocolError.into())
// }
// }
// }
// /// Transition the state to represent headers being sent.
// ///
// /// Returns true if this state transition results in initializing the stream
// /// id. `Err` is returned if this is an invalid state transition.
// pub fn send_headers<P: Peer>(&mut self,
// eos: bool,
// initial_window_size: WindowSize)
// -> Result<bool, ConnectionError>
// {
// use self::StreamState::*;
// use self::PeerState::*;
// match *self {
// Idle => {
// *self = if eos {
// HalfClosedLocal(Headers)
// } else {
// Open {
// local: Data(FlowControlState::with_initial_size(initial_window_size)),
// remote: Headers,
// }
// };
// Ok(true)
// }
// Open { local, remote } => {
// try!(local.check_is_headers(UnexpectedFrameType.into()));
// *self = if eos {
// HalfClosedLocal(remote)
// } else {
// let fc = FlowControlState::with_initial_size(initial_window_size);
// let local = Data(fc);
// Open { local, remote }
// };
// Ok(false)
// }
// HalfClosedRemote(local) => {
// try!(local.check_is_headers(UnexpectedFrameType.into()));
// *self = if eos {
// Closed
// } else {
// let fc = FlowControlState::with_initial_size(initial_window_size);
// HalfClosedRemote(Data(fc))
// };
// Ok(false)
// }
// Closed | HalfClosedLocal(..) => {
// Err(UnexpectedFrameType.into())
// }
// }
// }
pub fn check_can_send_data(&self) -> Result<(), ConnectionError> {
use self::StreamState::*; use self::StreamState::*;
match self { match self {
&Closed => true, &Open { ref remote, .. } => {
_ => false, try!(remote.check_is_data(UnexpectedFrameType.into()));
Ok(())
}
&HalfClosedLocal(ref remote) => {
try!(remote.check_is_data(UnexpectedFrameType.into()));
Ok(())
}
&Idle | &Closed | &HalfClosedRemote(..) => {
Err(UnexpectedFrameType.into())
}
} }
} }
/// Transition the state to represent headers being received.
/// pub fn check_can_recv_data(&self) -> Result<(), ConnectionError> {
/// Returns true if this state transition results in iniitializing the use self::StreamState::*;
/// stream id. `Err` is returned if this is an invalid state transition.
pub fn recv_headers<P>(&mut self, eos: bool, initial_window_size: WindowSize) match self {
-> Result<bool, ConnectionError> &Open { ref local, .. } => {
where P: Peer try!(local.check_is_data(ProtocolError.into()));
{ Ok(())
}
&HalfClosedRemote(ref local) => {
try!(local.check_is_data(ProtocolError.into()));
Ok(())
}
&Idle | &Closed | &HalfClosedLocal(..) => {
Err(ProtocolError.into())
}
}
}
/// Returns true iff the stream is fully closed.
pub fn close_local(&mut self) -> Result<bool, ConnectionError> {
use self::StreamState::*; use self::StreamState::*;
use self::PeerState::*;
match *self { match *self {
Idle => { Open { remote, .. } => {
let local = Headers; *self = HalfClosedLocal(remote);
if eos { Ok(false)
*self = HalfClosedRemote(local); }
} else {
let remote = Data(FlowControlState::with_initial_size(initial_window_size)); HalfClosedLocal(remote) => {
*self = Open { local, remote }; *self = Closed;
}
Ok(true) Ok(true)
} }
Open { local, remote } => { Idle | Closed | HalfClosedRemote(..) => {
try!(remote.check_is_headers(ProtocolError.into())); Err(ProtocolError.into())
if !eos { }
// Received non-trailers HEADERS on open remote. }
return Err(ProtocolError.into()); }
}
/// Returns true iff the stream is fully closed.
pub fn close_remote(&mut self) -> Result<bool, ConnectionError> {
use self::StreamState::*;
match *self {
Open { local, .. } => {
*self = HalfClosedRemote(local); *self = HalfClosedRemote(local);
Ok(false) Ok(false)
} }
HalfClosedLocal(headers) => { HalfClosedRemote(local) => {
try!(headers.check_is_headers(ProtocolError.into())); *self = Closed;
if eos {
*self = Closed;
} else {
let remote = FlowControlState::with_initial_size(initial_window_size);
*self = HalfClosedLocal(Data(remote));
};
Ok(false)
}
Closed | HalfClosedRemote(..) => {
Err(ProtocolError.into())
}
}
}
pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> {
use self::StreamState::*;
match *self {
Open { local, remote } => {
try!(remote.check_is_data(ProtocolError.into()));
if eos {
*self = HalfClosedRemote(local);
}
Ok(())
}
HalfClosedLocal(remote) => {
try!(remote.check_is_data(ProtocolError.into()));
if eos {
*self = Closed;
}
Ok(())
}
Closed | HalfClosedRemote(..) => {
Err(ProtocolError.into())
}
_ => unimplemented!(),
}
}
/// Transition the state to represent headers being sent.
///
/// Returns true if this state transition results in initializing the stream
/// id. `Err` is returned if this is an invalid state transition.
pub fn send_headers<P: Peer>(&mut self,
eos: bool,
initial_window_size: WindowSize)
-> Result<bool, ConnectionError>
{
use self::StreamState::*;
use self::PeerState::*;
match *self {
Idle => {
*self = if eos {
HalfClosedLocal(Headers)
} else {
Open {
local: Data(FlowControlState::with_initial_size(initial_window_size)),
remote: Headers,
}
};
Ok(true) Ok(true)
} }
Open { local, remote } => {
try!(local.check_is_headers(UnexpectedFrameType.into()));
*self = if eos {
HalfClosedLocal(remote)
} else {
let fc = FlowControlState::with_initial_size(initial_window_size);
let local = Data(fc);
Open { local, remote }
};
Ok(false)
}
HalfClosedRemote(local) => {
try!(local.check_is_headers(UnexpectedFrameType.into()));
*self = if eos {
Closed
} else {
let fc = FlowControlState::with_initial_size(initial_window_size);
HalfClosedRemote(Data(fc))
};
Ok(false)
}
Closed | HalfClosedLocal(..) => {
Err(UnexpectedFrameType.into())
}
}
}
pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> {
use self::StreamState::*;
match *self {
Open { local, remote } => {
try!(local.check_is_data(UnexpectedFrameType.into()));
if eos {
*self = HalfClosedLocal(remote);
}
Ok(())
}
HalfClosedRemote(local) => {
try!(local.check_is_data(UnexpectedFrameType.into()));
if eos {
*self = Closed;
}
Ok(())
}
Idle | Closed | HalfClosedLocal(..) => { Idle | Closed | HalfClosedLocal(..) => {
Err(UnexpectedFrameType.into()) Err(ProtocolError.into())
} }
} }
} }
pub fn local_flow_controller(&mut self) -> Option<&mut FlowControlState> { pub fn local_flow_controller(&mut self) -> Option<&mut FlowControlState> {
use self::StreamState::*; use self::StreamState::*;
use self::PeerState::*; use self::PeerState::*;

View File

@@ -79,14 +79,22 @@ impl<T: ControlStreams> ControlStreams for StreamRecvClose<T> {
T::can_create_local_stream() T::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> { fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.get_reset(id) self.inner.close_stream_local_half(id)
}
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_stream_remote_half(id)
} }
fn reset_stream(&mut self, id: StreamId, cause: Reason) { fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause) self.inner.reset_stream(id, cause)
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn is_local_active(&self, id: StreamId) -> bool { fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id) self.inner.is_local_active(id)
} }
@@ -118,6 +126,14 @@ impl<T: ControlStreams> ControlStreams for StreamRecvClose<T> {
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id) self.inner.remote_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_send_data(id)
}
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_recv_data(id)
}
} }
impl<T: ApplySettings> ApplySettings for StreamRecvClose<T> { impl<T: ApplySettings> ApplySettings for StreamRecvClose<T> {

View File

@@ -1,4 +1,5 @@
use ConnectionError; use ConnectionError;
use error::Reason::{ProtocolError, RefusedStream};
use frame::{Frame, StreamId}; use frame::{Frame, StreamId};
use proto::*; use proto::*;
@@ -35,7 +36,7 @@ impl<T, U> StreamRecvOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams, T: ControlStreams,
{ {
fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> { fn send_refuse(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.pending_refuse.is_none()); debug_assert!(self.pending_refuse.is_none());
let f = frame::Reset::new(id, Reason::RefusedStream); let f = frame::Reset::new(id, Reason::RefusedStream);
@@ -99,7 +100,7 @@ impl<T, U> Stream for StreamRecvOpen<T>
// Since there's only one slot for pending refused streams, it must be cleared // Since there's only one slot for pending refused streams, it must be cleared
// before polling a frame from the transport. // before polling a frame from the transport.
if let Some(id) = self.pending_refuse.take() { if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refusal(id)); try_ready!(self.send_refuse(id));
} }
loop { loop {
@@ -115,12 +116,27 @@ impl<T, U> Stream for StreamRecvOpen<T>
if self.inner.get_reset(id).is_some() { if self.inner.get_reset(id).is_some() {
// For now, just ignore frames on reset streams. // For now, just ignore frames on reset streams.
debug!("ignoring received frame on reset stream");
// TODO tell the remote to knock it off? // TODO tell the remote to knock it off?
continue; continue;
} }
if T::is_valid_remote_id(id) { if T::is_valid_remote_id(id) {
unimplemented!() if !self.inner.is_local_active(id) {
if !T::can_create_remote_stream() {
return Err(ProtocolError.into());
}
if let Some(max) = self.max_concurrency {
if (max as usize) < self.inner.local_active_len() {
return Err(RefusedStream.into());
}
}
}
// If the frame ends the stream, it will be handled in
// StreamRecvClose.
return Ok(Async::Ready(Some(frame)));
} }
} }
} }
@@ -139,7 +155,7 @@ impl<T, U> Sink for StreamRecvOpen<T>
// The local must complete refusing the remote stream before sending any other // The local must complete refusing the remote stream before sending any other
// frames. // frames.
if let Some(id) = self.pending_refuse.take() { if let Some(id) = self.pending_refuse.take() {
if self.send_refusal(id)?.is_not_ready() { if self.send_refuse(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame)); return Ok(AsyncSink::NotReady(frame));
} }
} }
@@ -157,7 +173,7 @@ impl<T, U> Sink for StreamRecvOpen<T>
fn poll_complete(&mut self) -> Poll<(), T::SinkError> { fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
if let Some(id) = self.pending_refuse.take() { if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refusal(id)); try_ready!(self.send_refuse(id));
} }
self.inner.poll_complete() self.inner.poll_complete()
@@ -173,7 +189,7 @@ impl<T, U> ReadySink for StreamRecvOpen<T>
{ {
fn poll_ready(&mut self) -> Poll<(), ConnectionError> { fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refuse.take() { if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refusal(id)); try_ready!(self.send_refuse(id));
} }
self.inner.poll_ready() self.inner.poll_ready()
@@ -284,7 +300,6 @@ impl<T, U> ReadySink for StreamRecvOpen<T>
// return Ok(Async::Ready(None)); // return Ok(Async::Ready(None));
// } // }
impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> { impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
fn is_valid_local_id(id: StreamId) -> bool { fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id) T::is_valid_local_id(id)
@@ -298,14 +313,22 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
T::can_create_local_stream() T::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> { fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.get_reset(id) self.inner.close_stream_local_half(id)
}
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_stream_remote_half(id)
} }
fn reset_stream(&mut self, id: StreamId, cause: Reason) { fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause) self.inner.reset_stream(id, cause)
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn is_local_active(&self, id: StreamId) -> bool { fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id) self.inner.is_local_active(id)
} }
@@ -337,6 +360,14 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id) self.inner.remote_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_send_data(id)
}
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_recv_data(id)
}
} }
impl<T: ControlPing> ControlPing for StreamRecvOpen<T> { impl<T: ControlPing> ControlPing for StreamRecvOpen<T> {

View File

@@ -42,8 +42,17 @@ impl<T, U> Sink for StreamSendClose<T>
type SinkItem = Frame<U>; type SinkItem = Frame<U>;
type SinkError = ConnectionError; type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> { fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
self.inner.start_send(item) if frame.is_end_stream() {
let id = frame.stream_id();
if let &Frame::Reset(ref rst) = &frame {
self.inner.reset_stream(id, rst.reason());
} else {
self.inner.close_stream_local_half(id)?;
}
}
self.inner.start_send(frame)
} }
fn poll_complete(&mut self) -> Poll<(), ConnectionError> { fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
@@ -84,14 +93,22 @@ impl<T: ControlStreams> ControlStreams for StreamSendClose<T> {
T::can_create_local_stream() T::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> { fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.get_reset(id) self.inner.close_stream_local_half(id)
}
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_stream_remote_half(id)
} }
fn reset_stream(&mut self, id: StreamId, cause: Reason) { fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause) self.inner.reset_stream(id, cause)
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn is_local_active(&self, id: StreamId) -> bool { fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id) self.inner.is_local_active(id)
} }
@@ -123,6 +140,14 @@ impl<T: ControlStreams> ControlStreams for StreamSendClose<T> {
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id) self.inner.remote_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_send_data(id)
}
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_recv_data(id)
}
} }
impl<T: ControlPing> ControlPing for StreamSendClose<T> { impl<T: ControlPing> ControlPing for StreamSendClose<T> {

View File

@@ -1,5 +1,5 @@
use ConnectionError; use ConnectionError;
use error::User::{InvalidStreamId, StreamReset}; use error::User::{InvalidStreamId, StreamReset, Rejected};
use frame::{Frame, SettingSet}; use frame::{Frame, SettingSet};
use proto::*; use proto::*;
@@ -101,34 +101,27 @@ impl<T, U> Sink for StreamSendOpen<T>
if T::is_valid_local_id(id) { if T::is_valid_local_id(id) {
if self.inner.is_local_active(id) { if self.inner.is_local_active(id) {
// If the frame ends thestream, it will be handled in stream_recv. } else if T::can_create_local_stream() {
return self.inner.start_send(frame); if let Some(max) = self.max_concurrency {
} if (max as usize) < self.inner.local_active_len() {
return Err(Rejected.into());
if T::can_create_local_stream() { }
let has_capacity = match self.max_concurrency {
None => true,
Some(max) => self.inner.local_active_len() < (max as usize),
};
if has_capacity {
// create that shit.
unimplemented!();
} }
// TODO create that shit.
} }
} else { } else {
if self.inner.is_remote_active(id) { // If the frame was part of a remote stream, it MUST already exist.
// If the frame was part of a remote stream, it MUST already exist. If the if !self.inner.is_remote_active(id) && !frame.is_reset() {
// frame ends thestream, it will be handled in stream_recv. return Err(InvalidStreamId.into());
return self.inner.start_send(frame);
}
if let Reset(rst) = frame {
return self.inner.start_send(Reset(rst));
} }
} }
// Tried to send a frame on a stream if let &Data(..) = &frame {
return Err(InvalidStreamId.into()); self.inner.check_can_send_data(id);
}
return self.inner.start_send(frame);
} }
fn poll_complete(&mut self) -> Poll<(), T::SinkError> { fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
@@ -160,14 +153,22 @@ impl<T: ControlStreams> ControlStreams for StreamSendOpen<T> {
T::can_create_local_stream() T::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> { fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.get_reset(id) self.inner.close_stream_local_half(id)
}
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_stream_remote_half(id)
} }
fn reset_stream(&mut self, id: StreamId, cause: Reason) { fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause) self.inner.reset_stream(id, cause)
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn is_local_active(&self, id: StreamId) -> bool { fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id) self.inner.is_local_active(id)
} }
@@ -199,6 +200,14 @@ impl<T: ControlStreams> ControlStreams for StreamSendOpen<T> {
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id) self.inner.remote_flow_controller(id)
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_send_data(id)
}
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.check_can_recv_data(id)
}
} }
impl<T: ControlFlow> ControlFlow for StreamSendOpen<T> { impl<T: ControlFlow> ControlFlow for StreamSendOpen<T> {

View File

@@ -1,5 +1,5 @@
use {ConnectionError, Peer, StreamId}; use {ConnectionError, Peer, StreamId};
use error::Reason; use error::Reason::{NoError, ProtocolError};
use proto::*; use proto::*;
use fnv::FnvHasher; use fnv::FnvHasher;
@@ -18,8 +18,10 @@ pub trait ControlStreams {
!Self::can_create_local_stream() !Self::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason>; fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError>;
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError>;
fn reset_stream(&mut self, id: StreamId, cause: Reason); fn reset_stream(&mut self, id: StreamId, cause: Reason);
fn get_reset(&self, id: StreamId) -> Option<Reason>;
fn is_local_active(&self, id: StreamId) -> bool; fn is_local_active(&self, id: StreamId) -> bool;
fn is_remote_active(&self, id: StreamId) -> bool; fn is_remote_active(&self, id: StreamId) -> bool;
@@ -33,13 +35,8 @@ pub trait ControlStreams {
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32);
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32);
// fn get_active(&self, id: StreamId) -> Option<&StreamState> { fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError>;
// self.streams(id).get_active(id) fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError>;
// }
// fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
// self.streams_mut(id).get_active_mut(id)
// }
} }
/// Holds the underlying stream state to be accessed by upper layers. /// Holds the underlying stream state to be accessed by upper layers.
@@ -112,6 +109,35 @@ impl<T, P, U> ReadySink for StreamStore<T, P>
} }
} }
impl<T, P: Peer> StreamStore<T, P> {
pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get(&id)
} else {
self.remote_active.get(&id)
}
}
pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id)
} else {
self.remote_active.get_mut(&id)
}
}
pub fn remove_active(&mut self, id: StreamId) {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.remove(&id);
} else {
self.remote_active.remove(&id);
}
}
}
impl<T, P: Peer> ControlStreams for StreamStore<T, P> { impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
fn is_valid_local_id(id: StreamId) -> bool { fn is_valid_local_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id) P::is_valid_local_stream_id(id)
@@ -125,19 +151,39 @@ impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
P::can_create_local_stream() P::can_create_local_stream()
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> { fn close_stream_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.reset.get(&id).map(|r| *r) let fully_closed = self.get_active_mut(id)
.map(|s| s.close_local())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
fn close_stream_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_remote())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
} }
fn reset_stream(&mut self, id: StreamId, cause: Reason) { fn reset_stream(&mut self, id: StreamId, cause: Reason) {
if P::is_valid_local_stream_id(id) { self.remove_active(id);
self.local_active.remove(&id);
} else {
self.remote_active.remove(&id);
}
self.reset.insert(id, cause); self.reset.insert(id, cause);
} }
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
fn is_local_active(&self, id: StreamId) -> bool { fn is_local_active(&self, id: StreamId) -> bool {
self.local_active.contains_key(&id) self.local_active.contains_key(&id)
} }
@@ -237,6 +283,20 @@ impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
self.remote_active.get_mut(&id).and_then(|s| s.remote_flow_controller()) self.remote_active.get_mut(&id).and_then(|s| s.remote_flow_controller())
} }
} }
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
if let Some(s) = self.get_active(id) {
return s.check_can_send_data();
}
Err(ProtocolError.into())
}
fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {
if let Some(s) = self.get_active(id) {
return s.check_can_recv_data();
}
Err(ProtocolError.into())
}
} }
impl<T: ApplySettings, P> ApplySettings for StreamStore<T, P> { impl<T: ApplySettings, P> ApplySettings for StreamStore<T, P> {