Misc protocol fixes

* Verify contiuation frame stream ID
* Fix sending RST_STREAM in certain cases.
This commit is contained in:
Carl Lerche
2017-08-25 10:21:47 -07:00
parent 0c8a94aa11
commit 91aa1db2ff
6 changed files with 83 additions and 28 deletions

View File

@@ -133,6 +133,11 @@ impl<T> FramedRead<T> {
match partial.frame {
Continuable::Headers(mut frame) => {
// The stream identifiers must match
if frame.stream_id() != head.stream_id() {
return Err(ProtocolError.into());
}
frame.load_hpack(partial.buf, &mut self.hpack)?;
frame.into()
}

View File

@@ -1,4 +1,5 @@
use super::*;
use super::store::Resolve;
use bytes::buf::Take;
use futures::Sink;
@@ -179,13 +180,23 @@ impl<B> Prioritize<B>
{
// Update the connection's window
self.flow.inc_window(inc)?;
self.assign_connection_capacity(inc, store);
Ok(())
}
pub fn assign_connection_capacity<R>(&mut self,
inc: WindowSize,
store: &mut R)
where R: Resolve<B>
{
self.flow.assign_capacity(inc);
// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
let mut stream = match self.pending_capacity.pop(store) {
Some(stream) => stream,
None => return Ok(()),
None => return,
};
// Try to assign capacity to the stream. This will also re-queue the
@@ -193,8 +204,6 @@ impl<B> Prioritize<B>
// the capacity request.
self.try_assign_capacity(&mut stream);
}
Ok(())
}
/// Request capacity to send data

View File

@@ -101,7 +101,10 @@ impl<B> Send<B> where B: Buf {
Ok(())
}
pub fn send_reset(&mut self, reason: Reason,
/// This is called by the user to send a reset and should not be called
/// by internal state transitions. Use `reset_stream` for that.
pub fn send_reset(&mut self,
reason: Reason,
stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
@@ -111,17 +114,44 @@ impl<B> Send<B> where B: Buf {
return Err(InactiveStreamId.into())
}
stream.state.send_reset(reason)?;
self.reset_stream(reason, stream, task);
Ok(())
}
fn reset_stream(&mut self,
reason: Reason,
stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
{
if stream.state.is_reset() {
// Don't double reset
return;
}
// If closed AND the send queue is flushed, then the stream cannot be
// reset either
if stream.state.is_closed() && stream.pending_send.is_empty() {
return;
}
// Transition the state
stream.state.set_reset(reason);
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);
// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available();
stream.send_flow.claim_capacity(available);
let frame = frame::Reset::new(stream.id, reason);
// TODO: This could impact connection level flow control.
self.prioritize.clear_queue(stream);
trace!("send_reset -- queueing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task);
Ok(())
// Re-assign all capacity to the connection
self.prioritize.assign_connection_capacity(available, stream);
}
pub fn send_data(&mut self,
@@ -210,7 +240,7 @@ impl<B> Send<B> where B: Buf {
{
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
debug!("recv_stream_window_update !!; err={:?}", e);
self.send_reset(FlowControlError.into(), stream, task)?;
self.reset_stream(FlowControlError.into(), stream, task);
}
Ok(())

View File

@@ -241,16 +241,17 @@ impl State {
}
}
/// Indicates that the local side will not send more data to the local.
pub fn send_reset(&mut self, reason: Reason) -> Result<(), ConnectionError> {
/// Set the stream state to reset
pub fn set_reset(&mut self, reason: Reason) {
debug_assert!(!self.is_reset());
self.inner = Closed(Some(Cause::Proto(reason)));
}
/// Returns true if the stream is already reset.
pub fn is_reset(&self) -> bool {
match self.inner {
Idle => Err(ProtocolError.into()),
Closed(..) => Ok(()),
_ => {
trace!("send_reset: => Closed");
self.inner = Closed(Some(Cause::Proto(reason)));
Ok(())
}
Closed(Some(_)) => true,
_ => false,
}
}

View File

@@ -62,6 +62,10 @@ pub(super) struct VacantEntry<'a, B: 'a> {
slab: &'a mut slab::Slab<Stream<B>>,
}
pub(super) trait Resolve<B> {
fn resolve(&mut self, key: Key) -> Ptr<B>;
}
// ===== impl Store =====
impl<B> Store<B> {
@@ -72,13 +76,6 @@ impl<B> Store<B> {
}
}
pub fn resolve(&mut self, key: Key) -> Ptr<B> {
Ptr {
key: key,
slab: &mut self.slab,
}
}
pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<B>> {
if let Some(&key) = self.ids.get(id) {
Some(Ptr {
@@ -132,6 +129,15 @@ impl<B> Store<B> {
}
}
impl<B> Resolve<B> for Store<B> {
fn resolve(&mut self, key: Key) -> Ptr<B> {
Ptr {
key: key,
slab: &mut self.slab,
}
}
}
impl<B> ops::Index<Key> for Store<B> {
type Output = Stream<B>;
@@ -209,7 +215,8 @@ impl<B, N> Queue<B, N>
true
}
pub fn pop<'a>(&mut self, store: &'a mut Store<B>) -> Option<store::Ptr<'a, B>>
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a, B>>
where R: Resolve<B>
{
if let Some(mut idxs) = self.indices {
let mut stream = store.resolve(idxs.head);
@@ -238,8 +245,10 @@ impl<'a, B: 'a> Ptr<'a, B> {
pub fn key(&self) -> Key {
self.key
}
}
pub fn resolve(&mut self, key: Key) -> Ptr<B> {
impl<'a, B: 'a> Resolve<B> for Ptr<'a, B> {
fn resolve(&mut self, key: Key) -> Ptr<B> {
Ptr {
key: key,
slab: &mut *self.slab,

View File

@@ -1,6 +1,7 @@
use {client, server, HeaderMap};
use proto::*;
use super::*;
use super::store::Resolve;
use std::sync::{Arc, Mutex};