diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 0beb685..b18d095 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -36,6 +36,8 @@ impl Settings { B: Buf, C: Buf, { + trace!("send_pending_ack; pending={:?}", self.pending); + if let Some(ref settings) = self.pending { let frame = frame::Settings::ack(); @@ -44,7 +46,7 @@ impl Settings { return Ok(Async::NotReady); } - trace!("ACK sent"); + trace!("ACK sent; applying settings"); dst.apply_remote_settings(settings); streams.apply_remote_settings(settings)?; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 0cf0977..0a8ce2b 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -159,10 +159,8 @@ impl Prioritize stream: &mut store::Ptr) -> Result<(), ConnectionError> { - // Ignore window updates when the stream is not active. - if !stream.state.could_send_data() { - return Ok(()); - } + trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", + stream.id, stream.state, inc, stream.send_flow); // Update the stream level flow control. stream.send_flow.inc_window(inc)?; @@ -222,8 +220,9 @@ impl Prioritize } // If the stream has requested capacity, then it must be in the - // streaming state. - debug_assert!(stream.state.is_send_streaming()); + // streaming state (more data could be sent) or there is buffered data + // waiting to be sent. + debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0); // The amount of currently available capacity on the connection let conn_available = self.flow.available(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 989c783..3eeabbd 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -250,17 +250,21 @@ impl Send where B: Buf { if val < old_val { let dec = old_val - val; + trace!("decrementing all windows; dec={}", dec); + store.for_each(|mut stream| { let stream = &mut *stream; - if stream.state.is_send_streaming() { - stream.send_flow.dec_window(dec); + stream.send_flow.dec_window(dec); + trace!("decremented stream window; id={:?}; decr={}; flow={:?}", + stream.id, dec, stream.send_flow); - // TODO: Handle reclaiming connection level window - // capacity. + // TODO: Probably try to assign capacity? - // TODO: Should this notify the producer? - } + // TODO: Handle reclaiming connection level window + // capacity. + + // TODO: Should this notify the producer? Ok(()) })?; diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 3635fea..e72556c 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -213,6 +213,7 @@ impl State { match self.inner { Closed(..) => {} _ => { + trace!("recv_err; err={:?}", err); self.inner = Closed(match *err { ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), ConnectionError::Io(..) => Some(Cause::Io), @@ -264,16 +265,6 @@ impl State { } } - /// Returns true if the stream is in a state such that it could send data in - /// the future. - pub fn could_send_data(&self) -> bool { - match self.inner { - Open { .. } => true, - HalfClosedRemote(_) => true, - _ => false, - } - } - pub fn is_send_streaming(&self) -> bool { match self.inner { Open { local: Peer::Streaming, .. } => true, diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index dfb0bdb..6bf1ca7 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -23,6 +23,7 @@ pub(super) struct Stream { pub requested_send_capacity: WindowSize, /// Amount of data buffered at the prioritization layer. + /// TODO: Technically this could be greater than the window size... pub buffered_send_data: WindowSize, /// Task tracking additional send capacity (i.e. window updates).