From 6a6c9665cd019c703acd7a604562cd2d67d786a5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 24 Aug 2017 11:03:33 -0700 Subject: [PATCH] Immediately apply initial window size to streams The initial window size should be applied to streams once they leave the IDLE state. --- src/proto/streams/prioritize.rs | 7 +++++++ src/proto/streams/recv.rs | 21 +++++++++++++-------- src/proto/streams/send.rs | 13 +++++++------ src/proto/streams/state.rs | 10 ++++++++++ src/proto/streams/stream.rs | 18 +++++++++++++++--- src/proto/streams/streams.rs | 21 +++++++++++++++++---- 6 files changed, 69 insertions(+), 21 deletions(-) diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index e06f492..0377cec 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -162,6 +162,11 @@ impl Prioritize stream: &mut store::Ptr) -> Result<(), ConnectionError> { + // Ignore window updates when the stream is not active. + if !stream.state.could_send_data() { + return Ok(()); + } + // Update the stream level flow control. stream.send_flow.inc_window(inc)?; @@ -219,6 +224,8 @@ impl Prioritize return; } + // If the stream has requested capacity, then it must be in the + // streaming state. debug_assert!(stream.state.is_send_streaming()); // The amount of currently available capacity on the connection diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index a503ed5..1961790 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -76,6 +76,11 @@ impl Recv where B: Buf { } } + /// Returns the initial receive window size + pub fn init_window_sz(&self) -> WindowSize { + self.init_window_sz + } + /// Returns the ID of the last processed stream pub fn last_processed_id(&self) -> StreamId { self.last_processed_id @@ -85,7 +90,7 @@ impl Recv where B: Buf { /// /// Returns the stream state if successful. `None` if refused pub fn open(&mut self, id: StreamId) - -> Result>, ConnectionError> + -> Result, ConnectionError> { assert!(self.refused.is_none()); @@ -96,7 +101,7 @@ impl Recv where B: Buf { return Ok(None); } - Ok(Some(Stream::new(id))) + Ok(Some(id)) } pub fn take_request(&mut self, stream: &mut store::Ptr) @@ -141,11 +146,6 @@ impl Recv where B: Buf { trace!("opening stream; init_window={}", self.init_window_sz); let is_initial = stream.state.recv_open(frame.is_end_stream())?; - if stream.state.is_recv_streaming() { - stream.recv_flow.inc_window(self.init_window_sz)?; - stream.recv_flow.assign_capacity(self.init_window_sz); - } - if is_initial { if !self.can_inc_num_streams() { unimplemented!(); @@ -285,6 +285,7 @@ impl Recv where B: Buf { pub fn recv_push_promise(&mut self, frame: frame::PushPromise, + send: &Send, stream: store::Key, store: &mut Store) -> Result<(), ConnectionError> @@ -309,7 +310,11 @@ impl Recv where B: Buf { // TODO: All earlier stream IDs should be implicitly closed. // Now, create a new entry for the stream - let mut new_stream = Stream::new(frame.promised_id()); + let mut new_stream = Stream::new( + frame.promised_id(), + send.init_window_sz(), + self.init_window_sz); + new_stream.state.reserve_remote(); let mut ppp = store[stream].pending_push_promises.take(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index c329770..aae8101 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -47,6 +47,11 @@ impl Send where B: Buf { } } + /// Returns the initial send window size + pub fn init_window_sz(&self) -> WindowSize { + self.init_window_sz + } + pub fn poll_open_ready(&mut self) -> Poll<(), ConnectionError> { try!(self.ensure_can_open::

()); @@ -64,7 +69,7 @@ impl Send where B: Buf { /// /// Returns the stream state if successful. `None` if refused pub fn open(&mut self) - -> Result, ConnectionError> + -> Result { try!(self.ensure_can_open::

()); @@ -74,7 +79,7 @@ impl Send where B: Buf { } } - let ret = Stream::new(self.next_stream_id); + let ret = self.next_stream_id; // Increment the number of locally initiated streams self.num_streams += 1; @@ -93,10 +98,6 @@ impl Send where B: Buf { // Update the state stream.state.send_open(frame.is_end_stream())?; - if stream.state.is_send_streaming() { - stream.send_flow.inc_window(self.init_window_sz)?; - } - // Queue the frame for sending self.prioritize.queue_frame(frame.into(), stream, task); diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index eb34c18..10944f7 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -266,6 +266,16 @@ impl State { } } + /// Returns true if the stream is in a state such that it could send data in + /// the future. + pub fn could_send_data(&self) -> bool { + match self.inner { + Open { .. } => true, + HalfClosedRemote(_) => true, + _ => false, + } + } + pub fn is_send_streaming(&self) -> bool { match self.inner { Open { local: Peer::Streaming, .. } => true, diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 4a10acc..dfb0bdb 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -83,8 +83,20 @@ pub(super) struct NextSendCapacity; pub(super) struct NextWindowUpdate; impl Stream { - pub fn new(id: StreamId) -> Stream + pub fn new(id: StreamId, + init_send_window: WindowSize, + init_recv_window: WindowSize) -> Stream { + let mut send_flow = FlowControl::new(); + let mut recv_flow = FlowControl::new(); + + recv_flow.inc_window(init_recv_window) + .ok().expect("invalid initial receive window"); + recv_flow.assign_capacity(init_recv_window); + + send_flow.inc_window(init_send_window) + .ok().expect("invalid initial send window size"); + Stream { id, state: State::default(), @@ -93,7 +105,7 @@ impl Stream { next_pending_send: None, is_pending_send: false, - send_flow: FlowControl::new(), + send_flow: send_flow, requested_send_capacity: 0, buffered_send_data: 0, send_task: None, @@ -106,7 +118,7 @@ impl Stream { next_pending_accept: None, is_pending_accept: false, - recv_flow: FlowControl::new(), + recv_flow: recv_flow, in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 0011f14..2606899 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -75,7 +75,14 @@ impl Streams } match try!(me.actions.recv.open::

(id)) { - Some(stream) => e.insert(stream), + Some(stream_id) => { + let stream = Stream::new( + stream_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz()); + + e.insert(stream) + } None => return Ok(()), } } @@ -195,7 +202,8 @@ impl Streams None => return Err(ProtocolError.into()), }; - me.actions.recv.recv_push_promise::

(frame, stream, &mut me.store) + me.actions.recv.recv_push_promise::

( + frame, &me.actions.send, stream, &mut me.store) } pub fn next_incoming(&mut self) -> Option> { @@ -273,11 +281,16 @@ impl Streams let me = &mut *me; // Initialize a new stream. This fails if the connection is at capacity. - let mut stream = me.actions.send.open::()?; + let stream_id = me.actions.send.open::()?; + + let stream = Stream::new( + stream_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz()); // Convert the message let headers = client::Peer::convert_send_message( - stream.id, request, end_of_stream); + stream_id, request, end_of_stream); let mut stream = me.store.insert(stream.id, stream);