From 0c8a94aa11b362ac742cc3ced4da0ede98919f34 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 25 Aug 2017 07:07:21 -0700 Subject: [PATCH] Fix send flow control bug The send stream state is transitioned before data is buffered. As such, the stream state could be closed while there is still data to be sent. --- src/proto/settings.rs | 4 +++- src/proto/streams/prioritize.rs | 11 +++++------ src/proto/streams/send.rs | 16 ++++++++++------ src/proto/streams/state.rs | 11 +---------- src/proto/streams/stream.rs | 1 + 5 files changed, 20 insertions(+), 23 deletions(-) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 0beb685..b18d095 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -36,6 +36,8 @@ impl Settings { B: Buf, C: Buf, { + trace!("send_pending_ack; pending={:?}", self.pending); + if let Some(ref settings) = self.pending { let frame = frame::Settings::ack(); @@ -44,7 +46,7 @@ impl Settings { return Ok(Async::NotReady); } - trace!("ACK sent"); + trace!("ACK sent; applying settings"); dst.apply_remote_settings(settings); streams.apply_remote_settings(settings)?; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 0cf0977..0a8ce2b 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -159,10 +159,8 @@ impl Prioritize stream: &mut store::Ptr) -> Result<(), ConnectionError> { - // Ignore window updates when the stream is not active. - if !stream.state.could_send_data() { - return Ok(()); - } + trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", + stream.id, stream.state, inc, stream.send_flow); // Update the stream level flow control. stream.send_flow.inc_window(inc)?; @@ -222,8 +220,9 @@ impl Prioritize } // If the stream has requested capacity, then it must be in the - // streaming state. - debug_assert!(stream.state.is_send_streaming()); + // streaming state (more data could be sent) or there is buffered data + // waiting to be sent. + debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0); // The amount of currently available capacity on the connection let conn_available = self.flow.available(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 989c783..3eeabbd 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -250,17 +250,21 @@ impl Send where B: Buf { if val < old_val { let dec = old_val - val; + trace!("decrementing all windows; dec={}", dec); + store.for_each(|mut stream| { let stream = &mut *stream; - if stream.state.is_send_streaming() { - stream.send_flow.dec_window(dec); + stream.send_flow.dec_window(dec); + trace!("decremented stream window; id={:?}; decr={}; flow={:?}", + stream.id, dec, stream.send_flow); - // TODO: Handle reclaiming connection level window - // capacity. + // TODO: Probably try to assign capacity? - // TODO: Should this notify the producer? - } + // TODO: Handle reclaiming connection level window + // capacity. + + // TODO: Should this notify the producer? Ok(()) })?; diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 3635fea..e72556c 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -213,6 +213,7 @@ impl State { match self.inner { Closed(..) => {} _ => { + trace!("recv_err; err={:?}", err); self.inner = Closed(match *err { ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), ConnectionError::Io(..) => Some(Cause::Io), @@ -264,16 +265,6 @@ impl State { } } - /// Returns true if the stream is in a state such that it could send data in - /// the future. - pub fn could_send_data(&self) -> bool { - match self.inner { - Open { .. } => true, - HalfClosedRemote(_) => true, - _ => false, - } - } - pub fn is_send_streaming(&self) -> bool { match self.inner { Open { local: Peer::Streaming, .. } => true, diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index dfb0bdb..6bf1ca7 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -23,6 +23,7 @@ pub(super) struct Stream { pub requested_send_capacity: WindowSize, /// Amount of data buffered at the prioritization layer. + /// TODO: Technically this could be greater than the window size... pub buffered_send_data: WindowSize, /// Task tracking additional send capacity (i.e. window updates).