From 314b7a1848840ed8eee5c31bdf2b9c53a2c0b1bc Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 8 Aug 2017 13:32:36 -0700 Subject: [PATCH] Wire in PushPromise --- src/frame/headers.rs | 8 +++ src/proto/connection.rs | 5 +- src/proto/streams/prioritize.rs | 50 +++----------- src/proto/streams/recv.rs | 113 ++++++++++++++++++++++++++++---- src/proto/streams/send.rs | 4 +- src/proto/streams/state.rs | 34 +++++++++- src/proto/streams/store.rs | 93 +++++++++++++++++++++++++- src/proto/streams/stream.rs | 13 +++- src/proto/streams/streams.rs | 51 +++++--------- 9 files changed, 271 insertions(+), 100 deletions(-) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index ab13fd6..a181e18 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -320,6 +320,14 @@ impl PushPromise { flags: flags, }) } + + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + + pub fn promised_id(&self) -> StreamId { + self.promised_id + } } impl From for Frame { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 7851716..13d1312 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -155,11 +155,8 @@ impl Connection */ } Some(PushPromise(frame)) => { - // TODO: implement - /* trace!("recv PUSH_PROMISE; frame={:?}", frame); - try!(self.streams.recv_push_promise(frame)); - */ + self.streams.recv_push_promise(frame)?; } Some(Settings(frame)) => { trace!("recv SETTINGS; frame={:?}", frame); diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index d88548d..667bbf3 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -2,24 +2,18 @@ use super::*; #[derive(Debug)] pub(super) struct Prioritize { - pending_send: Option, + pending_send: store::List, /// Holds frames that are waiting to be written to the socket buffer: Buffer, } -#[derive(Debug, Clone, Copy)] -struct Indices { - head: store::Key, - tail: store::Key, -} - impl Prioritize where B: Buf, { pub fn new() -> Prioritize { Prioritize { - pending_send: None, + pending_send: store::List::new(), buffer: Buffer::new(), } } @@ -32,7 +26,7 @@ impl Prioritize stream.pending_send.push_back(&mut self.buffer, frame); if stream.is_pending_send { - debug_assert!(self.pending_send.is_some()); + debug_assert!(!self.pending_send.is_empty()); // Already queued to have frame processed. return; @@ -84,44 +78,20 @@ impl Prioritize } fn push_sender(&mut self, stream: &mut store::Ptr) { - // The next pointer shouldn't be set - debug_assert!(stream.next_pending_send.is_none()); + debug_assert!(!stream.is_pending_send); - // Queue the stream - match self.pending_send { - Some(ref mut idxs) => { - // Update the current tail node to point to `stream` - stream.resolve(idxs.tail).next_pending_send = Some(stream.key()); - - // Update the tail pointer - idxs.tail = stream.key(); - } - None => { - self.pending_send = Some(Indices { - head: stream.key(), - tail: stream.key(), - }); - } - } + self.pending_send.push(stream); stream.is_pending_send = true; } fn pop_sender<'a>(&mut self, store: &'a mut Store) -> Option> { - if let Some(mut idxs) = self.pending_send { - let mut stream = store.resolve(idxs.head); - - if idxs.head == idxs.tail { - assert!(stream.next_pending_send.is_none()); - self.pending_send = None; - } else { - idxs.head = stream.next_pending_send.take().unwrap(); - self.pending_send = Some(idxs); + match self.pending_send.pop(store) { + Some(mut stream) => { + stream.is_pending_send = false; + Some(stream) } - - return Some(stream); + None => None, } - - None } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 8f319c0..318dbc6 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -21,6 +21,8 @@ pub(super) struct Recv { /// Connection level flow control governing received data flow_control: FlowControl, + /// Streams that have pending window updates + /// TODO: don't use a VecDeque pending_window_updates: VecDeque, /// Holds frames that are waiting to be read @@ -38,6 +40,12 @@ pub(super) struct Chunk { pub pending_recv: buffer::Deque, } +#[derive(Debug, Clone, Copy)] +struct Indices { + head: store::Key, + tail: store::Key, +} + impl Recv where P: Peer, B: Buf, @@ -63,16 +71,11 @@ impl Recv try!(self.ensure_can_open(id)); - if let Some(max) = self.max_streams { - if max <= self.num_streams { - self.refused = Some(id); - return Ok(None); - } + if !self.can_inc_num_streams() { + self.refused = Some(id); + return Ok(None); } - // Increment the number of remote initiated streams - self.num_streams += 1; - Ok(Some(Stream::new(id))) } @@ -82,7 +85,16 @@ impl Recv stream: &mut store::Ptr) -> Result, ConnectionError> { - stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; + let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; + + if is_initial { + if !self.can_inc_num_streams() { + unimplemented!(); + } + + // Increment the number of concurrent streams + self.inc_num_streams(); + } // Only servers can receive a headers frame that initiates the stream. // This is verified in `Streams` before calling this function. @@ -105,7 +117,7 @@ impl Recv pub fn recv_data(&mut self, frame: frame::Data, - stream: &mut Stream) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { let sz = frame.payload().len(); @@ -143,6 +155,48 @@ impl Recv Ok(()) } + pub fn recv_push_promise(&mut self, frame: frame::PushPromise, stream: &mut store::Ptr) + -> Result<(), ConnectionError> + { + // First, make sure that the values are legit + self.ensure_can_reserve(frame.promised_id())?; + + // Make sure that the stream state is valid + stream.state.ensure_recv_open()?; + + // TODO: Streams in the reserved states do not count towards the concurrency + // limit. However, it seems like there should be a cap otherwise this + // could grow in memory indefinitely. + + /* + if !self.inc_num_streams() { + self.refused = Some(frame.promised_id()); + return Ok(()); + } + */ + + // TODO: All earlier stream IDs should be implicitly closed. + + // Now, create a new entry for the stream + let mut new_stream = Stream::new(frame.promised_id()); + new_stream.state.reserve_remote(); + + let mut ppp = stream.pending_push_promises.take(); + + { + // Store the stream + let mut new_stream = stream.store() + .insert(frame.promised_id(), new_stream); + + ppp.push(&mut new_stream); + } + + stream.pending_push_promises = ppp; + stream.notify_recv(); + + Ok(()) + } + pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { // Receive an error stream.state.recv_err(err); @@ -151,6 +205,26 @@ impl Recv stream.notify_recv(); } + /// Returns true if the current stream concurrency can be incremetned + fn can_inc_num_streams(&self) -> bool { + if let Some(max) = self.max_streams { + max > self.num_streams + } else { + true + } + } + + /// Increments the number of concurrenty streams. Panics on failure as this + /// should have been validated before hand. + fn inc_num_streams(&mut self) { + if !self.can_inc_num_streams() { + panic!(); + } + + // Increment the number of remote initiated streams + self.num_streams += 1; + } + pub fn dec_num_streams(&mut self) { self.num_streams -= 1; } @@ -171,6 +245,21 @@ impl Recv Ok(()) } + /// Returns true if the remote peer can reserve a stream with the given ID. + fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), ConnectionError> { + // TODO: Are there other rules? + if P::is_server() { + // The remote is a client and cannot reserve + return Err(ProtocolError.into()); + } + + if !promised_id.is_server_initiated() { + return Err(ProtocolError.into()); + } + + Ok(()) + } + /// Send any pending refusals. pub fn send_pending_refusal(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> @@ -206,7 +295,7 @@ impl Recv pub fn expand_stream_window(&mut self, id: StreamId, sz: WindowSize, - stream: &mut Stream) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { // TODO: handle overflow @@ -276,7 +365,7 @@ impl Recv { while let Some(id) = self.pending_window_updates.pop_front() { let flow = streams.find_mut(&id) - .and_then(|stream| stream.recv_flow_control()); + .and_then(|stream| stream.into_mut().recv_flow_control()); if let Some(flow) = flow { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 8650a31..de7bc6c 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -177,7 +177,7 @@ impl Send let update = self.pending_window_updates.pop_front() .and_then(|id| { streams.find_mut(&id) - .and_then(|stream| stream.send_flow_control()) + .and_then(|stream| stream.into_mut().send_flow_control()) .and_then(|flow| flow.apply_window_update()) .map(|incr| WindowUpdate::new(id, incr)) }); @@ -209,7 +209,7 @@ impl Send pub fn recv_stream_window_update(&mut self, frame: frame::WindowUpdate, - stream: &mut Stream) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { if let Some(flow) = stream.send_flow_control() { diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index b39bd14..3c0d903 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -58,7 +58,7 @@ enum Inner { Idle, // TODO: these states shouldn't count against concurrency limits: //ReservedLocal, - //ReservedRemote, + ReservedRemote, Open { local: Peer, remote: Peer, @@ -126,11 +126,16 @@ impl State { /// Open the receive have of the stream, this action is taken when a HEADERS /// frame is received. - pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { + /// + /// Returns true if this transitions the state to Open + pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result { let remote = Peer::streaming(sz); + let mut initial = false; self.inner = match self.inner { Idle => { + initial = true; + if eos { HalfClosedRemote(AwaitingHeaders) } else { @@ -140,6 +145,18 @@ impl State { } } } + ReservedRemote => { + initial = true; + + if eos { + Closed(None) + } else { + Open { + local: AwaitingHeaders, + remote, + } + } + } Open { local, remote: AwaitingHeaders } => { if eos { HalfClosedRemote(local) @@ -163,7 +180,18 @@ impl State { } }; - return Ok(()); + return Ok(initial); + } + + /// Transition from Idle -> ReservedRemote + pub fn reserve_remote(&mut self) -> Result<(), ConnectionError> { + match self.inner { + Idle => { + self.inner = ReservedRemote; + Ok(()) + } + _ => Err(ProtocolError.into()), + } } /// Indicates that the remote side will not send more data to the local. diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 6821ff8..89900b8 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -4,6 +4,7 @@ use slab; use std::ops; use std::collections::{HashMap, hash_map}; +use std::marker::PhantomData; /// Storage for streams #[derive(Debug)] @@ -22,6 +23,19 @@ pub(super) struct Ptr<'a, B: 'a> { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) struct Key(usize); +#[derive(Debug)] +pub(super) struct List { + indices: Option, + _p: PhantomData, +} + +/// A linked list +#[derive(Debug, Clone, Copy)] +struct Indices { + pub head: Key, + pub tail: Key, +} + pub(super) enum Entry<'a, B: 'a> { Occupied(OccupiedEntry<'a, B>), Vacant(VacantEntry<'a, B>), @@ -54,9 +68,12 @@ impl Store { } } - pub fn find_mut(&mut self, id: &StreamId) -> Option<&mut Stream> { - if let Some(handle) = self.ids.get(id) { - Some(&mut self.slab[*handle]) + pub fn find_mut(&mut self, id: &StreamId) -> Option> { + if let Some(&key) = self.ids.get(id) { + Some(Ptr { + key: Key(key), + store: self, + }) } else { None } @@ -100,6 +117,68 @@ impl Store { } } +// ===== impl List ===== + +impl List { + pub fn new() -> Self { + List { + indices: None, + _p: PhantomData, + } + } + + pub fn is_empty(&self) -> bool { + self.indices.is_none() + } + + pub fn take(&mut self) -> Self { + List { + indices: self.indices.take(), + _p: PhantomData, + } + } + + pub fn push(&mut self, stream: &mut store::Ptr) { + // The next pointer shouldn't be set + debug_assert!(stream.next.is_none()); + + // Queue the stream + match self.indices { + Some(ref mut idxs) => { + // Update the current tail node to point to `stream` + stream.resolve(idxs.tail).next = Some(stream.key()); + + // Update the tail pointer + idxs.tail = stream.key(); + } + None => { + self.indices = Some(store::Indices { + head: stream.key(), + tail: stream.key(), + }); + } + } + } + + pub fn pop<'a>(&mut self, store: &'a mut Store) -> Option> { + if let Some(mut idxs) = self.indices { + let mut stream = store.resolve(idxs.head); + + if idxs.head == idxs.tail { + assert!(stream.next.is_none()); + self.indices = None; + } else { + idxs.head = stream.next.take().unwrap(); + self.indices = Some(idxs); + } + + return Some(stream); + } + + None + } +} + // ===== impl Ptr ===== impl<'a, B: 'a> Ptr<'a, B> { @@ -107,12 +186,20 @@ impl<'a, B: 'a> Ptr<'a, B> { self.key } + pub fn store(&mut self) -> &mut Store { + &mut self.store + } + pub fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, store: self.store, } } + + pub fn into_mut(self) -> &'a mut Stream { + &mut self.store.slab[self.key.0] + } } impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index d26266c..7518a48 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -17,8 +17,14 @@ pub(super) struct Stream { /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, - /// Next stream pending send - pub next_pending_send: Option, + /// Next node in the `Stream` linked list. + /// + /// This field is used in different linked lists depending on the stream + /// state. + pub next: Option, + + /// The stream's pending push promises + pub pending_push_promises: store::List, /// True if the stream is currently pending send pub is_pending_send: bool, @@ -32,7 +38,8 @@ impl Stream { pending_recv: buffer::Deque::new(), recv_task: None, pending_send: buffer::Deque::new(), - next_pending_send: None, + next: None, + pending_push_promises: store::List::new(), is_pending_send: false, } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index edfd058..57954a9 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -119,14 +119,14 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let stream = match me.store.find_mut(&id) { + let mut stream = match me.store.find_mut(&id) { Some(stream) => stream, None => return Err(ProtocolError.into()), }; // Ensure there's enough capacity on the connection before acting on the // stream. - try!(me.actions.recv.recv_data(frame, stream)); + try!(me.actions.recv.recv_data(frame, &mut stream)); if stream.state.is_closed() { me.actions.dec_num_streams(id); @@ -160,18 +160,28 @@ impl Streams } else { // The remote may send window updates for streams that the local now // considers closed. It's ok... - if let Some(state) = me.store.find_mut(&id) { - try!(me.actions.send.recv_stream_window_update(frame, state)); + if let Some(mut stream) = me.store.find_mut(&id) { + try!(me.actions.send.recv_stream_window_update(frame, &mut stream)); } } Ok(()) } - pub fn recv_push_promise(&mut self, _frame: frame::PushPromise) + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), ConnectionError> { - unimplemented!(); + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let id = frame.stream_id(); + + let mut stream = match me.store.find_mut(&id) { + Some(stream) => stream, + None => return Err(ProtocolError.into()), + }; + + me.actions.recv.recv_push_promise(frame, &mut stream) } pub fn send_headers(&mut self, headers: frame::Headers) @@ -208,31 +218,6 @@ impl Streams */ } - /* - pub fn send_data(&mut self, frame: &frame::Data) - -> Result<(), ConnectionError> - { - let id = frame.stream_id(); - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let stream = match me.store.find_mut(&id) { - Some(stream) => stream, - None => return Err(UnexpectedFrameType.into()), - }; - - // Ensure there's enough capacity on the connection before acting on the - // stream. - try!(me.actions.send.send_data(frame, stream)); - - if stream.state.is_closed() { - me.actions.dec_num_streams(id); - } - - Ok(()) - } - */ - pub fn poll_window_update(&mut self) -> Poll { @@ -250,8 +235,8 @@ impl Streams if id.is_zero() { try!(me.actions.recv.expand_connection_window(sz)); } else { - if let Some(state) = me.store.find_mut(&id) { - try!(me.actions.recv.expand_stream_window(id, sz, state)); + if let Some(mut stream) = me.store.find_mut(&id) { + try!(me.actions.recv.expand_stream_window(id, sz, &mut stream)); } }