Data frame (#50)

* Rename DataFlag -> DataFlags
* Polish Data frame API
This commit is contained in:
Carl Lerche
2017-09-05 10:00:05 -07:00
committed by GitHub
parent bbcfffee46
commit b4fa5134f9
5 changed files with 121 additions and 74 deletions

View File

@@ -3,74 +3,95 @@ use bytes::{BufMut, Bytes, Buf};
use std::fmt; use std::fmt;
/// Data frame
///
/// Data frames convey arbitrary, variable-length sequences of octets associated
/// with a stream. One or more DATA frames are used, for instance, to carry HTTP
/// request or response payloads.
#[derive(Eq, PartialEq)] #[derive(Eq, PartialEq)]
pub struct Data<T = Bytes> { pub struct Data<T = Bytes> {
stream_id: StreamId, stream_id: StreamId,
data: T, data: T,
flags: DataFlag, flags: DataFlags,
pad_len: Option<u8>, pad_len: Option<u8>,
} }
#[derive(Copy, Clone, Eq, PartialEq)] #[derive(Copy, Clone, Eq, PartialEq)]
pub struct DataFlag(u8); struct DataFlags(u8);
const END_STREAM: u8 = 0x1; const END_STREAM: u8 = 0x1;
const PADDED: u8 = 0x8; const PADDED: u8 = 0x8;
const ALL: u8 = END_STREAM | PADDED; const ALL: u8 = END_STREAM | PADDED;
impl Data<Bytes> {
pub fn load(head: Head, mut payload: Bytes) -> Result<Self, Error> {
let flags = DataFlag::load(head.flag());
let pad_len = if flags.is_padded() {
let len = try!(util::strip_padding(&mut payload));
Some(len)
} else {
None
};
Ok(Data {
stream_id: head.stream_id(),
data: payload,
flags: flags,
pad_len: pad_len,
})
}
}
impl<T> Data<T> { impl<T> Data<T> {
/// Creates a new DATA frame.
pub fn new(stream_id: StreamId, payload: T) -> Self {
assert!(!stream_id.is_zero());
Data {
stream_id: stream_id,
data: payload,
flags: DataFlags::default(),
pad_len: None,
}
}
/// Returns the stream identifer that this frame is associated with.
///
/// This cannot be a zero stream identifier.
pub fn stream_id(&self) -> StreamId { pub fn stream_id(&self) -> StreamId {
self.stream_id self.stream_id
} }
/// Gets the value of the `END_STREAM` flag for this frame.
///
/// If true, this frame is the last that the endpoint will send for the
/// identified stream.
///
/// Setting this flag causes the stream to enter one of the "half-closed"
/// states or the "closed" state (Section 5.1).
pub fn is_end_stream(&self) -> bool { pub fn is_end_stream(&self) -> bool {
self.flags.is_end_stream() self.flags.is_end_stream()
} }
pub fn set_end_stream(&mut self) { /// Sets the value for the `END_STREAM` flag on this frame.
pub fn set_end_stream(&mut self, val: bool) {
if val {
self.flags.set_end_stream(); self.flags.set_end_stream();
} } else {
pub fn unset_end_stream(&mut self) {
self.flags.unset_end_stream(); self.flags.unset_end_stream();
} }
pub fn head(&self) -> Head {
Head::new(Kind::Data, self.flags.into(), self.stream_id)
} }
/// Returns a reference to this frame's payload.
///
/// This does **not** include any padding that might have been originally
/// included.
pub fn payload(&self) -> &T { pub fn payload(&self) -> &T {
&self.data &self.data
} }
/// Returns a mutable reference to this frame's payload.
///
/// This does **not** include any padding that might have been originally
/// included.
pub fn payload_mut(&mut self) -> &mut T { pub fn payload_mut(&mut self) -> &mut T {
&mut self.data &mut self.data
} }
/// Consumes `self` and returns the frame's payload.
///
/// This does **not** include any padding that might have been originally
/// included.
pub fn into_payload(self) -> T { pub fn into_payload(self) -> T {
self.data self.data
} }
pub fn map<F, U>(self, f: F) -> Data<U> pub(crate) fn head(&self) -> Head {
Head::new(Kind::Data, self.flags.into(), self.stream_id)
}
pub(crate) fn map<F, U>(self, f: F) -> Data<U>
where F: FnOnce(T) -> U, where F: FnOnce(T) -> U,
{ {
Data { Data {
@@ -82,23 +103,35 @@ impl<T> Data<T> {
} }
} }
impl<T: Buf> Data<T> { impl Data<Bytes> {
pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self { pub(crate) fn load(head: Head, mut payload: Bytes) -> Result<Self, Error> {
// TODO ensure that data.remaining() < MAX_FRAME_SIZE let flags = DataFlags::load(head.flag());
let mut flags = DataFlag::default();
if eos { // The stream identifier must not be zero
flags.set_end_stream(); if head.stream_id().is_zero() {
return Err(Error::InvalidStreamId);
} }
Data {
stream_id, let pad_len = if flags.is_padded() {
data, let len = util::strip_padding(&mut payload)?;
flags, Some(len)
pad_len: None, } else {
None
};
Ok(Data {
stream_id: head.stream_id(),
data: payload,
flags: flags,
pad_len: pad_len,
})
} }
} }
pub fn encode_chunk<U: BufMut>(&mut self, dst: &mut U) { impl<T: Buf> Data<T> {
pub(crate) fn encode_chunk<U: BufMut>(&mut self, dst: &mut U) {
let len = self.data.remaining() as usize; let len = self.data.remaining() as usize;
if len > dst.remaining_mut() { if len > dst.remaining_mut() {
unimplemented!(); unimplemented!();
} }
@@ -125,54 +158,45 @@ impl<T> fmt::Debug for Data<T> {
} }
} }
// ===== impl DataFlag ===== // ===== impl DataFlags =====
impl DataFlag { impl DataFlags {
pub fn load(bits: u8) -> DataFlag { fn load(bits: u8) -> DataFlags {
DataFlag(bits & ALL) DataFlags(bits & ALL)
} }
pub fn end_stream() -> DataFlag { fn is_end_stream(&self) -> bool {
DataFlag(END_STREAM)
}
pub fn padded() -> DataFlag {
DataFlag(PADDED)
}
pub fn is_end_stream(&self) -> bool {
self.0 & END_STREAM == END_STREAM self.0 & END_STREAM == END_STREAM
} }
pub fn set_end_stream(&mut self) { fn set_end_stream(&mut self) {
self.0 |= END_STREAM self.0 |= END_STREAM
} }
pub fn unset_end_stream(&mut self) { fn unset_end_stream(&mut self) {
self.0 &= !END_STREAM self.0 &= !END_STREAM
} }
pub fn is_padded(&self) -> bool { fn is_padded(&self) -> bool {
self.0 & PADDED == PADDED self.0 & PADDED == PADDED
} }
} }
impl Default for DataFlag { impl Default for DataFlags {
/// Returns a `HeadersFlag` value with `END_HEADERS` set.
fn default() -> Self { fn default() -> Self {
DataFlag(0) DataFlags(0)
} }
} }
impl From<DataFlag> for u8 { impl From<DataFlags> for u8 {
fn from(src: DataFlag) -> u8 { fn from(src: DataFlags) -> u8 {
src.0 src.0
} }
} }
impl fmt::Debug for DataFlag { impl fmt::Debug for DataFlags {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let mut f = fmt.debug_struct("DataFlag"); let mut f = fmt.debug_struct("DataFlags");
if self.is_end_stream() { if self.is_end_stream() {
f.field("end_stream", &true); f.field("end_stream", &true);

View File

@@ -362,7 +362,7 @@ impl<B, P> Prioritize<B, P>
let mut stream = store.resolve(key); let mut stream = store.resolve(key);
if eos { if eos {
frame.set_end_stream(); frame.set_end_stream(true);
} }
self.push_back_frame(frame.into(), &mut stream); self.push_back_frame(frame.into(), &mut stream);
@@ -470,7 +470,7 @@ impl<B, P> Prioritize<B, P>
let eos = frame.is_end_stream(); let eos = frame.is_end_stream();
if frame.payload().remaining() > len { if frame.payload().remaining() > len {
frame.unset_end_stream(); frame.set_end_stream(false);
} }
Frame::Data(frame.map(|buf| { Frame::Data(frame.map(|buf| {

View File

@@ -390,7 +390,7 @@ impl<B, P> StreamRef<B, P>
where B: Buf, where B: Buf,
P: Peer, P: Peer,
{ {
pub fn send_data(&mut self, data: B, end_of_stream: bool) pub fn send_data(&mut self, data: B, end_stream: bool)
-> Result<(), UserError> -> Result<(), UserError>
{ {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
@@ -399,7 +399,8 @@ impl<B, P> StreamRef<B, P>
let stream = me.store.resolve(self.key); let stream = me.store.resolve(self.key);
// Create the data frame // Create the data frame
let frame = frame::Data::from_buf(stream.id, data, end_of_stream); let mut frame = frame::Data::new(stream.id, data);
frame.set_end_stream(end_stream);
me.actions.transition(stream, |actions, stream| { me.actions.transition(stream, |actions, stream| {
// Send the data frame // Send the data frame

View File

@@ -46,3 +46,15 @@ fn read_data_padding() {
assert_closed!(codec); assert_closed!(codec);
} }
#[test]
fn read_data_stream_id_zero() {
let mut codec = raw_codec! {
read => [
0, 0, 5, 0, 0, 0, 0, 0, 0,
"hello", // Data
];
};
poll_err!(codec);
}

View File

@@ -5,6 +5,16 @@ macro_rules! assert_closed {
}} }}
} }
#[macro_export]
macro_rules! poll_err {
($transport:expr) => {{
match $transport.poll() {
Err(e) => e,
frame => panic!("expected error; actual={:?}", frame),
}
}}
}
#[macro_export] #[macro_export]
macro_rules! poll_data { macro_rules! poll_data {
($transport:expr) => {{ ($transport:expr) => {{