From 74b3852a58434b6de270ce0416db5ff0f6010249 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 3 Aug 2017 22:44:19 -0700 Subject: [PATCH] Start working on prioritization --- src/frame/headers.rs | 4 +- src/proto/streams/mod.rs | 2 + src/proto/streams/prioritize.rs | 61 +++++++++++++++++++++++++++++ src/proto/streams/recv.rs | 2 +- src/proto/streams/send.rs | 16 +++++--- src/proto/streams/store.rs | 69 ++++++++++++++++++++++++++++++--- src/proto/streams/stream.rs | 8 ++++ src/proto/streams/streams.rs | 17 ++++---- 8 files changed, 156 insertions(+), 23 deletions(-) create mode 100644 src/proto/streams/prioritize.rs diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 6c33a20..81c0dee 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -296,8 +296,8 @@ impl Headers { } } -impl From for Frame { - fn from(src: Headers) -> Frame { +impl From for Frame { + fn from(src: Headers) -> Self { Frame::Headers(src) } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index a36602f..2b5429e 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -1,5 +1,6 @@ mod buffer; mod flow_control; +mod prioritize; mod recv; mod send; mod state; @@ -11,6 +12,7 @@ pub use self::streams::{Streams, StreamRef}; use self::buffer::Buffer; use self::flow_control::FlowControl; +use self::prioritize::Prioritize; use self::recv::Recv; use self::send::Send; use self::state::State; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs new file mode 100644 index 0000000..0e0dbfd --- /dev/null +++ b/src/proto/streams/prioritize.rs @@ -0,0 +1,61 @@ +use super::*; + +#[derive(Debug)] +pub(super) struct Prioritize { + pending_send: Option, + + /// Holds frames that are waiting to be written to the socket + buffer: Buffer, +} + +#[derive(Debug, Clone, Copy)] +struct Indices { + head: store::Key, + tail: store::Key, +} + +impl Prioritize { + pub fn new() -> Prioritize { + Prioritize { + pending_send: None, + buffer: Buffer::new(), + } + } + + pub fn queue_frame(&mut self, + frame: Frame, + stream: &mut store::Ptr) + { + // queue the frame in the buffer + stream.pending_send.push_back(&mut self.buffer, frame); + + if stream.is_pending_send { + debug_assert!(self.pending_send.is_some()); + + // Already queued to have frame processed. + return; + } + + // The next pointer shouldn't be set + debug_assert!(stream.next_pending_send.is_none()); + + // Queue the stream + match self.pending_send { + Some(ref mut idxs) => { + // Update the current tail node to point to `stream` + stream.resolve(idxs.tail).next_pending_send = Some(stream.key()); + + // Update the tail pointer + idxs.tail = stream.key(); + } + None => { + self.pending_send = Some(Indices { + head: stream.key(), + tail: stream.key(), + }); + } + } + + stream.is_pending_send = true; + } +} diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index c222afe..99a7a39 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -208,7 +208,7 @@ impl Recv where T: AsyncWrite, { while let Some(id) = self.pending_window_updates.pop_front() { - let flow = streams.get_mut(&id) + let flow = streams.find_mut(&id) .and_then(|stream| stream.recv_flow_control()); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index bbc363c..6ed4fdd 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -30,8 +30,7 @@ pub(super) struct Send { // XXX It would be cool if this didn't exist. pending_window_updates: VecDeque, - /// Holds frames that are waiting to be written to the socket - buffer: Buffer, + prioritize: Prioritize, /// When `poll_window_update` is not ready, then the calling task is saved to /// be notified later. Access to poll_window_update must not be shared across tasks, @@ -58,8 +57,8 @@ impl Send next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, flow_control: FlowControl::new(config.init_local_window_sz), + prioritize: Prioritize::new(), pending_window_updates: VecDeque::new(), - buffer: Buffer::new(), blocked: None, _p: PhantomData, } @@ -86,12 +85,17 @@ impl Send Ok(ret) } - pub fn send_headers(&mut self, stream: &mut Stream, frame: frame::Headers) + pub fn send_headers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { // Update the state stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; - // stream.send_buf.headers = Some(frame); + + // Queue the frame for sending + self.prioritize.queue_frame(frame.into(), stream); + Ok(()) } @@ -161,7 +165,7 @@ impl Send // TODO this should probably account for stream priority? let update = self.pending_window_updates.pop_front() .and_then(|id| { - streams.get_mut(&id) + streams.find_mut(&id) .and_then(|stream| stream.send_flow_control()) .and_then(|flow| flow.apply_window_update()) .map(|incr| WindowUpdate::new(id, incr)) diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 4c2dbf7..b19c755 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -2,6 +2,7 @@ use super::*; use slab; +use std::ops; use std::collections::{HashMap, hash_map}; /// Storage for streams @@ -11,6 +12,16 @@ pub(super) struct Store { ids: HashMap, } +/// "Pointer" to an entry in the store +pub(super) struct Ptr<'a, B: 'a> { + key: Key, + store: &'a mut Store, +} + +/// References an entry in the store. +#[derive(Debug, Clone, Copy)] +pub(super) struct Key(usize); + pub(super) enum Entry<'a, B: 'a> { Occupied(OccupiedEntry<'a, B>), Vacant(VacantEntry<'a, B>), @@ -26,6 +37,8 @@ pub(super) struct VacantEntry<'a, B: 'a> { slab: &'a mut slab::Slab>, } +// ===== impl Store ===== + impl Store { pub fn new() -> Self { Store { @@ -34,7 +47,7 @@ impl Store { } } - pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut Stream> { + pub fn find_mut(&mut self, id: &StreamId) -> Option<&mut Stream> { if let Some(handle) = self.ids.get(id) { Some(&mut self.slab[*handle]) } else { @@ -42,12 +55,17 @@ impl Store { } } - pub fn insert(&mut self, id: StreamId, val: Stream) { - let handle = self.slab.insert(val); - assert!(self.ids.insert(id, handle).is_none()); + pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { + let key = self.slab.insert(val); + assert!(self.ids.insert(id, key).is_none()); + + Ptr { + key: Key(key), + store: self, + } } - pub fn entry(&mut self, id: StreamId) -> Entry { + pub fn find_entry(&mut self, id: StreamId) -> Entry { use self::hash_map::Entry::*; match self.ids.entry(id) { @@ -67,12 +85,53 @@ impl Store { } } +// ===== impl Ptr ===== + +impl<'a, B: 'a> Ptr<'a, B> { + pub fn key(&self) -> Key { + self.key + } + + pub fn resolve(&mut self, key: Key) -> Ptr { + Ptr { + key: key, + store: self.store, + } + } +} + +impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { + type Target = Stream; + + fn deref(&self) -> &Stream { + &self.store.slab[self.key.0] + } +} + +impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { + fn deref_mut(&mut self) -> &mut Stream { + &mut self.store.slab[self.key.0] + } +} + +// ===== impl OccupiedEntry ===== + impl<'a, B> OccupiedEntry<'a, B> { + pub fn get(&self) -> &Stream { + &self.slab[*self.ids.get()] + } + + pub fn get_mut(&mut self) -> &mut Stream { + &mut self.slab[*self.ids.get()] + } + pub fn into_mut(self) -> &'a mut Stream { &mut self.slab[*self.ids.get()] } } +// ===== impl VacantEntry ===== +// impl<'a, B> VacantEntry<'a, B> { pub fn insert(self, value: Stream) -> &'a mut Stream { // Insert the value in the slab diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 2b3a67b..0689d3c 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -7,6 +7,12 @@ pub(super) struct Stream { /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, + + /// Next stream pending send + pub next_pending_send: Option, + + /// True if the stream is currently pending send + pub is_pending_send: bool, } impl Stream { @@ -14,6 +20,8 @@ impl Stream { Stream { state: State::default(), pending_send: buffer::Deque::new(), + next_pending_send: None, + is_pending_send: false, } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index d385956..b5b3ded 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -60,7 +60,7 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let stream = match me.store.entry(id) { + let stream = match me.store.find_entry(id) { Entry::Occupied(e) => e.into_mut(), Entry::Vacant(e) => { // Trailers cannot open a stream. Trailers are header frames @@ -103,7 +103,7 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let stream = match me.store.get_mut(&id) { + let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => return Err(ProtocolError.into()), }; @@ -136,7 +136,7 @@ impl Streams } else { // The remote may send window updates for streams that the local now // considers closed. It's ok... - if let Some(state) = me.store.get_mut(&id) { + if let Some(state) = me.store.find_mut(&id) { try!(me.actions.send.recv_stream_window_update(frame, state)); } } @@ -191,7 +191,7 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let stream = match me.store.get_mut(&id) { + let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => return Err(UnexpectedFrameType.into()), }; @@ -224,7 +224,7 @@ impl Streams if id.is_zero() { try!(me.actions.recv.expand_connection_window(sz)); } else { - if let Some(state) = me.store.get_mut(&id) { + if let Some(state) = me.store.find_mut(&id) { try!(me.actions.recv.expand_stream_window(id, sz, state)); } } @@ -271,15 +271,14 @@ impl Streams let headers = client::Peer::convert_send_message( id, request, end_of_stream); - me.actions.send.send_headers(&mut stream, headers)?; + let mut stream = me.store.insert(id, stream); + + me.actions.send.send_headers(headers, &mut stream)?; // Given that the stream has been initialized, it should not be in the // closed state. debug_assert!(!stream.state.is_closed()); - // Store the state - me.store.insert(id, stream); - id };