More send flow control

This commit is contained in:
Carl Lerche
2017-08-09 14:37:41 -07:00
parent a8c8cdb8e9
commit 95bb95af01
4 changed files with 43 additions and 17 deletions

View File

@@ -24,11 +24,17 @@ impl FlowControl {
}
pub fn has_capacity(&self) -> bool {
self.window_size > 0
self.effective_window_size() > 0
}
pub fn window_size(&self) -> WindowSize {
self.window_size
pub fn effective_window_size(&self) -> WindowSize {
let plus = self.window_size + self.next_window_update;
if self.underflow >= plus {
return 0;
}
plus - self.underflow
}
/// Returns true iff `claim_window(sz)` would return succeed.

View File

@@ -32,7 +32,7 @@ impl<B> Prioritize<B>
}
pub fn available_window(&self) -> WindowSize {
let win = self.flow_control.window_size();
let win = self.flow_control.effective_window_size();
if self.buffered_data >= win as usize {
0
@@ -109,7 +109,7 @@ impl<B> Prioritize<B>
Frame::Data(frame) => {
let len = frame.payload().remaining();
if len > self.flow_control.window_size() as usize {
if len > self.flow_control.effective_window_size() as usize {
// TODO: This could be smarter...
stream.pending_send.push_front(&mut self.buffer, frame.into());

View File

@@ -197,19 +197,37 @@ impl<B> Send<B> where B: Buf {
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
let connection = self.prioritize.available_window();
let unadvertised = stream.unadvertised_send_window;
let effective_window_size = {
let mut flow = match stream.state.send_flow_control() {
Some(flow) => flow,
None => return Ok(()),
};
debug_assert!(unadvertised == 0 || connection == 0);
// Expand the full window
flow.expand_window(frame.size_increment())?;
flow.effective_window_size()
};
if connection < effective_window_size {
stream.unadvertised_send_window = effective_window_size - connection;
// TODO: Queue the stream in a pending connection capacity list.
}
if stream.unadvertised_send_window == frame.size_increment() + unadvertised {
// The entire window update is unadvertised, no need to do anything
// else
return Ok(());
}
// TODO: Notify the send task that there is additional capacity
unimplemented!();
/*
if let Some(flow) = stream.send_flow_control() {
// TODO: Handle invalid increment
flow.expand_window(frame.size_increment());
}
if let Some(task) = self.blocked.take() {
task.notify();
}
Ok(())
*/
}
pub fn dec_num_streams(&mut self) {

View File

@@ -50,10 +50,12 @@ impl<B> Stream<B> {
}
}
// TODO: remove?
pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> {
self.state.send_flow_control()
}
// TODO: remove?
pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
self.state.recv_flow_control()
}