More work on window updates

This commit is contained in:
Carl Lerche
2017-08-09 22:34:22 -07:00
parent c118f86517
commit 7107e9fc06
3 changed files with 120 additions and 19 deletions

View File

@@ -159,14 +159,56 @@ impl<B> Send<B> where B: Buf {
{ {
self.prioritize.recv_window_update(frame)?; self.prioritize.recv_window_update(frame)?;
// TODO: If there is available connection capacity, release pending // Get the current connection capacity
// streams. let connection = self.prioritize.available_window();
//
// Walk each stream pending capacity and see if this change to the // Walk each stream pending capacity and see if this change to the
// connection window can increase the advertised capacity of the stream. // connection window can increase the advertised capacity of the stream.
//
// TODO: This is not a hugely efficient operation. It could be better to
// change the pending_capacity structure to a red-black tree.
//
self.pending_capacity.retain::<stream::NextCapacity, _>(
store,
|stream| {
// Make sure that the stream is flagged as queued
debug_assert!(stream.is_pending_send_capacity);
unimplemented!(); // Get the current unadvertised window
// Ok(()) let unadvertised = stream.unadvertised_send_window;
if unadvertised == 0 {
stream.is_pending_send_capacity = false;
return false;
}
let effective_window_size = match stream.state.send_flow_control() {
Some(flow) => flow.effective_window_size(),
None => {
// The state transitioned and this stream is no longer
// waiting for updates
stream.is_pending_send_capacity = false;
return false;
}
};
if connection <= effective_window_size - unadvertised {
// The window is not increased, but we remain interested in
// updates in the future.
return true;
}
if connection >= effective_window_size {
stream.unadvertised_send_window = 0;
} else {
stream.unadvertised_send_window = effective_window_size - connection;
}
stream.notify_send();
true
});
Ok(())
} }
pub fn recv_stream_window_update(&mut self, pub fn recv_stream_window_update(&mut self,

View File

@@ -32,9 +32,9 @@ pub(super) struct List<B> {
pub(super) trait Next { pub(super) trait Next {
fn next<B>(stream: &Stream<B>) -> Option<Key>; fn next<B>(stream: &Stream<B>) -> Option<Key>;
fn set_next<B>(stream: &mut Stream<B>, key: Key); fn set_next<B>(stream: &mut Stream<B>, key: Option<Key>);
fn take_next<B>(stream: &mut Stream<B>) -> Key; fn take_next<B>(stream: &mut Stream<B>) -> Option<Key>;
} }
/// A linked list /// A linked list
@@ -125,6 +125,20 @@ impl<B> Store<B> {
} }
} }
impl<B> ops::Index<Key> for Store<B> {
type Output = Stream<B>;
fn index(&self, key: Key) -> &Self::Output {
self.slab.index(key.0)
}
}
impl<B> ops::IndexMut<Key> for Store<B> {
fn index_mut(&mut self, key: Key) -> &mut Self::Output {
self.slab.index_mut(key.0)
}
}
// ===== impl List ===== // ===== impl List =====
impl<B> List<B> { impl<B> List<B> {
@@ -157,7 +171,7 @@ impl<B> List<B> {
Some(ref mut idxs) => { Some(ref mut idxs) => {
// Update the current tail node to point to `stream` // Update the current tail node to point to `stream`
let key = stream.key(); let key = stream.key();
N::set_next(&mut stream.resolve(idxs.tail), key); N::set_next(&mut stream.resolve(idxs.tail), Some(key));
// Update the tail pointer // Update the tail pointer
idxs.tail = stream.key(); idxs.tail = stream.key();
@@ -181,7 +195,7 @@ impl<B> List<B> {
assert!(N::next(&*stream).is_none()); assert!(N::next(&*stream).is_none());
self.indices = None; self.indices = None;
} else { } else {
idxs.head = N::take_next(&mut *stream); idxs.head = N::take_next(&mut *stream).unwrap();
self.indices = Some(idxs); self.indices = Some(idxs);
} }
@@ -190,6 +204,53 @@ impl<B> List<B> {
None None
} }
pub fn retain<N, F>(&mut self, store: &mut Store<B>, mut f: F)
where N: Next,
F: FnMut(&mut Stream<B>) -> bool,
{
if let Some(mut idxs) = self.indices {
let mut prev = None;
let mut curr = idxs.head;
loop {
if f(&mut store[curr]) {
// Element is retained, walk to the next
if let Some(next) = N::next(&mut store[curr]) {
prev = Some(curr);
curr = next;
} else {
// Tail
break;
}
} else {
// Element is dropped
if let Some(prev) = prev {
let next = N::take_next(&mut store[curr]);
N::set_next(&mut store[prev], next);
// current is last element, but guaranteed to not be the
// only one
if next.is_none() {
idxs.tail = prev;
break;
}
} else {
if let Some(next) = N::take_next(&mut store[curr]) {
curr = next;
idxs.head = next;
} else {
// Only element
self.indices = None;
return;
}
}
}
}
self.indices = Some(idxs);
}
}
} }
// ===== impl Ptr ===== // ===== impl Ptr =====

View File

@@ -97,13 +97,12 @@ impl store::Next for Next {
stream.next stream.next
} }
fn set_next<B>(stream: &mut Stream<B>, key: store::Key) { fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
debug_assert!(stream.next.is_none()); stream.next = key;
stream.next = Some(key);
} }
fn take_next<B>(stream: &mut Stream<B>) -> store::Key { fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
stream.next.take().unwrap() stream.next.take()
} }
} }
@@ -112,12 +111,11 @@ impl store::Next for NextCapacity {
stream.next_capacity stream.next_capacity
} }
fn set_next<B>(stream: &mut Stream<B>, key: store::Key) { fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
debug_assert!(stream.next_capacity.is_none()); stream.next_capacity = key;
stream.next_capacity = Some(key);
} }
fn take_next<B>(stream: &mut Stream<B>) -> store::Key { fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
stream.next_capacity.take().unwrap() stream.next_capacity.take()
} }
} }