Make 'pending reset' streams not count towards active streams
This commit is contained in:
@@ -173,7 +173,7 @@ where
|
|||||||
pub fn maybe_close_connection_if_no_streams(&mut self) {
|
pub fn maybe_close_connection_if_no_streams(&mut self) {
|
||||||
// If we poll() and realize that there are no streams or references
|
// If we poll() and realize that there are no streams or references
|
||||||
// then we can close the connection by transitioning to GOAWAY
|
// then we can close the connection by transitioning to GOAWAY
|
||||||
if self.streams.num_active_streams() == 0 && !self.streams.has_streams_or_other_references() {
|
if !self.streams.has_streams_or_other_references() {
|
||||||
self.go_away_now(Reason::NO_ERROR);
|
self.go_away_now(Reason::NO_ERROR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -202,7 +202,7 @@ where
|
|||||||
try_ready!(self.streams.poll_complete(&mut self.codec));
|
try_ready!(self.streams.poll_complete(&mut self.codec));
|
||||||
|
|
||||||
if self.error.is_some() || self.go_away.should_close_on_idle() {
|
if self.error.is_some() || self.go_away.should_close_on_idle() {
|
||||||
if self.streams.num_active_streams() == 0 {
|
if !self.streams.has_streams() {
|
||||||
self.go_away_now(Reason::NO_ERROR);
|
self.go_away_now(Reason::NO_ERROR);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -147,7 +147,6 @@ impl Counts {
|
|||||||
if stream.is_closed() {
|
if stream.is_closed() {
|
||||||
if !stream.is_pending_reset_expiration() {
|
if !stream.is_pending_reset_expiration() {
|
||||||
stream.unlink();
|
stream.unlink();
|
||||||
|
|
||||||
if is_reset_counted {
|
if is_reset_counted {
|
||||||
self.dec_num_reset_streams();
|
self.dec_num_reset_streams();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -193,6 +193,7 @@ impl ops::IndexMut<Key> for Store {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
|
#[cfg(feature = "unstable")]
|
||||||
pub fn num_active_streams(&self) -> usize {
|
pub fn num_active_streams(&self) -> usize {
|
||||||
self.ids.len()
|
self.ids.len()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -746,11 +746,17 @@ where
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "unstable")]
|
||||||
pub fn num_active_streams(&self) -> usize {
|
pub fn num_active_streams(&self) -> usize {
|
||||||
let me = self.inner.lock().unwrap();
|
let me = self.inner.lock().unwrap();
|
||||||
me.store.num_active_streams()
|
me.store.num_active_streams()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn has_streams(&self) -> bool {
|
||||||
|
let me = self.inner.lock().unwrap();
|
||||||
|
me.counts.has_streams()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn has_streams_or_other_references(&self) -> bool {
|
pub fn has_streams_or_other_references(&self) -> bool {
|
||||||
let me = self.inner.lock().unwrap();
|
let me = self.inner.lock().unwrap();
|
||||||
me.counts.has_streams() || me.refs > 1
|
me.counts.has_streams() || me.refs > 1
|
||||||
|
|||||||
@@ -375,17 +375,8 @@ fn send_reset_notifies_recv_stream() {
|
|||||||
)
|
)
|
||||||
.send_frame(frames::headers(1).response(200))
|
.send_frame(frames::headers(1).response(200))
|
||||||
.recv_frame(frames::reset(1).refused())
|
.recv_frame(frames::reset(1).refused())
|
||||||
.recv_frame(
|
.recv_frame(frames::go_away(0))
|
||||||
frames::headers(3)
|
.recv_eof();
|
||||||
.request("POST", "https://example.com/")
|
|
||||||
.eos()
|
|
||||||
)
|
|
||||||
.send_frame(
|
|
||||||
frames::headers(3)
|
|
||||||
.response(200)
|
|
||||||
.eos()
|
|
||||||
)
|
|
||||||
.close();
|
|
||||||
|
|
||||||
let client = client::handshake(io)
|
let client = client::handshake(io)
|
||||||
.expect("handshake")
|
.expect("handshake")
|
||||||
@@ -424,22 +415,10 @@ fn send_reset_notifies_recv_stream() {
|
|||||||
unordered.push(Box::new(tx));
|
unordered.push(Box::new(tx));
|
||||||
|
|
||||||
conn.drive(unordered.for_each(|_| Ok(())))
|
conn.drive(unordered.for_each(|_| Ok(())))
|
||||||
.map(move |(conn, _)| (client, conn))
|
.and_then(move |(conn, _)| {
|
||||||
})
|
drop(client); // now let client gracefully goaway
|
||||||
.and_then(|(mut client, conn)| {
|
conn.expect("client")
|
||||||
// send a second request just to keep the connection alive until
|
})
|
||||||
// we know the previous `RecvStream` was notified about the reset.
|
|
||||||
let request = Request::builder()
|
|
||||||
.method(Method::POST)
|
|
||||||
.uri("https://example.com/")
|
|
||||||
.body(())
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let (resp2, _) = client.send_request(request, true).unwrap();
|
|
||||||
let fut = resp2.map(|_res| ());
|
|
||||||
|
|
||||||
conn.drive(fut)
|
|
||||||
.and_then(|(conn, _)| conn.expect("client"))
|
|
||||||
});
|
});
|
||||||
|
|
||||||
client.join(srv).wait().expect("wait");
|
client.join(srv).wait().expect("wait");
|
||||||
|
|||||||
@@ -198,7 +198,7 @@ fn pending_push_promises_reset_when_dropped() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
conn.drive(req)
|
conn.drive(req)
|
||||||
.and_then(|(conn, _)| conn.expect("client"))
|
.and_then(move |(conn, _)| conn.expect("client").map(move |()| drop(client)))
|
||||||
});
|
});
|
||||||
|
|
||||||
client.join(srv).wait().expect("wait");
|
client.join(srv).wait().expect("wait");
|
||||||
|
|||||||
@@ -513,7 +513,7 @@ fn send_rst_stream_allows_recv_data() {
|
|||||||
|
|
||||||
conn.expect("client")
|
conn.expect("client")
|
||||||
.drive(req)
|
.drive(req)
|
||||||
.and_then(|(conn, _)| conn)
|
.and_then(move |(conn, _)| conn.map(move |()| drop(client)))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
@@ -562,7 +562,7 @@ fn send_rst_stream_allows_recv_trailers() {
|
|||||||
|
|
||||||
conn.expect("client")
|
conn.expect("client")
|
||||||
.drive(req)
|
.drive(req)
|
||||||
.and_then(|(conn, _)| conn)
|
.and_then(move |(conn, _)| conn.map(move |()| drop(client)))
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
@@ -709,7 +709,8 @@ fn rst_stream_max() {
|
|||||||
|
|
||||||
conn.drive(req1.join(req2))
|
conn.drive(req1.join(req2))
|
||||||
.and_then(|(conn, _)| conn.expect_err("client"))
|
.and_then(|(conn, _)| conn.expect_err("client"))
|
||||||
.map(|err| {
|
.map(move |err| {
|
||||||
|
drop(client);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
err.to_string(),
|
err.to_string(),
|
||||||
"protocol error: unspecific protocol error detected"
|
"protocol error: unspecific protocol error detected"
|
||||||
@@ -1127,9 +1128,9 @@ fn srv_window_update_on_lower_stream_id() {
|
|||||||
|
|
||||||
h2.expect("client")
|
h2.expect("client")
|
||||||
.drive(response)
|
.drive(response)
|
||||||
.and_then(|(h2, _)| {
|
.and_then(move |(h2, _)| {
|
||||||
println!("RESPONSE DONE");
|
println!("RESPONSE DONE");
|
||||||
h2
|
h2.map(move |()| drop(client))
|
||||||
})
|
})
|
||||||
.then(|result| {
|
.then(|result| {
|
||||||
println!("WUT");
|
println!("WUT");
|
||||||
|
|||||||
Reference in New Issue
Block a user