From cf62b783e0bfe7b183cdef78874c5ca6e417c59e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 9 May 2018 15:03:21 -0700 Subject: [PATCH] Misc bug fixes related to stream state (#273) This patch includes two new significant debug assertions: * Assert stream counts are zero when the connection finalizes. * Assert all stream state has been released when the connection is dropped. These two assertions were added in an effort to test the fix provided by #261. In doing so, many related bugs have been discovered and fixed. The details related to these bugs can be found in #273. --- src/proto/connection.rs | 15 +++- src/proto/streams/counts.rs | 50 +++++++++-- src/proto/streams/prioritize.rs | 78 +++++++++++------ src/proto/streams/recv.rs | 106 ++++++++++++++-------- src/proto/streams/send.rs | 44 +++++++--- src/proto/streams/state.rs | 12 --- src/proto/streams/store.rs | 17 ++++ src/proto/streams/stream.rs | 11 ++- src/proto/streams/streams.rs | 121 +++++++++++++++++--------- tests/h2-support/src/frames.rs | 7 ++ tests/h2-tests/tests/stream_states.rs | 2 + 11 files changed, 319 insertions(+), 144 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 55c4d85..e2e9f4b 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -178,7 +178,6 @@ where } } - /// Advances the internal state of the connection. pub fn poll(&mut self) -> Poll<(), proto::Error> { use codec::RecvError::*; @@ -341,7 +340,8 @@ where }, None => { trace!("codec closed"); - self.streams.recv_eof(); + self.streams.recv_eof(false) + .ok().expect("mutex poisoned"); return Ok(Async::Ready(())); }, } @@ -397,3 +397,14 @@ where self.ping_pong.ping_shutdown(); } } + +impl Drop for Connection +where + P: Peer, + B: IntoBuf, +{ + fn drop(&mut self) { + // Ignore errors as this indicates that the mutex is poisoned. + let _ = self.streams.recv_eof(true); + } +} diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 2d54279..e027426 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -60,11 +60,13 @@ impl Counts { /// # Panics /// /// Panics on failure as this should have been validated before hand. - pub fn inc_num_recv_streams(&mut self) { + pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) { assert!(self.can_inc_num_recv_streams()); + assert!(!stream.is_counted); // Increment the number of remote initiated streams self.num_recv_streams += 1; + stream.is_counted = true; } /// Returns true if the send stream concurrency can be incremented @@ -77,11 +79,13 @@ impl Counts { /// # Panics /// /// Panics on failure as this should have been validated before hand. - pub fn inc_num_send_streams(&mut self) { + pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) { assert!(self.can_inc_num_send_streams()); + assert!(!stream.is_counted); // Increment the number of remote initiated streams self.num_send_streams += 1; + stream.is_counted = true; } /// Returns true if the number of pending reset streams can be incremented. @@ -110,23 +114,36 @@ impl Counts { /// /// If the stream state transitions to closed, this function will perform /// all necessary cleanup. + /// + /// TODO: Is this function still needed? pub fn transition(&mut self, mut stream: store::Ptr, f: F) -> U where F: FnOnce(&mut Self, &mut store::Ptr) -> U, { - let is_counted = stream.is_counted(); + // TODO: Does this need to be computed before performing the action? let is_pending_reset = stream.is_pending_reset_expiration(); // Run the action let ret = f(self, &mut stream); - self.transition_after(stream, is_counted, is_pending_reset); + self.transition_after(stream, is_pending_reset); ret } // TODO: move this to macro? - pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool, is_reset_counted: bool) { + pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) { + trace!("transition_after; stream={:?}; state={:?}; is_closed={:?}; \ + pending_send_empty={:?}; buffered_send_data={}; \ + num_recv={}; num_send={}", + stream.id, + stream.state, + stream.is_closed(), + stream.pending_send.is_empty(), + stream.buffered_send_data, + self.num_recv_streams, + self.num_send_streams); + if stream.is_closed() { if !stream.is_pending_reset_expiration() { stream.unlink(); @@ -136,9 +153,10 @@ impl Counts { } } - if is_counted { + if stream.is_counted { + trace!("dec_num_streams; stream={:?}", stream.id); // Decrement the number of active streams. - self.dec_num_streams(stream.id); + self.dec_num_streams(&mut stream); } } @@ -148,13 +166,17 @@ impl Counts { } } - fn dec_num_streams(&mut self, id: StreamId) { - if self.peer.is_local_init(id) { + fn dec_num_streams(&mut self, stream: &mut store::Ptr) { + assert!(stream.is_counted); + + if self.peer.is_local_init(stream.id) { assert!(self.num_send_streams > 0); self.num_send_streams -= 1; + stream.is_counted = false; } else { assert!(self.num_recv_streams > 0); self.num_recv_streams -= 1; + stream.is_counted = false; } } @@ -163,3 +185,13 @@ impl Counts { self.num_reset_streams -= 1; } } + +impl Drop for Counts { + fn drop(&mut self) { + use std::thread; + + if !thread::panicking() { + debug_assert!(!self.has_streams()); + } + } +} diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 712a991..d26b614 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -134,6 +134,7 @@ impl Prioritize { frame: frame::Data, buffer: &mut Buffer>, stream: &mut store::Ptr, + counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> where @@ -176,7 +177,7 @@ impl Prioritize { if frame.is_end_stream() { stream.state.send_close(); - self.reserve_capacity(0, stream); + self.reserve_capacity(0, stream, counts); } trace!( @@ -210,7 +211,11 @@ impl Prioritize { } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { + pub fn reserve_capacity( + &mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + counts: &mut Counts) { trace!( "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", stream.id, @@ -239,7 +244,7 @@ impl Prioritize { stream.send_flow.claim_capacity(diff); - self.assign_connection_capacity(diff, stream); + self.assign_connection_capacity(diff, stream, counts); } } else { // Update the target requested capacity @@ -284,36 +289,49 @@ impl Prioritize { &mut self, inc: WindowSize, store: &mut Store, + counts: &mut Counts, ) -> Result<(), Reason> { // Update the connection's window self.flow.inc_window(inc)?; - self.assign_connection_capacity(inc, store); + self.assign_connection_capacity(inc, store, counts); Ok(()) } /// Reclaim all capacity assigned to the stream and re-assign it to the /// connection - pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr) { + pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { let available = stream.send_flow.available().as_size(); stream.send_flow.claim_capacity(available); // Re-assign all capacity to the connection - self.assign_connection_capacity(available, stream); + self.assign_connection_capacity(available, stream, counts); } /// Reclaim just reserved capacity, not buffered capacity, and re-assign /// it to the connection - pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr) { + pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) { // only reclaim requested capacity that isn't already buffered if stream.requested_send_capacity > stream.buffered_send_data { let reserved = stream.requested_send_capacity - stream.buffered_send_data; stream.send_flow.claim_capacity(reserved); - self.assign_connection_capacity(reserved, stream); + self.assign_connection_capacity(reserved, stream, counts); } } - pub fn assign_connection_capacity(&mut self, inc: WindowSize, store: &mut R) + pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_capacity.pop(store) { + counts.transition(stream, |_, stream| { + trace!("clear_pending_capacity; stream={:?}", stream.id); + }) + } + } + + pub fn assign_connection_capacity( + &mut self, + inc: WindowSize, + store: &mut R, + counts: &mut Counts) where R: Resolve, { @@ -323,15 +341,17 @@ impl Prioritize { // Assign newly acquired capacity to streams pending capacity. while self.flow.available() > 0 { - let mut stream = match self.pending_capacity.pop(store) { + let stream = match self.pending_capacity.pop(store) { Some(stream) => stream, None => return, }; - // Try to assign capacity to the stream. This will also re-queue the - // stream if there isn't enough connection level capacity to fulfill - // the capacity request. - self.try_assign_capacity(&mut stream); + counts.transition(stream, |_, mut stream| { + // Try to assign capacity to the stream. This will also re-queue the + // stream if there isn't enough connection level capacity to fulfill + // the capacity request. + self.try_assign_capacity(&mut stream); + }) } } @@ -595,6 +615,13 @@ impl Prioritize { } } + pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_send.pop(store) { + let is_pending_reset = stream.is_pending_reset_expiration(); + counts.transition_after(stream, is_pending_reset); + } + } + fn pop_frame( &mut self, buffer: &mut Buffer>, @@ -613,21 +640,22 @@ impl Prioritize { trace!("pop_frame; stream={:?}; stream.state={:?}", stream.id, stream.state); - // If the stream receives a RESET from the peer, it may have - // had data buffered to be sent, but all the frames are cleared - // in clear_queue(). Instead of doing O(N) traversal through queue - // to remove, lets just ignore peer_reset streams here. - if stream.state.is_peer_reset() { - continue; - } - // It's possible that this stream, besides having data to send, // is also queued to send a reset, and thus is already in the queue // to wait for "some time" after a reset. // // To be safe, we just always ask the stream. - let is_counted = stream.is_counted(); let is_pending_reset = stream.is_pending_reset_expiration(); + + // If the stream receives a RESET from the peer, it may have + // had data buffered to be sent, but all the frames are cleared + // in clear_queue(). Instead of doing O(N) traversal through queue + // to remove, lets just ignore peer_reset streams here. + if stream.state.is_peer_reset() { + counts.transition_after(stream, is_pending_reset); + continue; + } + trace!(" --> stream={:?}; is_pending_reset={:?};", stream.id, is_pending_reset); @@ -754,7 +782,7 @@ impl Prioritize { self.pending_send.push(&mut stream); } - counts.transition_after(stream, is_counted, is_pending_reset); + counts.transition_after(stream, is_pending_reset); return Some(frame); }, @@ -770,7 +798,7 @@ impl Prioritize { if let Some(mut stream) = self.pending_open.pop(store) { trace!("schedule_pending_open; stream={:?}", stream.id); - counts.inc_num_send_streams(); + counts.inc_num_send_streams(&mut stream); self.pending_send.push(&mut stream); } else { return; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index e27ec10..a4a3259 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -162,7 +162,7 @@ impl Recv { } // Increment the number of concurrent streams - counts.inc_num_recv_streams(); + counts.inc_num_recv_streams(stream); } if !stream.content_length.is_head() { @@ -680,12 +680,7 @@ impl Recv { // if max allow is 0, this won't be able to evict, // and then we'll just bail after if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { - // It's possible that this stream is still sitting in a send queue, - // such as if some data is to be sent and then a CANCEL. In this case, - // it could still be "counted", so we just make sure to always ask the - // stream instead of assuming. - let is_counted = evicted.is_counted(); - counts.transition_after(evicted, is_counted, true); + counts.transition_after(evicted, true); } } @@ -728,14 +723,48 @@ impl Recv { let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); now - reset_at > reset_duration }) { - let is_counted = stream.is_counted(); - counts.transition_after(stream, is_counted, true); + counts.transition_after(stream, true); + } + } + + pub fn clear_queues(&mut self, + clear_pending_accept: bool, + store: &mut Store, + counts: &mut Counts) + { + self.clear_stream_window_update_queue(store, counts); + self.clear_all_reset_streams(store, counts); + + if clear_pending_accept { + self.clear_all_pending_accept(store, counts); + } + } + + fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_window_updates.pop(store) { + counts.transition(stream, |_, stream| { + trace!("clear_stream_window_update_queue; stream={:?}", stream.id); + }) + } + } + + /// Called on EOF + fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_reset_expired.pop(store) { + counts.transition_after(stream, true); + } + } + + fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) { + while let Some(stream) = self.pending_accept.pop(store) { + counts.transition_after(stream, false); } } pub fn poll_complete( &mut self, store: &mut Store, + counts: &mut Counts, dst: &mut Codec>, ) -> Poll<(), io::Error> where @@ -746,7 +775,7 @@ impl Recv { try_ready!(self.send_connection_window_update(dst)); // Send any pending stream level window updates - try_ready!(self.send_stream_window_updates(store, dst)); + try_ready!(self.send_stream_window_updates(store, counts, dst)); Ok(().into()) } @@ -781,11 +810,11 @@ impl Recv { Ok(().into()) } - /// Send stream level window update pub fn send_stream_window_updates( &mut self, store: &mut Store, + counts: &mut Counts, dst: &mut Codec>, ) -> Poll<(), io::Error> where @@ -797,38 +826,43 @@ impl Recv { try_ready!(dst.poll_ready()); // Get the next stream - let mut stream = match self.pending_window_updates.pop(store) { + let stream = match self.pending_window_updates.pop(store) { Some(stream) => stream, None => return Ok(().into()), }; - if !stream.state.is_recv_streaming() { - // No need to send window updates on the stream if the stream is - // no longer receiving data. - // - // TODO: is this correct? We could possibly send a window - // update on a ReservedRemote stream if we already know - // we want to stream the data faster... - continue; - } + counts.transition(stream, |_, stream| { + trace!("pending_window_updates -- pop; stream={:?}", stream.id); + debug_assert!(!stream.is_pending_window_update); - // TODO: de-dup - if let Some(incr) = stream.recv_flow.unclaimed_capacity() { - // Create the WINDOW_UPDATE frame - let frame = frame::WindowUpdate::new(stream.id, incr); + if !stream.state.is_recv_streaming() { + // No need to send window updates on the stream if the stream is + // no longer receiving data. + // + // TODO: is this correct? We could possibly send a window + // update on a ReservedRemote stream if we already know + // we want to stream the data faster... + return; + } - // Buffer it - dst.buffer(frame.into()) - .ok() - .expect("invalid WINDOW_UPDATE frame"); + // TODO: de-dup + if let Some(incr) = stream.recv_flow.unclaimed_capacity() { + // Create the WINDOW_UPDATE frame + let frame = frame::WindowUpdate::new(stream.id, incr); - // Update flow control - stream - .recv_flow - .inc_window(incr) - .ok() - .expect("unexpected flow control state"); - } + // Buffer it + dst.buffer(frame.into()) + .ok() + .expect("invalid WINDOW_UPDATE frame"); + + // Update flow control + stream + .recv_flow + .inc_window(incr) + .ok() + .expect("unexpected flow control state"); + } + }) } } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index b3f4716..9a98d0a 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -80,7 +80,7 @@ impl Send { if counts.peer().is_local_init(frame.stream_id()) { if counts.can_inc_num_send_streams() { - counts.inc_num_send_streams(); + counts.inc_num_send_streams(stream); } else { self.prioritize.queue_open(stream); } @@ -104,6 +104,7 @@ impl Send { reason: Reason, buffer: &mut Buffer>, stream: &mut store::Ptr, + counts: &mut Counts, task: &mut Option, ) { let is_reset = stream.state.is_reset(); @@ -146,7 +147,7 @@ impl Send { return; } - self.recv_err(buffer, stream); + self.recv_err(buffer, stream, counts); let frame = frame::Reset::new(stream.id, reason); @@ -158,6 +159,7 @@ impl Send { &mut self, stream: &mut store::Ptr, reason: Reason, + counts: &mut Counts, task: &mut Option, ) { if stream.state.is_closed() { @@ -167,7 +169,7 @@ impl Send { stream.state.set_scheduled_reset(reason); - self.prioritize.reclaim_reserved_capacity(stream); + self.prioritize.reclaim_reserved_capacity(stream, counts); self.prioritize.schedule_send(stream, task); } @@ -176,11 +178,12 @@ impl Send { frame: frame::Data, buffer: &mut Buffer>, stream: &mut store::Ptr, + counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> where B: Buf, { - self.prioritize.send_data(frame, buffer, stream, task) + self.prioritize.send_data(frame, buffer, stream, counts, task) } pub fn send_trailers( @@ -188,6 +191,7 @@ impl Send { frame: frame::Headers, buffer: &mut Buffer>, stream: &mut store::Ptr, + counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> { // TODO: Should this logic be moved into state.rs? @@ -201,7 +205,7 @@ impl Send { self.prioritize.queue_frame(frame.into(), buffer, stream, task); // Release any excess capacity - self.prioritize.reserve_capacity(0, stream); + self.prioritize.reserve_capacity(0, stream, counts); Ok(()) } @@ -220,8 +224,13 @@ impl Send { } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { - self.prioritize.reserve_capacity(capacity, stream) + pub fn reserve_capacity( + &mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + counts: &mut Counts) + { + self.prioritize.reserve_capacity(capacity, stream, counts) } pub fn poll_capacity( @@ -258,9 +267,10 @@ impl Send { &mut self, frame: frame::WindowUpdate, store: &mut Store, + counts: &mut Counts, ) -> Result<(), Reason> { self.prioritize - .recv_connection_window_update(frame.size_increment(), store) + .recv_connection_window_update(frame.size_increment(), store, counts) } pub fn recv_stream_window_update( @@ -268,6 +278,7 @@ impl Send { sz: WindowSize, buffer: &mut Buffer>, stream: &mut store::Ptr, + counts: &mut Counts, task: &mut Option, ) -> Result<(), Reason> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { @@ -275,7 +286,7 @@ impl Send { self.send_reset( Reason::FLOW_CONTROL_ERROR.into(), - buffer, stream, task); + buffer, stream, counts, task); return Err(e); } @@ -295,11 +306,12 @@ impl Send { pub fn recv_err( &mut self, buffer: &mut Buffer>, - stream: &mut store::Ptr + stream: &mut store::Ptr, + counts: &mut Counts, ) { // Clear all pending outbound frames self.prioritize.clear_queue(buffer, stream); - self.prioritize.reclaim_all_capacity(stream); + self.prioritize.reclaim_all_capacity(stream, counts); } pub fn apply_remote_settings( @@ -307,6 +319,7 @@ impl Send { settings: &frame::Settings, buffer: &mut Buffer>, store: &mut Store, + counts: &mut Counts, task: &mut Option, ) -> Result<(), RecvError> { // Applies an update to the remote endpoint's initial window size. @@ -361,12 +374,12 @@ impl Send { })?; self.prioritize - .assign_connection_capacity(total_reclaimed, store); + .assign_connection_capacity(total_reclaimed, store, counts); } else if val > old_val { let inc = val - old_val; store.for_each(|mut stream| { - self.recv_stream_window_update(inc, buffer, &mut stream, task) + self.recv_stream_window_update(inc, buffer, &mut stream, counts, task) .map_err(RecvError::Connection) })?; } @@ -375,6 +388,11 @@ impl Send { Ok(()) } + pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) { + self.prioritize.clear_pending_capacity(store, counts); + self.prioritize.clear_pending_send(store, counts); + } + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { if let Ok(next) = self.next_stream_id { if id >= next { diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 6dcec60..f994e35 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -341,18 +341,6 @@ impl State { } } - /// Returns true if a stream is open or half-closed. - pub fn is_at_least_half_open(&self) -> bool { - match self.inner { - Open { - .. - } => true, - HalfClosedLocal(..) => true, - HalfClosedRemote(..) => true, - _ => false, - } - } - pub fn is_send_streaming(&self) -> bool { match self.inner { Open { diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 121b291..f66906a 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -4,6 +4,7 @@ use slab; use indexmap::{self, IndexMap}; +use std::fmt; use std::marker::PhantomData; use std::ops; @@ -202,6 +203,16 @@ impl Store { } } +impl Drop for Store { + fn drop(&mut self) { + use std::thread; + + if !thread::panicking() { + debug_assert!(self.slab.is_empty()); + } + } +} + // ===== impl Queue ===== impl Queue @@ -356,6 +367,12 @@ impl<'a> ops::DerefMut for Ptr<'a> { } } +impl<'a> fmt::Debug for Ptr<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(fmt) + } +} + // ===== impl OccupiedEntry ===== impl<'a> OccupiedEntry<'a> { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index fa7d4b7..fa7079f 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -22,6 +22,10 @@ pub(super) struct Stream { /// Current state of the stream pub state: State, + /// Set to `true` when the stream is counted against the connection's max + /// concurrent streams. + pub is_counted: bool, + /// Number of outstanding handles pointing to this stream pub ref_count: usize, @@ -151,6 +155,7 @@ impl Stream { id, state: State::default(), ref_count: 0, + is_counted: false, // ===== Fields related to sending ===== next_pending_send: None, @@ -194,12 +199,6 @@ impl Stream { self.ref_count -= 1; } - /// Returns true if a stream with the current state counts against the - /// concurrency limit. - pub fn is_counted(&self) -> bool { - !self.is_pending_open && self.state.is_at_least_half_open() - } - /// Returns true if stream is currently being held for some time because of /// a local reset. pub fn is_pending_reset_expiration(&self) -> bool { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 8c5de28..3954852 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -180,6 +180,7 @@ where actions.send.schedule_implicit_reset( stream, Reason::REFUSED_STREAM, + counts, &mut actions.task); actions.recv.enqueue_reset_expiration(stream, counts); Ok(()) @@ -201,7 +202,7 @@ where actions.recv.recv_trailers(frame, stream) }; - actions.reset_on_recv_stream_err(send_buffer, stream, res) + actions.reset_on_recv_stream_err(send_buffer, stream, counts, res) }) } @@ -228,7 +229,7 @@ where let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { let sz = frame.payload().len(); let res = actions.recv.recv_data(frame, stream); @@ -239,7 +240,7 @@ where actions.recv.release_connection_capacity(sz as WindowSize, &mut None); } - actions.reset_on_recv_stream_err(send_buffer, stream, res) + actions.reset_on_recv_stream_err(send_buffer, stream, counts, res) }) } @@ -297,9 +298,9 @@ where me.store .for_each(|stream| { - counts.transition(stream, |_, stream| { + counts.transition(stream, |counts, stream| { actions.recv.recv_err(err, &mut *stream); - actions.send.recv_err(send_buffer, stream); + actions.send.recv_err(send_buffer, stream, counts); Ok::<_, ()>(()) }) }) @@ -337,9 +338,9 @@ where me.store .for_each(|stream| if stream.id > last_stream_id { - counts.transition(stream, |_, stream| { + counts.transition(stream, |counts, stream| { actions.recv.recv_err(&err, &mut *stream); - actions.send.recv_err(send_buffer, stream); + actions.send.recv_err(send_buffer, stream, counts); Ok::<_, ()>(()) }) } else { @@ -352,27 +353,6 @@ where Ok(()) } - pub fn recv_eof(&mut self) { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let actions = &mut me.actions; - let counts = &mut me.counts; - - if actions.conn_error.is_none() { - actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into()); - } - - me.store - .for_each(|stream| { - counts.transition(stream, |_, stream| { - actions.recv.recv_eof(stream); - Ok::<_, ()>(()) - }) - }) - .expect("recv_eof"); - } - pub fn last_processed_id(&self) -> StreamId { self.inner.lock().unwrap().actions.recv.last_processed_id() } @@ -388,7 +368,7 @@ where if id.is_zero() { me.actions .send - .recv_connection_window_update(frame, &mut me.store) + .recv_connection_window_update(frame, &mut me.store, &mut me.counts) .map_err(RecvError::Connection)?; } else { // The remote may send window updates for streams that the local now @@ -401,6 +381,7 @@ where frame.size_increment(), send_buffer, &mut stream, + &mut me.counts, &mut me.actions.task, ); } else { @@ -443,7 +424,11 @@ where if let Some(ref mut new_stream) = me.store.find_mut(&promised_id) { let mut send_buffer = self.send_buffer.inner.lock().unwrap(); - me.actions.reset_on_recv_stream_err(&mut *send_buffer, new_stream, Err(err)) + me.actions.reset_on_recv_stream_err( + &mut *send_buffer, + new_stream, + &mut me.counts, + Err(err)) } else { // If there was a stream error, the stream should have been stored // so we can track sending a reset. @@ -519,7 +504,7 @@ where // // TODO: It would probably be better to interleave updates w/ data // frames. - try_ready!(me.actions.recv.poll_complete(&mut me.store, dst)); + try_ready!(me.actions.recv.poll_complete(&mut me.store, &mut me.counts, dst)); // Send any other pending frames try_ready!(me.actions.send.poll_complete( @@ -545,7 +530,7 @@ where me.counts.apply_remote_settings(frame); me.actions.send.apply_remote_settings( - frame, send_buffer, &mut me.store, &mut me.actions.task) + frame, send_buffer, &mut me.store, &mut me.counts, &mut me.actions.task) } pub fn send_request( @@ -665,7 +650,7 @@ where me.counts.transition(stream, |counts, stream| { actions.send.send_reset( - reason, send_buffer, stream, &mut actions.task); + reason, send_buffer, stream, counts, &mut actions.task); actions.recv.enqueue_reset_expiration(stream, counts) }) } @@ -705,6 +690,41 @@ impl Streams where P: Peer, { + /// This function is safe to call multiple times. + /// + /// A `Result` is returned to avoid panicking if the mutex is poisoned. + pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> { + let mut me = self.inner.lock().map_err(|_| ())?; + let me = &mut *me; + + let actions = &mut me.actions; + let counts = &mut me.counts; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + + if actions.conn_error.is_none() { + actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into()); + } + + trace!("Streams::recv_eof"); + + me.store + .for_each(|stream| { + counts.transition(stream, |counts, stream| { + actions.recv.recv_eof(stream); + + // This handles resetting send state associated with the + // stream + actions.send.recv_err(send_buffer, stream, counts); + Ok::<_, ()>(()) + }) + }) + .expect("recv_eof"); + + actions.clear_queues(clear_pending_accept, &mut me.store, counts); + Ok(()) + } + pub fn num_active_streams(&self) -> usize { let me = self.inner.lock().unwrap(); me.store.num_active_streams() @@ -759,13 +779,18 @@ impl StreamRef { let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { // Create the data frame let mut frame = frame::Data::new(stream.id, data); frame.set_end_stream(end_stream); // Send the data frame - actions.send.send_data(frame, send_buffer, stream, &mut actions.task) + actions.send.send_data( + frame, + send_buffer, + stream, + counts, + &mut actions.task) }) } @@ -778,13 +803,13 @@ impl StreamRef { let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { // Create the trailers frame let frame = frame::Headers::trailers(stream.id, trailers); // Send the trailers frame actions.send.send_trailers( - frame, send_buffer, stream, &mut actions.task) + frame, send_buffer, stream, counts, &mut actions.task) }) } @@ -797,9 +822,9 @@ impl StreamRef { let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { actions.send.send_reset( - reason, send_buffer, stream, &mut actions.task) + reason, send_buffer, stream, counts, &mut actions.task) }) } @@ -852,7 +877,7 @@ impl StreamRef { let mut stream = me.store.resolve(self.opaque.key); - me.actions.send.reserve_capacity(capacity, &mut stream) + me.actions.send.reserve_capacity(capacity, &mut stream, &mut me.counts) } /// Returns the stream's current send capacity. @@ -1008,6 +1033,9 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { let me = &mut *me; let mut stream = me.store.resolve(key); + + trace!("drop_stream_ref; stream={:?}", stream); + // decrement the stream's ref count by 1. stream.ref_dec(); @@ -1042,6 +1070,7 @@ fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Cou actions.send.schedule_implicit_reset( stream, Reason::CANCEL, + counts, &mut actions.task); actions.recv.enqueue_reset_expiration(stream, counts); } @@ -1063,6 +1092,7 @@ impl Actions { &mut self, buffer: &mut Buffer>, stream: &mut store::Ptr, + counts: &mut Counts, res: Result<(), RecvError>, ) -> Result<(), RecvError> { if let Err(RecvError::Stream { @@ -1070,7 +1100,7 @@ impl Actions { }) = res { // Reset the stream. - self.send.send_reset(reason, buffer, stream, &mut self.task); + self.send.send_reset(reason, buffer, stream, counts, &mut self.task); Ok(()) } else { res @@ -1092,4 +1122,13 @@ impl Actions { Ok(()) } } + + fn clear_queues(&mut self, + clear_pending_accept: bool, + store: &mut Store, + counts: &mut Counts) + { + self.recv.clear_queues(clear_pending_accept, store, counts); + self.send.clear_queues(store, counts); + } } diff --git a/tests/h2-support/src/frames.rs b/tests/h2-support/src/frames.rs index 4a50ff0..3f52b38 100644 --- a/tests/h2-support/src/frames.rs +++ b/tests/h2-support/src/frames.rs @@ -258,6 +258,13 @@ impl Mock { frame::Reason::FRAME_SIZE_ERROR, )) } + + pub fn no_error(self) -> Self { + Mock(frame::GoAway::new( + self.0.last_stream_id(), + frame::Reason::NO_ERROR, + )) + } } impl From> for SendFrame { diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 5e1bfc8..3810492 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -839,6 +839,7 @@ fn rst_while_closing() { // Send the RST_STREAM frame which causes the client to panic. .send_frame(frames::reset(1).cancel()) .ping_pong([1; 8]) + .recv_frame(frames::go_away(0).no_error()) .close(); ; @@ -1038,6 +1039,7 @@ fn send_err_with_buffered_data() { .recv_frame( frames::data(1, vec![0; 16_384])) .recv_frame(frames::reset(1).cancel()) + .recv_frame(frames::go_away(0).no_error()) .close() ;