Prevent pushing a stream into both pending_send + pending_open (#235)
Prevent pushing a stream into both pending_send + pending_open, Clear out variables from buffered streams that get a reset, and ignore them when traversing the pending_send queue if they are is_reset(). Add asserts that a stream cannot be in pending_open & pending_send at the same time.
This commit is contained in:
committed by
Carl Lerche
parent
200c04f1d3
commit
bbed41974b
@@ -398,8 +398,19 @@ impl Prioritize {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If data is buffered, then schedule the stream for execution
|
// If data is buffered and the stream is not pending open, then
|
||||||
if stream.buffered_send_data > 0 {
|
// schedule the stream for execution
|
||||||
|
//
|
||||||
|
// Why do we not push into pending_send when the stream is in pending_open?
|
||||||
|
//
|
||||||
|
// We allow users to call send_request() which schedules a stream to be pending_open
|
||||||
|
// if there is no room according to the concurrency limit (max_send_streams), and we
|
||||||
|
// also allow data to be buffered for send with send_data() if there is no capacity for
|
||||||
|
// the stream to send the data, which attempts to place the stream in pending_send.
|
||||||
|
// If the stream is not open, we don't want the stream to be scheduled for
|
||||||
|
// execution (pending_send). Note that if the stream is in pending_open, it will be
|
||||||
|
// pushed to pending_send when there is room for an open stream.
|
||||||
|
if stream.buffered_send_data > 0 && !stream.is_pending_open {
|
||||||
// TODO: This assertion isn't *exactly* correct. There can still be
|
// TODO: This assertion isn't *exactly* correct. There can still be
|
||||||
// buffered send data while the stream's pending send queue is
|
// buffered send data while the stream's pending send queue is
|
||||||
// empty. This can happen when a large data frame is in the process
|
// empty. This can happen when a large data frame is in the process
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
use http;
|
|
||||||
use super::*;
|
|
||||||
use codec::{RecvError, UserError};
|
use codec::{RecvError, UserError};
|
||||||
use codec::UserError::*;
|
use codec::UserError::*;
|
||||||
use frame::{self, Reason};
|
use frame::{self, Reason};
|
||||||
|
use http;
|
||||||
|
use super::*;
|
||||||
|
|
||||||
use bytes::Buf;
|
use bytes::Buf;
|
||||||
|
|
||||||
|
|||||||
@@ -336,6 +336,11 @@ impl store::Next for NextSend {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn set_queued(stream: &mut Stream, val: bool) {
|
fn set_queued(stream: &mut Stream, val: bool) {
|
||||||
|
if val {
|
||||||
|
// ensure that stream is not queued for being opened
|
||||||
|
// if it's being put into queue for sending data
|
||||||
|
debug_assert_eq!(stream.is_pending_open, false);
|
||||||
|
}
|
||||||
stream.is_pending_send = val;
|
stream.is_pending_send = val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -402,6 +407,11 @@ impl store::Next for NextOpen {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn set_queued(stream: &mut Stream, val: bool) {
|
fn set_queued(stream: &mut Stream, val: bool) {
|
||||||
|
if val {
|
||||||
|
// ensure that stream is not queued for being sent
|
||||||
|
// if it's being put into queue for opening the stream
|
||||||
|
debug_assert_eq!(stream.is_pending_send, false);
|
||||||
|
}
|
||||||
stream.is_pending_open = val;
|
stream.is_pending_open = val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user