Do not reuse next ptr for multiple linked lists
Because, you might think that each linked list has exclusive access to the next pointer, but then there is an edge case that proves otherwise. Also, debugging this kind of thing is annoying.
This commit is contained in:
@@ -33,9 +33,7 @@ pub fn main() {
|
|||||||
|
|
||||||
|
|
||||||
let connection = Server::handshake(socket)
|
let connection = Server::handshake(socket)
|
||||||
.then(|res| {
|
.and_then(|conn| {
|
||||||
let conn = res.unwrap();
|
|
||||||
|
|
||||||
println!("H2 connection bound");
|
println!("H2 connection bound");
|
||||||
|
|
||||||
conn.for_each(|(request, mut stream)| {
|
conn.for_each(|(request, mut stream)| {
|
||||||
@@ -45,12 +43,14 @@ pub fn main() {
|
|||||||
.status(status::OK)
|
.status(status::OK)
|
||||||
.body(()).unwrap();
|
.body(()).unwrap();
|
||||||
|
|
||||||
if let Err(e) = stream.send_response(response, true) {
|
if let Err(e) = stream.send_response(response, false) {
|
||||||
println!(" error responding; err={:?}", e);
|
println!(" error responding; err={:?}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
println!(">>>> sending data");
|
println!(">>>> sending data");
|
||||||
stream.send_data(Bytes::from_static(b"hello world"), true).unwrap();
|
if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), true) {
|
||||||
|
println!(" -> err={:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}).and_then(|_| {
|
}).and_then(|_| {
|
||||||
@@ -59,7 +59,10 @@ pub fn main() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
.then(|res| {
|
.then(|res| {
|
||||||
let _ = res.unwrap();
|
if let Err(e) = res {
|
||||||
|
println!(" -> err={:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
;
|
;
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use std::{fmt, cmp};
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) struct Prioritize<B> {
|
pub(super) struct Prioritize<B> {
|
||||||
/// Queue of streams waiting for socket capacity to send a frame
|
/// Queue of streams waiting for socket capacity to send a frame
|
||||||
pending_send: store::Queue<B, stream::Next>,
|
pending_send: store::Queue<B, stream::NextSend>,
|
||||||
|
|
||||||
/// Queue of streams waiting for window capacity to produce data.
|
/// Queue of streams waiting for window capacity to produce data.
|
||||||
pending_capacity: store::Queue<B, stream::NextSendCapacity>,
|
pending_capacity: store::Queue<B, stream::NextSendCapacity>,
|
||||||
@@ -125,8 +125,6 @@ impl<B> Prioritize<B>
|
|||||||
// don't notify the conneciton task. Once additional capacity
|
// don't notify the conneciton task. Once additional capacity
|
||||||
// becomes available, the frame will be flushed.
|
// becomes available, the frame will be flushed.
|
||||||
stream.pending_send.push_back(&mut self.buffer, frame.into());
|
stream.pending_send.push_back(&mut self.buffer, frame.into());
|
||||||
|
|
||||||
debug_assert!(stream.is_pending_send_capacity);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -213,8 +211,8 @@ impl<B> Prioritize<B>
|
|||||||
total_requested - stream.send_flow.available(),
|
total_requested - stream.send_flow.available(),
|
||||||
stream.send_flow.window_size());
|
stream.send_flow.window_size());
|
||||||
|
|
||||||
trace!("try_assign_capacity; requested={}; additional={}; conn={}",
|
trace!("try_assign_capacity; requested={}; additional={}; window={}; conn={}",
|
||||||
total_requested, additional, self.flow.available());
|
total_requested, additional, stream.send_flow.window_size(), self.flow.available());
|
||||||
|
|
||||||
if additional == 0 {
|
if additional == 0 {
|
||||||
// Nothing more to do
|
// Nothing more to do
|
||||||
@@ -373,13 +371,25 @@ impl<B> Prioritize<B>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn clear_queue(&mut self, stream: &mut store::Ptr<B>) {
|
||||||
|
trace!("clear_queue; stream-id={:?}", stream.id);
|
||||||
|
|
||||||
|
// TODO: make this more efficient?
|
||||||
|
while let Some(frame) = stream.pending_send.pop_front(&mut self.buffer) {
|
||||||
|
trace!("dropping; frame={:?}", frame);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn pop_frame(&mut self, store: &mut Store<B>, max_len: usize)
|
fn pop_frame(&mut self, store: &mut Store<B>, max_len: usize)
|
||||||
-> Option<Frame<Prioritized<B>>>
|
-> Option<Frame<Prioritized<B>>>
|
||||||
{
|
{
|
||||||
|
trace!("pop_frame");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
trace!("pop frame");
|
|
||||||
match self.pending_send.pop(store) {
|
match self.pending_send.pop(store) {
|
||||||
Some(mut stream) => {
|
Some(mut stream) => {
|
||||||
|
trace!("pop_frame; stream={:?}", stream.id);
|
||||||
|
|
||||||
let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() {
|
let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() {
|
||||||
Frame::Data(mut frame) => {
|
Frame::Data(mut frame) => {
|
||||||
// Get the amount of capacity remaining for stream's
|
// Get the amount of capacity remaining for stream's
|
||||||
@@ -459,6 +469,8 @@ impl<B> Prioritize<B>
|
|||||||
frame => frame.map(|_| unreachable!()),
|
frame => frame.map(|_| unreachable!()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
trace!("pop_frame; frame={:?}", frame);
|
||||||
|
|
||||||
if !stream.pending_send.is_empty() {
|
if !stream.pending_send.is_empty() {
|
||||||
// TODO: Only requeue the sender IF it is ready to send
|
// TODO: Only requeue the sender IF it is ready to send
|
||||||
// the next frame. i.e. don't requeue it if the next
|
// the next frame. i.e. don't requeue it if the next
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ pub(super) struct Recv<B> {
|
|||||||
pending_window_updates: store::Queue<B, stream::NextWindowUpdate>,
|
pending_window_updates: store::Queue<B, stream::NextWindowUpdate>,
|
||||||
|
|
||||||
/// New streams to be accepted
|
/// New streams to be accepted
|
||||||
pending_accept: store::Queue<B, stream::Next>,
|
pending_accept: store::Queue<B, stream::NextAccept>,
|
||||||
|
|
||||||
/// Holds frames that are waiting to be read
|
/// Holds frames that are waiting to be read
|
||||||
buffer: Buffer<Bytes>,
|
buffer: Buffer<Bytes>,
|
||||||
|
|||||||
@@ -115,12 +115,17 @@ impl<B> Send<B> where B: Buf {
|
|||||||
-> Result<(), ConnectionError>
|
-> Result<(), ConnectionError>
|
||||||
{
|
{
|
||||||
if stream.state.is_closed() {
|
if stream.state.is_closed() {
|
||||||
|
debug!("send_reset; invalid stream ID");
|
||||||
return Err(InactiveStreamId.into())
|
return Err(InactiveStreamId.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
stream.state.send_reset(reason)?;
|
stream.state.send_reset(reason)?;
|
||||||
|
|
||||||
let frame = frame::Reset::new(stream.id, reason);
|
let frame = frame::Reset::new(stream.id, reason);
|
||||||
|
|
||||||
|
self.prioritize.clear_queue(stream);
|
||||||
|
|
||||||
|
trace!("send_reset -- queueing; frame={:?}", frame);
|
||||||
self.prioritize.queue_frame(frame.into(), stream, task);
|
self.prioritize.queue_frame(frame.into(), stream, task);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -189,17 +194,22 @@ impl<B> Send<B> where B: Buf {
|
|||||||
|
|
||||||
pub fn recv_stream_window_update(&mut self,
|
pub fn recv_stream_window_update(&mut self,
|
||||||
sz: WindowSize,
|
sz: WindowSize,
|
||||||
stream: &mut store::Ptr<B>)
|
stream: &mut store::Ptr<B>,
|
||||||
|
task: &mut Option<Task>)
|
||||||
|
-> Result<(), ConnectionError>
|
||||||
{
|
{
|
||||||
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
|
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
|
||||||
// TODO: Send reset
|
debug!("recv_stream_window_update !!; err={:?}", e);
|
||||||
unimplemented!();
|
self.send_reset(FlowControlError.into(), stream, task)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn apply_remote_settings(&mut self,
|
pub fn apply_remote_settings(&mut self,
|
||||||
settings: &frame::Settings,
|
settings: &frame::Settings,
|
||||||
store: &mut Store<B>)
|
store: &mut Store<B>,
|
||||||
|
task: &mut Option<Task>)
|
||||||
{
|
{
|
||||||
if let Some(val) = settings.max_concurrent_streams() {
|
if let Some(val) = settings.max_concurrent_streams() {
|
||||||
self.max_streams = Some(val as usize);
|
self.max_streams = Some(val as usize);
|
||||||
@@ -245,7 +255,7 @@ impl<B> Send<B> where B: Buf {
|
|||||||
let inc = val - old_val;
|
let inc = val - old_val;
|
||||||
|
|
||||||
store.for_each(|mut stream| {
|
store.for_each(|mut stream| {
|
||||||
self.recv_stream_window_update(inc, &mut stream);
|
self.recv_stream_window_update(inc, &mut stream, task);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -173,7 +173,10 @@ impl<B, N> Queue<B, N>
|
|||||||
///
|
///
|
||||||
/// If the stream is already contained by the list, return `false`.
|
/// If the stream is already contained by the list, return `false`.
|
||||||
pub fn push(&mut self, stream: &mut store::Ptr<B>) -> bool {
|
pub fn push(&mut self, stream: &mut store::Ptr<B>) -> bool {
|
||||||
|
trace!("Queue::push");
|
||||||
|
|
||||||
if N::is_queued(stream) {
|
if N::is_queued(stream) {
|
||||||
|
trace!(" -> already queued");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -185,6 +188,8 @@ impl<B, N> Queue<B, N>
|
|||||||
// Queue the stream
|
// Queue the stream
|
||||||
match self.indices {
|
match self.indices {
|
||||||
Some(ref mut idxs) => {
|
Some(ref mut idxs) => {
|
||||||
|
trace!(" -> existing entries");
|
||||||
|
|
||||||
// Update the current tail node to point to `stream`
|
// Update the current tail node to point to `stream`
|
||||||
let key = stream.key();
|
let key = stream.key();
|
||||||
N::set_next(&mut stream.resolve(idxs.tail), Some(key));
|
N::set_next(&mut stream.resolve(idxs.tail), Some(key));
|
||||||
@@ -193,6 +198,7 @@ impl<B, N> Queue<B, N>
|
|||||||
idxs.tail = stream.key();
|
idxs.tail = stream.key();
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
trace!(" -> first entry");
|
||||||
self.indices = Some(store::Indices {
|
self.indices = Some(store::Indices {
|
||||||
head: stream.key(),
|
head: stream.key(),
|
||||||
tail: stream.key(),
|
tail: stream.key(),
|
||||||
|
|||||||
@@ -8,20 +8,14 @@ pub(super) struct Stream<B> {
|
|||||||
/// Current state of the stream
|
/// Current state of the stream
|
||||||
pub state: State,
|
pub state: State,
|
||||||
|
|
||||||
/// Next node in the `Stream` linked list.
|
|
||||||
///
|
|
||||||
/// This field is used in different linked lists depending on the stream
|
|
||||||
/// state. First, it is used as part of the linked list of streams waiting
|
|
||||||
/// to be accepted (either by a server or by a client as a push promise).
|
|
||||||
/// Once the stream is accepted, this is used for the linked list of streams
|
|
||||||
/// waiting to flush prioritized frames to the socket.
|
|
||||||
pub next: Option<store::Key>,
|
|
||||||
|
|
||||||
/// Set to true when the stream is queued
|
|
||||||
pub is_queued: bool,
|
|
||||||
|
|
||||||
// ===== Fields related to sending =====
|
// ===== Fields related to sending =====
|
||||||
|
|
||||||
|
/// Next node in the accept linked list
|
||||||
|
pub next_pending_send: Option<store::Key>,
|
||||||
|
|
||||||
|
/// Set to true when the stream is pending accept
|
||||||
|
pub is_pending_send: bool,
|
||||||
|
|
||||||
/// Send data flow control
|
/// Send data flow control
|
||||||
pub send_flow: FlowControl,
|
pub send_flow: FlowControl,
|
||||||
|
|
||||||
@@ -49,6 +43,12 @@ pub(super) struct Stream<B> {
|
|||||||
|
|
||||||
// ===== Fields related to receiving =====
|
// ===== Fields related to receiving =====
|
||||||
|
|
||||||
|
/// Next node in the accept linked list
|
||||||
|
pub next_pending_accept: Option<store::Key>,
|
||||||
|
|
||||||
|
/// Set to true when the stream is pending accept
|
||||||
|
pub is_pending_accept: bool,
|
||||||
|
|
||||||
/// Receive data flow control
|
/// Receive data flow control
|
||||||
pub recv_flow: FlowControl,
|
pub recv_flow: FlowControl,
|
||||||
|
|
||||||
@@ -67,11 +67,14 @@ pub(super) struct Stream<B> {
|
|||||||
pub recv_task: Option<task::Task>,
|
pub recv_task: Option<task::Task>,
|
||||||
|
|
||||||
/// The stream's pending push promises
|
/// The stream's pending push promises
|
||||||
pub pending_push_promises: store::Queue<B, Next>,
|
pub pending_push_promises: store::Queue<B, NextAccept>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) struct Next;
|
pub(super) struct NextAccept;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub(super) struct NextSend;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) struct NextSendCapacity;
|
pub(super) struct NextSendCapacity;
|
||||||
@@ -85,11 +88,11 @@ impl<B> Stream<B> {
|
|||||||
Stream {
|
Stream {
|
||||||
id,
|
id,
|
||||||
state: State::default(),
|
state: State::default(),
|
||||||
next: None,
|
|
||||||
is_queued: false,
|
|
||||||
|
|
||||||
// ===== Fields related to sending =====
|
// ===== Fields related to sending =====
|
||||||
|
|
||||||
|
next_pending_send: None,
|
||||||
|
is_pending_send: false,
|
||||||
send_flow: FlowControl::new(),
|
send_flow: FlowControl::new(),
|
||||||
requested_send_capacity: 0,
|
requested_send_capacity: 0,
|
||||||
buffered_send_data: 0,
|
buffered_send_data: 0,
|
||||||
@@ -101,6 +104,8 @@ impl<B> Stream<B> {
|
|||||||
|
|
||||||
// ===== Fields related to receiving =====
|
// ===== Fields related to receiving =====
|
||||||
|
|
||||||
|
next_pending_accept: None,
|
||||||
|
is_pending_accept: false,
|
||||||
recv_flow: FlowControl::new(),
|
recv_flow: FlowControl::new(),
|
||||||
in_flight_recv_data: 0,
|
in_flight_recv_data: 0,
|
||||||
next_window_update: None,
|
next_window_update: None,
|
||||||
@@ -135,25 +140,47 @@ impl<B> Stream<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl store::Next for Next {
|
impl store::Next for NextAccept {
|
||||||
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
|
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
|
||||||
stream.next
|
stream.next_pending_accept
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
|
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
|
||||||
stream.next = key;
|
stream.next_pending_accept = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
|
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
|
||||||
stream.next.take()
|
stream.next_pending_accept.take()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_queued<B>(stream: &Stream<B>) -> bool {
|
fn is_queued<B>(stream: &Stream<B>) -> bool {
|
||||||
stream.is_queued
|
stream.is_pending_accept
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
|
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
|
||||||
stream.is_queued = val;
|
stream.is_pending_accept = val;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl store::Next for NextSend {
|
||||||
|
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
|
||||||
|
stream.next_pending_send
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
|
||||||
|
stream.next_pending_send = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
|
||||||
|
stream.next_pending_send.take()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_queued<B>(stream: &Stream<B>) -> bool {
|
||||||
|
stream.is_pending_send
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
|
||||||
|
stream.is_pending_send = val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -173,7 +173,7 @@ impl<B> Streams<B>
|
|||||||
// considers closed. It's ok...
|
// considers closed. It's ok...
|
||||||
if let Some(mut stream) = me.store.find_mut(&id) {
|
if let Some(mut stream) = me.store.find_mut(&id) {
|
||||||
me.actions.send.recv_stream_window_update(
|
me.actions.send.recv_stream_window_update(
|
||||||
frame.size_increment(), &mut stream);
|
frame.size_increment(), &mut stream, &mut me.actions.task);
|
||||||
} else {
|
} else {
|
||||||
me.actions.recv.ensure_not_idle(id)?;
|
me.actions.recv.ensure_not_idle(id)?;
|
||||||
}
|
}
|
||||||
@@ -249,7 +249,8 @@ impl<B> Streams<B>
|
|||||||
let mut me = self.inner.lock().unwrap();
|
let mut me = self.inner.lock().unwrap();
|
||||||
let me = &mut *me;
|
let me = &mut *me;
|
||||||
|
|
||||||
me.actions.send.apply_remote_settings(frame, &mut me.store);
|
me.actions.send.apply_remote_settings(
|
||||||
|
frame, &mut me.store, &mut me.actions.task);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> {
|
pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> {
|
||||||
|
|||||||
Reference in New Issue
Block a user