wip: stream state management seems good, but tests fail

This commit is contained in:
Oliver Gould
2017-07-22 17:30:40 +00:00
parent d7042097c4
commit ab4f85ea2f
9 changed files with 445 additions and 292 deletions

View File

@@ -39,10 +39,10 @@ impl<T, U> StreamRecvOpen<T>
fn send_refuse(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.pending_refuse.is_none());
let f = frame::Reset::new(id, Reason::RefusedStream);
let f = frame::Reset::new(id, RefusedStream);
match self.inner.start_send(f.into())? {
AsyncSink::Ready => {
self.inner.reset_stream(id, Reason::RefusedStream);
self.inner.reset_stream(id, RefusedStream);
Ok(Async::Ready(()))
}
AsyncSink::NotReady(_) => {
@@ -51,6 +51,13 @@ impl<T, U> StreamRecvOpen<T>
}
}
}
fn send_pending_refuse(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refuse(id));
}
Ok(Async::Ready(()))
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
@@ -99,18 +106,39 @@ impl<T, U> Stream for StreamRecvOpen<T>
// Since there's only one slot for pending refused streams, it must be cleared
// before polling a frame from the transport.
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refuse(id));
}
try_ready!(self.send_pending_refuse());
trace!("poll");
loop {
let frame = match try_ready!(self.inner.poll()) {
None => return Ok(Async::Ready(None)),
Some(f) => f,
let frame = match self.inner.poll()? {
Async::NotReady => {
panic!("poll: NotReady");
}
Async::Ready(None) => {
panic!("poll: None");
}
Async::Ready(Some(f)) => {
trace!("poll: id={:?} eos={}", f.stream_id(), f.is_end_stream());
f
}
};
// let frame = match try_ready!(self.inner.poll()) {
// None => return Ok(Async::Ready(None)),
// Some(f) => f,
// };
let id = frame.stream_id();
trace!("poll: id={:?}", id);
if id.is_zero() {
if !frame.is_connection_frame() {
return Err(ProtocolError.into())
}
// Nothing to do on connection frames.
return Ok(Async::Ready(Some(frame)));
}
if let &Reset(_) = &frame {
// Resetting handled by StreamRecvClose.
return Ok(Async::Ready(Some(frame)));
}
@@ -129,20 +157,29 @@ impl<T, U> Stream for StreamRecvOpen<T>
if let Some(max) = self.max_concurrency {
if (max as usize) < self.inner.remote_active_len() {
return Err(RefusedStream.into());
let _ = self.send_refuse(id)?;
debug!("refusing stream that would exceed max_concurrency");
// There's no point in returning an error to the application.
continue;
}
}
self.inner.remote_open(id, self.initial_window_size)?;
}
} else {
// Receiving on local stream MUST be on active stream.
if !self.inner.is_local_active(id) && !frame.is_reset() {
// If the frame is part of a local stream, it MUST already exist.
if self.inner.is_local_active(id) {
if let &Headers(..) = &frame {
self.inner.local_open_recv_half(id, self.initial_window_size)?;
}
} else {
return Err(ProtocolError.into());
}
}
if let &Data(..) = &frame {
// Ensures we've already received headers for this stream.
self.inner.check_can_recv_data(id)?;
}
@@ -161,32 +198,17 @@ impl<T, U> Sink for StreamRecvOpen<T>
type SinkError = T::SinkError;
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
// The local must complete refusing the remote stream before sending any other
// frames.
if let Some(id) = self.pending_refuse.take() {
if self.send_refuse(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
}
let id = frame.stream_id();
if !id.is_zero() {
// enforced by StreamSend.
debug_assert!(self.inner.get_reset(id).is_none());
let eos = frame.is_end_stream();
if self.send_pending_refuse()?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refuse(id));
}
try_ready!(self.send_pending_refuse());
self.inner.poll_complete()
}
}
@@ -332,12 +354,20 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
self.inner.remote_open(id, sz)
}
fn close_local_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_local_half(id)
fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
self.inner.local_open_recv_half(id, sz)
}
fn close_remote_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_remote_half(id)
fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
self.inner.remote_open_send_half(id, sz)
}
fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_send_half(id)
}
fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
self.inner.close_recv_half(id)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
@@ -364,20 +394,20 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
self.inner.remote_active_len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.local_update_inital_window_size(old_sz, new_sz)
fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.update_inital_recv_window_size(old_sz, new_sz)
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.remote_update_inital_window_size(old_sz, new_sz)
fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.update_inital_send_window_size(old_sz, new_sz)
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.local_flow_controller(id)
fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.recv_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id)
fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.send_flow_controller(id)
}
fn check_can_send_data(&mut self, id: StreamId) -> Result<(), ConnectionError> {