More flow control work

This commit is contained in:
Carl Lerche
2017-08-09 16:42:55 -07:00
parent 95bb95af01
commit c118f86517
7 changed files with 110 additions and 53 deletions

View File

@@ -38,6 +38,8 @@ impl WindowUpdate {
// when received. // when received.
let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK; let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK;
// TODO: the size_increment must be greater than 0
Ok(WindowUpdate { Ok(WindowUpdate {
stream_id: head.stream_id(), stream_id: head.stream_id(),
size_increment, size_increment,

View File

@@ -142,11 +142,17 @@ impl<B> Prioritize<B>
// capacity list first. // capacity list first.
if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() { if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() {
let mut stream = self.pending_capacity.pop(store).unwrap(); let mut stream = self.pending_capacity
.pop::<stream::Next>(store)
.unwrap();
stream.is_pending_send = false; stream.is_pending_send = false;
Some(stream) Some(stream)
} else { } else {
match self.pending_send.pop(store) { let stream = self.pending_send
.pop::<stream::Next>(store);
match stream {
Some(mut stream) => { Some(mut stream) => {
stream.is_pending_send = false; stream.is_pending_send = false;
Some(stream) Some(stream)
@@ -159,6 +165,6 @@ impl<B> Prioritize<B>
fn push_sender<B>(list: &mut store::List<B>, stream: &mut store::Ptr<B>) { fn push_sender<B>(list: &mut store::List<B>, stream: &mut store::Ptr<B>) {
debug_assert!(!stream.is_pending_send); debug_assert!(!stream.is_pending_send);
list.push(stream); list.push::<stream::Next>(stream);
stream.is_pending_send = true; stream.is_pending_send = true;
} }

View File

@@ -139,7 +139,7 @@ impl<B> Recv<B> where B: Buf {
// Only servers can receive a headers frame that initiates the stream. // Only servers can receive a headers frame that initiates the stream.
// This is verified in `Streams` before calling this function. // This is verified in `Streams` before calling this function.
if P::is_server() { if P::is_server() {
self.pending_accept.push(stream); self.pending_accept.push::<stream::Next>(stream);
} }
Ok(()) Ok(())
@@ -226,7 +226,7 @@ impl<B> Recv<B> where B: Buf {
let mut new_stream = stream.store() let mut new_stream = stream.store()
.insert(frame.promised_id(), new_stream); .insert(frame.promised_id(), new_stream);
ppp.push(&mut new_stream); ppp.push::<stream::Next>(&mut new_stream);
} }
stream.pending_push_promises = ppp; stream.pending_push_promises = ppp;
@@ -381,7 +381,7 @@ impl<B> Recv<B> where B: Buf {
*/ */
pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> { pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> {
self.pending_accept.pop(store) self.pending_accept.pop::<stream::Next>(store)
.map(|ptr| ptr.key()) .map(|ptr| ptr.key())
} }

View File

@@ -24,6 +24,10 @@ pub(super) struct Send<B> {
/// Initial window size of locally initiated streams /// Initial window size of locally initiated streams
init_window_sz: WindowSize, init_window_sz: WindowSize,
/// List of streams waiting for outbound connection capacity
pending_capacity: store::List<B>,
/// Prioritization layer
prioritize: Prioritize<B>, prioritize: Prioritize<B>,
} }
@@ -42,6 +46,7 @@ impl<B> Send<B> where B: Buf {
num_streams: 0, num_streams: 0,
next_stream_id: next_stream_id.into(), next_stream_id: next_stream_id.into(),
init_window_sz: config.init_local_window_sz, init_window_sz: config.init_local_window_sz,
pending_capacity: store::List::new(),
prioritize: Prioritize::new(config), prioritize: Prioritize::new(config),
} }
} }
@@ -147,49 +152,21 @@ impl<B> Send<B> where B: Buf {
self.prioritize.poll_complete(store, dst) self.prioritize.poll_complete(store, dst)
} }
/* pub fn recv_connection_window_update(&mut self,
/// Get pending window updates frame: frame::WindowUpdate,
pub fn poll_window_update(&mut self, streams: &mut Store<B>) store: &mut Store<B>)
-> Poll<WindowUpdate, ConnectionError>
{
// This biases connection window updates, which probably makes sense.
//
// TODO: We probably don't want to expose connection level updates
if let Some(incr) = self.flow_control.apply_window_update() {
return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr)));
}
// TODO this should probably account for stream priority?
let update = self.pending_window_updates.pop_front()
.and_then(|id| {
streams.find_mut(&id)
.and_then(|stream| stream.into_mut().send_flow_control())
.and_then(|flow| flow.apply_window_update())
.map(|incr| WindowUpdate::new(id, incr))
});
if let Some(update) = update {
return Ok(Async::Ready(update));
}
// Update the task.
//
// TODO: Extract this "gate" logic
self.blocked = Some(task::current());
return Ok(Async::NotReady);
}
*/
pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
self.prioritize.recv_window_update(frame)?; self.prioritize.recv_window_update(frame)?;
// TODO: If there is available connection capacity, release pending // TODO: If there is available connection capacity, release pending
// streams. // streams.
//
// Walk each stream pending capacity and see if this change to the
// connection window can increase the advertised capacity of the stream.
Ok(()) unimplemented!();
// Ok(())
} }
pub fn recv_stream_window_update(&mut self, pub fn recv_stream_window_update(&mut self,
@@ -216,7 +193,10 @@ impl<B> Send<B> where B: Buf {
if connection < effective_window_size { if connection < effective_window_size {
stream.unadvertised_send_window = effective_window_size - connection; stream.unadvertised_send_window = effective_window_size - connection;
// TODO: Queue the stream in a pending connection capacity list. if !stream.is_pending_send_capacity {
stream.is_pending_send_capacity = true;
self.pending_capacity.push::<stream::NextCapacity>(stream);
}
} }
if stream.unadvertised_send_window == frame.size_increment() + unadvertised { if stream.unadvertised_send_window == frame.size_increment() + unadvertised {
@@ -225,9 +205,9 @@ impl<B> Send<B> where B: Buf {
return Ok(()); return Ok(());
} }
// TODO: Notify the send task that there is additional capacity stream.notify_send();
unimplemented!(); Ok(())
} }
pub fn dec_num_streams(&mut self) { pub fn dec_num_streams(&mut self) {

View File

@@ -29,6 +29,14 @@ pub(super) struct List<B> {
_p: PhantomData<B>, _p: PhantomData<B>,
} }
pub(super) trait Next {
fn next<B>(stream: &Stream<B>) -> Option<Key>;
fn set_next<B>(stream: &mut Stream<B>, key: Key);
fn take_next<B>(stream: &mut Stream<B>) -> Key;
}
/// A linked list /// A linked list
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
struct Indices { struct Indices {
@@ -138,15 +146,18 @@ impl<B> List<B> {
} }
} }
pub fn push(&mut self, stream: &mut store::Ptr<B>) { pub fn push<N>(&mut self, stream: &mut store::Ptr<B>)
where N: Next,
{
// The next pointer shouldn't be set // The next pointer shouldn't be set
debug_assert!(stream.next.is_none()); debug_assert!(N::next(stream).is_none());
// Queue the stream // Queue the stream
match self.indices { match self.indices {
Some(ref mut idxs) => { Some(ref mut idxs) => {
// Update the current tail node to point to `stream` // Update the current tail node to point to `stream`
stream.resolve(idxs.tail).next = Some(stream.key()); let key = stream.key();
N::set_next(&mut stream.resolve(idxs.tail), key);
// Update the tail pointer // Update the tail pointer
idxs.tail = stream.key(); idxs.tail = stream.key();
@@ -160,15 +171,17 @@ impl<B> List<B> {
} }
} }
pub fn pop<'a>(&mut self, store: &'a mut Store<B>) -> Option<store::Ptr<'a, B>> { pub fn pop<'a, N>(&mut self, store: &'a mut Store<B>) -> Option<store::Ptr<'a, B>>
where N: Next,
{
if let Some(mut idxs) = self.indices { if let Some(mut idxs) = self.indices {
let mut stream = store.resolve(idxs.head); let mut stream = store.resolve(idxs.head);
if idxs.head == idxs.tail { if idxs.head == idxs.tail {
assert!(stream.next.is_none()); assert!(N::next(&*stream).is_none());
self.indices = None; self.indices = None;
} else { } else {
idxs.head = stream.next.take().unwrap(); idxs.head = N::take_next(&mut *stream);
self.indices = Some(idxs); self.indices = Some(idxs);
} }

View File

@@ -14,6 +14,9 @@ pub(super) struct Stream<B> {
/// Task tracking receiving frames /// Task tracking receiving frames
pub recv_task: Option<task::Task>, pub recv_task: Option<task::Task>,
/// Task tracking additional send capacity (i.e. window updates).
pub send_task: Option<task::Task>,
/// Frames pending for this stream being sent to the socket /// Frames pending for this stream being sent to the socket
pub pending_send: buffer::Deque<B>, pub pending_send: buffer::Deque<B>,
@@ -23,6 +26,13 @@ pub(super) struct Stream<B> {
/// state. /// state.
pub next: Option<store::Key>, pub next: Option<store::Key>,
/// Next node in the linked list of streams waiting for additional
/// connection level capacity.
pub next_capacity: Option<store::Key>,
/// True if the stream is waiting for outbound connection capacity
pub is_pending_send_capacity: bool,
/// The stream's pending push promises /// The stream's pending push promises
pub pending_push_promises: store::List<B>, pub pending_push_promises: store::List<B>,
@@ -35,6 +45,12 @@ pub(super) struct Stream<B> {
pub unadvertised_send_window: WindowSize, pub unadvertised_send_window: WindowSize,
} }
#[derive(Debug)]
pub(super) struct Next;
#[derive(Debug)]
pub(super) struct NextCapacity;
impl<B> Stream<B> { impl<B> Stream<B> {
pub fn new(id: StreamId) -> Stream<B> { pub fn new(id: StreamId) -> Stream<B> {
Stream { Stream {
@@ -42,8 +58,11 @@ impl<B> Stream<B> {
state: State::default(), state: State::default(),
pending_recv: buffer::Deque::new(), pending_recv: buffer::Deque::new(),
recv_task: None, recv_task: None,
send_task: None,
pending_send: buffer::Deque::new(), pending_send: buffer::Deque::new(),
next: None, next: None,
next_capacity: None,
is_pending_send_capacity: false,
pending_push_promises: store::List::new(), pending_push_promises: store::List::new(),
is_pending_send: false, is_pending_send: false,
unadvertised_send_window: 0, unadvertised_send_window: 0,
@@ -60,9 +79,45 @@ impl<B> Stream<B> {
self.state.recv_flow_control() self.state.recv_flow_control()
} }
pub fn notify_send(&mut self) {
if let Some(task) = self.send_task.take() {
task.notify();
}
}
pub fn notify_recv(&mut self) { pub fn notify_recv(&mut self) {
if let Some(ref mut task) = self.recv_task { if let Some(task) = self.recv_task.take() {
task.notify(); task.notify();
} }
} }
} }
impl store::Next for Next {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
stream.next
}
fn set_next<B>(stream: &mut Stream<B>, key: store::Key) {
debug_assert!(stream.next.is_none());
stream.next = Some(key);
}
fn take_next<B>(stream: &mut Stream<B>) -> store::Key {
stream.next.take().unwrap()
}
}
impl store::Next for NextCapacity {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
stream.next_capacity
}
fn set_next<B>(stream: &mut Stream<B>, key: store::Key) {
debug_assert!(stream.next_capacity.is_none());
stream.next_capacity = Some(key);
}
fn take_next<B>(stream: &mut Stream<B>) -> store::Key {
stream.next_capacity.take().unwrap()
}
}

View File

@@ -161,7 +161,8 @@ impl<B> Streams<B>
let me = &mut *me; let me = &mut *me;
if id.is_zero() { if id.is_zero() {
try!(me.actions.send.recv_connection_window_update(frame)); me.actions.send.recv_connection_window_update(
frame, &mut me.store)?;
} else { } else {
// The remote may send window updates for streams that the local now // The remote may send window updates for streams that the local now
// considers closed. It's ok... // considers closed. It's ok...