From 41ffd1d44f24f703fd902b75d1cc9fc2b1b63312 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 12 Jul 2017 21:04:58 +0000 Subject: [PATCH] closer to flow control --- src/error.rs | 7 ++ src/frame/data.rs | 26 ++-- src/frame/mod.rs | 5 - src/frame/settings.rs | 6 + src/frame/window_update.rs | 6 +- src/lib.rs | 2 + src/proto/connection.rs | 173 ++++++++++++++------------- src/proto/flow_control.rs | 46 +++++++- src/proto/mod.rs | 2 +- src/proto/settings.rs | 8 ++ src/proto/state.rs | 235 +++++++++++++++++++++++++------------ 11 files changed, 337 insertions(+), 179 deletions(-) diff --git a/src/error.rs b/src/error.rs index 606c3d9..5a84bc2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,8 @@ use std::{error, fmt, io}; /// The error type for HTTP/2 operations +/// +/// XXX does this sufficiently destinguish stream-level errors from connection-level errors? #[derive(Debug)] pub enum ConnectionError { /// An error caused by an action taken by the remote peer. @@ -56,6 +58,10 @@ pub enum User { /// The stream is not currently expecting a frame of this type. UnexpectedFrameType, + /// The connection or stream does not have a sufficient flow control window to + /// transmit a Data frame to the remote. + FlowControlViolation, + // TODO: reserve additional variants } @@ -93,6 +99,7 @@ macro_rules! user_desc { InvalidStreamId => concat!($prefix, "invalid stream ID"), InactiveStreamId => concat!($prefix, "inactive stream ID"), UnexpectedFrameType => concat!($prefix, "unexpected frame type"), + FlowControlViolation => concat!($prefix, "flow control violation"), } }); } diff --git a/src/frame/data.rs b/src/frame/data.rs index 6e3eea3..23d5dcf 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -4,6 +4,7 @@ use bytes::{BufMut, Bytes, Buf}; #[derive(Debug)] pub struct Data { stream_id: StreamId, + data_len: usize, data: T, flags: DataFlag, pad_len: Option, @@ -26,9 +27,9 @@ impl Data { } else { None }; - Ok(Data { stream_id: head.stream_id(), + data_len: payload.len(), data: payload, flags: flags, pad_len: pad_len, @@ -37,15 +38,6 @@ impl Data { } impl Data { - pub fn new(stream_id: StreamId, data: T) -> Self { - Data { - stream_id: stream_id, - data: data, - flags: DataFlag::default(), - pad_len: None, - } - } - pub fn stream_id(&self) -> StreamId { self.stream_id } @@ -62,14 +54,24 @@ impl Data { Head::new(Kind::Data, self.flags.into(), self.stream_id) } + pub fn len(&self) -> usize { + self.data_len + } + pub fn into_payload(self) -> T { self.data } } impl Data { - pub fn len(&self) -> usize { - self.data.remaining() + pub fn from_buf(stream_id: StreamId, data: T) -> Self { + Data { + stream_id: stream_id, + data_len: data.remaining(), + data: data, + flags: DataFlag::default(), + pad_len: None, + } } pub fn encode_chunk(&mut self, dst: &mut U) { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index e8ecdcd..24590e2 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -113,11 +113,6 @@ pub enum Error { Hpack(hpack::DecoderError), } -// ===== impl Frame ====== - -impl Frame { -} - // ===== impl Error ===== impl From for ConnectionError { diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 6a43013..52de210 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -18,6 +18,12 @@ pub struct SettingSet { max_header_list_size: Option, } +impl SettingSet { + pub fn initial_window_size(&self) -> u32 { + self.initial_window_size.unwrap_or(65_535) + } +} + /// An enum that lists all valid settings that can be sent in a SETTINGS /// frame. /// diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index 2b4cebe..b3443dd 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -5,7 +5,7 @@ use frame::{self, Head, Kind, Error}; const SIZE_INCREMENT_MASK: u32 = 1 << 31; -#[derive(Debug)] +#[derive(Copy, Clone, Debug)] pub struct WindowUpdate { stream_id: StreamId, size_increment: u32, @@ -52,8 +52,8 @@ impl WindowUpdate { } } -impl From for frame::Frame { - fn from(src: WindowUpdate) -> frame::Frame { +impl From for frame::Frame { + fn from(src: WindowUpdate) -> Self { frame::Frame::WindowUpdate(src) } } diff --git a/src/lib.rs b/src/lib.rs index bf05683..615f1c5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,8 @@ pub enum Frame { Data { id: StreamId, data: B, + /// TODO figure out how to make this a requirement on `B` + data_len: usize, end_of_stream: bool, }, Trailers { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index f49b931..d5eb996 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -2,7 +2,7 @@ use Frame; use client::Client; use error::{self, ConnectionError}; use frame::{self, StreamId}; -use proto::{self, Peer, ReadySink, State, PeerState, WindowUpdate, FlowController}; +use proto::{self, Peer, ReadySink, State, FlowController}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -15,12 +15,9 @@ use futures::*; use ordermap::OrderMap; use fnv::FnvHasher; -use std::collections::VecDeque; use std::hash::BuildHasherDefault; use std::marker::PhantomData; -// TODO get window size from `inner`. - /// An H2 connection #[derive(Debug)] pub struct Connection { @@ -30,95 +27,89 @@ pub struct Connection { /// Tracks connection-level flow control. local_flow_controller: FlowController, - initial_local_window_size: u32, - pending_local_window_updates: VecDeque, - remote_flow_controller: FlowController, - initial_remote_window_size: u32, - pending_remote_window_updates: VecDeque, - blocked_remote_window_update: Option + + + pending_local_window_update: Option, + blocked_remote_window_update: Option, } type StreamMap = OrderMap>; -pub fn new(transport: proto::Inner, - initial_local_window_size: u32, - initial_remote_window_size: u32) +pub fn new(transport: proto::Inner) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { + let local_window_size = transport.local_settings().initial_window_size(); + let remote_window_size = transport.remote_settings().initial_window_size(); Connection { inner: transport, streams: StreamMap::default(), peer: PhantomData, - local_flow_controller: FlowController::new(initial_local_window_size), - initial_local_window_size, - pending_local_window_updates: VecDeque::default(), + local_flow_controller: FlowController::new(local_window_size), + remote_flow_controller: FlowController::new(remote_window_size), - remote_flow_controller: FlowController::new(initial_remote_window_size), - initial_remote_window_size, - pending_remote_window_updates: VecDeque::default(), + pending_local_window_update: None, blocked_remote_window_update: None, } } impl Connection { + pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + if id.is_zero() { + return match self.local_flow_controller.take_window_update() { + Some(incr) => Ok(Async::Ready(incr)), + None => { + self.blocked_remote_window_update = Some(task::current()); + Ok(Async::NotReady) + } + }; + } + + match self.streams.get_mut(&id).and_then(|mut s| s.take_remote_window_update()) { + Some(incr) => Ok(Async::Ready(incr)), + None => { + self.blocked_remote_window_update = Some(task::current()); + Ok(Async::NotReady) + } + } + } + /// Publishes local stream window updates to the remote. /// /// Connection window updates (StreamId=0) and stream window must be published /// distinctly. - pub fn increment_local_window(&mut self, up: WindowUpdate) { - let added = match &up { - &WindowUpdate::Connection { increment } => { - if increment == 0 { - false - } else { - self.local_flow_controller.increment(increment); - true - } - } - &WindowUpdate::Stream { id, increment } => { - if increment == 0 { - false - } else { - match self.streams.get_mut(&id) { - Some(&mut State::Open { local: PeerState::Data(ref mut fc), .. }) | - Some(&mut State::HalfClosedRemote(PeerState::Data(ref mut fc))) => { - fc.increment(increment); - true - } - _ => false, - } - } - } + pub fn init_send_window_update(&mut self, id: StreamId, incr: u32) { + assert!(self.pending_local_window_update.is_none()); + + let added = if id.is_zero() { + self.remote_flow_controller.add_to_window(incr); + self.remote_flow_controller.take_window_update() + } else { + self.streams.get_mut(&id).and_then(|mut s| s.send_window_update(incr)) }; - if added { - self.pending_local_window_updates.push_back(up); + if let Some(added) = added { + self.pending_local_window_update = Some(frame::WindowUpdate::new(id, added)); } } /// Advertises the remote's stream window updates. /// - /// Connection window updates (StreamId=0) and stream window updates are advertised + /// Connection window updates (id=0) and stream window updates are advertised /// distinctly. - fn increment_remote_window(&mut self, id: StreamId, incr: u32) { + fn recv_window_update(&mut self, id: StreamId, incr: u32) { if id.is_zero() { - self.remote_flow_controller.increment(incr); - } else { - match self.streams.get_mut(&id) { - Some(&mut State::Open { remote: PeerState::Data(ref mut fc), .. }) | - Some(&mut State::HalfClosedLocal(PeerState::Data(ref mut fc))) => { - fc.increment(incr); - } - _ => {} - } + return self.remote_flow_controller.add_to_window(incr); + } + + if let Some(mut s) = self.streams.get_mut(&id) { + s.recv_window_update(incr); } - unimplemented!() } } @@ -127,16 +118,28 @@ impl Connection P: Peer, B: IntoBuf, { + fn poll_send_window_update(&mut self) -> Poll<(), ConnectionError> { + if let Some(f) = self.pending_local_window_update.take() { + if self.inner.start_send(f.into())?.is_not_ready() { + self.pending_local_window_update = Some(f); + return Ok(Async::NotReady); + } + } + Ok(Async::Ready(())) + } + pub fn send_data(self, id: StreamId, data: B, + data_len: usize, end_of_stream: bool) -> sink::Send { self.send(Frame::Data { - id: id, - data: data, - end_of_stream: end_of_stream, + id, + data_len, + data, + end_of_stream, }) } } @@ -201,15 +204,13 @@ impl Stream for Connection }; trace!("received; frame={:?}", frame); - let frame = match frame { Some(Headers(v)) => { // TODO: Update stream state let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); - // TODO load window size from settings. - let init_window_size = 65_535; + let init_window_size = self.inner.local_settings().initial_window_size(); let stream_initialized = try!(self.streams.entry(stream_id) .or_insert(State::default()) @@ -231,26 +232,27 @@ impl Stream for Connection end_of_stream: end_of_stream, } } - Some(Data(v)) => { - // TODO: Validate frame + Some(Data(v)) => { let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); match self.streams.get_mut(&stream_id) { None => return Err(error::Reason::ProtocolError.into()), - Some(state) => try!(state.recv_data(end_of_stream)), + Some(state) => try!(state.recv_data(end_of_stream, v.len())), } - Frame::Data { id: stream_id, + data_len: v.len(), data: v.into_payload(), - end_of_stream: end_of_stream, + end_of_stream, } } + Some(WindowUpdate(v)) => { - self.increment_remote_window(v.stream_id(), v.size_increment()); + self.recv_window_update(v.stream_id(), v.size_increment()); continue; } + Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)), }; @@ -268,24 +270,31 @@ impl Sink for Connection type SinkItem = Frame; type SinkError = ConnectionError; + /// Sends a frame to the remote. fn start_send(&mut self, item: Self::SinkItem) -> StartSend { use frame::Frame::Headers; - // First ensure that the upstream can process a new item - if !try!(self.poll_ready()).is_ready() { + // First ensure that the upstream can process a new item. This ensures, for + // instance, that any pending local window updates have been sent to the remote + // before sending any other frames. + if try!(self.poll_ready()).is_not_ready() { return Ok(AsyncSink::NotReady(item)); } + assert!(self.pending_local_window_update.is_none()); match item { Frame::Headers { id, headers, end_of_stream } => { - // TODO load window size from settings. - let init_window_size = 65_535; + let init_window_size = self.inner.remote_settings().initial_window_size(); // Transition the stream state, creating a new entry if needed - // TODO: Response can send multiple headers frames before body - // (1xx responses). + // + // TODO: Response can send multiple headers frames before body (1xx + // responses). + // + // ACTUALLY(ver), maybe not? + // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 let stream_initialized = try!(self.streams.entry(id) .or_insert(State::default()) .send_headers::

(end_of_stream, init_window_size)); @@ -294,7 +303,6 @@ impl Sink for Connection // TODO: Ensure available capacity for a new stream // This won't be as simple as self.streams.len() as closed // connections should not be factored. - // if !P::is_valid_local_stream_id(id) { // TODO: clear state return Err(error::User::InvalidStreamId.into()); @@ -314,25 +322,26 @@ impl Sink for Connection Ok(AsyncSink::Ready) } - Frame::Data { id, data, end_of_stream } => { + + Frame::Data { id, data, data_len, end_of_stream } => { // The stream must be initialized at this point match self.streams.get_mut(&id) { None => return Err(error::User::InactiveStreamId.into()), - Some(state) => try!(state.send_data(end_of_stream)), + Some(state) => try!(state.send_data(end_of_stream, data_len)), } - let mut frame = frame::Data::new(id, data.into_buf()); + let mut frame = frame::Data::from_buf(id, data.into_buf()); if end_of_stream { frame.set_end_stream(); } let res = try!(self.inner.start_send(frame.into())); - + // poll_ready has already been called. assert!(res.is_ready()); - Ok(AsyncSink::Ready) } + /* Frame::Trailers { id, headers } => { unimplemented!(); @@ -352,6 +361,7 @@ impl Sink for Connection } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_send_window_update()); self.inner.poll_complete() } } @@ -362,6 +372,7 @@ impl ReadySink for Connection B: IntoBuf, { fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { + try_ready!(self.poll_send_window_update()); self.inner.poll_ready() } } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 3ad4397..bc7dc13 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -1,10 +1,23 @@ #[derive(Clone, Copy, Debug)] pub struct WindowUnderflow; +pub const DEFAULT_INITIAL_WINDOW_SIZE: u32 = 65_535; + #[derive(Copy, Clone, Debug)] pub struct FlowController { + /// Amount that may be claimed. window_size: u32, + /// Amount to be removed by future increments. underflow: u32, + /// The amount that has been incremented but not yet advertised (to the application or + /// the remote). + next_window_update: u32, +} + +impl Default for FlowController { + fn default() -> Self { + Self::new(DEFAULT_INITIAL_WINDOW_SIZE) + } } impl FlowController { @@ -12,14 +25,23 @@ impl FlowController { FlowController { window_size, underflow: 0, + next_window_update: 0, } } - pub fn shrink(&mut self, sz: u32) { - self.underflow += sz; + pub fn window_size(&self) -> u32 { + self.window_size } - pub fn consume(&mut self, sz: u32) -> Result<(), WindowUnderflow> { + /// Reduce future capacity of the window. + /// + /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. + pub fn shrink_window(&mut self, decr: u32) { + self.underflow += decr; + } + + /// Claim the provided amount from the window, if there is enough space. + pub fn claim_window(&mut self, sz: u32) -> Result<(), WindowUnderflow> { if self.window_size < sz { return Err(WindowUnderflow); } @@ -28,13 +50,27 @@ impl FlowController { Ok(()) } - pub fn increment(&mut self, sz: u32) { + /// Applies a window increment immediately. + pub fn add_to_window(&mut self, sz: u32) { if sz <= self.underflow { self.underflow -= sz; return; } - self.window_size += sz - self.underflow; + let added = sz - self.underflow; + self.window_size += added; + self.next_window_update += added; self.underflow = 0; } + + /// Obtains and clears an unadvertised window update. + pub fn take_window_update(&mut self) -> Option { + if self.next_window_update == 0 { + return None; + } + + let incr = self.next_window_update; + self.next_window_update = 0; + Some(incr) + } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index aca21f5..ecc1c30 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -93,5 +93,5 @@ pub fn from_server_handshaker(transport: Settings Settings } } + pub fn local_settings(&self) -> &frame::SettingSet { + &self.local + } + + pub fn remote_settings(&self) -> &frame::SettingSet { + &self.local + } + /// Swap the inner transport while maintaining the current state. pub fn swap_inner T2>(self, f: F) -> Settings { let inner = f(self.inner); diff --git a/src/proto/state.rs b/src/proto/state.rs index b2f93ee..c4a8a72 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -58,106 +58,178 @@ pub enum State { Closed, } -#[derive(Debug, Copy, Clone)] -pub enum PeerState { - Headers, - Data(FlowController), -} - impl State { - pub fn increment_local_window_size(&mut self, incr: u32) { + /// Updates the local flow controller with the given window size increment. + /// + /// Returns the amount of capacity created, accounting for window size changes. The + /// caller should send the the returned window size increment to the remote. + /// + /// If the remote is closed, None is returned. + pub fn send_window_update(&mut self, incr: u32) -> Option { use self::State::*; use self::PeerState::*; - *self = match *self { - Open { local: Data(mut local), remote } => { - local.increment(incr); - Open { local: Data(local), remote } - } - HalfClosedRemote(Data(mut local)) => { - local.increment(incr); - HalfClosedRemote(Data(local)) - } - s => s, + if incr == 0 { + return None; } + + match self { + &mut Open { local: Data(ref mut fc), .. } | + &mut HalfClosedRemote(Data(ref mut fc)) => { + fc.add_to_window(incr); + fc.take_window_update() + } + _ => None, + } + } + + /// Updates the remote flow controller with the given window size increment. + /// + /// Returns the amount of capacity created, accounting for window size changes. The + /// caller should send the the returned window size increment to the remote. + pub fn recv_window_update(&mut self, incr: u32) { + use self::State::*; + use self::PeerState::*; + + if incr == 0 { + return; + } + + match self { + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.add_to_window(incr), + _ => {}, + } + } + + pub fn take_remote_window_update(&mut self) -> Option { + use self::State::*; + use self::PeerState::*; + + match self { + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(), + _ => None, + } + } + + /// Applies an update to the remote's initial window size. + /// + /// Per RFC 7540 ยง6.9.2 + /// + /// > In addition to changing the flow-control window for streams that are not yet + /// > active, a SETTINGS frame can alter the initial flow-control window size for + /// > streams with active flow-control windows (that is, streams in the "open" or + /// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE + /// > changes, a receiver MUST adjust the size of all stream flow-control windows that + /// > it maintains by the difference between the new value and the old value. + /// > + /// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a + /// > flow-control window to become negative. A sender MUST track the negative + /// > flow-control window and MUST NOT send new flow-controlled frames until it + /// > receives WINDOW_UPDATE frames that cause the flow-control window to become + /// > positive. + pub fn update_remote_initial_window_size(&mut self, old: u32, new: u32) { + use self::State::*; + use self::PeerState::*; + + match self { + &mut Open { remote: Data(ref mut fc), .. } | + &mut HalfClosedLocal(Data(ref mut fc)) => { + if new < old { + fc.shrink_window(old - new); + } else { + fc.add_to_window(new - old); + } + } + _ => {} + } + } + + /// TODO Connection doesn't have an API for local updates yet. + pub fn update_local_initial_window_size(&mut self, _old: u32, _new: u32) { + //use self::State::*; + //use self::PeerState::*; + unimplemented!() } /// Transition the state to represent headers being received. /// /// Returns true if this state transition results in iniitializing the /// stream id. `Err` is returned if this is an invalid state transition. - pub fn recv_headers(&mut self, eos: bool, remote_window_size: u32) -> Result { + pub fn recv_headers(&mut self, + eos: bool, + remote_window_size: u32) + -> Result + { use self::State::*; use self::PeerState::*; match *self { Idle => { - *self = if eos { - HalfClosedRemote(Headers) + let local = Headers; + if eos { + *self = HalfClosedRemote(local); } else { - Open { - local: Headers, - remote: Data(FlowController::new(remote_window_size)), - } - }; - + *self = Open { local, remote: Data(FlowController::new(remote_window_size)) }; + } Ok(true) } + Open { local, remote } => { try!(remote.check_is_headers(ProtocolError.into())); - - *self = if eos { - HalfClosedRemote(local) - } else { - let remote = Data(FlowController::new(remote_window_size)); - Open { local, remote } - }; - + if !eos { + // Received non-trailers HEADERS on open remote. + return Err(ProtocolError.into()); + } + *self = HalfClosedRemote(local); Ok(false) } - HalfClosedLocal(remote) => { - try!(remote.check_is_headers(ProtocolError.into())); - *self = if eos { - Closed + HalfClosedLocal(headers) => { + try!(headers.check_is_headers(ProtocolError.into())); + if eos { + *self = Closed; } else { - HalfClosedLocal(Data(FlowController::new(remote_window_size))) + *self = HalfClosedLocal(Data(FlowController::new(remote_window_size))); }; - Ok(false) } + Closed | HalfClosedRemote(..) => { Err(ProtocolError.into()) } + _ => unimplemented!(), } } - pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> { + pub fn recv_data(&mut self, eos: bool, len: usize) -> Result<(), ConnectionError> { use self::State::*; match *self { Open { local, remote } => { try!(remote.check_is_data(ProtocolError.into())); - + try!(remote.check_window_size(len, FlowControlError.into())); if eos { *self = HalfClosedRemote(local); } - Ok(()) } + HalfClosedLocal(remote) => { try!(remote.check_is_data(ProtocolError.into())); - + try!(remote.check_window_size(len, FlowControlError.into())); if eos { *self = Closed; } - Ok(()) } + Closed | HalfClosedRemote(..) => { Err(ProtocolError.into()) } + _ => unimplemented!(), } } @@ -183,6 +255,7 @@ impl State { Ok(true) } + Open { local, remote } => { try!(local.check_is_headers(UnexpectedFrameType.into())); @@ -195,6 +268,7 @@ impl State { Ok(false) } + HalfClosedRemote(local) => { try!(local.check_is_headers(UnexpectedFrameType.into())); @@ -206,67 +280,84 @@ impl State { Ok(false) } + Closed | HalfClosedLocal(..) => { Err(UnexpectedFrameType.into()) } + _ => unimplemented!(), } } - pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> { + pub fn send_data(&mut self, eos: bool, len: usize) -> Result<(), ConnectionError> { use self::State::*; match *self { Open { local, remote } => { try!(local.check_is_data(UnexpectedFrameType.into())); - + try!(local.check_window_size(len, FlowControlViolation.into())); if eos { *self = HalfClosedLocal(remote); } - Ok(()) } + HalfClosedRemote(local) => { try!(local.check_is_data(UnexpectedFrameType.into())); - + try!(local.check_window_size(len, FlowControlViolation.into())); if eos { *self = Closed; } - Ok(()) } + Closed | HalfClosedLocal(..) => { Err(UnexpectedFrameType.into()) } + _ => unimplemented!(), } } } -impl PeerState { - #[inline] - fn check_is_headers(&self, err: ConnectionError) -> Result<(), ConnectionError> { - use self::PeerState::*; - - match *self { - Headers => Ok(()), - _ => Err(err), - } - } - - #[inline] - fn check_is_data(&self, err: ConnectionError) -> Result<(), ConnectionError> { - use self::PeerState::*; - - match *self { - Data { .. } => Ok(()), - _ => Err(err), - } - } -} - impl Default for State { fn default() -> State { State::Idle } } + +#[derive(Debug, Copy, Clone)] +pub enum PeerState { + Headers, + /// Contains a FlowController representing the _receiver_ of this this data stream. + Data(FlowController), +} + +impl PeerState { + #[inline] + fn check_is_headers(&self, err: ConnectionError) -> Result<(), ConnectionError> { + use self::PeerState::*; + match self { + &Headers => Ok(()), + _ => Err(err), + } + } + + #[inline] + fn check_is_data(&self, err: ConnectionError) -> Result<(), ConnectionError> { + use self::PeerState::*; + match self { + &Data(_) => Ok(()), + _ => Err(err), + } + } + + #[inline] + fn check_window_size(&self, len: usize, err: ConnectionError) -> Result<(), ConnectionError> { + use self::PeerState::*; + match self { + &Data(ref fc) if len <= fc.window_size() as usize=> Ok(()), + _ => Err(err), + } + } +}