diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 58a85b3..5047867 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -80,6 +80,21 @@ impl Frame { &Settings(_) => StreamId::zero(), } } + + pub fn is_end_stream(&self) -> bool { + use self::Frame::*; + + match self { + &Headers(ref v) => v.is_end_stream(), + &Data(ref v) => v.is_end_stream(), + &Reset(ref v) => true, + + &PushPromise(_) | + &WindowUpdate(_) | + &Ping(_) | + &Settings(_) => false, + } + } } /// Errors that can occur during parsing an HTTP/2 frame. diff --git a/src/frame/reset.rs b/src/frame/reset.rs index 925dd56..b513250 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -16,6 +16,15 @@ impl Reset { error_code: error.into(), } } + + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + + pub fn reason(&self) -> Reason { + self.error_code.into() + } + pub fn load(head: Head, payload: &[u8]) -> Result { if payload.len() != 4 { return Err(Error::InvalidPayloadLength); @@ -35,14 +44,6 @@ impl Reset { head.encode(4, dst); dst.put_u32::(self.error_code); } - - pub fn stream_id(&self) -> StreamId { - self.stream_id - } - - pub fn reason(&self) -> Reason { - self.error_code.into() - } } impl From for frame::Frame { diff --git a/src/lib.rs b/src/lib.rs index c0e9b0d..557f1d9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,7 +85,6 @@ pub trait Peer { /// remote node. fn is_valid_remote_stream_id(id: StreamId) -> bool; - fn can_create_local_stream() -> bool; fn can_create_remote_stream() -> bool { !Self::can_create_local_stream() diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 0e99f53..7098943 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -3,6 +3,7 @@ use client::Client; use error; use frame::{self, SettingSet, StreamId}; use proto::*; +use proto::ping_pong::PingPayload; use server::Server; use bytes::{Bytes, IntoBuf}; @@ -32,37 +33,23 @@ pub fn new(transport: Transport) } } +// impl ControlSettings for Connection +// where T: AsyncRead + AsyncWrite, +// B: IntoBuf, +// { +// fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { +// self.inner.update_local_settings(local) +// } -impl ControlSettings for Connection - where T: AsyncRead + AsyncWrite, - B: IntoBuf, -{ - fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.update_local_settings(local) - } +// fn local_settings(&self) -> &SettingSet { +// self.inner.local_settings() +// } - fn local_settings(&self) -> &SettingSet { - self.inner.local_settings() - } +// fn remote_settings(&self) -> &SettingSet { +// self.inner.remote_settings() +// } +// } - fn remote_settings(&self) -> &SettingSet { - self.inner.remote_settings() - } -} - -impl ControlPing for Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} impl Connection where T: AsyncRead + AsyncWrite, @@ -146,21 +133,7 @@ impl Stream for Connection } loop { - let frame = match try!(self.inner.poll()) { - Async::Ready(f) => f, - - // XXX is this necessary? - Async::NotReady => { - // Receiving new frames may depend on ensuring that the write buffer - // is clear (e.g. if window updates need to be sent), so `poll_complete` - // is called here. - try_ready!(self.poll_complete()); - - // If the write buffer is cleared, attempt to poll the underlying - // stream once more because it, may have been made ready. - try_ready!(self.inner.poll()) - } - }; + let frame = try_ready!(self.inner.poll()); trace!("poll; frame={:?}", frame); let frame = match frame { @@ -214,34 +187,20 @@ impl Sink for Connection return Ok(AsyncSink::NotReady(item)); } - match item { + let frame = match item { Frame::Headers { id, headers, end_of_stream } => { // This is a one-way conversion. By checking `poll_ready` first (above), // it's already been determined that the inner `Sink` can accept the item. // If the item is rejected, then there is a bug. - let frame = P::convert_send_message(id, headers, end_of_stream); - let res = self.inner.start_send(frame::Frame::Headers(frame))?; - assert!(res.is_ready()); - Ok(AsyncSink::Ready) + let f = P::convert_send_message(id, headers, end_of_stream); + frame::Frame::Headers(f) } Frame::Data { id, data, end_of_stream } => { - if self.inner.stream_is_reset(id).is_some() { - return Err(error::User::StreamReset.into()); - } - - let frame = frame::Data::from_buf(id, data.into_buf(), end_of_stream); - let res = try!(self.inner.start_send(frame.into())); - assert!(res.is_ready()); - Ok(AsyncSink::Ready) + frame::Data::from_buf(id, data.into_buf(), end_of_stream).into() } - Frame::Reset { id, error } => { - let f = frame::Reset::new(id, error); - let res = self.inner.start_send(f.into())?; - assert!(res.is_ready()); - Ok(AsyncSink::Ready) - } + Frame::Reset { id, error } => frame::Reset::new(id, error).into(), /* Frame::Trailers { id, headers } => { @@ -255,7 +214,11 @@ impl Sink for Connection } */ _ => unimplemented!(), - } + }; + + let res = self.inner.start_send(frame)?; + assert!(res.is_ready()); + Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 16eab85..ff21847 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -4,6 +4,20 @@ use proto::*; use std::collections::VecDeque; +/// Exposes flow control states to "upper" layers of the transport (i.e. above +/// FlowControl). +pub trait ControlFlow { + /// Polls for the next window update from the remote. + fn poll_window_update(&mut self) -> Poll; + + /// Increases the local receive capacity of a stream. + /// + /// This may cause a window update to be sent to the remote. + /// + /// Fails if the given stream is not active. + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; +} + #[derive(Debug)] pub struct FlowControl { inner: T, @@ -80,30 +94,6 @@ impl FlowControl { } } } - -/// Proxies access to streams. -impl ControlStreams for FlowControl { - fn local_streams(&self) -> &StreamMap { - self.inner.local_streams() - } - - fn local_streams_mut(&mut self) -> &mut StreamMap { - self.inner.local_streams_mut() - } - - fn remote_streams(&self) -> &StreamMap { - self.inner.local_streams() - } - - fn remote_streams_mut(&mut self) -> &mut StreamMap { - self.inner.local_streams_mut() - } - - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) - } -} - /// Exposes a public upward API for flow control. impl ControlFlow for FlowControl { fn poll_window_update(&mut self) -> Poll { @@ -139,7 +129,7 @@ impl ControlFlow for FlowControl { self.local_pending_streams.push_back(id); } Ok(()) - } else if let Some(rst) = self.get_reset(id) { + } else if let Some(rst) = self.inner.get_reset(id) { Err(error::User::StreamReset(rst).into()) } else { Err(error::User::InvalidStreamId.into()) @@ -147,16 +137,6 @@ impl ControlFlow for FlowControl { } } -impl ControlPing for FlowControl { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} - impl FlowControl where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -172,7 +152,7 @@ impl FlowControl } while let Some(id) = self.local_pending_streams.pop_front() { - if self.stream_is_reset(id).is_none() { + if self.inner.get_reset(id).is_none() { let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); if let Some(incr) = update { try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); @@ -193,71 +173,6 @@ impl FlowControl } } -/// Applies an update to an endpoint's initial window size. -/// -/// Per RFC 7540 §6.9.2: -/// -/// > In addition to changing the flow-control window for streams that are not yet -/// > active, a SETTINGS frame can alter the initial flow-control window size for -/// > streams with active flow-control windows (that is, streams in the "open" or -/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE -/// > changes, a receiver MUST adjust the size of all stream flow-control windows that -/// > it maintains by the difference between the new value and the old value. -/// > -/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a -/// > flow-control window to become negative. A sender MUST track the negative -/// > flow-control window and MUST NOT send new flow-controlled frames until it -/// > receives WINDOW_UPDATE frames that cause the flow-control window to become -/// > positive. -impl ApplySettings for FlowControl - where T: ApplySettings, - T: ControlStreams -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set)?; - - let old_window_size = self.local_initial; - let new_window_size = set.initial_window_size(); - if new_window_size == old_window_size { - return Ok(()); - } - - let mut streams = self.inner.streams_mut(); - if new_window_size < old_window_size { - let decr = old_window_size - new_window_size; - streams.shrink_all_local_windows(decr); - } else { - let incr = new_window_size - old_window_size; - streams.expand_all_local_windows(incr); - } - - self.local_initial = new_window_size; - Ok(()) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set)?; - - let old_window_size = self.remote_initial; - let new_window_size = set.initial_window_size(); - if new_window_size == old_window_size { - return Ok(()); - } - - let mut streams = self.inner.streams_mut(); - if new_window_size < old_window_size { - let decr = old_window_size - new_window_size; - streams.shrink_all_remote_windows(decr); - } else { - let incr = new_window_size - old_window_size; - streams.expand_all_remote_windows(incr); - } - - self.remote_initial = new_window_size; - Ok(()) - } -} - impl Stream for FlowControl where T: Stream, T: ControlStreams, @@ -310,7 +225,7 @@ impl Sink for FlowControl fn start_send(&mut self, frame: Frame) -> StartSend { use frame::Frame::*; - debug_assert!(self.stream_is_reset(frame.stream_id()).is_none()); + debug_assert!(self.inner.get_reset(frame.stream_id()).is_none()); // Ensures that: // 1. all pending local window updates have been sent to the remote. @@ -333,8 +248,7 @@ impl Sink for FlowControl // Ensure there's enough capacity on stream. { - let mut fc = self.streams_mut() - .remote_flow_controller(v.stream_id()) + let mut fc = self.inner.remote_flow_controller(v.stream_id()) .expect("no remote stream for data frame"); if fc.claim_window(sz).is_err() { return Err(error::User::FlowControlViolation.into()) @@ -367,3 +281,116 @@ impl ReadySink for FlowControl self.inner.poll_ready() } } + +/// Applies an update to an endpoint's initial window size. +/// +/// Per RFC 7540 §6.9.2: +/// +/// > In addition to changing the flow-control window for streams that are not yet +/// > active, a SETTINGS frame can alter the initial flow-control window size for +/// > streams with active flow-control windows (that is, streams in the "open" or +/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE +/// > changes, a receiver MUST adjust the size of all stream flow-control windows that +/// > it maintains by the difference between the new value and the old value. +/// > +/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a +/// > flow-control window to become negative. A sender MUST track the negative +/// > flow-control window and MUST NOT send new flow-controlled frames until it +/// > receives WINDOW_UPDATE frames that cause the flow-control window to become +/// > positive. +impl ApplySettings for FlowControl + where T: ApplySettings, + T: ControlStreams +{ + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set)?; + + let old_window_size = self.local_initial; + let new_window_size = set.initial_window_size(); + if new_window_size == old_window_size { + return Ok(()); + } + + self.inner.local_update_inital_window_size(old_window_size, new_window_size); + self.local_initial = new_window_size; + Ok(()) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set)?; + + let old_window_size = self.remote_initial; + let new_window_size = set.initial_window_size(); + if new_window_size == old_window_size { + return Ok(()); + } + + self.inner.remote_update_inital_window_size(old_window_size, new_window_size); + self.remote_initial = new_window_size; + Ok(()) + } +} + +impl ControlStreams for FlowControl { + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) + } + + fn is_valid_remote_id(id: StreamId) -> bool { + T::is_valid_remote_id(id) + } + + fn can_create_local_stream() -> bool { + T::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.inner.reset_stream(id, cause) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.inner.is_local_active(id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.inner.is_remote_active(id) + } + + fn local_active_len(&self) -> usize { + self.inner.local_active_len() + } + + fn remote_active_len(&self) -> usize { + self.inner.remote_active_len() + } + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.local_update_inital_window_size(old_sz, new_sz) + } + + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.remote_update_inital_window_size(old_sz, new_sz) + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.local_flow_controller(id) + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.remote_flow_controller(id) + } +} + +impl ControlPing for FlowControl { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9db6933..4563091 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,6 +1,6 @@ use {frame, ConnectionError, Peer, StreamId}; use error::Reason; -use frame::SettingSet; +use frame::{Frame, SettingSet}; use bytes::{Buf, IntoBuf}; use futures::*; @@ -16,21 +16,27 @@ mod ping_pong; mod ready; mod settings; mod state; -mod stream_recv; -mod stream_send; +mod stream_recv_close; +mod stream_recv_open; +mod stream_send_close; +mod stream_send_open; +mod stream_store; pub use self::connection::Connection; -pub use self::flow_control::FlowControl; -pub use self::flow_control_state::{FlowControlState, WindowUnderflow}; -pub use self::framed_read::FramedRead; -pub use self::framed_write::FramedWrite; -pub use self::ping_pong::PingPong; -pub use self::ready::ReadySink; -pub use self::settings::Settings; -pub use self::stream_recv::StreamRecv; -pub use self::stream_send::StreamSend; -use self::state::{StreamMap, StreamState}; +use self::flow_control::{ControlFlow, FlowControl}; +use self::flow_control_state::{FlowControlState, WindowUnderflow}; +use self::framed_read::FramedRead; +use self::framed_write::FramedWrite; +use self::ping_pong::{ControlPing, PingPayload, PingPong}; +use self::ready::ReadySink; +use self::settings::{ApplySettings, ControlSettings, Settings}; +use self::state::{StreamState, PeerState}; +use self::stream_recv_close::StreamRecvClose; +use self::stream_recv_open::StreamRecvOpen; +use self::stream_send_close::StreamSendClose; +use self::stream_send_open::StreamSendOpen; +use self::stream_store::{ControlStreams, StreamStore}; /// Represents the internals of an HTTP/2 connection. /// @@ -91,10 +97,12 @@ type Transport= P>>; type Streams = - StreamSend< - FlowControl< - StreamRecv>, - P>; + StreamSendOpen< + StreamRecvClose< + FlowControl< + StreamSendClose< + StreamRecvOpen< + StreamStore>>>>>; type Codec = FramedRead< @@ -102,21 +110,6 @@ type Codec = pub type WindowSize = u32; -/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and -/// above---Connection). -pub trait ControlSettings { - fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; - fn local_settings(&self) -> &SettingSet; - fn remote_settings(&self) -> &SettingSet; -} - -/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to -/// FramedWrite). -pub trait ApplySettings { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; -} - #[derive(Debug, Copy, Clone)] pub struct WindowUpdate { stream_id: StreamId, @@ -137,61 +130,6 @@ impl WindowUpdate { } } -/// Exposes flow control states to "upper" layers of the transport (i.e. above -/// FlowControl). -pub trait ControlFlow { - /// Polls for the next window update from the remote. - fn poll_window_update(&mut self) -> Poll; - - /// Increases the local receive capacity of a stream. - /// - /// This may cause a window update to be sent to the remote. - /// - /// Fails if the given stream is not active. - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; -} - -/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up -/// to Connection). -pub trait ControlStreams { - fn is_valid_local_id(id: StreamId) -> bool; - fn is_valid_remote_id(id: StreamId) -> bool { - !id.is_zero() && !Self::is_valid_local_id(id) - } - - fn get_active(&self, id: StreamId) -> Option<&StreamState> { - self.streams(id).get_active(id) - } - - fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - self.streams_mut(id).get_active_mut(id) - } - - - fn get_reset(&self, id: StreamId) -> Option { - self.streams(id).get_reset(id) - } - - fn reset(&mut self, id: StreamId, cause: Reason) { - self.streams_mut(id).reset(id, cause); - } - - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.streams_mut(id).local_flow_controller(id) - } - - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.streams_mut(id).remote_flow_controller(id) - } -} - -pub type PingPayload = [u8; 8]; - -pub trait ControlPing { - fn start_ping(&mut self, body: PingPayload) -> StartSend; - fn take_pong(&mut self) -> Option; -} - /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. @@ -249,17 +187,20 @@ pub fn from_server_handshaker(settings: Settings .num_skip(0) // Don't skip the header .new_read(io); - StreamSend::new( + StreamSendOpen::new( initial_remote_window_size, remote_max_concurrency, - FlowControl::new( - initial_local_window_size, - initial_remote_window_size, - StreamRecv::new( + StreamRecvClose::new( + FlowControl::new( initial_local_window_size, - local_max_concurrency, - PingPong::new( - FramedRead::new(framed))))) + initial_remote_window_size, + StreamSendClose::new( + StreamRecvOpen::new( + initial_local_window_size, + local_max_concurrency, + StreamStore::new( + PingPong::new( + FramedRead::new(framed)))))))) }); connection::new(transport) diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 8adf216..1a21b36 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,9 +1,16 @@ -use ConnectionError; +use {ConnectionError, StreamId}; use frame::{Frame, Ping, SettingSet}; -use proto::{ApplySettings, ControlPing, PingPayload, ReadySink}; +use proto::{ApplySettings, ReadySink, ControlStreams, FlowControlState}; use futures::*; +pub type PingPayload = [u8; 8]; + +pub trait ControlPing { + fn start_ping(&mut self, body: PingPayload) -> StartSend; + fn take_pong(&mut self) -> Option; +} + /// Acknowledges ping requests from the remote. #[derive(Debug)] pub struct PingPong { @@ -29,16 +36,6 @@ impl PingPong } } -impl ApplySettings for PingPong { - fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - impl ControlPing for PingPong where T: Sink, SinkError = ConnectionError>, T: ReadySink, @@ -172,6 +169,16 @@ impl ReadySink for PingPong } } +impl ApplySettings for PingPong { + fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 85f8c38..730134a 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -7,8 +7,21 @@ use bytes::BufMut; use std::io; +/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and +/// above---Connection). +pub trait ControlSettings { + fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; + fn local_settings(&self) -> &SettingSet; + fn remote_settings(&self) -> &SettingSet; +} + +/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to +/// FramedWrite). +pub trait ApplySettings { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; +} -// TODO #[derive(Debug)] pub struct Settings { // Upstream transport @@ -95,48 +108,6 @@ impl Settings } } -impl ControlStreams for Settings { - fn local_streams(&self) -> &StreamMap { - self.inner.local_streams() - } - - fn local_streams_mut(&mut self) -> &mut StreamMap { - self.inner.local_streams_mut() - } - - fn remote_streams(&self) -> &StreamMap { - self.inner.local_streams() - } - - fn remote_streams_mut(&mut self) -> &mut StreamMap { - self.inner.local_streams_mut() - } - - fn is_valid_local_id(id: StreamId) -> bool { - T::is_valid_local_id(id) - } -} - -impl ControlFlow for Settings { - fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() - } - - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) - } -} - -impl ControlPing for Settings { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} - impl ControlSettings for Settings{ fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { self.local = local; @@ -244,3 +215,23 @@ impl AsyncRead for Settings { self.inner.prepare_uninitialized_buffer(buf) } } + +impl ControlFlow for Settings { + fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() + } + + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) + } +} + +impl ControlPing for Settings { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} diff --git a/src/proto/state.rs b/src/proto/state.rs index 03d6b7a..a28d675 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -291,104 +291,3 @@ impl PeerState { } } } - -// TODO track reserved streams -// TODO constrain the size of `reset` -#[derive(Debug, Default)] -pub struct StreamMap

{ - /// Holds active streams initiated by the local endpoint. - local_active: OrderMap>, - - /// Holds active streams initiated by the remote endpoint. - remote_active: OrderMap>, - - /// Holds active streams initiated by the remote. - reset: OrderMap>, - - _phantom: PhantomData

, -} - -impl StreamMap

{ - pub fn active(&mut self, id: StreamId) -> Option<&StreamState> { - assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { - self.local_active.get(id) - } else { - self.remote_active.get(id) - } - } - - pub fn active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - assert!(!id.is_zero()); - if P::is_valid_local_stream_id(id) { - self.local_active.get_mut(id) - } else { - self.remote_active.get_mut(id) - } - } - - pub fn local_active(&self, id: StreamId) -> Option<&StreamState> { - self.local_active.get(&id) - } - - pub fn local_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - self.local_active.get_mut(&id) - } - - pub fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.get_active_mut(id).and_then(|s| s.local_flow_controller()) - } - - pub fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.get_active_mut(id).and_then(|s| s.remote_flow_controller()) - } - - pub fn localis_active(&mut self, id: StreamId) -> bool { - self.active.contains_key(&id) - } - - pub fn active_count(&self) -> usize { - self.active.len() - } - - pub fn reset(&mut self, id: StreamId, cause: Reason) { - self.reset.insert(id, cause); - self.active.remove(&id); - } - - pub fn get_reset(&mut self, id: StreamId) -> Option { - self.reset.get(&id).map(|r| *r) - } - - pub fn shrink_all_local_windows(&mut self, decr: u32) { - for (_, mut s) in &mut self.active { - if let Some(fc) = s.local_flow_controller() { - fc.shrink_window(decr); - } - } - } - - pub fn expand_all_local_windows(&mut self, incr: u32) { - for (_, mut s) in &mut self.active { - if let Some(fc) = s.local_flow_controller() { - fc.expand_window(incr); - } - } - } - - pub fn shrink_all_remote_windows(&mut self, decr: u32) { - for (_, mut s) in &mut self.active { - if let Some(fc) = s.remote_flow_controller() { - fc.shrink_window(decr); - } - } - } - - pub fn expand_all_remote_windows(&mut self, incr: u32) { - for (_, mut s) in &mut self.active { - if let Some(fc) = s.remote_flow_controller() { - fc.expand_window(incr); - } - } - } -} diff --git a/src/proto/stream_recv.rs b/src/proto/stream_recv.rs deleted file mode 100644 index 11bdf63..0000000 --- a/src/proto/stream_recv.rs +++ /dev/null @@ -1,407 +0,0 @@ -use ConnectionError; -use client::Client; -use error::Reason; -use error::User; -use frame::{self, Frame}; -use proto::*; -use server::Server; - -use fnv::FnvHasher; -use ordermap::OrderMap; -use std::hash::BuildHasherDefault; -use std::marker::PhantomData; - -// TODO track "last stream id" for GOAWAY. -// TODO track/provide "next" stream id. -// TODO reset_streams needs to be bounded. -// TODO track reserved streams (PUSH_PROMISE). - -/// Tracks a connection's streams. -#[derive(Debug)] -pub struct StreamRecv { - inner: T, - peer: PhantomData

, - - local: StreamMap, - local_max_concurrency: Option, - local_initial_window_size: WindowSize, - - remote: StreamMap, - remote_max_concurrency: Option, - remote_initial_window_size: WindowSize, - remote_pending_refuse: Option, -} - -impl StreamRecv - where T: Stream, - T: Sink, SinkError = ConnectionError>, - P: Peer -{ - pub fn new(initial_window_size: WindowSize, - max_concurrency: Option, - inner: T) - -> StreamRecv - { - StreamRecv { - inner, - peer: PhantomData, - - local: StreamMap::default(), - remote: StreamMap::default(), - max_concurrency, - initial_window_size, - remote_pending_refuse: None, - } - } - - pub fn try_open_remote(&mut self, frame: Frame) -> Result<(), ConnectionError> { - unimplemented!() - } - - pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> { - unimplemented!() - } -} - -impl StreamRecv - where T: Sink, SinkError = ConnectionError>, - P: Peer -{ - fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> { - debug_assert!(self.remote_pending_refused.is_none()); - - let f = frame::Reset::new(id, Reason::RefusedStream); - match self.inner.start_send(f.into())? { - AsyncSink::Ready => { - self.reset(id, Reason::RefusedStream); - Ok(Async::Ready(())) - } - AsyncSink::NotReady(_) => { - self.pending_refused = Some(id); - Ok(Async::NotReady) - } - } - } -} - -impl ControlStreams for StreamRecv - where P: Peer -{ - fn local_streams(&self) -> &StreamMap { - &self.local - } - - fn local_streams_mut(&mut self) -> &mut StreamMap { - &mut self.local - } - - fn remote_streams(&self) -> &StreamMap { - &self.remote - } - - fn remote_streams_mut(&mut self) -> &mut StreamMap { - &mut self.remote - } - - fn is_valid_local_id(id: StreamId) -> bool { - P::is_valid_local_stream_id(id) - } -} - -/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. -/// -/// > Indicates the maximum number of concurrent streams that the senderg will allow. This -/// > limit is directional: it applies to the number of streams that the sender permits -/// > the receiver to create. Initially, there is no limit to this value. It is -/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit -/// > parallelism. -/// > -/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by -/// > endpoints. A zero value does prevent the creation of new streams; however, this can -/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only -/// > set a zero value for short durations; if a server does not wish to accept requests, -/// > closing the connection is more appropriate. -/// -/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a -/// > value that is below the current number of open streams can either close streams that -/// > exceed the new value or allow streams to complete. -/// -/// This module does NOT close streams when the setting changes. -impl ApplySettings for StreamRecv - where T: ApplySettings -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.max_concurrency = set.max_concurrent_streams(); - self.initial_window_size = set.initial_window_size(); - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -impl ControlPing for StreamRecv - where T: ControlPing -{ - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} - -impl Stream for StreamRecv - where T: Stream, - T: Sink, SinkError = ConnectionError>, - P: Peer, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - use frame::Frame::*; - - // Since there's only one slot for pending refused streams, it must be cleared - // before polling a frame from the transport. - if let Some(id) = self.pending_refused.take() { - try_ready!(self.send_refusal(id)); - } - - loop { - match try_ready!(self.inner.poll()) { - Some(Headers(v)) => { - let id = v.stream_id(); - let eos = v.is_end_stream(); - - if self.get_reset(id).is_some() { - // TODO send the remote errors when it sends us frames on reset - // streams. - continue; - } - - if let Some(mut s) = self.get_active_mut(id) { - let created = s.recv_headers(eos, self.initial_window_size)?; - assert!(!created); - return Ok(Async::Ready(Some(Headers(v)))); - } - - // Ensure that receiving this frame will not violate the local max - // stream concurrency setting. Ensure that the stream is refused - // before processing additional frames. - if let Some(max) = self.max_concurrency { - let max = max as usize; - if !self.local.is_active(id) && self.local.active_count() >= max - 1 { - // This frame would violate our local max concurrency, so reject - // the stream. - try_ready!(self.send_refusal(id)); - - // Try to process another frame (hopefully for an active - // stream). - continue; - } - } - - let is_closed = { - let stream = self.active_streams.entry(id) - .or_insert_with(|| StreamState::default()); - - let initialized = - stream.recv_headers(eos, self.initial_window_size)?; - - if initialized { - if !P::is_valid_remote_stream_id(id) { - return Err(Reason::ProtocolError.into()); - } - } - - stream.is_closed() - }; - - if is_closed { - self.active_streams.remove(id); - self.reset_streams.insert(id, Reason::NoError); - } - - return Ok(Async::Ready(Some(Headers(v)))); - } - - Some(Data(v)) => { - let id = v.stream_id(); - - if self.get_reset(id).is_some() { - // TODO send the remote errors when it sends us frames on reset - // streams. - continue; - } - - let is_closed = { - let stream = match self.active_streams.get_mut(id) { - None => return Err(Reason::ProtocolError.into()), - Some(s) => s, - }; - stream.recv_data(v.is_end_stream())?; - stream.is_closed() - }; - - if is_closed { - self.reset(id, Reason::NoError); - } - - return Ok(Async::Ready(Some(Data(v)))); - } - - Some(Reset(v)) => { - // Set or update the reset reason. - self.reset(v.stream_id(), v.reason()); - return Ok(Async::Ready(Some(Reset(v)))); - } - - Some(f) => { - let id = f.stream_id(); - - if self.get_reset(id).is_some() { - // TODO send the remote errors when it sends us frames on reset - // streams. - continue; - } - - return Ok(Async::Ready(Some(f))); - } - - None => { - return Ok(Async::Ready(None)); - } - } - } - } -} - -impl Sink for StreamRecv - where T: Sink, SinkError = ConnectionError>, - P: Peer, -{ - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, frame: T::SinkItem) -> StartSend { - use frame::Frame::*; - - // Must be enforced through higher levels. - debug_assert!(self.stream_is_reset(item.stream_id()).is_none()); - - // The local must complete refusing the remote stream before sending any other - // frames. - if let Some(id) = self.pending_refused.take() { - if self.send_refusal(id)?.is_not_ready() { - return Ok(AsyncSink::NotReady(item)); - } - } - - match frame { - Headers(v) => { - let id = v.stream_id(); - let eos = v.is_end_stream(); - - // Transition the stream state, creating a new entry if needed - // - // TODO: Response can send multiple headers frames before body (1xx - // responses). - // - // ACTUALLY(ver), maybe not? - // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 - - // Ensure that sending this frame would not violate the remote's max - // stream concurrency setting. - if let Some(max) = self.remote_max_concurrency { - let max = max as usize; - if !self.active_streams.has_stream(id) - && self.active_streams.len() >= max - 1 { - // This frame would violate our local max concurrency, so reject - // the stream. - if self.send_refusal(id)?.is_not_ready() { - return Ok(AsyncSink::NotReady(Headers(v))); - } - - // Try to process another frame (hopefully for an active - // stream). - return Err(User::MaxConcurrencyExceeded.into()) - } - } - - let is_closed = { - let stream = self.active_streams.entry(id) - .or_insert_with(|| StreamState::default()); - - let initialized = - stream.send_headers::

(eos, self.initial_remote_window_size)?; - - if initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - if !P::is_valid_local_stream_id(id) { - // TODO: clear state - return Err(User::InvalidStreamId.into()); - } - } - - stream.is_closed() - }; - - if is_closed { - self.active_streams.remove(id); - self.reset_streams.insert(id, Reason::NoError); - } - - self.inner.start_send(Headers(v)) - } - - Data(v) => { - match self.active_streams.get_mut(v.stream_id()) { - None => return Err(User::InactiveStreamId.into()), - Some(stream) => { - stream.send_data(v.is_end_stream())?; - self.inner.start_send(Data(v)) - } - - } - } - - Reset(v) => { - let id = v.stream_id(); - self.active_streams.remove(id); - self.reset_streams.insert(id, v.reason()); - self.inner.start_send(Reset(v)) - } - - frame => self.inner.start_send(frame), - } - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - if let Some(id) = self.pending_refused.take() { - try_ready!(self.send_refusal(id)); - } - - self.inner.poll_complete() - } -} - - -impl ReadySink for StreamRecv - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ReadySink, - P: Peer, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - if let Some(id) = self.pending_refused.take() { - try_ready!(self.send_refusal(id)); - } - - self.inner.poll_ready() - } -} diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs new file mode 100644 index 0000000..1988ca9 --- /dev/null +++ b/src/proto/stream_recv_close.rs @@ -0,0 +1,157 @@ +use {ConnectionError}; +use error::Reason; +use error::User; +use frame::{self, Frame}; +use proto::*; +use proto::ready::ReadySink; + +use fnv::FnvHasher; +use futures::*; +use ordermap::OrderMap; +use std::hash::BuildHasherDefault; + +// TODO track "last stream id" for GOAWAY. +// TODO track/provide "next" stream id. +// TODO reset_streams needs to be bounded. +// TODO track reserved streams (PUSH_PROMISE). + +#[derive(Debug)] +pub struct StreamRecvClose { + inner: T, +} + +impl StreamRecvClose + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + pub fn new(inner: T) -> StreamRecvClose { + StreamRecvClose { inner } + } +} + +impl Stream for StreamRecvClose + where T: Stream, + T: ControlStreams, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + use frame::Frame::*; + + let frame = try_ready!(self.inner.poll()); + + unimplemented!() + } +} + +impl Sink for StreamRecvClose + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} + +impl ReadySink for StreamRecvClose + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +impl ControlStreams for StreamRecvClose { + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) + } + + fn is_valid_remote_id(id: StreamId) -> bool { + T::is_valid_remote_id(id) + } + + fn can_create_local_stream() -> bool { + T::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.inner.reset_stream(id, cause) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.inner.is_local_active(id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.inner.is_remote_active(id) + } + + fn local_active_len(&self) -> usize { + self.inner.local_active_len() + } + + fn remote_active_len(&self) -> usize { + self.inner.remote_active_len() + } + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.local_update_inital_window_size(old_sz, new_sz) + } + + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.remote_update_inital_window_size(old_sz, new_sz) + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.local_flow_controller(id) + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.remote_flow_controller(id) + } +} + +impl ApplySettings for StreamRecvClose { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + +impl ControlFlow for StreamRecvClose { + fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() + } + + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) + } +} + +impl ControlPing for StreamRecvClose { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs new file mode 100644 index 0000000..56c94b2 --- /dev/null +++ b/src/proto/stream_recv_open.rs @@ -0,0 +1,352 @@ +use ConnectionError; +use frame::{Frame, StreamId}; +use proto::*; + +use futures::*; + +/// Tracks a connection's streams. +#[derive(Debug)] +pub struct StreamRecvOpen { + inner: T, + max_concurrency: Option, + initial_window_size: WindowSize, + pending_refuse: Option, +} + +impl StreamRecvOpen { + + pub fn new(initial_window_size: WindowSize, + max_concurrency: Option, + inner: T) + -> StreamRecvOpen + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, + { + StreamRecvOpen { + inner, + max_concurrency, + initial_window_size, + pending_refuse: None, + } + } + +} + +impl StreamRecvOpen + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> { + debug_assert!(self.pending_refuse.is_none()); + + let f = frame::Reset::new(id, Reason::RefusedStream); + match self.inner.start_send(f.into())? { + AsyncSink::Ready => { + self.inner.reset_stream(id, Reason::RefusedStream); + Ok(Async::Ready(())) + } + AsyncSink::NotReady(_) => { + self.pending_refuse = Some(id); + Ok(Async::NotReady) + } + } + } +} + +/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. +/// +/// > Indicates the maximum number of concurrent streams that the senderg will allow. This +/// > limit is directional: it applies to the number of streams that the sender permits +/// > the receiver to create. Initially, there is no limit to this value. It is +/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit +/// > parallelism. +/// > +/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by +/// > endpoints. A zero value does prevent the creation of new streams; however, this can +/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only +/// > set a zero value for short durations; if a server does not wish to accept requests, +/// > closing the connection is more appropriate. +/// +/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a +/// > value that is below the current number of open streams can either close streams that +/// > exceed the new value or allow streams to complete. +/// +/// This module does NOT close streams when the setting changes. +impl ApplySettings for StreamRecvOpen + where T: ApplySettings +{ + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.max_concurrency = set.max_concurrent_streams(); + self.initial_window_size = set.initial_window_size(); + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + +impl Stream for StreamRecvOpen + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + use frame::Frame::*; + + // Since there's only one slot for pending refused streams, it must be cleared + // before polling a frame from the transport. + if let Some(id) = self.pending_refuse.take() { + try_ready!(self.send_refusal(id)); + } + + loop { + let frame = match try_ready!(self.inner.poll()) { + None => return Ok(Async::Ready(None)), + Some(f) => f, + }; + + let id = frame.stream_id(); + if id.is_zero() { + return Ok(Async::Ready(Some(frame))); + } + + if self.inner.get_reset(id).is_some() { + // For now, just ignore frames on reset streams. + // TODO tell the remote to knock it off? + continue; + } + + if T::is_valid_remote_id(id) { + unimplemented!() + } + } + } +} + +impl Sink for StreamRecvOpen + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, frame: T::SinkItem) -> StartSend { + use frame::Frame::*; + + // The local must complete refusing the remote stream before sending any other + // frames. + if let Some(id) = self.pending_refuse.take() { + if self.send_refusal(id)?.is_not_ready() { + return Ok(AsyncSink::NotReady(frame)); + } + } + + let id = frame.stream_id(); + if !id.is_zero() { + // enforced by StreamSend. + debug_assert!(self.inner.get_reset(id).is_none()); + + let eos = frame.is_end_stream(); + } + + self.inner.start_send(frame) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + if let Some(id) = self.pending_refuse.take() { + try_ready!(self.send_refusal(id)); + } + + self.inner.poll_complete() + } +} + + +impl ReadySink for StreamRecvOpen + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + if let Some(id) = self.pending_refuse.take() { + try_ready!(self.send_refusal(id)); + } + + self.inner.poll_ready() + } +} + + + // Some(Headers(v)) => { + // let id = v.stream_id(); + // let eos = v.is_end_stream(); + + // if self.get_reset(id).is_some() { + // // TODO send the remote errors when it sends us frames on reset + // // streams. + // continue; + // } + + // if let Some(mut s) = self.get_active_mut(id) { + // let created = s.recv_headers(eos, self.initial_window_size)?; + // assert!(!created); + // return Ok(Async::Ready(Some(Headers(v)))); + // } + + // // Ensure that receiving this frame will not violate the local max + // // stream concurrency setting. Ensure that the stream is refused + // // before processing additional frames. + // if let Some(max) = self.max_concurrency { + // let max = max as usize; + // if !self.local.is_active(id) && self.local.active_count() >= max - 1 { + // // This frame would violate our local max concurrency, so reject + // // the stream. + // try_ready!(self.send_refusal(id)); + + // // Try to process another frame (hopefully for an active + // // stream). + // continue; + // } + // } + + // let is_closed = { + // let stream = self.active_streams.entry(id) + // .or_insert_with(|| StreamState::default()); + + // let initialized = + // stream.recv_headers(eos, self.initial_window_size)?; + + // if initialized { + // if !P::is_valid_remote_stream_id(id) { + // return Err(Reason::ProtocolError.into()); + // } + // } + + // stream.is_closed() + // }; + + // if is_closed { + // self.active_streams.remove(id); + // self.reset_streams.insert(id, Reason::NoError); + // } + + // return Ok(Async::Ready(Some(Headers(v)))); + // } + + // Some(Data(v)) => { + // let id = v.stream_id(); + + // if self.get_reset(id).is_some() { + // // TODO send the remote errors when it sends us frames on reset + // // streams. + // continue; + // } + + // let is_closed = { + // let stream = match self.active_streams.get_mut(id) { + // None => return Err(Reason::ProtocolError.into()), + // Some(s) => s, + // }; + // stream.recv_data(v.is_end_stream())?; + // stream.is_closed() + // }; + + // if is_closed { + // self.reset(id, Reason::NoError); + // } + + // return Ok(Async::Ready(Some(Data(v)))); + // } + + // Some(Reset(v)) => { + // // Set or update the reset reason. + // self.reset(v.stream_id(), v.reason()); + // return Ok(Async::Ready(Some(Reset(v)))); + // } + + // Some(f) => { + // let id = f.stream_id(); + + // if self.get_reset(id).is_some() { + // // TODO send the remote errors when it sends us frames on reset + // // streams. + // continue; + // } + + // return Ok(Async::Ready(Some(f))); + // } + + // None => { + // return Ok(Async::Ready(None)); + // } + + +impl ControlStreams for StreamRecvOpen { + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) + } + + fn is_valid_remote_id(id: StreamId) -> bool { + T::is_valid_remote_id(id) + } + + fn can_create_local_stream() -> bool { + T::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.inner.reset_stream(id, cause) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.inner.is_local_active(id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.inner.is_remote_active(id) + } + + fn local_active_len(&self) -> usize { + self.inner.local_active_len() + } + + fn remote_active_len(&self) -> usize { + self.inner.remote_active_len() + } + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.local_update_inital_window_size(old_sz, new_sz) + } + + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.remote_update_inital_window_size(old_sz, new_sz) + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.local_flow_controller(id) + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.remote_flow_controller(id) + } +} + +impl ControlPing for StreamRecvOpen { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} diff --git a/src/proto/stream_send.rs b/src/proto/stream_send.rs deleted file mode 100644 index 9bde351..0000000 --- a/src/proto/stream_send.rs +++ /dev/null @@ -1,217 +0,0 @@ -use {ConnectionError}; -use error::Reason; -use error::User; -use frame::{self, Frame}; -use proto::*; - -use fnv::FnvHasher; -use ordermap::OrderMap; -use std::hash::BuildHasherDefault; -use std::marker::PhantomData; - -// TODO track "last stream id" for GOAWAY. -// TODO track/provide "next" stream id. -// TODO reset_streams needs to be bounded. -// TODO track reserved streams (PUSH_PROMISE). - -/// Tracks a connection's streams. -#[derive(Debug)] -pub struct StreamSend { - inner: T, - peer: PhantomData

, - max_concurrency: Option, - initial_window_size: WindowSize, -} - -impl StreamSend - where T: Stream, - T: Sink, SinkError = ConnectionError>, - P: Peer -{ - pub fn new(initial_window_size: WindowSize, - max_concurrency: Option, - inner: T) - -> StreamSend - { - StreamSend { - inner, - peer: PhantomData, - max_concurrency, - initial_window_size, - } - } - - pub fn try_open_local(&mut self, frame: Frame) -> Result<(), ConnectionError> { - unimplemented!() - } - - pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> { - unimplemented!() - } -} - - -/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. -/// -/// > Indicates the maximum number of concurrent streams that the senderg will allow. This -/// > limit is directional: it applies to the number of streams that the sender permits -/// > the receiver to create. Initially, there is no limit to this value. It is -/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit -/// > parallelism. -/// > -/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by -/// > endpoints. A zero value does prevent the creation of new streams; however, this can -/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only -/// > set a zero value for short durations; if a server does not wish to accept requests, -/// > closing the connection is more appropriate. -/// -/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a -/// > value that is below the current number of open streams can either close streams that -/// > exceed the new value or allow streams to complete. -/// -/// This module does NOT close streams when the setting changes. -impl ApplySettings for StreamSend - where T: ApplySettings -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.max_concurrency = set.max_concurrent_streams(); - self.initial_window_size = set.initial_window_size(); - self.inner.apply_remote_settings(set) - } -} - -impl ControlPing for StreamSend - where T: ControlPing -{ - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } -} - -impl Stream for StreamSend - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, - P: Peer, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - self.inner.poll() - } -} - -impl Sink for StreamSend - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, - P: Peer, -{ - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, item: T::SinkItem) -> StartSend { - use frame::Frame::*; - - // Must be enforced through higher levels. - if let Some(rst) = self.inner.get_reset(item.stream_id()) { - return Err(User::StreamReset(rst).into()); - } - - match item { - Headers(v) => { - let id = v.stream_id(); - let eos = v.is_end_stream(); - - // Transition the stream state, creating a new entry if needed - // - // TODO: Response can send multiple headers frames before body (1xx - // responses). - // - // ACTUALLY(ver), maybe not? - // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 - - // Ensure that sending this frame would not violate the remote's max - // stream concurrency setting. - if let Some(max) = self.max_concurrency { - let max = max as usize; - let streams = self.inner.streams(); - if !streams.is_active(id) && streams.active_count() >= max - 1 { - return Err(User::MaxConcurrencyExceeded.into()) - } - } - - let is_closed = { - let stream = self.active_streams.entry(id) - .or_insert_with(|| StreamState::default()); - - let initialized = - stream.send_headers::

(eos, self.initial_window_size)?; - - if initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - if !P::is_valid_local_stream_id(id) { - // TODO: clear state - return Err(User::InvalidStreamId.into()); - } - } - - stream.is_closed() - }; - - if is_closed { - self.active_streams.remove(id); - self.reset_streams.insert(id, Reason::NoError); - } - - self.inner.start_send(Headers(v)) - } - - Data(v) => { - match self.active_streams.get_mut(v.stream_id()) { - None => return Err(User::InactiveStreamId.into()), - Some(stream) => { - stream.send_data(v.is_end_stream())?; - self.inner.start_send(Data(v)) - } - - } - } - - Reset(v) => { - let id = v.stream_id(); - self.active_streams.remove(id); - self.reset_streams.insert(id, v.reason()); - self.inner.start_send(Reset(v)) - } - - frame => self.inner.start_send(frame), - } - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() - } -} - -impl ReadySink for StreamSend - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, - T: ReadySink, - P: Peer, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() - } -} diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs new file mode 100644 index 0000000..b78fd64 --- /dev/null +++ b/src/proto/stream_send_close.rs @@ -0,0 +1,141 @@ +use ConnectionError; +use client::Client; +use error::Reason; +use error::User; +use frame::{self, Frame}; +use proto::*; + +use futures::*; +use std::marker::PhantomData; + +// TODO track "last stream id" for GOAWAY. +// TODO track/provide "next" stream id. +// TODO reset_streams needs to be bounded. +// TODO track reserved streams (PUSH_PROMISE). + +#[derive(Debug)] +pub struct StreamSendClose { + inner: T, +} + +impl StreamSendClose + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + pub fn new(inner: T) -> StreamSendClose { + StreamSendClose { inner } + } +} + +impl Stream for StreamSendClose + where T: Stream, + T: ControlStreams, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for StreamSendClose + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} + +impl ReadySink for StreamSendClose + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, + T: ControlStreams, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +impl ApplySettings for StreamSendClose { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} + +impl ControlStreams for StreamSendClose { + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) + } + + fn is_valid_remote_id(id: StreamId) -> bool { + T::is_valid_remote_id(id) + } + + fn can_create_local_stream() -> bool { + T::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.inner.reset_stream(id, cause) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.inner.is_local_active(id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.inner.is_remote_active(id) + } + + fn local_active_len(&self) -> usize { + self.inner.local_active_len() + } + + fn remote_active_len(&self) -> usize { + self.inner.remote_active_len() + } + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.local_update_inital_window_size(old_sz, new_sz) + } + + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.remote_update_inital_window_size(old_sz, new_sz) + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.local_flow_controller(id) + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.remote_flow_controller(id) + } +} + +impl ControlPing for StreamSendClose { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs new file mode 100644 index 0000000..3e6d367 --- /dev/null +++ b/src/proto/stream_send_open.rs @@ -0,0 +1,224 @@ +use ConnectionError; +use error::User::{InvalidStreamId, StreamReset}; +use frame::{Frame, SettingSet}; +use proto::*; + +use futures::*; + +#[derive(Debug)] +pub struct StreamSendOpen { + inner: T, + + max_concurrency: Option, + initial_window_size: WindowSize, +} + +impl StreamSendOpen + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + pub fn new(initial_window_size: WindowSize, + max_concurrency: Option, + inner: T) + -> StreamSendOpen + { + StreamSendOpen { + inner, + max_concurrency, + initial_window_size, + } + } +} + +/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. +/// +/// > Indicates the maximum number of concurrent streams that the senderg will allow. This +/// > limit is directional: it applies to the number of streams that the sender permits +/// > the receiver to create. Initially, there is no limit to this value. It is +/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit +/// > parallelism. +/// > +/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by +/// > endpoints. A zero value does prevent the creation of new streams; however, this can +/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only +/// > set a zero value for short durations; if a server does not wish to accept requests, +/// > closing the connection is more appropriate. +/// +/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a +/// > value that is below the current number of open streams can either close streams that +/// > exceed the new value or allow streams to complete. +/// +/// This module does NOT close streams when the setting changes. +impl ApplySettings for StreamSendOpen { + fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { + self.max_concurrency = set.max_concurrent_streams(); + self.initial_window_size = set.initial_window_size(); + self.inner.apply_remote_settings(set) + } +} + +impl Stream for StreamSendOpen + where T: Stream, + T: ControlStreams, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for StreamSendOpen + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, frame: T::SinkItem) -> StartSend { + use frame::Frame::*; + + let id = frame.stream_id(); + if id.is_zero() { + // Nothing to do on connection frames. + return self.inner.start_send(frame); + } + + // Reset the stream immediately and send the Reset on the underlying transport. + if let Reset(rst) = frame { + self.inner.reset_stream(id, rst.reason()); + return self.inner.start_send(Reset(rst)); + } + + // Ensure that the stream hasn't been closed otherwise. + if let Some(reason) = self.inner.get_reset(id) { + return Err(StreamReset(reason).into()) + } + + if T::is_valid_local_id(id) { + if self.inner.is_local_active(id) { + // If the frame ends thestream, it will be handled in stream_recv. + return self.inner.start_send(frame); + } + + if T::can_create_local_stream() { + let has_capacity = match self.max_concurrency { + None => true, + Some(max) => self.inner.local_active_len() < (max as usize), + }; + if has_capacity { + // create that shit. + unimplemented!(); + } + } + } else { + if self.inner.is_remote_active(id) { + // If the frame was part of a remote stream, it MUST already exist. If the + // frame ends thestream, it will be handled in stream_recv. + return self.inner.start_send(frame); + } + + if let Reset(rst) = frame { + return self.inner.start_send(Reset(rst)); + } + } + + // Tried to send a frame on a stream + return Err(InvalidStreamId.into()); + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } +} + +impl ReadySink for StreamSendOpen + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +impl ControlStreams for StreamSendOpen { + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) + } + + fn is_valid_remote_id(id: StreamId) -> bool { + T::is_valid_remote_id(id) + } + + fn can_create_local_stream() -> bool { + T::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option { + self.inner.get_reset(id) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + self.inner.reset_stream(id, cause) + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.inner.is_local_active(id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.inner.is_remote_active(id) + } + + fn local_active_len(&self) -> usize { + self.inner.local_active_len() + } + + fn remote_active_len(&self) -> usize { + self.inner.remote_active_len() + } + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.local_update_inital_window_size(old_sz, new_sz) + } + + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.remote_update_inital_window_size(old_sz, new_sz) + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.local_flow_controller(id) + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.remote_flow_controller(id) + } +} + +impl ControlFlow for StreamSendOpen { + fn poll_window_update(&mut self) -> Poll { + self.inner.poll_window_update() + } + + fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.expand_window(id, incr) + } +} + +impl ControlPing for StreamSendOpen { + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs new file mode 100644 index 0000000..ff8345c --- /dev/null +++ b/src/proto/stream_store.rs @@ -0,0 +1,250 @@ +use {ConnectionError, Peer, StreamId}; +use error::Reason; +use proto::*; + +use fnv::FnvHasher; +use ordermap::OrderMap; +use std::hash::BuildHasherDefault; +use std::marker::PhantomData; + +/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up +/// to Connection). +pub trait ControlStreams { + fn is_valid_local_id(id: StreamId) -> bool; + fn is_valid_remote_id(id: StreamId) -> bool; + + fn can_create_local_stream() -> bool; + fn can_create_remote_stream() -> bool { + !Self::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option; + fn reset_stream(&mut self, id: StreamId, cause: Reason); + + fn is_local_active(&self, id: StreamId) -> bool; + fn is_remote_active(&self, id: StreamId) -> bool; + + fn local_active_len(&self) -> usize; + fn remote_active_len(&self) -> usize; + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); + + // fn get_active(&self, id: StreamId) -> Option<&StreamState> { + // self.streams(id).get_active(id) + // } + + // fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { + // self.streams_mut(id).get_active_mut(id) + // } +} + +/// Holds the underlying stream state to be accessed by upper layers. +// TODO track reserved streams +// TODO constrain the size of `reset` +#[derive(Debug, Default)] +pub struct StreamStore { + inner: T, + + /// Holds active streams initiated by the local endpoint. + local_active: OrderMap>, + + /// Holds active streams initiated by the remote endpoint. + remote_active: OrderMap>, + + /// Holds active streams initiated by the remote. + reset: OrderMap>, + + _phantom: PhantomData

, +} + +impl StreamStore + where T: Stream, + T: Sink, SinkError = ConnectionError>, + P: Peer, +{ + pub fn new(inner: T) -> StreamStore { + StreamStore { + inner, + local_active: OrderMap::default(), + remote_active: OrderMap::default(), + reset: OrderMap::default(), + _phantom: PhantomData, + } + } +} + +impl Stream for StreamStore + where T: Stream, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for StreamStore + where T: Sink, SinkError = ConnectionError>, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend, ConnectionError> { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} + +impl ReadySink for StreamStore + where T: Sink, SinkError = ConnectionError>, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} + +impl ControlStreams for StreamStore { + fn is_valid_local_id(id: StreamId) -> bool { + P::is_valid_local_stream_id(id) + } + + fn is_valid_remote_id(id: StreamId) -> bool { + P::is_valid_remote_stream_id(id) + } + + fn can_create_local_stream() -> bool { + P::can_create_local_stream() + } + + fn get_reset(&self, id: StreamId) -> Option { + self.reset.get(&id).map(|r| *r) + } + + fn reset_stream(&mut self, id: StreamId, cause: Reason) { + if P::is_valid_local_stream_id(id) { + self.local_active.remove(&id); + } else { + self.remote_active.remove(&id); + } + self.reset.insert(id, cause); + } + + fn is_local_active(&self, id: StreamId) -> bool { + self.local_active.contains_key(&id) + } + + fn is_remote_active(&self, id: StreamId) -> bool { + self.remote_active.contains_key(&id) + } + + fn local_active_len(&self) -> usize { + self.local_active.len() + } + + fn remote_active_len(&self) -> usize { + self.remote_active.len() + } + + fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + if new_sz < old_sz { + let decr = old_sz - new_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.local_flow_controller() { + fc.shrink_window(decr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.local_flow_controller() { + fc.shrink_window(decr); + } + } + } else { + let incr = new_sz - old_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.local_flow_controller() { + fc.expand_window(incr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.local_flow_controller() { + fc.expand_window(incr); + } + } + } + } + + fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + if new_sz < old_sz { + let decr = old_sz - new_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.remote_flow_controller() { + fc.shrink_window(decr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.remote_flow_controller() { + fc.shrink_window(decr); + } + } + } else { + let incr = new_sz - old_sz; + + for s in self.local_active.values_mut() { + if let Some(fc) = s.remote_flow_controller() { + fc.expand_window(incr); + } + } + + for s in self.remote_active.values_mut() { + if let Some(fc) = s.remote_flow_controller() { + fc.expand_window(incr); + } + } + } + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + if id.is_zero() { + None + } else if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(&id).and_then(|s| s.local_flow_controller()) + } else { + self.remote_active.get_mut(&id).and_then(|s| s.local_flow_controller()) + } + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + if id.is_zero() { + None + } else if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(&id).and_then(|s| s.remote_flow_controller()) + } else { + self.remote_active.get_mut(&id).and_then(|s| s.remote_flow_controller()) + } + } +} + +impl ApplySettings for StreamStore { + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_remote_settings(set) + } +} diff --git a/src/server.rs b/src/server.rs index f1b0331..96ad834 100644 --- a/src/server.rs +++ b/src/server.rs @@ -110,7 +110,7 @@ impl Peer for Server { type Send = http::response::Head; type Poll = http::request::Head; - fn is_valid_local_stream_id(_id: StreamId) -> bool { + fn is_valid_local_stream_id(id: StreamId) -> bool { id.is_server_initiated() }