Remove assert around self.pending_capacity.is_empty() (#225)
This assert does not hold as many streams can be pushed into pending_capacity during a call to send_data(). See issue #224 for more discussion and sign-off. Closes #224
This commit is contained in:
		
				
					committed by
					
						 Sean McArthur
						Sean McArthur
					
				
			
			
				
	
			
			
			
						parent
						
							06672cbde9
						
					
				
				
					commit
					ad90f9b97b
				
			| @@ -353,9 +353,6 @@ impl Prioritize { | |||||||
|  |  | ||||||
|         // First check if capacity is immediately available |         // First check if capacity is immediately available | ||||||
|         if conn_available > 0 { |         if conn_available > 0 { | ||||||
|             // There should be no streams pending capacity |  | ||||||
|             debug_assert!(self.pending_capacity.is_empty()); |  | ||||||
|  |  | ||||||
|             // The amount of capacity to assign to the stream |             // The amount of capacity to assign to the stream | ||||||
|             // TODO: Should prioritization factor into this? |             // TODO: Should prioritization factor into this? | ||||||
|             let assign = cmp::min(conn_available, additional); |             let assign = cmp::min(conn_available, additional); | ||||||
|   | |||||||
| @@ -215,10 +215,6 @@ where | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn is_empty(&self) -> bool { |  | ||||||
|         self.indices.is_none() |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn take(&mut self) -> Self { |     pub fn take(&mut self) -> Self { | ||||||
|         Queue { |         Queue { | ||||||
|             indices: self.indices.take(), |             indices: self.indices.take(), | ||||||
|   | |||||||
| @@ -1,5 +1,6 @@ | |||||||
| #[macro_use] | #[macro_use] | ||||||
| pub mod support; | pub mod support; | ||||||
|  | use support::{DEFAULT_WINDOW_SIZE}; | ||||||
| use support::prelude::*; | use support::prelude::*; | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| @@ -65,6 +66,69 @@ fn single_stream_send_large_body() { | |||||||
|     h2.wait().unwrap(); |     h2.wait().unwrap(); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn multiple_streams_with_payload_greater_than_default_window() { | ||||||
|  |     let _ = ::env_logger::init(); | ||||||
|  |  | ||||||
|  |     let payload = vec![0; 16384*5-1]; | ||||||
|  |  | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake().unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(1).request("POST", "https://http2.akamai.com/") | ||||||
|  |         ) | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(3).request("POST", "https://http2.akamai.com/") | ||||||
|  |         ) | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(5).request("POST", "https://http2.akamai.com/") | ||||||
|  |         ) | ||||||
|  |         .recv_frame(frames::data(1, &payload[0..16_384])) | ||||||
|  |         .recv_frame(frames::data(1, &payload[16_384..(16_384*2)])) | ||||||
|  |         .recv_frame(frames::data(1, &payload[(16_384*2)..(16_384*3)])) | ||||||
|  |         .recv_frame(frames::data(1, &payload[(16_384*3)..(16_384*4-1)])) | ||||||
|  |         .send_frame(frames::settings()) | ||||||
|  |         .recv_frame(frames::settings_ack()) | ||||||
|  |         .send_frame(frames::headers(1).response(200).eos()) | ||||||
|  |         .send_frame(frames::headers(3).response(200).eos()) | ||||||
|  |         .send_frame(frames::headers(5).response(200).eos()) | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |     let client = client::handshake(io).unwrap() | ||||||
|  |         .and_then(|(mut client, conn)| { | ||||||
|  |             let request1 = Request::post("https://http2.akamai.com/").body(()).unwrap(); | ||||||
|  |             let request2 = Request::post("https://http2.akamai.com/").body(()).unwrap(); | ||||||
|  |             let request3 = Request::post("https://http2.akamai.com/").body(()).unwrap(); | ||||||
|  |             let (response1, mut stream1) = client.send_request(request1, false).unwrap(); | ||||||
|  |             let (_response2, mut stream2) = client.send_request(request2, false).unwrap(); | ||||||
|  |             let (_response3, mut stream3) = client.send_request(request3, false).unwrap(); | ||||||
|  |  | ||||||
|  |             // The capacity should be immediately | ||||||
|  |             // allocated to default window size (smaller than payload) | ||||||
|  |             stream1.reserve_capacity(payload.len()); | ||||||
|  |             assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE); | ||||||
|  |  | ||||||
|  |             stream2.reserve_capacity(payload.len()); | ||||||
|  |             assert_eq!(stream2.capacity(), 0); | ||||||
|  |  | ||||||
|  |             stream3.reserve_capacity(payload.len()); | ||||||
|  |             assert_eq!(stream3.capacity(), 0); | ||||||
|  |  | ||||||
|  |             stream1.send_data(payload[..].into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |             // hold onto streams so they don't close | ||||||
|  |             // stream1 doesn't close because response1 is used | ||||||
|  |             conn.drive(response1.expect("response")).map(|c| (c, client, stream2, stream3)) | ||||||
|  |         }) | ||||||
|  |         .and_then(|((conn, _res), client, stream2, stream3)| { | ||||||
|  |             conn.expect("client").map(|c| (c, client, stream2, stream3)) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     srv.join(client).wait().unwrap(); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| fn single_stream_send_extra_large_body_multi_frames_one_buffer() { | fn single_stream_send_extra_large_body_multi_frames_one_buffer() { | ||||||
|     let _ = ::env_logger::try_init(); |     let _ = ::env_logger::try_init(); | ||||||
|   | |||||||
| @@ -29,8 +29,8 @@ macro_rules! try_ready { | |||||||
|  |  | ||||||
| macro_rules! try_nb { | macro_rules! try_nb { | ||||||
|     ($e:expr) => ({ |     ($e:expr) => ({ | ||||||
|         use $crate::support::futures::Async; |  | ||||||
|         use ::std::io::ErrorKind::WouldBlock; |         use ::std::io::ErrorKind::WouldBlock; | ||||||
|  |         use $crate::support::futures::Async; | ||||||
|  |  | ||||||
|         match $e { |         match $e { | ||||||
|             Ok(t) => t, |             Ok(t) => t, | ||||||
| @@ -59,9 +59,11 @@ mod future_ext; | |||||||
|  |  | ||||||
| pub use self::future_ext::{FutureExt, Unwrap}; | pub use self::future_ext::{FutureExt, Unwrap}; | ||||||
|  |  | ||||||
|  | pub type WindowSize = usize; | ||||||
|  | pub const DEFAULT_WINDOW_SIZE: WindowSize = (1 << 16) - 1; | ||||||
|  |  | ||||||
| // This is our test Codec type | // This is our test Codec type | ||||||
| pub type Codec<T> = h2::Codec<T, ::std::io::Cursor<::bytes::Bytes>>; | pub type Codec<T> = h2::Codec<T, ::std::io::Cursor<::bytes::Bytes>>; | ||||||
|  |  | ||||||
| // This is the frame type that is sent | // This is the frame type that is sent | ||||||
| pub type SendFrame = h2::frame::Frame<::std::io::Cursor<::bytes::Bytes>>; | pub type SendFrame = h2::frame::Frame<::std::io::Cursor<::bytes::Bytes>>; | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user