From 492f4e7f1168fc17f432294c48f54cb0150e124a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 12 Mar 2019 14:39:48 -0700 Subject: [PATCH] Make 'pending reset' streams not count towards active streams --- src/proto/connection.rs | 4 ++-- src/proto/streams/counts.rs | 1 - src/proto/streams/store.rs | 1 + src/proto/streams/streams.rs | 6 +++++ tests/h2-tests/tests/client_request.rs | 33 +++++--------------------- tests/h2-tests/tests/push_promise.rs | 2 +- tests/h2-tests/tests/stream_states.rs | 11 +++++---- 7 files changed, 22 insertions(+), 36 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 87ac4a0..dc67c4f 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -173,7 +173,7 @@ where pub fn maybe_close_connection_if_no_streams(&mut self) { // If we poll() and realize that there are no streams or references // then we can close the connection by transitioning to GOAWAY - if self.streams.num_active_streams() == 0 && !self.streams.has_streams_or_other_references() { + if !self.streams.has_streams_or_other_references() { self.go_away_now(Reason::NO_ERROR); } } @@ -202,7 +202,7 @@ where try_ready!(self.streams.poll_complete(&mut self.codec)); if self.error.is_some() || self.go_away.should_close_on_idle() { - if self.streams.num_active_streams() == 0 { + if !self.streams.has_streams() { self.go_away_now(Reason::NO_ERROR); continue; } diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index e027426..f49da6c 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -147,7 +147,6 @@ impl Counts { if stream.is_closed() { if !stream.is_pending_reset_expiration() { stream.unlink(); - if is_reset_counted { self.dec_num_reset_streams(); } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index f66906a..1b7f2a2 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -193,6 +193,7 @@ impl ops::IndexMut for Store { } impl Store { + #[cfg(feature = "unstable")] pub fn num_active_streams(&self) -> usize { self.ids.len() } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 5f98d09..3b980b1 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -746,11 +746,17 @@ where Ok(()) } + #[cfg(feature = "unstable")] pub fn num_active_streams(&self) -> usize { let me = self.inner.lock().unwrap(); me.store.num_active_streams() } + pub fn has_streams(&self) -> bool { + let me = self.inner.lock().unwrap(); + me.counts.has_streams() + } + pub fn has_streams_or_other_references(&self) -> bool { let me = self.inner.lock().unwrap(); me.counts.has_streams() || me.refs > 1 diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 07627c6..f515551 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -375,17 +375,8 @@ fn send_reset_notifies_recv_stream() { ) .send_frame(frames::headers(1).response(200)) .recv_frame(frames::reset(1).refused()) - .recv_frame( - frames::headers(3) - .request("POST", "https://example.com/") - .eos() - ) - .send_frame( - frames::headers(3) - .response(200) - .eos() - ) - .close(); + .recv_frame(frames::go_away(0)) + .recv_eof(); let client = client::handshake(io) .expect("handshake") @@ -424,22 +415,10 @@ fn send_reset_notifies_recv_stream() { unordered.push(Box::new(tx)); conn.drive(unordered.for_each(|_| Ok(()))) - .map(move |(conn, _)| (client, conn)) - }) - .and_then(|(mut client, conn)| { - // send a second request just to keep the connection alive until - // we know the previous `RecvStream` was notified about the reset. - let request = Request::builder() - .method(Method::POST) - .uri("https://example.com/") - .body(()) - .unwrap(); - - let (resp2, _) = client.send_request(request, true).unwrap(); - let fut = resp2.map(|_res| ()); - - conn.drive(fut) - .and_then(|(conn, _)| conn.expect("client")) + .and_then(move |(conn, _)| { + drop(client); // now let client gracefully goaway + conn.expect("client") + }) }); client.join(srv).wait().expect("wait"); diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index 1631daa..95a6109 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -198,7 +198,7 @@ fn pending_push_promises_reset_when_dropped() { }); conn.drive(req) - .and_then(|(conn, _)| conn.expect("client")) + .and_then(move |(conn, _)| conn.expect("client").map(move |()| drop(client))) }); client.join(srv).wait().expect("wait"); diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index b801724..583b005 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -513,7 +513,7 @@ fn send_rst_stream_allows_recv_data() { conn.expect("client") .drive(req) - .and_then(|(conn, _)| conn) + .and_then(move |(conn, _)| conn.map(move |()| drop(client))) }); @@ -562,7 +562,7 @@ fn send_rst_stream_allows_recv_trailers() { conn.expect("client") .drive(req) - .and_then(|(conn, _)| conn) + .and_then(move |(conn, _)| conn.map(move |()| drop(client))) }); @@ -709,7 +709,8 @@ fn rst_stream_max() { conn.drive(req1.join(req2)) .and_then(|(conn, _)| conn.expect_err("client")) - .map(|err| { + .map(move |err| { + drop(client); assert_eq!( err.to_string(), "protocol error: unspecific protocol error detected" @@ -1127,9 +1128,9 @@ fn srv_window_update_on_lower_stream_id() { h2.expect("client") .drive(response) - .and_then(|(h2, _)| { + .and_then(move |(h2, _)| { println!("RESPONSE DONE"); - h2 + h2.map(move |()| drop(client)) }) .then(|result| { println!("WUT");