From c118f865176f4852c1c24cfc3f837af92c4c06e0 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 9 Aug 2017 16:42:55 -0700 Subject: [PATCH] More flow control work --- src/frame/window_update.rs | 2 ++ src/proto/streams/prioritize.rs | 12 +++++-- src/proto/streams/recv.rs | 6 ++-- src/proto/streams/send.rs | 58 +++++++++++---------------------- src/proto/streams/store.rs | 25 ++++++++++---- src/proto/streams/stream.rs | 57 +++++++++++++++++++++++++++++++- src/proto/streams/streams.rs | 3 +- 7 files changed, 110 insertions(+), 53 deletions(-) diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index 1c4293d..785051d 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -38,6 +38,8 @@ impl WindowUpdate { // when received. let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK; + // TODO: the size_increment must be greater than 0 + Ok(WindowUpdate { stream_id: head.stream_id(), size_increment, diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 8f449ff..cf836b3 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -142,11 +142,17 @@ impl Prioritize // capacity list first. if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() { - let mut stream = self.pending_capacity.pop(store).unwrap(); + let mut stream = self.pending_capacity + .pop::(store) + .unwrap(); + stream.is_pending_send = false; Some(stream) } else { - match self.pending_send.pop(store) { + let stream = self.pending_send + .pop::(store); + + match stream { Some(mut stream) => { stream.is_pending_send = false; Some(stream) @@ -159,6 +165,6 @@ impl Prioritize fn push_sender(list: &mut store::List, stream: &mut store::Ptr) { debug_assert!(!stream.is_pending_send); - list.push(stream); + list.push::(stream); stream.is_pending_send = true; } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 3e11a5a..6c25740 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -139,7 +139,7 @@ impl Recv where B: Buf { // Only servers can receive a headers frame that initiates the stream. // This is verified in `Streams` before calling this function. if P::is_server() { - self.pending_accept.push(stream); + self.pending_accept.push::(stream); } Ok(()) @@ -226,7 +226,7 @@ impl Recv where B: Buf { let mut new_stream = stream.store() .insert(frame.promised_id(), new_stream); - ppp.push(&mut new_stream); + ppp.push::(&mut new_stream); } stream.pending_push_promises = ppp; @@ -381,7 +381,7 @@ impl Recv where B: Buf { */ pub fn next_incoming(&mut self, store: &mut Store) -> Option { - self.pending_accept.pop(store) + self.pending_accept.pop::(store) .map(|ptr| ptr.key()) } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 8b47dc4..207ce3f 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -24,6 +24,10 @@ pub(super) struct Send { /// Initial window size of locally initiated streams init_window_sz: WindowSize, + /// List of streams waiting for outbound connection capacity + pending_capacity: store::List, + + /// Prioritization layer prioritize: Prioritize, } @@ -42,6 +46,7 @@ impl Send where B: Buf { num_streams: 0, next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, + pending_capacity: store::List::new(), prioritize: Prioritize::new(config), } } @@ -147,49 +152,21 @@ impl Send where B: Buf { self.prioritize.poll_complete(store, dst) } - /* - /// Get pending window updates - pub fn poll_window_update(&mut self, streams: &mut Store) - -> Poll - { - // This biases connection window updates, which probably makes sense. - // - // TODO: We probably don't want to expose connection level updates - if let Some(incr) = self.flow_control.apply_window_update() { - return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); - } - - // TODO this should probably account for stream priority? - let update = self.pending_window_updates.pop_front() - .and_then(|id| { - streams.find_mut(&id) - .and_then(|stream| stream.into_mut().send_flow_control()) - .and_then(|flow| flow.apply_window_update()) - .map(|incr| WindowUpdate::new(id, incr)) - }); - - if let Some(update) = update { - return Ok(Async::Ready(update)); - } - - // Update the task. - // - // TODO: Extract this "gate" logic - self.blocked = Some(task::current()); - - return Ok(Async::NotReady); - } - */ - - pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate) + pub fn recv_connection_window_update(&mut self, + frame: frame::WindowUpdate, + store: &mut Store) -> Result<(), ConnectionError> { self.prioritize.recv_window_update(frame)?; // TODO: If there is available connection capacity, release pending // streams. + // + // Walk each stream pending capacity and see if this change to the + // connection window can increase the advertised capacity of the stream. - Ok(()) + unimplemented!(); + // Ok(()) } pub fn recv_stream_window_update(&mut self, @@ -216,7 +193,10 @@ impl Send where B: Buf { if connection < effective_window_size { stream.unadvertised_send_window = effective_window_size - connection; - // TODO: Queue the stream in a pending connection capacity list. + if !stream.is_pending_send_capacity { + stream.is_pending_send_capacity = true; + self.pending_capacity.push::(stream); + } } if stream.unadvertised_send_window == frame.size_increment() + unadvertised { @@ -225,9 +205,9 @@ impl Send where B: Buf { return Ok(()); } - // TODO: Notify the send task that there is additional capacity + stream.notify_send(); - unimplemented!(); + Ok(()) } pub fn dec_num_streams(&mut self) { diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 89900b8..2ade9c9 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -29,6 +29,14 @@ pub(super) struct List { _p: PhantomData, } +pub(super) trait Next { + fn next(stream: &Stream) -> Option; + + fn set_next(stream: &mut Stream, key: Key); + + fn take_next(stream: &mut Stream) -> Key; +} + /// A linked list #[derive(Debug, Clone, Copy)] struct Indices { @@ -138,15 +146,18 @@ impl List { } } - pub fn push(&mut self, stream: &mut store::Ptr) { + pub fn push(&mut self, stream: &mut store::Ptr) + where N: Next, + { // The next pointer shouldn't be set - debug_assert!(stream.next.is_none()); + debug_assert!(N::next(stream).is_none()); // Queue the stream match self.indices { Some(ref mut idxs) => { // Update the current tail node to point to `stream` - stream.resolve(idxs.tail).next = Some(stream.key()); + let key = stream.key(); + N::set_next(&mut stream.resolve(idxs.tail), key); // Update the tail pointer idxs.tail = stream.key(); @@ -160,15 +171,17 @@ impl List { } } - pub fn pop<'a>(&mut self, store: &'a mut Store) -> Option> { + pub fn pop<'a, N>(&mut self, store: &'a mut Store) -> Option> + where N: Next, + { if let Some(mut idxs) = self.indices { let mut stream = store.resolve(idxs.head); if idxs.head == idxs.tail { - assert!(stream.next.is_none()); + assert!(N::next(&*stream).is_none()); self.indices = None; } else { - idxs.head = stream.next.take().unwrap(); + idxs.head = N::take_next(&mut *stream); self.indices = Some(idxs); } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 2853437..0c3ae1c 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -14,6 +14,9 @@ pub(super) struct Stream { /// Task tracking receiving frames pub recv_task: Option, + /// Task tracking additional send capacity (i.e. window updates). + pub send_task: Option, + /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, @@ -23,6 +26,13 @@ pub(super) struct Stream { /// state. pub next: Option, + /// Next node in the linked list of streams waiting for additional + /// connection level capacity. + pub next_capacity: Option, + + /// True if the stream is waiting for outbound connection capacity + pub is_pending_send_capacity: bool, + /// The stream's pending push promises pub pending_push_promises: store::List, @@ -35,6 +45,12 @@ pub(super) struct Stream { pub unadvertised_send_window: WindowSize, } +#[derive(Debug)] +pub(super) struct Next; + +#[derive(Debug)] +pub(super) struct NextCapacity; + impl Stream { pub fn new(id: StreamId) -> Stream { Stream { @@ -42,8 +58,11 @@ impl Stream { state: State::default(), pending_recv: buffer::Deque::new(), recv_task: None, + send_task: None, pending_send: buffer::Deque::new(), next: None, + next_capacity: None, + is_pending_send_capacity: false, pending_push_promises: store::List::new(), is_pending_send: false, unadvertised_send_window: 0, @@ -60,9 +79,45 @@ impl Stream { self.state.recv_flow_control() } + pub fn notify_send(&mut self) { + if let Some(task) = self.send_task.take() { + task.notify(); + } + } + pub fn notify_recv(&mut self) { - if let Some(ref mut task) = self.recv_task { + if let Some(task) = self.recv_task.take() { task.notify(); } } } + +impl store::Next for Next { + fn next(stream: &Stream) -> Option { + stream.next + } + + fn set_next(stream: &mut Stream, key: store::Key) { + debug_assert!(stream.next.is_none()); + stream.next = Some(key); + } + + fn take_next(stream: &mut Stream) -> store::Key { + stream.next.take().unwrap() + } +} + +impl store::Next for NextCapacity { + fn next(stream: &Stream) -> Option { + stream.next_capacity + } + + fn set_next(stream: &mut Stream, key: store::Key) { + debug_assert!(stream.next_capacity.is_none()); + stream.next_capacity = Some(key); + } + + fn take_next(stream: &mut Stream) -> store::Key { + stream.next_capacity.take().unwrap() + } +} diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 6ba4c60..e203279 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -161,7 +161,8 @@ impl Streams let me = &mut *me; if id.is_zero() { - try!(me.actions.send.recv_connection_window_update(frame)); + me.actions.send.recv_connection_window_update( + frame, &mut me.store)?; } else { // The remote may send window updates for streams that the local now // considers closed. It's ok...