diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index d07b075..d3c3b05 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -47,21 +47,37 @@ impl FlowControl } } -impl FlowControl { - #[inline] - fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { - self.local_flow_controller.claim_window(len) - .map_err(|_| error::Reason::FlowControlError.into()) - } - - #[inline] - fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> { - self.remote_flow_controller.claim_window(len) - .map_err(|_| error::User::FlowControlViolation.into()) - } -} - impl FlowControl { + fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { + if id.is_zero() { + return self.local_flow_controller.claim_window(len) + .map_err(|_| error::Reason::FlowControlError.into()); + } + + if let Some(mut stream) = self.streams_mut().get_mut(&id) { + return stream.claim_local_window(len) + .map_err(|_| error::Reason::FlowControlError.into()); + } + + // Ignore updates for non-existent streams. + Ok(()) + } + + fn claim_remote_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { + if id.is_zero() { + return self.local_flow_controller.claim_window(len) + .map_err(|_| error::Reason::FlowControlError.into()); + } + + if let Some(mut stream) = self.streams_mut().get_mut(&id) { + return stream.claim_remote_window(len) + .map_err(|_| error::Reason::FlowControlError.into()); + } + + // Ignore updates for non-existent streams. + Ok(()) + } + /// Handles a window update received from the remote, indicating that the local may /// send `incr` additional bytes. /// @@ -75,7 +91,7 @@ impl FlowControl { self.remote_flow_controller.grow_window(incr); true } else if let Some(mut s) = self.streams_mut().get_mut(&id) { - s.grow_send_window(incr); + s.grow_remote_window(incr); true } else { false @@ -137,10 +153,10 @@ impl ConnectionTransporter for FlowControl let mut streams = self.streams_mut(); if new_window_size < old_window_size { let decr = old_window_size - new_window_size; - streams.shrink_local_window(decr); + streams.shrink_all_local_windows(decr); } else { let incr = new_window_size - old_window_size; - streams.grow_local_window(incr); + streams.grow_all_local_windows(incr); } } @@ -161,10 +177,10 @@ impl ConnectionTransporter for FlowControl let mut streams = self.streams_mut(); if new_window_size < old_window_size { let decr = old_window_size - new_window_size; - streams.shrink_remote_window(decr); + streams.shrink_all_remote_windows(decr); } else { let incr = new_window_size - old_window_size; - streams.grow_remote_window(incr); + streams.grow_all_remote_windows(incr); } } @@ -201,7 +217,7 @@ impl Stream for FlowControl } Some(Data(v)) => { - self.claim_local_window(v.len())?; + self.claim_local_window(&v.stream_id(), v.len())?; return Ok(Async::Ready(Some(Data(v)))); } @@ -223,7 +239,7 @@ impl Sink for FlowControl use frame::Frame::*; if let &Data(ref v) = &item { - self.claim_remote_window(v.len())?; + self.claim_remote_window(&v.stream_id(), v.len())?; } self.inner.start_send(item) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 6d2ee4d..6834327 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -11,7 +11,7 @@ mod stream_tracker; pub use self::connection::Connection; pub use self::flow_control::FlowControl; -pub use self::flow_controller::FlowController; +pub use self::flow_controller::{FlowController, WindowUnderflow}; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use self::ping_pong::PingPong; @@ -74,27 +74,27 @@ impl StreamMap { self.inner.entry(id) } - fn shrink_local_window(&mut self, decr: u32) { + fn shrink_all_local_windows(&mut self, decr: u32) { for (_, mut s) in &mut self.inner { - s.shrink_recv_window(decr) + s.shrink_local_window(decr) } } - fn grow_local_window(&mut self, incr: u32) { + fn grow_all_local_windows(&mut self, incr: u32) { for (_, mut s) in &mut self.inner { - s.grow_recv_window(incr) + s.grow_local_window(incr) } } - fn shrink_remote_window(&mut self, decr: u32) { + fn shrink_all_remote_windows(&mut self, decr: u32) { for (_, mut s) in &mut self.inner { - s.shrink_send_window(decr) + s.shrink_remote_window(decr) } } - fn grow_remote_window(&mut self, incr: u32) { + fn grow_all_remote_windows(&mut self, incr: u32) { for (_, mut s) in &mut self.inner { - s.grow_send_window(incr) + s.grow_remote_window(incr) } } } diff --git a/src/proto/state.rs b/src/proto/state.rs index ccb8362..6ee1e19 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,8 +1,8 @@ -use {FrameSize, Peer}; +use Peer; use error::ConnectionError; use error::Reason::*; use error::User::*; -use proto::FlowController; +use proto::{FlowController, WindowSize, WindowUnderflow}; /// Represents the state of an H2 stream /// @@ -60,111 +60,13 @@ pub enum StreamState { } impl StreamState { - /// Updates the local flow controller so that the remote may send `incr` more bytes. - /// - /// Returns the amount of capacity created, accounting for window size changes. The - /// caller should send the the returned window size increment to the remote. - /// - /// If the remote is closed, None is returned. - pub fn grow_send_window(&mut self, incr: u32) { - use self::StreamState::*; - use self::PeerState::*; - - if incr == 0 { - return; - } - - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr), - _ => {}, - } - } - - pub fn shrink_send_window(&mut self, decr: u32) { - use self::StreamState::*; - use self::PeerState::*; - - if decr == 0 { - return; - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr), - _ => {}, - } - } - - - /// Consumes newly-advertised capacity to inform the local endpoint it may send more - /// data. - pub fn take_send_window_update(&mut self) -> Option { - use self::StreamState::*; - use self::PeerState::*; - - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(), - _ => None, - } - } - - /// Updates the remote flow controller so that the remote may receive `incr` - /// additional bytes. - /// - /// Returns the amount of capacity created, accounting for window size changes. The - /// caller should send the the returned window size increment to the remote. - pub fn grow_recv_window(&mut self, incr: u32) { - use self::StreamState::*; - use self::PeerState::*; - - if incr == 0 { - return; - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr), - _ => {}, - } - } - - pub fn shrink_recv_window(&mut self, decr: u32) { - use self::StreamState::*; - use self::PeerState::*; - - if decr == 0 { - return; - } - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr), - _ => {}, - } - } - - /// Consumes newly-advertised capacity to inform the local endpoint it may send more - /// data. - pub fn take_recv_window_update(&mut self) -> Option { - use self::StreamState::*; - use self::PeerState::*; - - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(), - _ => None, - } - } - /// Transition the state to represent headers being received. /// /// Returns true if this state transition results in iniitializing the /// stream id. `Err` is returned if this is an invalid state transition. pub fn recv_headers(&mut self, eos: bool, - initial_recv_window_size: u32) + initial_recv_window_size: WindowSize) -> Result { use self::StreamState::*; @@ -207,22 +109,20 @@ impl StreamState { } } - pub fn recv_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { + pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> { use self::StreamState::*; match *self { - Open { local, mut remote } => { + Open { local, remote } => { try!(remote.check_is_data(ProtocolError.into())); - try!(remote.claim_window_size(len, FlowControlError.into())); if eos { *self = HalfClosedRemote(local); } Ok(()) } - HalfClosedLocal(mut remote) => { + HalfClosedLocal(remote) => { try!(remote.check_is_data(ProtocolError.into())); - try!(remote.claim_window_size(len, FlowControlError.into())); if eos { *self = Closed; } @@ -243,7 +143,7 @@ impl StreamState { /// id. `Err` is returned if this is an invalid state transition. pub fn send_headers(&mut self, eos: bool, - initial_window_size: u32) + initial_window_size: WindowSize) -> Result { use self::StreamState::*; @@ -294,33 +194,156 @@ impl StreamState { } } - pub fn send_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> { + pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> { use self::StreamState::*; match *self { - Open { mut local, remote } => { + Open { local, remote } => { try!(local.check_is_data(UnexpectedFrameType.into())); - try!(local.claim_window_size(len, FlowControlViolation.into())); if eos { *self = HalfClosedLocal(remote); } Ok(()) } - HalfClosedRemote(mut local) => { + HalfClosedRemote(local) => { try!(local.check_is_data(UnexpectedFrameType.into())); - try!(local.claim_window_size(len, FlowControlViolation.into())); if eos { *self = Closed; } Ok(()) } - Closed | HalfClosedLocal(..) => { + Idle | Closed | HalfClosedLocal(..) => { Err(UnexpectedFrameType.into()) } + } + } - _ => unimplemented!(), + /// Updates the local flow controller so that the remote may send `incr` more bytes. + /// + /// Returns the amount of capacity created, accounting for window size changes. The + /// caller should send the the returned window size increment to the remote. + /// + /// If the remote is closed, None is returned. + pub fn grow_remote_window(&mut self, incr: WindowSize) { + use self::StreamState::*; + use self::PeerState::*; + + if incr == 0 { + return; + } + + match self { + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr), + _ => {}, + } + } + + pub fn claim_remote_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> { + use self::StreamState::*; + use self::PeerState::*; + + if decr == 0 { + return Ok(()); + } + + match self { + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.claim_window(decr), + _ => Ok(()), + } + } + + pub fn shrink_remote_window(&mut self, decr: WindowSize) { + use self::StreamState::*; + use self::PeerState::*; + + if decr == 0 { + return; + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr), + _ => {}, + } + } + + /// Consumes newly-advertised capacity to inform the local endpoint it may send more + /// data. + pub fn take_remote_window_update(&mut self) -> Option { + use self::StreamState::*; + use self::PeerState::*; + + match self { + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(), + _ => None, + } + } + + /// Updates the remote flow controller so that the remote may receive `incr` + /// additional bytes. + /// + /// Returns the amount of capacity created, accounting for window size changes. The + /// caller should send the the returned window size increment to the remote. + pub fn grow_local_window(&mut self, incr: WindowSize) { + use self::StreamState::*; + use self::PeerState::*; + + if incr == 0 { + return; + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr), + _ => {}, + } + } + + pub fn claim_local_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> { + use self::StreamState::*; + use self::PeerState::*; + + if decr == 0 { + return Ok(()); + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.claim_window(decr), + _ => Ok(()), + } + } + + pub fn shrink_local_window(&mut self, decr: WindowSize) { + use self::StreamState::*; + use self::PeerState::*; + + if decr == 0 { + return; + } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr), + _ => {}, + } + } + + /// Consumes newly-advertised capacity to inform the local endpoint it may send more + /// data. + pub fn take_local_window_update(&mut self) -> Option { + use self::StreamState::*; + use self::PeerState::*; + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(), + _ => None, } } } @@ -356,13 +379,4 @@ impl PeerState { _ => Err(err), } } - - #[inline] - fn claim_window_size(&mut self, sz: FrameSize, err: ConnectionError) -> Result<(), ConnectionError> { - use self::PeerState::*; - match self { - &mut Data(ref mut fc) => fc.claim_window(sz).map_err(|_| err), - _ => Err(err), - } - } } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index 1cfe35a..2ffccbd 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -1,5 +1,6 @@ use ConnectionError; -use error::User::*; +use error::Reason::ProtocolError; +use error::User::InvalidStreamId; use frame::{self, Frame}; use proto::*; @@ -119,8 +120,15 @@ impl Stream for StreamTracker Ok(Async::Ready(Some(Headers(v)))) } - f => Ok(Async::Ready(f)) + Some(Data(v)) => { + match self.streams.get_mut(&v.stream_id()) { + None => return Err(ProtocolError.into()), + Some(state) => state.recv_data(v.is_end_stream())?, + } + Ok(Async::Ready(Some(Data(v)))) + } + f => Ok(Async::Ready(f)) } } } @@ -136,31 +144,42 @@ impl Sink for StreamTracker fn start_send(&mut self, item: T::SinkItem) -> StartSend { use frame::Frame::*; - if let &Headers(ref v) = &item { - let id = v.stream_id(); - let eos = v.is_end_stream(); + match &item { + &Headers(ref v) => { + let id = v.stream_id(); + let eos = v.is_end_stream(); - // Transition the stream state, creating a new entry if needed - // - // TODO: Response can send multiple headers frames before body (1xx - // responses). - // - // ACTUALLY(ver), maybe not? - // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 - let initialized = self.streams - .entry(id) - .or_insert_with(|| StreamState::default()) - .send_headers::

(eos, self.initial_remote_window_size)?; + // Transition the stream state, creating a new entry if needed + // + // TODO: Response can send multiple headers frames before body (1xx + // responses). + // + // ACTUALLY(ver), maybe not? + // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 + let initialized = self.streams + .entry(id) + .or_insert_with(|| StreamState::default()) + .send_headers::

(eos, self.initial_remote_window_size)?; - if initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - if !P::is_valid_local_stream_id(id) { - // TODO: clear state - return Err(InvalidStreamId.into()); + if initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + if !P::is_valid_local_stream_id(id) { + // TODO: clear state + return Err(InvalidStreamId.into()); + } } } + + &Data(ref v) => { + match self.streams.get_mut(&v.stream_id()) { + None => return Err(ProtocolError.into()), + Some(state) => state.send_data(v.is_end_stream())?, + } + } + + _ => {} } self.inner.start_send(item)