From db5e94039cffda3586d6cf6b2361171e5d2ab2f4 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 11 Mar 2017 13:19:28 -0800 Subject: [PATCH] More hackin --- src/frame/headers.rs | 58 ++++++++++++++++++++++++++++++++++++++++ src/proto/framed_read.rs | 41 +++++++++++++++++----------- 2 files changed, 83 insertions(+), 16 deletions(-) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 076369c..f202d4e 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -3,9 +3,30 @@ /// /// This could be either a request or a response. pub struct Headers { + /// The ID of the stream with which this frame is associated. stream_id: StreamId, + /// The stream dependency information, if any. + stream_dep: Option, + /// The decoded headers headers: HeaderMap, pseudo: Pseudo, + flags: HeaderFlag, +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)] +pub struct HeadersFlag(u8); + +pub struct PushPromise; + +pub struct StreamDependency { + /// The ID of the stream dependency target + stream_id: StreamId, + /// The weight for the stream. The value exposed (and set) here is always in + /// the range [0, 255], instead of [1, 256] (as defined in section 5.3.2.) + /// so that the value fits into a `u8`. + weight: u8, + /// True if the stream dependency is exclusive. + is_exclusive: bool, } pub struct Pseudo { @@ -18,3 +39,40 @@ pub struct Pseudo { // Response status: Option<()>, } + +const END_STREAM: u8 = 0x1; +const END_HEADERS: u8 = 0x4; +const PADDED: u8 = 0x8; +const PRIORITY: u8 = 0x20; +const ALL: u8 = END_STREAM + | END_HEADERS + | PADDED + | PRIORITY; + +// ===== impl HeadersFlag ===== + +impl HeadersFlag { + pub empty() -> HeadersFlag { + HeadersFlag(0) + } + + pub fn load(bits: u8) -> HeadersFlag { + HeadersFlag(bits & ALL) + } + + pub fn is_end_stream(&self) -> bool { + self.0 & END_STREAM == END_STREAM + } + + pub fn is_end_headers(&self) -> bool { + self.0 & END_HEADERS == END_HEADERS + } + + pub fn is_padded(&self) -> bool { + self.0 & PADDED == PADDED + } + + pub fn is_priority(&self) -> bool { + self.0 & PRIORITY == PRIORITY + } +} diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 3c1dae8..5386a9a 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -4,7 +4,7 @@ use frame::{self, Frame, Kind}; use tokio_io::AsyncWrite; use futures::*; -use bytes::{BytesMut, Buf}; +use bytes::{Bytes, BytesMut, Buf}; use std::io::{self, Write}; @@ -27,18 +27,8 @@ impl FramedRead } } -impl Stream for FramedRead - where T: Stream, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - let mut bytes = match try_ready!(self.inner.poll()) { - Some(bytes) => bytes, - None => return Ok(Async::Ready(None)), - }; - +impl FramedRead { + fn decode_frame(&mut self, mut bytes: Bytes) -> Result, ConnectionError> { // Parse the head let head = frame::Head::parse(&bytes); @@ -58,12 +48,31 @@ impl Stream for FramedRead Kind::Continuation => unimplemented!(), Kind::Unknown => { let _ = bytes.split_to(frame::HEADER_LEN); - frame::Unknown::new(head, bytes.freeze()).into() + frame::Unknown::new(head, bytes).into() } - _ => unimplemented!(), }; - Ok(Async::Ready(Some(frame))) + Ok(Some(frame)) + } +} + +impl Stream for FramedRead + where T: Stream, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + loop { + let mut bytes = match try_ready!(self.inner.poll()) { + Some(bytes) => bytes.freeze(), + None => return Ok(Async::Ready(None)), + }; + + if let Some(frame) = try!(self.decode_frame(bytes)) { + return Ok(Async::Ready(Some(frame))); + } + } } }