wip: start splitting out stream management

This commit is contained in:
Oliver Gould
2017-07-15 22:50:13 +00:00
parent 59c92e1089
commit e90a6e9250
4 changed files with 247 additions and 217 deletions

View File

@@ -1,8 +1,7 @@
use {Frame, FrameSize};
use {ConnectionError, Frame, FrameSize};
use client::Client;
use error::{self, ConnectionError};
use frame::{self, StreamId};
use proto::{self, Peer, ReadySink, StreamState, FlowController, WindowSize};
use proto::{self, Peer, ReadySink, WindowSize};
use server::Server;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -12,152 +11,65 @@ use bytes::{Bytes, IntoBuf};
use futures::*;
use ordermap::OrderMap;
use fnv::FnvHasher;
use std::hash::BuildHasherDefault;
use std::marker::PhantomData;
/// An H2 connection
#[derive(Debug)]
pub struct Connection<T, P, B: IntoBuf = Bytes> {
inner: proto::Transport<T, B::Buf>,
streams: StreamMap<StreamState>,
inner: proto::Transport<T, P, B::Buf>,
peer: PhantomData<P>,
/// Tracks the connection-level flow control window for receiving data from the
/// remote.
local_flow_controller: FlowController,
/// Tracks the onnection-level flow control window for receiving data from the remote.
remote_flow_controller: FlowController,
/// When `poll_window_update` is not ready, then the calling task is saved to be
/// notified later. Access to poll_window_update must not be shared across tasks.
blocked_window_update: Option<task::Task>,
sending_window_update: Option<frame::WindowUpdate>,
}
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
pub fn new<T, P, B>(transport: proto::Transport<T, B::Buf>)
pub fn new<T, P, B>(transport: proto::Transport<T, P, B::Buf>)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
let recv_window_size = transport.local_settings().initial_window_size();
let send_window_size = transport.remote_settings().initial_window_size();
Connection {
inner: transport,
streams: StreamMap::default(),
peer: PhantomData,
local_flow_controller: FlowController::new(recv_window_size),
remote_flow_controller: FlowController::new(send_window_size),
blocked_window_update: None,
sending_window_update: None,
}
}
impl<T, P, B: IntoBuf> Connection<T, P, B> {
#[inline]
fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
self.local_flow_controller.claim_window(len)
.map_err(|_| error::Reason::FlowControlError.into())
}
#[inline]
fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
self.remote_flow_controller.claim_window(len)
.map_err(|_| error::User::FlowControlViolation.into())
}
/// Polls for the amount of additional data that may be sent to a remote.
///
/// Connection and stream updates are distinct.
pub fn poll_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
let added = if id.is_zero() {
self.remote_flow_controller.take_window_update()
} else {
self.streams.get_mut(&id).and_then(|s| s.take_send_window_update())
};
match added {
Some(incr) => Ok(Async::Ready(incr)),
None => {
self.blocked_window_update = Some(task::current());
Ok(Async::NotReady)
}
}
pub fn poll_window_update(&mut self, _id: StreamId) -> Poll<WindowSize, ConnectionError> {
// let added = if id.is_zero() {
// self.remote_flow_controller.take_window_update()
// } else {
// self.streams.get_mut(&id).and_then(|s| s.take_send_window_update())
// };
// match added {
// Some(incr) => Ok(Async::Ready(incr)),
// None => {
// self.blocked_window_update = Some(task::current());
// Ok(Async::NotReady)
// }
// }
unimplemented!()
}
/// Increases the amount of data that the remote endpoint may send.
///
/// Connection and stream updates are distinct.
pub fn increment_window_size(&mut self, id: StreamId, incr: WindowSize) {
assert!(self.sending_window_update.is_none());
let added = if id.is_zero() {
self.local_flow_controller.grow_window(incr);
self.local_flow_controller.take_window_update()
} else {
self.streams.get_mut(&id).and_then(|s| {
s.grow_recv_window(incr);
s.take_recv_window_update()
})
};
if let Some(added) = added {
self.sending_window_update = Some(frame::WindowUpdate::new(id, added));
}
}
/// Handles a window update received from the remote, indicating that the local may
/// send `incr` additional bytes.
///
/// Connection window updates (id=0) and stream window updates are advertised
/// distinctly.
fn increment_send_window_size(&mut self, id: StreamId, incr: WindowSize) {
if incr == 0 {
return;
}
let added = if id.is_zero() {
self.remote_flow_controller.grow_window(incr);
true
} else if let Some(mut s) = self.streams.get_mut(&id) {
s.grow_send_window(incr);
true
} else {
false
};
if added {
if let Some(task) = self.blocked_window_update.take() {
task.notify();
}
}
}
}
impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf
{
/// Attempts to send a window update to the remote, if one is pending.
fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.sending_window_update.take() {
if self.inner.start_send(f.into())?.is_not_ready() {
self.sending_window_update = Some(f);
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
pub fn increment_window_size(&mut self, _id: StreamId, _incr: WindowSize) {
// assert!(self.sending_window_update.is_none());
// let added = if id.is_zero() {
// self.local_flow_controller.grow_window(incr);
// self.local_flow_controller.take_window_update()
// } else {
// self.streams.get_mut(&id).and_then(|s| {
// s.grow_recv_window(incr);
// s.take_recv_window_update()
// })
// };
// if let Some(added) = added {
// self.sending_window_update = Some(frame::WindowUpdate::new(id, added));
// }
unimplemented!()
}
}
@@ -252,22 +164,6 @@ impl<T, P, B> Stream for Connection<T, P, B>
let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream();
let init_window_size = self.inner.local_settings().initial_window_size();
let stream_initialized = try!(self.streams.entry(stream_id)
.or_insert(StreamState::default())
.recv_headers::<P>(end_of_stream, init_window_size));
if stream_initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_remote_stream_id(stream_id) {
unimplemented!();
}
}
Frame::Headers {
id: stream_id,
headers: P::convert_poll_message(v),
@@ -279,12 +175,6 @@ impl<T, P, B> Stream for Connection<T, P, B>
let id = v.stream_id();
let end_of_stream = v.is_end_stream();
self.claim_local_window(v.len())?;
match self.streams.get_mut(&id) {
None => return Err(error::Reason::ProtocolError.into()),
Some(state) => state.recv_data(end_of_stream, v.len())?,
}
Frame::Data {
id,
end_of_stream,
@@ -293,16 +183,6 @@ impl<T, P, B> Stream for Connection<T, P, B>
}
}
Some(WindowUpdate(v)) => {
// When a window update is received from the remote, apply that update
// to the proper stream so that more data may be sent to the remote.
self.increment_send_window_size(v.stream_id(), v.size_increment());
// There's nothing to return yet, so continue attempting to read
// additional frames.
continue;
}
Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)),
};
@@ -332,33 +212,9 @@ impl<T, P, B> Sink for Connection<T, P, B>
if self.poll_ready()? == Async::NotReady {
return Ok(AsyncSink::NotReady(item));
}
assert!(self.sending_window_update.is_none());
match item {
Frame::Headers { id, headers, end_of_stream } => {
let init_window_size = self.inner.remote_settings().initial_window_size();
// Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body (1xx
// responses).
//
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
let stream_initialized = try!(self.streams.entry(id)
.or_insert(StreamState::default())
.send_headers::<P>(end_of_stream, init_window_size));
if stream_initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_local_stream_id(id) {
// TODO: clear state
return Err(error::User::InvalidStreamId.into());
}
}
let frame = P::convert_send_message(id, headers, end_of_stream);
// We already ensured that the upstream can handle the frame, so
@@ -373,15 +229,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
Ok(AsyncSink::Ready)
}
Frame::Data { id, data, data_len, end_of_stream } => {
try!(self.claim_remote_window(data_len));
// The stream must be initialized at this point.
match self.streams.get_mut(&id) {
None => return Err(error::User::InactiveStreamId.into()),
Some(mut s) => try!(s.send_data(end_of_stream, data_len)),
}
Frame::Data { id, data, end_of_stream, .. } => {
let mut frame = frame::Data::from_buf(id, data.into_buf());
if end_of_stream {
frame.set_end_stream();
@@ -412,13 +260,7 @@ impl<T, P, B> Sink for Connection<T, P, B>
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
trace!("poll_complete");
try_ready!(self.inner.poll_complete());
// TODO check for settings updates and update the initial window size of all
// streams.
self.poll_sending_window_update()
self.inner.poll_complete()
}
}
@@ -429,7 +271,6 @@ impl<T, P, B> ReadySink for Connection<T, P, B>
{
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
trace!("poll_ready");
try_ready!(self.inner.poll_ready());
self.poll_sending_window_update()
self.inner.poll_ready()
}
}

View File

@@ -1,6 +1,7 @@
use ConnectionError;
use error;
use frame::{self, Frame};
use proto::{ReadySink, StreamMap, ConnectionTransporter, StreamTransporter};
use proto::*;
use futures::*;
@@ -9,6 +10,19 @@ pub struct FlowControl<T> {
inner: T,
initial_local_window_size: u32,
initial_remote_window_size: u32,
/// Tracks the connection-level flow control window for receiving data from the
/// remote.
local_flow_controller: FlowController,
/// Tracks the onnection-level flow control window for receiving data from the remote.
remote_flow_controller: FlowController,
/// When `poll_window_update` is not ready, then the calling task is saved to be
/// notified later. Access to poll_window_update must not be shared across tasks.
blocked_window_update: Option<task::Task>,
sending_window_update: Option<frame::WindowUpdate>,
}
impl<T, U> FlowControl<T>
@@ -25,10 +39,71 @@ impl<T, U> FlowControl<T>
inner,
initial_local_window_size,
initial_remote_window_size,
local_flow_controller: FlowController::new(initial_local_window_size),
remote_flow_controller: FlowController::new(initial_remote_window_size),
blocked_window_update: None,
sending_window_update: None,
}
}
}
impl<T> FlowControl<T> {
#[inline]
fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
self.local_flow_controller.claim_window(len)
.map_err(|_| error::Reason::FlowControlError.into())
}
#[inline]
fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
self.remote_flow_controller.claim_window(len)
.map_err(|_| error::User::FlowControlViolation.into())
}
}
impl<T: StreamTransporter> FlowControl<T> {
/// Handles a window update received from the remote, indicating that the local may
/// send `incr` additional bytes.
///
/// Connection window updates (id=0) and stream window updates are advertised
/// distinctly.
fn grow_remote_window(&mut self, id: StreamId, incr: WindowSize) {
if incr == 0 {
return;
}
let added = if id.is_zero() {
self.remote_flow_controller.grow_window(incr);
true
} else if let Some(mut s) = self.streams_mut().get_mut(&id) {
s.grow_send_window(incr);
true
} else {
false
};
if added {
if let Some(task) = self.blocked_window_update.take() {
task.notify();
}
}
}
}
impl<T, U> FlowControl<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
/// Attempts to send a window update to the remote, if one is pending.
fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.sending_window_update.take() {
if self.inner.start_send(f.into())?.is_not_ready() {
self.sending_window_update = Some(f);
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
}
/// Applies an update to an endpoint's initial window size.
///
/// Per RFC 7540 §6.9.2
@@ -116,7 +191,23 @@ impl<T> Stream for FlowControl<T>
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll()
use frame::Frame::*;
trace!("poll");
loop {
match try_ready!(self.inner.poll()) {
Some(WindowUpdate(v)) => {
self.grow_remote_window(v.stream_id(), v.size_increment());
}
Some(Data(v)) => {
self.claim_local_window(v.len())?;
return Ok(Async::Ready(Some(Data(v))));
}
v => return Ok(Async::Ready(v)),
}
}
}
}
@@ -129,6 +220,12 @@ impl<T, U> Sink for FlowControl<T>
type SinkError = T::SinkError;
fn start_send(&mut self, item: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
if let &Data(ref v) = &item {
self.claim_remote_window(v.len())?;
}
self.inner.start_send(item)
}

View File

@@ -27,7 +27,7 @@ use tokio_io::codec::length_delimited;
use bytes::{Buf, IntoBuf};
use ordermap::OrderMap;
use ordermap::{Entry, OrderMap};
use fnv::FnvHasher;
use std::hash::BuildHasherDefault;
@@ -45,13 +45,14 @@ use std::hash::BuildHasherDefault;
///
/// All transporters below Settings must apply relevant settings before passing a frame on
/// to another level. For example, if the frame writer n
type Transport<T, B> =
type Transport<T, P, B> =
Settings<
FlowControl<
StreamTracker<
PingPong<
Framer<T, B>,
B>>>>;
B>,
P>>>;
type Framer<T, B> =
FramedRead<
@@ -65,6 +66,14 @@ pub struct StreamMap {
}
impl StreamMap {
fn get_mut(&mut self, id: &StreamId) -> Option<&mut StreamState> {
self.inner.get_mut(id)
}
fn entry(&mut self, id: StreamId) -> Entry<StreamId, StreamState, BuildHasherDefault<FnvHasher>> {
self.inner.entry(id)
}
fn shrink_local_window(&mut self, decr: u32) {
for (_, mut s) in &mut self.inner {
s.shrink_recv_window(decr)
@@ -159,10 +168,17 @@ pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>
.num_skip(0) // Don't skip the header
.new_read(io);
FlowControl::new(initial_local_window_size, initial_remote_window_size,
StreamTracker::new(local_max_concurrency, remote_max_concurrency,
PingPong::new(
FramedRead::new(framer))))
FlowControl::new(
initial_local_window_size,
initial_remote_window_size,
StreamTracker::new(
initial_local_window_size,
initial_remote_window_size,
local_max_concurrency,
remote_max_concurrency,
PingPong::new(FramedRead::new(framer))
)
)
});
connection::new(transport)

View File

@@ -1,36 +1,48 @@
use ConnectionError;
use error::User::*;
use frame::{self, Frame};
use proto::{ReadySink, StreamMap, ConnectionTransporter, StreamTransporter};
use proto::*;
use futures::*;
use std::marker::PhantomData;
#[derive(Debug)]
pub struct StreamTracker<T> {
pub struct StreamTracker<T, P> {
inner: T,
peer: PhantomData<P>,
streams: StreamMap,
local_max_concurrency: Option<u32>,
remote_max_concurrency: Option<u32>,
initial_local_window_size: WindowSize,
initial_remote_window_size: WindowSize,
}
impl<T, U> StreamTracker<T>
impl<T, P, U> StreamTracker<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer
{
pub fn new(local_max_concurrency: Option<u32>,
pub fn new(initial_local_window_size: WindowSize,
initial_remote_window_size: WindowSize,
local_max_concurrency: Option<u32>,
remote_max_concurrency: Option<u32>,
inner: T)
-> StreamTracker<T>
-> StreamTracker<T, P>
{
StreamTracker {
inner,
peer: PhantomData,
streams: StreamMap::default(),
local_max_concurrency,
remote_max_concurrency,
initial_local_window_size,
initial_remote_window_size,
}
}
}
impl<T> StreamTransporter for StreamTracker<T> {
impl<T, P> StreamTransporter for StreamTracker<T, P> {
fn streams(&self) -> &StreamMap {
&self.streams
}
@@ -58,38 +70,101 @@ impl<T> StreamTransporter for StreamTracker<T> {
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T: ConnectionTransporter> ConnectionTransporter for StreamTracker<T> {
impl<T, P> ConnectionTransporter for StreamTracker<T, P>
where T: ConnectionTransporter
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.local_max_concurrency = set.max_concurrent_streams();
self.initial_local_window_size = set.initial_window_size();
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.remote_max_concurrency = set.max_concurrent_streams();
self.initial_remote_window_size = set.initial_window_size();
self.inner.apply_remote_settings(set)
}
}
impl<T, U> Stream for StreamTracker<T>
where T: Stream<Item = Frame<U>, Error = ConnectionError>,
impl<T, P> Stream for StreamTracker<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
P: Peer,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll()
use frame::Frame::*;
match try_ready!(self.inner.poll()) {
Some(Headers(v)) => {
let id = v.stream_id();
let eos = v.is_end_stream();
let initialized = self.streams
.entry(id)
.or_insert_with(|| StreamState::default())
.recv_headers::<P>(eos, self.initial_local_window_size)?;
if initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_remote_stream_id(id) {
unimplemented!();
}
}
Ok(Async::Ready(Some(Headers(v))))
}
f => Ok(Async::Ready(f))
}
}
}
impl<T, U> Sink for StreamTracker<T>
impl<T, P, U> Sink for StreamTracker<T, P>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
P: Peer,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
if let &Headers(ref v) = &item {
let id = v.stream_id();
let eos = v.is_end_stream();
// Transition the stream state, creating a new entry if needed
//
// TODO: Response can send multiple headers frames before body (1xx
// responses).
//
// ACTUALLY(ver), maybe not?
// https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784
let initialized = self.streams
.entry(id)
.or_insert_with(|| StreamState::default())
.send_headers::<P>(eos, self.initial_remote_window_size)?;
if initialized {
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
if !P::is_valid_local_stream_id(id) {
// TODO: clear state
return Err(InvalidStreamId.into());
}
}
}
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
@@ -98,10 +173,11 @@ impl<T, U> Sink for StreamTracker<T>
}
impl<T, U> ReadySink for StreamTracker<T>
impl<T, P, U> ReadySink for StreamTracker<T, P>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()