From ad90f9b97ba8457490e54cf6ea7842ee7e4bf30c Mon Sep 17 00:00:00 2001 From: Darren Tsung Date: Tue, 27 Feb 2018 10:35:00 -0800 Subject: [PATCH] Remove assert around self.pending_capacity.is_empty() (#225) This assert does not hold as many streams can be pushed into pending_capacity during a call to send_data(). See issue #224 for more discussion and sign-off. Closes #224 --- src/proto/streams/prioritize.rs | 3 -- src/proto/streams/store.rs | 4 --- tests/prioritization.rs | 64 +++++++++++++++++++++++++++++++++ tests/support/mod.rs | 6 ++-- 4 files changed, 68 insertions(+), 9 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index ce0d847..a20b258 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -353,9 +353,6 @@ impl Prioritize { // First check if capacity is immediately available if conn_available > 0 { - // There should be no streams pending capacity - debug_assert!(self.pending_capacity.is_empty()); - // The amount of capacity to assign to the stream // TODO: Should prioritization factor into this? let assign = cmp::min(conn_available, additional); diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index ea4f5fd..121b291 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -215,10 +215,6 @@ where } } - pub fn is_empty(&self) -> bool { - self.indices.is_none() - } - pub fn take(&mut self) -> Self { Queue { indices: self.indices.take(), diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 2698317..67410df 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -1,5 +1,6 @@ #[macro_use] pub mod support; +use support::{DEFAULT_WINDOW_SIZE}; use support::prelude::*; #[test] @@ -65,6 +66,69 @@ fn single_stream_send_large_body() { h2.wait().unwrap(); } +#[test] +fn multiple_streams_with_payload_greater_than_default_window() { + let _ = ::env_logger::init(); + + let payload = vec![0; 16384*5-1]; + + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake().unwrap() + .recv_settings() + .recv_frame( + frames::headers(1).request("POST", "https://http2.akamai.com/") + ) + .recv_frame( + frames::headers(3).request("POST", "https://http2.akamai.com/") + ) + .recv_frame( + frames::headers(5).request("POST", "https://http2.akamai.com/") + ) + .recv_frame(frames::data(1, &payload[0..16_384])) + .recv_frame(frames::data(1, &payload[16_384..(16_384*2)])) + .recv_frame(frames::data(1, &payload[(16_384*2)..(16_384*3)])) + .recv_frame(frames::data(1, &payload[(16_384*3)..(16_384*4-1)])) + .send_frame(frames::settings()) + .recv_frame(frames::settings_ack()) + .send_frame(frames::headers(1).response(200).eos()) + .send_frame(frames::headers(3).response(200).eos()) + .send_frame(frames::headers(5).response(200).eos()) + .close(); + + let client = client::handshake(io).unwrap() + .and_then(|(mut client, conn)| { + let request1 = Request::post("https://http2.akamai.com/").body(()).unwrap(); + let request2 = Request::post("https://http2.akamai.com/").body(()).unwrap(); + let request3 = Request::post("https://http2.akamai.com/").body(()).unwrap(); + let (response1, mut stream1) = client.send_request(request1, false).unwrap(); + let (_response2, mut stream2) = client.send_request(request2, false).unwrap(); + let (_response3, mut stream3) = client.send_request(request3, false).unwrap(); + + // The capacity should be immediately + // allocated to default window size (smaller than payload) + stream1.reserve_capacity(payload.len()); + assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE); + + stream2.reserve_capacity(payload.len()); + assert_eq!(stream2.capacity(), 0); + + stream3.reserve_capacity(payload.len()); + assert_eq!(stream3.capacity(), 0); + + stream1.send_data(payload[..].into(), true).unwrap(); + + // hold onto streams so they don't close + // stream1 doesn't close because response1 is used + conn.drive(response1.expect("response")).map(|c| (c, client, stream2, stream3)) + }) + .and_then(|((conn, _res), client, stream2, stream3)| { + conn.expect("client").map(|c| (c, client, stream2, stream3)) + }); + + srv.join(client).wait().unwrap(); +} + #[test] fn single_stream_send_extra_large_body_multi_frames_one_buffer() { let _ = ::env_logger::try_init(); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index acdf76a..0924afc 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -29,8 +29,8 @@ macro_rules! try_ready { macro_rules! try_nb { ($e:expr) => ({ - use $crate::support::futures::Async; use ::std::io::ErrorKind::WouldBlock; + use $crate::support::futures::Async; match $e { Ok(t) => t, @@ -59,9 +59,11 @@ mod future_ext; pub use self::future_ext::{FutureExt, Unwrap}; +pub type WindowSize = usize; +pub const DEFAULT_WINDOW_SIZE: WindowSize = (1 << 16) - 1; + // This is our test Codec type pub type Codec = h2::Codec>; // This is the frame type that is sent pub type SendFrame = h2::frame::Frame<::std::io::Cursor<::bytes::Bytes>>; -