clarify stream state management -- pattern matching ftw

This commit is contained in:
Oliver Gould
2017-07-23 15:51:39 +00:00
parent bf724bd53e
commit 4f723fffce
6 changed files with 107 additions and 93 deletions

View File

@@ -41,6 +41,7 @@ impl<T> Stream for StreamRecvClose<T>
let id = frame.stream_id();
if !id.is_zero() {
if frame.is_end_stream() {
trace!("poll: id={:?} eos", id);
if let &Frame::Reset(ref rst) = &frame {
self.inner.reset_stream(id, rst.reason());
} else {

View File

@@ -93,6 +93,16 @@ impl<T> ApplySettings for StreamRecvOpen<T>
}
}
impl<T: ControlStreams> StreamRecvOpen<T> {
fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> {
// Ensure that the stream hasn't been closed otherwise.
match self.inner.get_reset(id) {
Some(reason) => Err(reason.into()),
None => Ok(()),
}
}
}
impl<T, U> Stream for StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
@@ -127,51 +137,46 @@ impl<T, U> Stream for StreamRecvOpen<T>
return Ok(Async::Ready(Some(frame)));
}
if let &Reset(_) = &frame {
// Resetting handled by StreamRecvClose.
return Ok(Async::Ready(Some(frame)));
}
match &frame {
&Frame::Reset(..) => {}
if self.inner.get_reset(id).is_some() {
// For now, just ignore frames on reset streams.
debug!("ignoring received frame on reset stream");
// TODO tell the remote to knock it off?
continue;
}
if T::remote_valid_id(id) {
if !self.inner.is_remote_active(id) {
if !T::remote_can_open() {
return Err(ProtocolError.into());
}
if let Some(max) = self.max_concurrency {
if (max as usize) < self.inner.remote_active_len() {
let _ = self.send_refuse(id)?;
debug!("refusing stream that would exceed max_concurrency");
// There's no point in returning an error to the application.
continue;
&Frame::Headers(..) => {
self.check_not_reset(id)?;
if T::remote_valid_id(id) {
if self.inner.is_remote_active(id) {
// Can't send a a HEADERS frame on a remote stream that's
// active, because we've already received headers. This will
// have to change to support PUSH_PROMISE.
return Err(ProtocolError.into());
}
}
self.inner.remote_open(id, self.initial_window_size)?;
}
} else {
// If the frame is part of a local stream, it MUST already exist.
if self.inner.is_local_active(id) {
if let &Headers(..) = &frame {
if !T::remote_can_open() {
return Err(ProtocolError.into());
}
if let Some(max) = self.max_concurrency {
if (max as usize) < self.inner.remote_active_len() {
debug!("refusing stream that would exceed max_concurrency={}", max);
self.send_refuse(id)?;
// There's no point in returning an error to the application.
continue;
}
}
self.inner.remote_open(id, self.initial_window_size)?;
} else {
// On remote streams,
self.inner.local_open_recv_half(id, self.initial_window_size)?;
}
} else {
return Err(ProtocolError.into());
}
}
if let &Data(..) = &frame {
// Ensures we've already received headers for this stream.
if !self.inner.can_recv_data(id) {
return Err(ProtocolError.into());
// All other stream frames are sent only when
_ => {
self.check_not_reset(id)?;
if !self.inner.can_recv_data(id) {
return Err(ProtocolError.into());
}
}
}
@@ -182,6 +187,7 @@ impl<T, U> Stream for StreamRecvOpen<T>
}
}
/// Ensures that a pending reset is
impl<T, U> Sink for StreamRecvOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,

View File

@@ -47,7 +47,7 @@ impl<T, U> Sink for StreamSendClose<T>
let eos = frame.is_end_stream();
trace!("start_send: id={:?} eos={}", id, eos);
if !id.is_zero() {
if frame.is_end_stream() {
if eos {
if let &Frame::Reset(ref rst) = &frame {
self.inner.reset_stream(id, rst.reason());
} else {

View File

@@ -1,8 +1,9 @@
use ConnectionError;
use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected};
use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, UnexpectedFrameType};
use frame::{Frame, SettingSet};
use proto::*;
///
#[derive(Debug)]
pub struct StreamSendOpen<T> {
inner: T,
@@ -73,6 +74,16 @@ impl<T> Stream for StreamSendOpen<T>
}
}
impl<T: ControlStreams> StreamSendOpen<T> {
fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> {
// Ensure that the stream hasn't been closed otherwise.
match self.inner.get_reset(id) {
Some(reason) => Err(StreamReset(reason).into()),
None => Ok(()),
}
}
}
impl<T, U> Sink for StreamSendOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
@@ -83,67 +94,62 @@ impl<T, U> Sink for StreamSendOpen<T>
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
let id = frame.stream_id();
trace!("start_send: id={:?}", id);
// Forward connection frames immediately.
if id.is_zero() {
if !frame.is_connection_frame() {
return Err(InvalidStreamId.into())
return Err(InvalidStreamId.into());
}
// Nothing to do on connection frames.
return self.inner.start_send(frame);
}
// Reset the stream immediately and send the Reset on the underlying transport.
if let &Frame::Reset(..) = &frame {
return self.inner.start_send(frame);
}
match &frame {
&Frame::Reset(..) => {}
// Ensure that the stream hasn't been closed otherwise.
if let Some(reason) = self.inner.get_reset(id) {
return Err(StreamReset(reason).into())
}
&Frame::Headers(..) => {
self.check_not_reset(id)?;
if T::local_valid_id(id) {
if self.inner.is_local_active(id) {
// Can't send a a HEADERS frame on a local stream that's active,
// because we've already sent headers. This will have to change
// to support PUSH_PROMISE.
return Err(UnexpectedFrameType.into());
}
if T::local_valid_id(id) {
if self.inner.is_local_active(id) {
if !T::local_can_open() {
// A server tried to start a stream with a HEADERS frame.
return Err(UnexpectedFrameType.into());
}
if let Some(max) = self.max_concurrency {
// Don't allow this stream to overflow the remote's max stream
// concurrency.
if (max as usize) < self.inner.local_active_len() {
return Err(Rejected.into());
}
}
self.inner.local_open(id, self.initial_window_size)?;
} else {
// On remote streams,
if self.inner.remote_open_send_half(id, self.initial_window_size).is_err() {
return Err(InvalidStreamId.into());
}
}
}
// This only handles other stream frames (data, window update, ...). Ensure
// the stream is open (i.e. has already sent headers).
_ => {
self.check_not_reset(id)?;
if !self.inner.can_send_data(id) {
return Err(InactiveStreamId.into());
}
} else {
if !T::local_can_open() {
return Err(InvalidStreamId.into());
}
if let Some(max) = self.max_concurrency {
if (max as usize) < self.inner.local_active_len() {
return Err(Rejected.into());
}
}
if let &Frame::Headers(..) = &frame {
self.inner.local_open(id, self.initial_window_size)?;
} else {
return Err(InactiveStreamId.into());
}
}
} else {
// If the frame is part of a remote stream, it MUST already exist.
if self.inner.is_remote_active(id) {
if let &Frame::Headers(..) = &frame {
self.inner.remote_open_send_half(id, self.initial_window_size)?;
}
} else {
return Err(InvalidStreamId.into());
}
}
if let &Frame::Data(..) = &frame {
// Ensures we've already sent headers for this stream.
if !self.inner.can_send_data(id) {
return Err(InactiveStreamId.into());
}
}
trace!("sending frame...");
return self.inner.start_send(frame);
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {

View File

@@ -213,8 +213,9 @@ impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
}
fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
assert!(Self::local_valid_id(id));
debug_assert!(self.local_active.contains_key(&id));
if !Self::local_valid_id(id) {
return Err(ProtocolError.into());
}
match self.local_active.get_mut(&id) {
Some(s) => s.open_recv_half(sz).map(|_| {}),
@@ -223,9 +224,9 @@ impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
}
fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
assert!(Self::remote_valid_id(id));
debug_assert!(Self::remote_can_open());
debug_assert!(self.remote_active.contains_key(&id));
if !Self::remote_valid_id(id) {
return Err(ProtocolError.into());
}
match self.remote_active.get_mut(&id) {
Some(s) => s.open_send_half(sz).map(|_| {}),

View File

@@ -241,7 +241,7 @@ mod client_request {
request.uri = "https://http2.akamai.com/".parse().unwrap();
let err = h2.send_request(0.into(), request, true).wait().unwrap_err();
assert_user_err!(err, InvalidStreamId);
assert_user_err!(err, UnexpectedFrameType);
}
#[test]
@@ -350,7 +350,7 @@ mod client_request {
request.uri = "https://http2.akamai.com/".parse().unwrap();
let err = h2.send_request(1.into(), request, true).wait().unwrap_err();
assert_user_err!(err, InactiveStreamId);
assert_user_err!(err, UnexpectedFrameType);
}
#[test]