WIP: send flow control

This commit is contained in:
Carl Lerche
2017-08-09 14:16:32 -07:00
parent 87c4d36b0c
commit dfec401fdf
9 changed files with 170 additions and 89 deletions

View File

@@ -1,7 +1,7 @@
use hpack;
use error::{ConnectionError, Reason};
use bytes::Bytes;
use bytes::{Bytes, Buf};
use std::fmt;
@@ -78,6 +78,18 @@ impl<T> Frame<T> {
}
}
impl<T: Buf> Frame<T> {
/// Returns the length of the frame as it applies to flow control.
pub fn flow_len(&self) -> usize {
use self::Frame::*;
match *self {
Data(ref frame) => frame.payload().remaining(),
_ => 0,
}
}
}
impl<T> fmt::Debug for Frame<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use self::Frame::*;

View File

@@ -47,23 +47,6 @@ impl<T, P, B> Connection<T, P, B>
}
}
/// Polls for the next update to a remote flow control window.
pub fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.streams.poll_window_update()
}
/// Increases the capacity of a local flow control window.
///
/// # Panics
///
/// THis function panics if `incr` is not a valid window size.
pub fn expand_window(&mut self, id: StreamId, incr: usize)
-> Result<(), ConnectionError>
{
assert!(incr <= MAX_WINDOW_SIZE as usize);
self.streams.expand_window(id, incr as WindowSize)
}
pub fn update_local_settings(&mut self, _local: frame::SettingSet) -> Result<(), ConnectionError> {
unimplemented!();
}
@@ -149,8 +132,7 @@ impl<T, P, B> Connection<T, P, B>
}
Some(WindowUpdate(frame)) => {
trace!("recv WINDOW_UPDATE; frame={:?}", frame);
// TODO: implement
// try!(self.streams.recv_window_update(frame));
self.streams.recv_window_update(frame)?;
}
None => {
// TODO: Is this correct?

View File

@@ -70,6 +70,26 @@ impl<B> Deque<B> {
}
}
pub fn push_front(&mut self, buf: &mut Buffer<B>, frame: Frame<B>) {
let key = buf.slab.insert(Slot {
frame,
next: None,
});
match self.indices {
Some(ref mut idxs) => {
buf.slab[key].next = Some(idxs.head);
idxs.head = key;
}
None => {
self.indices = Some(Indices {
head: key,
tail: key,
});
}
}
}
pub fn pop_front(&mut self, buf: &mut Buffer<B>) -> Option<Frame<B>> {
match self.indices {
Some(mut idxs) => {

View File

@@ -9,8 +9,8 @@ pub struct FlowControl {
/// Amount to be removed by future increments.
underflow: WindowSize,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
/// The amount that has been incremented but not yet advertised (to the
/// application or the remote).
next_window_update: WindowSize,
}
@@ -23,6 +23,14 @@ impl FlowControl {
}
}
pub fn has_capacity(&self) -> bool {
self.window_size > 0
}
pub fn window_size(&self) -> WindowSize {
self.window_size
}
/// Returns true iff `claim_window(sz)` would return succeed.
pub fn ensure_window<T>(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError>
where T: Into<ConnectionError>,
@@ -49,7 +57,10 @@ impl FlowControl {
}
/// Increase the _unadvertised_ window capacity.
pub fn expand_window(&mut self, sz: WindowSize) {
pub fn expand_window(&mut self, sz: WindowSize)
-> Result<(), ConnectionError>
{
// TODO: Handle invalid increment
if sz <= self.underflow {
self.underflow -= sz;
return;
@@ -60,6 +71,7 @@ impl FlowControl {
self.underflow = 0;
}
/*
/// Obtains the unadvertised window update.
///
/// This does not apply the window update to `self`.
@@ -70,6 +82,7 @@ impl FlowControl {
Some(self.next_window_update)
}
}
*/
/// Obtains and applies an unadvertised window update.
pub fn apply_window_update(&mut self) -> Option<WindowSize> {

View File

@@ -2,8 +2,18 @@ use super::*;
#[derive(Debug)]
pub(super) struct Prioritize<B> {
/// Streams that have pending frames
pending_send: store::List<B>,
/// Streams that are waiting for connection level flow control capacity
pending_capacity: store::List<B>,
/// Connection level flow control governing sent data
flow_control: FlowControl,
/// Total amount of buffered data in data frames
buffered_data: usize,
/// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>,
}
@@ -11,17 +21,44 @@ pub(super) struct Prioritize<B> {
impl<B> Prioritize<B>
where B: Buf,
{
pub fn new() -> Prioritize<B> {
pub fn new(config: &Config) -> Prioritize<B> {
Prioritize {
pending_send: store::List::new(),
pending_capacity: store::List::new(),
flow_control: FlowControl::new(config.init_local_window_sz),
buffered_data: 0,
buffer: Buffer::new(),
}
}
pub fn available_window(&self) -> WindowSize {
let win = self.flow_control.window_size();
if self.buffered_data >= win as usize {
0
} else {
win - self.buffered_data as WindowSize
}
}
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError>
{
// Expand the window
self.flow_control.expand_window(frame.size_increment())?;
// Imediately apply the update
self.flow.apply_window_update();
Ok(())
}
pub fn queue_frame(&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B>)
{
self.buffered_data += frame.flow_len();
// queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame);
@@ -33,7 +70,7 @@ impl<B> Prioritize<B>
}
// Queue the stream
self.push_sender(stream);
push_sender(&mut self.pending_send, stream);
}
pub fn poll_complete<T>(&mut self,
@@ -48,7 +85,9 @@ impl<B> Prioritize<B>
match self.pop_frame(store) {
Some(frame) => {
// TODO: data frames should be handled specially...
// Subtract the data size
self.buffered_data -= frame.flow_len();
let res = dst.start_send(frame)?;
// We already verified that `dst` is ready to accept the
@@ -63,35 +102,63 @@ impl<B> Prioritize<B>
}
fn pop_frame(&mut self, store: &mut Store<B>) -> Option<Frame<B>> {
match self.pop_sender(store) {
Some(mut stream) => {
let frame = stream.pending_send.pop_front(&mut self.buffer).unwrap();
loop {
match self.pop_sender(store) {
Some(mut stream) => {
let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() {
Frame::Data(frame) => {
let len = frame.payload().remaining();
if !stream.pending_send.is_empty() {
self.push_sender(&mut stream);
if len > self.flow_control.window_size() as usize {
// TODO: This could be smarter...
stream.pending_send.push_front(&mut self.buffer, frame.into());
// Push the stream onto the list of streams
// waiting for connection capacity
push_sender(&mut self.pending_capacity, &mut stream);
// Try again w/ the next stream
continue;
}
frame.into()
}
frame => frame,
};
if !stream.pending_send.is_empty() {
push_sender(&mut self.pending_send, &mut stream);
}
return Some(frame);
}
Some(frame)
None => return None,
}
None => None,
}
}
fn push_sender(&mut self, stream: &mut store::Ptr<B>) {
debug_assert!(!stream.is_pending_send);
self.pending_send.push(stream);
stream.is_pending_send = true;
}
fn pop_sender<'a>(&mut self, store: &'a mut Store<B>) -> Option<store::Ptr<'a, B>> {
match self.pending_send.pop(store) {
Some(mut stream) => {
stream.is_pending_send = false;
Some(stream)
// If the connection level window has capacity, pop off of the pending
// capacity list first.
if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() {
let mut stream = self.pending_capacity.pop(store).unwrap();
stream.is_pending_send = false;
Some(stream)
} else {
match self.pending_send.pop(store) {
Some(mut stream) => {
stream.is_pending_send = false;
Some(stream)
}
None => None,
}
None => None,
}
}
}
fn push_sender<B>(list: &mut store::List<B>, stream: &mut store::Ptr<B>) {
debug_assert!(!stream.is_pending_send);
list.push(stream);
stream.is_pending_send = true;
}

View File

@@ -360,6 +360,7 @@ impl<B> Recv<B> where B: Buf {
Ok(())
}
/*
/// Send connection level window update
pub fn send_connection_window_update<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
@@ -377,6 +378,7 @@ impl<B> Recv<B> where B: Buf {
Ok(().into())
}
*/
pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> {
self.pending_accept.pop(store)
@@ -413,6 +415,7 @@ impl<B> Recv<B> where B: Buf {
}
}
/*
/// Send stream level window update
pub fn send_stream_window_update<T>(&mut self,
streams: &mut Store<B>,
@@ -441,6 +444,7 @@ impl<B> Recv<B> where B: Buf {
Ok(().into())
}
*/
fn reset(&mut self, _stream_id: StreamId, _reason: Reason) {
unimplemented!();

View File

@@ -9,6 +9,7 @@ use bytes::Buf;
use std::collections::VecDeque;
use std::marker::PhantomData;
/// Manages state transitions related to outbound frames.
#[derive(Debug)]
pub(super) struct Send<B> {
/// Maximum number of locally initiated streams
@@ -23,19 +24,7 @@ pub(super) struct Send<B> {
/// Initial window size of locally initiated streams
init_window_sz: WindowSize,
/// Connection level flow control governing sent data
flow_control: FlowControl,
/// Holds the list of streams on which local window updates may be sent.
// XXX It would be cool if this didn't exist.
pending_window_updates: VecDeque<StreamId>,
prioritize: Prioritize<B>,
/// When `poll_window_update` is not ready, then the calling task is saved to
/// be notified later. Access to poll_window_update must not be shared across tasks,
/// as we only track a single task (and *not* i.e. a task per stream id).
blocked: Option<task::Task>,
}
impl<B> Send<B> where B: Buf {
@@ -53,10 +42,7 @@ impl<B> Send<B> where B: Buf {
num_streams: 0,
next_stream_id: next_stream_id.into(),
init_window_sz: config.init_local_window_sz,
flow_control: FlowControl::new(config.init_local_window_sz),
prioritize: Prioritize::new(),
pending_window_updates: VecDeque::new(),
blocked: None,
prioritize: Prioritize::new(config),
}
}
@@ -119,15 +105,16 @@ impl<B> Send<B> where B: Buf {
// Make borrow checker happy
loop {
let unadvertised = stream.unadvertised_send_window;
match stream.send_flow_control() {
Some(flow) => {
try!(self.flow_control.ensure_window(sz, FlowControlViolation));
// Ensure that the size fits within the advertised size
try!(flow.ensure_window(
sz + unadvertised, FlowControlViolation));
// Claim the window on the stream
try!(flow.claim_window(sz, FlowControlViolation));
// Claim the window on the connection
self.flow_control.claim_window(sz, FlowControlViolation)
// Now, claim the window on the stream
flow.claim_window(sz, FlowControlViolation)
.expect("local connection flow control error");
break;
@@ -160,6 +147,7 @@ impl<B> Send<B> where B: Buf {
self.prioritize.poll_complete(store, dst)
}
/*
/// Get pending window updates
pub fn poll_window_update(&mut self, streams: &mut Store<B>)
-> Poll<WindowUpdate, ConnectionError>
@@ -191,16 +179,15 @@ impl<B> Send<B> where B: Buf {
return Ok(Async::NotReady);
}
*/
pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError>
{
// TODO: Handle invalid increment
self.flow_control.expand_window(frame.size_increment());
self.priority.recv_window_update(frame)?;
if let Some(task) = self.blocked.take() {
task.notify();
}
// TODO: If there is available connection capacity, release pending
// streams.
Ok(())
}
@@ -210,6 +197,8 @@ impl<B> Send<B> where B: Buf {
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
unimplemented!();
/*
if let Some(flow) = stream.send_flow_control() {
// TODO: Handle invalid increment
flow.expand_window(frame.size_increment());
@@ -220,6 +209,7 @@ impl<B> Send<B> where B: Buf {
}
Ok(())
*/
}
pub fn dec_num_streams(&mut self) {

View File

@@ -28,6 +28,11 @@ pub(super) struct Stream<B> {
/// True if the stream is currently pending send
pub is_pending_send: bool,
/// A stream's capacity is never advertised past the connection's capacity.
/// This value represents the amount of the stream window that has been
/// temporarily withheld.
pub unadvertised_send_window: WindowSize,
}
impl<B> Stream<B> {
@@ -41,6 +46,7 @@ impl<B> Stream<B> {
next: None,
pending_push_promises: store::List::new(),
is_pending_send: false,
unadvertised_send_window: 0,
}
}

View File

@@ -154,7 +154,8 @@ impl<B> Streams<B>
}
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError> {
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -238,14 +239,6 @@ impl<B> Streams<B>
})
}
pub fn poll_window_update(&mut self)
-> Poll<WindowUpdate, ConnectionError>
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.send.poll_window_update(&mut me.store)
}
pub fn expand_window(&mut self, id: StreamId, sz: WindowSize)
-> Result<(), ConnectionError>
{
@@ -279,12 +272,6 @@ impl<B> Streams<B>
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
// TODO: sending window updates should be part of Prioritize
/*
try_ready!(me.actions.recv.send_connection_window_update(dst));
try_ready!(me.actions.recv.send_stream_window_update(&mut me.store, dst));
*/
me.actions.send.poll_complete(&mut me.store, dst)
}
}