From 38bbf30b2fd3156455fa62cd99ed009a726f80ba Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 7 Sep 2017 14:12:21 -0700 Subject: [PATCH] Fix bug in prioritization (#63) The stream buffered data counter was never decremented. --- src/frame/mod.rs | 1 + src/frame/ping.rs | 1 + src/frame/settings.rs | 3 + src/proto/connection.rs | 1 + src/proto/mod.rs | 2 - src/proto/streams/prioritize.rs | 8 ++ tests/prioritization.rs | 151 ++++++++++++++++++++++++++++++++ tests/support/src/assert.rs | 20 +++++ tests/support/src/prelude.rs | 1 + 9 files changed, 186 insertions(+), 2 deletions(-) diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 94b500d..69ac0c5 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -54,6 +54,7 @@ pub use self::window_update::WindowUpdate; // Re-export some constants pub use self::settings::{ DEFAULT_SETTINGS_HEADER_TABLE_SIZE, + DEFAULT_INITIAL_WINDOW_SIZE, DEFAULT_MAX_FRAME_SIZE, MAX_INITIAL_WINDOW_SIZE, MAX_MAX_FRAME_SIZE, diff --git a/src/frame/ping.rs b/src/frame/ping.rs index 1a4d342..32bad8c 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -12,6 +12,7 @@ pub struct Ping { } impl Ping { + #[cfg(feature = "unstable")] pub fn new() -> Ping { Ping { ack: false, diff --git a/src/frame/settings.rs b/src/frame/settings.rs index f158bdd..b4ae32f 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -36,6 +36,9 @@ const ALL: u8 = ACK; /// The default value of SETTINGS_HEADER_TABLE_SIZE pub const DEFAULT_SETTINGS_HEADER_TABLE_SIZE: usize = 4_096; +/// The default value of SETTINGS_INITIAL_WINDOW_SIZE +pub const DEFAULT_INITIAL_WINDOW_SIZE: u32 = 65_535; + /// The default value of MAX_FRAME_SIZE pub const DEFAULT_MAX_FRAME_SIZE: FrameSize = 16_384; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index b090f79..7617890 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -2,6 +2,7 @@ use {client, frame, server, proto}; use frame::Reason; use codec::{SendError, RecvError}; +use frame::DEFAULT_INITIAL_WINDOW_SIZE; use proto::*; use http::Request; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index a48a26b..54036b5 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -30,6 +30,4 @@ pub type PingPayload = [u8; 8]; pub type WindowSize = u32; // Constants -// TODO: Move these into `frame` -pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index a732625..a2b2311 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -281,6 +281,8 @@ impl Prioritize // If data is buffered, then schedule the stream for execution if stream.buffered_send_data > 0 { debug_assert!(stream.send_flow.available() > 0); + debug_assert!(!stream.pending_send.is_empty()); + self.pending_send.push(stream); } } @@ -382,6 +384,7 @@ impl Prioritize // If needed, schedule the sender if stream.send_flow.available() > 0 { + debug_assert!(!stream.pending_send.is_empty()); self.pending_send.push(stream); } } @@ -404,6 +407,7 @@ impl Prioritize match self.pending_send.pop(store) { Some(mut stream) => { trace!("pop_frame; stream={:?}", stream.id); + debug_assert!(!stream.pending_send.is_empty()); let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { Frame::Data(mut frame) => { @@ -456,6 +460,10 @@ impl Prioritize trace!(" -- updating stream flow --"); stream.send_flow.send_data(len as WindowSize); + // Decrement the stream's buffered data counter + debug_assert!(stream.buffered_send_data >= len as u32); + stream.buffered_send_data -= len as u32; + // Assign the capacity back to the connection that // was just consumed from the stream in the previous // line. diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 2640cc4..0138d93 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -1,3 +1,4 @@ +#[macro_use] extern crate h2_test_support; use h2_test_support::prelude::*; @@ -168,3 +169,153 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { h2.wait().unwrap(); } + +#[macro_use] +extern crate futures; + +use futures::{Poll, Async}; +use std::fmt; + +// TODO: These types should be extracted out +struct WaitForCapacity { + stream: Option>, + target: usize, +} + +struct Drive { + conn: Option>, + fut: T, +} + +impl Future for Drive + where T: Future, + T::Error: fmt::Debug, +{ + type Item = (Client, T::Item); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Ok(Async::Ready(v)) => return Ok((self.conn.take().unwrap(), v).into()), + Ok(_) => {} + Err(e) => panic!("unexpected error; {:?}", e), + } + + match self.conn.as_mut().unwrap().poll() { + Ok(Async::Ready(_)) => panic!(), + Ok(Async::NotReady) => {} + Err(e) => panic!("unexpected error; {:?}", e), + } + + Ok(Async::NotReady) + } +} + +impl WaitForCapacity { + fn stream(&mut self) -> &mut client::Stream { + self.stream.as_mut().unwrap() + } +} + +impl Future for WaitForCapacity { + type Item = client::Stream; + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = try_ready!(self.stream().poll_capacity().map_err(|_| panic!())); + + let act = self.stream().capacity(); + + if act >= self.target { + return Ok(self.stream.take().unwrap().into()) + } + + Ok(Async::NotReady) + } +} + +#[test] +fn send_data_receive_window_update() { + let _ = ::env_logger::init(); + let (m, mock) = mock::new(); + + let h2 = Client::handshake(m).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + // Send request + let mut stream = h2.request(request, false).unwrap(); + + // Send data frame + stream.send_data("hello".into(), false).unwrap(); + + stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); + + // Wait for capacity + let fut = WaitForCapacity { + stream: Some(stream), + target: frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, + }; + + Drive { + conn: Some(h2), + fut: fut, + } + }) + .and_then(|(h2, mut stream)| { + let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; + stream.send_data(payload.into(), true).unwrap(); + + h2.unwrap() + }); + + let mock = mock.assert_client_handshake().unwrap() + .and_then(|(_, mock)| { + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let _ = assert_headers!(frame.unwrap()); + mock.into_future().unwrap() + }) + .and_then(|(frame, mut mock)| { + let data = assert_data!(frame.unwrap()); + + // Update the windows + let len = data.payload().len(); + let f = frame::WindowUpdate::new(StreamId::zero(), len as u32); + mock.send(f.into()).unwrap(); + + let f = frame::WindowUpdate::new(data.stream_id(), len as u32); + mock.send(f.into()).unwrap(); + + mock.into_future().unwrap() + }) + // TODO: Dedup the following lines + .and_then(|(frame, mock)| { + let data = assert_data!(frame.unwrap()); + assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize); + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let data = assert_data!(frame.unwrap()); + assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize); + mock.into_future().unwrap() + }) + .and_then(|(frame, mock)| { + let data = assert_data!(frame.unwrap()); + assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize); + mock.into_future().unwrap() + }) + .and_then(|(frame, _)| { + let data = assert_data!(frame.unwrap()); + assert_eq!(data.payload().len(), (frame::DEFAULT_MAX_FRAME_SIZE-1) as usize); + Ok(()) + }) + ; + + let _ = h2.join(mock) + .wait().unwrap(); +} diff --git a/tests/support/src/assert.rs b/tests/support/src/assert.rs index bed10d5..2a4159a 100644 --- a/tests/support/src/assert.rs +++ b/tests/support/src/assert.rs @@ -6,6 +6,26 @@ macro_rules! assert_closed { }} } +#[macro_export] +macro_rules! assert_headers { + ($frame:expr) => {{ + match $frame { + ::h2::frame::Frame::Headers(v) => v, + f => panic!("expected HEADERS; actual={:?}", f), + } + }} +} + +#[macro_export] +macro_rules! assert_data { + ($frame:expr) => {{ + match $frame { + ::h2::frame::Frame::Data(v) => v, + f => panic!("expected DATA; actual={:?}", f), + } + }} +} + #[macro_export] macro_rules! assert_ping { ($frame:expr) => {{ diff --git a/tests/support/src/prelude.rs b/tests/support/src/prelude.rs index 2c2f849..d4c662f 100644 --- a/tests/support/src/prelude.rs +++ b/tests/support/src/prelude.rs @@ -3,6 +3,7 @@ pub use super::h2; pub use self::h2::*; +pub use self::h2::frame::StreamId; pub use self::h2::client::{self, Client}; pub use self::h2::server::{self, Server};