wire up flow control polling through connection

This commit is contained in:
Oliver Gould
2017-07-16 16:04:40 +00:00
parent 85626f5a79
commit 76dbb5d285
8 changed files with 207 additions and 156 deletions

View File

@@ -1,7 +1,7 @@
use {ConnectionError, Frame, FrameSize};
use client::Client;
use frame::{self, StreamId};
use proto::{self, Peer, ReadySink, WindowSize};
use proto::{self, Peer, ReadySink, FlowTransporter, WindowSize};
use server::Server;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -32,44 +32,22 @@ pub fn new<T, P, B>(transport: proto::Transport<T, P, B::Buf>)
}
}
impl<T, P, B: IntoBuf> Connection<T, P, B> {
impl<T, P, B> Connection<T, P, B>
where T: FlowTransporter,
B: IntoBuf,
{
/// Polls for the amount of additional data that may be sent to a remote.
///
/// Connection and stream updates are distinct.
pub fn poll_window_update(&mut self, _id: StreamId) -> Poll<WindowSize, ConnectionError> {
// let added = if id.is_zero() {
// self.remote_flow_controller.take_window_update()
// } else {
// self.streams.get_mut(&id).and_then(|s| s.take_send_window_update())
// };
// match added {
// Some(incr) => Ok(Async::Ready(incr)),
// None => {
// self.blocked_window_update = Some(task::current());
// Ok(Async::NotReady)
// }
// }
unimplemented!()
pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
self.inner.poll_remote_window_update(id)
}
/// Increases the amount of data that the remote endpoint may send.
///
/// Connection and stream updates are distinct.
pub fn increment_window_size(&mut self, _id: StreamId, _incr: WindowSize) {
// assert!(self.sending_window_update.is_none());
// let added = if id.is_zero() {
// self.local_flow_controller.grow_window(incr);
// self.local_flow_controller.take_window_update()
// } else {
// self.streams.get_mut(&id).and_then(|s| {
// s.grow_recv_window(incr);
// s.take_recv_window_update()
// })
// };
// if let Some(added) = added {
// self.sending_window_update = Some(frame::WindowUpdate::new(id, added));
// }
unimplemented!()
pub fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.grow_local_window(id, incr)
}
}
@@ -150,9 +128,8 @@ impl<T, P, B> Stream for Connection<T, P, B>
// is called here.
try_ready!(self.inner.poll_complete());
// If the sender sink is ready, we attempt to poll the underlying
// stream once more because it, may have been made ready by flushing
// the sink.
// If the write buffer is cleared, attempt to poll the underlying
// stream once more because it, may have been made ready.
try_ready!(self.inner.poll())
}
};
@@ -215,17 +192,12 @@ impl<T, P, B> Sink for Connection<T, P, B>
match item {
Frame::Headers { id, headers, end_of_stream } => {
// This is a one-way conversion. By checking `poll_ready` first (above),
// it's already been determined that the inner `Sink` can accept the item.
// If the item is rejected, then there is a bug.
let frame = P::convert_send_message(id, headers, end_of_stream);
// We already ensured that the upstream can handle the frame, so
// panic if it gets rejected.
let res = try!(self.inner.start_send(Headers(frame)));
// This is a one-way conversion. By checking `poll_ready` first,
// it's already been determined that the inner `Sink` can accept
// the item. If the item is rejected, then there is a bug.
let res = self.inner.start_send(Headers(frame))?;
assert!(res.is_ready());
Ok(AsyncSink::Ready)
}

View File

@@ -3,7 +3,7 @@ use error;
use frame::{self, Frame};
use proto::*;
use futures::*;
use std::collections::VecDeque;
#[derive(Debug)]
pub struct FlowControl<T> {
@@ -18,11 +18,17 @@ pub struct FlowControl<T> {
/// Tracks the onnection-level flow control window for receiving data from the remote.
remote_flow_controller: FlowController,
/// When `poll_window_update` is not ready, then the calling task is saved to be
/// notified later. Access to poll_window_update must not be shared across tasks.
blocked_window_update: Option<task::Task>,
/// Holds the list of streams on which local window updates may be sent.
// XXX It would be cool if this didn't exist.
pending_local_window_updates: VecDeque<StreamId>,
sending_window_update: Option<frame::WindowUpdate>,
/// If a window update can't be sent immediately, it may need to be saved to be sent later.
sending_local_window_update: Option<frame::WindowUpdate>,
/// When `poll_remote_window_update` is not ready, then the calling task is saved to
/// be notified later. Access to poll_window_update must not be shared across tasks,
/// as we only track a single task (and *not* i.e. a task per stream id).
blocked_remote_window_update: Option<task::Task>,
}
impl<T, U> FlowControl<T>
@@ -41,41 +47,38 @@ impl<T, U> FlowControl<T>
initial_remote_window_size,
local_flow_controller: FlowController::new(initial_local_window_size),
remote_flow_controller: FlowController::new(initial_remote_window_size),
blocked_window_update: None,
sending_window_update: None,
blocked_remote_window_update: None,
sending_local_window_update: None,
pending_local_window_updates: VecDeque::new(),
}
}
}
impl<T: StreamTransporter> FlowControl<T> {
fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> {
if id.is_zero() {
return self.local_flow_controller.claim_window(len)
.map_err(|_| error::Reason::FlowControlError.into());
}
let res = if id.is_zero() {
self.local_flow_controller.claim_window(len)
} else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) {
stream.claim_local_window(len)
} else {
// Ignore updates for non-existent streams.
Ok(())
};
if let Some(mut stream) = self.streams_mut().get_mut(&id) {
return stream.claim_local_window(len)
.map_err(|_| error::Reason::FlowControlError.into());
}
// Ignore updates for non-existent streams.
Ok(())
res.map_err(|_| error::Reason::FlowControlError.into())
}
fn claim_remote_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> {
if id.is_zero() {
return self.local_flow_controller.claim_window(len)
.map_err(|_| error::Reason::FlowControlError.into());
}
let res = if id.is_zero() {
self.local_flow_controller.claim_window(len)
} else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) {
stream.claim_remote_window(len)
} else {
// Ignore updates for non-existent streams.
Ok(())
};
if let Some(mut stream) = self.streams_mut().get_mut(&id) {
return stream.claim_remote_window(len)
.map_err(|_| error::Reason::FlowControlError.into());
}
// Ignore updates for non-existent streams.
Ok(())
res.map_err(|_| error::Reason::FlowControlError.into())
}
/// Handles a window update received from the remote, indicating that the local may
@@ -87,35 +90,93 @@ impl<T: StreamTransporter> FlowControl<T> {
if incr == 0 {
return;
}
let added = if id.is_zero() {
if id.is_zero() {
self.remote_flow_controller.grow_window(incr);
true
} else if let Some(mut s) = self.streams_mut().get_mut(&id) {
} else if let Some(mut s) = self.inner.streams_mut().get_mut(&id) {
s.grow_remote_window(incr);
true
} else {
false
// Ignore updates for non-existent streams.
return;
};
if added {
if let Some(task) = self.blocked_window_update.take() {
task.notify();
}
if let Some(task) = self.blocked_remote_window_update.take() {
task.notify();
}
}
}
impl<T: StreamTransporter> FlowTransporter for FlowControl<T> {
fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
if id.is_zero() {
if let Some(sz) = self.remote_flow_controller.take_window_update() {
return Ok(Async::Ready(sz));
}
} else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) {
if let Some(sz) = stream.take_remote_window_update() {
return Ok(Async::Ready(sz));
}
} else {
return Err(error::User::InvalidStreamId.into());
}
self.blocked_remote_window_update = Some(task::current());
return Ok(Async::NotReady);
}
fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
if id.is_zero() {
self.local_flow_controller.grow_window(incr);
self.pending_local_window_updates.push_back(id);
Ok(())
} else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) {
stream.grow_local_window(incr);
self.pending_local_window_updates.push_back(id);
Ok(())
} else {
Err(error::User::InvalidStreamId.into())
}
}
}
impl<T: StreamTransporter> StreamTransporter for FlowControl<T> {
#[inline]
fn streams(&self) -> &StreamMap {
self.inner.streams()
}
#[inline]
fn streams_mut(&mut self) -> &mut StreamMap {
self.inner.streams_mut()
}
}
impl<T, U> FlowControl<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: StreamTransporter,
{
/// Attempts to send a window update to the remote, if one is pending.
fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.sending_window_update.take() {
/// Returns ready when there are no pending window updates to send.
fn poll_send_local_window_updates(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.sending_local_window_update.take() {
if self.inner.start_send(f.into())?.is_not_ready() {
self.sending_window_update = Some(f);
self.sending_local_window_update = Some(f);
return Ok(Async::NotReady);
}
}
while let Some(id) = self.pending_local_window_updates.pop_front() {
let update = self.inner.streams_mut().get_mut(&id)
.and_then(|mut s| s.take_local_window_update())
.map(|incr| frame::WindowUpdate::new(id, incr));
if let Some(f) = update {
if self.inner.start_send(f.into())?.is_not_ready() {
self.sending_local_window_update = Some(f);
return Ok(Async::NotReady);
}
}
}
Ok(Async::Ready(()))
}
}
@@ -136,8 +197,8 @@ impl<T, U> FlowControl<T>
/// > flow-control window and MUST NOT send new flow-controlled frames until it
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive.
impl<T> ConnectionTransporter for FlowControl<T>
where T: ConnectionTransporter,
impl<T> ApplySettings for FlowControl<T>
where T: ApplySettings,
T: StreamTransporter
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
@@ -149,15 +210,13 @@ impl<T> ConnectionTransporter for FlowControl<T>
return Ok(());
}
{
let mut streams = self.streams_mut();
if new_window_size < old_window_size {
let decr = old_window_size - new_window_size;
streams.shrink_all_local_windows(decr);
} else {
let incr = new_window_size - old_window_size;
streams.grow_all_local_windows(incr);
}
let mut streams = self.inner.streams_mut();
if new_window_size < old_window_size {
let decr = old_window_size - new_window_size;
streams.shrink_all_local_windows(decr);
} else {
let incr = new_window_size - old_window_size;
streams.grow_all_local_windows(incr);
}
self.initial_local_window_size = new_window_size;
@@ -173,15 +232,13 @@ impl<T> ConnectionTransporter for FlowControl<T>
return Ok(());
}
{
let mut streams = self.streams_mut();
if new_window_size < old_window_size {
let decr = old_window_size - new_window_size;
streams.shrink_all_remote_windows(decr);
} else {
let incr = new_window_size - old_window_size;
streams.grow_all_remote_windows(incr);
}
let mut streams = self.inner.streams_mut();
if new_window_size < old_window_size {
let decr = old_window_size - new_window_size;
streams.shrink_all_remote_windows(decr);
} else {
let incr = new_window_size - old_window_size;
streams.grow_all_remote_windows(incr);
}
self.initial_remote_window_size = new_window_size;
@@ -189,16 +246,6 @@ impl<T> ConnectionTransporter for FlowControl<T>
}
}
impl<T: StreamTransporter> StreamTransporter for FlowControl<T> {
fn streams(&self) -> &StreamMap {
self.inner.streams()
}
fn streams_mut(&mut self) -> &mut StreamMap {
self.inner.streams_mut()
}
}
impl<T> Stream for FlowControl<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: StreamTransporter,
@@ -230,23 +277,40 @@ impl<T> Stream for FlowControl<T>
impl<T, U> Sink for FlowControl<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: StreamTransporter,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, item: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
use frame::Frame::*;
if let &Data(ref v) = &item {
self.claim_remote_window(&v.stream_id(), v.len())?;
if self.poll_send_local_window_updates()?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
self.inner.start_send(item)
match frame {
Data(v) => {
// Before claiming space, ensure that the transport will accept the frame.
if self.inner.poll_ready()?.is_not_ready() {
return Ok(AsyncSink::NotReady(Data(v)));
}
self.claim_remote_window(&v.stream_id(), v.len())?;
let res = self.inner.start_send(Data(v))?;
assert!(res.is_ready());
Ok(res)
}
frame => self.inner.start_send(frame),
}
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
try_ready!(self.inner.poll_complete());
self.poll_send_local_window_updates()
}
}
@@ -257,6 +321,7 @@ impl<T, U> ReadySink for FlowControl<T>
T: StreamTransporter,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
try_ready!(self.inner.poll_ready());
self.poll_send_local_window_updates()
}
}

View File

@@ -1,7 +1,7 @@
use {hpack, ConnectionError};
use frame::{self, Frame, Kind};
use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE;
use proto::{ConnectionTransporter, ReadySink};
use proto::{ApplySettings, ReadySink};
use futures::*;
@@ -103,7 +103,7 @@ impl<T> FramedRead<T> {
}
}
impl<T: ConnectionTransporter> ConnectionTransporter for FramedRead<T> {
impl<T: ApplySettings> ApplySettings for FramedRead<T> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.get_mut().apply_local_settings(set)
}

View File

@@ -1,6 +1,6 @@
use {hpack, ConnectionError, FrameSize};
use frame::{self, Frame};
use proto::{ConnectionTransporter, ReadySink};
use proto::{ApplySettings, ReadySink};
use futures::*;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -78,7 +78,7 @@ impl<T, B> FramedWrite<T, B>
}
}
impl<T, B> ConnectionTransporter for FramedWrite<T, B> {
impl<T, B> ApplySettings for FramedWrite<T, B> {
fn apply_local_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> {
Ok(())
}

View File

@@ -1,3 +1,12 @@
use {frame, ConnectionError, Peer, StreamId};
use bytes::{Buf, IntoBuf};
use fnv::FnvHasher;
use futures::*;
use ordermap::{Entry, OrderMap};
use std::hash::BuildHasherDefault;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
mod connection;
mod flow_control;
mod flow_controller;
@@ -20,17 +29,6 @@ pub use self::settings::Settings;
pub use self::stream_tracker::StreamTracker;
use self::state::StreamState;
use {frame, ConnectionError, Peer, StreamId};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use bytes::{Buf, IntoBuf};
use ordermap::{Entry, OrderMap};
use fnv::FnvHasher;
use std::hash::BuildHasherDefault;
/// Represents the internals of an HTTP2 connection.
///
/// A transport consists of several layers (_transporters_) and is arranged from _top_
@@ -100,7 +98,7 @@ impl StreamMap {
}
/// Allows settings to be applied from the top of the stack to the lower levels.d
pub trait ConnectionTransporter {
pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>;
}
@@ -110,6 +108,11 @@ pub trait StreamTransporter {
fn streams_mut(&mut self) -> &mut StreamMap;
}
pub trait FlowTransporter {
fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError>;
fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
}
/// Create a full H2 transport from an I/O handle.
///
/// This is called as the final step of the client handshake future.

View File

@@ -1,7 +1,7 @@
use ConnectionError;
use frame::{Frame, Ping, SettingSet};
use futures::*;
use proto::{ConnectionTransporter, ReadySink};
use proto::{ApplySettings, ReadySink};
/// Acknowledges ping requests from the remote.
#[derive(Debug)]
@@ -22,7 +22,7 @@ impl<T, U> PingPong<T, U>
}
}
impl<T: ConnectionTransporter, U> ConnectionTransporter for PingPong<T, U> {
impl<T: ApplySettings, U> ApplySettings for PingPong<T, U> {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}

View File

@@ -1,6 +1,6 @@
use ConnectionError;
use {StreamId, ConnectionError};
use frame::{self, Frame};
use proto::{ConnectionTransporter, ReadySink, StreamMap, StreamTransporter};
use proto::{ApplySettings, ReadySink, StreamMap, StreamTransporter, FlowTransporter, WindowSize};
use futures::*;
use tokio_io::AsyncRead;
@@ -8,6 +8,8 @@ use bytes::BufMut;
use std::io;
// TODO
#[derive(Debug)]
pub struct Settings<T> {
// Upstream transport
@@ -23,7 +25,7 @@ pub struct Settings<T> {
remaining_acks: usize,
// True when the local settings must be flushed to the remote
is_dirty: bool,
is_local_dirty: bool,
// True when we have received a settings frame from the remote.
received_remote: bool,
@@ -38,7 +40,7 @@ impl<T, U> Settings<T>
local: local,
remote: frame::SettingSet::default(),
remaining_acks: 0,
is_dirty: true,
is_local_dirty: true,
received_remote: false,
}
}
@@ -60,18 +62,18 @@ impl<T, U> Settings<T>
local: self.local,
remote: self.remote,
remaining_acks: self.remaining_acks,
is_dirty: self.is_dirty,
is_local_dirty: self.is_local_dirty,
received_remote: self.received_remote,
}
}
fn try_send_pending(&mut self) -> Poll<(), ConnectionError> {
trace!("try_send_pending; dirty={} acks={}", self.is_dirty, self.remaining_acks);
if self.is_dirty {
trace!("try_send_pending; dirty={} acks={}", self.is_local_dirty, self.remaining_acks);
if self.is_local_dirty {
let frame = frame::Settings::new(self.local.clone());
try_ready!(self.try_send(frame));
self.is_dirty = false;
self.is_local_dirty = false;
}
while self.remaining_acks > 0 {
@@ -104,10 +106,20 @@ impl<T: StreamTransporter> StreamTransporter for Settings<T> {
}
}
impl<T: FlowTransporter> FlowTransporter for Settings<T> {
fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
self.inner.poll_remote_window_update(id)
}
fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.grow_local_window(id, incr)
}
}
impl<T, U> Stream for Settings<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ConnectionTransporter,
T: ApplySettings,
{
type Item = Frame;
type Error = ConnectionError;
@@ -120,14 +132,13 @@ impl<T, U> Stream for Settings<T>
debug!("received remote settings ack");
// TODO: Handle acks
} else {
// Received new settings, queue an ACK
self.remaining_acks += 1;
// Apply the settings before saving them.
// Apply the settings before saving them and sending
// acknowledgements.
let settings = v.into_set();
self.inner.apply_remote_settings(&settings)?;
self.remote = settings;
self.remaining_acks += 1;
let _ = try!(self.try_send_pending());
}
}

View File

@@ -4,8 +4,6 @@ use error::User::InvalidStreamId;
use frame::{self, Frame};
use proto::*;
use futures::*;
use std::marker::PhantomData;
#[derive(Debug)]
@@ -44,10 +42,12 @@ impl<T, P, U> StreamTracker<T, P>
}
impl<T, P> StreamTransporter for StreamTracker<T, P> {
#[inline]
fn streams(&self) -> &StreamMap {
&self.streams
}
#[inline]
fn streams_mut(&mut self) -> &mut StreamMap {
&mut self.streams
}
@@ -71,8 +71,8 @@ impl<T, P> StreamTransporter for StreamTracker<T, P> {
/// > exceed the new value or allow streams to complete.
///
/// This module does NOT close streams when the setting changes.
impl<T, P> ConnectionTransporter for StreamTracker<T, P>
where T: ConnectionTransporter
impl<T, P> ApplySettings for StreamTracker<T, P>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.local_max_concurrency = set.max_concurrent_streams();