Misc bug fixes related to stream state (#273)

This patch includes two new significant debug assertions:

* Assert stream counts are zero when the connection finalizes.
* Assert all stream state has been released when the connection is 
  dropped.

These two assertions were added in an effort to test the fix provided
by #261. In doing so, many related bugs have been discovered and fixed.
The details related to these bugs can be found in #273.
This commit is contained in:
Carl Lerche
2018-05-09 15:03:21 -07:00
committed by GitHub
parent b4383b6a8c
commit cf62b783e0
11 changed files with 319 additions and 144 deletions

View File

@@ -178,7 +178,6 @@ where
}
}
/// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), proto::Error> {
use codec::RecvError::*;
@@ -341,7 +340,8 @@ where
},
None => {
trace!("codec closed");
self.streams.recv_eof();
self.streams.recv_eof(false)
.ok().expect("mutex poisoned");
return Ok(Async::Ready(()));
},
}
@@ -397,3 +397,14 @@ where
self.ping_pong.ping_shutdown();
}
}
impl<T, P, B> Drop for Connection<T, P, B>
where
P: Peer,
B: IntoBuf,
{
// Release all remaining stream state when the connection is dropped.
fn drop(&mut self) {
// An `Err` from `recv_eof` means the streams mutex is poisoned; in
// that case there is nothing left to clean up, so the error is
// deliberately ignored rather than panicking inside `drop`.
let _ = self.streams.recv_eof(true);
}
}

View File

@@ -60,11 +60,13 @@ impl Counts {
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub fn inc_num_recv_streams(&mut self) {
pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
assert!(self.can_inc_num_recv_streams());
assert!(!stream.is_counted);
// Increment the number of remote initiated streams
self.num_recv_streams += 1;
stream.is_counted = true;
}
/// Returns true if the send stream concurrency can be incremented
@@ -77,11 +79,13 @@ impl Counts {
/// # Panics
///
/// Panics on failure as this should have been validated before hand.
pub fn inc_num_send_streams(&mut self) {
pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
assert!(self.can_inc_num_send_streams());
assert!(!stream.is_counted);
// Increment the number of locally initiated streams
self.num_send_streams += 1;
stream.is_counted = true;
}
/// Returns true if the number of pending reset streams can be incremented.
@@ -110,23 +114,36 @@ impl Counts {
///
/// If the stream state transitions to closed, this function will perform
/// all necessary cleanup.
///
/// TODO: Is this function still needed?
pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
where
F: FnOnce(&mut Self, &mut store::Ptr) -> U,
{
let is_counted = stream.is_counted();
// TODO: Does this need to be computed before performing the action?
let is_pending_reset = stream.is_pending_reset_expiration();
// Run the action
let ret = f(self, &mut stream);
self.transition_after(stream, is_counted, is_pending_reset);
self.transition_after(stream, is_pending_reset);
ret
}
// TODO: move this to macro?
pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool, is_reset_counted: bool) {
pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
trace!("transition_after; stream={:?}; state={:?}; is_closed={:?}; \
pending_send_empty={:?}; buffered_send_data={}; \
num_recv={}; num_send={}",
stream.id,
stream.state,
stream.is_closed(),
stream.pending_send.is_empty(),
stream.buffered_send_data,
self.num_recv_streams,
self.num_send_streams);
if stream.is_closed() {
if !stream.is_pending_reset_expiration() {
stream.unlink();
@@ -136,9 +153,10 @@ impl Counts {
}
}
if is_counted {
if stream.is_counted {
trace!("dec_num_streams; stream={:?}", stream.id);
// Decrement the number of active streams.
self.dec_num_streams(stream.id);
self.dec_num_streams(&mut stream);
}
}
@@ -148,13 +166,17 @@ impl Counts {
}
}
fn dec_num_streams(&mut self, id: StreamId) {
if self.peer.is_local_init(id) {
fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
assert!(stream.is_counted);
if self.peer.is_local_init(stream.id) {
assert!(self.num_send_streams > 0);
self.num_send_streams -= 1;
stream.is_counted = false;
} else {
assert!(self.num_recv_streams > 0);
self.num_recv_streams -= 1;
stream.is_counted = false;
}
}
@@ -163,3 +185,13 @@ impl Counts {
self.num_reset_streams -= 1;
}
}
impl Drop for Counts {
// Debug-build assertion that all stream counts have reached zero by the
// time the connection's counts are finalized (one of the two new
// assertions this commit introduces).
fn drop(&mut self) {
use std::thread;
// Skip the assertion while unwinding from a panic: asserting during
// unwind would abort the process and mask the original failure.
if !thread::panicking() {
debug_assert!(!self.has_streams());
}
}
}

View File

@@ -134,6 +134,7 @@ impl Prioritize {
frame: frame::Data<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), UserError>
where
@@ -176,7 +177,7 @@ impl Prioritize {
if frame.is_end_stream() {
stream.state.send_close();
self.reserve_capacity(0, stream);
self.reserve_capacity(0, stream, counts);
}
trace!(
@@ -210,7 +211,11 @@ impl Prioritize {
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) {
pub fn reserve_capacity(
&mut self,
capacity: WindowSize,
stream: &mut store::Ptr,
counts: &mut Counts) {
trace!(
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
stream.id,
@@ -239,7 +244,7 @@ impl Prioritize {
stream.send_flow.claim_capacity(diff);
self.assign_connection_capacity(diff, stream);
self.assign_connection_capacity(diff, stream, counts);
}
} else {
// Update the target requested capacity
@@ -284,36 +289,49 @@ impl Prioritize {
&mut self,
inc: WindowSize,
store: &mut Store,
counts: &mut Counts,
) -> Result<(), Reason> {
// Update the connection's window
self.flow.inc_window(inc)?;
self.assign_connection_capacity(inc, store);
self.assign_connection_capacity(inc, store, counts);
Ok(())
}
/// Reclaim all capacity assigned to the stream and re-assign it to the
/// connection
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr) {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
stream.send_flow.claim_capacity(available);
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream);
self.assign_connection_capacity(available, stream, counts);
}
/// Reclaim just reserved capacity, not buffered capacity, and re-assign
/// it to the connection
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr) {
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
// only reclaim requested capacity that isn't already buffered
if stream.requested_send_capacity > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data;
stream.send_flow.claim_capacity(reserved);
self.assign_connection_capacity(reserved, stream);
self.assign_connection_capacity(reserved, stream, counts);
}
}
pub fn assign_connection_capacity<R>(&mut self, inc: WindowSize, store: &mut R)
/// Drains the `pending_capacity` queue, routing each stream through
/// `Counts::transition` so that close-time cleanup (unlinking, count
/// decrements) runs for streams that are now closed.
pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_capacity.pop(store) {
counts.transition(stream, |_, stream| {
trace!("clear_pending_capacity; stream={:?}", stream.id);
})
}
}
pub fn assign_connection_capacity<R>(
&mut self,
inc: WindowSize,
store: &mut R,
counts: &mut Counts)
where
R: Resolve,
{
@@ -323,15 +341,17 @@ impl Prioritize {
// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
let mut stream = match self.pending_capacity.pop(store) {
let stream = match self.pending_capacity.pop(store) {
Some(stream) => stream,
None => return,
};
// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
counts.transition(stream, |_, mut stream| {
// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
})
}
}
@@ -595,6 +615,13 @@ impl Prioritize {
}
}
/// Drains the `pending_send` queue, running `transition_after` on each
/// stream so closed streams are unlinked and removed from the counts.
pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_send.pop(store) {
// Captured before `transition_after` consumes the `Ptr`.
let is_pending_reset = stream.is_pending_reset_expiration();
counts.transition_after(stream, is_pending_reset);
}
}
fn pop_frame<B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
@@ -613,21 +640,22 @@ impl Prioritize {
trace!("pop_frame; stream={:?}; stream.state={:?}",
stream.id, stream.state);
// If the stream receives a RESET from the peer, it may have
// had data buffered to be sent, but all the frames are cleared
// in clear_queue(). Instead of doing O(N) traversal through queue
// to remove, lets just ignore peer_reset streams here.
if stream.state.is_peer_reset() {
continue;
}
// It's possible that this stream, besides having data to send,
// is also queued to send a reset, and thus is already in the queue
// to wait for "some time" after a reset.
//
// To be safe, we just always ask the stream.
let is_counted = stream.is_counted();
let is_pending_reset = stream.is_pending_reset_expiration();
// If the stream receives a RESET from the peer, it may have
// had data buffered to be sent, but all the frames are cleared
// in clear_queue(). Instead of doing O(N) traversal through queue
// to remove, lets just ignore peer_reset streams here.
if stream.state.is_peer_reset() {
counts.transition_after(stream, is_pending_reset);
continue;
}
trace!(" --> stream={:?}; is_pending_reset={:?};",
stream.id, is_pending_reset);
@@ -754,7 +782,7 @@ impl Prioritize {
self.pending_send.push(&mut stream);
}
counts.transition_after(stream, is_counted, is_pending_reset);
counts.transition_after(stream, is_pending_reset);
return Some(frame);
},
@@ -770,7 +798,7 @@ impl Prioritize {
if let Some(mut stream) = self.pending_open.pop(store) {
trace!("schedule_pending_open; stream={:?}", stream.id);
counts.inc_num_send_streams();
counts.inc_num_send_streams(&mut stream);
self.pending_send.push(&mut stream);
} else {
return;

View File

@@ -162,7 +162,7 @@ impl Recv {
}
// Increment the number of concurrent streams
counts.inc_num_recv_streams();
counts.inc_num_recv_streams(stream);
}
if !stream.content_length.is_head() {
@@ -680,12 +680,7 @@ impl Recv {
// if max allow is 0, this won't be able to evict,
// and then we'll just bail after
if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
// It's possible that this stream is still sitting in a send queue,
// such as if some data is to be sent and then a CANCEL. In this case,
// it could still be "counted", so we just make sure to always ask the
// stream instead of assuming.
let is_counted = evicted.is_counted();
counts.transition_after(evicted, is_counted, true);
counts.transition_after(evicted, true);
}
}
@@ -728,14 +723,48 @@ impl Recv {
let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
now - reset_at > reset_duration
}) {
let is_counted = stream.is_counted();
counts.transition_after(stream, is_counted, true);
counts.transition_after(stream, true);
}
}
/// Clears the receive-side stream queues.
///
/// `clear_pending_accept` selects whether streams waiting to be accepted
/// are also cleared; `Streams::recv_eof` threads this flag through from
/// its caller (`Connection` passes `false` on codec close and `true` on
/// drop — see the Connection hunks above).
pub fn clear_queues(&mut self,
clear_pending_accept: bool,
store: &mut Store,
counts: &mut Counts)
{
self.clear_stream_window_update_queue(store, counts);
self.clear_all_reset_streams(store, counts);
if clear_pending_accept {
self.clear_all_pending_accept(store, counts);
}
}
/// Drains the pending stream window-update queue, routing each stream
/// through `Counts::transition` so close-time cleanup runs.
fn clear_stream_window_update_queue(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_window_updates.pop(store) {
counts.transition(stream, |_, stream| {
trace!("clear_stream_window_update_queue; stream={:?}", stream.id);
})
}
}
/// Called on EOF.
///
/// Drains the reset-expiration queue. The `true` argument is
/// `is_reset_counted` — presumably so `transition_after` releases the
/// pending-reset count for each stream; confirm against `transition_after`.
fn clear_all_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_reset_expired.pop(store) {
counts.transition_after(stream, true);
}
}
/// Drains streams waiting to be accepted by the user. `false` because
/// these streams are not counted in the reset-expiration queue.
fn clear_all_pending_accept(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_accept.pop(store) {
counts.transition_after(stream, false);
}
}
pub fn poll_complete<T, B>(
&mut self,
store: &mut Store,
counts: &mut Counts,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
@@ -746,7 +775,7 @@ impl Recv {
try_ready!(self.send_connection_window_update(dst));
// Send any pending stream level window updates
try_ready!(self.send_stream_window_updates(store, dst));
try_ready!(self.send_stream_window_updates(store, counts, dst));
Ok(().into())
}
@@ -781,11 +810,11 @@ impl Recv {
Ok(().into())
}
/// Send stream level window update
pub fn send_stream_window_updates<T, B>(
&mut self,
store: &mut Store,
counts: &mut Counts,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
@@ -797,38 +826,43 @@ impl Recv {
try_ready!(dst.poll_ready());
// Get the next stream
let mut stream = match self.pending_window_updates.pop(store) {
let stream = match self.pending_window_updates.pop(store) {
Some(stream) => stream,
None => return Ok(().into()),
};
if !stream.state.is_recv_streaming() {
// No need to send window updates on the stream if the stream is
// no longer receiving data.
//
// TODO: is this correct? We could possibly send a window
// update on a ReservedRemote stream if we already know
// we want to stream the data faster...
continue;
}
counts.transition(stream, |_, stream| {
trace!("pending_window_updates -- pop; stream={:?}", stream.id);
debug_assert!(!stream.is_pending_window_update);
// TODO: de-dup
if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
// Create the WINDOW_UPDATE frame
let frame = frame::WindowUpdate::new(stream.id, incr);
if !stream.state.is_recv_streaming() {
// No need to send window updates on the stream if the stream is
// no longer receiving data.
//
// TODO: is this correct? We could possibly send a window
// update on a ReservedRemote stream if we already know
// we want to stream the data faster...
return;
}
// Buffer it
dst.buffer(frame.into())
.ok()
.expect("invalid WINDOW_UPDATE frame");
// TODO: de-dup
if let Some(incr) = stream.recv_flow.unclaimed_capacity() {
// Create the WINDOW_UPDATE frame
let frame = frame::WindowUpdate::new(stream.id, incr);
// Update flow control
stream
.recv_flow
.inc_window(incr)
.ok()
.expect("unexpected flow control state");
}
// Buffer it
dst.buffer(frame.into())
.ok()
.expect("invalid WINDOW_UPDATE frame");
// Update flow control
stream
.recv_flow
.inc_window(incr)
.ok()
.expect("unexpected flow control state");
}
})
}
}

View File

@@ -80,7 +80,7 @@ impl Send {
if counts.peer().is_local_init(frame.stream_id()) {
if counts.can_inc_num_send_streams() {
counts.inc_num_send_streams();
counts.inc_num_send_streams(stream);
} else {
self.prioritize.queue_open(stream);
}
@@ -104,6 +104,7 @@ impl Send {
reason: Reason,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) {
let is_reset = stream.state.is_reset();
@@ -146,7 +147,7 @@ impl Send {
return;
}
self.recv_err(buffer, stream);
self.recv_err(buffer, stream, counts);
let frame = frame::Reset::new(stream.id, reason);
@@ -158,6 +159,7 @@ impl Send {
&mut self,
stream: &mut store::Ptr,
reason: Reason,
counts: &mut Counts,
task: &mut Option<Task>,
) {
if stream.state.is_closed() {
@@ -167,7 +169,7 @@ impl Send {
stream.state.set_scheduled_reset(reason);
self.prioritize.reclaim_reserved_capacity(stream);
self.prioritize.reclaim_reserved_capacity(stream, counts);
self.prioritize.schedule_send(stream, task);
}
@@ -176,11 +178,12 @@ impl Send {
frame: frame::Data<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), UserError>
where B: Buf,
{
self.prioritize.send_data(frame, buffer, stream, task)
self.prioritize.send_data(frame, buffer, stream, counts, task)
}
pub fn send_trailers<B>(
@@ -188,6 +191,7 @@ impl Send {
frame: frame::Headers,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), UserError> {
// TODO: Should this logic be moved into state.rs?
@@ -201,7 +205,7 @@ impl Send {
self.prioritize.queue_frame(frame.into(), buffer, stream, task);
// Release any excess capacity
self.prioritize.reserve_capacity(0, stream);
self.prioritize.reserve_capacity(0, stream, counts);
Ok(())
}
@@ -220,8 +224,13 @@ impl Send {
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) {
self.prioritize.reserve_capacity(capacity, stream)
pub fn reserve_capacity(
&mut self,
capacity: WindowSize,
stream: &mut store::Ptr,
counts: &mut Counts)
{
self.prioritize.reserve_capacity(capacity, stream, counts)
}
pub fn poll_capacity(
@@ -258,9 +267,10 @@ impl Send {
&mut self,
frame: frame::WindowUpdate,
store: &mut Store,
counts: &mut Counts,
) -> Result<(), Reason> {
self.prioritize
.recv_connection_window_update(frame.size_increment(), store)
.recv_connection_window_update(frame.size_increment(), store, counts)
}
pub fn recv_stream_window_update<B>(
@@ -268,6 +278,7 @@ impl Send {
sz: WindowSize,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), Reason> {
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
@@ -275,7 +286,7 @@ impl Send {
self.send_reset(
Reason::FLOW_CONTROL_ERROR.into(),
buffer, stream, task);
buffer, stream, counts, task);
return Err(e);
}
@@ -295,11 +306,12 @@ impl Send {
pub fn recv_err<B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr
stream: &mut store::Ptr,
counts: &mut Counts,
) {
// Clear all pending outbound frames
self.prioritize.clear_queue(buffer, stream);
self.prioritize.reclaim_all_capacity(stream);
self.prioritize.reclaim_all_capacity(stream, counts);
}
pub fn apply_remote_settings<B>(
@@ -307,6 +319,7 @@ impl Send {
settings: &frame::Settings,
buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), RecvError> {
// Applies an update to the remote endpoint's initial window size.
@@ -361,12 +374,12 @@ impl Send {
})?;
self.prioritize
.assign_connection_capacity(total_reclaimed, store);
.assign_connection_capacity(total_reclaimed, store, counts);
} else if val > old_val {
let inc = val - old_val;
store.for_each(|mut stream| {
self.recv_stream_window_update(inc, buffer, &mut stream, task)
self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
.map_err(RecvError::Connection)
})?;
}
@@ -375,6 +388,11 @@ impl Send {
Ok(())
}
/// Clears the send-side queues (pending capacity and pending send) so no
/// stream state is retained once the connection is shutting down.
pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
self.prioritize.clear_pending_capacity(store, counts);
self.prioritize.clear_pending_send(store, counts);
}
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
if let Ok(next) = self.next_stream_id {
if id >= next {

View File

@@ -341,18 +341,6 @@ impl State {
}
}
/// Returns true if a stream is open or half-closed.
pub fn is_at_least_half_open(&self) -> bool {
match self.inner {
Open {
..
} => true,
HalfClosedLocal(..) => true,
HalfClosedRemote(..) => true,
_ => false,
}
}
pub fn is_send_streaming(&self) -> bool {
match self.inner {
Open {

View File

@@ -4,6 +4,7 @@ use slab;
use indexmap::{self, IndexMap};
use std::fmt;
use std::marker::PhantomData;
use std::ops;
@@ -202,6 +203,16 @@ impl Store {
}
}
impl Drop for Store {
// Debug-build assertion that every stream slot was released before the
// store is dropped — the second of the two new assertions this commit
// introduces ("all stream state has been released when the connection
// is dropped").
fn drop(&mut self) {
use std::thread;
// Avoid asserting while unwinding so the original panic isn't masked.
if !thread::panicking() {
debug_assert!(self.slab.is_empty());
}
}
}
// ===== impl Queue =====
impl<N> Queue<N>
@@ -356,6 +367,12 @@ impl<'a> ops::DerefMut for Ptr<'a> {
}
}
impl<'a> fmt::Debug for Ptr<'a> {
// Delegate to the pointed-to stream's `Debug` impl via `Deref`.
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
(**self).fmt(fmt)
}
}
// ===== impl OccupiedEntry =====
impl<'a> OccupiedEntry<'a> {

View File

@@ -22,6 +22,10 @@ pub(super) struct Stream {
/// Current state of the stream
pub state: State,
/// Set to `true` when the stream is counted against the connection's max
/// concurrent streams.
pub is_counted: bool,
/// Number of outstanding handles pointing to this stream
pub ref_count: usize,
@@ -151,6 +155,7 @@ impl Stream {
id,
state: State::default(),
ref_count: 0,
is_counted: false,
// ===== Fields related to sending =====
next_pending_send: None,
@@ -194,12 +199,6 @@ impl Stream {
self.ref_count -= 1;
}
/// Returns true if a stream with the current state counts against the
/// concurrency limit.
pub fn is_counted(&self) -> bool {
!self.is_pending_open && self.state.is_at_least_half_open()
}
/// Returns true if stream is currently being held for some time because of
/// a local reset.
pub fn is_pending_reset_expiration(&self) -> bool {

View File

@@ -180,6 +180,7 @@ where
actions.send.schedule_implicit_reset(
stream,
Reason::REFUSED_STREAM,
counts,
&mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts);
Ok(())
@@ -201,7 +202,7 @@ where
actions.recv.recv_trailers(frame, stream)
};
actions.reset_on_recv_stream_err(send_buffer, stream, res)
actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
})
}
@@ -228,7 +229,7 @@ where
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| {
me.counts.transition(stream, |counts, stream| {
let sz = frame.payload().len();
let res = actions.recv.recv_data(frame, stream);
@@ -239,7 +240,7 @@ where
actions.recv.release_connection_capacity(sz as WindowSize, &mut None);
}
actions.reset_on_recv_stream_err(send_buffer, stream, res)
actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
})
}
@@ -297,9 +298,9 @@ where
me.store
.for_each(|stream| {
counts.transition(stream, |_, stream| {
counts.transition(stream, |counts, stream| {
actions.recv.recv_err(err, &mut *stream);
actions.send.recv_err(send_buffer, stream);
actions.send.recv_err(send_buffer, stream, counts);
Ok::<_, ()>(())
})
})
@@ -337,9 +338,9 @@ where
me.store
.for_each(|stream| if stream.id > last_stream_id {
counts.transition(stream, |_, stream| {
counts.transition(stream, |counts, stream| {
actions.recv.recv_err(&err, &mut *stream);
actions.send.recv_err(send_buffer, stream);
actions.send.recv_err(send_buffer, stream, counts);
Ok::<_, ()>(())
})
} else {
@@ -352,27 +353,6 @@ where
Ok(())
}
pub fn recv_eof(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let actions = &mut me.actions;
let counts = &mut me.counts;
if actions.conn_error.is_none() {
actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
}
me.store
.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_eof(stream);
Ok::<_, ()>(())
})
})
.expect("recv_eof");
}
pub fn last_processed_id(&self) -> StreamId {
self.inner.lock().unwrap().actions.recv.last_processed_id()
}
@@ -388,7 +368,7 @@ where
if id.is_zero() {
me.actions
.send
.recv_connection_window_update(frame, &mut me.store)
.recv_connection_window_update(frame, &mut me.store, &mut me.counts)
.map_err(RecvError::Connection)?;
} else {
// The remote may send window updates for streams that the local now
@@ -401,6 +381,7 @@ where
frame.size_increment(),
send_buffer,
&mut stream,
&mut me.counts,
&mut me.actions.task,
);
} else {
@@ -443,7 +424,11 @@ where
if let Some(ref mut new_stream) = me.store.find_mut(&promised_id) {
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
me.actions.reset_on_recv_stream_err(&mut *send_buffer, new_stream, Err(err))
me.actions.reset_on_recv_stream_err(
&mut *send_buffer,
new_stream,
&mut me.counts,
Err(err))
} else {
// If there was a stream error, the stream should have been stored
// so we can track sending a reset.
@@ -519,7 +504,7 @@ where
//
// TODO: It would probably be better to interleave updates w/ data
// frames.
try_ready!(me.actions.recv.poll_complete(&mut me.store, dst));
try_ready!(me.actions.recv.poll_complete(&mut me.store, &mut me.counts, dst));
// Send any other pending frames
try_ready!(me.actions.send.poll_complete(
@@ -545,7 +530,7 @@ where
me.counts.apply_remote_settings(frame);
me.actions.send.apply_remote_settings(
frame, send_buffer, &mut me.store, &mut me.actions.task)
frame, send_buffer, &mut me.store, &mut me.counts, &mut me.actions.task)
}
pub fn send_request(
@@ -665,7 +650,7 @@ where
me.counts.transition(stream, |counts, stream| {
actions.send.send_reset(
reason, send_buffer, stream, &mut actions.task);
reason, send_buffer, stream, counts, &mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts)
})
}
@@ -705,6 +690,41 @@ impl<B, P> Streams<B, P>
where
P: Peer,
{
/// Handles the transport stream reaching EOF, releasing all stream state.
///
/// This function is safe to call multiple times.
///
/// A `Result` is returned to avoid panicking if the mutex is poisoned.
pub fn recv_eof(&mut self, clear_pending_accept: bool) -> Result<(), ()> {
// `map_err` rather than `unwrap` so a poisoned mutex surfaces as
// `Err(())` instead of a panic — this is reachable from `Drop`.
let mut me = self.inner.lock().map_err(|_| ())?;
let me = &mut *me;
let actions = &mut me.actions;
let counts = &mut me.counts;
// NOTE(review): this second lock still unwraps on poison, unlike the
// one above — confirm whether that is intentional.
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
// Record a broken-pipe connection error unless one is already set.
if actions.conn_error.is_none() {
actions.conn_error = Some(io::Error::from(io::ErrorKind::BrokenPipe).into());
}
trace!("Streams::recv_eof");
// Notify every stored stream of the EOF and reset its send state.
me.store
.for_each(|stream| {
counts.transition(stream, |counts, stream| {
actions.recv.recv_eof(stream);
// This handles resetting send state associated with the
// stream
actions.send.recv_err(send_buffer, stream, counts);
Ok::<_, ()>(())
})
})
.expect("recv_eof");
// Drain any remaining recv/send queues so no stream state lingers
// (checked by the new `Drop` assertions on `Counts` and `Store`).
actions.clear_queues(clear_pending_accept, &mut me.store, counts);
Ok(())
}
pub fn num_active_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
me.store.num_active_streams()
@@ -759,13 +779,18 @@ impl<B> StreamRef<B> {
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| {
me.counts.transition(stream, |counts, stream| {
// Create the data frame
let mut frame = frame::Data::new(stream.id, data);
frame.set_end_stream(end_stream);
// Send the data frame
actions.send.send_data(frame, send_buffer, stream, &mut actions.task)
actions.send.send_data(
frame,
send_buffer,
stream,
counts,
&mut actions.task)
})
}
@@ -778,13 +803,13 @@ impl<B> StreamRef<B> {
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| {
me.counts.transition(stream, |counts, stream| {
// Create the trailers frame
let frame = frame::Headers::trailers(stream.id, trailers);
// Send the trailers frame
actions.send.send_trailers(
frame, send_buffer, stream, &mut actions.task)
frame, send_buffer, stream, counts, &mut actions.task)
})
}
@@ -797,9 +822,9 @@ impl<B> StreamRef<B> {
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| {
me.counts.transition(stream, |counts, stream| {
actions.send.send_reset(
reason, send_buffer, stream, &mut actions.task)
reason, send_buffer, stream, counts, &mut actions.task)
})
}
@@ -852,7 +877,7 @@ impl<B> StreamRef<B> {
let mut stream = me.store.resolve(self.opaque.key);
me.actions.send.reserve_capacity(capacity, &mut stream)
me.actions.send.reserve_capacity(capacity, &mut stream, &mut me.counts)
}
/// Returns the stream's current send capacity.
@@ -1008,6 +1033,9 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
let me = &mut *me;
let mut stream = me.store.resolve(key);
trace!("drop_stream_ref; stream={:?}", stream);
// decrement the stream's ref count by 1.
stream.ref_dec();
@@ -1042,6 +1070,7 @@ fn maybe_cancel(stream: &mut store::Ptr, actions: &mut Actions, counts: &mut Cou
actions.send.schedule_implicit_reset(
stream,
Reason::CANCEL,
counts,
&mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts);
}
@@ -1063,6 +1092,7 @@ impl Actions {
&mut self,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
res: Result<(), RecvError>,
) -> Result<(), RecvError> {
if let Err(RecvError::Stream {
@@ -1070,7 +1100,7 @@ impl Actions {
}) = res
{
// Reset the stream.
self.send.send_reset(reason, buffer, stream, &mut self.task);
self.send.send_reset(reason, buffer, stream, counts, &mut self.task);
Ok(())
} else {
res
@@ -1092,4 +1122,13 @@ impl Actions {
Ok(())
}
}
/// Clears both receive-side and send-side stream queues.
fn clear_queues(&mut self,
clear_pending_accept: bool,
store: &mut Store,
counts: &mut Counts)
{
self.recv.clear_queues(clear_pending_accept, store, counts);
self.send.clear_queues(store, counts);
}
}

View File

@@ -258,6 +258,13 @@ impl Mock<frame::GoAway> {
frame::Reason::FRAME_SIZE_ERROR,
))
}
/// Returns a copy of this mock GOAWAY frame with the reason set to
/// `NO_ERROR`, preserving the last stream id.
pub fn no_error(self) -> Self {
Mock(frame::GoAway::new(
self.0.last_stream_id(),
frame::Reason::NO_ERROR,
))
}
}
impl From<Mock<frame::GoAway>> for SendFrame {

View File

@@ -839,6 +839,7 @@ fn rst_while_closing() {
// Send the RST_STREAM frame which causes the client to panic.
.send_frame(frames::reset(1).cancel())
.ping_pong([1; 8])
.recv_frame(frames::go_away(0).no_error())
.close();
;
@@ -1038,6 +1039,7 @@ fn send_err_with_buffered_data() {
.recv_frame(
frames::data(1, vec![0; 16_384]))
.recv_frame(frames::reset(1).cancel())
.recv_frame(frames::go_away(0).no_error())
.close()
;