From a3950354aa5edad4752e88859d289553979a91b0 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 20 Jun 2017 09:16:21 -0700 Subject: [PATCH] Header frame decoding --- src/frame/headers.rs | 83 +++++++++++++++++++++++++++++++++++----- src/frame/mod.rs | 5 +++ src/hpack/decoder.rs | 37 ++++++++++-------- src/hpack/test.rs | 9 +++-- src/proto/framed_read.rs | 23 ++++++++++- 5 files changed, 126 insertions(+), 31 deletions(-) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index c059338..2817354 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,14 +1,17 @@ use super::StreamId; -use {frame, hpack}; -use frame::{Head, Kind}; +use hpack; +use error::Reason; +use frame::{self, Frame, Head, Kind, Error}; use util::byte_str::ByteStr; use http::{Method, StatusCode}; use http::header::{self, HeaderMap, HeaderName, HeaderValue}; -use bytes::BytesMut; +use bytes::{BytesMut, Bytes}; use byteorder::{BigEndian, ByteOrder}; +use std::io::Cursor; + /// Header frame /// /// This could be either a request or a response. @@ -20,8 +23,8 @@ pub struct Headers { /// The stream dependency information, if any. stream_dep: Option, - /// The decoded headers - headers: HeaderMap, + /// The decoded header fields + fields: HeaderMap, /// Pseudo headers, these are broken out as they must be sent as part of the /// headers frame. @@ -72,7 +75,7 @@ pub struct StreamDependency { is_exclusive: bool, } -#[derive(Debug)] +#[derive(Debug, Default)] pub struct Pseudo { // Request method: Option, @@ -89,8 +92,8 @@ pub struct Iter { /// Pseudo headers pseudo: Option, - /// Headers - headers: header::IntoIter, + /// Header fields + fields: header::IntoIter, } const END_STREAM: u8 = 0x1; @@ -105,6 +108,60 @@ const ALL: u8 = END_STREAM // ===== impl Headers ===== impl Headers { + pub fn load(head: Head, src: &mut Cursor, decoder: &mut hpack::Decoder) + -> Result + { + let flags = HeadersFlag(head.flag()); + + assert!(!flags.is_priority(), "unimplemented stream priority"); + + let mut pseudo = Pseudo::default(); + let mut fields = HeaderMap::new(); + let mut err = false; + + macro_rules! set_pseudo { + ($field:ident, $val:expr) => {{ + if pseudo.$field.is_some() { + err = true; + } else { + pseudo.$field = Some($val); + } + }} + } + + // At this point, we're going to assume that the hpack encoded headers + // contain the entire payload. Later, we need to check for stream + // priority. + // + // TODO: Provide a way to abort decoding if an error is hit. + try!(decoder.decode(src, |header| { + use hpack::Header::*; + + match header { + Field { name, value } => { + fields.append(name, value); + } + Authority(v) => set_pseudo!(authority, v), + Method(v) => set_pseudo!(method, v), + Scheme(v) => set_pseudo!(scheme, v), + Path(v) => set_pseudo!(path, v), + Status(v) => set_pseudo!(status, v), + } + })); + + if err { + return Err(hpack::DecoderError::RepeatedPseudo.into()); + } + + Ok(Headers { + stream_id: head.stream_id(), + stream_dep: None, + fields: fields, + pseudo: pseudo, + flags: flags, + }) + } + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { @@ -119,7 +176,7 @@ impl Headers { // Encode the frame let mut headers = Iter { pseudo: Some(self.pseudo), - headers: self.headers.into_iter(), + fields: self.fields.into_iter(), }; let ret = match encoder.encode(None, &mut headers, dst) { @@ -147,6 +204,12 @@ impl Headers { } } +impl From for Frame { + fn from(src: Headers) -> Frame { + Frame::Headers(src) + } +} + // ===== impl Iter ===== impl Iterator for Iter { @@ -179,7 +242,7 @@ impl Iterator for Iter { self.pseudo = None; - self.headers.next() + self.fields.next() .map(|(name, value)| { Field { name: name, value: value} }) diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 052b0c1..9b70fc9 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,4 +1,6 @@ +use hpack; use error::{ConnectionError, Reason}; + use bytes::{Bytes, BytesMut, BufMut}; use std::io; @@ -92,6 +94,9 @@ pub enum Error { /// This is returned if a settings frame is received with a stream /// identifier other than zero. InvalidStreamId, + + /// Failed to perform HPACK decoding + Hpack(hpack::DecoderError), } // ===== impl Frame ====== diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index 36207fb..91941e9 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -1,4 +1,5 @@ use super::{huffman, header as h2_header, Header}; +use frame; use util::byte_str::FromUtf8Error; use http::{method, header, status, StatusCode, Method}; @@ -19,7 +20,7 @@ pub struct Decoder { /// Represents all errors that can be encountered while performing the decoding /// of an HPACK header set. -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum DecoderError { InvalidRepresentation, InvalidIntegerPrefix, @@ -32,6 +33,7 @@ pub enum DecoderError { IntegerUnderflow, IntegerOverflow, StringUnderflow, + RepeatedPseudo, } enum Representation { @@ -155,30 +157,29 @@ impl Decoder { } /// Decodes the headers found in the given buffer. - pub fn decode(&mut self, src: &Bytes, mut f: F) -> Result<(), DecoderError> + pub fn decode(&mut self, src: &mut Cursor, mut f: F) -> Result<(), DecoderError> where F: FnMut(Header) { use self::Representation::*; - let mut buf = Cursor::new(src); let mut can_resize = true; if let Some(size) = self.max_size_update.take() { self.last_max_update = size; } - while buf.has_remaining() { + while src.has_remaining() { // At this point we are always at the beginning of the next block // within the HPACK data. The type of the block can always be // determined from the first byte. - match try!(Representation::load(peek_u8(&mut buf))) { + match try!(Representation::load(peek_u8(src))) { Indexed => { can_resize = false; - f(try!(self.decode_indexed(&mut buf))); + f(try!(self.decode_indexed(src))); } LiteralWithIndexing => { can_resize = false; - let entry = try!(self.decode_literal(&mut buf, true)); + let entry = try!(self.decode_literal(src, true)); // Insert the header into the table self.table.insert(entry.clone()); @@ -187,12 +188,12 @@ impl Decoder { } LiteralWithoutIndexing => { can_resize = false; - let entry = try!(self.decode_literal(&mut buf, false)); + let entry = try!(self.decode_literal(src, false)); f(entry); } LiteralNeverIndexed => { can_resize = false; - let entry = try!(self.decode_literal(&mut buf, false)); + let entry = try!(self.decode_literal(src, false)); // TODO: Track that this should never be indexed @@ -204,7 +205,7 @@ impl Decoder { } // Handle the dynamic table size update - try!(self.process_size_update(&mut buf)); + try!(self.process_size_update(src)); } } } @@ -212,7 +213,7 @@ impl Decoder { Ok(()) } - fn process_size_update(&mut self, buf: &mut Cursor<&Bytes>) + fn process_size_update(&mut self, buf: &mut Cursor) -> Result<(), DecoderError> { let new_size = try!(decode_int(buf, 5)); @@ -229,14 +230,14 @@ impl Decoder { Ok(()) } - fn decode_indexed(&self, buf: &mut Cursor<&Bytes>) + fn decode_indexed(&self, buf: &mut Cursor) -> Result { let index = try!(decode_int(buf, 7)); self.table.get(index) } - fn decode_literal(&mut self, buf: &mut Cursor<&Bytes>, index: bool) + fn decode_literal(&mut self, buf: &mut Cursor, index: bool) -> Result { let prefix = if index { @@ -263,7 +264,7 @@ impl Decoder { } } - fn decode_string(&mut self, buf: &mut Cursor<&Bytes>) -> Result { + fn decode_string(&mut self, buf: &mut Cursor) -> Result { const HUFF_FLAG: u8 = 0b10000000; // The first bit in the first byte contains the huffman encoded flag. @@ -388,7 +389,7 @@ fn peek_u8(buf: &mut B) -> u8 { buf.bytes()[0] } -fn take(buf: &mut Cursor<&Bytes>, n: usize) -> Bytes { +fn take(buf: &mut Cursor, n: usize) -> Bytes { let pos = buf.position() as usize; let ret = buf.get_ref().slice(pos, pos + n); buf.set_position((pos + n) as u64); @@ -520,6 +521,12 @@ impl From for DecoderError { } } +impl From for frame::Error { + fn from(src: DecoderError) -> Self { + frame::Error::Hpack(src) + } +} + /// Get an entry from the static table pub fn get_static(idx: usize) -> Header { use http::{status, method, header}; diff --git a/src/hpack/test.rs b/src/hpack/test.rs index 55f41c6..61b33e7 100644 --- a/src/hpack/test.rs +++ b/src/hpack/test.rs @@ -20,6 +20,7 @@ use self::rand::{StdRng, Rng, SeedableRng}; use std::env; use std::fs::File; use std::io::prelude::*; +use std::io::Cursor; use std::path::Path; use std::str; @@ -220,7 +221,7 @@ impl FuzzHpack { index = Some(i); // Decode the chunk! - decoder.decode(&buf.into(), |e| { + decoder.decode(&mut Cursor::new(buf.into()), |e| { assert_eq!(e, expect.remove(0).reify().unwrap()); }).unwrap(); @@ -231,7 +232,7 @@ impl FuzzHpack { } // Decode the chunk! - decoder.decode(&buf.into(), |e| { + decoder.decode(&mut Cursor::new(buf.into()), |e| { assert_eq!(e, expect.remove(0).reify().unwrap()); }).unwrap(); } @@ -506,7 +507,7 @@ fn test_story(story: Value) { decoder.queue_size_update(size); } - decoder.decode(&case.wire.clone().into(), |e| { + decoder.decode(&mut Cursor::new(case.wire.clone().into()), |e| { let (name, value) = expect.remove(0); assert_eq!(name, key_str(&e)); assert_eq!(value, value_str(&e)); @@ -533,7 +534,7 @@ fn test_story(story: Value) { encoder.encode(None, &mut input.clone().into_iter(), &mut buf); - decoder.decode(&buf.into(), |e| { + decoder.decode(&mut Cursor::new(buf.into()), |e| { assert_eq!(e, input.remove(0).reify().unwrap()); }).unwrap(); diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 403a0db..0cfc1ab 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -7,7 +7,7 @@ use tokio_io::AsyncWrite; use futures::*; use bytes::{Bytes, BytesMut, Buf}; -use std::io::{self, Write}; +use std::io::{self, Write, Cursor}; pub struct FramedRead { inner: T, @@ -15,6 +15,13 @@ pub struct FramedRead { // hpack decoder state hpack: hpack::Decoder, + partial: Option, +} + +/// Partially loaded headers frame +enum Partial { + Headers(frame::Headers), + PushPromise(frame::PushPromise), } impl FramedRead @@ -25,6 +32,7 @@ impl FramedRead FramedRead { inner: inner, hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), + partial: None, } } } @@ -34,9 +42,20 @@ impl FramedRead { // Parse the head let head = frame::Head::parse(&bytes); + if self.partial.is_some() && head.kind() != Kind::Continuation { + unimplemented!(); + } + let frame = match head.kind() { Kind::Data => unimplemented!(), - Kind::Headers => unimplemented!(), + Kind::Headers => { + let mut buf = Cursor::new(bytes); + buf.set_position(frame::HEADER_LEN as u64); + + // TODO: Change to drain: carllerche/bytes#130 + let frame = try!(frame::Headers::load(head, &mut buf, &mut self.hpack)); + frame.into() + } Kind::Priority => unimplemented!(), Kind::Reset => unimplemented!(), Kind::Settings => {