diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 207ce3f..32d6fd9 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -159,14 +159,56 @@ impl Send where B: Buf { { self.prioritize.recv_window_update(frame)?; - // TODO: If there is available connection capacity, release pending - // streams. - // + // Get the current connection capacity + let connection = self.prioritize.available_window(); + // Walk each stream pending capacity and see if this change to the // connection window can increase the advertised capacity of the stream. + // + // TODO: This is not a hugely efficient operation. It could be better to + // change the pending_capacity structure to a red-black tree. + // + self.pending_capacity.retain::( + store, + |stream| { + // Make sure that the stream is flagged as queued + debug_assert!(stream.is_pending_send_capacity); - unimplemented!(); - // Ok(()) + // Get the current unadvertised window + let unadvertised = stream.unadvertised_send_window; + + if unadvertised == 0 { + stream.is_pending_send_capacity = false; + return false; + } + + let effective_window_size = match stream.state.send_flow_control() { + Some(flow) => flow.effective_window_size(), + None => { + // The state transitioned and this stream is no longer + // waiting for updates + stream.is_pending_send_capacity = false; + return false; + } + }; + + if connection <= effective_window_size - unadvertised { + // The window is not increased, but we remain interested in + // updates in the future. + return true; + } + + if connection >= effective_window_size { + stream.unadvertised_send_window = 0; + } else { + stream.unadvertised_send_window = effective_window_size - connection; + } + + stream.notify_send(); + true + }); + + Ok(()) } pub fn recv_stream_window_update(&mut self, diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 2ade9c9..55b1229 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -32,9 +32,9 @@ pub(super) struct List { pub(super) trait Next { fn next(stream: &Stream) -> Option; - fn set_next(stream: &mut Stream, key: Key); + fn set_next(stream: &mut Stream, key: Option); - fn take_next(stream: &mut Stream) -> Key; + fn take_next(stream: &mut Stream) -> Option; } /// A linked list @@ -125,6 +125,20 @@ impl Store { } } +impl ops::Index for Store { + type Output = Stream; + + fn index(&self, key: Key) -> &Self::Output { + self.slab.index(key.0) + } +} + +impl ops::IndexMut for Store { + fn index_mut(&mut self, key: Key) -> &mut Self::Output { + self.slab.index_mut(key.0) + } +} + // ===== impl List ===== impl List { @@ -157,7 +171,7 @@ impl List { Some(ref mut idxs) => { // Update the current tail node to point to `stream` let key = stream.key(); - N::set_next(&mut stream.resolve(idxs.tail), key); + N::set_next(&mut stream.resolve(idxs.tail), Some(key)); // Update the tail pointer idxs.tail = stream.key(); @@ -181,7 +195,7 @@ impl List { assert!(N::next(&*stream).is_none()); self.indices = None; } else { - idxs.head = N::take_next(&mut *stream); + idxs.head = N::take_next(&mut *stream).unwrap(); self.indices = Some(idxs); } @@ -190,6 +204,53 @@ impl List { None } + + pub fn retain(&mut self, store: &mut Store, mut f: F) + where N: Next, + F: FnMut(&mut Stream) -> bool, + { + if let Some(mut idxs) = self.indices { + let mut prev = None; + let mut curr = idxs.head; + + loop { + if f(&mut store[curr]) { + // Element is retained, walk to the next + if let Some(next) = N::next(&mut store[curr]) { + prev = Some(curr); + curr = next; + } else { + // Tail + break; + } + } else { + // Element is dropped + if let Some(prev) = prev { + let next = N::take_next(&mut store[curr]); + N::set_next(&mut store[prev], next); + + // current is last element, but guaranteed to not be the + // only one + if next.is_none() { + idxs.tail = prev; + break; + } + } else { + if let Some(next) = N::take_next(&mut store[curr]) { + curr = next; + idxs.head = next; + } else { + // Only element + self.indices = None; + return; + } + } + } + } + + self.indices = Some(idxs); + } + } } // ===== impl Ptr ===== diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 0c3ae1c..6f328c7 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -97,13 +97,12 @@ impl store::Next for Next { stream.next } - fn set_next(stream: &mut Stream, key: store::Key) { - debug_assert!(stream.next.is_none()); - stream.next = Some(key); + fn set_next(stream: &mut Stream, key: Option) { + stream.next = key; } - fn take_next(stream: &mut Stream) -> store::Key { - stream.next.take().unwrap() + fn take_next(stream: &mut Stream) -> Option { + stream.next.take() } } @@ -112,12 +111,11 @@ impl store::Next for NextCapacity { stream.next_capacity } - fn set_next(stream: &mut Stream, key: store::Key) { - debug_assert!(stream.next_capacity.is_none()); - stream.next_capacity = Some(key); + fn set_next(stream: &mut Stream, key: Option) { + stream.next_capacity = key; } - fn take_next(stream: &mut Stream) -> store::Key { - stream.next_capacity.take().unwrap() + fn take_next(stream: &mut Stream) -> Option { + stream.next_capacity.take() } }