diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 49a00df..8abd97f 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -66,6 +66,22 @@ pub enum Frame { } impl Frame { + pub fn is_connection_frame(&self) -> bool { + use self::Frame::*; + + match self { + &Headers(..) | + &Data(..) | + &PushPromise(..) | + &Reset(..) => false, + + &WindowUpdate(ref v) => v.stream_id().is_zero(), + + &Ping(_) | + &Settings(_) => true, + } + } + pub fn stream_id(&self) -> StreamId { use self::Frame::*; diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index d71a072..2ac8cc0 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -78,19 +78,19 @@ impl FlowControl // Flow control utitlities. impl FlowControl { - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { Some(&mut self.local_connection) } else { - self.inner.local_flow_controller(id) + self.inner.recv_flow_controller(id) } } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { Some(&mut self.remote_connection) } else { - self.inner.remote_flow_controller(id) + self.inner.send_flow_controller(id) } } } @@ -104,7 +104,7 @@ impl ControlFlow for FlowControl { // TODO this should probably account for stream priority? while let Some(id) = self.remote_pending_streams.pop_front() { - if let Some(mut flow) = self.remote_flow_controller(id) { + if let Some(mut flow) = self.send_flow_controller(id) { if let Some(incr) = flow.apply_window_update() { return Ok(Async::Ready(WindowUpdate::new(id, incr))); } @@ -116,7 +116,7 @@ impl ControlFlow for FlowControl { } fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - let added = match self.local_flow_controller(id) { + let added = match self.recv_flow_controller(id) { None => false, Some(mut fc) => { fc.expand_window(incr); @@ -153,7 +153,7 @@ impl FlowControl while let Some(id) = self.local_pending_streams.pop_front() { if self.inner.get_reset(id).is_none() { - let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); + let update = self.recv_flow_controller(id).and_then(|s| s.apply_window_update()); if let Some(incr) = update { try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); } @@ -187,7 +187,7 @@ impl Stream for FlowControl loop { match try_ready!(self.inner.poll()) { Some(WindowUpdate(v)) => { - if let Some(fc) = self.remote_flow_controller(v.stream_id()) { + if let Some(fc) = self.send_flow_controller(v.stream_id()) { fc.expand_window(v.size_increment()); } } @@ -199,7 +199,7 @@ impl Stream for FlowControl } // If this frame ends the stream, there may no longer be a flow // controller. That's fine. - if let Some(fc) = self.local_flow_controller(v.stream_id()) { + if let Some(fc) = self.recv_flow_controller(v.stream_id()) { if fc.claim_window(sz).is_err() { return Err(error::Reason::FlowControlError.into()) } @@ -248,7 +248,7 @@ impl Sink for FlowControl // Ensure there's enough capacity on stream. { - let mut fc = self.inner.remote_flow_controller(v.stream_id()) + let mut fc = self.inner.send_flow_controller(v.stream_id()) .expect("no remote stream for data frame"); if fc.claim_window(sz).is_err() { return Err(error::User::FlowControlViolation.into()) @@ -311,7 +311,7 @@ impl ApplySettings for FlowControl return Ok(()); } - self.inner.local_update_inital_window_size(old_window_size, new_window_size); + self.inner.update_inital_recv_window_size(old_window_size, new_window_size); self.local_initial = new_window_size; Ok(()) } @@ -325,7 +325,7 @@ impl ApplySettings for FlowControl return Ok(()); } - self.inner.remote_update_inital_window_size(old_window_size, new_window_size); + self.inner.update_inital_send_window_size(old_window_size, new_window_size); self.remote_initial = new_window_size; Ok(()) } @@ -348,16 +348,24 @@ impl ControlStreams for FlowControl { self.inner.local_open(id, sz) } + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open_recv_half(id, sz) + } + + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open_send_half(id, sz) + } + fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { self.inner.remote_open(id, sz) } - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_local_half(id) + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_send_half(id) } - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_remote_half(id) + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_recv_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { @@ -383,20 +391,20 @@ impl ControlStreams for FlowControl { self.inner.remote_active_len() } - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.local_update_inital_window_size(old_sz, new_sz) + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.remote_update_inital_window_size(old_sz, new_sz) + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_send_window_size(old_sz, new_sz) } - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.local_flow_controller(id) + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.recv_flow_controller(id) } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.remote_flow_controller(id) + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.send_flow_controller(id) } fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9ed9fc0..795ec2c 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -31,7 +31,6 @@ use self::framed_write::FramedWrite; use self::ping_pong::{ControlPing, PingPayload, PingPong}; use self::ready::ReadySink; use self::settings::{ApplySettings, /*ControlSettings,*/ Settings}; -use self::state::{StreamState, PeerState}; use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; @@ -172,8 +171,8 @@ pub fn from_server_handshaker(settings: Settings P: Peer, B: IntoBuf, { - let initial_local_window_size = settings.local_settings().initial_window_size(); - let initial_remote_window_size = settings.remote_settings().initial_window_size(); + let initial_recv_window_size = settings.local_settings().initial_window_size(); + let initial_send_window_size = settings.remote_settings().initial_window_size(); let local_max_concurrency = settings.local_settings().max_concurrent_streams(); let remote_max_concurrency = settings.remote_settings().max_concurrent_streams(); @@ -187,16 +186,17 @@ pub fn from_server_handshaker(settings: Settings .num_skip(0) // Don't skip the header .new_read(io); + trace!("composing transport"); StreamSendOpen::new( - initial_remote_window_size, + initial_send_window_size, remote_max_concurrency, StreamRecvClose::new( FlowControl::new( - initial_local_window_size, - initial_remote_window_size, + initial_recv_window_size, + initial_send_window_size, StreamSendClose::new( StreamRecvOpen::new( - initial_local_window_size, + initial_recv_window_size, local_max_concurrency, StreamStore::new( PingPong::new( @@ -205,3 +205,4 @@ pub fn from_server_handshaker(settings: Settings connection::new(transport) } + diff --git a/src/proto/state.rs b/src/proto/state.rs index a0167a3..aa621e5 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,4 +1,4 @@ -use {ConnectionError, Peer}; +use ConnectionError; use error::Reason::*; use error::User::*; use proto::{FlowControlState, WindowSize}; @@ -59,126 +59,113 @@ pub enum StreamState { } impl StreamState { - // /// Transition the state to represent headers being received. - // /// - // /// Returns true if this state transition results in iniitializing the - // /// stream id. `Err` is returned if this is an invalid state transition. - // pub fn recv_headers

(&mut self, eos: bool, initial_window_size: WindowSize) - // -> Result - // where P: Peer - // { - // use self::StreamState::*; - // use self::PeerState::*; + pub fn new_open_sending(sz: WindowSize) -> StreamState { + StreamState::Open { + local: PeerState::AwaitingHeaders, + remote: PeerState::streaming(sz), + } + } - // match *self { - // Idle => { - // let local = Headers; - // if eos { - // *self = HalfClosedRemote(local); - // } else { - // let remote = Data(FlowControlState::with_initial_size(initial_window_size)); - // *self = Open { local, remote }; - // } - // Ok(true) - // } + pub fn new_open_recving(sz: WindowSize) -> StreamState { + StreamState::Open { + local: PeerState::streaming(sz), + remote: PeerState::AwaitingHeaders, + } + } - // Open { local, remote } => { - // try!(remote.check_is_headers(ProtocolError.into())); - // if !eos { - // // Received non-trailers HEADERS on open remote. - // return Err(ProtocolError.into()); - // } - // *self = HalfClosedRemote(local); - // Ok(false) - // } + /// Opens the send-half of a stream if it is not already open. + /// + /// Returns true iff the send half was not previously open. + pub fn open_send_half(&mut self, sz: WindowSize) -> Result { + use self::StreamState::*; + use self::PeerState::*; - // HalfClosedLocal(headers) => { - // try!(headers.check_is_headers(ProtocolError.into())); - // if eos { - // *self = Closed; - // } else { - // let remote = FlowControlState::with_initial_size(initial_window_size); - // *self = HalfClosedLocal(Data(remote)); - // }; - // Ok(false) - // } + // Try to avoid copying `self` by first checking to see whether the stream needs + // to be updated. + match self { + &mut Idle | + &mut Closed | + &mut HalfClosedRemote(..) => { + return Err(ProtocolError.into()); + } - // Closed | HalfClosedRemote(..) => { - // Err(ProtocolError.into()) - // } - // } - // } + &mut Open { remote: Streaming(..), .. } | + &mut HalfClosedLocal(Streaming(..)) => { + return Ok(false); + } - // /// Transition the state to represent headers being sent. - // /// - // /// Returns true if this state transition results in initializing the stream - // /// id. `Err` is returned if this is an invalid state transition. - // pub fn send_headers(&mut self, - // eos: bool, - // initial_window_size: WindowSize) - // -> Result - // { - // use self::StreamState::*; - // use self::PeerState::*; + &mut Open { remote: AwaitingHeaders, .. } | + &mut HalfClosedLocal(AwaitingHeaders) => {} + } - // match *self { - // Idle => { - // *self = if eos { - // HalfClosedLocal(Headers) - // } else { - // Open { - // local: Data(FlowControlState::with_initial_size(initial_window_size)), - // remote: Headers, - // } - // }; + match *self { + Open { local, remote: AwaitingHeaders } => { + *self = Open { + local, + remote: PeerState::streaming(sz), + }; + } - // Ok(true) - // } + HalfClosedLocal(AwaitingHeaders) => { + *self = HalfClosedLocal(PeerState::streaming(sz)); + } - // Open { local, remote } => { - // try!(local.check_is_headers(UnexpectedFrameType.into())); + _ => unreachable!() + } - // *self = if eos { - // HalfClosedLocal(remote) - // } else { - // let fc = FlowControlState::with_initial_size(initial_window_size); - // let local = Data(fc); - // Open { local, remote } - // }; + Ok(true) + } - // Ok(false) - // } + pub fn open_recv_half(&mut self, sz: WindowSize) -> Result { + use self::StreamState::*; + use self::PeerState::*; - // HalfClosedRemote(local) => { - // try!(local.check_is_headers(UnexpectedFrameType.into())); + // Try to avoid copying `self` by first checking to see whether the stream needs + // to be updated. + match self { + &mut Idle | + &mut Closed | + &mut HalfClosedLocal(..) => { + return Err(ProtocolError.into()); + } - // *self = if eos { - // Closed - // } else { - // let fc = FlowControlState::with_initial_size(initial_window_size); - // HalfClosedRemote(Data(fc)) - // }; + &mut Open { local: Streaming(..), .. } | + &mut HalfClosedRemote(Streaming(..)) => { + return Ok(false); + } - // Ok(false) - // } + &mut Open { local: AwaitingHeaders, .. } | + &mut HalfClosedRemote(AwaitingHeaders) => {} + } - // Closed | HalfClosedLocal(..) => { - // Err(UnexpectedFrameType.into()) - // } - // } - // } + match *self { + Open { remote, local: AwaitingHeaders } => { + *self = Open { + local: PeerState::streaming(sz), + remote, + }; + } + + HalfClosedLocal(AwaitingHeaders) => { + *self = HalfClosedRemote(PeerState::streaming(sz)); + } + + _ => unreachable!() + } + + Ok(true) + } pub fn check_can_send_data(&self) -> Result<(), ConnectionError> { use self::StreamState::*; - match self { &Open { ref remote, .. } => { - try!(remote.check_is_data(UnexpectedFrameType.into())); + try!(remote.check_streaming(UnexpectedFrameType.into())); Ok(()) } &HalfClosedLocal(ref remote) => { - try!(remote.check_is_data(UnexpectedFrameType.into())); + try!(remote.check_streaming(UnexpectedFrameType.into())); Ok(()) } @@ -190,15 +177,14 @@ impl StreamState { pub fn check_can_recv_data(&self) -> Result<(), ConnectionError> { use self::StreamState::*; - match self { &Open { ref local, .. } => { - try!(local.check_is_data(ProtocolError.into())); + try!(local.check_streaming(ProtocolError.into())); Ok(()) } &HalfClosedRemote(ref local) => { - try!(local.check_is_data(ProtocolError.into())); + try!(local.check_streaming(ProtocolError.into())); Ok(()) } @@ -208,17 +194,21 @@ impl StreamState { } } + /// Indicates that the local side will not send more data to the remote. + /// /// Returns true iff the stream is fully closed. - pub fn close_local(&mut self) -> Result { + pub fn close_send_half(&mut self) -> Result { use self::StreamState::*; - match *self { - Open { remote, .. } => { - *self = HalfClosedLocal(remote); + Open { local, .. } => { + // The local side will continue to receive data. + trace!("close_send_half: Open => HalfClosedRemote({:?})", local); + *self = HalfClosedRemote(local); Ok(false) } - HalfClosedLocal(remote) => { + HalfClosedLocal(..) => { + trace!("close_send_half: HalfClosedLocal => Closed"); *self = Closed; Ok(true) } @@ -229,17 +219,21 @@ impl StreamState { } } + /// Indicates that the remote side will not send more data to the local. + /// /// Returns true iff the stream is fully closed. - pub fn close_remote(&mut self) -> Result { + pub fn close_recv_half(&mut self) -> Result { use self::StreamState::*; - match *self { - Open { local, .. } => { - *self = HalfClosedRemote(local); + Open { remote, .. } => { + // The remote side will continue to receive data. + trace!("close_recv_half: Open => HalfClosedLocal({:?})", remote); + *self = HalfClosedLocal(remote); Ok(false) } - HalfClosedRemote(local) => { + HalfClosedRemote(..) => { + trace!("close_recv_half: HalfClosedRemoteOpen => Closed"); *self = Closed; Ok(true) } @@ -250,24 +244,20 @@ impl StreamState { } } - pub fn local_flow_controller(&mut self) -> Option<&mut FlowControlState> { + pub fn recv_flow_controller(&mut self) -> Option<&mut FlowControlState> { use self::StreamState::*; - use self::PeerState::*; - match self { - &mut Open { local: Data(ref mut fc), .. } | - &mut HalfClosedRemote(Data(ref mut fc)) => Some(fc), + &mut Open { ref mut local, .. } | + &mut HalfClosedRemote(ref mut local) => local.flow_controller(), _ => None, } } - pub fn remote_flow_controller(&mut self) -> Option<&mut FlowControlState> { + pub fn send_flow_controller(&mut self) -> Option<&mut FlowControlState> { use self::StreamState::*; - use self::PeerState::*; - match self { - &mut Open { remote: Data(ref mut fc), .. } | - &mut HalfClosedLocal(Data(ref mut fc)) => Some(fc), + &mut Open { ref mut remote, .. } | + &mut HalfClosedLocal(ref mut remote) => remote.flow_controller(), _ => None, } } @@ -281,27 +271,36 @@ impl Default for StreamState { #[derive(Debug, Copy, Clone)] pub enum PeerState { - Headers, + AwaitingHeaders, /// Contains a FlowControlState representing the _receiver_ of this this data stream. - Data(FlowControlState), + Streaming(FlowControlState), +} + +impl Default for PeerState { + fn default() -> Self { + PeerState::AwaitingHeaders + } } impl PeerState { + fn streaming(sz: WindowSize) -> PeerState { + PeerState::Streaming(FlowControlState::with_initial_size(sz)) + } + #[inline] - fn check_is_headers(&self, err: ConnectionError) -> Result<(), ConnectionError> { + fn check_streaming(&self, err: ConnectionError) -> Result<(), ConnectionError> { use self::PeerState::*; match self { - &Headers => Ok(()), + &Streaming(..) => Ok(()), _ => Err(err), } } - #[inline] - fn check_is_data(&self, err: ConnectionError) -> Result<(), ConnectionError> { + fn flow_controller(&mut self) -> Option<&mut FlowControlState> { use self::PeerState::*; match self { - &Data(_) => Ok(()), - _ => Err(err), + &mut Streaming(ref mut fc) => Some(fc), + _ => None, } } } diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index 90ce8a5..6db2bce 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -9,6 +9,7 @@ use proto::ready::ReadySink; // TODO reset_streams needs to be bounded. // TODO track reserved streams (PUSH_PROMISE). +/// Handles end-of-stream frames sent from the remote. #[derive(Debug)] pub struct StreamRecvClose { inner: T, @@ -32,11 +33,24 @@ impl Stream for StreamRecvClose type Error = T::Error; fn poll(&mut self) -> Poll, T::Error> { - use frame::Frame::*; + let frame = match try_ready!(self.inner.poll()) { + None => return Ok(Async::Ready(None)), + Some(f) => f, + }; - let frame = try_ready!(self.inner.poll()); + let id = frame.stream_id(); + if !id.is_zero() { + if frame.is_end_stream() { + if let &Frame::Reset(ref rst) = &frame { + self.inner.reset_stream(id, rst.reason()); + } else { + debug_assert!(self.inner.is_active(id)); + self.inner.close_recv_half(id)?; + } + } + } - unimplemented!() + Ok(Async::Ready(Some(frame))) } } @@ -87,12 +101,20 @@ impl ControlStreams for StreamRecvClose { self.inner.remote_open(id, sz) } - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_local_half(id) + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open_recv_half(id, sz) } - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_remote_half(id) + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open_send_half(id, sz) + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_send_half(id) + } + + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_recv_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { @@ -119,20 +141,20 @@ impl ControlStreams for StreamRecvClose { self.inner.remote_active_len() } - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.local_update_inital_window_size(old_sz, new_sz) + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.remote_update_inital_window_size(old_sz, new_sz) + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_send_window_size(old_sz, new_sz) } - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.local_flow_controller(id) + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.recv_flow_controller(id) } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.remote_flow_controller(id) + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.send_flow_controller(id) } fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index ae32e64..69c9972 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -39,10 +39,10 @@ impl StreamRecvOpen fn send_refuse(&mut self, id: StreamId) -> Poll<(), ConnectionError> { debug_assert!(self.pending_refuse.is_none()); - let f = frame::Reset::new(id, Reason::RefusedStream); + let f = frame::Reset::new(id, RefusedStream); match self.inner.start_send(f.into())? { AsyncSink::Ready => { - self.inner.reset_stream(id, Reason::RefusedStream); + self.inner.reset_stream(id, RefusedStream); Ok(Async::Ready(())) } AsyncSink::NotReady(_) => { @@ -51,6 +51,13 @@ impl StreamRecvOpen } } } + + fn send_pending_refuse(&mut self) -> Poll<(), ConnectionError> { + if let Some(id) = self.pending_refuse.take() { + try_ready!(self.send_refuse(id)); + } + Ok(Async::Ready(())) + } } /// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`. @@ -99,18 +106,39 @@ impl Stream for StreamRecvOpen // Since there's only one slot for pending refused streams, it must be cleared // before polling a frame from the transport. - if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refuse(id)); - } + try_ready!(self.send_pending_refuse()); + trace!("poll"); loop { - let frame = match try_ready!(self.inner.poll()) { - None => return Ok(Async::Ready(None)), - Some(f) => f, + let frame = match self.inner.poll()? { + Async::NotReady => { + panic!("poll: NotReady"); + } + Async::Ready(None) => { + panic!("poll: None"); + } + Async::Ready(Some(f)) => { + trace!("poll: id={:?} eos={}", f.stream_id(), f.is_end_stream()); + f + } }; + // let frame = match try_ready!(self.inner.poll()) { + // None => return Ok(Async::Ready(None)), + // Some(f) => f, + // }; let id = frame.stream_id(); + trace!("poll: id={:?}", id); if id.is_zero() { + if !frame.is_connection_frame() { + return Err(ProtocolError.into()) + } + // Nothing to do on connection frames. + return Ok(Async::Ready(Some(frame))); + } + + if let &Reset(_) = &frame { + // Resetting handled by StreamRecvClose. return Ok(Async::Ready(Some(frame))); } @@ -129,20 +157,29 @@ impl Stream for StreamRecvOpen if let Some(max) = self.max_concurrency { if (max as usize) < self.inner.remote_active_len() { - return Err(RefusedStream.into()); + let _ = self.send_refuse(id)?; + debug!("refusing stream that would exceed max_concurrency"); + + // There's no point in returning an error to the application. + continue; } } self.inner.remote_open(id, self.initial_window_size)?; } } else { - // Receiving on local stream MUST be on active stream. - if !self.inner.is_local_active(id) && !frame.is_reset() { + // If the frame is part of a local stream, it MUST already exist. + if self.inner.is_local_active(id) { + if let &Headers(..) = &frame { + self.inner.local_open_recv_half(id, self.initial_window_size)?; + } + } else { return Err(ProtocolError.into()); } } if let &Data(..) = &frame { + // Ensures we've already received headers for this stream. self.inner.check_can_recv_data(id)?; } @@ -161,32 +198,17 @@ impl Sink for StreamRecvOpen type SinkError = T::SinkError; fn start_send(&mut self, frame: T::SinkItem) -> StartSend { - use frame::Frame::*; - // The local must complete refusing the remote stream before sending any other // frames. - if let Some(id) = self.pending_refuse.take() { - if self.send_refuse(id)?.is_not_ready() { - return Ok(AsyncSink::NotReady(frame)); - } - } - - let id = frame.stream_id(); - if !id.is_zero() { - // enforced by StreamSend. - debug_assert!(self.inner.get_reset(id).is_none()); - - let eos = frame.is_end_stream(); + if self.send_pending_refuse()?.is_not_ready() { + return Ok(AsyncSink::NotReady(frame)); } self.inner.start_send(frame) } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refuse(id)); - } - + try_ready!(self.send_pending_refuse()); self.inner.poll_complete() } } @@ -332,12 +354,20 @@ impl ControlStreams for StreamRecvOpen { self.inner.remote_open(id, sz) } - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_local_half(id) + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open_recv_half(id, sz) } - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_remote_half(id) + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open_send_half(id, sz) + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_send_half(id) + } + + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_recv_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { @@ -364,20 +394,20 @@ impl ControlStreams for StreamRecvOpen { self.inner.remote_active_len() } - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.local_update_inital_window_size(old_sz, new_sz) + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.remote_update_inital_window_size(old_sz, new_sz) + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_send_window_size(old_sz, new_sz) } - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.local_flow_controller(id) + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.recv_flow_controller(id) } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.remote_flow_controller(id) + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.send_flow_controller(id) } fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index bb0c406..00acea6 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -43,12 +43,17 @@ impl Sink for StreamSendClose type SinkError = ConnectionError; fn start_send(&mut self, frame: Self::SinkItem) -> StartSend, ConnectionError> { - if frame.is_end_stream() { - let id = frame.stream_id(); - if let &Frame::Reset(ref rst) = &frame { - self.inner.reset_stream(id, rst.reason()); - } else { - self.inner.close_local_half(id)?; + let id = frame.stream_id(); + let eos = frame.is_end_stream(); + trace!("start_send: id={:?} eos={}", id, eos); + if !id.is_zero() { + if frame.is_end_stream() { + if let &Frame::Reset(ref rst) = &frame { + self.inner.reset_stream(id, rst.reason()); + } else { + debug_assert!(self.inner.is_active(id)); + self.inner.close_send_half(id)?; + } } } @@ -101,12 +106,20 @@ impl ControlStreams for StreamSendClose { self.inner.remote_open(id, sz) } - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_local_half(id) + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open_recv_half(id, sz) } - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_remote_half(id) + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open_send_half(id, sz) + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_send_half(id) + } + + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_recv_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { @@ -133,20 +146,20 @@ impl ControlStreams for StreamSendClose { self.inner.remote_active_len() } - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.local_update_inital_window_size(old_sz, new_sz) + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.remote_update_inital_window_size(old_sz, new_sz) + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_send_window_size(old_sz, new_sz) } - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.local_flow_controller(id) + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.recv_flow_controller(id) } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.remote_flow_controller(id) + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.send_flow_controller(id) } fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index 75eabe5..6b754ab 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -68,6 +68,7 @@ impl Stream for StreamSendOpen type Error = ConnectionError; fn poll(&mut self) -> Poll, ConnectionError> { + trace!("poll"); self.inner.poll() } } @@ -80,18 +81,19 @@ impl Sink for StreamSendOpen type SinkError = T::SinkError; fn start_send(&mut self, frame: T::SinkItem) -> StartSend { - use frame::Frame::*; - let id = frame.stream_id(); + trace!("start_send: id={:?}", id); if id.is_zero() { + if !frame.is_connection_frame() { + return Err(InvalidStreamId.into()) + } // Nothing to do on connection frames. return self.inner.start_send(frame); } // Reset the stream immediately and send the Reset on the underlying transport. - if let Reset(rst) = frame { - self.inner.reset_stream(id, rst.reason()); - return self.inner.start_send(Reset(rst)); + if let &Frame::Reset(..) = &frame { + return self.inner.start_send(frame); } // Ensure that the stream hasn't been closed otherwise. @@ -111,19 +113,26 @@ impl Sink for StreamSendOpen } } + trace!("creating new local stream"); self.inner.local_open(id, self.initial_window_size)?; } } else { - // If the frame was part of a remote stream, it MUST already exist. - if !self.inner.is_remote_active(id) && !frame.is_reset() { + // If the frame is part of a remote stream, it MUST already exist. + if self.inner.is_remote_active(id) { + if let &Frame::Headers(..) = &frame { + self.inner.remote_open_send_half(id, self.initial_window_size)?; + } + } else { return Err(InvalidStreamId.into()); } } - if let &Data(..) = &frame { + if let &Frame::Data(..) = &frame { + // Ensures we've already sent headers for this stream. self.inner.check_can_send_data(id)?; } + trace!("sending frame..."); return self.inner.start_send(frame); } @@ -164,12 +173,20 @@ impl ControlStreams for StreamSendOpen { self.inner.remote_open(id, sz) } - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_local_half(id) + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.local_open_recv_half(id, sz) } - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - self.inner.close_remote_half(id) + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + self.inner.remote_open_send_half(id, sz) + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_send_half(id) + } + + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + self.inner.close_recv_half(id) } fn reset_stream(&mut self, id: StreamId, cause: Reason) { @@ -196,20 +213,20 @@ impl ControlStreams for StreamSendOpen { self.inner.remote_active_len() } - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.local_update_inital_window_size(old_sz, new_sz) + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_recv_window_size(old_sz, new_sz) } - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { - self.inner.remote_update_inital_window_size(old_sz, new_sz) + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { + self.inner.update_inital_send_window_size(old_sz, new_sz) } - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.local_flow_controller(id) + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.recv_flow_controller(id) } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - self.inner.remote_flow_controller(id) + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + self.inner.send_flow_controller(id) } fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> { diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index 9508051..078e00a 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -1,7 +1,7 @@ use {ConnectionError, Peer, StreamId}; use error::Reason::{NoError, ProtocolError}; use proto::*; -use proto::state::{StreamState, PeerState}; +use proto::state::StreamState; use fnv::FnvHasher; use ordermap::OrderMap; @@ -14,19 +14,44 @@ pub trait ControlStreams { fn local_valid_id(id: StreamId) -> bool; fn remote_valid_id(id: StreamId) -> bool; + /// Indicates whether this local endpoint may open streams (with HEADERS). + /// + /// Implies that this endpoint is a client. fn local_can_open() -> bool; + + /// Indicates whether this remote endpoint may open streams (with HEADERS). + /// + /// Implies that this endpoint is a server. fn remote_can_open() -> bool { !Self::local_can_open() } + /// Create a new stream in the OPEN state from the local side (i.e. as a Client). + /// + /// Must only be called when local_can_open returns true. fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + /// Create a new stream in the OPEN state from the remote side (i.e. as a Server). + /// + /// Must only be called when remote_can_open returns true. fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + /// Prepare the receive side of a local stream to receive data from the remote. + /// + /// Typically called when a client receives a response header. + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + + /// Prepare the send side of a remote stream to receive data from the local endpoint. + /// + /// Typically called when a server sends a response header. + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError>; + // fn local_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; // fn remote_reserve(&mut self, id: StreamId) -> Result<(), ConnectionError>; - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + /// Close the local half of a stream so that the local side may not RECEIVE + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError>; fn reset_stream(&mut self, id: StreamId, cause: Reason); fn get_reset(&self, id: StreamId) -> Option; @@ -34,14 +59,22 @@ pub trait ControlStreams { fn is_local_active(&self, id: StreamId) -> bool; fn is_remote_active(&self, id: StreamId) -> bool; + fn is_active(&self, id: StreamId) -> bool { + if Self::local_valid_id(id) { + self.is_local_active(id) + } else { + self.is_remote_active(id) + } + } + fn local_active_len(&self) -> usize; fn remote_active_len(&self) -> usize; - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>; - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32); + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32); + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32); fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; fn check_can_recv_data(&mut self, id: StreamId) -> Result<(), ConnectionError>; @@ -160,35 +193,49 @@ impl ControlStreams for StreamStore { } /// Open a new stream from the local side (i.e. as a Client). - // fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - debug_assert!(Self::local_can_open()); assert!(Self::local_valid_id(id)); + debug_assert!(Self::local_can_open()); debug_assert!(!self.local_active.contains_key(&id)); - self.local_active.insert(id, StreamState::Open { - remote: PeerState::Data(FlowControlState::with_initial_size(sz)), - local: PeerState::Headers, - }); + self.local_active.insert(id, StreamState::new_open_sending(sz)); Ok(()) } /// Open a new stream from the remote side (i.e. as a Server). fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - debug_assert!(Self::remote_can_open()); assert!(Self::remote_valid_id(id)); + debug_assert!(Self::remote_can_open()); debug_assert!(!self.remote_active.contains_key(&id)); - self.remote_active.insert(id, StreamState::Open { - local: PeerState::Data(FlowControlState::with_initial_size(sz)), - remote: PeerState::Headers, - }); + self.remote_active.insert(id, StreamState::new_open_recving(sz)); Ok(()) } - fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + assert!(Self::local_valid_id(id)); + debug_assert!(self.local_active.contains_key(&id)); + + match self.local_active.get_mut(&id) { + Some(s) => s.open_recv_half(sz).map(|_| {}), + None => Err(ProtocolError.into()), + } + } + + fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { + assert!(Self::remote_valid_id(id)); + debug_assert!(Self::remote_can_open()); + debug_assert!(self.remote_active.contains_key(&id)); + + match self.remote_active.get_mut(&id) { + Some(s) => s.open_send_half(sz).map(|_| {}), + None => Err(ProtocolError.into()), + } + } + + fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { let fully_closed = self.get_active_mut(id) - .map(|s| s.close_local()) + .map(|s| s.close_send_half()) .unwrap_or_else(|| Err(ProtocolError.into()))?; if fully_closed { @@ -198,9 +245,9 @@ impl ControlStreams for StreamStore { Ok(()) } - fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { + fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { let fully_closed = self.get_active_mut(id) - .map(|s| s.close_remote()) + .map(|s| s.close_recv_half()) .unwrap_or_else(|| Err(ProtocolError.into()))?; if fully_closed { @@ -235,18 +282,18 @@ impl ControlStreams for StreamStore { self.remote_active.len() } - fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) { if new_sz < old_sz { let decr = old_sz - new_sz; for s in self.local_active.values_mut() { - if let Some(fc) = s.local_flow_controller() { + if let Some(fc) = s.recv_flow_controller() { fc.shrink_window(decr); } } for s in self.remote_active.values_mut() { - if let Some(fc) = s.local_flow_controller() { + if let Some(fc) = s.recv_flow_controller() { fc.shrink_window(decr); } } @@ -254,31 +301,31 @@ impl ControlStreams for StreamStore { let incr = new_sz - old_sz; for s in self.local_active.values_mut() { - if let Some(fc) = s.local_flow_controller() { + if let Some(fc) = s.recv_flow_controller() { fc.expand_window(incr); } } for s in self.remote_active.values_mut() { - if let Some(fc) = s.local_flow_controller() { + if let Some(fc) = s.recv_flow_controller() { fc.expand_window(incr); } } } } - fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) { + fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) { if new_sz < old_sz { let decr = old_sz - new_sz; for s in self.local_active.values_mut() { - if let Some(fc) = s.remote_flow_controller() { + if let Some(fc) = s.send_flow_controller() { fc.shrink_window(decr); } } for s in self.remote_active.values_mut() { - if let Some(fc) = s.remote_flow_controller() { + if let Some(fc) = s.send_flow_controller() { fc.shrink_window(decr); } } @@ -286,36 +333,36 @@ impl ControlStreams for StreamStore { let incr = new_sz - old_sz; for s in self.local_active.values_mut() { - if let Some(fc) = s.remote_flow_controller() { + if let Some(fc) = s.send_flow_controller() { fc.expand_window(incr); } } for s in self.remote_active.values_mut() { - if let Some(fc) = s.remote_flow_controller() { + if let Some(fc) = s.send_flow_controller() { fc.expand_window(incr); } } } } - fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { None } else if P::is_valid_local_stream_id(id) { - self.local_active.get_mut(&id).and_then(|s| s.local_flow_controller()) + self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) } else { - self.remote_active.get_mut(&id).and_then(|s| s.local_flow_controller()) + self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) } } - fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { + fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { if id.is_zero() { None } else if P::is_valid_local_stream_id(id) { - self.local_active.get_mut(&id).and_then(|s| s.remote_flow_controller()) + self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller()) } else { - self.remote_active.get_mut(&id).and_then(|s| s.remote_flow_controller()) + self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) } }