From 7e8c7fd2b8e82ed7d3babf93d61462f19dc59582 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 23 Aug 2017 20:34:58 -0700 Subject: [PATCH] Use `FlowControl::available` to size data frames (#29) --- src/frame/data.rs | 31 +++++++++++++++- src/frame/mod.rs | 2 +- src/proto/streams/flow_control.rs | 2 + src/proto/streams/prioritize.rs | 62 ++++++++++++++++++++++++------- 4 files changed, 81 insertions(+), 16 deletions(-) diff --git a/src/frame/data.rs b/src/frame/data.rs index 3dc18e3..7d3ea80 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,7 +1,8 @@ use frame::{util, Frame, Head, Error, StreamId, Kind}; use bytes::{BufMut, Bytes, Buf}; -#[derive(Debug)] +use std::fmt; + pub struct Data { stream_id: StreamId, data: T, @@ -9,7 +10,7 @@ pub struct Data { pad_len: Option, } -#[derive(Debug, Copy, Clone, Eq, PartialEq)] +#[derive(Copy, Clone, Eq, PartialEq)] pub struct DataFlag(u8); const END_STREAM: u8 = 0x1; @@ -112,6 +113,16 @@ impl From> for Frame { } } +impl fmt::Debug for Data { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Data") + .field("stream_id", &self.stream_id) + .field("flags", &self.flags) + .field("pad_len", &self.pad_len) + .finish() + } +} + // ===== impl DataFlag ===== impl DataFlag { @@ -156,3 +167,19 @@ impl From for u8 { src.0 } } + +impl fmt::Debug for DataFlag { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let mut f = fmt.debug_struct("DataFlag"); + + if self.is_end_stream() { + f.field("end_stream", &true); + } + + if self.is_padded() { + f.field("padded", &true); + } + + f.finish() + } +} diff --git a/src/frame/mod.rs b/src/frame/mod.rs index c310850..9c4999a 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -117,7 +117,7 @@ impl fmt::Debug for Frame { use self::Frame::*; match *self { - Data(..) => write!(fmt, "Frame::Data(..)"), + Data(ref frame) => write!(fmt, "Frame::Data({:?})", frame), Headers(ref frame) => write!(fmt, "Frame::Headers({:?})", frame), Priority(ref frame) => write!(fmt, "Frame::Priority({:?})", frame), PushPromise(ref frame) => write!(fmt, "Frame::PushPromise({:?})", frame), diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index d404474..19eb2a1 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -93,6 +93,8 @@ impl FlowControl { return Err(FlowControlError.into()); } + trace!("inc_window; sz={}; old={}; new={}", sz, self.window_size, val); + self.window_size = val; Ok(()) } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index ba30461..cdaad1f 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -96,6 +96,9 @@ impl Prioritize // Update the buffered data counter stream.buffered_send_data += sz; + trace!("send_data; sz={}; buffered={}; requested={}", + sz, stream.buffered_send_data, stream.requested_send_capacity); + // Implicitly request more send capacity if not enough has been // requested yet. if stream.requested_send_capacity < stream.buffered_send_data { @@ -109,7 +112,11 @@ impl Prioritize try!(stream.state.send_close()); } - if stream.send_flow.available() > stream.buffered_send_data { + trace!("send_data (2); available={}; buffered={}", + stream.send_flow.available(), + stream.buffered_send_data); + + if stream.send_flow.available() >= stream.buffered_send_data { // The stream currently has capacity to send the data frame, so // queue it up and notify the connection task. self.queue_frame(frame.into(), stream, task); @@ -118,6 +125,8 @@ impl Prioritize // don't notify the conneciton task. Once additional capacity // becomes available, the frame will be flushed. stream.pending_send.push_back(&mut self.buffer, frame.into()); + + debug_assert!(stream.is_pending_send_capacity); } Ok(()) @@ -172,6 +181,7 @@ impl Prioritize { // Update the connection's window self.flow.inc_window(inc)?; + self.flow.assign_capacity(inc)?; // Assign newly acquired capacity to streams pending capacity. while self.flow.available() > 0 { @@ -232,6 +242,12 @@ impl Prioritize self.flow.claim_capacity(assign); } + trace!("try_assign_capacity; available={}; requested={}; buffered={}; has_unavailable={:?}", + stream.send_flow.available(), + stream.requested_send_capacity, + stream.buffered_send_data, + stream.send_flow.has_unavailable()); + if stream.send_flow.available() < stream.requested_send_capacity { if stream.send_flow.has_unavailable() { // The stream requires additional capacity and the stream's @@ -246,6 +262,7 @@ impl Prioritize // If data is buffered, then schedule the stream for execution if stream.buffered_send_data > 0 { + debug_assert!(stream.send_flow.available() > 0); self.pending_send.push(stream); } } @@ -317,7 +334,7 @@ impl Prioritize // First check if there are any data chunks to take back if let Some(frame) = dst.take_last_data_frame() { - trace!(" -> reclaimed; frame={:?}", frame); + trace!(" -> reclaimed; frame={:?}; sz={}", frame, frame.payload().remaining()); let mut eos = false; let key = frame.payload().stream; @@ -351,11 +368,11 @@ impl Prioritize stream.pending_send.push_front(&mut self.buffer, frame); // If needed, schedule the sender - self.pending_send.push(stream); + if stream.send_flow.available() > 0 { + self.pending_send.push(stream); + } } - // =========== OLD JUNK =========== - fn pop_frame(&mut self, store: &mut Store, max_len: usize) -> Option>> { @@ -365,16 +382,33 @@ impl Prioritize Some(mut stream) => { let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { Frame::Data(mut frame) => { - trace!(" --> data frame"); - // Get the amount of capacity remaining for stream's // window. // // TODO: Is this the right thing to check? - let stream_capacity = stream.send_flow.window_size(); + let stream_capacity = stream.send_flow.available(); + let sz = frame.payload().remaining(); + + trace!(" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; available={}; requested={}", + frame.stream_id(), + sz, + frame.is_end_stream(), + stream_capacity, + stream.send_flow.available(), + stream.requested_send_capacity); + + // Zero length data frames always have capacity to + // be sent. + if sz > 0 && stream_capacity == 0 { + trace!(" --> stream capacity is 0; requested={}", + stream.requested_send_capacity); + + // Ensure that the stream is waiting for + // connection level capacity + // + // TODO: uncomment + // debug_assert!(stream.is_pending_send_capacity); - if stream_capacity == 0 { - trace!(" --> stream capacity is 0, return"); // The stream has no more capacity, this can // happen if the remote reduced the stream // window. In this case, we need to buffer the @@ -384,9 +418,7 @@ impl Prioritize } // Only send up to the max frame length - let len = cmp::min( - frame.payload().remaining(), - max_len); + let len = cmp::min(sz, max_len); // Only send up to the stream's window capacity let len = cmp::min(len, stream_capacity as usize); @@ -428,6 +460,10 @@ impl Prioritize }; if !stream.pending_send.is_empty() { + // TODO: Only requeue the sender IF it is ready to send + // the next frame. i.e. don't requeue it if the next + // frame is a data frame and the stream does not have + // any more capacity. self.pending_send.push(&mut stream); }