From 66dbde92ef55b0016e4a1e6fa117a311e49f3cbd Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 23 Aug 2017 20:35:53 -0700 Subject: [PATCH] Do not reuse next ptr for multiple linked lists Because, you might think that each linked list has exclusive access to the next pointer, but then there is an edge case that proves otherwise. Also, debugging this kind of thing is annoying. --- examples/server.rs | 15 ++++--- src/proto/streams/prioritize.rs | 24 ++++++++--- src/proto/streams/recv.rs | 2 +- src/proto/streams/send.rs | 20 +++++++--- src/proto/streams/store.rs | 6 +++ src/proto/streams/stream.rs | 71 +++++++++++++++++++++++---------- src/proto/streams/streams.rs | 5 ++- 7 files changed, 101 insertions(+), 42 deletions(-) diff --git a/examples/server.rs b/examples/server.rs index cee18dc..5c4ea90 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -33,9 +33,7 @@ pub fn main() { let connection = Server::handshake(socket) - .then(|res| { - let conn = res.unwrap(); - + .and_then(|conn| { println!("H2 connection bound"); conn.for_each(|(request, mut stream)| { @@ -45,12 +43,14 @@ pub fn main() { .status(status::OK) .body(()).unwrap(); - if let Err(e) = stream.send_response(response, true) { + if let Err(e) = stream.send_response(response, false) { println!(" error responding; err={:?}", e); } println!(">>>> sending data"); - stream.send_data(Bytes::from_static(b"hello world"), true).unwrap(); + if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), true) { + println!(" -> err={:?}", e); + } Ok(()) }).and_then(|_| { @@ -59,7 +59,10 @@ pub fn main() { }) }) .then(|res| { - let _ = res.unwrap(); + if let Err(e) = res { + println!(" -> err={:?}", e); + } + Ok(()) }) ; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index cdaad1f..e06f492 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -7,7 +7,7 @@ use std::{fmt, cmp}; #[derive(Debug)] pub(super) struct Prioritize { /// Queue of streams waiting for socket capacity to send a frame - pending_send: store::Queue, + pending_send: store::Queue, /// Queue of streams waiting for window capacity to produce data. pending_capacity: store::Queue, @@ -125,8 +125,6 @@ impl Prioritize // don't notify the conneciton task. Once additional capacity // becomes available, the frame will be flushed. stream.pending_send.push_back(&mut self.buffer, frame.into()); - - debug_assert!(stream.is_pending_send_capacity); } Ok(()) @@ -213,8 +211,8 @@ impl Prioritize total_requested - stream.send_flow.available(), stream.send_flow.window_size()); - trace!("try_assign_capacity; requested={}; additional={}; conn={}", - total_requested, additional, self.flow.available()); + trace!("try_assign_capacity; requested={}; additional={}; window={}; conn={}", + total_requested, additional, stream.send_flow.window_size(), self.flow.available()); if additional == 0 { // Nothing more to do @@ -373,13 +371,25 @@ impl Prioritize } } + pub fn clear_queue(&mut self, stream: &mut store::Ptr) { + trace!("clear_queue; stream-id={:?}", stream.id); + + // TODO: make this more efficient? + while let Some(frame) = stream.pending_send.pop_front(&mut self.buffer) { + trace!("dropping; frame={:?}", frame); + } + } + fn pop_frame(&mut self, store: &mut Store, max_len: usize) -> Option>> { + trace!("pop_frame"); + loop { - trace!("pop frame"); match self.pending_send.pop(store) { Some(mut stream) => { + trace!("pop_frame; stream={:?}", stream.id); + let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { Frame::Data(mut frame) => { // Get the amount of capacity remaining for stream's @@ -459,6 +469,8 @@ impl Prioritize frame => frame.map(|_| unreachable!()), }; + trace!("pop_frame; frame={:?}", frame); + if !stream.pending_send.is_empty() { // TODO: Only requeue the sender IF it is ready to send // the next frame. i.e. don't requeue it if the next diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index ad81030..a503ed5 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -31,7 +31,7 @@ pub(super) struct Recv { pending_window_updates: store::Queue, /// New streams to be accepted - pending_accept: store::Queue, + pending_accept: store::Queue, /// Holds frames that are waiting to be read buffer: Buffer, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index f470505..c329770 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -115,12 +115,17 @@ impl Send where B: Buf { -> Result<(), ConnectionError> { if stream.state.is_closed() { + debug!("send_reset; invalid stream ID"); return Err(InactiveStreamId.into()) } stream.state.send_reset(reason)?; let frame = frame::Reset::new(stream.id, reason); + + self.prioritize.clear_queue(stream); + + trace!("send_reset -- queueing; frame={:?}", frame); self.prioritize.queue_frame(frame.into(), stream, task); Ok(()) @@ -189,17 +194,22 @@ impl Send where B: Buf { pub fn recv_stream_window_update(&mut self, sz: WindowSize, - stream: &mut store::Ptr) + stream: &mut store::Ptr, + task: &mut Option) + -> Result<(), ConnectionError> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { - // TODO: Send reset - unimplemented!(); + debug!("recv_stream_window_update !!; err={:?}", e); + self.send_reset(FlowControlError.into(), stream, task)?; } + + Ok(()) } pub fn apply_remote_settings(&mut self, settings: &frame::Settings, - store: &mut Store) + store: &mut Store, + task: &mut Option) { if let Some(val) = settings.max_concurrent_streams() { self.max_streams = Some(val as usize); @@ -245,7 +255,7 @@ impl Send where B: Buf { let inc = val - old_val; store.for_each(|mut stream| { - self.recv_stream_window_update(inc, &mut stream); + self.recv_stream_window_update(inc, &mut stream, task); }); } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 7c04065..6388f77 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -173,7 +173,10 @@ impl Queue /// /// If the stream is already contained by the list, return `false`. pub fn push(&mut self, stream: &mut store::Ptr) -> bool { + trace!("Queue::push"); + if N::is_queued(stream) { + trace!(" -> already queued"); return false; } @@ -185,6 +188,8 @@ impl Queue // Queue the stream match self.indices { Some(ref mut idxs) => { + trace!(" -> existing entries"); + // Update the current tail node to point to `stream` let key = stream.key(); N::set_next(&mut stream.resolve(idxs.tail), Some(key)); @@ -193,6 +198,7 @@ impl Queue idxs.tail = stream.key(); } None => { + trace!(" -> first entry"); self.indices = Some(store::Indices { head: stream.key(), tail: stream.key(), diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 5bf2abd..4a10acc 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -8,20 +8,14 @@ pub(super) struct Stream { /// Current state of the stream pub state: State, - /// Next node in the `Stream` linked list. - /// - /// This field is used in different linked lists depending on the stream - /// state. First, it is used as part of the linked list of streams waiting - /// to be accepted (either by a server or by a client as a push promise). - /// Once the stream is accepted, this is used for the linked list of streams - /// waiting to flush prioritized frames to the socket. - pub next: Option, - - /// Set to true when the stream is queued - pub is_queued: bool, - // ===== Fields related to sending ===== + /// Next node in the accept linked list + pub next_pending_send: Option, + + /// Set to true when the stream is pending accept + pub is_pending_send: bool, + /// Send data flow control pub send_flow: FlowControl, @@ -49,6 +43,12 @@ pub(super) struct Stream { // ===== Fields related to receiving ===== + /// Next node in the accept linked list + pub next_pending_accept: Option, + + /// Set to true when the stream is pending accept + pub is_pending_accept: bool, + /// Receive data flow control pub recv_flow: FlowControl, @@ -67,11 +67,14 @@ pub(super) struct Stream { pub recv_task: Option, /// The stream's pending push promises - pub pending_push_promises: store::Queue, + pub pending_push_promises: store::Queue, } #[derive(Debug)] -pub(super) struct Next; +pub(super) struct NextAccept; + +#[derive(Debug)] +pub(super) struct NextSend; #[derive(Debug)] pub(super) struct NextSendCapacity; @@ -85,11 +88,11 @@ impl Stream { Stream { id, state: State::default(), - next: None, - is_queued: false, // ===== Fields related to sending ===== + next_pending_send: None, + is_pending_send: false, send_flow: FlowControl::new(), requested_send_capacity: 0, buffered_send_data: 0, @@ -101,6 +104,8 @@ impl Stream { // ===== Fields related to receiving ===== + next_pending_accept: None, + is_pending_accept: false, recv_flow: FlowControl::new(), in_flight_recv_data: 0, next_window_update: None, @@ -135,25 +140,47 @@ impl Stream { } } -impl store::Next for Next { +impl store::Next for NextAccept { fn next(stream: &Stream) -> Option { - stream.next + stream.next_pending_accept } fn set_next(stream: &mut Stream, key: Option) { - stream.next = key; + stream.next_pending_accept = key; } fn take_next(stream: &mut Stream) -> Option { - stream.next.take() + stream.next_pending_accept.take() } fn is_queued(stream: &Stream) -> bool { - stream.is_queued + stream.is_pending_accept } fn set_queued(stream: &mut Stream, val: bool) { - stream.is_queued = val; + stream.is_pending_accept = val; + } +} + +impl store::Next for NextSend { + fn next(stream: &Stream) -> Option { + stream.next_pending_send + } + + fn set_next(stream: &mut Stream, key: Option) { + stream.next_pending_send = key; + } + + fn take_next(stream: &mut Stream) -> Option { + stream.next_pending_send.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_send = val; } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 198df8d..0011f14 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -173,7 +173,7 @@ impl Streams // considers closed. It's ok... if let Some(mut stream) = me.store.find_mut(&id) { me.actions.send.recv_stream_window_update( - frame.size_increment(), &mut stream); + frame.size_increment(), &mut stream, &mut me.actions.task); } else { me.actions.recv.ensure_not_idle(id)?; } @@ -249,7 +249,8 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - me.actions.send.apply_remote_settings(frame, &mut me.store); + me.actions.send.apply_remote_settings( + frame, &mut me.store, &mut me.actions.task); } pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> {