wip: refactor, compiles

This commit is contained in:
Oliver Gould
2017-07-20 14:51:27 +00:00
parent 0d84c98c89
commit a62d3dda54
17 changed files with 1395 additions and 1052 deletions

View File

@@ -80,6 +80,21 @@ impl<T> Frame<T> {
&Settings(_) => StreamId::zero(),
}
}
pub fn is_end_stream(&self) -> bool {
use self::Frame::*;
match self {
&Headers(ref v) => v.is_end_stream(),
&Data(ref v) => v.is_end_stream(),
&Reset(ref v) => true,
&PushPromise(_) |
&WindowUpdate(_) |
&Ping(_) |
&Settings(_) => false,
}
}
}
/// Errors that can occur during parsing an HTTP/2 frame.

View File

@@ -16,6 +16,15 @@ impl Reset {
error_code: error.into(),
}
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn reason(&self) -> Reason {
self.error_code.into()
}
pub fn load(head: Head, payload: &[u8]) -> Result<Reset, Error> {
if payload.len() != 4 {
return Err(Error::InvalidPayloadLength);
@@ -35,14 +44,6 @@ impl Reset {
head.encode(4, dst);
dst.put_u32::<BigEndian>(self.error_code);
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn reason(&self) -> Reason {
self.error_code.into()
}
}
impl<B> From<Reset> for frame::Frame<B> {

View File

@@ -85,7 +85,6 @@ pub trait Peer {
/// remote node.
fn is_valid_remote_stream_id(id: StreamId) -> bool;
fn can_create_local_stream() -> bool;
fn can_create_remote_stream() -> bool {
!Self::can_create_local_stream()

View File

@@ -3,6 +3,7 @@ use client::Client;
use error;
use frame::{self, SettingSet, StreamId};
use proto::*;
use proto::ping_pong::PingPayload;
use server::Server;
use bytes::{Bytes, IntoBuf};
@@ -32,37 +33,23 @@ pub fn new<T, P, B>(transport: Transport<T, P, B::Buf>)
}
}
// impl<T, P, B> ControlSettings for Connection<T, P, B>
// where T: AsyncRead + AsyncWrite,
// B: IntoBuf,
// {
// fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> {
// self.inner.update_local_settings(local)
// }
impl<T, P, B> ControlSettings for Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.update_local_settings(local)
}
// fn local_settings(&self) -> &SettingSet {
// self.inner.local_settings()
// }
fn local_settings(&self) -> &SettingSet {
self.inner.local_settings()
}
// fn remote_settings(&self) -> &SettingSet {
// self.inner.remote_settings()
// }
// }
fn remote_settings(&self) -> &SettingSet {
self.inner.remote_settings()
}
}
impl<T, P, B> ControlPing for Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
@@ -146,21 +133,7 @@ impl<T, P, B> Stream for Connection<T, P, B>
}
loop {
let frame = match try!(self.inner.poll()) {
Async::Ready(f) => f,
// XXX is this necessary?
Async::NotReady => {
// Receiving new frames may depend on ensuring that the write buffer
// is clear (e.g. if window updates need to be sent), so `poll_complete`
// is called here.
try_ready!(self.poll_complete());
// If the write buffer is cleared, attempt to poll the underlying
// stream once more because it, may have been made ready.
try_ready!(self.inner.poll())
}
};
let frame = try_ready!(self.inner.poll());
trace!("poll; frame={:?}", frame);
let frame = match frame {
@@ -214,34 +187,20 @@ impl<T, P, B> Sink for Connection<T, P, B>
return Ok(AsyncSink::NotReady(item));
}
match item {
let frame = match item {
Frame::Headers { id, headers, end_of_stream } => {
// This is a one-way conversion. By checking `poll_ready` first (above),
// it's already been determined that the inner `Sink` can accept the item.
// If the item is rejected, then there is a bug.
let frame = P::convert_send_message(id, headers, end_of_stream);
let res = self.inner.start_send(frame::Frame::Headers(frame))?;
assert!(res.is_ready());
Ok(AsyncSink::Ready)
let f = P::convert_send_message(id, headers, end_of_stream);
frame::Frame::Headers(f)
}
Frame::Data { id, data, end_of_stream } => {
if self.inner.stream_is_reset(id).is_some() {
return Err(error::User::StreamReset.into());
}
let frame = frame::Data::from_buf(id, data.into_buf(), end_of_stream);
let res = try!(self.inner.start_send(frame.into()));
assert!(res.is_ready());
Ok(AsyncSink::Ready)
frame::Data::from_buf(id, data.into_buf(), end_of_stream).into()
}
Frame::Reset { id, error } => {
let f = frame::Reset::new(id, error);
let res = self.inner.start_send(f.into())?;
assert!(res.is_ready());
Ok(AsyncSink::Ready)
}
Frame::Reset { id, error } => frame::Reset::new(id, error).into(),
/*
Frame::Trailers { id, headers } => {
@@ -255,7 +214,11 @@ impl<T, P, B> Sink for Connection<T, P, B>
}
*/
_ => unimplemented!(),
}
};
let res = self.inner.start_send(frame)?;
assert!(res.is_ready());
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {

View File

@@ -4,6 +4,20 @@ use proto::*;
use std::collections::VecDeque;
/// Exposes flow control states to "upper" layers of the transport (i.e. above
/// FlowControl).
pub trait ControlFlow {
/// Polls for the next window update from the remote.
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>;
/// Increases the local receive capacity of a stream.
///
/// This may cause a window update to be sent to the remote.
///
/// Fails if the given stream is not active.
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
}
#[derive(Debug)]
pub struct FlowControl<T> {
inner: T,
@@ -80,30 +94,6 @@ impl<T: ControlStreams> FlowControl<T> {
}
}
}
/// Proxies access to streams.
impl<T: ControlStreams> ControlStreams for FlowControl<T> {
fn local_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn local_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn remote_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn remote_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
}
/// Exposes a public upward API for flow control.
impl<T: ControlStreams> ControlFlow for FlowControl<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
@@ -139,7 +129,7 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
self.local_pending_streams.push_back(id);
}
Ok(())
} else if let Some(rst) = self.get_reset(id) {
} else if let Some(rst) = self.inner.get_reset(id) {
Err(error::User::StreamReset(rst).into())
} else {
Err(error::User::InvalidStreamId.into())
@@ -147,16 +137,6 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
}
}
impl<T: ControlPing> ControlPing for FlowControl<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T, U> FlowControl<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
@@ -172,7 +152,7 @@ impl<T, U> FlowControl<T>
}
while let Some(id) = self.local_pending_streams.pop_front() {
if self.stream_is_reset(id).is_none() {
if self.inner.get_reset(id).is_none() {
let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update());
if let Some(incr) = update {
try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
@@ -193,71 +173,6 @@ impl<T, U> FlowControl<T>
}
}
/// Applies an update to an endpoint's initial window size.
///
/// Per RFC 7540 §6.9.2:
///
/// > In addition to changing the flow-control window for streams that are not yet
/// > active, a SETTINGS frame can alter the initial flow-control window size for
/// > streams with active flow-control windows (that is, streams in the "open" or
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
/// > it maintains by the difference between the new value and the old value.
/// >
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
/// > flow-control window to become negative. A sender MUST track the negative
/// > flow-control window and MUST NOT send new flow-controlled frames until it
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive.
impl<T> ApplySettings for FlowControl<T>
where T: ApplySettings,
T: ControlStreams
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)?;
let old_window_size = self.local_initial;
let new_window_size = set.initial_window_size();
if new_window_size == old_window_size {
return Ok(());
}
let mut streams = self.inner.streams_mut();
if new_window_size < old_window_size {
let decr = old_window_size - new_window_size;
streams.shrink_all_local_windows(decr);
} else {
let incr = new_window_size - old_window_size;
streams.expand_all_local_windows(incr);
}
self.local_initial = new_window_size;
Ok(())
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)?;
let old_window_size = self.remote_initial;
let new_window_size = set.initial_window_size();
if new_window_size == old_window_size {
return Ok(());
}
let mut streams = self.inner.streams_mut();
if new_window_size < old_window_size {
let decr = old_window_size - new_window_size;
streams.shrink_all_remote_windows(decr);
} else {
let incr = new_window_size - old_window_size;
streams.expand_all_remote_windows(incr);
}
self.remote_initial = new_window_size;
Ok(())
}
}
impl<T> Stream for FlowControl<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
@@ -310,7 +225,7 @@ impl<T, U> Sink for FlowControl<T>
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
debug_assert!(self.stream_is_reset(frame.stream_id()).is_none());
debug_assert!(self.inner.get_reset(frame.stream_id()).is_none());
// Ensures that:
// 1. all pending local window updates have been sent to the remote.
@@ -333,8 +248,7 @@ impl<T, U> Sink for FlowControl<T>
// Ensure there's enough capacity on stream.
{
let mut fc = self.streams_mut()
.remote_flow_controller(v.stream_id())
let mut fc = self.inner.remote_flow_controller(v.stream_id())
.expect("no remote stream for data frame");
if fc.claim_window(sz).is_err() {
return Err(error::User::FlowControlViolation.into())
@@ -367,3 +281,116 @@ impl<T, U> ReadySink for FlowControl<T>
self.inner.poll_ready()
}
}
/// Applies an update to an endpoint's initial window size.
///
/// Per RFC 7540 §6.9.2:
///
/// > In addition to changing the flow-control window for streams that are not yet
/// > active, a SETTINGS frame can alter the initial flow-control window size for
/// > streams with active flow-control windows (that is, streams in the "open" or
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
/// > it maintains by the difference between the new value and the old value.
/// >
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
/// > flow-control window to become negative. A sender MUST track the negative
/// > flow-control window and MUST NOT send new flow-controlled frames until it
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive.
impl<T> ApplySettings for FlowControl<T>
where T: ApplySettings,
T: ControlStreams
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)?;
let old_window_size = self.local_initial;
let new_window_size = set.initial_window_size();
if new_window_size == old_window_size {
return Ok(());
}
self.inner.local_update_inital_window_size(old_window_size, new_window_size);
self.local_initial = new_window_size;
Ok(())
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)?;
let old_window_size = self.remote_initial;
let new_window_size = set.initial_window_size();
if new_window_size == old_window_size {
return Ok(());
}
self.inner.remote_update_inital_window_size(old_window_size, new_window_size);
self.remote_initial = new_window_size;
Ok(())
}
}
impl<T: ControlStreams> ControlStreams for FlowControl<T> {
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
fn is_valid_remote_id(id: StreamId) -> bool {
T::is_valid_remote_id(id)
}
fn can_create_local_stream() -> bool {
T::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.inner.is_remote_active(id)
}
fn local_active_len(&self) -> usize {
self.inner.local_active_len()
}
fn remote_active_len(&self) -> usize {
self.inner.remote_active_len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.local_update_inital_window_size(old_sz, new_sz)
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.remote_update_inital_window_size(old_sz, new_sz)
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id)
}
}
impl<T: ControlPing> ControlPing for FlowControl<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

View File

@@ -1,6 +1,6 @@
use {frame, ConnectionError, Peer, StreamId};
use error::Reason;
use frame::SettingSet;
use frame::{Frame, SettingSet};
use bytes::{Buf, IntoBuf};
use futures::*;
@@ -16,21 +16,27 @@ mod ping_pong;
mod ready;
mod settings;
mod state;
mod stream_recv;
mod stream_send;
mod stream_recv_close;
mod stream_recv_open;
mod stream_send_close;
mod stream_send_open;
mod stream_store;
pub use self::connection::Connection;
pub use self::flow_control::FlowControl;
pub use self::flow_control_state::{FlowControlState, WindowUnderflow};
pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite;
pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink;
pub use self::settings::Settings;
pub use self::stream_recv::StreamRecv;
pub use self::stream_send::StreamSend;
use self::state::{StreamMap, StreamState};
use self::flow_control::{ControlFlow, FlowControl};
use self::flow_control_state::{FlowControlState, WindowUnderflow};
use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;
use self::ping_pong::{ControlPing, PingPayload, PingPong};
use self::ready::ReadySink;
use self::settings::{ApplySettings, ControlSettings, Settings};
use self::state::{StreamState, PeerState};
use self::stream_recv_close::StreamRecvClose;
use self::stream_recv_open::StreamRecvOpen;
use self::stream_send_close::StreamSendClose;
use self::stream_send_open::StreamSendOpen;
use self::stream_store::{ControlStreams, StreamStore};
/// Represents the internals of an HTTP/2 connection.
///
@@ -91,10 +97,12 @@ type Transport<T, P, B>=
P>>;
type Streams<T, P> =
StreamSend<
FlowControl<
StreamRecv<T, P>>,
P>;
StreamSendOpen<
StreamRecvClose<
FlowControl<
StreamSendClose<
StreamRecvOpen<
StreamStore<T, P>>>>>>;
type Codec<T, B> =
FramedRead<
@@ -102,21 +110,6 @@ type Codec<T, B> =
pub type WindowSize = u32;
/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and
/// above---Connection).
pub trait ControlSettings {
fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>;
fn local_settings(&self) -> &SettingSet;
fn remote_settings(&self) -> &SettingSet;
}
/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
/// FramedWrite).
pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
}
#[derive(Debug, Copy, Clone)]
pub struct WindowUpdate {
stream_id: StreamId,
@@ -137,61 +130,6 @@ impl WindowUpdate {
}
}
/// Exposes flow control states to "upper" layers of the transport (i.e. above
/// FlowControl).
pub trait ControlFlow {
/// Polls for the next window update from the remote.
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>;
/// Increases the local receive capacity of a stream.
///
/// This may cause a window update to be sent to the remote.
///
/// Fails if the given stream is not active.
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
}
/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
fn is_valid_local_id(id: StreamId) -> bool;
fn is_valid_remote_id(id: StreamId) -> bool {
!id.is_zero() && !Self::is_valid_local_id(id)
}
fn get_active(&self, id: StreamId) -> Option<&StreamState> {
self.streams(id).get_active(id)
}
fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
self.streams_mut(id).get_active_mut(id)
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.streams(id).get_reset(id)
}
fn reset(&mut self, id: StreamId, cause: Reason) {
self.streams_mut(id).reset(id, cause);
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.streams_mut(id).local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.streams_mut(id).remote_flow_controller(id)
}
}
pub type PingPayload = [u8; 8];
pub trait ControlPing {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
fn take_pong(&mut self) -> Option<PingPayload>;
}
/// Create a full H2 transport from an I/O handle.
///
/// This is called as the final step of the client handshake future.
@@ -249,17 +187,20 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>
.num_skip(0) // Don't skip the header
.new_read(io);
StreamSend::new(
StreamSendOpen::new(
initial_remote_window_size,
remote_max_concurrency,
FlowControl::new(
initial_local_window_size,
initial_remote_window_size,
StreamRecv::new(
StreamRecvClose::new(
FlowControl::new(
initial_local_window_size,
local_max_concurrency,
PingPong::new(
FramedRead::new(framed)))))
initial_remote_window_size,
StreamSendClose::new(
StreamRecvOpen::new(
initial_local_window_size,
local_max_concurrency,
StreamStore::new(
PingPong::new(
FramedRead::new(framed))))))))
});
connection::new(transport)

View File

@@ -1,9 +1,16 @@
use ConnectionError;
use {ConnectionError, StreamId};
use frame::{Frame, Ping, SettingSet};
use proto::{ApplySettings, ControlPing, PingPayload, ReadySink};
use proto::{ApplySettings, ReadySink, ControlStreams, FlowControlState};
use futures::*;
pub type PingPayload = [u8; 8];
pub trait ControlPing {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
fn take_pong(&mut self) -> Option<PingPayload>;
}
/// Acknowledges ping requests from the remote.
#[derive(Debug)]
pub struct PingPong<T, U> {
@@ -29,16 +36,6 @@ impl<T, U> PingPong<T, U>
}
}
impl<T: ApplySettings, U> ApplySettings for PingPong<T, U> {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
impl<T, U> ControlPing for PingPong<T, U>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
@@ -172,6 +169,16 @@ impl<T, U> ReadySink for PingPong<T, U>
}
}
impl<T: ApplySettings, U> ApplySettings for PingPong<T, U> {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -7,8 +7,21 @@ use bytes::BufMut;
use std::io;
/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and
/// above---Connection).
pub trait ControlSettings {
fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>;
fn local_settings(&self) -> &SettingSet;
fn remote_settings(&self) -> &SettingSet;
}
/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
/// FramedWrite).
pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
}
// TODO
#[derive(Debug)]
pub struct Settings<T> {
// Upstream transport
@@ -95,48 +108,6 @@ impl<T, U> Settings<T>
}
}
impl<T: ControlStreams> ControlStreams for Settings<T> {
fn local_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn local_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn remote_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn remote_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
}
impl<T: ControlFlow> ControlFlow for Settings<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
impl<T: ControlPing> ControlPing for Settings<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T> ControlSettings for Settings<T>{
fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> {
self.local = local;
@@ -244,3 +215,23 @@ impl<T: AsyncRead> AsyncRead for Settings<T> {
self.inner.prepare_uninitialized_buffer(buf)
}
}
impl<T: ControlFlow> ControlFlow for Settings<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
impl<T: ControlPing> ControlPing for Settings<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

View File

@@ -291,104 +291,3 @@ impl PeerState {
}
}
}
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug, Default)]
pub struct StreamMap<P> {
/// Holds active streams initiated by the local endpoint.
local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote endpoint.
remote_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote.
reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
_phantom: PhantomData<P>,
}
impl<P: Peer> StreamMap<P> {
pub fn active(&mut self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get(id)
} else {
self.remote_active.get(id)
}
}
pub fn active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(id)
} else {
self.remote_active.get_mut(id)
}
}
pub fn local_active(&self, id: StreamId) -> Option<&StreamState> {
self.local_active.get(&id)
}
pub fn local_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
self.local_active.get_mut(&id)
}
pub fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.get_active_mut(id).and_then(|s| s.local_flow_controller())
}
pub fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.get_active_mut(id).and_then(|s| s.remote_flow_controller())
}
pub fn localis_active(&mut self, id: StreamId) -> bool {
self.active.contains_key(&id)
}
pub fn active_count(&self) -> usize {
self.active.len()
}
pub fn reset(&mut self, id: StreamId, cause: Reason) {
self.reset.insert(id, cause);
self.active.remove(&id);
}
pub fn get_reset(&mut self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
pub fn shrink_all_local_windows(&mut self, decr: u32) {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.local_flow_controller() {
fc.shrink_window(decr);
}
}
}
pub fn expand_all_local_windows(&mut self, incr: u32) {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.local_flow_controller() {
fc.expand_window(incr);
}
}
}
pub fn shrink_all_remote_windows(&mut self, decr: u32) {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.remote_flow_controller() {
fc.shrink_window(decr);
}
}
}
pub fn expand_all_remote_windows(&mut self, incr: u32) {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.remote_flow_controller() {
fc.expand_window(incr);
}
}
}
}

View File

@@ -1,407 +0,0 @@
use ConnectionError;
use client::Client;
use error::Reason;
use error::User;
use frame::{self, Frame};
use proto::*;
use server::Server;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
// TODO track "last stream id" for GOAWAY.
// TODO track/provide "next" stream id.
// TODO reset_streams needs to be bounded.
// TODO track reserved streams (PUSH_PROMISE).
/// Tracks a connection's streams.
#[derive(Debug)]
pub struct StreamRecv<T, P> {
inner: T,
peer: PhantomData<P>,
local: StreamMap,
local_max_concurrency: Option<u32>,
local_initial_window_size: WindowSize,
remote: StreamMap,
remote_max_concurrency: Option<u32>,
remote_initial_window_size: WindowSize,
remote_pending_refuse: Option<StreamId>,
}
impl<T, P, U> StreamRecv<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
pub fn new(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamRecv<T, P>
{
StreamRecv {
inner,
peer: PhantomData,
local: StreamMap::default(),
remote: StreamMap::default(),
max_concurrency,
initial_window_size,
remote_pending_refuse: None,
}
}
pub fn try_open_remote(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
}
impl<T, P, U> StreamRecv<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.remote_pending_refused.is_none());
let f = frame::Reset::new(id, Reason::RefusedStream);
match self.inner.start_send(f.into())? {
AsyncSink::Ready => {
self.reset(id, Reason::RefusedStream);
Ok(Async::Ready(()))
}
AsyncSink::NotReady(_) => {
self.pending_refused = Some(id);
Ok(Async::NotReady)
}
}
}
}
impl<T, P> ControlStreams for StreamRecv<T, P>
where P: Peer
{
fn local_streams(&self) -> &StreamMap {
&self.local
}
fn local_streams_mut(&mut self) -> &mut StreamMap {
&mut self.local
}
fn remote_streams(&self) -> &StreamMap {
&self.remote
}
fn remote_streams_mut(&mut self) -> &mut StreamMap {
&mut self.remote
}
fn is_valid_local_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id)
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// > Indicates the maximum number of concurrent streams that the senderg will allow. This
/// > limit is directional: it applies to the number of streams that the sender permits
/// > the receiver to create. Initially, there is no limit to this value. It is
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
/// > parallelism.
/// >
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only
/// > set a zero value for short durations; if a server does not wish to accept requests,
/// > closing the connection is more appropriate.
///
/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a
/// > value that is below the current number of open streams can either close streams that
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T, P> ApplySettings for StreamRecv<T, P>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
impl<T, P> ControlPing for StreamRecv<T, P>
where T: ControlPing
{
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T, P, U> Stream for StreamRecv<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
use frame::Frame::*;
// Since there's only one slot for pending refused streams, it must be cleared
// before polling a frame from the transport.
if let Some(id) = self.pending_refused.take() {
try_ready!(self.send_refusal(id));
}
loop {
match try_ready!(self.inner.poll()) {
Some(Headers(v)) => {
let id = v.stream_id();
let eos = v.is_end_stream();
if self.get_reset(id).is_some() {
// TODO send the remote errors when it sends us frames on reset
// streams.
continue;
}
if let Some(mut s) = self.get_active_mut(id) {
let created = s.recv_headers(eos, self.initial_window_size)?;
assert!(!created);
return Ok(Async::Ready(Some(Headers(v))));
}
// Ensure that receiving this frame will not violate the local max
// stream concurrency setting. Ensure that the stream is refused
// before processing additional frames.
if let Some(max) = self.max_concurrency {
let max = max as usize;
if !self.local.is_active(id) && self.local.active_count() >= max - 1 {
// This frame would violate our local max concurrency, so reject
// the stream.
try_ready!(self.send_refusal(id));
// Try to process another frame (hopefully for an active
// stream).
continue;
}
}
let is_closed = {
let stream = self.active_streams.entry(id)
.or_insert_with(|| StreamState::default());
let initialized =
stream.recv_headers(eos, self.initial_window_size)?;
if initialized {
if !P::is_valid_remote_stream_id(id) {
return Err(Reason::ProtocolError.into());
}
}
stream.is_closed()
};
if is_closed {
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
return Ok(Async::Ready(Some(Headers(v))));
}
Some(Data(v)) => {
let id = v.stream_id();
if self.get_reset(id).is_some() {
// TODO send the remote errors when it sends us frames on reset
// streams.
continue;
}
let is_closed = {
let stream = match self.active_streams.get_mut(id) {
None => return Err(Reason::ProtocolError.into()),
Some(s) => s,
};
stream.recv_data(v.is_end_stream())?;
stream.is_closed()
};
if is_closed {
self.reset(id, Reason::NoError);
}
return Ok(Async::Ready(Some(Data(v))));
}
Some(Reset(v)) => {
// Set or update the reset reason.
self.reset(v.stream_id(), v.reason());
return Ok(Async::Ready(Some(Reset(v))));
}
Some(f) => {
let id = f.stream_id();
if self.get_reset(id).is_some() {
// TODO send the remote errors when it sends us frames on reset
// streams.
continue;
}
return Ok(Async::Ready(Some(f)));
}
None => {
return Ok(Async::Ready(None));
}
}
}
}
}
impl<T, P, U> Sink for StreamRecv<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
// Must be enforced through higher levels.
debug_assert!(self.stream_is_reset(item.stream_id()).is_none());
// The local must complete refusing the remote stream before sending any other
// frames.
if let Some(id) = self.pending_refused.take() {
if self.send_refusal(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
}
match frame {
Headers(v) => {
let id = v.stream_id();
let eos = v.is_end_stream();
// Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body (1xx
// responses).
//
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
// Ensure that sending this frame would not violate the remote's max
// stream concurrency setting.
if let Some(max) = self.remote_max_concurrency {
let max = max as usize;
if !self.active_streams.has_stream(id)
&& self.active_streams.len() >= max - 1 {
// This frame would violate our local max concurrency, so reject
// the stream.
if self.send_refusal(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(Headers(v)));
}
// Try to process another frame (hopefully for an active
// stream).
return Err(User::MaxConcurrencyExceeded.into())
}
}
let is_closed = {
let stream = self.active_streams.entry(id)
.or_insert_with(|| StreamState::default());
let initialized =
stream.send_headers::<P>(eos, self.initial_remote_window_size)?;
if initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_local_stream_id(id) {
// TODO: clear state
return Err(User::InvalidStreamId.into());
}
}
stream.is_closed()
};
if is_closed {
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
self.inner.start_send(Headers(v))
}
Data(v) => {
match self.active_streams.get_mut(v.stream_id()) {
None => return Err(User::InactiveStreamId.into()),
Some(stream) => {
stream.send_data(v.is_end_stream())?;
self.inner.start_send(Data(v))
}
}
}
Reset(v) => {
let id = v.stream_id();
self.active_streams.remove(id);
self.reset_streams.insert(id, v.reason());
self.inner.start_send(Reset(v))
}
frame => self.inner.start_send(frame),
}
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
if let Some(id) = self.pending_refused.take() {
try_ready!(self.send_refusal(id));
}
self.inner.poll_complete()
}
}
impl<T, P, U> ReadySink for StreamRecv<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refused.take() {
try_ready!(self.send_refusal(id));
}
self.inner.poll_ready()
}
}

View File

@@ -0,0 +1,157 @@
use {ConnectionError};
use error::Reason;
use error::User;
use frame::{self, Frame};
use proto::*;
use proto::ready::ReadySink;
use fnv::FnvHasher;
use futures::*;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
// TODO track "last stream id" for GOAWAY.
// TODO track/provide "next" stream id.
// TODO reset_streams needs to be bounded.
// TODO track reserved streams (PUSH_PROMISE).
#[derive(Debug)]
pub struct StreamRecvClose<T> {
inner: T,
}
impl<T, U> StreamRecvClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
pub fn new(inner: T) -> StreamRecvClose<T> {
StreamRecvClose { inner }
}
}
impl<T> Stream for StreamRecvClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
use frame::Frame::*;
let frame = try_ready!(self.inner.poll());
unimplemented!()
}
}
impl<T, U> Sink for StreamRecvClose<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
impl<T, U> ReadySink for StreamRecvClose<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
impl<T: ControlStreams> ControlStreams for StreamRecvClose<T> {
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
fn is_valid_remote_id(id: StreamId) -> bool {
T::is_valid_remote_id(id)
}
fn can_create_local_stream() -> bool {
T::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.inner.is_remote_active(id)
}
fn local_active_len(&self) -> usize {
self.inner.local_active_len()
}
fn remote_active_len(&self) -> usize {
self.inner.remote_active_len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.local_update_inital_window_size(old_sz, new_sz)
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.remote_update_inital_window_size(old_sz, new_sz)
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id)
}
}
impl<T: ApplySettings> ApplySettings for StreamRecvClose<T> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
impl<T: ControlFlow> ControlFlow for StreamRecvClose<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
impl<T: ControlPing> ControlPing for StreamRecvClose<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

View File

@@ -0,0 +1,352 @@
use ConnectionError;
use frame::{Frame, StreamId};
use proto::*;
use futures::*;
/// Tracks a connection's streams.
#[derive(Debug)]
pub struct StreamRecvOpen<T> {
inner: T,
max_concurrency: Option<u32>,
initial_window_size: WindowSize,
pending_refuse: Option<StreamId>,
}
impl<T> StreamRecvOpen<T> {
pub fn new<U>(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
StreamRecvOpen {
inner,
max_concurrency,
initial_window_size,
pending_refuse: None,
}
}
}
impl<T, U> StreamRecvOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.pending_refuse.is_none());
let f = frame::Reset::new(id, Reason::RefusedStream);
match self.inner.start_send(f.into())? {
AsyncSink::Ready => {
self.inner.reset_stream(id, Reason::RefusedStream);
Ok(Async::Ready(()))
}
AsyncSink::NotReady(_) => {
self.pending_refuse = Some(id);
Ok(Async::NotReady)
}
}
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// > Indicates the maximum number of concurrent streams that the senderg will allow. This
/// > limit is directional: it applies to the number of streams that the sender permits
/// > the receiver to create. Initially, there is no limit to this value. It is
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
/// > parallelism.
/// >
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only
/// > set a zero value for short durations; if a server does not wish to accept requests,
/// > closing the connection is more appropriate.
///
/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a
/// > value that is below the current number of open streams can either close streams that
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T> ApplySettings for StreamRecvOpen<T>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
impl<T, U> Stream for StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
use frame::Frame::*;
// Since there's only one slot for pending refused streams, it must be cleared
// before polling a frame from the transport.
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refusal(id));
}
loop {
let frame = match try_ready!(self.inner.poll()) {
None => return Ok(Async::Ready(None)),
Some(f) => f,
};
let id = frame.stream_id();
if id.is_zero() {
return Ok(Async::Ready(Some(frame)));
}
if self.inner.get_reset(id).is_some() {
// For now, just ignore frames on reset streams.
// TODO tell the remote to knock it off?
continue;
}
if T::is_valid_remote_id(id) {
unimplemented!()
}
}
}
}
impl<T, U> Sink for StreamRecvOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
// The local must complete refusing the remote stream before sending any other
// frames.
if let Some(id) = self.pending_refuse.take() {
if self.send_refusal(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
}
let id = frame.stream_id();
if !id.is_zero() {
// enforced by StreamSend.
debug_assert!(self.inner.get_reset(id).is_none());
let eos = frame.is_end_stream();
}
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refusal(id));
}
self.inner.poll_complete()
}
}
impl<T, U> ReadySink for StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refusal(id));
}
self.inner.poll_ready()
}
}
// Some(Headers(v)) => {
// let id = v.stream_id();
// let eos = v.is_end_stream();
// if self.get_reset(id).is_some() {
// // TODO send the remote errors when it sends us frames on reset
// // streams.
// continue;
// }
// if let Some(mut s) = self.get_active_mut(id) {
// let created = s.recv_headers(eos, self.initial_window_size)?;
// assert!(!created);
// return Ok(Async::Ready(Some(Headers(v))));
// }
// // Ensure that receiving this frame will not violate the local max
// // stream concurrency setting. Ensure that the stream is refused
// // before processing additional frames.
// if let Some(max) = self.max_concurrency {
// let max = max as usize;
// if !self.local.is_active(id) && self.local.active_count() >= max - 1 {
// // This frame would violate our local max concurrency, so reject
// // the stream.
// try_ready!(self.send_refusal(id));
// // Try to process another frame (hopefully for an active
// // stream).
// continue;
// }
// }
// let is_closed = {
// let stream = self.active_streams.entry(id)
// .or_insert_with(|| StreamState::default());
// let initialized =
// stream.recv_headers(eos, self.initial_window_size)?;
// if initialized {
// if !P::is_valid_remote_stream_id(id) {
// return Err(Reason::ProtocolError.into());
// }
// }
// stream.is_closed()
// };
// if is_closed {
// self.active_streams.remove(id);
// self.reset_streams.insert(id, Reason::NoError);
// }
// return Ok(Async::Ready(Some(Headers(v))));
// }
// Some(Data(v)) => {
// let id = v.stream_id();
// if self.get_reset(id).is_some() {
// // TODO send the remote errors when it sends us frames on reset
// // streams.
// continue;
// }
// let is_closed = {
// let stream = match self.active_streams.get_mut(id) {
// None => return Err(Reason::ProtocolError.into()),
// Some(s) => s,
// };
// stream.recv_data(v.is_end_stream())?;
// stream.is_closed()
// };
// if is_closed {
// self.reset(id, Reason::NoError);
// }
// return Ok(Async::Ready(Some(Data(v))));
// }
// Some(Reset(v)) => {
// // Set or update the reset reason.
// self.reset(v.stream_id(), v.reason());
// return Ok(Async::Ready(Some(Reset(v))));
// }
// Some(f) => {
// let id = f.stream_id();
// if self.get_reset(id).is_some() {
// // TODO send the remote errors when it sends us frames on reset
// // streams.
// continue;
// }
// return Ok(Async::Ready(Some(f)));
// }
// None => {
// return Ok(Async::Ready(None));
// }
impl<T: ControlStreams> ControlStreams for StreamRecvOpen<T> {
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
fn is_valid_remote_id(id: StreamId) -> bool {
T::is_valid_remote_id(id)
}
fn can_create_local_stream() -> bool {
T::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.inner.is_remote_active(id)
}
fn local_active_len(&self) -> usize {
self.inner.local_active_len()
}
fn remote_active_len(&self) -> usize {
self.inner.remote_active_len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.local_update_inital_window_size(old_sz, new_sz)
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.remote_update_inital_window_size(old_sz, new_sz)
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id)
}
}
impl<T: ControlPing> ControlPing for StreamRecvOpen<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

View File

@@ -1,217 +0,0 @@
use {ConnectionError};
use error::Reason;
use error::User;
use frame::{self, Frame};
use proto::*;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
// TODO track "last stream id" for GOAWAY.
// TODO track/provide "next" stream id.
// TODO reset_streams needs to be bounded.
// TODO track reserved streams (PUSH_PROMISE).
/// Tracks a connection's streams.
#[derive(Debug)]
pub struct StreamSend<T, P> {
inner: T,
peer: PhantomData<P>,
max_concurrency: Option<u32>,
initial_window_size: WindowSize,
}
impl<T, P, U> StreamSend<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
pub fn new(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamSend<T, P>
{
StreamSend {
inner,
peer: PhantomData,
max_concurrency,
initial_window_size,
}
}
pub fn try_open_local(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// > Indicates the maximum number of concurrent streams that the senderg will allow. This
/// > limit is directional: it applies to the number of streams that the sender permits
/// > the receiver to create. Initially, there is no limit to this value. It is
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
/// > parallelism.
/// >
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only
/// > set a zero value for short durations; if a server does not wish to accept requests,
/// > closing the connection is more appropriate.
///
/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a
/// > value that is below the current number of open streams can either close streams that
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T, P> ApplySettings for StreamSend<T, P>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
self.inner.apply_remote_settings(set)
}
}
impl<T, P> ControlPing for StreamSend<T, P>
where T: ControlPing
{
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T, P, U> Stream for StreamSend<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
P: Peer,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll()
}
}
impl<T, P, U> Sink for StreamSend<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
P: Peer,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
// Must be enforced through higher levels.
if let Some(rst) = self.inner.get_reset(item.stream_id()) {
return Err(User::StreamReset(rst).into());
}
match item {
Headers(v) => {
let id = v.stream_id();
let eos = v.is_end_stream();
// Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body (1xx
// responses).
//
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
// Ensure that sending this frame would not violate the remote's max
// stream concurrency setting.
if let Some(max) = self.max_concurrency {
let max = max as usize;
let streams = self.inner.streams();
if !streams.is_active(id) && streams.active_count() >= max - 1 {
return Err(User::MaxConcurrencyExceeded.into())
}
}
let is_closed = {
let stream = self.active_streams.entry(id)
.or_insert_with(|| StreamState::default());
let initialized =
stream.send_headers::<P>(eos, self.initial_window_size)?;
if initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_local_stream_id(id) {
// TODO: clear state
return Err(User::InvalidStreamId.into());
}
}
stream.is_closed()
};
if is_closed {
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
self.inner.start_send(Headers(v))
}
Data(v) => {
match self.active_streams.get_mut(v.stream_id()) {
None => return Err(User::InactiveStreamId.into()),
Some(stream) => {
stream.send_data(v.is_end_stream())?;
self.inner.start_send(Data(v))
}
}
}
Reset(v) => {
let id = v.stream_id();
self.active_streams.remove(id);
self.reset_streams.insert(id, v.reason());
self.inner.start_send(Reset(v))
}
frame => self.inner.start_send(frame),
}
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
impl<T, P, U> ReadySink for StreamSend<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
T: ReadySink,
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}

View File

@@ -0,0 +1,141 @@
use ConnectionError;
use client::Client;
use error::Reason;
use error::User;
use frame::{self, Frame};
use proto::*;
use futures::*;
use std::marker::PhantomData;
// TODO track "last stream id" for GOAWAY.
// TODO track/provide "next" stream id.
// TODO reset_streams needs to be bounded.
// TODO track reserved streams (PUSH_PROMISE).
#[derive(Debug)]
pub struct StreamSendClose<T> {
inner: T,
}
impl<T, U> StreamSendClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
pub fn new(inner: T) -> StreamSendClose<T> {
StreamSendClose { inner }
}
}
impl<T> Stream for StreamSendClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
{
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
self.inner.poll()
}
}
impl<T, U> Sink for StreamSendClose<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
impl<T, U> ReadySink for StreamSendClose<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
impl<T: ApplySettings> ApplySettings for StreamSendClose<T> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
impl<T: ControlStreams> ControlStreams for StreamSendClose<T> {
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
fn is_valid_remote_id(id: StreamId) -> bool {
T::is_valid_remote_id(id)
}
fn can_create_local_stream() -> bool {
T::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.inner.is_remote_active(id)
}
fn local_active_len(&self) -> usize {
self.inner.local_active_len()
}
fn remote_active_len(&self) -> usize {
self.inner.remote_active_len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.local_update_inital_window_size(old_sz, new_sz)
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.remote_update_inital_window_size(old_sz, new_sz)
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id)
}
}
impl<T: ControlPing> ControlPing for StreamSendClose<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

View File

@@ -0,0 +1,224 @@
use ConnectionError;
use error::User::{InvalidStreamId, StreamReset};
use frame::{Frame, SettingSet};
use proto::*;
use futures::*;
#[derive(Debug)]
pub struct StreamSendOpen<T> {
inner: T,
max_concurrency: Option<u32>,
initial_window_size: WindowSize,
}
impl<T, U> StreamSendOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
pub fn new(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamSendOpen<T>
{
StreamSendOpen {
inner,
max_concurrency,
initial_window_size,
}
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// > Indicates the maximum number of concurrent streams that the senderg will allow. This
/// > limit is directional: it applies to the number of streams that the sender permits
/// > the receiver to create. Initially, there is no limit to this value. It is
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
/// > parallelism.
/// >
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only
/// > set a zero value for short durations; if a server does not wish to accept requests,
/// > closing the connection is more appropriate.
///
/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a
/// > value that is below the current number of open streams can either close streams that
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T: ApplySettings> ApplySettings for StreamSendOpen<T> {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
self.inner.apply_remote_settings(set)
}
}
impl<T> Stream for StreamSendOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
{
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
self.inner.poll()
}
}
impl<T, U> Sink for StreamSendOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
let id = frame.stream_id();
if id.is_zero() {
// Nothing to do on connection frames.
return self.inner.start_send(frame);
}
// Reset the stream immediately and send the Reset on the underlying transport.
if let Reset(rst) = frame {
self.inner.reset_stream(id, rst.reason());
return self.inner.start_send(Reset(rst));
}
// Ensure that the stream hasn't been closed otherwise.
if let Some(reason) = self.inner.get_reset(id) {
return Err(StreamReset(reason).into())
}
if T::is_valid_local_id(id) {
if self.inner.is_local_active(id) {
// If the frame ends thestream, it will be handled in stream_recv.
return self.inner.start_send(frame);
}
if T::can_create_local_stream() {
let has_capacity = match self.max_concurrency {
None => true,
Some(max) => self.inner.local_active_len() < (max as usize),
};
if has_capacity {
// create that shit.
unimplemented!();
}
}
} else {
if self.inner.is_remote_active(id) {
// If the frame was part of a remote stream, it MUST already exist. If the
// frame ends thestream, it will be handled in stream_recv.
return self.inner.start_send(frame);
}
if let Reset(rst) = frame {
return self.inner.start_send(Reset(rst));
}
}
// Tried to send a frame on a stream
return Err(InvalidStreamId.into());
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
impl<T, U> ReadySink for StreamSendOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
impl<T: ControlStreams> ControlStreams for StreamSendOpen<T> {
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
fn is_valid_remote_id(id: StreamId) -> bool {
T::is_valid_remote_id(id)
}
fn can_create_local_stream() -> bool {
T::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.get_reset(id)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.inner.reset_stream(id, cause)
}
fn is_local_active(&self, id: StreamId) -> bool {
self.inner.is_local_active(id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.inner.is_remote_active(id)
}
fn local_active_len(&self) -> usize {
self.inner.local_active_len()
}
fn remote_active_len(&self) -> usize {
self.inner.remote_active_len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.local_update_inital_window_size(old_sz, new_sz)
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
self.inner.remote_update_inital_window_size(old_sz, new_sz)
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.remote_flow_controller(id)
}
}
impl<T: ControlFlow> ControlFlow for StreamSendOpen<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
impl<T: ControlPing> ControlPing for StreamSendOpen<T> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

250
src/proto/stream_store.rs Normal file
View File

@@ -0,0 +1,250 @@
use {ConnectionError, Peer, StreamId};
use error::Reason;
use proto::*;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
fn is_valid_local_id(id: StreamId) -> bool;
fn is_valid_remote_id(id: StreamId) -> bool;
fn can_create_local_stream() -> bool;
fn can_create_remote_stream() -> bool {
!Self::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason>;
fn reset_stream(&mut self, id: StreamId, cause: Reason);
fn is_local_active(&self, id: StreamId) -> bool;
fn is_remote_active(&self, id: StreamId) -> bool;
fn local_active_len(&self) -> usize;
fn remote_active_len(&self) -> usize;
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>;
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState>;
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32);
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32);
// fn get_active(&self, id: StreamId) -> Option<&StreamState> {
// self.streams(id).get_active(id)
// }
// fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
// self.streams_mut(id).get_active_mut(id)
// }
}
/// Holds the underlying stream state to be accessed by upper layers.
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug, Default)]
pub struct StreamStore<T, P> {
inner: T,
/// Holds active streams initiated by the local endpoint.
local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote endpoint.
remote_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote.
reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
_phantom: PhantomData<P>,
}
impl<T, P, U> StreamStore<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
pub fn new(inner: T) -> StreamStore<T, P> {
StreamStore {
inner,
local_active: OrderMap::default(),
remote_active: OrderMap::default(),
reset: OrderMap::default(),
_phantom: PhantomData,
}
}
}
impl<T, P> Stream for StreamStore<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
{
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
self.inner.poll()
}
}
impl<T, P, U> Sink for StreamStore<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
impl<T, P, U> ReadySink for StreamStore<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
impl<T, P: Peer> ControlStreams for StreamStore<T, P> {
fn is_valid_local_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id)
}
fn is_valid_remote_id(id: StreamId) -> bool {
P::is_valid_remote_stream_id(id)
}
fn can_create_local_stream() -> bool {
P::can_create_local_stream()
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
fn reset_stream(&mut self, id: StreamId, cause: Reason) {
if P::is_valid_local_stream_id(id) {
self.local_active.remove(&id);
} else {
self.remote_active.remove(&id);
}
self.reset.insert(id, cause);
}
fn is_local_active(&self, id: StreamId) -> bool {
self.local_active.contains_key(&id)
}
fn is_remote_active(&self, id: StreamId) -> bool {
self.remote_active.contains_key(&id)
}
fn local_active_len(&self) -> usize {
self.local_active.len()
}
fn remote_active_len(&self) -> usize {
self.remote_active.len()
}
fn local_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.local_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.local_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.local_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.local_flow_controller() {
fc.expand_window(incr);
}
}
}
}
fn remote_update_inital_window_size(&mut self, old_sz: u32, new_sz: u32) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.remote_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.remote_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.remote_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.remote_flow_controller() {
fc.expand_window(incr);
}
}
}
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.local_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.local_flow_controller())
}
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.remote_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.remote_flow_controller())
}
}
}
impl<T: ApplySettings, P> ApplySettings for StreamStore<T, P> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}

View File

@@ -110,7 +110,7 @@ impl Peer for Server {
type Send = http::response::Head;
type Poll = http::request::Head;
fn is_valid_local_stream_id(_id: StreamId) -> bool {
fn is_valid_local_stream_id(id: StreamId) -> bool {
id.is_server_initiated()
}