diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index d05bed7..a9e2dae 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -173,6 +173,10 @@ impl FlowControl } } +/// Tracks window updates received from the remote and ensures that the remote does not +/// violate the local peer's flow controller. +/// +/// TODO send flow control reset when the peer violates the flow control window. impl Stream for FlowControl where T: Stream, T: ControlStreams, @@ -201,6 +205,7 @@ impl Stream for FlowControl // controller. That's fine. if let Some(fc) = self.recv_flow_controller(v.stream_id()) { if fc.claim_window(sz).is_err() { + // TODO send flow control reset. return Err(error::Reason::FlowControlError.into()) } } @@ -213,6 +218,12 @@ impl Stream for FlowControl } } +/// Tracks the send flow control windows for sent frames. +/// +/// If sending a frame would violate the remote's window, start_send fails with +/// `FlowControlViolation`. +/// +/// Sends pending window updates before operating on the underlying transport. impl Sink for FlowControl where T: Sink, SinkError = ConnectionError>, T: ReadySink, @@ -270,6 +281,7 @@ impl Sink for FlowControl } } +/// Sends pending window updates before checking the underyling transport's readiness. impl ReadySink for FlowControl where T: Sink, SinkError = ConnectionError>, T: ReadySink, @@ -331,6 +343,7 @@ impl ApplySettings for FlowControl } } +/// Proxy. impl ControlStreams for FlowControl { fn local_valid_id(id: StreamId) -> bool { T::local_valid_id(id) @@ -375,6 +388,7 @@ impl ControlStreams for FlowControl { fn get_reset(&self, id: StreamId) -> Option { self.inner.get_reset(id) } + fn is_local_active(&self, id: StreamId) -> bool { self.inner.is_local_active(id) } @@ -391,11 +405,11 @@ impl ControlStreams for FlowControl { self.inner.remote_active_len() } - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_send_window_size(old_sz, new_sz) } @@ -407,15 +421,16 @@ impl ControlStreams for FlowControl { self.inner.send_flow_controller(id) } - fn can_send_data(&mut self, id: StreamId) -> bool { - self.inner.can_send_data(id) + fn is_send_open(&mut self, id: StreamId) -> bool { + self.inner.is_send_open(id) } - fn can_recv_data(&mut self, id: StreamId) -> bool { - self.inner.can_recv_data(id) + fn is_recv_open(&mut self, id: StreamId) -> bool { + self.inner.is_recv_open(id) } } +/// Proxy. impl ControlPing for FlowControl { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 795ec2c..0c18332 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -53,7 +53,28 @@ use self::stream_store::{ControlStreams, StreamStore}; /// - Exposes ControlSettings up towards the application and transmits local settings to /// the remote. /// -/// ### `FlowControl` +/// ### The stream transport +/// +/// The states of all HTTP/2 connections are stored centrally in the `StreamStore` at the +/// bottom of the stream transport. Several modules above this access this state via the +/// `ControlStreams` API to drive changes to the stream state. In each direction (send +/// from local to remote, and recv from remote to local), there is an Stream\*Open module +/// responsible for initializing new streams and ensuring that frames do not violate +/// stream state. Then, there are modules that operate on streams (for instance, +/// FlowControl). Finally, a Stream\*Close module is responsible for acting on END_STREAM +/// frames to ensure that stream states are not closed before work is complete. +/// +/// #### `StreamSendOpen` +/// +/// - Initializes streams initiated by the local peer. +/// - Ensures that frames sent from the local peer are appropriate for the stream's state. +/// - Ensures that the remote's max stream concurrency is not violated. +/// +/// #### `StreamRecvClose` +/// +/// - Updates the stream state for frames sent with END_STREAM. +/// +/// #### `FlowControl` /// /// - Tracks received data frames against the local stream and connection flow control /// windows. @@ -66,10 +87,22 @@ use self::stream_store::{ControlStreams, StreamStore}; /// - Sends window updates for the local stream and connection flow control windows as /// instructed by upper layers. /// -/// ### `StreamTracker` +/// #### `StreamSendClose` /// -/// - Tracks all active streams. -/// - Tracks all reset streams. +/// - Updates the stream state for frames receive` with END_STREAM. +/// +/// #### `StreamRecvOpen` +/// +/// - Initializes streams initiated by the remote peer. +/// - Ensures that frames received from the remote peer are appropriate for the stream's +/// state. +/// - Ensures that the local peer's max stream concurrency is not violated. +/// - Emits StreamRefused resets to the remote. +/// +/// #### `StreamStore` +/// +/// - Holds the state of all local & remote active streams. +/// - Holds the cause of all reset/closed streams. /// - Exposes `ControlStreams` so that upper layers may share stream state. /// /// ### `PingPong` diff --git a/src/proto/state.rs b/src/proto/state.rs index a129b46..79ab530 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -155,7 +155,7 @@ impl StreamState { Ok(true) } - pub fn can_send_data(&self) -> bool { + pub fn is_send_open(&self) -> bool { use self::StreamState::*; match self { &Idle | &Closed | &HalfClosedRemote(..) => false, @@ -165,7 +165,7 @@ impl StreamState { } } - pub fn can_recv_data(&self) -> bool { + pub fn is_recv_open(&self) -> bool { use self::StreamState::*; match self { &Idle | &Closed | &HalfClosedLocal(..) => false, diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index e46a912..95baae6 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -4,12 +4,7 @@ use frame::{self, Frame}; use proto::*; use proto::ready::ReadySink; -// TODO track "last stream id" for GOAWAY. -// TODO track/provide "next" stream id. -// TODO reset_streams needs to be bounded. -// TODO track reserved streams (PUSH_PROMISE). - -/// Handles end-of-stream frames sent from the remote. +/// Tracks END_STREAM frames received from the remote peer. #[derive(Debug)] pub struct StreamRecvClose { inner: T, @@ -25,6 +20,7 @@ impl StreamRecvClose } } +/// Tracks END_STREAM frames received from the remote peer. impl Stream for StreamRecvClose where T: Stream, T: ControlStreams, @@ -55,6 +51,7 @@ impl Stream for StreamRecvClose } } +// Proxy. impl Sink for StreamRecvClose where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -71,6 +68,7 @@ impl Sink for StreamRecvClose } } +// Proxy. impl ReadySink for StreamRecvClose where T: Sink, SinkError = ConnectionError>, T: ReadySink, @@ -81,6 +79,7 @@ impl ReadySink for StreamRecvClose } } +// Proxy. impl ControlStreams for StreamRecvClose { fn local_valid_id(id: StreamId) -> bool { T::local_valid_id(id) @@ -142,11 +141,11 @@ impl ControlStreams for StreamRecvClose { self.inner.remote_active_len() } - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_send_window_size(old_sz, new_sz) } @@ -158,15 +157,16 @@ impl ControlStreams for StreamRecvClose { self.inner.send_flow_controller(id) } - fn can_send_data(&mut self, id: StreamId) -> bool { - self.inner.can_send_data(id) + fn is_send_open(&mut self, id: StreamId) -> bool { + self.inner.is_send_open(id) } - fn can_recv_data(&mut self, id: StreamId) -> bool { - self.inner.can_recv_data(id) + fn is_recv_open(&mut self, id: StreamId) -> bool { + self.inner.is_recv_open(id) } } +// Proxy. impl ApplySettings for StreamRecvClose { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set) @@ -177,6 +177,7 @@ impl ApplySettings for StreamRecvClose { } } +// Proxy. impl ControlFlow for StreamRecvClose { fn poll_window_update(&mut self) -> Poll { self.inner.poll_window_update() @@ -187,6 +188,7 @@ impl ControlFlow for StreamRecvClose { } } +// Proxy. impl ControlPing for StreamRecvClose { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index f9b9eca..15c7282 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -3,7 +3,7 @@ use error::Reason::{ProtocolError, RefusedStream}; use frame::{Frame, StreamId}; use proto::*; -/// Tracks a connection's streams. +/// Ensures that frames are received on open streams in the appropriate state. #[derive(Debug)] pub struct StreamRecvOpen { inner: T, @@ -60,25 +60,7 @@ impl StreamRecvOpen } } -/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. -/// -/// > Indicates the maximum number of concurrent streams that the senderg will allow. This -/// > limit is directional: it applies to the number of streams that the sender permits -/// > the receiver to create. Initially, there is no limit to this value. It is -/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit -/// > parallelism. -/// > -/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by -/// > endpoints. A zero value does prevent the creation of new streams; however, this can -/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only -/// > set a zero value for short durations; if a server does not wish to accept requests, -/// > closing the connection is more appropriate. -/// -/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a -/// > value that is below the current number of open streams can either close streams that -/// > exceed the new value or allow streams to complete. -/// -/// This module does NOT close streams when the setting changes. +/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the local peer. impl ApplySettings for StreamRecvOpen where T: ApplySettings { @@ -93,6 +75,7 @@ impl ApplySettings for StreamRecvOpen } } +/// Helper. impl StreamRecvOpen { fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { // Ensure that the stream hasn't been closed otherwise. @@ -103,6 +86,7 @@ impl StreamRecvOpen { } } +/// Ensures that frames are received on open streams in the appropriate state. impl Stream for StreamRecvOpen where T: Stream, T: Sink, SinkError = ConnectionError>, @@ -112,8 +96,6 @@ impl Stream for StreamRecvOpen type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { - use frame::Frame::*; - // Since there's only one slot for pending refused streams, it must be cleared // before polling a frame from the transport. try_ready!(self.send_pending_refuse()); @@ -142,6 +124,7 @@ impl Stream for StreamRecvOpen &Frame::Headers(..) => { self.check_not_reset(id)?; + if T::remote_valid_id(id) { if self.inner.is_remote_active(id) { // Can't send a a HEADERS frame on a remote stream that's @@ -174,7 +157,7 @@ impl Stream for StreamRecvOpen // All other stream frames are sent only when _ => { self.check_not_reset(id)?; - if !self.inner.can_recv_data(id) { + if !self.inner.is_recv_open(id) { return Err(ProtocolError.into()); } } @@ -187,7 +170,7 @@ impl Stream for StreamRecvOpen } } -/// Ensures that a pending reset is +/// Sends pending resets before operating on the underlying transport. impl Sink for StreamRecvOpen where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -211,7 +194,7 @@ impl Sink for StreamRecvOpen } } - +/// Sends pending resets before checking the underlying transport's readiness. impl ReadySink for StreamRecvOpen where T: Stream, T: Sink, SinkError = ConnectionError>, @@ -227,110 +210,7 @@ impl ReadySink for StreamRecvOpen } } - - // Some(Headers(v)) => { - // let id = v.stream_id(); - // let eos = v.is_end_stream(); - - // if self.get_reset(id).is_some() { - // // TODO send the remote errors when it sends us frames on reset - // // streams. - // continue; - // } - - // if let Some(mut s) = self.get_active_mut(id) { - // let created = s.recv_headers(eos, self.initial_window_size)?; - // assert!(!created); - // return Ok(Async::Ready(Some(Headers(v)))); - // } - - // // Ensure that receiving this frame will not violate the local max - // // stream concurrency setting. Ensure that the stream is refused - // // before processing additional frames. - // if let Some(max) = self.max_concurrency { - // let max = max as usize; - // if !self.local.is_active(id) && self.local.active_count() >= max - 1 { - // // This frame would violate our local max concurrency, so reject - // // the stream. - // try_ready!(self.send_refusal(id)); - - // // Try to process another frame (hopefully for an active - // // stream). - // continue; - // } - // } - - // let is_closed = { - // let stream = self.active_streams.entry(id) - // .or_insert_with(|| StreamState::default()); - - // let initialized = - // stream.recv_headers(eos, self.initial_window_size)?; - - // if initialized { - // if !P::is_valid_remote_stream_id(id) { - // return Err(Reason::ProtocolError.into()); - // } - // } - - // stream.is_closed() - // }; - - // if is_closed { - // self.active_streams.remove(id); - // self.reset_streams.insert(id, Reason::NoError); - // } - - // return Ok(Async::Ready(Some(Headers(v)))); - // } - - // Some(Data(v)) => { - // let id = v.stream_id(); - - // if self.get_reset(id).is_some() { - // // TODO send the remote errors when it sends us frames on reset - // // streams. - // continue; - // } - - // let is_closed = { - // let stream = match self.active_streams.get_mut(id) { - // None => return Err(Reason::ProtocolError.into()), - // Some(s) => s, - // }; - // stream.recv_data(v.is_end_stream())?; - // stream.is_closed() - // }; - - // if is_closed { - // self.reset(id, Reason::NoError); - // } - - // return Ok(Async::Ready(Some(Data(v)))); - // } - - // Some(Reset(v)) => { - // // Set or update the reset reason. - // self.reset(v.stream_id(), v.reason()); - // return Ok(Async::Ready(Some(Reset(v)))); - // } - - // Some(f) => { - // let id = f.stream_id(); - - // if self.get_reset(id).is_some() { - // // TODO send the remote errors when it sends us frames on reset - // // streams. - // continue; - // } - - // return Ok(Async::Ready(Some(f))); - // } - - // None => { - // return Ok(Async::Ready(None)); - // } - +/// Proxy. impl ControlStreams for StreamRecvOpen { fn local_valid_id(id: StreamId) -> bool { T::local_valid_id(id) @@ -392,11 +272,11 @@ impl ControlStreams for StreamRecvOpen { self.inner.remote_active_len() } - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_send_window_size(old_sz, new_sz) } @@ -408,15 +288,16 @@ impl ControlStreams for StreamRecvOpen { self.inner.send_flow_controller(id) } - fn can_send_data(&mut self, id: StreamId) -> bool { - self.inner.can_send_data(id) + fn is_send_open(&mut self, id: StreamId) -> bool { + self.inner.is_send_open(id) } - fn can_recv_data(&mut self, id: StreamId) -> bool { - self.inner.can_recv_data(id) + fn is_recv_open(&mut self, id: StreamId) -> bool { + self.inner.is_recv_open(id) } } +/// Proxy. impl ControlPing for StreamRecvOpen { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index 195cb98..cf00e7f 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -3,11 +3,7 @@ use error::Reason; use frame::{self, Frame}; use proto::*; -// TODO track "last stream id" for GOAWAY. -// TODO track/provide "next" stream id. -// TODO reset_streams needs to be bounded. -// TODO track reserved streams (PUSH_PROMISE). - +/// Tracks END_STREAM frames sent from the local peer. #[derive(Debug)] pub struct StreamSendClose { inner: T, @@ -23,6 +19,7 @@ impl StreamSendClose } } +/// Proxy. impl Stream for StreamSendClose where T: Stream, T: ControlStreams, @@ -35,6 +32,7 @@ impl Stream for StreamSendClose } } +/// Tracks END_STREAM frames sent from the local peer. impl Sink for StreamSendClose where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -65,6 +63,7 @@ impl Sink for StreamSendClose } } +/// Proxy. impl ReadySink for StreamSendClose where T: Sink, SinkError = ConnectionError>, T: ReadySink, @@ -75,6 +74,7 @@ impl ReadySink for StreamSendClose } } +/// Proxy. impl ApplySettings for StreamSendClose { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set) @@ -85,6 +85,7 @@ impl ApplySettings for StreamSendClose { } } +/// Proxy. impl ControlStreams for StreamSendClose { fn local_valid_id(id: StreamId) -> bool { T::local_valid_id(id) @@ -146,11 +147,11 @@ impl ControlStreams for StreamSendClose { self.inner.remote_active_len() } - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_send_window_size(old_sz, new_sz) } @@ -162,15 +163,16 @@ impl ControlStreams for StreamSendClose { self.inner.send_flow_controller(id) } - fn can_send_data(&mut self, id: StreamId) -> bool { - self.inner.can_send_data(id) + fn is_send_open(&mut self, id: StreamId) -> bool { + self.inner.is_send_open(id) } - fn can_recv_data(&mut self, id: StreamId) -> bool { - self.inner.can_recv_data(id) + fn is_recv_open(&mut self, id: StreamId) -> bool { + self.inner.is_recv_open(id) } } +/// Proxy. impl ControlPing for StreamSendClose { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index c39ed73..b6701ac 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -3,7 +3,7 @@ use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, Unex use frame::{Frame, SettingSet}; use proto::*; -/// +/// Ensures that frames are sent on open streams in the appropriate state. #[derive(Debug)] pub struct StreamSendOpen { inner: T, @@ -30,25 +30,7 @@ impl StreamSendOpen } } -/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. -/// -/// > Indicates the maximum number of concurrent streams that the senderg will allow. This -/// > limit is directional: it applies to the number of streams that the sender permits -/// > the receiver to create. Initially, there is no limit to this value. It is -/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit -/// > parallelism. -/// > -/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by -/// > endpoints. A zero value does prevent the creation of new streams; however, this can -/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only -/// > set a zero value for short durations; if a server does not wish to accept requests, -/// > closing the connection is more appropriate. -/// -/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a -/// > value that is below the current number of open streams can either close streams that -/// > exceed the new value or allow streams to complete. -/// -/// This module does NOT close streams when the setting changes. +/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the remote peer. impl ApplySettings for StreamSendOpen { fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set) @@ -61,6 +43,7 @@ impl ApplySettings for StreamSendOpen { } } +/// Proxy. impl Stream for StreamSendOpen where T: Stream, T: ControlStreams, @@ -74,6 +57,7 @@ impl Stream for StreamSendOpen } } +/// Helper. impl StreamSendOpen { fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { // Ensure that the stream hasn't been closed otherwise. @@ -84,6 +68,7 @@ impl StreamSendOpen { } } +/// Ensures that frames are sent on open streams in the appropriate state. impl Sink for StreamSendOpen where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -143,7 +128,7 @@ impl Sink for StreamSendOpen // the stream is open (i.e. has already sent headers). _ => { self.check_not_reset(id)?; - if !self.inner.can_send_data(id) { + if !self.inner.is_send_open(id) { return Err(InactiveStreamId.into()); } } @@ -157,6 +142,7 @@ impl Sink for StreamSendOpen } } +/// Proxy. impl ReadySink for StreamSendOpen where T: Stream, T: Sink, SinkError = ConnectionError>, @@ -168,6 +154,7 @@ impl ReadySink for StreamSendOpen } } +/// Proxy. impl ControlStreams for StreamSendOpen { fn local_valid_id(id: StreamId) -> bool { T::local_valid_id(id) @@ -229,11 +216,11 @@ impl ControlStreams for StreamSendOpen { self.inner.remote_active_len() } - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { self.inner.update_inital_send_window_size(old_sz, new_sz) } @@ -245,15 +232,16 @@ impl ControlStreams for StreamSendOpen { self.inner.send_flow_controller(id) } - fn can_send_data(&mut self, id: StreamId) -> bool { - self.inner.can_send_data(id) + fn is_send_open(&mut self, id: StreamId) -> bool { + self.inner.is_send_open(id) } - fn can_recv_data(&mut self, id: StreamId) -> bool { - self.inner.can_recv_data(id) + fn is_recv_open(&mut self, id: StreamId) -> bool { + self.inner.is_recv_open(id) } } +/// Proxy. impl ControlFlow for StreamSendOpen { fn poll_window_update(&mut self) -> Poll { self.inner.poll_window_update() @@ -264,6 +252,7 @@ impl ControlFlow for StreamSendOpen { } } +/// Proxy. impl ControlPing for StreamSendOpen { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index 562421d..fc93d6e 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -11,7 +11,12 @@ use std::marker::PhantomData; /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up /// to Connection). pub trait ControlStreams { + /// Determines whether the given stream could theoretically be opened by the local + /// side of this connection. fn local_valid_id(id: StreamId) -> bool; + + /// Determines whether the given stream could theoretically be opened by the remote + /// side of this connection. fn remote_valid_id(id: StreamId) -> bool; /// Indicates whether this local endpoint may open streams (with HEADERS). @@ -26,7 +31,7 @@ pub trait ControlStreams { !Self::local_can_open() } - /// Create a new stream in the OPEN state from the local side (i.e. as a Client). + /// Creates a new stream in the OPEN state from the local side (i.e. as a Client). /// /// Must only be called when local_can_open returns true. fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; @@ -46,19 +51,37 @@ pub trait ControlStreams { /// Typically called when a server sends a response header. fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + // TODO push promise // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - /// Close the local half of a stream so that the local side may not RECEIVE + /// Closes the send half of a stream. + /// + /// Fails with a ProtocolError if send half of the stream was not open. fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + + /// Closes the recv half of a stream. + /// + /// Fails with a ProtocolError if recv half of the stream was not open. fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + /// Resets the given stream. + /// + /// If the stream was already reset, the stored cause is updated. fn reset_stream(&mut self, id: StreamId, cause: Reason); + + /// Get the reason the stream was reset, if it was reset. fn get_reset(&self, id: StreamId) -> Option; + /// Returns true if the given stream was opened by the local peer and is not yet + /// closed. fn is_local_active(&self, id: StreamId) -> bool; + + /// Returns true if the given stream was opened by the remote peer and is not yet + /// closed. fn is_remote_active(&self, id: StreamId) -> bool; + /// Returns true if the given stream was opened and is not yet closed. fn is_active(&self, id: StreamId) -> bool { if Self::local_valid_id(id) { self.is_local_active(id) @@ -67,17 +90,31 @@ pub trait ControlStreams { } } + /// Returns the number of open streams initiated by the local peer. fn local_active_len(&self) -> usize; + + /// Returns the number of open streams initiated by the remote peer. fn remote_active_len(&self) -> usize; + /// Returns true iff the recv half of the given stream is open. + fn is_recv_open(&mut self, id: StreamId) -> bool; + + /// Returns true iff the send half of the given stream is open. + fn is_send_open(&mut self, id: StreamId) -> bool; + + /// If the given stream ID is active and able to recv data, get its mutable recv flow + /// control state. fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + + /// If the given stream ID is active and able to send data, get its mutable send flow + /// control state. fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32); - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32); + /// Updates the initial window size for the local peer. + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); - fn can_send_data(&mut self, id: StreamId) -> bool; - fn can_recv_data(&mut self, id: StreamId) -> bool; + /// Updates the initial window size for the remote peer. + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize); } /// Holds the underlying stream state to be accessed by upper layers. @@ -115,41 +152,6 @@ impl StreamStore } } -impl Stream for StreamStore - where T: Stream, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - self.inner.poll() - } -} - -impl Sink for StreamStore - where T: Sink, SinkError = ConnectionError>, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_complete() - } -} - -impl ReadySink for StreamStore - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() - } -} - impl StreamStore { pub fn get_active(&mut self, id: StreamId) -> Option<&StreamState> { assert!(!id.is_zero()); @@ -192,21 +194,25 @@ impl ControlStreams for StreamStore { P::local_can_open() } - /// Open a new stream from the local side (i.e. as a Client). fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - assert!(Self::local_valid_id(id)); - debug_assert!(Self::local_can_open()); - debug_assert!(!self.local_active.contains_key(&id)); + if !Self::local_valid_id(id) || !Self::local_can_open() { + return Err(ProtocolError.into()); + } + if self.local_active.contains_key(&id) { + return Err(ProtocolError.into()); + } self.local_active.insert(id, StreamState::new_open_sending(sz)); Ok(()) } - /// Open a new stream from the remote side (i.e. as a Server). fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - assert!(Self::remote_valid_id(id)); - debug_assert!(Self::remote_can_open()); - debug_assert!(!self.remote_active.contains_key(&id)); + if !Self::remote_valid_id(id) || !Self::remote_can_open() { + return Err(ProtocolError.into()); + } + if self.remote_active.contains_key(&id) { + return Err(ProtocolError.into()); + } self.remote_active.insert(id, StreamState::new_open_recving(sz)); Ok(()) @@ -275,6 +281,20 @@ impl ControlStreams for StreamStore { self.remote_active.contains_key(&id) } + fn is_send_open(&mut self, id: StreamId) -> bool { + match self.get_active(id) { + Some(s) => s.is_send_open(), + None => false, + } + } + + fn is_recv_open(&mut self, id: StreamId) -> bool { + match self.get_active(id) { + Some(s) => s.is_recv_open(), + None => false, + } + } + fn local_active_len(&self) -> usize { self.local_active.len() } @@ -283,7 +303,7 @@ impl ControlStreams for StreamStore { self.remote_active.len() } - fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { if new_sz < old_sz { let decr = old_sz - new_sz; @@ -315,7 +335,7 @@ impl ControlStreams for StreamStore { } } - fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { if new_sz < old_sz { let decr = old_sz - new_sz; @@ -366,22 +386,47 @@ impl ControlStreams for StreamStore { self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) } } +} - fn can_send_data(&mut self, id: StreamId) -> bool { - match self.get_active(id) { - Some(s) => s.can_send_data(), - None => false, - } - } +/// Proxy. +impl Stream for StreamStore + where T: Stream, +{ + type Item = Frame; + type Error = ConnectionError; - fn can_recv_data(&mut self, id: StreamId) -> bool { - match self.get_active(id) { - Some(s) => s.can_recv_data(), - None => false, - } + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() } } +/// Proxy. +impl Sink for StreamStore + where T: Sink, SinkError = ConnectionError>, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} + +/// Proxy. +impl ReadySink for StreamStore + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +/// Proxy. impl ApplySettings for StreamStore { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set) @@ -392,6 +437,7 @@ impl ApplySettings for StreamStore { } } +/// Proxy. impl ControlPing for StreamStore { fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) diff --git a/tests/client_request.rs b/tests/client_request.rs index bbfe716..eb60619 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -241,7 +241,7 @@ mod client_request { request.uri = "https://http2.akamai.com/".parse().unwrap(); let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, UnexpectedFrameType); + assert_user_err!(err, InvalidStreamId); } #[test]