diff --git a/src/frame/data.rs b/src/frame/data.rs index fbaca6f..b627ce3 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -3,74 +3,95 @@ use bytes::{BufMut, Bytes, Buf}; use std::fmt; +/// Data frame +/// +/// Data frames convey arbitrary, variable-length sequences of octets associated +/// with a stream. One or more DATA frames are used, for instance, to carry HTTP +/// request or response payloads. #[derive(Eq, PartialEq)] pub struct Data { stream_id: StreamId, data: T, - flags: DataFlag, + flags: DataFlags, pad_len: Option, } #[derive(Copy, Clone, Eq, PartialEq)] -pub struct DataFlag(u8); +struct DataFlags(u8); const END_STREAM: u8 = 0x1; const PADDED: u8 = 0x8; const ALL: u8 = END_STREAM | PADDED; -impl Data { - pub fn load(head: Head, mut payload: Bytes) -> Result { - let flags = DataFlag::load(head.flag()); - - let pad_len = if flags.is_padded() { - let len = try!(util::strip_padding(&mut payload)); - Some(len) - } else { - None - }; - Ok(Data { - stream_id: head.stream_id(), - data: payload, - flags: flags, - pad_len: pad_len, - }) - } -} - impl Data { + /// Creates a new DATA frame. + pub fn new(stream_id: StreamId, payload: T) -> Self { + assert!(!stream_id.is_zero()); + + Data { + stream_id: stream_id, + data: payload, + flags: DataFlags::default(), + pad_len: None, + } + } + + /// Returns the stream identifer that this frame is associated with. + /// + /// This cannot be a zero stream identifier. pub fn stream_id(&self) -> StreamId { self.stream_id } + /// Gets the value of the `END_STREAM` flag for this frame. + /// + /// If true, this frame is the last that the endpoint will send for the + /// identified stream. + /// + /// Setting this flag causes the stream to enter one of the "half-closed" + /// states or the "closed" state (Section 5.1). pub fn is_end_stream(&self) -> bool { self.flags.is_end_stream() } - pub fn set_end_stream(&mut self) { - self.flags.set_end_stream(); - } - - pub fn unset_end_stream(&mut self) { - self.flags.unset_end_stream(); - } - - pub fn head(&self) -> Head { - Head::new(Kind::Data, self.flags.into(), self.stream_id) + /// Sets the value for the `END_STREAM` flag on this frame. + pub fn set_end_stream(&mut self, val: bool) { + if val { + self.flags.set_end_stream(); + } else { + self.flags.unset_end_stream(); + } } + /// Returns a reference to this frame's payload. + /// + /// This does **not** include any padding that might have been originally + /// included. pub fn payload(&self) -> &T { &self.data } + /// Returns a mutable reference to this frame's payload. + /// + /// This does **not** include any padding that might have been originally + /// included. pub fn payload_mut(&mut self) -> &mut T { &mut self.data } + /// Consumes `self` and returns the frame's payload. + /// + /// This does **not** include any padding that might have been originally + /// included. pub fn into_payload(self) -> T { self.data } - pub fn map(self, f: F) -> Data + pub(crate) fn head(&self) -> Head { + Head::new(Kind::Data, self.flags.into(), self.stream_id) + } + + pub(crate) fn map(self, f: F) -> Data where F: FnOnce(T) -> U, { Data { @@ -82,23 +103,35 @@ impl Data { } } -impl Data { - pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self { - // TODO ensure that data.remaining() < MAX_FRAME_SIZE - let mut flags = DataFlag::default(); - if eos { - flags.set_end_stream(); - } - Data { - stream_id, - data, - flags, - pad_len: None, - } - } +impl Data { + pub(crate) fn load(head: Head, mut payload: Bytes) -> Result { + let flags = DataFlags::load(head.flag()); - pub fn encode_chunk(&mut self, dst: &mut U) { + // The stream identifier must not be zero + if head.stream_id().is_zero() { + return Err(Error::InvalidStreamId); + } + + let pad_len = if flags.is_padded() { + let len = util::strip_padding(&mut payload)?; + Some(len) + } else { + None + }; + + Ok(Data { + stream_id: head.stream_id(), + data: payload, + flags: flags, + pad_len: pad_len, + }) + } +} + +impl Data { + pub(crate) fn encode_chunk(&mut self, dst: &mut U) { let len = self.data.remaining() as usize; + if len > dst.remaining_mut() { unimplemented!(); } @@ -125,54 +158,45 @@ impl fmt::Debug for Data { } } -// ===== impl DataFlag ===== +// ===== impl DataFlags ===== -impl DataFlag { - pub fn load(bits: u8) -> DataFlag { - DataFlag(bits & ALL) +impl DataFlags { + fn load(bits: u8) -> DataFlags { + DataFlags(bits & ALL) } - pub fn end_stream() -> DataFlag { - DataFlag(END_STREAM) - } - - pub fn padded() -> DataFlag { - DataFlag(PADDED) - } - - pub fn is_end_stream(&self) -> bool { + fn is_end_stream(&self) -> bool { self.0 & END_STREAM == END_STREAM } - pub fn set_end_stream(&mut self) { + fn set_end_stream(&mut self) { self.0 |= END_STREAM } - pub fn unset_end_stream(&mut self) { + fn unset_end_stream(&mut self) { self.0 &= !END_STREAM } - pub fn is_padded(&self) -> bool { + fn is_padded(&self) -> bool { self.0 & PADDED == PADDED } } -impl Default for DataFlag { - /// Returns a `HeadersFlag` value with `END_HEADERS` set. +impl Default for DataFlags { fn default() -> Self { - DataFlag(0) + DataFlags(0) } } -impl From for u8 { - fn from(src: DataFlag) -> u8 { +impl From for u8 { + fn from(src: DataFlags) -> u8 { src.0 } } -impl fmt::Debug for DataFlag { +impl fmt::Debug for DataFlags { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - let mut f = fmt.debug_struct("DataFlag"); + let mut f = fmt.debug_struct("DataFlags"); if self.is_end_stream() { f.field("end_stream", &true); diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index bc878bc..a732625 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -362,7 +362,7 @@ impl Prioritize let mut stream = store.resolve(key); if eos { - frame.set_end_stream(); + frame.set_end_stream(true); } self.push_back_frame(frame.into(), &mut stream); @@ -470,7 +470,7 @@ impl Prioritize let eos = frame.is_end_stream(); if frame.payload().remaining() > len { - frame.unset_end_stream(); + frame.set_end_stream(false); } Frame::Data(frame.map(|buf| { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index db7d524..638bc02 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -390,7 +390,7 @@ impl StreamRef where B: Buf, P: Peer, { - pub fn send_data(&mut self, data: B, end_of_stream: bool) + pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> { let mut me = self.inner.lock().unwrap(); @@ -399,7 +399,8 @@ impl StreamRef let stream = me.store.resolve(self.key); // Create the data frame - let frame = frame::Data::from_buf(stream.id, data, end_of_stream); + let mut frame = frame::Data::new(stream.id, data); + frame.set_end_stream(end_stream); me.actions.transition(stream, |actions, stream| { // Send the data frame diff --git a/tests/codec_read.rs b/tests/codec_read.rs index 7c19578..0f6f91a 100644 --- a/tests/codec_read.rs +++ b/tests/codec_read.rs @@ -46,3 +46,15 @@ fn read_data_padding() { assert_closed!(codec); } + +#[test] +fn read_data_stream_id_zero() { + let mut codec = raw_codec! { + read => [ + 0, 0, 5, 0, 0, 0, 0, 0, 0, + "hello", // Data + ]; + }; + + poll_err!(codec); +} diff --git a/tests/support/src/codec.rs b/tests/support/src/codec.rs index 90a0dfe..8d0b4f6 100644 --- a/tests/support/src/codec.rs +++ b/tests/support/src/codec.rs @@ -5,6 +5,16 @@ macro_rules! assert_closed { }} } +#[macro_export] +macro_rules! poll_err { + ($transport:expr) => {{ + match $transport.poll() { + Err(e) => e, + frame => panic!("expected error; actual={:?}", frame), + } + }} +} + #[macro_export] macro_rules! poll_data { ($transport:expr) => {{