This commit is contained in:
Oliver Gould
2017-07-19 19:53:33 +00:00
parent df589f2fde
commit 0d84c98c89
11 changed files with 488 additions and 159 deletions

View File

@@ -56,8 +56,12 @@ impl Peer for Client {
id.is_client_initiated()
}
fn is_valid_remote_stream_id(_id: StreamId) -> bool {
false
fn is_valid_remote_stream_id(id: StreamId) -> bool {
id.is_server_initiated()
}
fn can_create_local_stream() -> bool {
true
}
fn convert_send_message(

View File

@@ -82,7 +82,7 @@ pub enum User {
Corrupt,
/// The stream state has been reset.
StreamReset,
StreamReset(Reason),
/// The application attempted to initiate too many streams to remote.
MaxConcurrencyExceeded,
@@ -125,7 +125,7 @@ macro_rules! user_desc {
InactiveStreamId => concat!($prefix, "inactive stream ID"),
UnexpectedFrameType => concat!($prefix, "unexpected frame type"),
FlowControlViolation => concat!($prefix, "flow control violation"),
StreamReset => concat!($prefix, "frame sent on reset stream"),
StreamReset(_) => concat!($prefix, "frame sent on reset stream"),
Corrupt => concat!($prefix, "connection state corrupt"),
MaxConcurrencyExceeded => concat!($prefix, "stream would exceed remote max concurrency"),
}

View File

@@ -85,6 +85,17 @@ pub trait Peer {
/// remote node.
fn is_valid_remote_stream_id(id: StreamId) -> bool;
fn can_create_local_stream() -> bool;
fn can_create_remote_stream() -> bool {
!Self::can_create_local_stream()
}
//fn can_reserve_local_stream() -> bool;
// fn can_reserve_remote_stream() -> bool {
// !self.can_reserve_local_stream
// }
#[doc(hidden)]
fn convert_send_message(
id: StreamId,

View File

@@ -216,10 +216,6 @@ impl<T, P, B> Sink for Connection<T, P, B>
match item {
Frame::Headers { id, headers, end_of_stream } => {
if self.inner.stream_is_reset(id).is_some() {
return Err(error::User::StreamReset.into());
}
// This is a one-way conversion. By checking `poll_ready` first (above),
// it's already been determined that the inner `Sink` can accept the item.
// If the item is rejected, then there is a bug.

View File

@@ -68,7 +68,7 @@ impl<T: ControlStreams> FlowControl<T> {
if id.is_zero() {
Some(&mut self.local_connection)
} else {
self.inner.streams_mut().get_mut(id).and_then(|s| s.local_flow_controller())
self.inner.local_flow_controller(id)
}
}
@@ -76,23 +76,31 @@ impl<T: ControlStreams> FlowControl<T> {
if id.is_zero() {
Some(&mut self.remote_connection)
} else {
self.inner.streams_mut().get_mut(id).and_then(|s| s.remote_flow_controller())
self.inner.remote_flow_controller(id)
}
}
}
/// Proxies access to streams.
impl<T: ControlStreams> ControlStreams for FlowControl<T> {
fn streams(&self) -> &StreamMap {
self.inner.streams()
fn local_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn streams_mut(&mut self) -> &mut StreamMap {
self.inner.streams_mut()
fn local_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.stream_is_reset(id)
fn remote_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn remote_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
}
@@ -101,14 +109,14 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
// This biases connection window updates, which probably makese sense.
if let Some(incr) = self.remote_connection.apply_window_update() {
return Ok(Async::Ready(WindowUpdate(StreamId::zero(), incr)));
return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr)));
}
// TODO this should probably account for stream priority?
while let Some(id) = self.remote_pending_streams.pop_front() {
if let Some(mut flow) = self.remote_flow_controller(id) {
if let Some(incr) = flow.apply_window_update() {
return Ok(Async::Ready(WindowUpdate(id, incr)));
return Ok(Async::Ready(WindowUpdate::new(id, incr)));
}
}
}
@@ -131,8 +139,8 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
self.local_pending_streams.push_back(id);
}
Ok(())
} else if self.stream_is_reset(id).is_some() {
Err(error::User::StreamReset.into())
} else if let Some(rst) = self.get_reset(id) {
Err(error::User::StreamReset(rst).into())
} else {
Err(error::User::InvalidStreamId.into())
}

View File

@@ -16,7 +16,8 @@ mod ping_pong;
mod ready;
mod settings;
mod state;
mod stream_tracker;
mod stream_recv;
mod stream_send;
pub use self::connection::Connection;
pub use self::flow_control::FlowControl;
@@ -26,7 +27,8 @@ pub use self::framed_write::FramedWrite;
pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink;
pub use self::settings::Settings;
pub use self::stream_tracker::StreamTracker;
pub use self::stream_recv::StreamRecv;
pub use self::stream_send::StreamSend;
use self::state::{StreamMap, StreamState};
@@ -82,14 +84,19 @@ use self::state::{StreamMap, StreamState};
///
type Transport<T, P, B>=
Settings<
FlowControl<
StreamTracker<
PingPong<
Framer<T, B>,
B>,
P>>>;
Streams<
PingPong<
Codec<T, B>,
B>,
P>>;
type Framer<T, B> =
type Streams<T, P> =
StreamSend<
FlowControl<
StreamRecv<T, P>>,
P>;
type Codec<T, B> =
FramedRead<
FramedWrite<T, B>>;
@@ -111,14 +118,22 @@ pub trait ApplySettings {
}
#[derive(Debug, Copy, Clone)]
pub struct WindowUpdate(pub StreamId, pub WindowSize);
pub struct WindowUpdate {
stream_id: StreamId,
increment: WindowSize
}
impl WindowUpdate {
pub fn new(stream_id: StreamId, increment: WindowSize) -> WindowUpdate {
WindowUpdate { stream_id, increment }
}
pub fn stream_id(&self) -> StreamId {
self.0
self.stream_id
}
pub fn increment(&self) -> WindowSize {
self.1
self.increment
}
}
@@ -139,14 +154,35 @@ pub trait ControlFlow {
/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
/// Accesses the map of all active streams.
fn streams(&self)-> &StreamMap;
fn is_valid_local_id(id: StreamId) -> bool;
fn is_valid_remote_id(id: StreamId) -> bool {
!id.is_zero() && !Self::is_valid_local_id(id)
}
/// Mutably accesses the map of all active streams.
fn streams_mut(&mut self) -> &mut StreamMap;
fn get_active(&self, id: StreamId) -> Option<&StreamState> {
self.streams(id).get_active(id)
}
/// Checks whether a stream has been reset.
fn stream_is_reset(&self, id: StreamId) -> Option<Reason>;
fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
self.streams_mut(id).get_active_mut(id)
}
fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.streams(id).get_reset(id)
}
fn reset(&mut self, id: StreamId, cause: Reason) {
self.streams_mut(id).reset(id, cause);
}
fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.streams_mut(id).local_flow_controller(id)
}
fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.streams_mut(id).remote_flow_controller(id)
}
}
pub type PingPayload = [u8; 8];
@@ -206,26 +242,24 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>
// Replace Settings' writer with a full transport.
let transport = settings.swap_inner(|io| {
// Delimit the frames.
let framer = length_delimited::Builder::new()
let framed = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(io);
FlowControl::new(
initial_local_window_size,
StreamSend::new(
initial_remote_window_size,
StreamTracker::new(
remote_max_concurrency,
FlowControl::new(
initial_local_window_size,
initial_remote_window_size,
local_max_concurrency,
remote_max_concurrency,
PingPong::new(
FramedRead::new(framer)
)
)
)
StreamRecv::new(
initial_local_window_size,
local_max_concurrency,
PingPong::new(
FramedRead::new(framed)))))
});
connection::new(transport)

View File

@@ -24,7 +24,7 @@ pub struct Settings<T> {
remaining_acks: usize,
// True when the local settings must be flushed to the remote
is_local_dirty: bool,
is_valid_local_id_dirty: bool,
// True when we have received a settings frame from the remote.
received_remote: bool,
@@ -39,7 +39,7 @@ impl<T, U> Settings<T>
local: local,
remote: SettingSet::default(),
remaining_acks: 0,
is_local_dirty: true,
is_valid_local_id_dirty: true,
received_remote: false,
}
}
@@ -61,18 +61,18 @@ impl<T, U> Settings<T>
local: self.local,
remote: self.remote,
remaining_acks: self.remaining_acks,
is_local_dirty: self.is_local_dirty,
is_valid_local_id_dirty: self.is_valid_local_id_dirty,
received_remote: self.received_remote,
}
}
fn try_send_pending(&mut self) -> Poll<(), ConnectionError> {
trace!("try_send_pending; dirty={} acks={}", self.is_local_dirty, self.remaining_acks);
if self.is_local_dirty {
trace!("try_send_pending; dirty={} acks={}", self.is_valid_local_id_dirty, self.remaining_acks);
if self.is_valid_local_id_dirty {
let frame = frame::Settings::new(self.local.clone());
try_ready!(self.try_send(frame));
self.is_local_dirty = false;
self.is_valid_local_id_dirty = false;
}
while self.remaining_acks > 0 {
@@ -96,16 +96,24 @@ impl<T, U> Settings<T>
}
impl<T: ControlStreams> ControlStreams for Settings<T> {
fn streams(&self) -> &StreamMap {
self.inner.streams()
fn local_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn streams_mut(&mut self) -> &mut StreamMap {
self.inner.streams_mut()
fn local_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.stream_is_reset(id)
fn remote_streams(&self) -> &StreamMap {
self.inner.local_streams()
}
fn remote_streams_mut(&mut self) -> &mut StreamMap {
self.inner.local_streams_mut()
}
fn is_valid_local_id(id: StreamId) -> bool {
T::is_valid_local_id(id)
}
}
@@ -132,7 +140,7 @@ impl<T: ControlPing> ControlPing for Settings<T> {
impl<T> ControlSettings for Settings<T>{
fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> {
self.local = local;
self.is_local_dirty = true;
self.is_valid_local_id_dirty = true;
Ok(())
}

View File

@@ -1,5 +1,5 @@
use {Peer, StreamId};
use error::ConnectionError;
use error::{ConnectionError, Reason};
use error::Reason::*;
use error::User::*;
use proto::{FlowControlState, WindowSize};
@@ -7,6 +7,7 @@ use proto::{FlowControlState, WindowSize};
use fnv::FnvHasher;
use ordermap::{Entry, OrderMap};
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
/// Represents the state of an H2 stream
///
@@ -76,10 +77,9 @@ impl StreamState {
///
/// Returns true if this state transition results in iniitializing the
/// stream id. `Err` is returned if this is an invalid state transition.
pub fn recv_headers<P: Peer>(&mut self,
eos: bool,
initial_recv_window_size: WindowSize)
pub fn recv_headers<P>(&mut self, eos: bool, initial_window_size: WindowSize)
-> Result<bool, ConnectionError>
where P: Peer
{
use self::StreamState::*;
use self::PeerState::*;
@@ -90,7 +90,7 @@ impl StreamState {
if eos {
*self = HalfClosedRemote(local);
} else {
let remote = Data(FlowControlState::with_initial_size(initial_recv_window_size));
let remote = Data(FlowControlState::with_initial_size(initial_window_size));
*self = Open { local, remote };
}
Ok(true)
@@ -111,7 +111,8 @@ impl StreamState {
if eos {
*self = Closed;
} else {
*self = HalfClosedLocal(Data(FlowControlState::with_initial_size(initial_recv_window_size)));
let remote = FlowControlState::with_initial_size(initial_window_size);
*self = HalfClosedLocal(Data(remote));
};
Ok(false)
}
@@ -291,46 +292,76 @@ impl PeerState {
}
}
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug, Default)]
pub struct StreamMap {
inner: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>
pub struct StreamMap<P> {
/// Holds active streams initiated by the local endpoint.
local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote endpoint.
remote_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote.
reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
_phantom: PhantomData<P>,
}
impl StreamMap {
pub fn get_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
self.inner.get_mut(&id)
impl<P: Peer> StreamMap<P> {
pub fn active(&mut self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get(id)
} else {
self.remote_active.get(id)
}
}
pub fn active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
assert!(!id.is_zero());
if P::is_valid_local_stream_id(id) {
self.local_active.get_mut(id)
} else {
self.remote_active.get_mut(id)
}
}
pub fn local_active(&self, id: StreamId) -> Option<&StreamState> {
self.local_active.get(&id)
}
pub fn local_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
self.local_active.get_mut(&id)
}
pub fn local_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.get_mut(&id).and_then(|s| s.local_flow_controller())
self.get_active_mut(id).and_then(|s| s.local_flow_controller())
}
pub fn remote_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
self.inner.get_mut(&id).and_then(|s| s.remote_flow_controller())
self.get_active_mut(id).and_then(|s| s.remote_flow_controller())
}
pub fn has_stream(&mut self, id: StreamId) -> bool {
self.inner.contains_key(&id)
pub fn localis_active(&mut self, id: StreamId) -> bool {
self.active.contains_key(&id)
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
pub fn active_count(&self) -> usize {
self.active.len()
}
pub fn len(&self) -> usize {
self.inner.len()
pub fn reset(&mut self, id: StreamId, cause: Reason) {
self.reset.insert(id, cause);
self.active.remove(&id);
}
pub fn entry(&mut self, id: StreamId) -> Entry<StreamId, StreamState, BuildHasherDefault<FnvHasher>> {
self.inner.entry(id)
}
pub fn remove(&mut self, id: StreamId) -> Option<StreamState> {
self.inner.remove(&id)
pub fn get_reset(&mut self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
pub fn shrink_all_local_windows(&mut self, decr: u32) {
for (_, mut s) in &mut self.inner {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.local_flow_controller() {
fc.shrink_window(decr);
}
@@ -338,7 +369,7 @@ impl StreamMap {
}
pub fn expand_all_local_windows(&mut self, incr: u32) {
for (_, mut s) in &mut self.inner {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.local_flow_controller() {
fc.expand_window(incr);
}
@@ -346,7 +377,7 @@ impl StreamMap {
}
pub fn shrink_all_remote_windows(&mut self, decr: u32) {
for (_, mut s) in &mut self.inner {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.remote_flow_controller() {
fc.shrink_window(decr);
}
@@ -354,7 +385,7 @@ impl StreamMap {
}
pub fn expand_all_remote_windows(&mut self, incr: u32) {
for (_, mut s) in &mut self.inner {
for (_, mut s) in &mut self.active {
if let Some(fc) = s.remote_flow_controller() {
fc.expand_window(incr);
}

View File

@@ -1,8 +1,10 @@
use {ConnectionError};
use ConnectionError;
use client::Client;
use error::Reason;
use error::User;
use frame::{self, Frame};
use proto::*;
use server::Server;
use fnv::FnvHasher;
use ordermap::OrderMap;
@@ -16,82 +18,93 @@ use std::marker::PhantomData;
/// Tracks a connection's streams.
#[derive(Debug)]
pub struct StreamTracker<T, P> {
pub struct StreamRecv<T, P> {
inner: T,
peer: PhantomData<P>,
active_streams: StreamMap,
// TODO reserved_streams: HashSet<StreamId>
reset_streams: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
local: StreamMap,
local_max_concurrency: Option<u32>,
remote_max_concurrency: Option<u32>,
initial_local_window_size: WindowSize,
initial_remote_window_size: WindowSize,
local_initial_window_size: WindowSize,
pending_refused_stream: Option<StreamId>,
remote: StreamMap,
remote_max_concurrency: Option<u32>,
remote_initial_window_size: WindowSize,
remote_pending_refuse: Option<StreamId>,
}
impl<T, P, U> StreamTracker<T, P>
impl<T, P, U> StreamRecv<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
pub fn new(initial_local_window_size: WindowSize,
initial_remote_window_size: WindowSize,
local_max_concurrency: Option<u32>,
remote_max_concurrency: Option<u32>,
pub fn new(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamTracker<T, P>
-> StreamRecv<T, P>
{
StreamTracker {
StreamRecv {
inner,
peer: PhantomData,
active_streams: StreamMap::default(),
reset_streams: OrderMap::default(),
pending_refused_stream: None,
local_max_concurrency,
remote_max_concurrency,
initial_local_window_size,
initial_remote_window_size,
local: StreamMap::default(),
remote: StreamMap::default(),
max_concurrency,
initial_window_size,
remote_pending_refuse: None,
}
}
pub fn try_open_remote(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
}
impl<T, P, U> StreamTracker<T, P>
impl<T, P, U> StreamRecv<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.pending_refused_stream.is_none());
debug_assert!(self.remote_pending_refused.is_none());
let f = frame::Reset::new(id, Reason::RefusedStream);
match self.inner.start_send(f.into())? {
AsyncSink::Ready => {
self.reset_streams.insert(id, Reason::RefusedStream);
self.reset(id, Reason::RefusedStream);
Ok(Async::Ready(()))
}
AsyncSink::NotReady(_) => {
self.pending_refused_stream = Some(id);
self.pending_refused = Some(id);
Ok(Async::NotReady)
}
}
}
}
impl<T, P> ControlStreams for StreamTracker<T, P> {
fn streams(&self) -> &StreamMap {
&self.active_streams
impl<T, P> ControlStreams for StreamRecv<T, P>
where P: Peer
{
fn local_streams(&self) -> &StreamMap {
&self.local
}
fn streams_mut(&mut self) -> &mut StreamMap {
&mut self.active_streams
fn local_streams_mut(&mut self) -> &mut StreamMap {
&mut self.local
}
fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
self.reset_streams.get(&id).map(|r| *r)
fn remote_streams(&self) -> &StreamMap {
&self.remote
}
fn remote_streams_mut(&mut self) -> &mut StreamMap {
&mut self.remote
}
fn is_valid_local_id(id: StreamId) -> bool {
P::is_valid_local_stream_id(id)
}
}
@@ -114,23 +127,21 @@ impl<T, P> ControlStreams for StreamTracker<T, P> {
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T, P> ApplySettings for StreamTracker<T, P>
impl<T, P> ApplySettings for StreamRecv<T, P>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.local_max_concurrency = set.max_concurrent_streams();
self.initial_local_window_size = set.initial_window_size();
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.remote_max_concurrency = set.max_concurrent_streams();
self.initial_remote_window_size = set.initial_window_size();
self.inner.apply_remote_settings(set)
}
}
impl<T, P> ControlPing for StreamTracker<T, P>
impl<T, P> ControlPing for StreamRecv<T, P>
where T: ControlPing
{
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
@@ -142,7 +153,7 @@ impl<T, P> ControlPing for StreamTracker<T, P>
}
}
impl<T, P, U> Stream for StreamTracker<T, P>
impl<T, P, U> Stream for StreamRecv<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
@@ -155,7 +166,7 @@ impl<T, P, U> Stream for StreamTracker<T, P>
// Since there's only one slot for pending refused streams, it must be cleared
// before polling a frame from the transport.
if let Some(id) = self.pending_refused_stream.take() {
if let Some(id) = self.pending_refused.take() {
try_ready!(self.send_refusal(id));
}
@@ -165,17 +176,24 @@ impl<T, P, U> Stream for StreamTracker<T, P>
let id = v.stream_id();
let eos = v.is_end_stream();
if self.reset_streams.contains_key(&id) {
if self.get_reset(id).is_some() {
// TODO send the remote errors when it sends us frames on reset
// streams.
continue;
}
if let Some(mut s) = self.get_active_mut(id) {
let created = s.recv_headers(eos, self.initial_window_size)?;
assert!(!created);
return Ok(Async::Ready(Some(Headers(v))));
}
// Ensure that receiving this frame will not violate the local max
// stream concurrency setting. Ensure that the stream is refused
// before processing additional frames.
if let Some(max) = self.local_max_concurrency {
if let Some(max) = self.max_concurrency {
let max = max as usize;
if !self.active_streams.has_stream(id)
&& self.active_streams.len() >= max - 1 {
if !self.local.is_active(id) && self.local.active_count() >= max - 1 {
// This frame would violate our local max concurrency, so reject
// the stream.
try_ready!(self.send_refusal(id));
@@ -191,7 +209,7 @@ impl<T, P, U> Stream for StreamTracker<T, P>
.or_insert_with(|| StreamState::default());
let initialized =
stream.recv_headers::<P>(eos, self.initial_local_window_size)?;
stream.recv_headers(eos, self.initial_window_size)?;
if initialized {
if !P::is_valid_remote_stream_id(id) {
@@ -213,7 +231,9 @@ impl<T, P, U> Stream for StreamTracker<T, P>
Some(Data(v)) => {
let id = v.stream_id();
if self.reset_streams.contains_key(&id) {
if self.get_reset(id).is_some() {
// TODO send the remote errors when it sends us frames on reset
// streams.
continue;
}
@@ -227,28 +247,24 @@ impl<T, P, U> Stream for StreamTracker<T, P>
};
if is_closed {
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
self.reset(id, Reason::NoError);
}
return Ok(Async::Ready(Some(Data(v))));
}
Some(Reset(v)) => {
let id = v.stream_id();
// Set or update the reset reason.
self.reset_streams.insert(id, v.reason());
if self.active_streams.remove(id).is_some() {
return Ok(Async::Ready(Some(Reset(v))));
}
self.reset(v.stream_id(), v.reason());
return Ok(Async::Ready(Some(Reset(v))));
}
Some(f) => {
let id = f.stream_id();
if self.reset_streams.contains_key(&id) {
if self.get_reset(id).is_some() {
// TODO send the remote errors when it sends us frames on reset
// streams.
continue;
}
@@ -263,14 +279,14 @@ impl<T, P, U> Stream for StreamTracker<T, P>
}
}
impl<T, P, U> Sink for StreamTracker<T, P>
impl<T, P, U> Sink for StreamRecv<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
// Must be enforced through higher levels.
@@ -278,13 +294,13 @@ impl<T, P, U> Sink for StreamTracker<T, P>
// The local must complete refusing the remote stream before sending any other
// frames.
if let Some(id) = self.pending_refused_stream.take() {
if let Some(id) = self.pending_refused.take() {
if self.send_refusal(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
}
match item {
match frame {
Headers(v) => {
let id = v.stream_id();
let eos = v.is_end_stream();
@@ -366,7 +382,7 @@ impl<T, P, U> Sink for StreamTracker<T, P>
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
if let Some(id) = self.pending_refused_stream.take() {
if let Some(id) = self.pending_refused.take() {
try_ready!(self.send_refusal(id));
}
@@ -375,14 +391,14 @@ impl<T, P, U> Sink for StreamTracker<T, P>
}
impl<T, P, U> ReadySink for StreamTracker<T, P>
impl<T, P, U> ReadySink for StreamRecv<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refused_stream.take() {
if let Some(id) = self.pending_refused.take() {
try_ready!(self.send_refusal(id));
}

217
src/proto/stream_send.rs Normal file
View File

@@ -0,0 +1,217 @@
use {ConnectionError};
use error::Reason;
use error::User;
use frame::{self, Frame};
use proto::*;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
// TODO track "last stream id" for GOAWAY.
// TODO track/provide "next" stream id.
// TODO reset_streams needs to be bounded.
// TODO track reserved streams (PUSH_PROMISE).
/// Tracks a connection's streams.
#[derive(Debug)]
pub struct StreamSend<T, P> {
inner: T,
peer: PhantomData<P>,
max_concurrency: Option<u32>,
initial_window_size: WindowSize,
}
impl<T, P, U> StreamSend<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
pub fn new(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamSend<T, P>
{
StreamSend {
inner,
peer: PhantomData,
max_concurrency,
initial_window_size,
}
}
pub fn try_open_local(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
pub fn try_close(&mut self, frame: Frame) -> Result<(), ConnectionError> {
unimplemented!()
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// > Indicates the maximum number of concurrent streams that the senderg will allow. This
/// > limit is directional: it applies to the number of streams that the sender permits
/// > the receiver to create. Initially, there is no limit to this value. It is
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
/// > parallelism.
/// >
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
/// > also happen for any limit that is exhausted with active streams. Servers SHOULD only
/// > set a zero value for short durations; if a server does not wish to accept requests,
/// > closing the connection is more appropriate.
///
/// > An endpoint that wishes to reduce the value of SETTINGS_MAX_CONCURRENT_STREAMS to a
/// > value that is below the current number of open streams can either close streams that
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T, P> ApplySettings for StreamSend<T, P>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
self.inner.apply_remote_settings(set)
}
}
impl<T, P> ControlPing for StreamSend<T, P>
where T: ControlPing
{
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T, P, U> Stream for StreamSend<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
P: Peer,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll()
}
}
impl<T, P, U> Sink for StreamSend<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
P: Peer,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
// Must be enforced through higher levels.
if let Some(rst) = self.inner.get_reset(item.stream_id()) {
return Err(User::StreamReset(rst).into());
}
match item {
Headers(v) => {
let id = v.stream_id();
let eos = v.is_end_stream();
// Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body (1xx
// responses).
//
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
// Ensure that sending this frame would not violate the remote's max
// stream concurrency setting.
if let Some(max) = self.max_concurrency {
let max = max as usize;
let streams = self.inner.streams();
if !streams.is_active(id) && streams.active_count() >= max - 1 {
return Err(User::MaxConcurrencyExceeded.into())
}
}
let is_closed = {
let stream = self.active_streams.entry(id)
.or_insert_with(|| StreamState::default());
let initialized =
stream.send_headers::<P>(eos, self.initial_window_size)?;
if initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_local_stream_id(id) {
// TODO: clear state
return Err(User::InvalidStreamId.into());
}
}
stream.is_closed()
};
if is_closed {
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
self.inner.start_send(Headers(v))
}
Data(v) => {
match self.active_streams.get_mut(v.stream_id()) {
None => return Err(User::InactiveStreamId.into()),
Some(stream) => {
stream.send_data(v.is_end_stream())?;
self.inner.start_send(Data(v))
}
}
}
Reset(v) => {
let id = v.stream_id();
self.active_streams.remove(id);
self.reset_streams.insert(id, v.reason());
self.inner.start_send(Reset(v))
}
frame => self.inner.start_send(frame),
}
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
impl<T, P, U> ReadySink for StreamSend<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
T: ReadySink,
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}

View File

@@ -111,13 +111,17 @@ impl Peer for Server {
type Poll = http::request::Head;
fn is_valid_local_stream_id(_id: StreamId) -> bool {
false
id.is_server_initiated()
}
fn is_valid_remote_stream_id(id: StreamId) -> bool {
id.is_client_initiated()
}
fn can_create_local_stream() -> bool {
false
}
fn convert_send_message(
id: StreamId,
headers: Self::Send,