Handle more H2 details

This commit is contained in:
Carl Lerche
2017-08-10 21:08:57 -07:00
parent e2fac3e823
commit c439232ed2
5 changed files with 64 additions and 4 deletions

View File

@@ -36,6 +36,7 @@ pub enum DecoderError {
IntegerOverflow, IntegerOverflow,
StringUnderflow, StringUnderflow,
RepeatedPseudo, RepeatedPseudo,
UnexpectedEndOfStream,
} }
enum Representation { enum Representation {
@@ -277,6 +278,10 @@ impl Decoder {
fn decode_string(&mut self, buf: &mut Cursor<Bytes>) -> Result<Bytes, DecoderError> { fn decode_string(&mut self, buf: &mut Cursor<Bytes>) -> Result<Bytes, DecoderError> {
const HUFF_FLAG: u8 = 0b10000000; const HUFF_FLAG: u8 = 0b10000000;
if !buf.has_remaining() {
return Err(DecoderError::UnexpectedEndOfStream);
}
// The first bit in the first byte contains the huffman encoded flag. // The first bit in the first byte contains the huffman encoded flag.
let huff = peek_u8(buf) & HUFF_FLAG == HUFF_FLAG; let huff = peek_u8(buf) & HUFF_FLAG == HUFF_FLAG;

View File

@@ -85,6 +85,9 @@ pub fn from_framed_write<T, P, B>(framed_write: FramedWrite<T, B::Buf>)
.length_field_length(3) .length_field_length(3)
.length_adjustment(9) .length_adjustment(9)
.num_skip(0) // Don't skip the header .num_skip(0) // Don't skip the header
// TODO: make this configurable and allow it to be changed during
// runtime.
.max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize)
.new_read(framed_write); .new_read(framed_write);
let codec = FramedRead::new(framed); let codec = FramedRead::new(framed);

View File

@@ -21,6 +21,9 @@ pub(super) struct Recv<B> {
/// Connection level flow control governing received data /// Connection level flow control governing received data
flow_control: FlowControl, flow_control: FlowControl,
/// The lowest stream ID that is still idle
next_stream_id: StreamId,
/// Streams that have pending window updates /// Streams that have pending window updates
/// TODO: don't use a VecDeque /// TODO: don't use a VecDeque
pending_window_updates: VecDeque<StreamId>, pending_window_updates: VecDeque<StreamId>,
@@ -50,12 +53,19 @@ struct Indices {
} }
impl<B> Recv<B> where B: Buf { impl<B> Recv<B> where B: Buf {
pub fn new(config: &Config) -> Self { pub fn new<P: Peer>(config: &Config) -> Self {
let next_stream_id = if P::is_server() {
1
} else {
2
};
Recv { Recv {
max_streams: config.max_remote_initiated, max_streams: config.max_remote_initiated,
num_streams: 0, num_streams: 0,
init_window_sz: config.init_remote_window_sz, init_window_sz: config.init_remote_window_sz,
flow_control: FlowControl::new(config.init_remote_window_sz), flow_control: FlowControl::new(config.init_remote_window_sz),
next_stream_id: next_stream_id.into(),
pending_window_updates: VecDeque::new(), pending_window_updates: VecDeque::new(),
pending_accept: store::List::new(), pending_accept: store::List::new(),
buffer: Buffer::new(), buffer: Buffer::new(),
@@ -129,6 +139,13 @@ impl<B> Recv<B> where B: Buf {
unimplemented!(); unimplemented!();
} }
if frame.stream_id() >= self.next_stream_id {
self.next_stream_id = frame.stream_id();
self.next_stream_id.increment();
} else {
return Err(ProtocolError.into());
}
// Increment the number of concurrent streams // Increment the number of concurrent streams
self.inc_num_streams(); self.inc_num_streams();
} }
@@ -246,6 +263,14 @@ impl<B> Recv<B> where B: Buf {
Ok(()) Ok(())
} }
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> {
if id >= self.next_stream_id {
return Err(ProtocolError.into());
}
Ok(())
}
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream<B>) pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {

View File

@@ -273,6 +273,14 @@ impl<B> Send<B> where B: Buf {
0 0
} }
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> {
if id >= self.next_stream_id {
return Err(ProtocolError.into());
}
Ok(())
}
pub fn dec_num_streams(&mut self) { pub fn dec_num_streams(&mut self) {
self.num_streams -= 1; self.num_streams -= 1;
} }

View File

@@ -53,7 +53,7 @@ impl<B> Streams<B>
Streams { Streams {
inner: Arc::new(Mutex::new(Inner { inner: Arc::new(Mutex::new(Inner {
actions: Actions { actions: Actions {
recv: Recv::new(&config), recv: Recv::new::<P>(&config),
send: Send::new::<P>(&config), send: Send::new::<P>(&config),
}, },
store: Store::new(), store: Store::new(),
@@ -129,10 +129,17 @@ impl<B> Streams<B>
let id = frame.stream_id(); let id = frame.stream_id();
if id.is_zero() {
return Err(ProtocolError.into());
}
let mut stream = match me.store.find_mut(&id) { let mut stream = match me.store.find_mut(&id) {
Some(stream) => stream, Some(stream) => stream,
// TODO: should this be an error? None => {
None => return Ok(()), // TODO: Are there other error cases?
me.actions.ensure_not_idle::<P>(id)?;
return Ok(());
}
}; };
me.actions.transition::<P, _, _>(stream, |actions, stream| { me.actions.transition::<P, _, _>(stream, |actions, stream| {
@@ -165,6 +172,8 @@ impl<B> Streams<B>
// considers closed. It's ok... // considers closed. It's ok...
if let Some(mut stream) = me.store.find_mut(&id) { if let Some(mut stream) = me.store.find_mut(&id) {
try!(me.actions.send.recv_stream_window_update(frame, &mut stream)); try!(me.actions.send.recv_stream_window_update(frame, &mut stream));
} else {
me.actions.recv.ensure_not_idle(id)?;
} }
} }
@@ -413,6 +422,16 @@ impl<B> Drop for Chunk<B>
impl<B> Actions<B> impl<B> Actions<B>
where B: Buf, where B: Buf,
{ {
fn ensure_not_idle<P: Peer>(&mut self, id: StreamId)
-> Result<(), ConnectionError>
{
if self.is_local_init::<P>(id) {
self.send.ensure_not_idle(id)
} else {
self.recv.ensure_not_idle(id)
}
}
fn dec_num_streams<P: Peer>(&mut self, id: StreamId) { fn dec_num_streams<P: Peer>(&mut self, id: StreamId) {
if self.is_local_init::<P>(id) { if self.is_local_init::<P>(id) {
self.send.dec_num_streams(); self.send.dec_num_streams();