Fix the handling of incoming SETTINGS_INITIAL_WINDOW_SIZE. (#299)

This commit is contained in:
Geoffry Song
2018-08-03 16:00:13 -07:00
committed by Sean McArthur
parent 78ab6167c4
commit d2aa9197f9
2 changed files with 77 additions and 6 deletions

View File

@@ -11,7 +11,7 @@ use futures::{Async, Poll};
use futures::task::Task;
use tokio_io::AsyncWrite;
use std::{cmp, io};
use std::io;
/// Manages state transitions related to outbound frames.
#[derive(Debug)]
@@ -370,8 +370,8 @@ impl Send {
self.init_window_sz = val;
if val < old_val {
// We must decrease the (remote) window on every open stream.
let dec = old_val - val;
trace!("decrementing all windows; dec={}", dec);
let mut total_reclaimed = 0;
@@ -380,15 +380,29 @@ impl Send {
stream.send_flow.dec_window(dec);
// It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below
// `available` (the portion of the connection-level window
// that we have allocated to the stream).
// In this case, we should take that excess allocation away
// and reassign it to other streams.
let window_size = stream.send_flow.window_size();
let available = stream.send_flow.available().as_size();
let reclaim = cmp::min(dec, available);
stream.send_flow.claim_capacity(reclaim);
total_reclaimed += reclaim;
let reclaimed = if available > window_size {
// Drop down to `window_size`.
let reclaim = available - window_size;
stream.send_flow.claim_capacity(reclaim);
total_reclaimed += reclaim;
reclaim
} else {
0
};
trace!(
"decremented stream window; id={:?}; decr={}; flow={:?}",
"decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
stream.id,
dec,
reclaimed,
stream.send_flow
);

View File

@@ -844,6 +844,63 @@ fn recv_settings_removes_available_capacity() {
.wait().unwrap();
}
#[test]
fn recv_settings_keeps_assigned_capacity() {
let _ = ::env_logger::try_init();
let (io, srv) = mock::new();
let (sent_settings, sent_settings_rx) = futures::sync::oneshot::channel();
let srv = srv.assert_client_handshake().unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("POST", "https://http2.akamai.com/")
)
.send_frame(frames::settings().initial_window_size(64))
.recv_frame(frames::settings_ack())
.then_notify(sent_settings)
.recv_frame(frames::data(1, "hello world").eos())
.send_frame(
frames::headers(1)
.response(204)
.eos()
)
.close();
let h2 = client::handshake(io).unwrap()
.and_then(move |(mut client, h2)| {
let request = Request::builder()
.method(Method::POST)
.uri("https://http2.akamai.com/")
.body(()).unwrap();
let (response, mut stream) = client.send_request(request, false).unwrap();
stream.reserve_capacity(11);
h2.expect("h2")
.join(
util::wait_for_capacity(stream, 11)
.and_then(|mut stream| {
sent_settings_rx.expect("rx")
.and_then(move |()| {
stream.send_data("hello world".into(), true).unwrap();
response.expect("response")
})
.and_then(move |resp| {
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
Ok(client)
})
})
)
});
let _ = h2.join(srv)
.wait().unwrap();
}
#[test]
fn recv_no_init_window_then_receive_some_init_window() {
let _ = ::env_logger::try_init();