From 4c1d797712b7b1d4008f55624864fb52607c986b Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 7 Oct 2019 15:29:23 -0700 Subject: [PATCH] Add ability to adjust INITIAL_WINDOW_SIZE setting on an existing connection (#421) --- src/client.rs | 19 +++ src/codec/error.rs | 4 + src/proto/connection.rs | 19 ++- src/proto/settings.rs | 117 +++++++++++++--- src/proto/streams/flow_control.rs | 20 ++- src/proto/streams/recv.rs | 61 +++++++++ src/proto/streams/send.rs | 2 +- src/proto/streams/streams.rs | 7 + src/server.rs | 19 +++ tests/h2-tests/tests/flow_control.rs | 198 +++++++++++++++++++++++++++ 10 files changed, 439 insertions(+), 27 deletions(-) diff --git a/src/client.rs b/src/client.rs index d6b3667..593f02f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1201,6 +1201,25 @@ where self.inner.set_target_window_size(size); } + /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level + /// flow control for received data. + /// + /// The `SETTINGS` will be sent to the remote, and only applied once the + /// remote acknowledges the change. + /// + /// This can be used to increase or decrease the window size for existing + /// streams. + /// + /// # Errors + /// + /// Returns an error if a previous call is still pending acknowledgement + /// from the remote endpoint. + pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { + assert!(size <= proto::MAX_WINDOW_SIZE); + self.inner.set_initial_window_size(size)?; + Ok(()) + } + /// Takes a `PingPong` instance from the connection. /// /// # Note diff --git a/src/codec/error.rs b/src/codec/error.rs index 5155e0d..c6d4e95 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -60,6 +60,9 @@ pub enum UserError { /// Calls `PingPong::send_ping` before receiving a pong. SendPingWhilePending, + + /// Tries to update local SETTINGS while ACK has not been received. + SendSettingsWhilePending, } // ===== impl RecvError ===== @@ -140,6 +143,7 @@ impl error::Error for UserError { MissingUriSchemeAndAuthority => "request URI missing scheme and authority", PollResetAfterSendResponse => "poll_reset after send_response is illegal", SendPingWhilePending => "send_ping before received previous pong", + SendSettingsWhilePending => "sending SETTINGS before received previous ACK", } } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 434b4ad..d0cd8df 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,4 +1,4 @@ -use crate::codec::RecvError; +use crate::codec::{RecvError, UserError}; use crate::frame::{Reason, StreamId}; use crate::{client, frame, proto, server}; @@ -99,16 +99,24 @@ where codec, go_away: GoAway::new(), ping_pong: PingPong::new(), - settings: Settings::new(), + settings: Settings::new(config.settings), streams, _phantom: PhantomData, } } - pub fn set_target_window_size(&mut self, size: WindowSize) { + /// connection flow control + pub(crate) fn set_target_window_size(&mut self, size: WindowSize) { self.streams.set_target_connection_window_size(size); } + /// Send a new SETTINGS frame with an updated initial window size. + pub(crate) fn set_initial_window_size(&mut self, size: WindowSize) -> Result<(), UserError> { + let mut settings = frame::Settings::default(); + settings.set_initial_window_size(Some(size)); + self.settings.send_settings(settings) + } + /// Returns `Ready` when the connection is ready to receive a frame. /// /// Returns `RecvError` as this may raise errors that are caused by delayed @@ -119,7 +127,7 @@ where ready!(self.ping_pong.send_pending_ping(cx, &mut self.codec))?; ready!(self .settings - .send_pending_ack(cx, &mut self.codec, &mut self.streams))?; + .poll_send(cx, &mut self.codec, &mut self.streams))?; ready!(self.streams.send_pending_refusal(cx, &mut self.codec))?; Poll::Ready(Ok(())) @@ -327,7 +335,8 @@ where } Some(Settings(frame)) => { log::trace!("recv SETTINGS; frame={:?}", frame); - self.settings.recv_settings(frame); + self.settings + .recv_settings(frame, &mut self.codec, &mut self.streams)?; } Some(GoAway(frame)) => { log::trace!("recv GOAWAY; frame={:?}", frame); diff --git a/src/proto/settings.rs b/src/proto/settings.rs index b37386d..46bb3f6 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,32 +1,98 @@ -use crate::codec::RecvError; +use crate::codec::{RecvError, UserError}; +use crate::error::Reason; use crate::frame; use crate::proto::*; use std::task::{Context, Poll}; #[derive(Debug)] pub(crate) struct Settings { + /// Our local SETTINGS sync state with the remote. + local: Local, /// Received SETTINGS frame pending processing. The ACK must be written to /// the socket first then the settings applied **before** receiving any /// further frames. - pending: Option, + remote: Option, +} + +#[derive(Debug)] +enum Local { + /// We want to send these SETTINGS to the remote when the socket is ready. + ToSend(frame::Settings), + /// We have sent these SETTINGS and are waiting for the remote to ACK + /// before we apply them. + WaitingAck(frame::Settings), + /// Our local settings are in sync with the remote. + Synced, } impl Settings { - pub fn new() -> Self { - Settings { pending: None } - } - - pub fn recv_settings(&mut self, frame: frame::Settings) { - if frame.is_ack() { - log::debug!("received remote settings ack"); - // TODO: handle acks - } else { - assert!(self.pending.is_none()); - self.pending = Some(frame); + pub(crate) fn new(local: frame::Settings) -> Self { + Settings { + // We assume the initial local SETTINGS were flushed during + // the handshake process. + local: Local::WaitingAck(local), + remote: None, } } - pub fn send_pending_ack( + pub(crate) fn recv_settings( + &mut self, + frame: frame::Settings, + codec: &mut Codec, + streams: &mut Streams, + ) -> Result<(), RecvError> + where + T: AsyncWrite + Unpin, + B: Buf + Unpin, + C: Buf + Unpin, + P: Peer, + { + if frame.is_ack() { + match &self.local { + Local::WaitingAck(local) => { + log::debug!("received settings ACK; applying {:?}", local); + + if let Some(max) = local.max_frame_size() { + codec.set_max_recv_frame_size(max as usize); + } + + if let Some(max) = local.max_header_list_size() { + codec.set_max_recv_header_list_size(max as usize); + } + + streams.apply_local_settings(local)?; + self.local = Local::Synced; + Ok(()) + } + Local::ToSend(..) | Local::Synced => { + // We haven't sent any SETTINGS frames to be ACKed, so + // this is very bizarre! Remote is either buggy or malicious. + proto_err!(conn: "received unexpected settings ack"); + Err(RecvError::Connection(Reason::PROTOCOL_ERROR)) + } + } + } else { + // We always ACK before reading more frames, so `remote` should + // always be none! + assert!(self.remote.is_none()); + self.remote = Some(frame); + Ok(()) + } + } + + pub(crate) fn send_settings(&mut self, frame: frame::Settings) -> Result<(), UserError> { + assert!(!frame.is_ack()); + match &self.local { + Local::ToSend(..) | Local::WaitingAck(..) => Err(UserError::SendSettingsWhilePending), + Local::Synced => { + log::trace!("queue to send local settings: {:?}", frame); + self.local = Local::ToSend(frame); + Ok(()) + } + } + } + + pub(crate) fn poll_send( &mut self, cx: &mut Context, dst: &mut Codec, @@ -38,11 +104,8 @@ impl Settings { C: Buf + Unpin, P: Peer, { - log::trace!("send_pending_ack; pending={:?}", self.pending); - - if let Some(settings) = &self.pending { + if let Some(settings) = &self.remote { if !dst.poll_ready(cx)?.is_ready() { - log::trace!("failed to send ACK"); return Poll::Pending; } @@ -61,7 +124,23 @@ impl Settings { streams.apply_remote_settings(settings)?; } - self.pending = None; + self.remote = None; + + match &self.local { + Local::ToSend(settings) => { + if !dst.poll_ready(cx)?.is_ready() { + return Poll::Pending; + } + + // Buffer the settings frame + dst.buffer(settings.clone().into()) + .expect("invalid settings frame"); + log::trace!("local settings sent; waiting for ack: {:?}", settings); + + self.local = Local::WaitingAck(settings.clone()); + } + Local::WaitingAck(..) | Local::Synced => {} + } Poll::Ready(Ok(())) } diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index b0fd104..6bb1966 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -131,11 +131,11 @@ impl FlowControl { Ok(()) } - /// Decrement the window size. + /// Decrement the send-side window size. /// /// This is called after receiving a SETTINGS frame with a lower /// INITIAL_WINDOW_SIZE value. - pub fn dec_window(&mut self, sz: WindowSize) { + pub fn dec_send_window(&mut self, sz: WindowSize) { log::trace!( "dec_window; sz={}; window={}, available={}", sz, @@ -146,6 +146,22 @@ impl FlowControl { self.window_size -= sz; } + /// Decrement the recv-side window size. + /// + /// This is called after receiving a SETTINGS ACK frame with a lower + /// INITIAL_WINDOW_SIZE value. + pub fn dec_recv_window(&mut self, sz: WindowSize) { + log::trace!( + "dec_recv_window; sz={}; window={}, available={}", + sz, + self.window_size, + self.available + ); + // This should not be able to overflow `window_size` from the bottom. + self.window_size -= sz; + self.available -= sz; + } + /// Decrements the window reflecting data has actually been sent. The caller /// must ensure that the window has capacity. pub fn send_data(&mut self, sz: WindowSize) { diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 4295aaa..ac2120c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -455,6 +455,67 @@ impl Recv { } } + pub(crate) fn apply_local_settings( + &mut self, + settings: &frame::Settings, + store: &mut Store, + ) -> Result<(), RecvError> { + let target = if let Some(val) = settings.initial_window_size() { + val + } else { + return Ok(()); + }; + + let old_sz = self.init_window_sz; + self.init_window_sz = target; + + log::trace!("update_initial_window_size; new={}; old={}", target, old_sz,); + + // Per RFC 7540 ยง6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + + if target < old_sz { + // We must decrease the (local) window on every open stream. + let dec = old_sz - target; + log::trace!("decrementing all windows; dec={}", dec); + + store.for_each(|mut stream| { + stream.recv_flow.dec_recv_window(dec); + Ok(()) + }) + } else if target > old_sz { + // We must increase the (local) window on every open stream. + let inc = target - old_sz; + log::trace!("incrementing all windows; inc={}", inc); + store.for_each(|mut stream| { + // XXX: Shouldn't the peer have already noticed our + // overflow and sent us a GOAWAY? + stream + .recv_flow + .inc_window(inc) + .map_err(RecvError::Connection)?; + stream.recv_flow.assign_capacity(inc); + Ok(()) + }) + } else { + // size is the same... so do nothing + Ok(()) + } + } + pub fn is_end_stream(&self, stream: &store::Ptr) -> bool { if !stream.state.is_recv_closed() { return false; diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 0e114ae..d7e34d0 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -419,7 +419,7 @@ impl Send { store.for_each(|mut stream| { let stream = &mut *stream; - stream.send_flow.dec_window(dec); + stream.send_flow.dec_send_window(dec); // It's possible that decreasing the window causes // `window_size` (the stream-specific window) to fall below diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index da12daf..b1d49fb 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -664,6 +664,13 @@ where ) } + pub fn apply_local_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.recv.apply_local_settings(frame, &mut me.store) + } + pub fn send_request( &mut self, request: Request<()>, diff --git a/src/server.rs b/src/server.rs index a71ad21..0e2a3a6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -441,6 +441,25 @@ where self.connection.set_target_window_size(size); } + /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level + /// flow control for received data. + /// + /// The `SETTINGS` will be sent to the remote, and only applied once the + /// remote acknowledges the change. + /// + /// This can be used to increase or decrease the window size for existing + /// streams. + /// + /// # Errors + /// + /// Returns an error if a previous call is still pending acknowledgement + /// from the remote endpoint. + pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> { + assert!(size <= proto::MAX_WINDOW_SIZE); + self.connection.set_initial_window_size(size)?; + Ok(()) + } + /// Returns `Ready` when the underlying connection has closed. /// /// If any new inbound streams are received during a call to `poll_closed`, diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 170263a..1695edc 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1167,6 +1167,204 @@ async fn decrease_target_window_size() { join(srv, client).await; } +#[tokio::test] +async fn client_update_initial_window_size() { + let _ = env_logger::try_init(); + let (io, mut srv) = mock::new(); + + let window_size = frame::DEFAULT_INITIAL_WINDOW_SIZE * 2; + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + srv.recv_frame(frames::window_update(0, window_size - 65_535)) + .await; + srv.recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.send_frame(frames::data(1, vec![b'a'; 16_384])).await; + srv.send_frame(frames::data(1, vec![b'b'; 16_384])).await; + srv.send_frame(frames::data(1, vec![b'c'; 16_384])).await; + srv.recv_frame(frames::settings().initial_window_size(window_size)) + .await; + srv.send_frame(frames::settings_ack()).await; + // we never got a WINDOW_UPDATE, but initial update allows more + srv.send_frame(frames::data(1, vec![b'd'; 16_384])).await; + srv.send_frame(frames::data(1, vec![b'e'; 16_384]).eos()) + .await; + }; + + let client = async move { + let (mut client, mut conn) = client::handshake(io).await.unwrap(); + conn.set_target_window_size(window_size); + + // We'll never release_capacity back... + async fn data(body: &mut h2::RecvStream, expect: &str) { + let buf = body.data().await.expect(expect).expect(expect); + assert_eq!(buf.len(), 16_384, "{}", expect); + } + + let res_fut = client.get("https://http2.akamai.com/"); + + // Receive most of the stream's window... + let body = conn + .drive(async move { + let resp = res_fut.await.expect("response"); + let mut body = resp.into_body(); + + data(&mut body, "data1").await; + data(&mut body, "data2").await; + data(&mut body, "data3").await; + + body + }) + .await; + + // Update the initial window size to double + conn.set_initial_window_size(window_size).expect("update"); + + // And then ensure we got the data normally "over" the smaller + // initial_window_size... + let f = async move { + let mut body = body; + data(&mut body, "data4").await; + data(&mut body, "data5").await; + assert!(body.data().await.is_none(), "eos"); + }; + + join(async move { conn.await.expect("client") }, f).await; + }; + + join(srv, client).await; +} + +#[tokio::test] +async fn client_decrease_initial_window_size() { + let _ = env_logger::try_init(); + let (io, mut srv) = mock::new(); + + let srv = async move { + let settings = srv.assert_client_handshake().await; + assert_default_settings!(settings); + + srv.recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(1).response(200)).await; + srv.send_frame(frames::data(1, vec![b'a'; 100])).await; + + srv.recv_frame( + frames::headers(3) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(3).response(200)).await; + srv.send_frame(frames::data(3, vec![b'a'; 100])).await; + + srv.recv_frame( + frames::headers(5) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .await; + srv.send_frame(frames::headers(5).response(200)).await; + srv.send_frame(frames::data(5, vec![b'a'; 100])).await; + + srv.recv_frame(frames::settings().initial_window_size(0)) + .await; + // check settings haven't applied before ACK + srv.send_frame(frames::data(1, vec![b'a'; 100]).eos()).await; + srv.send_frame(frames::settings_ack()).await; + + // check stream 3 has no window + srv.send_frame(frames::data(3, vec![b'a'; 1])).await; + srv.recv_frame(frames::reset(3).flow_control()).await; + + // check stream 5 can release capacity + srv.recv_frame(frames::window_update(5, 100)).await; + + srv.recv_frame(frames::settings().initial_window_size(16_384)) + .await; + srv.send_frame(frames::settings_ack()).await; + + srv.send_frame(frames::data(5, vec![b'a'; 100])).await; + srv.send_frame(frames::data(5, vec![b'a'; 100]).eos()).await; + }; + + let client = async move { + let (mut client, mut conn) = client::handshake(io).await.unwrap(); + + async fn req(client: &mut client::SendRequest) -> h2::RecvStream { + let res_fut = client.get("https://http2.akamai.com/"); + + // Use some of the recv window + let resp = res_fut.await.expect("response"); + let mut body = resp.into_body(); + + data(&mut body, "data1").await; + + body + } + + async fn data(body: &mut h2::RecvStream, expect: &str) { + let buf = body.data().await.expect(expect).expect(expect); + assert_eq!(buf.len(), 100, "{}", expect); + } + + let mut body1 = conn.drive(req(&mut client)).await; + let mut body3 = conn.drive(req(&mut client)).await; + let mut body5 = conn.drive(req(&mut client)).await; + + // Remove *all* window size of streams + conn.set_initial_window_size(0).expect("update0"); + conn.drive(yield_once()).await; + + // stream 1 received before settings ACK + conn.drive(async { + data(&mut body1, "body1 data2").await; + assert!(body1.is_end_stream()); + }) + .await; + + // stream 3 received after ACK, which is stream error + conn.drive(async { + body3.data().await.expect("body3").expect_err("data2"); + }) + .await; + + // stream 5 went negative, so release back to 0 + body5 + .release_capacity() + .release_capacity(100) + .expect("release_capacity"); + conn.drive(yield_once()).await; + + // open up again + conn.set_initial_window_size(16_384).expect("update16"); + conn.drive(yield_once()).await; + + // get stream 5 data after opening up + conn.drive(async { + data(&mut body5, "body5 data2").await; + data(&mut body5, "body5 data3").await; + assert!(body3.is_end_stream()); + }) + .await; + + conn.await.expect("client") + }; + + join(srv, client).await; +} + #[tokio::test] async fn server_target_window_size() { let _ = env_logger::try_init();