update comments
This commit is contained in:
@@ -3,7 +3,7 @@ use error::Reason::{ProtocolError, RefusedStream};
|
||||
use frame::{Frame, StreamId};
|
||||
use proto::*;
|
||||
|
||||
/// Tracks a connection's streams.
|
||||
/// Ensures that frames are received on open streams in the appropriate state.
|
||||
#[derive(Debug)]
|
||||
pub struct StreamRecvOpen<T> {
|
||||
inner: T,
|
||||
@@ -60,25 +60,7 @@ impl<T, U> StreamRecvOpen<T>
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
|
||||
///
|
||||
/// > Indicates the maximum number of concurrent streams that the senderg will allow. This
|
||||
/// > limit is directional: it applies to the number of streams that the sender permits
|
||||
/// > the receiver to create. Initially, there is no limit to this value. It is
|
||||
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
|
||||
/// > parallelism.
|
||||
/// >
|
||||
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
|
||||
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
|
||||
/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only
|
||||
/// > set a zero value for short durations; if a server does not wish to accept requests,
|
||||
/// > closing the connection is more appropriate.
|
||||
///
|
||||
/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a
|
||||
/// > value that is below the current number of open streams can either close streams that
|
||||
/// > exceed the new value or allow streams to complete.
|
||||
///
|
||||
/// This module does NOT close streams when the setting changes.
|
||||
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the local peer.
|
||||
impl<T> ApplySettings for StreamRecvOpen<T>
|
||||
where T: ApplySettings
|
||||
{
|
||||
@@ -93,6 +75,7 @@ impl<T> ApplySettings for StreamRecvOpen<T>
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper.
|
||||
impl<T: ControlStreams> StreamRecvOpen<T> {
|
||||
fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> {
|
||||
// Ensure that the stream hasn't been closed otherwise.
|
||||
@@ -103,6 +86,7 @@ impl<T: ControlStreams> StreamRecvOpen<T> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensures that frames are received on open streams in the appropriate state.
|
||||
impl<T, U> Stream for StreamRecvOpen<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
@@ -112,8 +96,6 @@ impl<T, U> Stream for StreamRecvOpen<T>
|
||||
type Error = T::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
|
||||
use frame::Frame::*;
|
||||
|
||||
// Since there's only one slot for pending refused streams, it must be cleared
|
||||
// before polling a frame from the transport.
|
||||
try_ready!(self.send_pending_refuse());
|
||||
@@ -142,6 +124,7 @@ impl<T, U> Stream for StreamRecvOpen<T>
|
||||
|
||||
&Frame::Headers(..) => {
|
||||
self.check_not_reset(id)?;
|
||||
|
||||
if T::remote_valid_id(id) {
|
||||
if self.inner.is_remote_active(id) {
|
||||
// Can't send a a HEADERS frame on a remote stream that's
|
||||
@@ -174,7 +157,7 @@ impl<T, U> Stream for StreamRecvOpen<T>
|
||||
// All other stream frames are sent only when
|
||||
_ => {
|
||||
self.check_not_reset(id)?;
|
||||
if !self.inner.can_recv_data(id) {
|
||||
if !self.inner.is_recv_open(id) {
|
||||
return Err(ProtocolError.into());
|
||||
}
|
||||
}
|
||||
@@ -187,7 +170,7 @@ impl<T, U> Stream for StreamRecvOpen<T>
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensures that a pending reset is
|
||||
/// Sends pending resets before operating on the underlying transport.
|
||||
impl<T, U> Sink for StreamRecvOpen<T>
|
||||
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
T: ControlStreams,
|
||||
@@ -211,7 +194,7 @@ impl<T, U> Sink for StreamRecvOpen<T>
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Sends pending resets before checking the underlying transport's readiness.
|
||||
impl<T, U> ReadySink for StreamRecvOpen<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
|
||||
@@ -227,110 +210,7 @@ impl<T, U> ReadySink for StreamRecvOpen<T>
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Some(Headers(v)) => {
|
||||
// let id = v.stream_id();
|
||||
// let eos = v.is_end_stream();
|
||||
|
||||
// if self.get_reset(id).is_some() {
|
||||
// // TODO send the remote errors when it sends us frames on reset
|
||||
// // streams.
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// if let Some(mut s) = self.get_active_mut(id) {
|
||||
// let created = s.recv_headers(eos, self.initial_window_size)?;
|
||||
// assert!(!created);
|
||||
// return Ok(Async::Ready(Some(Headers(v))));
|
||||
// }
|
||||
|
||||
// // Ensure that receiving this frame will not violate the local max
|
||||
// // stream concurrency setting. Ensure that the stream is refused
|
||||
// // before processing additional frames.
|
||||
// if let Some(max) = self.max_concurrency {
|
||||
// let max = max as usize;
|
||||
// if !self.local.is_active(id) && self.local.active_count() >= max - 1 {
|
||||
// // This frame would violate our local max concurrency, so reject
|
||||
// // the stream.
|
||||
// try_ready!(self.send_refusal(id));
|
||||
|
||||
// // Try to process another frame (hopefully for an active
|
||||
// // stream).
|
||||
// continue;
|
||||
// }
|
||||
// }
|
||||
|
||||
// let is_closed = {
|
||||
// let stream = self.active_streams.entry(id)
|
||||
// .or_insert_with(|| StreamState::default());
|
||||
|
||||
// let initialized =
|
||||
// stream.recv_headers(eos, self.initial_window_size)?;
|
||||
|
||||
// if initialized {
|
||||
// if !P::is_valid_remote_stream_id(id) {
|
||||
// return Err(Reason::ProtocolError.into());
|
||||
// }
|
||||
// }
|
||||
|
||||
// stream.is_closed()
|
||||
// };
|
||||
|
||||
// if is_closed {
|
||||
// self.active_streams.remove(id);
|
||||
// self.reset_streams.insert(id, Reason::NoError);
|
||||
// }
|
||||
|
||||
// return Ok(Async::Ready(Some(Headers(v))));
|
||||
// }
|
||||
|
||||
// Some(Data(v)) => {
|
||||
// let id = v.stream_id();
|
||||
|
||||
// if self.get_reset(id).is_some() {
|
||||
// // TODO send the remote errors when it sends us frames on reset
|
||||
// // streams.
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// let is_closed = {
|
||||
// let stream = match self.active_streams.get_mut(id) {
|
||||
// None => return Err(Reason::ProtocolError.into()),
|
||||
// Some(s) => s,
|
||||
// };
|
||||
// stream.recv_data(v.is_end_stream())?;
|
||||
// stream.is_closed()
|
||||
// };
|
||||
|
||||
// if is_closed {
|
||||
// self.reset(id, Reason::NoError);
|
||||
// }
|
||||
|
||||
// return Ok(Async::Ready(Some(Data(v))));
|
||||
// }
|
||||
|
||||
// Some(Reset(v)) => {
|
||||
// // Set or update the reset reason.
|
||||
// self.reset(v.stream_id(), v.reason());
|
||||
// return Ok(Async::Ready(Some(Reset(v))));
|
||||
// }
|
||||
|
||||
// Some(f) => {
|
||||
// let id = f.stream_id();
|
||||
|
||||
// if self.get_reset(id).is_some() {
|
||||
// // TODO send the remote errors when it sends us frames on reset
|
||||
// // streams.
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// return Ok(Async::Ready(Some(f)));
|
||||
// }
|
||||
|
||||
// None => {
|
||||
// return Ok(Async::Ready(None));
|
||||
// }
|
||||
|
||||
/// Proxy.
|
||||
impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
|
||||
fn local_valid_id(id: StreamId) -> bool {
|
||||
T::local_valid_id(id)
|
||||
@@ -392,11 +272,11 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
|
||||
self.inner.remote_active_len()
|
||||
}
|
||||
|
||||
fn update_inital_recv_window_size(&mut self, old_sz: u32, new_sz: u32) {
|
||||
fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
|
||||
self.inner.update_inital_recv_window_size(old_sz, new_sz)
|
||||
}
|
||||
|
||||
fn update_inital_send_window_size(&mut self, old_sz: u32, new_sz: u32) {
|
||||
fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
|
||||
self.inner.update_inital_send_window_size(old_sz, new_sz)
|
||||
}
|
||||
|
||||
@@ -408,15 +288,16 @@ impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
|
||||
self.inner.send_flow_controller(id)
|
||||
}
|
||||
|
||||
fn can_send_data(&mut self, id: StreamId) -> bool {
|
||||
self.inner.can_send_data(id)
|
||||
fn is_send_open(&mut self, id: StreamId) -> bool {
|
||||
self.inner.is_send_open(id)
|
||||
}
|
||||
|
||||
fn can_recv_data(&mut self, id: StreamId) -> bool {
|
||||
self.inner.can_recv_data(id)
|
||||
fn is_recv_open(&mut self, id: StreamId) -> bool {
|
||||
self.inner.is_recv_open(id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Proxy.
|
||||
impl<T: ControlPing> ControlPing for StreamRecvOpen<T> {
|
||||
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
|
||||
self.inner.start_ping(body)
|
||||
|
||||
Reference in New Issue
Block a user