From d2aa9197f9ff080326cf2fa4186cc0d857eb9d77 Mon Sep 17 00:00:00 2001 From: Geoffry Song Date: Fri, 3 Aug 2018 16:00:13 -0700 Subject: [PATCH] Fix the handling of incoming SETTINGS_INITIAL_WINDOW_SIZE. (#299) --- src/proto/streams/send.rs | 26 ++++++++++--- tests/h2-tests/tests/flow_control.rs | 57 ++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 89c243e..8b0c5c1 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -11,7 +11,7 @@ use futures::{Async, Poll}; use futures::task::Task; use tokio_io::AsyncWrite; -use std::{cmp, io}; +use std::io; /// Manages state transitions related to outbound frames. #[derive(Debug)] @@ -370,8 +370,8 @@ impl Send { self.init_window_sz = val; if val < old_val { + // We must decrease the (remote) window on every open stream. let dec = old_val - val; - trace!("decrementing all windows; dec={}", dec); let mut total_reclaimed = 0; @@ -380,15 +380,29 @@ impl Send { stream.send_flow.dec_window(dec); + // It's possible that decreasing the window causes + // `window_size` (the stream-specific window) to fall below + // `available` (the portion of the connection-level window + // that we have allocated to the stream). + // In this case, we should take that excess allocation away + // and reassign it to other streams. + let window_size = stream.send_flow.window_size(); let available = stream.send_flow.available().as_size(); - let reclaim = cmp::min(dec, available); - stream.send_flow.claim_capacity(reclaim); - total_reclaimed += reclaim; + let reclaimed = if available > window_size { + // Drop down to `window_size`. + let reclaim = available - window_size; + stream.send_flow.claim_capacity(reclaim); + total_reclaimed += reclaim; + reclaim + } else { + 0 + }; trace!( - "decremented stream window; id={:?}; decr={}; flow={:?}", + "decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}", stream.id, dec, + reclaimed, stream.send_flow ); diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 1cd8038..5943781 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -844,6 +844,63 @@ fn recv_settings_removes_available_capacity() { .wait().unwrap(); } +#[test] +fn recv_settings_keeps_assigned_capacity() { + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + let (sent_settings, sent_settings_rx) = futures::sync::oneshot::channel(); + + let srv = srv.assert_client_handshake().unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://http2.akamai.com/") + ) + .send_frame(frames::settings().initial_window_size(64)) + .recv_frame(frames::settings_ack()) + .then_notify(sent_settings) + .recv_frame(frames::data(1, "hello world").eos()) + .send_frame( + frames::headers(1) + .response(204) + .eos() + ) + .close(); + + + let h2 = client::handshake(io).unwrap() + .and_then(move |(mut client, h2)| { + let request = Request::builder() + .method(Method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let (response, mut stream) = client.send_request(request, false).unwrap(); + + stream.reserve_capacity(11); + + h2.expect("h2") + .join( + util::wait_for_capacity(stream, 11) + .and_then(|mut stream| { + sent_settings_rx.expect("rx") + .and_then(move |()| { + stream.send_data("hello world".into(), true).unwrap(); + response.expect("response") + }) + .and_then(move |resp| { + assert_eq!(resp.status(), StatusCode::NO_CONTENT); + Ok(client) + }) + }) + ) + }); + + let _ = h2.join(srv) + .wait().unwrap(); +} + #[test] fn recv_no_init_window_then_receive_some_init_window() { let _ = ::env_logger::try_init();