Wire in PushPromise

This commit is contained in:
Carl Lerche
2017-08-08 13:32:36 -07:00
parent fa66323cec
commit 314b7a1848
9 changed files with 271 additions and 100 deletions

View File

@@ -21,6 +21,8 @@ pub(super) struct Recv<P, B> {
/// Connection level flow control governing received data
flow_control: FlowControl,
/// Streams that have pending window updates
/// TODO: don't use a VecDeque
pending_window_updates: VecDeque<StreamId>,
/// Holds frames that are waiting to be read
@@ -38,6 +40,12 @@ pub(super) struct Chunk {
pub pending_recv: buffer::Deque<Bytes>,
}
#[derive(Debug, Clone, Copy)]
struct Indices {
head: store::Key,
tail: store::Key,
}
impl<P, B> Recv<P, B>
where P: Peer,
B: Buf,
@@ -63,16 +71,11 @@ impl<P, B> Recv<P, B>
try!(self.ensure_can_open(id));
if let Some(max) = self.max_streams {
if max <= self.num_streams {
self.refused = Some(id);
return Ok(None);
}
if !self.can_inc_num_streams() {
self.refused = Some(id);
return Ok(None);
}
// Increment the number of remote initiated streams
self.num_streams += 1;
Ok(Some(Stream::new(id)))
}
@@ -82,7 +85,16 @@ impl<P, B> Recv<P, B>
stream: &mut store::Ptr<B>)
-> Result<Option<frame::Headers>, ConnectionError>
{
stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?;
let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?;
if is_initial {
if !self.can_inc_num_streams() {
unimplemented!();
}
// Increment the number of concurrent streams
self.inc_num_streams();
}
// Only servers can receive a headers frame that initiates the stream.
// This is verified in `Streams` before calling this function.
@@ -105,7 +117,7 @@ impl<P, B> Recv<P, B>
pub fn recv_data(&mut self,
frame: frame::Data,
stream: &mut Stream<B>)
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
let sz = frame.payload().len();
@@ -143,6 +155,48 @@ impl<P, B> Recv<P, B>
Ok(())
}
pub fn recv_push_promise(&mut self, frame: frame::PushPromise, stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
// First, make sure that the values are legit
self.ensure_can_reserve(frame.promised_id())?;
// Make sure that the stream state is valid
stream.state.ensure_recv_open()?;
// TODO: Streams in the reserved states do not count towards the concurrency
// limit. However, it seems like there should be a cap otherwise this
// could grow in memory indefinitely.
/*
if !self.inc_num_streams() {
self.refused = Some(frame.promised_id());
return Ok(());
}
*/
// TODO: All earlier stream IDs should be implicitly closed.
// Now, create a new entry for the stream
let mut new_stream = Stream::new(frame.promised_id());
new_stream.state.reserve_remote();
let mut ppp = stream.pending_push_promises.take();
{
// Store the stream
let mut new_stream = stream.store()
.insert(frame.promised_id(), new_stream);
ppp.push(&mut new_stream);
}
stream.pending_push_promises = ppp;
stream.notify_recv();
Ok(())
}
pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream<B>) {
// Receive an error
stream.state.recv_err(err);
@@ -151,6 +205,26 @@ impl<P, B> Recv<P, B>
stream.notify_recv();
}
/// Returns true if the current stream concurrency can be incremetned
fn can_inc_num_streams(&self) -> bool {
if let Some(max) = self.max_streams {
max > self.num_streams
} else {
true
}
}
/// Increments the number of concurrenty streams. Panics on failure as this
/// should have been validated before hand.
fn inc_num_streams(&mut self) {
if !self.can_inc_num_streams() {
panic!();
}
// Increment the number of remote initiated streams
self.num_streams += 1;
}
pub fn dec_num_streams(&mut self) {
self.num_streams -= 1;
}
@@ -171,6 +245,21 @@ impl<P, B> Recv<P, B>
Ok(())
}
/// Returns true if the remote peer can reserve a stream with the given ID.
fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), ConnectionError> {
// TODO: Are there other rules?
if P::is_server() {
// The remote is a client and cannot reserve
return Err(ProtocolError.into());
}
if !promised_id.is_server_initiated() {
return Err(ProtocolError.into());
}
Ok(())
}
/// Send any pending refusals.
pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
@@ -206,7 +295,7 @@ impl<P, B> Recv<P, B>
pub fn expand_stream_window(&mut self,
id: StreamId,
sz: WindowSize,
stream: &mut Stream<B>)
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
// TODO: handle overflow
@@ -276,7 +365,7 @@ impl<P, B> Recv<P, B>
{
while let Some(id) = self.pending_window_updates.pop_front() {
let flow = streams.find_mut(&id)
.and_then(|stream| stream.recv_flow_control());
.and_then(|stream| stream.into_mut().recv_flow_control());
if let Some(flow) = flow {