Add ability to adjust INITIAL_WINDOW_SIZE setting on an existing connection (#421)

This commit is contained in:
Sean McArthur
2019-10-07 15:29:23 -07:00
committed by GitHub
parent 367206bfa1
commit 4c1d797712
10 changed files with 439 additions and 27 deletions

View File

@@ -1201,6 +1201,25 @@ where
self.inner.set_target_window_size(size); self.inner.set_target_window_size(size);
} }
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
/// The `SETTINGS` will be sent to the remote, and only applied once the
/// remote acknowledges the change.
///
/// This can be used to increase or decrease the window size for existing
/// streams.
///
/// # Errors
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.inner.set_initial_window_size(size)?;
Ok(())
}
/// Takes a `PingPong` instance from the connection. /// Takes a `PingPong` instance from the connection.
/// ///
/// # Note /// # Note

View File

@@ -60,6 +60,9 @@ pub enum UserError {
/// Calls `PingPong::send_ping` before receiving a pong. /// Calls `PingPong::send_ping` before receiving a pong.
SendPingWhilePending, SendPingWhilePending,
/// Tries to update local SETTINGS while ACK has not been received.
SendSettingsWhilePending,
} }
// ===== impl RecvError ===== // ===== impl RecvError =====
@@ -140,6 +143,7 @@ impl error::Error for UserError {
MissingUriSchemeAndAuthority => "request URI missing scheme and authority", MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
PollResetAfterSendResponse => "poll_reset after send_response is illegal", PollResetAfterSendResponse => "poll_reset after send_response is illegal",
SendPingWhilePending => "send_ping before received previous pong", SendPingWhilePending => "send_ping before received previous pong",
SendSettingsWhilePending => "sending SETTINGS before received previous ACK",
} }
} }
} }

View File

@@ -1,4 +1,4 @@
use crate::codec::RecvError; use crate::codec::{RecvError, UserError};
use crate::frame::{Reason, StreamId}; use crate::frame::{Reason, StreamId};
use crate::{client, frame, proto, server}; use crate::{client, frame, proto, server};
@@ -99,16 +99,24 @@ where
codec, codec,
go_away: GoAway::new(), go_away: GoAway::new(),
ping_pong: PingPong::new(), ping_pong: PingPong::new(),
settings: Settings::new(), settings: Settings::new(config.settings),
streams, streams,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
pub fn set_target_window_size(&mut self, size: WindowSize) { /// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
self.streams.set_target_connection_window_size(size); self.streams.set_target_connection_window_size(size);
} }
/// Send a new SETTINGS frame with an updated initial window size.
pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> {
let mut settings = frame::Settings::default();
settings.set_initial_window_size(Some(size));
self.settings.send_settings(settings)
}
/// Returns `Ready` when the connection is ready to receive a frame. /// Returns `Ready` when the connection is ready to receive a frame.
/// ///
/// Returns `RecvError` as this may raise errors that are caused by delayed /// Returns `RecvError` as this may raise errors that are caused by delayed
@@ -119,7 +127,7 @@ where
ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?; ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?;
ready!(self ready!(self
.settings .settings
.send_pending_ack(cx, &mut self.codec, &mut self.streams))?; .poll_send(cx, &mut self.codec, &mut self.streams))?;
ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?; ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
@@ -327,7 +335,8 @@ where
} }
Some(Settings(frame)) => { Some(Settings(frame)) => {
log::trace!("recv SETTINGS; frame={:?}", frame); log::trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame); self.settings
.recv_settings(frame, &mut self.codec, &mut self.streams)?;
} }
Some(GoAway(frame)) => { Some(GoAway(frame)) => {
log::trace!("recv GOAWAY; frame={:?}", frame); log::trace!("recv GOAWAY; frame={:?}", frame);

View File

@@ -1,32 +1,98 @@
use crate::codec::RecvError; use crate::codec::{RecvError, UserError};
use crate::error::Reason;
use crate::frame; use crate::frame;
use crate::proto::*; use crate::proto::*;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Settings { pub(crate) struct Settings {
/// Our local SETTINGS sync state with the remote.
local: Local,
/// Received SETTINGS frame pending processing. The ACK must be written to /// Received SETTINGS frame pending processing. The ACK must be written to
/// the socket first then the settings applied **before** receiving any /// the socket first then the settings applied **before** receiving any
/// further frames. /// further frames.
pending: Option<frame::Settings>, remote: Option<frame::Settings>,
}
#[derive(Debug)]
enum Local {
/// We want to send these SETTINGS to the remote when the socket is ready.
ToSend(frame::Settings),
/// We have sent these SETTINGS and are waiting for the remote to ACK
/// before we apply them.
WaitingAck(frame::Settings),
/// Our local settings are in sync with the remote.
Synced,
} }
impl Settings { impl Settings {
pub fn new() -> Self { pub(crate) fn new(local: frame::Settings) -> Self {
Settings { pending: None } Settings {
} // We assume the initial local SETTINGS were flushed during
// the handshake process.
pub fn recv_settings(&mut self, frame: frame::Settings) { local: Local::WaitingAck(local),
if frame.is_ack() { remote: None,
log::debug!("received remote settings ack");
// TODO: handle acks
} else {
assert!(self.pending.is_none());
self.pending = Some(frame);
} }
} }
pub fn send_pending_ack<T, B, C, P>( pub(crate) fn recv_settings<T, B, C, P>(
&mut self,
frame: frame::Settings,
codec: &mut Codec<T, B>,
streams: &mut Streams<C, P>,
) -> Result<(), RecvError>
where
T: AsyncWrite + Unpin,
B: Buf + Unpin,
C: Buf + Unpin,
P: Peer,
{
if frame.is_ack() {
match &self.local {
Local::WaitingAck(local) => {
log::debug!("received settings ACK; applying {:?}", local);
if let Some(max) = local.max_frame_size() {
codec.set_max_recv_frame_size(max as usize);
}
if let Some(max) = local.max_header_list_size() {
codec.set_max_recv_header_list_size(max as usize);
}
streams.apply_local_settings(local)?;
self.local = Local::Synced;
Ok(())
}
Local::ToSend(..) | Local::Synced => {
// We haven't sent any SETTINGS frames to be ACKed, so
// this is very bizarre! Remote is either buggy or malicious.
proto_err!(conn: "received unexpected settings ack");
Err(RecvError::Connection(Reason::PROTOCOL_ERROR))
}
}
} else {
// We always ACK before reading more frames, so `remote` should
// always be none!
assert!(self.remote.is_none());
self.remote = Some(frame);
Ok(())
}
}
pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> {
assert!(!frame.is_ack());
match &self.local {
Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending),
Local::Synced => {
log::trace!("queue to send local settings: {:?}", frame);
self.local = Local::ToSend(frame);
Ok(())
}
}
}
pub(crate) fn poll_send<T, B, C, P>(
&mut self, &mut self,
cx: &mut Context, cx: &mut Context,
dst: &mut Codec<T, B>, dst: &mut Codec<T, B>,
@@ -38,11 +104,8 @@ impl Settings {
C: Buf + Unpin, C: Buf + Unpin,
P: Peer, P: Peer,
{ {
log::trace!("send_pending_ack; pending={:?}", self.pending); if let Some(settings) = &self.remote {
if let Some(settings) = &self.pending {
if !dst.poll_ready(cx)?.is_ready() { if !dst.poll_ready(cx)?.is_ready() {
log::trace!("failed to send ACK");
return Poll::Pending; return Poll::Pending;
} }
@@ -61,7 +124,23 @@ impl Settings {
streams.apply_remote_settings(settings)?; streams.apply_remote_settings(settings)?;
} }
self.pending = None; self.remote = None;
match &self.local {
Local::ToSend(settings) => {
if !dst.poll_ready(cx)?.is_ready() {
return Poll::Pending;
}
// Buffer the settings frame
dst.buffer(settings.clone().into())
.expect("invalid settings frame");
log::trace!("local settings sent; waiting for ack: {:?}", settings);
self.local = Local::WaitingAck(settings.clone());
}
Local::WaitingAck(..) | Local::Synced => {}
}
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }

View File

@@ -131,11 +131,11 @@ impl FlowControl {
Ok(()) Ok(())
} }
/// Decrement the window size. /// Decrement the send-side window size.
/// ///
/// This is called after receiving a SETTINGS frame with a lower /// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value. /// INITIAL_WINDOW_SIZE value.
pub fn dec_window(&mut self, sz: WindowSize) { pub fn dec_send_window(&mut self, sz: WindowSize) {
log::trace!( log::trace!(
"dec_window; sz={}; window={}, available={}", "dec_window; sz={}; window={}, available={}",
sz, sz,
@@ -146,6 +146,22 @@ impl FlowControl {
self.window_size -= sz; self.window_size -= sz;
} }
/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_recv_window(&mut self, sz: WindowSize) {
log::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
self.available -= sz;
}
/// Decrements the window reflecting data has actually been sent. The caller /// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity. /// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) { pub fn send_data(&mut self, sz: WindowSize) {

View File

@@ -455,6 +455,67 @@ impl Recv {
} }
} }
pub(crate) fn apply_local_settings(
&mut self,
settings: &frame::Settings,
store: &mut Store,
) -> Result<(), RecvError> {
let target = if let Some(val) = settings.initial_window_size() {
val
} else {
return Ok(());
};
let old_sz = self.init_window_sz;
self.init_window_sz = target;
log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
// Per RFC 7540 §6.9.2:
//
// In addition to changing the flow-control window for streams that are
// not yet active, a SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state). When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
// the size of all stream flow-control windows that it maintains by the
// difference between the new value and the old value.
//
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
// space in a flow-control window to become negative. A sender MUST
// track the negative flow-control window and MUST NOT send new
// flow-controlled frames until it receives WINDOW_UPDATE frames that
// cause the flow-control window to become positive.
if target < old_sz {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
log::trace!("decrementing all windows; dec={}", dec);
store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
Ok(())
})
} else if target > old_sz {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
log::trace!("incrementing all windows; inc={}", inc);
store.for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(RecvError::Connection)?;
stream.recv_flow.assign_capacity(inc);
Ok(())
})
} else {
// size is the same... so do nothing
Ok(())
}
}
pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {
if !stream.state.is_recv_closed() { if !stream.state.is_recv_closed() {
return false; return false;

View File

@@ -419,7 +419,7 @@ impl Send {
store.for_each(|mut stream| { store.for_each(|mut stream| {
let stream = &mut *stream; let stream = &mut *stream;
stream.send_flow.dec_window(dec); stream.send_flow.dec_send_window(dec);
// It's possible that decreasing the window causes // It's possible that decreasing the window causes
// `window_size` (the stream-specific window) to fall below // `window_size` (the stream-specific window) to fall below

View File

@@ -664,6 +664,13 @@ where
) )
} }
pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.recv.apply_local_settings(frame, &mut me.store)
}
pub fn send_request( pub fn send_request(
&mut self, &mut self,
request: Request<()>, request: Request<()>,

View File

@@ -441,6 +441,25 @@ where
self.connection.set_target_window_size(size); self.connection.set_target_window_size(size);
} }
/// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
/// flow control for received data.
///
/// The `SETTINGS` will be sent to the remote, and only applied once the
/// remote acknowledges the change.
///
/// This can be used to increase or decrease the window size for existing
/// streams.
///
/// # Errors
///
/// Returns an error if a previous call is still pending acknowledgement
/// from the remote endpoint.
pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
assert!(size <= proto::MAX_WINDOW_SIZE);
self.connection.set_initial_window_size(size)?;
Ok(())
}
/// Returns `Ready` when the underlying connection has closed. /// Returns `Ready` when the underlying connection has closed.
/// ///
/// If any new inbound streams are received during a call to `poll_closed`, /// If any new inbound streams are received during a call to `poll_closed`,

View File

@@ -1167,6 +1167,204 @@ async fn decrease_target_window_size() {
join(srv, client).await; join(srv, client).await;
} }
#[tokio::test]
async fn client_update_initial_window_size() {
let _ = env_logger::try_init();
let (io, mut srv) = mock::new();
let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE * 2;
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(frames::window_update(0, window_size - 65_535))
.await;
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(1).response(200)).await;
srv.send_frame(frames::data(1, vec![b'a'; 16_384])).await;
srv.send_frame(frames::data(1, vec![b'b'; 16_384])).await;
srv.send_frame(frames::data(1, vec![b'c'; 16_384])).await;
srv.recv_frame(frames::settings().initial_window_size(window_size))
.await;
srv.send_frame(frames::settings_ack()).await;
// we never got a WINDOW_UPDATE, but initial update allows more
srv.send_frame(frames::data(1, vec![b'd'; 16_384])).await;
srv.send_frame(frames::data(1, vec![b'e'; 16_384]).eos())
.await;
};
let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
conn.set_target_window_size(window_size);
// We'll never release_capacity back...
async fn data(body: &mut h2::RecvStream, expect: &str) {
let buf = body.data().await.expect(expect).expect(expect);
assert_eq!(buf.len(), 16_384, "{}", expect);
}
let res_fut = client.get("https://http2.akamai.com/");
// Receive most of the stream's window...
let body = conn
.drive(async move {
let resp = res_fut.await.expect("response");
let mut body = resp.into_body();
data(&mut body, "data1").await;
data(&mut body, "data2").await;
data(&mut body, "data3").await;
body
})
.await;
// Update the initial window size to double
conn.set_initial_window_size(window_size).expect("update");
// And then ensure we got the data normally "over" the smaller
// initial_window_size...
let f = async move {
let mut body = body;
data(&mut body, "data4").await;
data(&mut body, "data5").await;
assert!(body.data().await.is_none(), "eos");
};
join(async move { conn.await.expect("client") }, f).await;
};
join(srv, client).await;
}
#[tokio::test]
async fn client_decrease_initial_window_size() {
let _ = env_logger::try_init();
let (io, mut srv) = mock::new();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(1).response(200)).await;
srv.send_frame(frames::data(1, vec![b'a'; 100])).await;
srv.recv_frame(
frames::headers(3)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(3).response(200)).await;
srv.send_frame(frames::data(3, vec![b'a'; 100])).await;
srv.recv_frame(
frames::headers(5)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(5).response(200)).await;
srv.send_frame(frames::data(5, vec![b'a'; 100])).await;
srv.recv_frame(frames::settings().initial_window_size(0))
.await;
// check settings haven't applied before ACK
srv.send_frame(frames::data(1, vec![b'a'; 100]).eos()).await;
srv.send_frame(frames::settings_ack()).await;
// check stream 3 has no window
srv.send_frame(frames::data(3, vec![b'a'; 1])).await;
srv.recv_frame(frames::reset(3).flow_control()).await;
// check stream 5 can release capacity
srv.recv_frame(frames::window_update(5, 100)).await;
srv.recv_frame(frames::settings().initial_window_size(16_384))
.await;
srv.send_frame(frames::settings_ack()).await;
srv.send_frame(frames::data(5, vec![b'a'; 100])).await;
srv.send_frame(frames::data(5, vec![b'a'; 100]).eos()).await;
};
let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
async fn req(client: &mut client::SendRequest<Bytes>) -> h2::RecvStream {
let res_fut = client.get("https://http2.akamai.com/");
// Use some of the recv window
let resp = res_fut.await.expect("response");
let mut body = resp.into_body();
data(&mut body, "data1").await;
body
}
async fn data(body: &mut h2::RecvStream, expect: &str) {
let buf = body.data().await.expect(expect).expect(expect);
assert_eq!(buf.len(), 100, "{}", expect);
}
let mut body1 = conn.drive(req(&mut client)).await;
let mut body3 = conn.drive(req(&mut client)).await;
let mut body5 = conn.drive(req(&mut client)).await;
// Remove *all* window size of streams
conn.set_initial_window_size(0).expect("update0");
conn.drive(yield_once()).await;
// stream 1 received before settings ACK
conn.drive(async {
data(&mut body1, "body1 data2").await;
assert!(body1.is_end_stream());
})
.await;
// stream 3 received after ACK, which is stream error
conn.drive(async {
body3.data().await.expect("body3").expect_err("data2");
})
.await;
// stream 5 went negative, so release back to 0
body5
.release_capacity()
.release_capacity(100)
.expect("release_capacity");
conn.drive(yield_once()).await;
// open up again
conn.set_initial_window_size(16_384).expect("update16");
conn.drive(yield_once()).await;
// get stream 5 data after opening up
conn.drive(async {
data(&mut body5, "body5 data2").await;
data(&mut body5, "body5 data3").await;
assert!(body3.is_end_stream());
})
.await;
conn.await.expect("client")
};
join(srv, client).await;
}
#[tokio::test] #[tokio::test]
async fn server_target_window_size() { async fn server_target_window_size() {
let _ = env_logger::try_init(); let _ = env_logger::try_init();