diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs index 059c3d4..e46a912 100644 --- a/src/proto/stream_recv_close.rs +++ b/src/proto/stream_recv_close.rs @@ -41,6 +41,7 @@ impl Stream for StreamRecvClose let id = frame.stream_id(); if !id.is_zero() { if frame.is_end_stream() { + trace!("poll: id={:?} eos", id); if let &Frame::Reset(ref rst) = &frame { self.inner.reset_stream(id, rst.reason()); } else { diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index 817d710..f9b9eca 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -93,6 +93,16 @@ impl ApplySettings for StreamRecvOpen } } +impl StreamRecvOpen { + fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { + // Ensure that the stream hasn't been closed otherwise. + match self.inner.get_reset(id) { + Some(reason) => Err(reason.into()), + None => Ok(()), + } + } +} + impl Stream for StreamRecvOpen where T: Stream, T: Sink, SinkError = ConnectionError>, @@ -127,51 +137,46 @@ impl Stream for StreamRecvOpen return Ok(Async::Ready(Some(frame))); } - if let &Reset(_) = &frame { - // Resetting handled by StreamRecvClose. - return Ok(Async::Ready(Some(frame))); - } + match &frame { + &Frame::Reset(..) => {} - if self.inner.get_reset(id).is_some() { - // For now, just ignore frames on reset streams. - debug!("ignoring received frame on reset stream"); - // TODO tell the remote to knock it off? - continue; - } - - if T::remote_valid_id(id) { - if !self.inner.is_remote_active(id) { - if !T::remote_can_open() { - return Err(ProtocolError.into()); - } - - if let Some(max) = self.max_concurrency { - if (max as usize) < self.inner.remote_active_len() { - let _ = self.send_refuse(id)?; - debug!("refusing stream that would exceed max_concurrency"); - - // There's no point in returning an error to the application. - continue; + &Frame::Headers(..) => { + self.check_not_reset(id)?; + if T::remote_valid_id(id) { + if self.inner.is_remote_active(id) { + // Can't send a a HEADERS frame on a remote stream that's + // active, because we've already received headers. This will + // have to change to support PUSH_PROMISE. + return Err(ProtocolError.into()); } - } - self.inner.remote_open(id, self.initial_window_size)?; - } - } else { - // If the frame is part of a local stream, it MUST already exist. - if self.inner.is_local_active(id) { - if let &Headers(..) = &frame { + if !T::remote_can_open() { + return Err(ProtocolError.into()); + } + + if let Some(max) = self.max_concurrency { + if (max as usize) < self.inner.remote_active_len() { + debug!("refusing stream that would exceed max_concurrency={}", max); + self.send_refuse(id)?; + + // There's no point in returning an error to the application. + continue; + } + } + + self.inner.remote_open(id, self.initial_window_size)?; + } else { + // On remote streams, self.inner.local_open_recv_half(id, self.initial_window_size)?; } - } else { - return Err(ProtocolError.into()); } - } - if let &Data(..) = &frame { - // Ensures we've already received headers for this stream. - if !self.inner.can_recv_data(id) { - return Err(ProtocolError.into()); + // All other stream frames are sent only when + _ => { + self.check_not_reset(id)?; + if !self.inner.can_recv_data(id) { + return Err(ProtocolError.into()); + } } } @@ -182,6 +187,7 @@ impl Stream for StreamRecvOpen } } +/// Ensures that a pending reset is impl Sink for StreamRecvOpen where T: Sink, SinkError = ConnectionError>, T: ControlStreams, diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs index 53cf25c..195cb98 100644 --- a/src/proto/stream_send_close.rs +++ b/src/proto/stream_send_close.rs @@ -47,7 +47,7 @@ impl Sink for StreamSendClose let eos = frame.is_end_stream(); trace!("start_send: id={:?} eos={}", id, eos); if !id.is_zero() { - if frame.is_end_stream() { + if eos { if let &Frame::Reset(ref rst) = &frame { self.inner.reset_stream(id, rst.reason()); } else { diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index 6c814e2..c39ed73 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -1,8 +1,9 @@ use ConnectionError; -use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected}; +use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, UnexpectedFrameType}; use frame::{Frame, SettingSet}; use proto::*; +/// #[derive(Debug)] pub struct StreamSendOpen { inner: T, @@ -73,6 +74,16 @@ impl Stream for StreamSendOpen } } +impl StreamSendOpen { + fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { + // Ensure that the stream hasn't been closed otherwise. + match self.inner.get_reset(id) { + Some(reason) => Err(StreamReset(reason).into()), + None => Ok(()), + } + } +} + impl Sink for StreamSendOpen where T: Sink, SinkError = ConnectionError>, T: ControlStreams, @@ -83,67 +94,62 @@ impl Sink for StreamSendOpen fn start_send(&mut self, frame: T::SinkItem) -> StartSend { let id = frame.stream_id(); trace!("start_send: id={:?}", id); + + // Forward connection frames immediately. if id.is_zero() { if !frame.is_connection_frame() { - return Err(InvalidStreamId.into()) + return Err(InvalidStreamId.into()); } - // Nothing to do on connection frames. return self.inner.start_send(frame); } - // Reset the stream immediately and send the Reset on the underlying transport. - if let &Frame::Reset(..) = &frame { - return self.inner.start_send(frame); - } + match &frame { + &Frame::Reset(..) => {} - // Ensure that the stream hasn't been closed otherwise. - if let Some(reason) = self.inner.get_reset(id) { - return Err(StreamReset(reason).into()) - } + &Frame::Headers(..) => { + self.check_not_reset(id)?; + if T::local_valid_id(id) { + if self.inner.is_local_active(id) { + // Can't send a a HEADERS frame on a local stream that's active, + // because we've already sent headers. This will have to change + // to support PUSH_PROMISE. + return Err(UnexpectedFrameType.into()); + } - if T::local_valid_id(id) { - if self.inner.is_local_active(id) { + if !T::local_can_open() { + // A server tried to start a stream with a HEADERS frame. + return Err(UnexpectedFrameType.into()); + } + + if let Some(max) = self.max_concurrency { + // Don't allow this stream to overflow the remote's max stream + // concurrency. + if (max as usize) < self.inner.local_active_len() { + return Err(Rejected.into()); + } + } + + self.inner.local_open(id, self.initial_window_size)?; + } else { + // On remote streams, + if self.inner.remote_open_send_half(id, self.initial_window_size).is_err() { + return Err(InvalidStreamId.into()); + } + } + } + + // This only handles other stream frames (data, window update, ...). Ensure + // the stream is open (i.e. has already sent headers). + _ => { + self.check_not_reset(id)?; if !self.inner.can_send_data(id) { return Err(InactiveStreamId.into()); } - } else { - if !T::local_can_open() { - return Err(InvalidStreamId.into()); - } - - if let Some(max) = self.max_concurrency { - if (max as usize) < self.inner.local_active_len() { - return Err(Rejected.into()); - } - } - - if let &Frame::Headers(..) = &frame { - self.inner.local_open(id, self.initial_window_size)?; - } else { - return Err(InactiveStreamId.into()); - } - } - } else { - // If the frame is part of a remote stream, it MUST already exist. - if self.inner.is_remote_active(id) { - if let &Frame::Headers(..) = &frame { - self.inner.remote_open_send_half(id, self.initial_window_size)?; - } - } else { - return Err(InvalidStreamId.into()); } } - if let &Frame::Data(..) = &frame { - // Ensures we've already sent headers for this stream. - if !self.inner.can_send_data(id) { - return Err(InactiveStreamId.into()); - } - } - - trace!("sending frame..."); - return self.inner.start_send(frame); + self.inner.start_send(frame) } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { diff --git a/src/proto/stream_store.rs b/src/proto/stream_store.rs index 23deb62..562421d 100644 --- a/src/proto/stream_store.rs +++ b/src/proto/stream_store.rs @@ -213,8 +213,9 @@ impl ControlStreams for StreamStore { } fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - assert!(Self::local_valid_id(id)); - debug_assert!(self.local_active.contains_key(&id)); + if !Self::local_valid_id(id) { + return Err(ProtocolError.into()); + } match self.local_active.get_mut(&id) { Some(s) => s.open_recv_half(sz).map(|_| {}), @@ -223,9 +224,9 @@ impl ControlStreams for StreamStore { } fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - assert!(Self::remote_valid_id(id)); - debug_assert!(Self::remote_can_open()); - debug_assert!(self.remote_active.contains_key(&id)); + if !Self::remote_valid_id(id) { + return Err(ProtocolError.into()); + } match self.remote_active.get_mut(&id) { Some(s) => s.open_send_half(sz).map(|_| {}), diff --git a/tests/client_request.rs b/tests/client_request.rs index e9e97aa..bbfe716 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -241,7 +241,7 @@ mod client_request { request.uri = "https://http2.akamai.com/".parse().unwrap(); let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); + assert_user_err!(err, UnexpectedFrameType); } #[test] @@ -350,7 +350,7 @@ mod client_request { request.uri = "https://http2.akamai.com/".parse().unwrap(); let err = h2.send_request(1.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InactiveStreamId); + assert_user_err!(err, UnexpectedFrameType); } #[test]