From 95bb95af01fd8f07190be31c49a8b6d8a306f1e2 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 9 Aug 2017 14:37:41 -0700 Subject: [PATCH] More send flow control --- src/proto/streams/flow_control.rs | 12 ++++++--- src/proto/streams/prioritize.rs | 4 +-- src/proto/streams/send.rs | 42 ++++++++++++++++++++++--------- src/proto/streams/stream.rs | 2 ++ 4 files changed, 43 insertions(+), 17 deletions(-) diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index f51566c..d54e203 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -24,11 +24,17 @@ impl FlowControl { } pub fn has_capacity(&self) -> bool { - self.window_size > 0 + self.effective_window_size() > 0 } - pub fn window_size(&self) -> WindowSize { - self.window_size + pub fn effective_window_size(&self) -> WindowSize { + let plus = self.window_size + self.next_window_update; + + if self.underflow >= plus { + return 0; + } + + plus - self.underflow } /// Returns true iff `claim_window(sz)` would return succeed. diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 38d46ee..8f449ff 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -32,7 +32,7 @@ impl Prioritize } pub fn available_window(&self) -> WindowSize { - let win = self.flow_control.window_size(); + let win = self.flow_control.effective_window_size(); if self.buffered_data >= win as usize { 0 @@ -109,7 +109,7 @@ impl Prioritize Frame::Data(frame) => { let len = frame.payload().remaining(); - if len > self.flow_control.window_size() as usize { + if len > self.flow_control.effective_window_size() as usize { // TODO: This could be smarter... stream.pending_send.push_front(&mut self.buffer, frame.into()); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 39bd9fc..8b47dc4 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -197,19 +197,37 @@ impl Send where B: Buf { stream: &mut store::Ptr) -> Result<(), ConnectionError> { + let connection = self.prioritize.available_window(); + let unadvertised = stream.unadvertised_send_window; + + let effective_window_size = { + let mut flow = match stream.state.send_flow_control() { + Some(flow) => flow, + None => return Ok(()), + }; + + debug_assert!(unadvertised == 0 || connection == 0); + + // Expand the full window + flow.expand_window(frame.size_increment())?; + flow.effective_window_size() + }; + + if connection < effective_window_size { + stream.unadvertised_send_window = effective_window_size - connection; + + // TODO: Queue the stream in a pending connection capacity list. + } + + if stream.unadvertised_send_window == frame.size_increment() + unadvertised { + // The entire window update is unadvertised, no need to do anything + // else + return Ok(()); + } + + // TODO: Notify the send task that there is additional capacity + unimplemented!(); - /* - if let Some(flow) = stream.send_flow_control() { - // TODO: Handle invalid increment - flow.expand_window(frame.size_increment()); - } - - if let Some(task) = self.blocked.take() { - task.notify(); - } - - Ok(()) - */ } pub fn dec_num_streams(&mut self) { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 351e055..2853437 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -50,10 +50,12 @@ impl Stream { } } + // TODO: remove? pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { self.state.send_flow_control() } + // TODO: remove? pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { self.state.recv_flow_control() }