New send flow control (#25)

Restructure send flow control such that sending data is always accepted by `Stream`. Data frames will be buffered until there is available window to send them. Producers can monitor the available window capacity to decide if data should be produced.
This commit is contained in:
Carl Lerche
2017-08-21 13:52:58 -07:00
committed by GitHub
parent 41b25a4a56
commit a623ab68b5
13 changed files with 715 additions and 736 deletions

View File

@@ -6,17 +6,14 @@ use std::{fmt, cmp};
#[derive(Debug)]
pub(super) struct Prioritize<B> {
/// Streams that have pending frames
pending_send: store::List<B>,
/// Queue of streams waiting for socket capacity to send a frame
pending_send: store::Queue<B, stream::Next>,
/// Streams that are waiting for connection level flow control capacity
pending_capacity: store::List<B>,
/// Queue of streams waiting for window capacity to produce data.
pending_capacity: store::Queue<B, stream::NextSendCapacity>,
/// Connection level flow control governing sent data
flow_control: FlowControl,
/// Total amount of buffered data in data frames
buffered_data: usize,
flow: FlowControl,
/// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>,
@@ -42,83 +39,226 @@ impl<B> Prioritize<B>
where B: Buf,
{
pub fn new(config: &Config) -> Prioritize<B> {
let mut flow = FlowControl::new();
flow.inc_window(config.init_local_window_sz);
flow.assign_capacity(config.init_local_window_sz);
Prioritize {
pending_send: store::List::new(),
pending_capacity: store::List::new(),
flow_control: FlowControl::new(config.init_local_window_sz),
buffered_data: 0,
pending_send: store::Queue::new(),
pending_capacity: store::Queue::new(),
flow: flow,
buffer: Buffer::new(),
conn_task: None,
}
}
pub fn available_window(&self) -> WindowSize {
let win = self.flow_control.effective_window_size();
if self.buffered_data >= win as usize {
0
} else {
win - self.buffered_data as WindowSize
}
}
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError>
{
// Expand the window
self.flow_control.expand_window(frame.size_increment())?;
// Imediately apply the update
self.flow_control.apply_window_update();
Ok(())
}
/// Queue a frame to be sent to the remote
pub fn queue_frame(&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B>)
{
if self.queue_frame2(frame, stream) {
// Notification required
if let Some(ref task) = self.conn_task {
task.notify();
}
}
}
/// Queue frame without actually notifying. Returns ture if the queue was
/// succesfful.
fn queue_frame2(&mut self, frame: Frame<B>, stream: &mut store::Ptr<B>)
-> bool
{
self.buffered_data += frame.flow_len();
// queue the frame in the buffer
// Queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame);
// Queue the stream
!push_sender(&mut self.pending_send, stream)
self.pending_send.push(stream);
// Notify the connection.
if let Some(task) = self.conn_task.take() {
task.notify();
}
}
/// Push the frame to the front of the stream's deque, scheduling the
/// steream if needed.
fn push_back_frame(&mut self, frame: Frame<B>, stream: &mut store::Ptr<B>) {
// Push the frame to the front of the stream's deque
stream.pending_send.push_front(&mut self.buffer, frame);
/// Send a data frame
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
let sz = frame.payload().remaining();
// If needed, schedule the sender
push_sender(&mut self.pending_capacity, stream);
if sz > MAX_WINDOW_SIZE as usize {
// TODO: handle overflow
unimplemented!();
}
let sz = sz as WindowSize;
if !stream.state.is_send_streaming() {
if stream.state.is_closed() {
return Err(InactiveStreamId.into());
} else {
return Err(UnexpectedFrameType.into());
}
}
// Update the buffered data counter
stream.buffered_send_data += sz;
// Implicitly request more send capacity if not enough has been
// requested yet.
if stream.requested_send_capacity < stream.buffered_send_data {
// Update the target requested capacity
stream.requested_send_capacity = stream.buffered_send_data;
self.try_assign_capacity(stream);
}
if frame.is_end_stream() {
try!(stream.state.send_close());
}
if stream.send_flow.available() > stream.buffered_send_data {
// The stream currently has capacity to send the data frame, so
// queue it up and notify the connection task.
self.queue_frame(frame.into(), stream);
} else {
// The stream has no capacity to send the frame now, save it but
// don't notify the conneciton task. Once additional capacity
// becomes available, the frame will be flushed.
stream.pending_send.push_back(&mut self.buffer, frame.into());
}
Ok(())
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
// Actual capacity is `capacity` + the current amount of buffered data.
// It it were less, then we could never send out the buffered data.
let capacity = capacity + stream.buffered_send_data;
if capacity == stream.requested_send_capacity {
// Nothing to do
return Ok(());
} else if capacity < stream.requested_send_capacity {
// TODO: release capacity
unimplemented!();
} else {
// Update the target requested capacity
stream.requested_send_capacity = capacity;
// Try to assign additional capacity to the stream. If none is
// currently available, the stream will be queued to receive some
// when more becomes available.
self.try_assign_capacity(stream);
Ok(())
}
}
pub fn recv_stream_window_update(&mut self,
inc: WindowSize,
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
if !stream.state.is_send_streaming() {
return Ok(());
}
// Update the stream level flow control.
stream.send_flow.inc_window(inc)?;
// If the stream is waiting on additional capacity, then this will
// assign it (if available on the connection) and notify the producer
self.try_assign_capacity(stream);
Ok(())
}
pub fn recv_connection_window_update(&mut self,
inc: WindowSize,
store: &mut Store<B>)
-> Result<(), ConnectionError>
{
// Update the connection's window
self.flow.inc_window(inc)?;
// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
let mut stream = match self.pending_capacity.pop(store) {
Some(stream) => stream,
None => return Ok(()),
};
// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
}
Ok(())
}
/// Request capacity to send data
fn try_assign_capacity(&mut self, stream: &mut store::Ptr<B>) {
let total_requested = stream.requested_send_capacity;
// Total requested should never go below actual assigned
// (Note: the window size can go lower than assigned)
debug_assert!(total_requested >= stream.send_flow.available());
// The amount of additional capacity that the stream requests.
// Don't assign more than the window has available!
let mut additional = cmp::min(
total_requested - stream.send_flow.available(),
stream.send_flow.window_size());
trace!("try_assign_capacity; requested={}; additional={}; conn={}",
total_requested, additional, self.flow.available());
if additional == 0 {
// Nothing more to do
return;
}
// The amount of currently available capacity on the connection
let conn_available = self.flow.available();
// First check if capacity is immediately available
if conn_available > 0 {
// There should be no streams pending capacity
debug_assert!(self.pending_capacity.is_empty());
// The amount of capacity to assign to the stream
// TODO: Should prioritization factor into this?
let assign = cmp::min(conn_available, additional);
// Assign the capacity to the stream
stream.assign_capacity(assign);
// Claim the capacity from the connection
self.flow.claim_capacity(assign);
}
if stream.send_flow.available() < stream.requested_send_capacity {
if stream.send_flow.has_unavailable() {
// The stream requires additional capacity and the stream's
// window has availablel capacity, but the connection window
// does not.
//
// In this case, the stream needs to be queued up for when the
// connection has more capacity.
self.pending_capacity.push(stream);
}
}
// If data is buffered, then schedule the stream for execution
if stream.buffered_send_data > 0 {
self.pending_send.push(stream);
}
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
// Track the task
self.conn_task = Some(task::current());
// Ensure codec is ready
try_ready!(dst.poll_ready());
@@ -129,16 +269,10 @@ impl<B> Prioritize<B>
let max_frame_len = dst.max_send_frame_size();
trace!("poll_complete");
loop {
match self.pop_frame(store, max_frame_len) {
Some(frame) => {
// Figure out the byte size this frame applies to flow
// control
let len = cmp::min(frame.flow_len(), max_frame_len);
// Subtract the data size
self.buffered_data -= len;
trace!("writing frame={:?}", frame);
let res = dst.start_send(frame)?;
@@ -160,6 +294,9 @@ impl<B> Prioritize<B>
// This might release a data frame...
if !self.reclaim_frame(store, dst) {
// Nothing else to do, track the task
self.conn_task = Some(task::current());
return Ok(().into());
}
@@ -170,84 +307,13 @@ impl<B> Prioritize<B>
}
}
fn pop_frame(&mut self, store: &mut Store<B>, max_len: usize)
-> Option<Frame<Prioritized<B>>>
{
loop {
match self.pop_sender(store) {
Some(mut stream) => {
let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() {
Frame::Data(frame) => {
let len = frame.payload().remaining();
if len > self.flow_control.effective_window_size() as usize {
// TODO: This could be smarter...
self.push_back_frame(frame.into(), &mut stream);
// Try again w/ the next stream
continue;
}
frame.into()
}
frame => frame,
};
if !stream.pending_send.is_empty() {
push_sender(&mut self.pending_send, &mut stream);
}
let frame = match frame {
Frame::Data(mut frame) => {
let eos = frame.is_end_stream();
if frame.payload().remaining() > max_len {
frame.unset_end_stream();
}
Frame::Data(frame.map(|buf| {
Prioritized {
inner: buf.take(max_len),
end_of_stream: eos,
stream: stream.key(),
}
}))
}
frame => frame.map(|_| unreachable!()),
};
return Some(frame);
}
None => return None,
}
}
}
fn pop_sender<'a>(&mut self, store: &'a mut Store<B>) -> Option<store::Ptr<'a, B>> {
// If the connection level window has capacity, pop off of the pending
// capacity list first.
if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() {
let mut stream = self.pending_capacity
.pop::<stream::Next>(store)
.unwrap();
stream.is_pending_send = false;
Some(stream)
} else {
let stream = self.pending_send
.pop::<stream::Next>(store);
match stream {
Some(mut stream) => {
stream.is_pending_send = false;
Some(stream)
}
None => None,
}
}
}
/// Tries to reclaim a pending data frame from the codec.
///
/// Returns true if a frame was reclaimed.
///
/// When a data frame is written to the codec, it may not be written in its
/// entirety (large chunks are split up into potentially many data frames).
/// In this case, the stream needs to be reprioritized.
fn reclaim_frame<T>(&mut self,
store: &mut Store<B>,
dst: &mut Codec<T, Prioritized<B>>) -> bool
@@ -282,21 +348,100 @@ impl<B> Prioritize<B>
false
}
}
/// Push the stream onto the `pending_send` list. Returns true if the sender was
/// not already queued.
fn push_sender<B>(list: &mut store::List<B>, stream: &mut store::Ptr<B>)
-> bool
{
if stream.is_pending_send {
return false;
/// Push the frame to the front of the stream's deque, scheduling the
/// steream if needed.
fn push_back_frame(&mut self, frame: Frame<B>, stream: &mut store::Ptr<B>) {
// Push the frame to the front of the stream's deque
stream.pending_send.push_front(&mut self.buffer, frame);
// If needed, schedule the sender
self.pending_send.push(stream);
}
list.push::<stream::Next>(stream);
stream.is_pending_send = true;
// =========== OLD JUNK ===========
true
fn pop_frame(&mut self, store: &mut Store<B>, max_len: usize)
-> Option<Frame<Prioritized<B>>>
{
loop {
trace!("pop frame");
match self.pending_send.pop(store) {
Some(mut stream) => {
let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() {
Frame::Data(mut frame) => {
trace!(" --> data frame");
// Get the amount of capacity remaining for stream's
// window.
//
// TODO: Is this the right thing to check?
let stream_capacity = stream.send_flow.window_size();
if stream_capacity == 0 {
trace!(" --> stream capacity is 0, return");
// The stream has no more capacity, this can
// happen if the remote reduced the stream
// window. In this case, we need to buffer the
// frame and wait for a window update...
stream.pending_send.push_front(&mut self.buffer, frame.into());
continue;
}
// Only send up to the max frame length
let len = cmp::min(
frame.payload().remaining(),
max_len);
// Only send up to the stream's window capacity
let len = cmp::min(len, stream_capacity as usize);
// There *must* be be enough connection level
// capacity at this point.
debug_assert!(len <= self.flow.window_size() as usize);
// Update the flow control
trace!(" -- updating stream flow --");
stream.send_flow.send_data(len as WindowSize);
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
self.flow.assign_capacity(len as WindowSize);
trace!(" -- updating connection flow --");
self.flow.send_data(len as WindowSize);
// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
let eos = frame.is_end_stream();
if frame.payload().remaining() > len {
frame.unset_end_stream();
}
Frame::Data(frame.map(|buf| {
Prioritized {
inner: buf.take(len),
end_of_stream: eos,
stream: stream.key(),
}
}))
}
frame => frame.map(|_| unreachable!()),
};
if !stream.pending_send.is_empty() {
self.pending_send.push(&mut stream);
}
return Some(frame);
}
None => return None,
}
}
}
}
// ===== impl Prioritized =====