From 93925e6d1ffe4259e7a3daacd3caba60a2ac098f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 12 Sep 2017 10:48:11 -0700 Subject: [PATCH] Limit send flow control bug to window_size (#78) Senders could set the available capacity greater than the current `window_size`. This caused a panic when the sender attempted to send more than the receiver could accept. --- src/codec/framed_read.rs | 1 + src/proto/streams/prioritize.rs | 5 +-- tests/flow_control.rs | 80 +++++++++++++++++++++++++++++++++ tests/prioritization.rs | 39 +--------------- tests/support/src/lib.rs | 3 ++ tests/support/src/mock.rs | 61 +++++++++++++++++++++++-- tests/support/src/prelude.rs | 3 ++ tests/support/src/util.rs | 44 ++++++++++++++++++ 8 files changed, 192 insertions(+), 44 deletions(-) create mode 100644 tests/support/src/util.rs diff --git a/src/codec/framed_read.rs b/src/codec/framed_read.rs index f65dfb8..7f428cb 100644 --- a/src/codec/framed_read.rs +++ b/src/codec/framed_read.rs @@ -249,6 +249,7 @@ impl Stream for FramedRead trace!("poll; bytes={}B", bytes.len()); if let Some(frame) = try!(self.decode_frame(bytes)) { + debug!("received; frame={:?}", frame); return Ok(Async::Ready(Some(frame))); } } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 915b021..a8801c6 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -246,7 +246,8 @@ impl Prioritize // Don't assign more than the window has available! let additional = cmp::min( total_requested - stream.send_flow.available(), - stream.send_flow.window_size()); + // Can't assign more than what is available + stream.send_flow.window_size() - stream.send_flow.available()); trace!("try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}", total_requested, @@ -451,8 +452,6 @@ impl Prioritize Frame::Data(mut frame) => { // Get the amount of capacity remaining for stream's // window. - // - // TODO: Is this the right thing to check? let stream_capacity = stream.send_flow.available(); let sz = frame.payload().remaining(); diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 4b5be65..ab3e716 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -539,3 +539,83 @@ fn recv_window_update_on_stream_closed_by_data_frame() { let _ = h2.join(srv) .wait().unwrap(); } + +#[test] +fn reserved_capacity_assigned_in_multi_window_updates() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let h2 = Client::handshake(io).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let mut stream = h2.request(request, false).unwrap(); + + // Consume the capacity + let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; + stream.send_data(payload.into(), false).unwrap(); + + // Reserve more data than we want + stream.reserve_capacity(10); + + h2.drive(util::wait_for_capacity(stream, 5)) + }) + .and_then(|(h2, mut stream)| { + stream.send_data("hello".into(), false).unwrap(); + stream.send_data("world".into(), true).unwrap(); + + h2.drive(GetResponse { stream: Some(stream) }) + }) + .and_then(|(h2, (response, _))| { + assert_eq!(response.status(), StatusCode::NO_CONTENT); + + // Wait for the connection to close + h2.unwrap() + }) + ; + + let srv = srv.assert_client_handshake().unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://http2.akamai.com/") + ) + .recv_frame(frames::data(1, vec![0u8; 16_384])) + .recv_frame(frames::data(1, vec![0u8; 16_384])) + .recv_frame(frames::data(1, vec![0u8; 16_384])) + .recv_frame(frames::data(1, vec![0u8; 16_383])) + .idle_ms(100) + // Increase the connection window + .send_frame( + frames::window_update(0, 10)) + // Incrementally increase the stream window + .send_frame( + frames::window_update(1, 4)) + .idle_ms(50) + .send_frame( + frames::window_update(1, 1)) + // Receive first chunk + .recv_frame(frames::data(1, "hello")) + .send_frame( + frames::window_update(1, 5)) + // Receive second chunk + .recv_frame( + frames::data(1, "world").eos()) + .send_frame( + frames::headers(1) + .response(204) + .eos() + ) + /* + .recv_frame(frames::data(1, "hello").eos()) + .send_frame(frames::window_update(1, 5)) + */ + .map(drop) + ; + + let _ = h2.join(srv) + .wait().unwrap(); +} diff --git a/tests/prioritization.rs b/tests/prioritization.rs index 76560ac..add69ed 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -170,40 +170,6 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { h2.wait().unwrap(); } -#[macro_use] -extern crate futures; - -use futures::{Poll, Async}; - -// TODO: Extract this out? -struct WaitForCapacity { - stream: Option>, - target: usize, -} - -impl WaitForCapacity { - fn stream(&mut self) -> &mut client::Stream { - self.stream.as_mut().unwrap() - } -} - -impl Future for WaitForCapacity { - type Item = client::Stream; - type Error = (); - - fn poll(&mut self) -> Poll { - let _ = try_ready!(self.stream().poll_capacity().map_err(|_| panic!())); - - let act = self.stream().capacity(); - - if act >= self.target { - return Ok(self.stream.take().unwrap().into()) - } - - Ok(Async::NotReady) - } -} - #[test] fn send_data_receive_window_update() { let _ = ::env_logger::init(); @@ -225,10 +191,7 @@ fn send_data_receive_window_update() { stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); // Wait for capacity - h2.drive(WaitForCapacity { - stream: Some(stream), - target: frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, - }) + h2.drive(util::wait_for_capacity(stream, frame::DEFAULT_INITIAL_WINDOW_SIZE as usize)) }) .and_then(|(h2, mut stream)| { let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; diff --git a/tests/support/src/lib.rs b/tests/support/src/lib.rs index d060fbd..027a97a 100644 --- a/tests/support/src/lib.rs +++ b/tests/support/src/lib.rs @@ -6,6 +6,8 @@ pub extern crate http; #[macro_use] pub extern crate tokio_io; + +#[macro_use] pub extern crate futures; pub extern crate mock_io; pub extern crate env_logger; @@ -19,6 +21,7 @@ pub mod raw; pub mod frames; pub mod prelude; pub mod mock; +pub mod util; mod future_ext; diff --git a/tests/support/src/mock.rs b/tests/support/src/mock.rs index ed13335..dc22a26 100644 --- a/tests/support/src/mock.rs +++ b/tests/support/src/mock.rs @@ -5,6 +5,7 @@ use h2::frame::{self, Frame}; use futures::{Async, Future, Stream, Poll}; use futures::task::{self, Task}; +use futures::sync::oneshot; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::read_exact; @@ -100,12 +101,18 @@ impl Handle { } /// Perform the H2 handshake - pub fn assert_client_handshake(mut self) + pub fn assert_client_handshake(self) + -> Box> + { + self.assert_client_handshake_with_settings(frame::Settings::default()) + } + + /// Perform the H2 handshake + pub fn assert_client_handshake_with_settings(mut self, settings: frame::Settings) -> Box> { // Send a settings frame - let frame = frame::Settings::default(); - self.send(frame.into()).unwrap(); + self.send(settings.into()).unwrap(); let ret = self.read_preface().unwrap() .and_then(|me| me.into_future().unwrap()) @@ -336,6 +343,31 @@ pub trait HandleFutureExt { } } + fn idle_ms(self, ms: usize) -> Box> + where Self: Sized + 'static, + Self: Future, + Self::Error: fmt::Debug, + { + use std::thread; + use std::time::Duration; + + + Box::new(self.and_then(move |handle| { + // This is terrible... but oh well + let (tx, rx) = oneshot::channel(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(ms as u64)); + tx.send(()).unwrap(); + }); + + Idle { + handle: Some(handle), + timeout: rx, + }.map_err(|_| unreachable!()) + })) + } + fn close(self) -> Box> where Self: Future + Sized + 'static, { @@ -387,6 +419,29 @@ impl Future for SendFrameFut } } +pub struct Idle { + handle: Option, + timeout: oneshot::Receiver<()>, +} + +impl Future for Idle { + type Item = Handle; + type Error = (); + + fn poll(&mut self) -> Poll { + if self.timeout.poll().unwrap().is_ready() { + return Ok(self.handle.take().unwrap().into()); + } + + match self.handle.as_mut().unwrap().poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + res => { + panic!("Received unexpected frame on handle; frame={:?}", res); + } + } + } +} + impl HandleFutureExt for T where T: Future + 'static, { diff --git a/tests/support/src/prelude.rs b/tests/support/src/prelude.rs index f8d7c87..9b8925e 100644 --- a/tests/support/src/prelude.rs +++ b/tests/support/src/prelude.rs @@ -13,6 +13,9 @@ pub use super::mock::{self, HandleFutureExt}; // Re-export frames helpers pub use super::frames; +// Re-export utility mod +pub use super::util; + // Re-export some type defines pub use super::{Codec, SendFrame}; diff --git a/tests/support/src/util.rs b/tests/support/src/util.rs new file mode 100644 index 0000000..fcfe516 --- /dev/null +++ b/tests/support/src/util.rs @@ -0,0 +1,44 @@ +use h2::client; + +use futures::{Poll, Async, Future}; +use bytes::Bytes; + +pub fn wait_for_capacity(stream: client::Stream, + target: usize) + -> WaitForCapacity +{ + WaitForCapacity { + stream: Some(stream), + target: target, + } +} + +pub struct WaitForCapacity { + stream: Option>, + target: usize, +} + +impl WaitForCapacity { + fn stream(&mut self) -> &mut client::Stream { + self.stream.as_mut().unwrap() + } +} + +impl Future for WaitForCapacity { + type Item = client::Stream; + type Error = (); + + fn poll(&mut self) -> Poll { + let _ = try_ready!(self.stream().poll_capacity().map_err(|_| panic!())); + + let act = self.stream().capacity(); + + println!("CAP={:?}", act); + + if act >= self.target { + return Ok(self.stream.take().unwrap().into()) + } + + Ok(Async::NotReady) + } +}