From 0d84c98c893f2255f5201190015ccb93680a2121 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 19 Jul 2017 19:53:33 +0000 Subject: [PATCH] wip --- src/client.rs | 8 +- src/error.rs | 4 +- src/lib.rs | 11 + src/proto/connection.rs | 4 - src/proto/flow_control.rs | 32 ++- src/proto/mod.rs | 92 +++++--- src/proto/settings.rs | 34 +-- src/proto/state.rs | 89 ++++--- .../{stream_tracker.rs => stream_recv.rs} | 150 ++++++------ src/proto/stream_send.rs | 217 ++++++++++++++++++ src/server.rs | 6 +- 11 files changed, 488 insertions(+), 159 deletions(-) rename src/proto/{stream_tracker.rs => stream_recv.rs} (75%) create mode 100644 src/proto/stream_send.rs diff --git a/src/client.rs b/src/client.rs index eaaf601..439be61 100644 --- a/src/client.rs +++ b/src/client.rs @@ -56,8 +56,12 @@ impl Peer for Client { id.is_client_initiated() } - fn is_valid_remote_stream_id(_id: StreamId) -> bool { - false + fn is_valid_remote_stream_id(id: StreamId) -> bool { + id.is_server_initiated() + } + + fn can_create_local_stream() -> bool { + true } fn convert_send_message( diff --git a/src/error.rs b/src/error.rs index d5a3429..d04cde0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -82,7 +82,7 @@ pub enum User { Corrupt, /// The stream state has been reset. - StreamReset, + StreamReset(Reason), /// The application attempted to initiate too many streams to remote. MaxConcurrencyExceeded, @@ -125,7 +125,7 @@ macro_rules! user_desc { InactiveStreamId => concat!($prefix, "inactive stream ID"), UnexpectedFrameType => concat!($prefix, "unexpected frame type"), FlowControlViolation => concat!($prefix, "flow control violation"), - StreamReset => concat!($prefix, "frame sent on reset stream"), + StreamReset(_) => concat!($prefix, "frame sent on reset stream"), Corrupt => concat!($prefix, "connection state corrupt"), MaxConcurrencyExceeded => concat!($prefix, "stream would exceed remote max concurrency"), } diff --git a/src/lib.rs b/src/lib.rs index f05ccba..c0e9b0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,6 +85,17 @@ pub trait Peer { /// remote node. fn is_valid_remote_stream_id(id: StreamId) -> bool; + + fn can_create_local_stream() -> bool; + fn can_create_remote_stream() -> bool { + !Self::can_create_local_stream() + } + + //fn can_reserve_local_stream() -> bool; + // fn can_reserve_remote_stream() -> bool { + // !self.can_reserve_local_stream + // } + #[doc(hidden)] fn convert_send_message( id: StreamId, diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 62f3a70..0e99f53 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -216,10 +216,6 @@ impl Sink for Connection match item { Frame::Headers { id, headers, end_of_stream } => { - if self.inner.stream_is_reset(id).is_some() { - return Err(error::User::StreamReset.into()); - } - // This is a one-way conversion. By checking `poll_ready` first (above), // it's already been determined that the inner `Sink` can accept the item. // If the item is rejected, then there is a bug. diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index abce977..16eab85 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -68,7 +68,7 @@ impl FlowControl { if id.is_zero() { Some(&mut self.local_connection) } else { - self.inner.streams_mut().get_mut(id).and_then(|s| s.local_flow_controller()) + self.inner.local_flow_controller(id) } } @@ -76,23 +76,31 @@ impl FlowControl { if id.is_zero() { Some(&mut self.remote_connection) } else { - self.inner.streams_mut().get_mut(id).and_then(|s| s.remote_flow_controller()) + self.inner.remote_flow_controller(id) } } } /// Proxies access to streams. impl ControlStreams for FlowControl { - fn streams(&self) -> &StreamMap { - self.inner.streams() + fn local_streams(&self) -> &StreamMap { + self.inner.local_streams() } - fn streams_mut(&mut self) -> &mut StreamMap { - self.inner.streams_mut() + fn local_streams_mut(&mut self) -> &mut StreamMap { + self.inner.local_streams_mut() } - fn stream_is_reset(&self, id: StreamId) -> Option { - self.inner.stream_is_reset(id) + fn remote_streams(&self) -> &StreamMap { + self.inner.local_streams() + } + + fn remote_streams_mut(&mut self) -> &mut StreamMap { + self.inner.local_streams_mut() + } + + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) } } @@ -101,14 +109,14 @@ impl ControlFlow for FlowControl { fn poll_window_update(&mut self) -> Poll { // This biases connection window updates, which probably makese sense. if let Some(incr) = self.remote_connection.apply_window_update() { - return Ok(Async::Ready(WindowUpdate(StreamId::zero(), incr))); + return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); } // TODO this should probably account for stream priority? while let Some(id) = self.remote_pending_streams.pop_front() { if let Some(mut flow) = self.remote_flow_controller(id) { if let Some(incr) = flow.apply_window_update() { - return Ok(Async::Ready(WindowUpdate(id, incr))); + return Ok(Async::Ready(WindowUpdate::new(id, incr))); } } } @@ -131,8 +139,8 @@ impl ControlFlow for FlowControl { self.local_pending_streams.push_back(id); } Ok(()) - } else if self.stream_is_reset(id).is_some() { - Err(error::User::StreamReset.into()) + } else if let Some(rst) = self.get_reset(id) { + Err(error::User::StreamReset(rst).into()) } else { Err(error::User::InvalidStreamId.into()) } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 76db759..9db6933 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -16,7 +16,8 @@ mod ping_pong; mod ready; mod settings; mod state; -mod stream_tracker; +mod stream_recv; +mod stream_send; pub use self::connection::Connection; pub use self::flow_control::FlowControl; @@ -26,7 +27,8 @@ pub use self::framed_write::FramedWrite; pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; -pub use self::stream_tracker::StreamTracker; +pub use self::stream_recv::StreamRecv; +pub use self::stream_send::StreamSend; use self::state::{StreamMap, StreamState}; @@ -82,14 +84,19 @@ use self::state::{StreamMap, StreamState}; /// type Transport= Settings< - FlowControl< - StreamTracker< - PingPong< - Framer, - B>, - P>>>; + Streams< + PingPong< + Codec, + B>, + P>>; -type Framer = +type Streams = + StreamSend< + FlowControl< + StreamRecv>, + P>; + +type Codec = FramedRead< FramedWrite>; @@ -111,14 +118,22 @@ pub trait ApplySettings { } #[derive(Debug, Copy, Clone)] -pub struct WindowUpdate(pub StreamId, pub WindowSize); +pub struct WindowUpdate { + stream_id: StreamId, + increment: WindowSize +} + impl WindowUpdate { + pub fn new(stream_id: StreamId, increment: WindowSize) -> WindowUpdate { + WindowUpdate { stream_id, increment } + } + pub fn stream_id(&self) -> StreamId { - self.0 + self.stream_id } pub fn increment(&self) -> WindowSize { - self.1 + self.increment } } @@ -139,14 +154,35 @@ pub trait ControlFlow { /// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up /// to Connection). pub trait ControlStreams { - /// Accesses the map of all active streams. - fn streams(&self)-> &StreamMap; + fn is_valid_local_id(id: StreamId) -> bool; + fn is_valid_remote_id(id: StreamId) -> bool { + !id.is_zero() && !Self::is_valid_local_id(id) + } - /// Mutably accesses the map of all active streams. - fn streams_mut(&mut self) -> &mut StreamMap; + fn get_active(&self, id: StreamId) -> Option<&StreamState> { + self.streams(id).get_active(id) + } - /// Checks whether a stream has been reset. - fn stream_is_reset(&self, id: StreamId) -> Option; + fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { + self.streams_mut(id).get_active_mut(id) + } + + + fn get_reset(&self, id: StreamId) -> Option { + self.streams(id).get_reset(id) + } + + fn reset(&mut self, id: StreamId, cause: Reason) { + self.streams_mut(id).reset(id, cause); + } + + fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.streams_mut(id).local_flow_controller(id) + } + + fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.streams_mut(id).remote_flow_controller(id) + } } pub type PingPayload = [u8; 8]; @@ -206,26 +242,24 @@ pub fn from_server_handshaker(settings: Settings // Replace Settings' writer with a full transport. let transport = settings.swap_inner(|io| { // Delimit the frames. - let framer = length_delimited::Builder::new() + let framed = length_delimited::Builder::new() .big_endian() .length_field_length(3) .length_adjustment(9) .num_skip(0) // Don't skip the header .new_read(io); - FlowControl::new( - initial_local_window_size, + StreamSend::new( initial_remote_window_size, - StreamTracker::new( + remote_max_concurrency, + FlowControl::new( initial_local_window_size, initial_remote_window_size, - local_max_concurrency, - remote_max_concurrency, - PingPong::new( - FramedRead::new(framer) - ) - ) - ) + StreamRecv::new( + initial_local_window_size, + local_max_concurrency, + PingPong::new( + FramedRead::new(framed))))) }); connection::new(transport) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 355b368..85f8c38 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -24,7 +24,7 @@ pub struct Settings { remaining_acks: usize, // True when the local settings must be flushed to the remote - is_local_dirty: bool, + is_valid_local_id_dirty: bool, // True when we have received a settings frame from the remote. received_remote: bool, @@ -39,7 +39,7 @@ impl Settings local: local, remote: SettingSet::default(), remaining_acks: 0, - is_local_dirty: true, + is_valid_local_id_dirty: true, received_remote: false, } } @@ -61,18 +61,18 @@ impl Settings local: self.local, remote: self.remote, remaining_acks: self.remaining_acks, - is_local_dirty: self.is_local_dirty, + is_valid_local_id_dirty: self.is_valid_local_id_dirty, received_remote: self.received_remote, } } fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { - trace!("try_send_pending; dirty={} acks={}", self.is_local_dirty, self.remaining_acks); - if self.is_local_dirty { + trace!("try_send_pending; dirty={} acks={}", self.is_valid_local_id_dirty, self.remaining_acks); + if self.is_valid_local_id_dirty { let frame = frame::Settings::new(self.local.clone()); try_ready!(self.try_send(frame)); - self.is_local_dirty = false; + self.is_valid_local_id_dirty = false; } while self.remaining_acks > 0 { @@ -96,16 +96,24 @@ impl Settings } impl ControlStreams for Settings { - fn streams(&self) -> &StreamMap { - self.inner.streams() + fn local_streams(&self) -> &StreamMap { + self.inner.local_streams() } - fn streams_mut(&mut self) -> &mut StreamMap { - self.inner.streams_mut() + fn local_streams_mut(&mut self) -> &mut StreamMap { + self.inner.local_streams_mut() } - fn stream_is_reset(&self, id: StreamId) -> Option { - self.inner.stream_is_reset(id) + fn remote_streams(&self) -> &StreamMap { + self.inner.local_streams() + } + + fn remote_streams_mut(&mut self) -> &mut StreamMap { + self.inner.local_streams_mut() + } + + fn is_valid_local_id(id: StreamId) -> bool { + T::is_valid_local_id(id) } } @@ -132,7 +140,7 @@ impl ControlPing for Settings { impl ControlSettings for Settings{ fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { self.local = local; - self.is_local_dirty = true; + self.is_valid_local_id_dirty = true; Ok(()) } diff --git a/src/proto/state.rs b/src/proto/state.rs index ab4d79d..03d6b7a 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,5 +1,5 @@ use {Peer, StreamId}; -use error::ConnectionError; +use error::{ConnectionError, Reason}; use error::Reason::*; use error::User::*; use proto::{FlowControlState, WindowSize}; @@ -7,6 +7,7 @@ use proto::{FlowControlState, WindowSize}; use fnv::FnvHasher; use ordermap::{Entry, OrderMap}; use std::hash::BuildHasherDefault; +use std::marker::PhantomData; /// Represents the state of an H2 stream /// @@ -76,10 +77,9 @@ impl StreamState { /// /// Returns true if this state transition results in iniitializing the /// stream id. `Err` is returned if this is an invalid state transition. - pub fn recv_headers(&mut self, - eos: bool, - initial_recv_window_size: WindowSize) + pub fn recv_headers

(&mut self, eos: bool, initial_window_size: WindowSize) -> Result + where P: Peer { use self::StreamState::*; use self::PeerState::*; @@ -90,7 +90,7 @@ impl StreamState { if eos { *self = HalfClosedRemote(local); } else { - let remote = Data(FlowControlState::with_initial_size(initial_recv_window_size)); + let remote = Data(FlowControlState::with_initial_size(initial_window_size)); *self = Open { local, remote }; } Ok(true) @@ -111,7 +111,8 @@ impl StreamState { if eos { *self = Closed; } else { - *self = HalfClosedLocal(Data(FlowControlState::with_initial_size(initial_recv_window_size))); + let remote = FlowControlState::with_initial_size(initial_window_size); + *self = HalfClosedLocal(Data(remote)); }; Ok(false) } @@ -291,46 +292,76 @@ impl PeerState { } } +// TODO track reserved streams +// TODO constrain the size of `reset` #[derive(Debug, Default)] -pub struct StreamMap { - inner: OrderMap> +pub struct StreamMap

{ + /// Holds active streams initiated by the local endpoint. + local_active: OrderMap>, + + /// Holds active streams initiated by the remote endpoint. + remote_active: OrderMap>, + + /// Holds active streams initiated by the remote. + reset: OrderMap>, + + _phantom: PhantomData

, } -impl StreamMap { - pub fn get_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - self.inner.get_mut(&id) +impl StreamMap

{ + pub fn active(&mut self, id: StreamId) -> Option<&StreamState> { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.get(id) + } else { + self.remote_active.get(id) + } + } + + pub fn active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { + assert!(!id.is_zero()); + if P::is_valid_local_stream_id(id) { + self.local_active.get_mut(id) + } else { + self.remote_active.get_mut(id) + } + } + + pub fn local_active(&self, id: StreamId) -> Option<&StreamState> { + self.local_active.get(&id) + } + + pub fn local_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { + self.local_active.get_mut(&id) } pub fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.get_mut(&id).and_then(|s| s.local_flow_controller()) + self.get_active_mut(id).and_then(|s| s.local_flow_controller()) } pub fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.get_mut(&id).and_then(|s| s.remote_flow_controller()) + self.get_active_mut(id).and_then(|s| s.remote_flow_controller()) } - pub fn has_stream(&mut self, id: StreamId) -> bool { - self.inner.contains_key(&id) + pub fn localis_active(&mut self, id: StreamId) -> bool { + self.active.contains_key(&id) } - pub fn is_empty(&self) -> bool { - self.inner.is_empty() + pub fn active_count(&self) -> usize { + self.active.len() } - pub fn len(&self) -> usize { - self.inner.len() + pub fn reset(&mut self, id: StreamId, cause: Reason) { + self.reset.insert(id, cause); + self.active.remove(&id); } - pub fn entry(&mut self, id: StreamId) -> Entry> { - self.inner.entry(id) - } - - pub fn remove(&mut self, id: StreamId) -> Option { - self.inner.remove(&id) + pub fn get_reset(&mut self, id: StreamId) -> Option { + self.reset.get(&id).map(|r| *r) } pub fn shrink_all_local_windows(&mut self, decr: u32) { - for (_, mut s) in &mut self.inner { + for (_, mut s) in &mut self.active { if let Some(fc) = s.local_flow_controller() { fc.shrink_window(decr); } @@ -338,7 +369,7 @@ impl StreamMap { } pub fn expand_all_local_windows(&mut self, incr: u32) { - for (_, mut s) in &mut self.inner { + for (_, mut s) in &mut self.active { if let Some(fc) = s.local_flow_controller() { fc.expand_window(incr); } @@ -346,7 +377,7 @@ impl StreamMap { } pub fn shrink_all_remote_windows(&mut self, decr: u32) { - for (_, mut s) in &mut self.inner { + for (_, mut s) in &mut self.active { if let Some(fc) = s.remote_flow_controller() { fc.shrink_window(decr); } @@ -354,7 +385,7 @@ impl StreamMap { } pub fn expand_all_remote_windows(&mut self, incr: u32) { - for (_, mut s) in &mut self.inner { + for (_, mut s) in &mut self.active { if let Some(fc) = s.remote_flow_controller() { fc.expand_window(incr); } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_recv.rs similarity index 75% rename from src/proto/stream_tracker.rs rename to src/proto/stream_recv.rs index 19c0271..11bdf63 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_recv.rs @@ -1,8 +1,10 @@ -use {ConnectionError}; +use ConnectionError; +use client::Client; use error::Reason; use error::User; use frame::{self, Frame}; use proto::*; +use server::Server; use fnv::FnvHasher; use ordermap::OrderMap; @@ -16,82 +18,93 @@ use std::marker::PhantomData; /// Tracks a connection's streams. #[derive(Debug)] -pub struct StreamTracker { +pub struct StreamRecv { inner: T, peer: PhantomData

, - active_streams: StreamMap, - // TODO reserved_streams: HashSet - reset_streams: OrderMap>, - + local: StreamMap, local_max_concurrency: Option, - remote_max_concurrency: Option, - initial_local_window_size: WindowSize, - initial_remote_window_size: WindowSize, + local_initial_window_size: WindowSize, - pending_refused_stream: Option, + remote: StreamMap, + remote_max_concurrency: Option, + remote_initial_window_size: WindowSize, + remote_pending_refuse: Option, } -impl StreamTracker +impl StreamRecv where T: Stream, T: Sink, SinkError = ConnectionError>, P: Peer { - pub fn new(initial_local_window_size: WindowSize, - initial_remote_window_size: WindowSize, - local_max_concurrency: Option, - remote_max_concurrency: Option, + pub fn new(initial_window_size: WindowSize, + max_concurrency: Option, inner: T) - -> StreamTracker + -> StreamRecv { - StreamTracker { + StreamRecv { inner, peer: PhantomData, - active_streams: StreamMap::default(), - reset_streams: OrderMap::default(), - pending_refused_stream: None, - - local_max_concurrency, - remote_max_concurrency, - initial_local_window_size, - initial_remote_window_size, + local: StreamMap::default(), + remote: StreamMap::default(), + max_concurrency, + initial_window_size, + remote_pending_refuse: None, } } + + pub fn try_open_remote(&mut self, frame: Frame) -> Result<(), ConnectionError> { + unimplemented!() + } + + pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> { + unimplemented!() + } } -impl StreamTracker +impl StreamRecv where T: Sink, SinkError = ConnectionError>, P: Peer { fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> { - debug_assert!(self.pending_refused_stream.is_none()); + debug_assert!(self.remote_pending_refused.is_none()); let f = frame::Reset::new(id, Reason::RefusedStream); match self.inner.start_send(f.into())? { AsyncSink::Ready => { - self.reset_streams.insert(id, Reason::RefusedStream); + self.reset(id, Reason::RefusedStream); Ok(Async::Ready(())) } AsyncSink::NotReady(_) => { - self.pending_refused_stream = Some(id); + self.pending_refused = Some(id); Ok(Async::NotReady) } } } } -impl ControlStreams for StreamTracker { - fn streams(&self) -> &StreamMap { - &self.active_streams +impl ControlStreams for StreamRecv + where P: Peer +{ + fn local_streams(&self) -> &StreamMap { + &self.local } - fn streams_mut(&mut self) -> &mut StreamMap { - &mut self.active_streams + fn local_streams_mut(&mut self) -> &mut StreamMap { + &mut self.local } - fn stream_is_reset(&self, id: StreamId) -> Option { - self.reset_streams.get(&id).map(|r| *r) + fn remote_streams(&self) -> &StreamMap { + &self.remote + } + + fn remote_streams_mut(&mut self) -> &mut StreamMap { + &mut self.remote + } + + fn is_valid_local_id(id: StreamId) -> bool { + P::is_valid_local_stream_id(id) } } @@ -114,23 +127,21 @@ impl ControlStreams for StreamTracker { /// > exceed the new value or allow streams to complete. /// /// This module does NOT close streams when the setting changes. -impl ApplySettings for StreamTracker +impl ApplySettings for StreamRecv where T: ApplySettings { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.local_max_concurrency = set.max_concurrent_streams(); - self.initial_local_window_size = set.initial_window_size(); + self.max_concurrency = set.max_concurrent_streams(); + self.initial_window_size = set.initial_window_size(); self.inner.apply_local_settings(set) } fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.remote_max_concurrency = set.max_concurrent_streams(); - self.initial_remote_window_size = set.initial_window_size(); self.inner.apply_remote_settings(set) } } -impl ControlPing for StreamTracker +impl ControlPing for StreamRecv where T: ControlPing { fn start_ping(&mut self, body: PingPayload) -> StartSend { @@ -142,7 +153,7 @@ impl ControlPing for StreamTracker } } -impl Stream for StreamTracker +impl Stream for StreamRecv where T: Stream, T: Sink, SinkError = ConnectionError>, P: Peer, @@ -155,7 +166,7 @@ impl Stream for StreamTracker // Since there's only one slot for pending refused streams, it must be cleared // before polling a frame from the transport. - if let Some(id) = self.pending_refused_stream.take() { + if let Some(id) = self.pending_refused.take() { try_ready!(self.send_refusal(id)); } @@ -165,17 +176,24 @@ impl Stream for StreamTracker let id = v.stream_id(); let eos = v.is_end_stream(); - if self.reset_streams.contains_key(&id) { + if self.get_reset(id).is_some() { + // TODO send the remote errors when it sends us frames on reset + // streams. continue; } + if let Some(mut s) = self.get_active_mut(id) { + let created = s.recv_headers(eos, self.initial_window_size)?; + assert!(!created); + return Ok(Async::Ready(Some(Headers(v)))); + } + // Ensure that receiving this frame will not violate the local max // stream concurrency setting. Ensure that the stream is refused // before processing additional frames. - if let Some(max) = self.local_max_concurrency { + if let Some(max) = self.max_concurrency { let max = max as usize; - if !self.active_streams.has_stream(id) - && self.active_streams.len() >= max - 1 { + if !self.local.is_active(id) && self.local.active_count() >= max - 1 { // This frame would violate our local max concurrency, so reject // the stream. try_ready!(self.send_refusal(id)); @@ -191,7 +209,7 @@ impl Stream for StreamTracker .or_insert_with(|| StreamState::default()); let initialized = - stream.recv_headers::

(eos, self.initial_local_window_size)?; + stream.recv_headers(eos, self.initial_window_size)?; if initialized { if !P::is_valid_remote_stream_id(id) { @@ -213,7 +231,9 @@ impl Stream for StreamTracker Some(Data(v)) => { let id = v.stream_id(); - if self.reset_streams.contains_key(&id) { + if self.get_reset(id).is_some() { + // TODO send the remote errors when it sends us frames on reset + // streams. continue; } @@ -227,28 +247,24 @@ impl Stream for StreamTracker }; if is_closed { - self.active_streams.remove(id); - self.reset_streams.insert(id, Reason::NoError); + self.reset(id, Reason::NoError); } return Ok(Async::Ready(Some(Data(v)))); } Some(Reset(v)) => { - let id = v.stream_id(); - // Set or update the reset reason. - self.reset_streams.insert(id, v.reason()); - - if self.active_streams.remove(id).is_some() { - return Ok(Async::Ready(Some(Reset(v)))); - } + self.reset(v.stream_id(), v.reason()); + return Ok(Async::Ready(Some(Reset(v)))); } Some(f) => { let id = f.stream_id(); - if self.reset_streams.contains_key(&id) { + if self.get_reset(id).is_some() { + // TODO send the remote errors when it sends us frames on reset + // streams. continue; } @@ -263,14 +279,14 @@ impl Stream for StreamTracker } } -impl Sink for StreamTracker +impl Sink for StreamRecv where T: Sink, SinkError = ConnectionError>, P: Peer, { type SinkItem = T::SinkItem; type SinkError = T::SinkError; - fn start_send(&mut self, item: T::SinkItem) -> StartSend { + fn start_send(&mut self, frame: T::SinkItem) -> StartSend { use frame::Frame::*; // Must be enforced through higher levels. @@ -278,13 +294,13 @@ impl Sink for StreamTracker // The local must complete refusing the remote stream before sending any other // frames. - if let Some(id) = self.pending_refused_stream.take() { + if let Some(id) = self.pending_refused.take() { if self.send_refusal(id)?.is_not_ready() { return Ok(AsyncSink::NotReady(item)); } } - match item { + match frame { Headers(v) => { let id = v.stream_id(); let eos = v.is_end_stream(); @@ -366,7 +382,7 @@ impl Sink for StreamTracker } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - if let Some(id) = self.pending_refused_stream.take() { + if let Some(id) = self.pending_refused.take() { try_ready!(self.send_refusal(id)); } @@ -375,14 +391,14 @@ impl Sink for StreamTracker } -impl ReadySink for StreamTracker +impl ReadySink for StreamRecv where T: Stream, T: Sink, SinkError = ConnectionError>, T: ReadySink, P: Peer, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - if let Some(id) = self.pending_refused_stream.take() { + if let Some(id) = self.pending_refused.take() { try_ready!(self.send_refusal(id)); } diff --git a/src/proto/stream_send.rs b/src/proto/stream_send.rs new file mode 100644 index 0000000..9bde351 --- /dev/null +++ b/src/proto/stream_send.rs @@ -0,0 +1,217 @@ +use {ConnectionError}; +use error::Reason; +use error::User; +use frame::{self, Frame}; +use proto::*; + +use fnv::FnvHasher; +use ordermap::OrderMap; +use std::hash::BuildHasherDefault; +use std::marker::PhantomData; + +// TODO track "last stream id" for GOAWAY. +// TODO track/provide "next" stream id. +// TODO reset_streams needs to be bounded. +// TODO track reserved streams (PUSH_PROMISE). + +/// Tracks a connection's streams. +#[derive(Debug)] +pub struct StreamSend { + inner: T, + peer: PhantomData

, + max_concurrency: Option, + initial_window_size: WindowSize, +} + +impl StreamSend + where T: Stream, + T: Sink, SinkError = ConnectionError>, + P: Peer +{ + pub fn new(initial_window_size: WindowSize, + max_concurrency: Option, + inner: T) + -> StreamSend + { + StreamSend { + inner, + peer: PhantomData, + max_concurrency, + initial_window_size, + } + } + + pub fn try_open_local(&mut self, frame: Frame) -> Result<(), ConnectionError> { + unimplemented!() + } + + pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> { + unimplemented!() + } +} + + +/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. +/// +/// > Indicates the maximum number of concurrent streams that the senderg will allow. This +/// > limit is directional: it applies to the number of streams that the sender permits +/// > the receiver to create. Initially, there is no limit to this value. It is +/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit +/// > parallelism. +/// > +/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by +/// > endpoints. A zero value does prevent the creation of new streams; however, this can +/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only +/// > set a zero value for short durations; if a server does not wish to accept requests, +/// > closing the connection is more appropriate. +/// +/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a +/// > value that is below the current number of open streams can either close streams that +/// > exceed the new value or allow streams to complete. +/// +/// This module does NOT close streams when the setting changes. +impl ApplySettings for StreamSend + where T: ApplySettings +{ + fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.inner.apply_local_settings(set) + } + + fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { + self.max_concurrency = set.max_concurrent_streams(); + self.initial_window_size = set.initial_window_size(); + self.inner.apply_remote_settings(set) + } +} + +impl ControlPing for StreamSend + where T: ControlPing +{ + fn start_ping(&mut self, body: PingPayload) -> StartSend { + self.inner.start_ping(body) + } + + fn take_pong(&mut self) -> Option { + self.inner.take_pong() + } +} + +impl Stream for StreamSend + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, + P: Peer, +{ + type Item = T::Item; + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + self.inner.poll() + } +} + +impl Sink for StreamSend + where T: Sink, SinkError = ConnectionError>, + T: ControlStreams, + P: Peer, +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend { + use frame::Frame::*; + + // Must be enforced through higher levels. + if let Some(rst) = self.inner.get_reset(item.stream_id()) { + return Err(User::StreamReset(rst).into()); + } + + match item { + Headers(v) => { + let id = v.stream_id(); + let eos = v.is_end_stream(); + + // Transition the stream state, creating a new entry if needed + // + // TODO: Response can send multiple headers frames before body (1xx + // responses). + // + // ACTUALLY(ver), maybe not? + // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 + + // Ensure that sending this frame would not violate the remote's max + // stream concurrency setting. + if let Some(max) = self.max_concurrency { + let max = max as usize; + let streams = self.inner.streams(); + if !streams.is_active(id) && streams.active_count() >= max - 1 { + return Err(User::MaxConcurrencyExceeded.into()) + } + } + + let is_closed = { + let stream = self.active_streams.entry(id) + .or_insert_with(|| StreamState::default()); + + let initialized = + stream.send_headers::

(eos, self.initial_window_size)?; + + if initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + if !P::is_valid_local_stream_id(id) { + // TODO: clear state + return Err(User::InvalidStreamId.into()); + } + } + + stream.is_closed() + }; + + if is_closed { + self.active_streams.remove(id); + self.reset_streams.insert(id, Reason::NoError); + } + + self.inner.start_send(Headers(v)) + } + + Data(v) => { + match self.active_streams.get_mut(v.stream_id()) { + None => return Err(User::InactiveStreamId.into()), + Some(stream) => { + stream.send_data(v.is_end_stream())?; + self.inner.start_send(Data(v)) + } + + } + } + + Reset(v) => { + let id = v.stream_id(); + self.active_streams.remove(id); + self.reset_streams.insert(id, v.reason()); + self.inner.start_send(Reset(v)) + } + + frame => self.inner.start_send(frame), + } + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } +} + +impl ReadySink for StreamSend + where T: Stream, + T: Sink, SinkError = ConnectionError>, + T: ControlStreams, + T: ReadySink, + P: Peer, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} diff --git a/src/server.rs b/src/server.rs index af8bbc2..f1b0331 100644 --- a/src/server.rs +++ b/src/server.rs @@ -111,13 +111,17 @@ impl Peer for Server { type Poll = http::request::Head; fn is_valid_local_stream_id(_id: StreamId) -> bool { - false + id.is_server_initiated() } fn is_valid_remote_stream_id(id: StreamId) -> bool { id.is_client_initiated() } + fn can_create_local_stream() -> bool { + false + } + fn convert_send_message( id: StreamId, headers: Self::Send,