diff --git a/examples/server.rs b/examples/server.rs index 9d34f85..d49fada 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -27,7 +27,6 @@ pub fn main() { let server = listener.incoming().for_each(move |(socket, _)| { // let socket = io_dump::Dump::to_stdout(socket); - let connection = Server::handshake(socket) .and_then(|conn| { println!("H2 connection bound"); @@ -35,6 +34,7 @@ pub fn main() { conn.for_each(|(request, mut stream)| { println!("GOT request: {:?}", request); + let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); if let Err(e) = stream.send_response(response, false) { @@ -47,12 +47,11 @@ pub fn main() { } Ok(()) - }).and_then(|_| { - println!( - "~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~" - ); - Ok(()) - }) + }) + }) + .and_then(|_| { + println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~"); + Ok(()) }) .then(|res| { if let Err(e) = res { diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index b02988f..70d8e4a 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -46,7 +46,6 @@ pub(crate) struct Prioritized { impl Prioritize where - B: Buf, P: Peer, { pub fn new(config: &Config) -> Prioritize { @@ -101,7 +100,10 @@ where frame: frame::Data, stream: &mut store::Ptr, task: &mut Option, - ) -> Result<(), UserError> { + ) -> Result<(), UserError> + where + B: Buf, + { let sz = frame.payload().remaining(); if sz > MAX_WINDOW_SIZE as usize { @@ -369,6 +371,7 @@ where ) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { // Ensure codec is ready try_ready!(dst.poll_ready()); @@ -422,7 +425,10 @@ where &mut self, store: &mut Store, dst: &mut Codec>, - ) -> bool { + ) -> bool + where + B: Buf, + { trace!("try reclaim frame"); // First check if there are any data chunks to take back @@ -485,7 +491,10 @@ where store: &mut Store, max_len: usize, counts: &mut Counts

, - ) -> Option>> { + ) -> Option>> + where + B: Buf, + { trace!("pop_frame"); loop { diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 0f560f8..ba983b5 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -59,7 +59,6 @@ struct Indices { impl Recv where - B: Buf, P: Peer, { pub fn new(config: &Config) -> Self { @@ -468,6 +467,7 @@ where ) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { if let Some(stream_id) = self.refused { try_ready!(dst.poll_ready()); @@ -493,6 +493,7 @@ where ) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { // Send any pending connection level window updates try_ready!(self.send_connection_window_update(dst)); @@ -510,6 +511,7 @@ where ) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { if let Some(incr) = self.flow.unclaimed_capacity() { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); @@ -541,6 +543,7 @@ where ) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { loop { // Ensure the codec has capacity diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 3d0bbda..8933c84 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -26,7 +26,6 @@ where impl Send where - B: Buf, P: Peer, { /// Create a new `Send` @@ -81,27 +80,72 @@ where Ok(()) } + /// Send an RST_STREAM frame + /// + /// # Arguments + /// + `reason`: the error code for the RST_STREAM frame + /// + `clear_queue`: if true, all pending outbound frames will be cleared, + /// if false, the RST_STREAM frame will be appended to the end of the + /// send queue. pub fn send_reset( &mut self, reason: Reason, stream: &mut store::Ptr, task: &mut Option, + clear_queue: bool, ) { - if stream.state.is_reset() { + let is_reset = stream.state.is_reset(); + let is_closed = stream.state.is_closed(); + let is_empty = stream.pending_send.is_empty(); + trace!( + "send_reset(..., reason={:?}, stream={:?}, ..., \ + clear_queue={:?});\n\ + is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ + state={:?} \ + ", + stream.id, + reason, + clear_queue, + is_reset, + is_closed, + is_empty, + stream.state + ); + if is_reset { // Don't double reset + trace!( + " -> not sending RST_STREAM ({:?} is already reset)", + stream.id + ); return; } // If closed AND the send queue is flushed, then the stream cannot be - // reset either - if stream.state.is_closed() && stream.pending_send.is_empty() { + // reset explicitly, either. Implicit resets can still be queued. + if is_closed && (is_empty || !clear_queue) { + trace!( + " -> not sending explicit RST_STREAM ({:?} was closed \ + and send queue was flushed)", + stream.id + ); return; } // Transition the state stream.state.set_reset(reason); - self.recv_err(stream); + // TODO: this could be a call to `recv_err`, but that will always + // clear the send queue. could we pass whether or not to clear + // the send queue to that method? + if clear_queue { + // Clear all pending outbound frames + self.prioritize.clear_queue(stream); + } + + // Reclaim all capacity assigned to the stream and re-assign it to the + // connection + let available = stream.send_flow.available(); + stream.send_flow.claim_capacity(available); let frame = frame::Reset::new(stream.id, reason); @@ -116,7 +160,10 @@ where frame: frame::Data, stream: &mut store::Ptr, task: &mut Option, - ) -> Result<(), UserError> { + ) -> Result<(), UserError> + where + B: Buf, + { self.prioritize.send_data(frame, stream, task) } @@ -150,6 +197,7 @@ where ) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { self.prioritize.poll_complete(store, counts, dst) } @@ -205,7 +253,7 @@ where ) -> Result<(), Reason> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { debug!("recv_stream_window_update !!; err={:?}", e); - self.send_reset(FlowControlError.into(), stream, task); + self.send_reset(FlowControlError.into(), stream, task, true); return Err(e); } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index c38f5e2..1a87f0f 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -103,10 +103,6 @@ where } } - pub fn contains_id(&self, id: &StreamId) -> bool { - self.ids.contains_key(id) - } - pub fn find_mut(&mut self, id: &StreamId) -> Option> { let key = match self.ids.get(id) { Some(key) => *key, diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 01c3599..b2988ef 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -220,8 +220,8 @@ where // There are no more outstanding references to the stream self.ref_count == 0 && // The stream is not in any queue - !self.is_pending_send && !self.is_pending_send_capacity - && !self.is_pending_accept && !self.is_pending_window_update + !self.is_pending_send && !self.is_pending_send_capacity && + !self.is_pending_accept && !self.is_pending_window_update } pub fn assign_capacity(&mut self, capacity: WindowSize) { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 2cc56b8..970e038 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -7,7 +7,7 @@ use proto::*; use http::HeaderMap; -use std::io; +use std::{fmt, io}; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -19,7 +19,6 @@ where } /// Reference to the stream state -#[derive(Debug)] pub(crate) struct StreamRef where P: Peer, @@ -450,7 +449,9 @@ where let actions = &mut me.actions; me.counts.transition(stream, |_, stream| { - actions.send.send_reset(reason, stream, &mut actions.task) + actions + .send + .send_reset(reason, stream, &mut actions.task, true) }) } } @@ -480,7 +481,6 @@ where impl Streams where - B: Buf, P: Peer, { pub fn num_active_streams(&self) -> usize { @@ -498,7 +498,6 @@ where // no derive because we don't need B and P to be Clone. impl Clone for Streams where - B: Buf, P: Peer, { fn clone(&self) -> Self { @@ -512,10 +511,13 @@ where impl StreamRef where - B: Buf, P: Peer, { - pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> { + pub fn send_data(&mut self, data: B, end_stream: bool) + -> Result<(), UserError> + where + B: Buf, + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -556,7 +558,9 @@ where let actions = &mut me.actions; me.counts.transition(stream, |_, stream| { - actions.send.send_reset(reason, stream, &mut actions.task) + actions + .send + .send_reset(reason, stream, &mut actions.task, true) }) } @@ -580,7 +584,10 @@ where }) } - pub fn body_is_empty(&self) -> bool { + pub fn body_is_empty(&self) -> bool + where + B: Buf, + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -589,7 +596,10 @@ where me.actions.recv.body_is_empty(&stream) } - pub fn poll_data(&mut self) -> Poll, proto::Error> { + pub fn poll_data(&mut self) -> Poll, proto::Error> + where + B: Buf, + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -598,7 +608,10 @@ where me.actions.recv.poll_data(&mut stream) } - pub fn poll_trailers(&mut self) -> Poll, proto::Error> { + pub fn poll_trailers(&mut self) -> Poll, proto::Error> + where + B: Buf, + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -609,7 +622,13 @@ where /// Releases recv capacity back to the peer. This may result in sending /// WINDOW_UPDATE frames on both the stream and connection. - pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> { + pub fn release_capacity( + &mut self, + capacity: WindowSize + ) -> Result<(), UserError> + where + B: Buf, + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -710,11 +729,26 @@ where } } +impl fmt::Debug for StreamRef +where + P: Peer, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let me = self.inner.lock().unwrap(); + let stream = &me.store[self.key]; + fmt.debug_struct("StreamRef") + .field("stream_id", &stream.id) + .field("ref_count", &stream.ref_count) + .finish() + } +} + impl Drop for StreamRef where P: Peer, { fn drop(&mut self) { + trace!("StreamRef::drop({:?})", self); let mut me = match self.inner.lock() { Ok(inner) => inner, Err(_) => if ::std::thread::panicking() { @@ -727,18 +761,29 @@ where let me = &mut *me; - let id = { - let mut stream = me.store.resolve(self.key); - stream.ref_dec(); + let mut stream = me.store.resolve(self.key); + // decrement the stream's ref count by 1. + stream.ref_dec(); - if !stream.is_released() { - return; - } - - stream.remove() - }; - - debug_assert!(!me.store.contains_id(&id)); + let actions = &mut me.actions; + // the reset must be sent inside a `transition` block. + // `transition_after` will release the stream if it is + // released. + let recv_closed = stream.state.is_recv_closed(); + me.counts.transition(stream, |_, stream| + // if this is the last reference to the stream, reset the stream. + if stream.ref_count == 0 && !recv_closed { + trace!( + " -> last reference to {:?} was dropped, trying to reset", + stream.id, + ); + actions.send.send_reset( + Reason::Cancel, + stream, + &mut actions.task, + false + ); + }); } } @@ -759,7 +804,7 @@ where }) = res { // Reset the stream. - self.send.send_reset(reason, stream, &mut self.task); + self.send.send_reset(reason, stream, &mut self.task, true); Ok(()) } else { res diff --git a/tests/flow_control.rs b/tests/flow_control.rs index f25b920..65c77f3 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -378,9 +378,12 @@ fn stream_close_by_data_frame_releases_capacity() { // Send the frame s2.send_data("hello".into(), true).unwrap(); - // Wait for the connection to close - h2.unwrap() - }); + // Drive both streams to prevent the handles from being dropped + // (which will send a RST_STREAM) before the connection is closed. + h2.drive(s1) + .and_then(move |(h2, _)| h2.drive(s2)) + }) + .unwrap(); let srv = srv.assert_client_handshake() .unwrap() @@ -446,9 +449,12 @@ fn stream_close_by_trailers_frame_releases_capacity() { // Send the frame s2.send_data("hello".into(), true).unwrap(); - // Wait for the connection to close - h2.unwrap() - }); + // Drive both streams to prevent the handles from being dropped + // (which will send a RST_STREAM) before the connection is closed. + h2.drive(s1) + .and_then(move |(h2, _)| h2.drive(s2)) + }) + .unwrap(); let srv = srv.assert_client_handshake().unwrap() // Get the first frame @@ -527,7 +533,16 @@ fn recv_window_update_on_stream_closed_by_data_frame() { stream.send_data("hello".into(), true).unwrap(); // Wait for the connection to close - h2.unwrap() + h2.map(|h2| { + // keep `stream` from being dropped in order to prevent + // it from sending an RST_STREAM frame. + std::mem::forget(stream); + // i know this is kind of evil, but it's necessary to + // ensure that the stream is closed by the EOS frame, + // and not by the RST_STREAM. + h2 + }) + .unwrap() }); let srv = srv.assert_client_handshake() diff --git a/tests/prioritization.rs b/tests/prioritization.rs index b3a6c11..2e07b00 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -202,7 +202,12 @@ fn send_data_receive_window_update() { let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; stream.send_data(payload.into(), true).unwrap(); - h2.unwrap() + h2.map(|h2| { + // keep `stream` from being dropped in order to prevent + // it from sending an RST_STREAM frame. + std::mem::forget(stream); + h2 + }).unwrap() }); let mock = mock.assert_client_handshake().unwrap()