diff --git a/src/frame/data.rs b/src/frame/data.rs index 81a4b01..b7205e1 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,6 +1,7 @@ -use frame::{util, Head, Error, StreamId}; -use bytes::Bytes; +use frame::{util, Head, Error, StreamId, Kind}; +use bytes::{BufMut, Bytes}; +#[derive(Debug)] pub struct Data { stream_id: StreamId, data: Bytes, @@ -33,6 +34,23 @@ impl Data { pad_len: pad_len, }) } + + pub fn len(&self) -> usize { + self.data.len() + } + + pub fn encode(&self, dst: &mut T) { + self.head().encode(self.len(), dst); + dst.put(&self.data); + } + + pub fn head(&self) -> Head { + Head::new(Kind::Data, self.flags.into(), self.stream_id) + } + + pub fn into_payload(self) -> Bytes { + self.data + } } @@ -57,3 +75,9 @@ impl DataFlag { self.0 & PADDED == PADDED } } + +impl From for u8 { + fn from(src: DataFlag) -> u8 { + src.0 + } +} diff --git a/src/frame/head.rs b/src/frame/head.rs index c215836..2f29404 100644 --- a/src/frame/head.rs +++ b/src/frame/head.rs @@ -65,7 +65,7 @@ impl Head { super::HEADER_LEN } - pub fn encode(&self, payload_len: usize, dst: &mut T) -> Result<(), Error> { + pub fn encode(&self, payload_len: usize, dst: &mut T) { debug_assert_eq!(self.encode_len(), dst.remaining_mut()); debug_assert!(self.stream_id & STREAM_ID_MASK == 0); @@ -73,7 +73,6 @@ impl Head { dst.put_u8(self.kind as u8); dst.put_u8(self.flag); dst.put_u32::(self.stream_id); - Ok(()) } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index f202d4e..a507b88 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,43 +1,79 @@ +use super::StreamId; +use util::byte_str::ByteStr; + +use http::{Method, StatusCode}; +use http::header::{self, HeaderMap, HeaderValue}; /// Header frame /// /// This could be either a request or a response. +#[derive(Debug)] pub struct Headers { /// The ID of the stream with which this frame is associated. stream_id: StreamId, + /// The stream dependency information, if any. stream_dep: Option, + /// The decoded headers - headers: HeaderMap, + headers: HeaderMap, + + /// Pseudo headers, these are broken out as they must be sent as part of the + /// headers frame. pseudo: Pseudo, - flags: HeaderFlag, + + /// The associated flags + flags: HeadersFlag, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Default)] pub struct HeadersFlag(u8); -pub struct PushPromise; +#[derive(Debug)] +pub struct PushPromise { + /// The ID of the stream with which this frame is associated. + stream_id: StreamId, + /// The ID of the stream being reserved by this PushPromise. + promised_id: StreamId, + + /// The associated flags + flags: HeadersFlag, +} + +#[derive(Debug)] pub struct StreamDependency { /// The ID of the stream dependency target stream_id: StreamId, + /// The weight for the stream. The value exposed (and set) here is always in /// the range [0, 255], instead of [1, 256] (as defined in section 5.3.2.) /// so that the value fits into a `u8`. weight: u8, + /// True if the stream dependency is exclusive. is_exclusive: bool, } +#[derive(Debug)] pub struct Pseudo { // Request - method: Option<()>, - scheme: Option<()>, - authority: Option<()>, - path: Option<()>, + method: Option, + scheme: Option, + authority: Option, + path: Option, // Response - status: Option<()>, + status: Option, +} + +#[derive(Debug)] +pub struct Iter { + /// Pseudo headers + pseudo: Option, + + /// Headers + headers: header::IntoIter, } const END_STREAM: u8 = 0x1; @@ -52,7 +88,7 @@ const ALL: u8 = END_STREAM // ===== impl HeadersFlag ===== impl HeadersFlag { - pub empty() -> HeadersFlag { + pub fn empty() -> HeadersFlag { HeadersFlag(0) } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index dbb05ea..bec36e6 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -26,31 +26,29 @@ macro_rules! unpack_octets_4 { mod data; mod head; +mod headers; mod settings; -mod unknown; mod util; pub use self::data::Data; pub use self::head::{Head, Kind, StreamId}; +pub use self::headers::{Headers, PushPromise}; pub use self::settings::{Settings, SettingSet}; -pub use self::unknown::Unknown; + +// Re-export some constants +pub use self::settings::{ + DEFAULT_SETTINGS_HEADER_TABLE_SIZE, + DEFAULT_MAX_FRAME_SIZE, +}; pub const HEADER_LEN: usize = 9; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug /*, Clone, PartialEq */)] pub enum Frame { - /* - Data(DataFrame<'a>), - HeadersFrame(HeadersFrame<'a>), - RstStreamFrame(RstStreamFrame), - SettingsFrame(SettingsFrame), - PingFrame(PingFrame), - GoawayFrame(GoawayFrame<'a>), - WindowUpdateFrame(WindowUpdateFrame), - UnknownFrame(RawFrame<'a>), - */ + Data(Data), + Headers(Headers), + PushPromise(PushPromise), Settings(Settings), - Unknown(Unknown), } /// Errors that can occur during parsing an HTTP/2 frame. @@ -99,40 +97,6 @@ pub enum Error { // ===== impl Frame ====== impl Frame { - pub fn load(mut frame: Bytes) -> Result { - let head = Head::parse(&frame); - - // Extract the payload from the frame - let _ = frame.drain_to(HEADER_LEN); - - match head.kind() { - Kind::Unknown => { - let unknown = Unknown::new(head, frame); - Ok(Frame::Unknown(unknown)) - } - _ => unimplemented!(), - } - } - - pub fn encode_len(&self) -> usize { - use self::Frame::*; - - match *self { - Settings(ref frame) => frame.encode_len(), - Unknown(ref frame) => frame.encode_len(), - } - } - - pub fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> { - use self::Frame::*; - - debug_assert!(dst.remaining_mut() >= self.encode_len()); - - match *self { - Settings(ref frame) => frame.encode(dst), - Unknown(ref frame) => frame.encode(dst), - } - } } // ===== impl Error ===== diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 8e11a29..361c412 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -37,6 +37,9 @@ pub struct SettingsFlag(u8); const ACK: u8 = 0x1; const ALL: u8 = ACK; +pub const DEFAULT_SETTINGS_HEADER_TABLE_SIZE: usize = 4_096; +pub const DEFAULT_MAX_FRAME_SIZE: usize = 16_384; + // ===== impl Settings ===== impl Settings { @@ -111,27 +114,21 @@ impl Settings { Ok(settings) } - pub fn encode_len(&self) -> usize { - super::HEADER_LEN + self.payload_len() - } - fn payload_len(&self) -> usize { let mut len = 0; self.for_each(|_| len += 6); len } - pub fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> { + pub fn encode(&self, dst: &mut BytesMut) { // Create & encode an appropriate frame head let head = Head::new(Kind::Settings, self.flag.into(), 0); let payload_len = self.payload_len(); - try!(head.encode(payload_len, dst)); + head.encode(payload_len, dst); // Encode the settings self.for_each(|setting| setting.encode(dst)); - - Ok(()) } fn for_each(&self, mut f: F) { diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index a689452..09118e4 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -4,16 +4,21 @@ use super::table::{Table, Index}; use http::header::{HeaderName, HeaderValue}; use bytes::{BytesMut, BufMut}; +#[derive(Debug)] pub struct Encoder { table: Table, size_update: Option, } +#[derive(Debug)] pub enum Encode { Full, - Partial(Index), + Partial(EncodeState), } +#[derive(Debug)] +pub struct EncodeState(Index); + #[derive(Debug, PartialEq, Eq)] pub enum EncoderError { BufferOverflow, @@ -67,7 +72,7 @@ impl Encoder { } /// Encode a set of headers into the provide buffer - pub fn encode(&mut self, resume: Option, headers: &mut I, dst: &mut BytesMut) + pub fn encode(&mut self, resume: Option, headers: &mut I, dst: &mut BytesMut) -> Result where I: Iterator, { @@ -81,13 +86,13 @@ impl Encoder { return Err(e); } - if let Some(index) = resume { + if let Some(resume) = resume { let len = dst.len(); - match self.encode_header(&index, dst) { + match self.encode_header(&resume.0, dst) { Err(EncoderError::BufferOverflow) => { dst.truncate(len); - return Ok(Encode::Partial(index)); + return Ok(Encode::Partial(resume)); } Err(e) => return Err(e), Ok(_) => {} @@ -101,7 +106,7 @@ impl Encoder { match self.encode_header(&index, dst) { Err(EncoderError::BufferOverflow) => { dst.truncate(len); - return Ok(Encode::Partial(index)); + return Ok(Encode::Partial(EncodeState(index))); } Err(e) => return Err(e), Ok(_) => {} diff --git a/src/hpack/mod.rs b/src/hpack/mod.rs index ed4aa98..d86999c 100644 --- a/src/hpack/mod.rs +++ b/src/hpack/mod.rs @@ -7,7 +7,6 @@ mod table; #[cfg(test)] mod test; -pub use self::encoder::{Encoder, Encode, EncoderError}; +pub use self::encoder::{Encoder, Encode, EncoderError, EncodeState}; pub use self::header::Header; pub use self::decoder::{Decoder, DecoderError}; -pub use self::table::Index; diff --git a/src/hpack/table.rs b/src/hpack/table.rs index e8d371e..197158a 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -9,6 +9,7 @@ use std::collections::VecDeque; use std::hash::{Hash, Hasher}; /// HPACK encoder table +#[derive(Debug)] pub struct Table { mask: usize, indices: Vec>, @@ -37,6 +38,7 @@ pub enum Index { NotIndexed(Header), } +#[derive(Debug)] struct Slot { hash: HashValue, header: Header, diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 5386a9a..403a0db 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -1,5 +1,6 @@ -use ConnectionError; +use {hpack, ConnectionError}; use frame::{self, Frame, Kind}; +use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; use tokio_io::AsyncWrite; @@ -12,7 +13,7 @@ pub struct FramedRead { inner: T, // hpack decoder state - // hpack: hpack::Decoder, + hpack: hpack::Decoder, } @@ -23,6 +24,7 @@ impl FramedRead pub fn new(inner: T) -> FramedRead { FramedRead { inner: inner, + hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE), } } } @@ -46,10 +48,7 @@ impl FramedRead { Kind::GoAway => unimplemented!(), Kind::WindowUpdate => unimplemented!(), Kind::Continuation => unimplemented!(), - Kind::Unknown => { - let _ = bytes.split_to(frame::HEADER_LEN); - frame::Unknown::new(head, bytes).into() - } + Kind::Unknown => return Ok(None), }; Ok(Some(frame)) diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 83d60cb..a2ae50d 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,33 +1,86 @@ use {ConnectionError, Reason}; -use frame::{Frame, Error}; +use frame::{self, Data, Frame, Error, Headers, PushPromise, Settings}; +use hpack; -use tokio_io::AsyncWrite; use futures::*; -use bytes::{BytesMut, Buf, BufMut}; +use tokio_io::AsyncWrite; +use bytes::{Bytes, BytesMut, Buf, BufMut}; +use http::header::{self, HeaderValue}; +use std::cmp; use std::io::{self, Cursor}; #[derive(Debug)] pub struct FramedWrite { + /// Upstream `AsyncWrite` inner: T, - buf: Cursor, + + /// HPACK encoder + hpack: hpack::Encoder, + + /// Write buffer + buf: BytesMut, + + /// Position in the frame + pos: usize, + + /// Next frame to encode + next: Option, + + /// Max frame size, this is specified by the peer + max_frame_size: usize, } -const DEFAULT_BUFFER_CAPACITY: usize = 8 * 1_024; -const MAX_BUFFER_CAPACITY: usize = 16 * 1_024; +#[derive(Debug)] +enum Next { + Data { + /// Length of the current frame being written + frame_len: usize, + + /// Data frame to encode + data: frame::Data + }, + Continuation { + /// Stream ID of continuation frame + stream_id: frame::StreamId, + + /// Argument to pass to the HPACK encoder to resume encoding + resume: hpack::EncodeState, + + /// remaining headers to encode + rem: header::IntoIter, + }, +} + +/// Initialze the connection with this amount of write buffer. +const DEFAULT_BUFFER_CAPACITY: usize = 4 * 1_024; + +/// Min buffer required to attempt to write a frame +const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD; + +/// Chain payloads bigger than this. The remote will never advertise a max frame +/// size less than this (well, the spec says the max frame size can't be less +/// than 16kb, so not even close). +const CHAIN_THRESHOLD: usize = 256; impl FramedWrite { pub fn new(inner: T) -> FramedWrite { - let buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY); - FramedWrite { inner: inner, - buf: Cursor::new(buf), + hpack: hpack::Encoder::default(), + buf: BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY), + pos: 0, + next: None, + max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, } } - fn write_buf(&mut self) -> &mut BytesMut { - self.buf.get_mut() + fn has_capacity(&self) -> bool { + self.next.is_none() && self.buf.remaining_mut() >= MIN_BUFFER_CAPACITY + } + + fn frame_len(&self, data: &Data) -> usize { + cmp::min(self.max_frame_size, data.len()) } } @@ -36,50 +89,49 @@ impl Sink for FramedWrite { type SinkError = ConnectionError; fn start_send(&mut self, item: Frame) -> StartSend { - let len = item.encode_len(); - - if len > MAX_BUFFER_CAPACITY { - // This case should never happen. Large frames should be chunked at - // a higher level, so this is an internal error. - return Err(ConnectionError::Proto(Reason::InternalError)); - } - - if self.write_buf().remaining_mut() <= len { - // Try flushing the buffer + if self.has_capacity() { + // Try flushing try!(self.poll_complete()); - let rem = self.write_buf().remaining_mut(); - let additional = len - rem; - - if self.write_buf().capacity() + additional > MAX_BUFFER_CAPACITY { + if self.has_capacity() { return Ok(AsyncSink::NotReady(item)); } - - // Grow the buffer - self.write_buf().reserve(additional); } - // At this point, the buffer contains enough space - item.encode(self.write_buf()); + match item { + Frame::Data(v) => { + if v.len() >= CHAIN_THRESHOLD { + let head = v.head(); + let len = self.frame_len(&v); + + // Encode the frame head to the buffer + head.encode(len, &mut self.buf); + + // Save the data frame + self.next = Some(Next::Data { + frame_len: len, + data: v, + }); + } else { + v.encode(&mut self.buf); + } + } + Frame::Headers(v) => { + unimplemented!(); + } + Frame::PushPromise(v) => { + unimplemented!(); + } + Frame::Settings(v) => { + v.encode(&mut self.buf); + } + } Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - while self.buf.has_remaining() { - try_ready!(self.inner.write_buf(&mut self.buf)); - - if !self.buf.has_remaining() { - // Reset the buffer - self.write_buf().clear(); - self.buf.set_position(0); - } - } - - // Try flushing the underlying IO - try_nb!(self.inner.flush()); - - return Ok(Async::Ready(())); + unimplemented!(); } fn close(&mut self) -> Poll<(), ConnectionError> {