Prevent trying to assign capacity to streams that were just reset

This commit is contained in:
Sean McArthur
2019-05-31 14:27:46 -07:00
parent 45c4e0336f
commit b8f1f0ccf1
2 changed files with 18 additions and 5 deletions

View File

@@ -352,6 +352,14 @@ impl Prioritize {
None => return, None => return,
}; };
// Streams pending capacity may have been reset before capacity
// became available. In that case, the stream won't want any
// capacity, and so we shouldn't "transition" on it, but just evict
// it and continue the loop.
if !(stream.state.is_send_streaming() || stream.buffered_send_data > 0) {
continue;
}
counts.transition(stream, |_, mut stream| { counts.transition(stream, |_, mut stream| {
// Try to assign capacity to the stream. This will also re-queue the // Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill // stream if there isn't enough connection level capacity to fulfill
@@ -378,7 +386,8 @@ impl Prioritize {
); );
trace!( trace!(
"try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}", "try_assign_capacity; stream={:?}, requested={}; additional={}; buffered={}; window={}; conn={}",
stream.id,
total_requested, total_requested,
additional, additional,
stream.buffered_send_data, stream.buffered_send_data,
@@ -409,7 +418,11 @@ impl Prioritize {
// TODO: Should prioritization factor into this? // TODO: Should prioritization factor into this?
let assign = cmp::min(conn_available, additional); let assign = cmp::min(conn_available, additional);
trace!(" assigning; num={}", assign); trace!(
" assigning; stream={:?}, capacity={}",
stream.id,
assign,
);
// Assign the capacity to the stream // Assign the capacity to the stream
stream.assign_capacity(assign); stream.assign_capacity(assign);
@@ -419,7 +432,7 @@ impl Prioritize {
} }
trace!( trace!(
"try_assign_capacity; available={}; requested={}; buffered={}; has_unavailable={:?}", "try_assign_capacity(2); available={}; requested={}; buffered={}; has_unavailable={:?}",
stream.send_flow.available(), stream.send_flow.available(),
stream.requested_send_capacity, stream.requested_send_capacity,
stream.buffered_send_data, stream.buffered_send_data,
@@ -604,7 +617,7 @@ impl Prioritize {
} }
pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) { pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
trace!("clear_queue; stream-id={:?}", stream.id); trace!("clear_queue; stream={:?}", stream.id);
// TODO: make this more efficient? // TODO: make this more efficient?
while let Some(frame) = stream.pending_send.pop_front(buffer) { while let Some(frame) = stream.pending_send.pop_front(buffer) {

View File

@@ -70,7 +70,7 @@ where
B: IntoBuf + 'static, B: IntoBuf + 'static,
{ {
fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> { fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
use futures::future::{self, Future}; use futures::future;
use futures::future::Either::*; use futures::future::Either::*;
let res = future::poll_fn(|| self.poll()).select2(f).wait(); let res = future::poll_fn(|| self.poll()).select2(f).wait();