refuse streams that would violate max concurrency settings.

improve ping control API
Oliver Gould
2017-07-17 22:18:03 +00:00
parent fb4f0bc5af
commit 79d3aee1dc
9 changed files with 234 additions and 82 deletions

View File

@@ -84,6 +84,9 @@ pub enum User {
/// The stream state has been reset.
StreamReset,
/// The application attempted to initiate too many streams to the remote.
MaxConcurrencyExceeded,
// TODO: reserve additional variants
}
@@ -124,6 +127,7 @@ macro_rules! user_desc {
FlowControlViolation => concat!($prefix, "flow control violation"),
StreamReset => concat!($prefix, "frame sent on reset stream"),
Corrupt => concat!($prefix, "connection state corrupt"),
MaxConcurrencyExceeded => concat!($prefix, "stream would exceed remote max concurrency"),
}
});
}
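
A hedged sketch of how an application might react to the new variant. The `open_stream` helper and the simplified `User` enum below are hypothetical stand-ins; only `MaxConcurrencyExceeded` itself comes from this commit:

```rust
#[derive(Debug, PartialEq)]
enum User {
    MaxConcurrencyExceeded,
}

// Hypothetical admission check: refuse locally, before any frame
// reaches the wire, when the remote's limit would be exceeded.
fn open_stream(active: usize, max: Option<usize>) -> Result<(), User> {
    if let Some(max) = max {
        if active >= max {
            return Err(User::MaxConcurrencyExceeded);
        }
    }
    Ok(())
}

fn main() {
    assert_eq!(open_stream(0, Some(1)), Ok(()));
    assert_eq!(open_stream(1, Some(1)), Err(User::MaxConcurrencyExceeded));
    assert_eq!(open_stream(100, None), Ok(())); // no limit advertised
}
```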

View File

@@ -63,6 +63,7 @@ impl<T> Data<T> {
impl<T: Buf> Data<T> {
pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self {
// TODO ensure that data.remaining() < MAX_FRAME_SIZE
let mut flags = DataFlag::default();
if eos {
flags.set_end_stream();

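The TODO above asks for a payload-size guard. A minimal sketch of that check, assuming the RFC 7540 default of 2^14 octets; the constant name and error string are placeholders, not the crate's API:

```rust
const MAX_FRAME_SIZE: usize = 1 << 14; // RFC 7540 default (16 KiB)

// Hypothetical guard: a DATA payload must fit in one frame.
fn check_payload(data: &[u8]) -> Result<(), &'static str> {
    if data.len() > MAX_FRAME_SIZE {
        return Err("DATA payload exceeds the negotiated max frame size");
    }
    Ok(())
}

fn main() {
    assert!(check_payload(&[0u8; 1024]).is_ok());
    assert!(check_payload(&vec![0u8; (1 << 14) + 1]).is_err());
}
```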
View File

@@ -72,19 +72,19 @@ impl<T, P, B> ControlPing for Connection<T, P, B>
self.inner.start_ping(body)
}
fn pop_pong(&mut self) -> Option<PingPayload> {
self.inner.pop_pong()
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
// Note: this is bytes-specific for now so that we can know the payload's length.
impl<T, P> Connection<T, P, Bytes>
impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
pub fn send_data(self,
id: StreamId,
data: Bytes,
data: B,
end_of_stream: bool)
-> sink::Send<Self>
{
@@ -151,6 +151,8 @@ impl<T, P, B> Stream for Connection<T, P, B>
loop {
let frame = match try!(self.inner.poll()) {
Async::Ready(f) => f,
// XXX is this necessary?
Async::NotReady => {
// Receiving new frames may depend on ensuring that the write buffer
// is clear (e.g. if window updates need to be sent), so `poll_complete`
@@ -217,7 +219,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
match item {
Frame::Headers { id, headers, end_of_stream } => {
if self.inner.stream_is_reset(id) {
if self.inner.stream_is_reset(id).is_some() {
return Err(error::User::StreamReset.into());
}
@@ -231,7 +233,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
}
Frame::Data { id, data, end_of_stream } => {
if self.inner.stream_is_reset(id) {
if self.inner.stream_is_reset(id).is_some() {
return Err(error::User::StreamReset.into());
}

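Dropping the `Bytes`-specific impl means `send_data` now accepts any `B: IntoBuf` payload. A small stand-in (using the bytes 0.4 `IntoBuf`/`Buf` traits) showing what the relaxed bound buys callers:

```rust
use bytes::{Buf, Bytes, IntoBuf};

// Stand-in for the new `send_data<B: IntoBuf>` bound from this diff:
// measure any payload without requiring it to be `Bytes` up front.
fn payload_len<B: IntoBuf>(payload: B) -> usize {
    payload.into_buf().remaining()
}

fn main() {
    assert_eq!(payload_len(Bytes::from_static(b"hello")), 5);
    assert_eq!(payload_len(&b"world!"[..]), 6); // plain slices work too
}
```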
View File

@@ -61,7 +61,7 @@ impl<T: ControlStreams> FlowControl<T> {
if id.is_zero() {
Some(&mut self.connection_local_flow_controller)
} else {
self.inner.streams_mut().get_mut(&id).and_then(|s| s.local_flow_controller())
self.inner.streams_mut().get_mut(id).and_then(|s| s.local_flow_controller())
}
}
@@ -69,7 +69,7 @@ impl<T: ControlStreams> FlowControl<T> {
if id.is_zero() {
Some(&mut self.connection_remote_flow_controller)
} else {
self.inner.streams_mut().get_mut(&id).and_then(|s| s.remote_flow_controller())
self.inner.streams_mut().get_mut(id).and_then(|s| s.remote_flow_controller())
}
}
}
@@ -84,7 +84,7 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
self.inner.streams_mut()
}
fn stream_is_reset(&self, id: StreamId) -> bool {
fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.stream_is_reset(id)
}
}
@@ -92,7 +92,7 @@ impl<T: ControlStreams> ControlStreams for FlowControl<T> {
/// Exposes a public upward API for flow control.
impl<T: ControlStreams> ControlFlow for FlowControl<T> {
fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
if self.stream_is_reset(id) {
if self.stream_is_reset(id).is_some() {
return Err(error::User::StreamReset.into());
}
@@ -125,7 +125,7 @@ impl<T: ControlStreams> ControlFlow for FlowControl<T> {
self.pending_local_window_updates.push_back(id);
}
Ok(())
} else if self.stream_is_reset(id) {
} else if self.stream_is_reset(id).is_some() {
Err(error::User::StreamReset.into())
} else {
Err(error::User::InvalidStreamId.into())
@@ -138,8 +138,8 @@ impl<T: ControlPing> ControlPing for FlowControl<T> {
self.inner.start_ping(body)
}
fn pop_pong(&mut self) -> Option<PingPayload> {
self.inner.pop_pong()
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
@@ -160,7 +160,7 @@ impl<T, U> FlowControl<T>
}
while let Some(id) = self.pending_local_window_updates.pop_front() {
if !self.stream_is_reset(id) {
if self.stream_is_reset(id).is_none() {
let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update());
if let Some(incr) = update {
try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
@@ -270,6 +270,8 @@ impl<T> Stream for FlowControl<T>
if self.connection_local_flow_controller.claim_window(sz).is_err() {
return Err(error::Reason::FlowControlError.into())
}
// If this frame ends the stream, there may no longer be a flow
// controller. That's fine.
if let Some(fc) = self.local_flow_controller(v.stream_id()) {
if fc.claim_window(sz).is_err() {
return Err(error::Reason::FlowControlError.into())

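The `stream_is_reset` change from `bool` to `Option<Reason>` threads the reset cause upward. A self-contained sketch with stand-in types showing both the old boolean-style check and the new reason reporting:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Reason { NoError, RefusedStream }

struct Resets(HashMap<u32, Reason>);

impl Resets {
    // Mirrors the new `stream_is_reset` signature from this diff.
    fn stream_is_reset(&self, id: u32) -> Option<Reason> {
        self.0.get(&id).map(|r| *r)
    }
}

fn main() {
    let mut m = HashMap::new();
    m.insert(1, Reason::NoError);
    m.insert(3, Reason::RefusedStream);
    let resets = Resets(m);
    // Callers keep the old boolean check via `.is_some()`...
    assert!(resets.stream_is_reset(1).is_some());
    // ...but can now report why the stream went away.
    assert_eq!(resets.stream_is_reset(3), Some(Reason::RefusedStream));
    assert_eq!(resets.stream_is_reset(5), None);
}
```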
View File

@@ -1,4 +1,5 @@
use {frame, ConnectionError, Peer, StreamId};
use error::Reason;
use frame::SettingSet;
use bytes::{Buf, IntoBuf};
@@ -60,8 +61,8 @@ use self::state::{StreamMap, StreamState};
///
/// ### `StreamTracker`
///
/// - Tracks the states of each stream.
/// - **TODO** Enforces maximum concurrency.
/// - Tracks all active streams.
/// - Tracks all reset streams.
/// - Exposes `ControlStreams` so that upper layers may share stream state.
///
/// ### `PingPong`
@@ -126,16 +127,21 @@ pub trait ControlFlow {
/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
/// Accesses the map of all active streams.
fn streams(&self) -> &StreamMap;
/// Mutably accesses the map of all active streams.
fn streams_mut(&mut self) -> &mut StreamMap;
fn stream_is_reset(&self, id: StreamId) -> bool;
/// Checks whether a stream has been reset.
fn stream_is_reset(&self, id: StreamId) -> Option<Reason>;
}
pub type PingPayload = [u8; 8];
pub trait ControlPing {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
fn pop_pong(&mut self) -> Option<PingPayload>;
fn take_pong(&mut self) -> Option<PingPayload>;
}
/// Create a full H2 transport from an I/O handle.
@@ -203,7 +209,9 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>
initial_remote_window_size,
local_max_concurrency,
remote_max_concurrency,
PingPong::new(FramedRead::new(framer))
PingPong::new(
FramedRead::new(framer)
)
)
)
});
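
Every layer in this stack (Settings, FlowControl, StreamTracker) implements `ControlPing` by delegating to `inner`, which is how `Connection` reaches the `PingPong` layer at the bottom. A compact sketch of that pattern with simplified types:

```rust
type PingPayload = [u8; 8];

trait ControlPing {
    fn take_pong(&mut self) -> Option<PingPayload>;
}

// Bottom of the stack: actually owns the pong state.
struct PingPong { received_pong: Option<PingPayload> }

impl ControlPing for PingPong {
    fn take_pong(&mut self) -> Option<PingPayload> {
        self.received_pong.take()
    }
}

// A middle layer adds its own behavior but passes ping control through.
struct FlowControl<T> { inner: T }

impl<T: ControlPing> ControlPing for FlowControl<T> {
    fn take_pong(&mut self) -> Option<PingPayload> {
        self.inner.take_pong()
    }
}

fn main() {
    let mut stack = FlowControl {
        inner: PingPong { received_pong: Some(*b"buoyant!") },
    };
    // take_pong drains the pong exactly once, as the rename suggests.
    assert_eq!(stack.take_pong(), Some(*b"buoyant!"));
    assert_eq!(stack.take_pong(), None);
}
```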

View File

@@ -3,14 +3,15 @@ use frame::{Frame, Ping, SettingSet};
use proto::{ApplySettings, ControlPing, PingPayload, ReadySink};
use futures::*;
use std::collections::VecDeque;
/// Acknowledges ping requests from the remote.
#[derive(Debug)]
pub struct PingPong<T, U> {
inner: T,
sending_pong: Option<Frame<U>>,
received_pongs: VecDeque<PingPayload>,
received_pong: Option<PingPayload>,
blocked_ping: Option<task::Task>,
expecting_pong: bool,
}
impl<T, U> PingPong<T, U>
@@ -21,7 +22,9 @@ impl<T, U> PingPong<T, U>
PingPong {
inner,
sending_pong: None,
received_pongs: VecDeque::new(),
received_pong: None,
expecting_pong: false,
blocked_ping: None,
}
}
}
@@ -45,14 +48,35 @@ impl<T, U> ControlPing for PingPong<T, U>
return Ok(AsyncSink::NotReady(body));
}
// Only allow one in-flight ping.
if self.expecting_pong || self.received_pong.is_some() {
self.blocked_ping = Some(task::current());
return Ok(AsyncSink::NotReady(body))
}
match self.inner.start_send(Ping::ping(body).into())? {
AsyncSink::NotReady(_) => unreachable!(),
AsyncSink::Ready => Ok(AsyncSink::Ready),
AsyncSink::NotReady(_) => {
// By virtue of calling inner.poll_ready(), this must not happen.
unreachable!()
}
AsyncSink::Ready => {
self.expecting_pong = true;
Ok(AsyncSink::Ready)
}
}
}
fn pop_pong(&mut self) -> Option<PingPayload> {
self.received_pongs.pop_front()
fn take_pong(&mut self) -> Option<PingPayload> {
match self.received_pong.take() {
None => None,
Some(p) => {
self.expecting_pong = false;
if let Some(task) = self.blocked_ping.take() {
task.notify();
}
Some(p)
}
}
}
}
@@ -94,20 +118,20 @@ impl<T, U> Stream for PingPong<T, U>
match self.inner.poll()? {
Async::Ready(Some(Frame::Ping(ping))) => {
if ping.is_ack() {
// If we received an ACK, pass it on (nothing to be done here).
return Ok(Async::Ready(Some(ping.into())));
// Save acknowledgements to be returned from take_pong().
self.received_pong = Some(ping.into_payload());
if let Some(task) = self.blocked_ping.take() {
task.notify();
}
} else {
// Save the ping's payload to be sent as an acknowledgement.
let pong = Ping::pong(ping.into_payload());
self.sending_pong = Some(pong.into());
}
// Save a pong to be sent when there is nothing more to be returned
// from the stream or when frames are sent to the sink.
let pong = Ping::pong(ping.into_payload());
self.sending_pong = Some(pong.into());
}
// Everything other than ping gets passed through.
f => {
return Ok(f);
}
f => return Ok(f),
}
}
}
@@ -151,6 +175,7 @@ impl<T, U> ReadySink for PingPong<T, U>
#[cfg(test)]
mod test {
use super::*;
use proto::ControlPing;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
@@ -253,12 +278,10 @@ mod test {
trans.from_socket.push_back(pong.into());
}
match ping_pong.poll().unwrap() {
Async::Ready(Some(Frame::Ping(pong))) => {
assert!(pong.is_ack());
assert_eq!(&pong.into_payload(), b"buoyant!");
}
f => panic!("unexpected frame: {:?}", f),
assert!(ping_pong.poll().unwrap().is_not_ready());
match ping_pong.take_pong() {
Some(pong) => assert_eq!(&pong, b"buoyant!"),
None => panic!("no pong received"),
}
{
@@ -327,4 +350,15 @@ mod test {
self.poll_complete()
}
}
impl ReadySink for Transport {
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
let mut trans = self.0.borrow_mut();
if trans.closing || trans.start_send_blocked {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}
}
}
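
The reworked `PingPong` allows only one in-flight ping: `start_ping` refuses a second ping until `take_pong` drains the outstanding ack, parking the caller's task in `blocked_ping` in the meantime. A sketch of that state machine with the futures task machinery elided:

```rust
type PingPayload = [u8; 8];

#[derive(Default)]
struct PingState {
    expecting_pong: bool,
    received_pong: Option<PingPayload>,
}

impl PingState {
    // Returns the body back on refusal, like AsyncSink::NotReady(body).
    fn start_ping(&mut self, body: PingPayload) -> Result<(), PingPayload> {
        if self.expecting_pong || self.received_pong.is_some() {
            // In the real code this also parks the task in blocked_ping.
            return Err(body);
        }
        self.expecting_pong = true;
        Ok(())
    }

    fn recv_ack(&mut self, pong: PingPayload) {
        // In the real code this notifies blocked_ping.
        self.received_pong = Some(pong);
    }

    fn take_pong(&mut self) -> Option<PingPayload> {
        let p = self.received_pong.take()?;
        self.expecting_pong = false;
        Some(p)
    }
}

fn main() {
    let mut ping = PingState::default();
    assert!(ping.start_ping(*b"buoyant!").is_ok());
    assert!(ping.start_ping(*b"again!!!").is_err()); // one in flight
    ping.recv_ack(*b"buoyant!");
    assert_eq!(ping.take_pong(), Some(*b"buoyant!"));
    assert!(ping.start_ping(*b"again!!!").is_ok()); // unblocked
}
```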

View File

@@ -104,7 +104,7 @@ impl<T: ControlStreams> ControlStreams for Settings<T> {
self.inner.streams_mut()
}
fn stream_is_reset(&self, id: StreamId) -> bool {
fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
self.inner.stream_is_reset(id)
}
}
@@ -124,8 +124,8 @@ impl<T: ControlPing> ControlPing for Settings<T> {
self.inner.start_ping(body)
}
fn pop_pong(&mut self) -> Option<PingPayload> {
self.inner.pop_pong()
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}

View File

@@ -296,16 +296,28 @@ pub struct StreamMap {
}
impl StreamMap {
pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut StreamState> {
self.inner.get_mut(id)
pub fn get_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
self.inner.get_mut(&id)
}
pub fn has_stream(&mut self, id: StreamId) -> bool {
self.inner.contains_key(&id)
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn entry(&mut self, id: StreamId) -> Entry<StreamId, StreamState, BuildHasherDefault<FnvHasher>> {
self.inner.entry(id)
}
pub fn remove(&mut self, id: &StreamId) -> Option<StreamState> {
self.inner.remove(id)
pub fn remove(&mut self, id: StreamId) -> Option<StreamState> {
self.inner.remove(&id)
}
pub fn shrink_all_local_windows(&mut self, decr: u32) {

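The new `has_stream`/`len` accessors exist to support the concurrency checks in the stream tracker below: a HEADERS frame is admitted if it targets a known stream or the active count still fits under the limit. A sketch of that admission test (the exact boundary against SETTINGS_MAX_CONCURRENT_STREAMS is this sketch's assumption):

```rust
use std::collections::HashMap;

struct StreamMap(HashMap<u32, ()>);

impl StreamMap {
    fn has_stream(&self, id: u32) -> bool { self.0.contains_key(&id) }
    fn len(&self) -> usize { self.0.len() }
}

// Admit frames for known streams; only count new streams against the limit.
fn admit(map: &StreamMap, id: u32, max: Option<usize>) -> bool {
    match max {
        Some(max) => map.has_stream(id) || map.len() < max,
        None => true,
    }
}

fn main() {
    let mut inner = HashMap::new();
    inner.insert(1, ());
    let map = StreamMap(inner);
    assert!(admit(&map, 1, Some(1)));  // existing stream: fine
    assert!(!admit(&map, 3, Some(1))); // new stream over the limit
}
```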
View File

@@ -9,17 +9,26 @@ use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
// TODO enforce local_max_concurrency.
// TODO enforce remote_max_concurrency.
// TODO reset_streams needs to be bounded.
// TODO track reserved streams (PUSH_PROMISE)
#[derive(Debug)]
pub struct StreamTracker<T, P> {
inner: T,
peer: PhantomData<P>,
active_streams: StreamMap,
// TODO reserved_streams: HashSet<StreamId>
reset_streams: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
local_max_concurrency: Option<u32>,
remote_max_concurrency: Option<u32>,
initial_local_window_size: WindowSize,
initial_remote_window_size: WindowSize,
pending_refused_stream: Option<StreamId>,
}
impl<T, P, U> StreamTracker<T, P>
@@ -37,8 +46,11 @@ impl<T, P, U> StreamTracker<T, P>
StreamTracker {
inner,
peer: PhantomData,
active_streams: StreamMap::default(),
reset_streams: OrderMap::default(),
pending_refused_stream: None,
local_max_concurrency,
remote_max_concurrency,
initial_local_window_size,
@@ -47,28 +59,48 @@ impl<T, P, U> StreamTracker<T, P>
}
}
impl<T, P, U> StreamTracker<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
fn send_refusal(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.pending_refused_stream.is_none());
let f = frame::Reset::new(id, Reason::RefusedStream);
match self.inner.start_send(f.into())? {
AsyncSink::Ready => {
self.reset_streams.insert(id, Reason::RefusedStream);
Ok(Async::Ready(()))
}
AsyncSink::NotReady(_) => {
self.pending_refused_stream = Some(id);
Ok(Async::NotReady)
}
}
}
}
impl<T, P> ControlStreams for StreamTracker<T, P> {
#[inline]
fn streams(&self) -> &StreamMap {
&self.active_streams
}
#[inline]
fn streams_mut(&mut self) -> &mut StreamMap {
&mut self.active_streams
}
fn stream_is_reset(&self, id: StreamId) -> bool {
self.reset_streams.contains_key(&id)
fn stream_is_reset(&self, id: StreamId) -> Option<Reason> {
self.reset_streams.get(&id).map(|r| *r)
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS`.
///
/// > Indicates the maximum number of concurrent streams that the sender will allow. This
/// > limit is directional: it applies to the number of streams that the sender permits the
/// > receiver to create. Initially, there is no limit to this value. It is recommended that
/// > this value be no smaller than 100, so as to not unnecessarily limit parallelism.
/// > limit is directional: it applies to the number of streams that the sender permits
/// > the receiver to create. Initially, there is no limit to this value. It is
/// > recommended that this value be no smaller than 100, so as to not unnecessarily limit
/// > parallelism.
/// >
/// > A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as special by
/// > endpoints. A zero value does prevent the creation of new streams; however, this can
@@ -104,13 +136,14 @@ impl<T, P> ControlPing for StreamTracker<T, P>
self.inner.start_ping(body)
}
fn pop_pong(&mut self) -> Option<PingPayload> {
self.inner.pop_pong()
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
impl<T, P> Stream for StreamTracker<T, P>
impl<T, P, U> Stream for StreamTracker<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
type Item = T::Item;
@@ -119,6 +152,12 @@ impl<T, P> Stream for StreamTracker<T, P>
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
use frame::Frame::*;
// The local peer must finish refusing the remote stream before processing
// additional frames.
if let Some(id) = self.pending_refused_stream.take() {
try_ready!(self.send_refusal(id));
}
loop {
match try_ready!(self.inner.poll()) {
Some(Headers(v)) => {
@@ -129,6 +168,23 @@ impl<T, P> Stream for StreamTracker<T, P>
continue;
}
// Ensure that receiving this frame will not violate the local max
// stream concurrency setting. Ensure that the stream is refused
// before processing additional frames.
if let Some(max) = self.local_max_concurrency {
let max = max as usize;
if !self.active_streams.has_stream(id)
&& self.active_streams.len() >= max - 1 {
// This frame would violate our local max concurrency, so reject
// the stream.
try_ready!(self.send_refusal(id));
// Try to process another frame (hopefully for an active
// stream).
continue;
}
}
let is_closed = {
let stream = self.active_streams.entry(id)
.or_insert_with(|| StreamState::default());
@@ -137,10 +193,6 @@ impl<T, P> Stream for StreamTracker<T, P>
stream.recv_headers::<P>(eos, self.initial_local_window_size)?;
if initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_remote_stream_id(id) {
return Err(Reason::ProtocolError.into());
}
@@ -150,7 +202,7 @@ impl<T, P> Stream for StreamTracker<T, P>
};
if is_closed {
self.active_streams.remove(&id);
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
@@ -165,7 +217,7 @@ impl<T, P> Stream for StreamTracker<T, P>
}
let is_closed = {
let stream = match self.active_streams.get_mut(&id) {
let stream = match self.active_streams.get_mut(id) {
None => return Err(Reason::ProtocolError.into()),
Some(s) => s,
};
@@ -174,7 +226,7 @@ impl<T, P> Stream for StreamTracker<T, P>
};
if is_closed {
self.active_streams.remove(&id);
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
@@ -187,7 +239,7 @@ impl<T, P> Stream for StreamTracker<T, P>
// Set or update the reset reason.
self.reset_streams.insert(id, v.reason());
if self.active_streams.remove(&id).is_some() {
if self.active_streams.remove(id).is_some() {
return Ok(Async::Ready(Some(Reset(v))));
}
}
@@ -210,7 +262,6 @@ impl<T, P> Stream for StreamTracker<T, P>
}
}
impl<T, P, U> Sink for StreamTracker<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
@@ -222,10 +273,18 @@ impl<T, P, U> Sink for StreamTracker<T, P>
use frame::Frame::*;
// Must be enforced through higher levels.
debug_assert!(!self.stream_is_reset(item.stream_id()));
debug_assert!(self.stream_is_reset(item.stream_id()).is_none());
match &item {
&Headers(ref v) => {
// The local peer must finish refusing the remote stream before sending any
// other frames.
if let Some(id) = self.pending_refused_stream.take() {
if self.send_refusal(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
}
match item {
Headers(v) => {
let id = v.stream_id();
let eos = v.is_end_stream();
@@ -237,6 +296,24 @@ impl<T, P, U> Sink for StreamTracker<T, P>
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
// Ensure that sending this frame would not violate the remote's max
// stream concurrency setting.
if let Some(max) = self.remote_max_concurrency {
let max = max as usize;
if !self.active_streams.has_stream(id)
&& self.active_streams.len() >= max - 1 {
// This frame would violate the remote's max concurrency, so reject
// the stream.
if self.send_refusal(id)?.is_not_ready() {
return Ok(AsyncSink::NotReady(Headers(v)));
}
// Refuse the stream and surface the error to the caller.
return Err(User::MaxConcurrencyExceeded.into())
}
}
let is_closed = {
let stream = self.active_streams.entry(id)
.or_insert_with(|| StreamState::default());
@@ -258,32 +335,40 @@ impl<T, P, U> Sink for StreamTracker<T, P>
};
if is_closed {
self.active_streams.remove(&id);
self.active_streams.remove(id);
self.reset_streams.insert(id, Reason::NoError);
}
self.inner.start_send(Headers(v))
}
&Data(ref v) => {
match self.active_streams.get_mut(&v.stream_id()) {
Data(v) => {
match self.active_streams.get_mut(v.stream_id()) {
None => return Err(User::InactiveStreamId.into()),
Some(stream) => stream.send_data(v.is_end_stream())?,
Some(stream) => {
stream.send_data(v.is_end_stream())?;
self.inner.start_send(Data(v))
}
}
}
&Reset(ref v) => {
Reset(v) => {
let id = v.stream_id();
self.active_streams.remove(&id);
self.active_streams.remove(id);
self.reset_streams.insert(id, v.reason());
self.inner.start_send(Reset(v))
}
_ => {}
frame => self.inner.start_send(frame),
}
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
if let Some(id) = self.pending_refused_stream.take() {
try_ready!(self.send_refusal(id));
}
self.inner.poll_complete()
}
}
@@ -296,6 +381,10 @@ impl<T, P, U> ReadySink for StreamTracker<T, P>
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refused_stream.take() {
try_ready!(self.send_refusal(id));
}
self.inner.poll_ready()
}
}
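
A recurring pattern in this file: `send_refusal` tries to write the RST_STREAM immediately, and if the sink is full it stashes the id in `pending_refused_stream`, which `poll`, `poll_complete`, and `poll_ready` all drain before doing other work. A reduced sketch with the sink collapsed to a boolean:

```rust
#[derive(Default)]
struct Refuser {
    pending_refused_stream: Option<u32>,
    sent: Vec<u32>,
}

impl Refuser {
    // Returns true if the refusal was written, false if it was parked.
    fn send_refusal(&mut self, id: u32, sink_ready: bool) -> bool {
        debug_assert!(self.pending_refused_stream.is_none());
        if sink_ready {
            self.sent.push(id); // stands in for start_send(Reset(...))
            true
        } else {
            self.pending_refused_stream = Some(id);
            false
        }
    }

    // Every entry point first drains the pending refusal.
    fn poll_complete(&mut self, sink_ready: bool) -> bool {
        if let Some(id) = self.pending_refused_stream.take() {
            if !self.send_refusal(id, sink_ready) {
                return false;
            }
        }
        true
    }
}

fn main() {
    let mut r = Refuser::default();
    assert!(!r.send_refusal(3, false)); // sink full: refusal parked
    assert!(r.poll_complete(true));     // flushed on the next poll
    assert_eq!(r.sent, vec![3]);
}
```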