diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index ac9866b..a310de1 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -158,7 +158,15 @@ impl Prioritize { stream.buffered_send_data ); - if stream.send_flow.available() >= stream.buffered_send_data { + // The `stream.buffered_send_data == 0` check is here so that, if a zero + // length data frame is queued to the front (there is no previously + // queued data), it gets sent out immediately even if there is no + // available send window. + // + // Sending out zero length data frames can be done to singal + // end-of-stream. + // + if stream.send_flow.available() > 0 || stream.buffered_send_data == 0 { // The stream currently has capacity to send the data frame, so // queue it up and notify the connection task. self.queue_frame(frame.into(), buffer, stream, task); diff --git a/tests/prioritization.rs b/tests/prioritization.rs index b1a87d5..734b893 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -10,6 +10,7 @@ fn single_stream_send_large_body() { let mock = mock_io::Builder::new() .handshake() + .write(frames::SETTINGS_ACK) .write(&[ // POST / 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, @@ -20,13 +21,24 @@ fn single_stream_send_large_body() { 0, 4, 0, 0, 1, 0, 0, 0, 1, ]) .write(&payload[..]) - .write(frames::SETTINGS_ACK) // Read response .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); + let notify = MockNotify::new(); let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); + // Poll h2 once to get notifications + loop { + // Run the connection until all work is done, this handles processing + // the handshake. + notify.with(|| h2.poll()).unwrap(); + + if !notify.is_notified() { + break; + } + } + let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") @@ -44,6 +56,8 @@ fn single_stream_send_large_body() { // Send the data stream.send_data(payload[..].into(), true).unwrap(); + assert!(notify.is_notified()); + // Get the response let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); @@ -59,6 +73,7 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { let mock = mock_io::Builder::new() .handshake() + .write(frames::SETTINGS_ACK) .write(&[ // POST / 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, @@ -74,13 +89,24 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { 0, 64, 0, 0, 1, 0, 0, 0, 1, ]) .write(&payload[16_384..]) - .write(frames::SETTINGS_ACK) // Read response .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); + let notify = MockNotify::new(); let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); + // Poll h2 once to get notifications + loop { + // Run the connection until all work is done, this handles processing + // the handshake. + notify.with(|| h2.poll()).unwrap(); + + if !notify.is_notified() { + break; + } + } + let request = Request::builder() .method(Method::POST) .uri("https://http2.akamai.com/") @@ -97,6 +123,101 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { // Send the data stream.send_data(payload.into(), true).unwrap(); + assert!(notify.is_notified()); + + // Get the response + let resp = h2.run(response).unwrap(); + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + + h2.wait().unwrap(); +} + +#[test] +fn single_stream_send_body_greater_than_default_window() { + let _ = ::env_logger::init(); + + let payload = vec![0; 16384*5-1]; + + let mock = mock_io::Builder::new() + .handshake() + .write(frames::SETTINGS_ACK) + .write(&[ + // POST / + 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, + 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, + ]) + .write(&[ + // DATA + 0, 64, 0, 0, 0, 0, 0, 0, 1, + ]) + .write(&payload[0..16_384]) + .write(&[ + // DATA + 0, 64, 0, 0, 0, 0, 0, 0, 1, + ]) + .write(&payload[16_384..(16_384*2)]) + .write(&[ + // DATA + 0, 64, 0, 0, 0, 0, 0, 0, 1, + ]) + .write(&payload[(16_384*2)..(16_384*3)]) + .write(&[ + // DATA + 0, 63, 255, 0, 0, 0, 0, 0, 1, + ]) + .write(&payload[(16_384*3)..(16_384*4-1)]) + + // Read window update + .read(&[0, 0, 4, 8, 0, 0, 0, 0, 0, 0, 0, 64, 0]) + .read(&[0, 0, 4, 8, 0, 0, 0, 0, 1, 0, 0, 64, 0]) + + .write(&[ + // DATA + 0, 64, 0, 0, 1, 0, 0, 0, 1, + ]) + .write(&payload[(16_384*4-1)..(16_384*5-1)]) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) + .build(); + + let notify = MockNotify::new(); + let (mut client, mut h2) = Client::handshake(mock).wait().unwrap(); + + // Poll h2 once to get notifications + loop { + // Run the connection until all work is done, this handles processing + // the handshake. + notify.with(|| h2.poll()).unwrap(); + + if !notify.is_notified() { + break; + } + } + + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + // Flush request head + loop { + // Run the connection until all work is done, this handles processing + // the handshake. + notify.with(|| h2.poll()).unwrap(); + + if !notify.is_notified() { + break; + } + } + + // Send the data + stream.send_data(payload.into(), true).unwrap(); + + assert!(notify.is_notified()); + // Get the response let resp = h2.run(response).unwrap(); assert_eq!(resp.status(), StatusCode::NO_CONTENT); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 68ab24b..732fdef 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -52,6 +52,7 @@ pub mod raw; pub mod frames; pub mod prelude; pub mod mock; +pub mod notify; pub mod util; mod future_ext; diff --git a/tests/support/notify.rs b/tests/support/notify.rs new file mode 100644 index 0000000..6f3e96a --- /dev/null +++ b/tests/support/notify.rs @@ -0,0 +1,55 @@ +use futures::executor::{self, Notify}; + +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::SeqCst; + +pub struct MockNotify { + inner: Arc, +} + +struct Inner { + notified: AtomicBool, +} + +impl MockNotify { + pub fn new() -> Self { + MockNotify { + inner: Arc::new(Inner { + notified: AtomicBool::new(false), + }), + } + } + + pub fn with R, R>(&self, f: F) -> R { + use futures::Async::Ready; + use futures::future::poll_fn; + + self.clear(); + + let mut f = Some(f); + + let res = executor::spawn(poll_fn(move || { + Ok::<_, ()>(Ready(f.take().unwrap()())) + })).poll_future_notify(&self.inner, 0); + + match res { + Ok(Ready(v)) => v, + _ => unreachable!(), + } + } + + pub fn clear(&self) { + self.inner.notified.store(false, SeqCst); + } + + pub fn is_notified(&self) -> bool { + self.inner.notified.load(SeqCst) + } +} + +impl Notify for Inner { + fn notify(&self, _: usize) { + self.notified.store(true, SeqCst); + } +} diff --git a/tests/support/prelude.rs b/tests/support/prelude.rs index 36022b3..23004ec 100644 --- a/tests/support/prelude.rs +++ b/tests/support/prelude.rs @@ -13,6 +13,9 @@ pub use super::mock::{self, HandleFutureExt}; // Re-export frames helpers pub use super::frames; +// Re-export mock notify +pub use super::notify::MockNotify; + // Re-export utility mod pub use super::util;