From 48c973424904baf46a655c61ce977cfde84e8d74 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 10 Aug 2017 20:14:00 -0700 Subject: [PATCH] Support receiving continuation frames --- src/frame/headers.rs | 79 ++++++++++++++++++++++++---------------- src/proto/framed_read.rs | 70 +++++++++++++++++++++++++++-------- 2 files changed, 101 insertions(+), 48 deletions(-) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index f10144a..ed8d85b 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -123,40 +123,36 @@ impl Headers { } } - pub fn load(head: Head, src: &mut Cursor, decoder: &mut hpack::Decoder) - -> Result + /// Loads the header frame but doesn't actually do HPACK decoding. + /// + /// HPACK decoding is done in the `load_hpack` step. + pub fn load(head: Head, mut src: BytesMut) + -> Result<(Self, BytesMut), Error> { let flags = HeadersFlag(head.flag()); + let mut pad = 0; trace!("loading headers; flags={:?}", flags); // Read the padding length if flags.is_padded() { - let pad = src.get_u8() as usize; - let len = src.get_ref().len(); + // TODO: Ensure payload is sized correctly + pad = src[0] as usize; - if pad >= len { - trace!("too much padding"); - return Err(Error::TooMuchPadding); - } - - // Truncate the last `pad` bytes. - let len = src.get_ref().len() - pad; - src.get_mut().truncate(len); + // Drop the padding + let _ = src.split_to(1); } // Read the stream dependency let stream_dep = if flags.is_priority() { - let mut buf = [0u8; 4]; - - // Read the next 4 bytes - src.copy_to_slice(&mut buf); - // Parse the stream ID and exclusive flag - let (stream_id, is_exclusive) = StreamId::parse(&buf); + let (stream_id, is_exclusive) = StreamId::parse(&src[..4]); // Read the weight - let weight = src.get_u8(); + let weight = src[4]; + + // Drop the next 5 bytes + let _ = src.split_to(5); Some(StreamDependency { stream_id, @@ -167,31 +163,56 @@ impl Headers { None }; - let mut pseudo = Pseudo::default(); - let mut fields = HeaderMap::new(); + if pad > 0 { + if pad > src.len() { + return Err(Error::TooMuchPadding); + } + + let len = src.len() - pad; + src.truncate(len); + } + + let headers = Headers { + stream_id: head.stream_id(), + stream_dep: stream_dep, + fields: HeaderMap::new(), + pseudo: Pseudo::default(), + flags: flags, + }; + + Ok((headers, src)) + } + + pub fn load_hpack(&mut self, + src: BytesMut, + decoder: &mut hpack::Decoder) + -> Result<(), Error> + { let mut err = false; macro_rules! set_pseudo { ($field:ident, $val:expr) => {{ - if pseudo.$field.is_some() { + if self.pseudo.$field.is_some() { err = true; } else { - pseudo.$field = Some($val); + self.pseudo.$field = Some($val); } }} } + let mut src = Cursor::new(src.freeze()); + // At this point, we're going to assume that the hpack encoded headers // contain the entire payload. Later, we need to check for stream // priority. // // TODO: Provide a way to abort decoding if an error is hit. - let res = decoder.decode(src, |header| { + let res = decoder.decode(&mut src, |header| { use hpack::Header::*; match header { Field { name, value } => { - fields.append(name, value); + self.fields.append(name, value); } Authority(v) => set_pseudo!(authority, v), Method(v) => set_pseudo!(method, v), @@ -211,13 +232,7 @@ impl Headers { return Err(hpack::DecoderError::RepeatedPseudo.into()); } - Ok(Headers { - stream_id: head.stream_id(), - stream_dep: stream_dep, - fields: fields, - pseudo: pseudo, - flags: flags, - }) + Ok(()) } /// Returns `true` if the frame represents trailers diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 1691eed..e4445a5 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -2,10 +2,11 @@ use {hpack, ConnectionError}; use frame::{self, Frame, Kind}; use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; use proto::*; +use error::Reason::*; use futures::*; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use tokio_io::AsyncRead; use tokio_io::codec::length_delimited; @@ -24,7 +25,16 @@ pub struct FramedRead { /// Partially loaded headers frame #[derive(Debug)] -enum Partial { +struct Partial { + /// Empty frame + frame: Continuable, + + /// Partial header payload + buf: BytesMut, +} + +#[derive(Debug)] +enum Continuable { Headers(frame::Headers), // PushPromise(frame::PushPromise), } @@ -38,14 +48,14 @@ impl FramedRead { } } - fn decode_frame(&mut self, mut bytes: Bytes) -> Result, ConnectionError> { + fn decode_frame(&mut self, mut bytes: BytesMut) -> Result, ConnectionError> { trace!("decoding frame from {}B", bytes.len()); // Parse the head let head = frame::Head::parse(&bytes); if self.partial.is_some() && head.kind() != Kind::Continuation { - unimplemented!(); + return Err(ProtocolError.into()); } let kind = head.kind(); @@ -64,22 +74,29 @@ impl FramedRead { } Kind::Data => { let _ = bytes.split_to(frame::HEADER_LEN); - frame::Data::load(head, bytes)?.into() + frame::Data::load(head, bytes.freeze())?.into() } Kind::Headers => { - let mut buf = Cursor::new(bytes); - buf.set_position(frame::HEADER_LEN as u64); - + // Drop the frame header // TODO: Change to drain: carllerche/bytes#130 - let frame = try!(frame::Headers::load(head, &mut buf, &mut self.hpack)); + let _ = bytes.split_to(frame::HEADER_LEN); + + // Parse the header frame w/o parsing the payload + let (mut headers, payload) = frame::Headers::load(head, bytes)?; + + if headers.is_end_headers() { + // Load the HPACK encoded headers & return the frame + headers.load_hpack(payload, &mut self.hpack)?; + headers.into() + } else { + // Defer loading the frame + self.partial = Some(Partial { + frame: Continuable::Headers(headers), + buf: payload, + }); - if !frame.is_end_headers() { - // Wait for continuation frames - self.partial = Some(Partial::Headers(frame)); return Ok(None); } - - frame.into() } Kind::Reset => { frame::Reset::load(head, &bytes[frame::HEADER_LEN..])?.into() @@ -95,7 +112,28 @@ impl FramedRead { return Ok(None); } Kind::Continuation => { - unimplemented!(); + // TODO: Un-hack this + let end_of_headers = (head.flag() & 0x4) == 0x4; + + let mut partial = match self.partial.take() { + Some(partial) => partial, + None => return Err(ProtocolError.into()), + }; + + // Extend the buf + partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]); + + if !end_of_headers { + self.partial = Some(partial); + return Ok(None); + } + + match partial.frame { + Continuable::Headers(mut frame) => { + frame.load_hpack(partial.buf, &mut self.hpack)?; + frame.into() + } + } } Kind::Unknown => { unimplemented!() @@ -116,7 +154,7 @@ impl futures::Stream for FramedRead loop { trace!("poll"); let bytes = match try_ready!(self.inner.poll()) { - Some(bytes) => bytes.freeze(), + Some(bytes) => bytes, None => return Ok(Async::Ready(None)), };