StreamRef sends RST_STREAM on drop (#109)

This PR modifies the `Drop` implementation for `StreamRef` to reset the underlying stream if it is the last reference to that stream. Since both `Stream` and `Body` are internally just a `StreamRef`, this means they will both reset the stream on drop; thus, this closes #100.

The assertion that the store no longer contains the dropped stream ID at the end of the `Drop` method  had to be removed, as the stream has to be reset from inside of a `transition` block (which now manages releasing that ID for us), and the `transition` closure moves the value of `stream`, making the assertion no longer possible.

Modifications to some of the tests in `flow_control.rs` were also necessary, in order to prevent `StreamRef`s from being dropped too early.
This commit is contained in:
Eliza Weisman
2017-10-05 18:05:18 -05:00
committed by GitHub
parent ecd2764f4b
commit 2e3dcf602c
9 changed files with 177 additions and 57 deletions

View File

@@ -27,7 +27,6 @@ pub fn main() {
let server = listener.incoming().for_each(move |(socket, _)| {
// let socket = io_dump::Dump::to_stdout(socket);
let connection = Server::handshake(socket)
.and_then(|conn| {
println!("H2 connection bound");
@@ -35,6 +34,7 @@ pub fn main() {
conn.for_each(|(request, mut stream)| {
println!("GOT request: {:?}", request);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
if let Err(e) = stream.send_response(response, false) {
@@ -47,12 +47,11 @@ pub fn main() {
}
Ok(())
}).and_then(|_| {
println!(
"~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~"
);
Ok(())
})
})
})
.and_then(|_| {
println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");
Ok(())
})
.then(|res| {
if let Err(e) = res {

View File

@@ -46,7 +46,6 @@ pub(crate) struct Prioritized<B> {
impl<B, P> Prioritize<B, P>
where
B: Buf,
P: Peer,
{
pub fn new(config: &Config) -> Prioritize<B, P> {
@@ -101,7 +100,10 @@ where
frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
) -> Result<(), UserError>
where
B: Buf,
{
let sz = frame.payload().remaining();
if sz > MAX_WINDOW_SIZE as usize {
@@ -369,6 +371,7 @@ where
) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
// Ensure codec is ready
try_ready!(dst.poll_ready());
@@ -422,7 +425,10 @@ where
&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>,
) -> bool {
) -> bool
where
B: Buf,
{
trace!("try reclaim frame");
// First check if there are any data chunks to take back
@@ -485,7 +491,10 @@ where
store: &mut Store<B, P>,
max_len: usize,
counts: &mut Counts<P>,
) -> Option<Frame<Prioritized<B>>> {
) -> Option<Frame<Prioritized<B>>>
where
B: Buf,
{
trace!("pop_frame");
loop {

View File

@@ -59,7 +59,6 @@ struct Indices {
impl<B, P> Recv<B, P>
where
B: Buf,
P: Peer,
{
pub fn new(config: &Config) -> Self {
@@ -468,6 +467,7 @@ where
) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
if let Some(stream_id) = self.refused {
try_ready!(dst.poll_ready());
@@ -493,6 +493,7 @@ where
) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
// Send any pending connection level window updates
try_ready!(self.send_connection_window_update(dst));
@@ -510,6 +511,7 @@ where
) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
if let Some(incr) = self.flow.unclaimed_capacity() {
let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
@@ -541,6 +543,7 @@ where
) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
loop {
// Ensure the codec has capacity

View File

@@ -26,7 +26,6 @@ where
impl<B, P> Send<B, P>
where
B: Buf,
P: Peer,
{
/// Create a new `Send`
@@ -81,27 +80,72 @@ where
Ok(())
}
/// Send an RST_STREAM frame
///
/// # Arguments
/// + `reason`: the error code for the RST_STREAM frame
/// + `clear_queue`: if true, all pending outbound frames will be cleared,
/// if false, the RST_STREAM frame will be appended to the end of the
/// send queue.
pub fn send_reset(
&mut self,
reason: Reason,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
clear_queue: bool,
) {
if stream.state.is_reset() {
let is_reset = stream.state.is_reset();
let is_closed = stream.state.is_closed();
let is_empty = stream.pending_send.is_empty();
trace!(
"send_reset(..., reason={:?}, stream={:?}, ..., \
clear_queue={:?});\n\
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
state={:?} \
",
stream.id,
reason,
clear_queue,
is_reset,
is_closed,
is_empty,
stream.state
);
if is_reset {
// Don't double reset
trace!(
" -> not sending RST_STREAM ({:?} is already reset)",
stream.id
);
return;
}
// If closed AND the send queue is flushed, then the stream cannot be
// reset either
if stream.state.is_closed() && stream.pending_send.is_empty() {
// reset explicitly, either. Implicit resets can still be queued.
if is_closed && (is_empty || !clear_queue) {
trace!(
" -> not sending explicit RST_STREAM ({:?} was closed \
and send queue was flushed)",
stream.id
);
return;
}
// Transition the state
stream.state.set_reset(reason);
self.recv_err(stream);
// TODO: this could be a call to `recv_err`, but that will always
// clear the send queue. could we pass whether or not to clear
// the send queue to that method?
if clear_queue {
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);
}
// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
let frame = frame::Reset::new(stream.id, reason);
@@ -116,7 +160,10 @@ where
frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
) -> Result<(), UserError>
where
B: Buf,
{
self.prioritize.send_data(frame, stream, task)
}
@@ -150,6 +197,7 @@ where
) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
{
self.prioritize.poll_complete(store, counts, dst)
}
@@ -205,7 +253,7 @@ where
) -> Result<(), Reason> {
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
debug!("recv_stream_window_update !!; err={:?}", e);
self.send_reset(FlowControlError.into(), stream, task);
self.send_reset(FlowControlError.into(), stream, task, true);
return Err(e);
}

View File

@@ -103,10 +103,6 @@ where
}
}
pub fn contains_id(&self, id: &StreamId) -> bool {
self.ids.contains_key(id)
}
pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<B, P>> {
let key = match self.ids.get(id) {
Some(key) => *key,

View File

@@ -220,8 +220,8 @@ where
// There are no more outstanding references to the stream
self.ref_count == 0 &&
// The stream is not in any queue
!self.is_pending_send && !self.is_pending_send_capacity
&& !self.is_pending_accept && !self.is_pending_window_update
!self.is_pending_send && !self.is_pending_send_capacity &&
!self.is_pending_accept && !self.is_pending_window_update
}
pub fn assign_capacity(&mut self, capacity: WindowSize) {

View File

@@ -7,7 +7,7 @@ use proto::*;
use http::HeaderMap;
use std::io;
use std::{fmt, io};
use std::sync::{Arc, Mutex};
#[derive(Debug)]
@@ -19,7 +19,6 @@ where
}
/// Reference to the stream state
#[derive(Debug)]
pub(crate) struct StreamRef<B, P>
where
P: Peer,
@@ -450,7 +449,9 @@ where
let actions = &mut me.actions;
me.counts.transition(stream, |_, stream| {
actions.send.send_reset(reason, stream, &mut actions.task)
actions
.send
.send_reset(reason, stream, &mut actions.task, true)
})
}
}
@@ -480,7 +481,6 @@ where
impl<B, P> Streams<B, P>
where
B: Buf,
P: Peer,
{
pub fn num_active_streams(&self) -> usize {
@@ -498,7 +498,6 @@ where
// no derive because we don't need B and P to be Clone.
impl<B, P> Clone for Streams<B, P>
where
B: Buf,
P: Peer,
{
fn clone(&self) -> Self {
@@ -512,10 +511,13 @@ where
impl<B, P> StreamRef<B, P>
where
B: Buf,
P: Peer,
{
pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> {
pub fn send_data(&mut self, data: B, end_stream: bool)
-> Result<(), UserError>
where
B: Buf,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -556,7 +558,9 @@ where
let actions = &mut me.actions;
me.counts.transition(stream, |_, stream| {
actions.send.send_reset(reason, stream, &mut actions.task)
actions
.send
.send_reset(reason, stream, &mut actions.task, true)
})
}
@@ -580,7 +584,10 @@ where
})
}
pub fn body_is_empty(&self) -> bool {
pub fn body_is_empty(&self) -> bool
where
B: Buf,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -589,7 +596,10 @@ where
me.actions.recv.body_is_empty(&stream)
}
pub fn poll_data(&mut self) -> Poll<Option<Bytes>, proto::Error> {
pub fn poll_data(&mut self) -> Poll<Option<Bytes>, proto::Error>
where
B: Buf,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -598,7 +608,10 @@ where
me.actions.recv.poll_data(&mut stream)
}
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, proto::Error> {
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, proto::Error>
where
B: Buf,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -609,7 +622,13 @@ where
/// Releases recv capacity back to the peer. This may result in sending
/// WINDOW_UPDATE frames on both the stream and connection.
pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
pub fn release_capacity(
&mut self,
capacity: WindowSize
) -> Result<(), UserError>
where
B: Buf,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -710,11 +729,26 @@ where
}
}
impl<B, P> fmt::Debug for StreamRef<B, P>
where
P: Peer,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let me = self.inner.lock().unwrap();
let stream = &me.store[self.key];
fmt.debug_struct("StreamRef")
.field("stream_id", &stream.id)
.field("ref_count", &stream.ref_count)
.finish()
}
}
impl<B, P> Drop for StreamRef<B, P>
where
P: Peer,
{
fn drop(&mut self) {
trace!("StreamRef::drop({:?})", self);
let mut me = match self.inner.lock() {
Ok(inner) => inner,
Err(_) => if ::std::thread::panicking() {
@@ -727,18 +761,29 @@ where
let me = &mut *me;
let id = {
let mut stream = me.store.resolve(self.key);
stream.ref_dec();
let mut stream = me.store.resolve(self.key);
// decrement the stream's ref count by 1.
stream.ref_dec();
if !stream.is_released() {
return;
}
stream.remove()
};
debug_assert!(!me.store.contains_id(&id));
let actions = &mut me.actions;
// the reset must be sent inside a `transition` block.
// `transition_after` will release the stream if it is
// released.
let recv_closed = stream.state.is_recv_closed();
me.counts.transition(stream, |_, stream|
// if this is the last reference to the stream, reset the stream.
if stream.ref_count == 0 && !recv_closed {
trace!(
" -> last reference to {:?} was dropped, trying to reset",
stream.id,
);
actions.send.send_reset(
Reason::Cancel,
stream,
&mut actions.task,
false
);
});
}
}
@@ -759,7 +804,7 @@ where
}) = res
{
// Reset the stream.
self.send.send_reset(reason, stream, &mut self.task);
self.send.send_reset(reason, stream, &mut self.task, true);
Ok(())
} else {
res

View File

@@ -378,9 +378,12 @@ fn stream_close_by_data_frame_releases_capacity() {
// Send the frame
s2.send_data("hello".into(), true).unwrap();
// Wait for the connection to close
h2.unwrap()
});
// Drive both streams to prevent the handles from being dropped
// (which will send a RST_STREAM) before the connection is closed.
h2.drive(s1)
.and_then(move |(h2, _)| h2.drive(s2))
})
.unwrap();
let srv = srv.assert_client_handshake()
.unwrap()
@@ -446,9 +449,12 @@ fn stream_close_by_trailers_frame_releases_capacity() {
// Send the frame
s2.send_data("hello".into(), true).unwrap();
// Wait for the connection to close
h2.unwrap()
});
// Drive both streams to prevent the handles from being dropped
// (which will send a RST_STREAM) before the connection is closed.
h2.drive(s1)
.and_then(move |(h2, _)| h2.drive(s2))
})
.unwrap();
let srv = srv.assert_client_handshake().unwrap()
// Get the first frame
@@ -527,7 +533,16 @@ fn recv_window_update_on_stream_closed_by_data_frame() {
stream.send_data("hello".into(), true).unwrap();
// Wait for the connection to close
h2.unwrap()
h2.map(|h2| {
// keep `stream` from being dropped in order to prevent
// it from sending an RST_STREAM frame.
std::mem::forget(stream);
// i know this is kind of evil, but it's necessary to
// ensure that the stream is closed by the EOS frame,
// and not by the RST_STREAM.
h2
})
.unwrap()
});
let srv = srv.assert_client_handshake()

View File

@@ -202,7 +202,12 @@ fn send_data_receive_window_update() {
let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];
stream.send_data(payload.into(), true).unwrap();
h2.unwrap()
h2.map(|h2| {
// keep `stream` from being dropped in order to prevent
// it from sending an RST_STREAM frame.
std::mem::forget(stream);
h2
}).unwrap()
});
let mock = mock.assert_client_handshake().unwrap()