Start working on prioritization

This commit is contained in:
Carl Lerche
2017-08-03 22:44:19 -07:00
parent dd8412d660
commit 74b3852a58
8 changed files with 156 additions and 23 deletions

View File

@@ -296,8 +296,8 @@ impl Headers {
} }
} }
impl From<Headers> for Frame { impl<T> From<Headers> for Frame<T> {
fn from(src: Headers) -> Frame { fn from(src: Headers) -> Self {
Frame::Headers(src) Frame::Headers(src)
} }
} }

View File

@@ -1,5 +1,6 @@
mod buffer; mod buffer;
mod flow_control; mod flow_control;
mod prioritize;
mod recv; mod recv;
mod send; mod send;
mod state; mod state;
@@ -11,6 +12,7 @@ pub use self::streams::{Streams, StreamRef};
use self::buffer::Buffer; use self::buffer::Buffer;
use self::flow_control::FlowControl; use self::flow_control::FlowControl;
use self::prioritize::Prioritize;
use self::recv::Recv; use self::recv::Recv;
use self::send::Send; use self::send::Send;
use self::state::State; use self::state::State;

View File

@@ -0,0 +1,61 @@
use super::*;
#[derive(Debug)]
pub(super) struct Prioritize<B> {
pending_send: Option<Indices>,
/// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>,
}
#[derive(Debug, Clone, Copy)]
struct Indices {
head: store::Key,
tail: store::Key,
}
impl<B> Prioritize<B> {
pub fn new() -> Prioritize<B> {
Prioritize {
pending_send: None,
buffer: Buffer::new(),
}
}
pub fn queue_frame(&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B>)
{
// queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame);
if stream.is_pending_send {
debug_assert!(self.pending_send.is_some());
// Already queued to have frame processed.
return;
}
// The next pointer shouldn't be set
debug_assert!(stream.next_pending_send.is_none());
// Queue the stream
match self.pending_send {
Some(ref mut idxs) => {
// Update the current tail node to point to `stream`
stream.resolve(idxs.tail).next_pending_send = Some(stream.key());
// Update the tail pointer
idxs.tail = stream.key();
}
None => {
self.pending_send = Some(Indices {
head: stream.key(),
tail: stream.key(),
});
}
}
stream.is_pending_send = true;
}
}

View File

@@ -208,7 +208,7 @@ impl<P, B> Recv<P, B>
where T: AsyncWrite, where T: AsyncWrite,
{ {
while let Some(id) = self.pending_window_updates.pop_front() { while let Some(id) = self.pending_window_updates.pop_front() {
let flow = streams.get_mut(&id) let flow = streams.find_mut(&id)
.and_then(|stream| stream.recv_flow_control()); .and_then(|stream| stream.recv_flow_control());

View File

@@ -30,8 +30,7 @@ pub(super) struct Send<P, B> {
// XXX It would be cool if this didn't exist. // XXX It would be cool if this didn't exist.
pending_window_updates: VecDeque<StreamId>, pending_window_updates: VecDeque<StreamId>,
/// Holds frames that are waiting to be written to the socket prioritize: Prioritize<B>,
buffer: Buffer<B>,
/// When `poll_window_update` is not ready, then the calling task is saved to /// When `poll_window_update` is not ready, then the calling task is saved to
/// be notified later. Access to poll_window_update must not be shared across tasks, /// be notified later. Access to poll_window_update must not be shared across tasks,
@@ -58,8 +57,8 @@ impl<P, B> Send<P, B>
next_stream_id: next_stream_id.into(), next_stream_id: next_stream_id.into(),
init_window_sz: config.init_local_window_sz, init_window_sz: config.init_local_window_sz,
flow_control: FlowControl::new(config.init_local_window_sz), flow_control: FlowControl::new(config.init_local_window_sz),
prioritize: Prioritize::new(),
pending_window_updates: VecDeque::new(), pending_window_updates: VecDeque::new(),
buffer: Buffer::new(),
blocked: None, blocked: None,
_p: PhantomData, _p: PhantomData,
} }
@@ -86,12 +85,17 @@ impl<P, B> Send<P, B>
Ok(ret) Ok(ret)
} }
pub fn send_headers(&mut self, stream: &mut Stream<B>, frame: frame::Headers) pub fn send_headers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
// Update the state // Update the state
stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; stream.state.send_open(self.init_window_sz, frame.is_end_stream())?;
// stream.send_buf.headers = Some(frame);
// Queue the frame for sending
self.prioritize.queue_frame(frame.into(), stream);
Ok(()) Ok(())
} }
@@ -161,7 +165,7 @@ impl<P, B> Send<P, B>
// TODO this should probably account for stream priority? // TODO this should probably account for stream priority?
let update = self.pending_window_updates.pop_front() let update = self.pending_window_updates.pop_front()
.and_then(|id| { .and_then(|id| {
streams.get_mut(&id) streams.find_mut(&id)
.and_then(|stream| stream.send_flow_control()) .and_then(|stream| stream.send_flow_control())
.and_then(|flow| flow.apply_window_update()) .and_then(|flow| flow.apply_window_update())
.map(|incr| WindowUpdate::new(id, incr)) .map(|incr| WindowUpdate::new(id, incr))

View File

@@ -2,6 +2,7 @@ use super::*;
use slab; use slab;
use std::ops;
use std::collections::{HashMap, hash_map}; use std::collections::{HashMap, hash_map};
/// Storage for streams /// Storage for streams
@@ -11,6 +12,16 @@ pub(super) struct Store<B> {
ids: HashMap<StreamId, usize>, ids: HashMap<StreamId, usize>,
} }
/// "Pointer" to an entry in the store
pub(super) struct Ptr<'a, B: 'a> {
key: Key,
store: &'a mut Store<B>,
}
/// References an entry in the store.
#[derive(Debug, Clone, Copy)]
pub(super) struct Key(usize);
pub(super) enum Entry<'a, B: 'a> { pub(super) enum Entry<'a, B: 'a> {
Occupied(OccupiedEntry<'a, B>), Occupied(OccupiedEntry<'a, B>),
Vacant(VacantEntry<'a, B>), Vacant(VacantEntry<'a, B>),
@@ -26,6 +37,8 @@ pub(super) struct VacantEntry<'a, B: 'a> {
slab: &'a mut slab::Slab<Stream<B>>, slab: &'a mut slab::Slab<Stream<B>>,
} }
// ===== impl Store =====
impl<B> Store<B> { impl<B> Store<B> {
pub fn new() -> Self { pub fn new() -> Self {
Store { Store {
@@ -34,7 +47,7 @@ impl<B> Store<B> {
} }
} }
pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut Stream<B>> { pub fn find_mut(&mut self, id: &StreamId) -> Option<&mut Stream<B>> {
if let Some(handle) = self.ids.get(id) { if let Some(handle) = self.ids.get(id) {
Some(&mut self.slab[*handle]) Some(&mut self.slab[*handle])
} else { } else {
@@ -42,12 +55,17 @@ impl<B> Store<B> {
} }
} }
pub fn insert(&mut self, id: StreamId, val: Stream<B>) { pub fn insert(&mut self, id: StreamId, val: Stream<B>) -> Ptr<B> {
let handle = self.slab.insert(val); let key = self.slab.insert(val);
assert!(self.ids.insert(id, handle).is_none()); assert!(self.ids.insert(id, key).is_none());
Ptr {
key: Key(key),
store: self,
}
} }
pub fn entry(&mut self, id: StreamId) -> Entry<B> { pub fn find_entry(&mut self, id: StreamId) -> Entry<B> {
use self::hash_map::Entry::*; use self::hash_map::Entry::*;
match self.ids.entry(id) { match self.ids.entry(id) {
@@ -67,12 +85,53 @@ impl<B> Store<B> {
} }
} }
// ===== impl Ptr =====
impl<'a, B: 'a> Ptr<'a, B> {
pub fn key(&self) -> Key {
self.key
}
pub fn resolve(&mut self, key: Key) -> Ptr<B> {
Ptr {
key: key,
store: self.store,
}
}
}
impl<'a, B: 'a> ops::Deref for Ptr<'a, B> {
type Target = Stream<B>;
fn deref(&self) -> &Stream<B> {
&self.store.slab[self.key.0]
}
}
impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> {
fn deref_mut(&mut self) -> &mut Stream<B> {
&mut self.store.slab[self.key.0]
}
}
// ===== impl OccupiedEntry =====
impl<'a, B> OccupiedEntry<'a, B> { impl<'a, B> OccupiedEntry<'a, B> {
pub fn get(&self) -> &Stream<B> {
&self.slab[*self.ids.get()]
}
pub fn get_mut(&mut self) -> &mut Stream<B> {
&mut self.slab[*self.ids.get()]
}
pub fn into_mut(self) -> &'a mut Stream<B> { pub fn into_mut(self) -> &'a mut Stream<B> {
&mut self.slab[*self.ids.get()] &mut self.slab[*self.ids.get()]
} }
} }
// ===== impl VacantEntry =====
//
impl<'a, B> VacantEntry<'a, B> { impl<'a, B> VacantEntry<'a, B> {
pub fn insert(self, value: Stream<B>) -> &'a mut Stream<B> { pub fn insert(self, value: Stream<B>) -> &'a mut Stream<B> {
// Insert the value in the slab // Insert the value in the slab

View File

@@ -7,6 +7,12 @@ pub(super) struct Stream<B> {
/// Frames pending for this stream being sent to the socket /// Frames pending for this stream being sent to the socket
pub pending_send: buffer::Deque<B>, pub pending_send: buffer::Deque<B>,
/// Next stream pending send
pub next_pending_send: Option<store::Key>,
/// True if the stream is currently pending send
pub is_pending_send: bool,
} }
impl<B> Stream<B> { impl<B> Stream<B> {
@@ -14,6 +20,8 @@ impl<B> Stream<B> {
Stream { Stream {
state: State::default(), state: State::default(),
pending_send: buffer::Deque::new(), pending_send: buffer::Deque::new(),
next_pending_send: None,
is_pending_send: false,
} }
} }

View File

@@ -60,7 +60,7 @@ impl<P, B> Streams<P, B>
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = match me.store.entry(id) { let stream = match me.store.find_entry(id) {
Entry::Occupied(e) => e.into_mut(), Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => { Entry::Vacant(e) => {
// Trailers cannot open a stream. Trailers are header frames // Trailers cannot open a stream. Trailers are header frames
@@ -103,7 +103,7 @@ impl<P, B> Streams<P, B>
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = match me.store.get_mut(&id) { let stream = match me.store.find_mut(&id) {
Some(stream) => stream, Some(stream) => stream,
None => return Err(ProtocolError.into()), None => return Err(ProtocolError.into()),
}; };
@@ -136,7 +136,7 @@ impl<P, B> Streams<P, B>
} else { } else {
// The remote may send window updates for streams that the local now // The remote may send window updates for streams that the local now
// considers closed. It's ok... // considers closed. It's ok...
if let Some(state) = me.store.get_mut(&id) { if let Some(state) = me.store.find_mut(&id) {
try!(me.actions.send.recv_stream_window_update(frame, state)); try!(me.actions.send.recv_stream_window_update(frame, state));
} }
} }
@@ -191,7 +191,7 @@ impl<P, B> Streams<P, B>
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = match me.store.get_mut(&id) { let stream = match me.store.find_mut(&id) {
Some(stream) => stream, Some(stream) => stream,
None => return Err(UnexpectedFrameType.into()), None => return Err(UnexpectedFrameType.into()),
}; };
@@ -224,7 +224,7 @@ impl<P, B> Streams<P, B>
if id.is_zero() { if id.is_zero() {
try!(me.actions.recv.expand_connection_window(sz)); try!(me.actions.recv.expand_connection_window(sz));
} else { } else {
if let Some(state) = me.store.get_mut(&id) { if let Some(state) = me.store.find_mut(&id) {
try!(me.actions.recv.expand_stream_window(id, sz, state)); try!(me.actions.recv.expand_stream_window(id, sz, state));
} }
} }
@@ -271,15 +271,14 @@ impl<B> Streams<client::Peer, B>
let headers = client::Peer::convert_send_message( let headers = client::Peer::convert_send_message(
id, request, end_of_stream); id, request, end_of_stream);
me.actions.send.send_headers(&mut stream, headers)?; let mut stream = me.store.insert(id, stream);
me.actions.send.send_headers(headers, &mut stream)?;
// Given that the stream has been initialized, it should not be in the // Given that the stream has been initialized, it should not be in the
// closed state. // closed state.
debug_assert!(!stream.state.is_closed()); debug_assert!(!stream.state.is_closed());
// Store the state
me.store.insert(id, stream);
id id
}; };