diff --git a/src/client.rs b/src/client.rs index c01e577..54c6bd2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -96,8 +96,8 @@ impl Peer for Client { frame } - fn convert_poll_message(headers: frame::Headers) -> Frame { - unimplemented!(); + fn convert_poll_message(headers: frame::Headers) -> Self::Poll { + headers.into_response() } } diff --git a/src/frame/data.rs b/src/frame/data.rs index b7205e1..3aef88f 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,4 +1,4 @@ -use frame::{util, Head, Error, StreamId, Kind}; +use frame::{util, Frame, Head, Error, StreamId, Kind}; use bytes::{BufMut, Bytes}; #[derive(Debug)] @@ -35,10 +35,18 @@ impl Data { }) } + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + pub fn len(&self) -> usize { self.data.len() } + pub fn is_end_stream(&self) -> bool { + self.flags.is_end_stream() + } + pub fn encode(&self, dst: &mut T) { self.head().encode(self.len(), dst); dst.put(&self.data); @@ -53,6 +61,13 @@ impl Data { } } +impl From for Frame { + fn from(src: Data) -> Frame { + Frame::Data(src) + } +} + +// ===== impl DataFlag ===== impl DataFlag { pub fn load(bits: u8) -> DataFlag { diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 4998069..35866cc 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -4,7 +4,7 @@ use error::Reason; use frame::{self, Frame, Head, Kind, Error}; use util::byte_str::ByteStr; -use http::{Method, StatusCode}; +use http::{request, response, Method, StatusCode}; use http::header::{self, HeaderMap, HeaderName, HeaderValue}; use bytes::{BytesMut, Bytes}; @@ -172,14 +172,35 @@ impl Headers { }) } + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + pub fn is_end_headers(&self) -> bool { self.flags.is_end_headers() } + pub fn is_end_stream(&self) -> bool { + self.flags.is_end_stream() + } + pub fn set_end_stream(&mut self) { self.flags.set_end_stream() } + pub fn into_response(self) -> response::Head { + let mut response = response::Head::default(); + + if let Some(status) = self.pseudo.status { + response.status = status; + } else { + unimplemented!(); + } + + response.headers = self.fields; + response + } + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 48cc089..4d128c0 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -31,6 +31,7 @@ mod data; mod go_away; mod head; mod headers; +mod reset; mod settings; mod util; @@ -38,6 +39,7 @@ pub use self::data::Data; pub use self::go_away::GoAway; pub use self::head::{Head, Kind, StreamId}; pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; +pub use self::reset::Reset; pub use self::settings::{Settings, SettingSet}; // Re-export some constants diff --git a/src/frame/reset.rs b/src/frame/reset.rs new file mode 100644 index 0000000..36be347 --- /dev/null +++ b/src/frame/reset.rs @@ -0,0 +1,25 @@ +use frame::{Head, Error}; +use super::{head, StreamId}; + +#[derive(Debug)] +pub struct Reset { + stream_id: StreamId, + error_code: u32, +} + +impl Reset { + pub fn load(head: Head, payload: &[u8]) -> Result { + if payload.len() != 4 { + // Invalid payload len + // TODO: Handle error + unimplemented!(); + } + + let error_code = unpack_octets_4!(payload, 0, u32); + + Ok(Reset { + stream_id: head.stream_id(), + error_code: error_code, + }) + } +} diff --git a/src/lib.rs b/src/lib.rs index 612fe58..9accba8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,6 +36,8 @@ pub use error::{ConnectionError, StreamError, Reason}; pub use frame::{StreamId}; pub use proto::Connection; +use bytes::Bytes; + /// An H2 connection frame #[derive(Debug)] pub enum Frame { @@ -46,7 +48,7 @@ pub enum Frame { }, Body { id: StreamId, - chunk: (), + chunk: Bytes, end_of_stream: bool, }, Trailers { @@ -80,5 +82,5 @@ pub trait Peer { end_of_stream: bool) -> frame::Headers; #[doc(hidden)] - fn convert_poll_message(headers: frame::Headers) -> Frame; + fn convert_poll_message(headers: frame::Headers) -> Self::Poll; } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index cdf6451..b5995b3 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -79,15 +79,36 @@ impl Stream for Connection // buffer is clear, `poll_complete` is called here. let _ = try!(self.poll_complete()); - match try_ready!(self.inner.poll()) { + let frame = match try_ready!(self.inner.poll()) { Some(Headers(v)) => { - debug!("poll; frame={:?}", v); - unimplemented!(); + // TODO: Update stream state + let stream_id = v.stream_id(); + let end_of_stream = v.is_end_stream(); + + Frame::Headers { + id: stream_id, + headers: P::convert_poll_message(v), + end_of_stream: end_of_stream, + } + } + Some(Data(v)) => { + // TODO: Validate frame + + let stream_id = v.stream_id(); + let end_of_stream = v.is_end_stream(); + + Frame::Body { + id: stream_id, + chunk: v.into_payload(), + end_of_stream: end_of_stream, + } } Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)), _ => unimplemented!(), - } + }; + + Ok(Async::Ready(Some(frame))) } } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 7ff30b2..3547f3f 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -49,7 +49,11 @@ impl FramedRead { } let frame = match head.kind() { - Kind::Data => unimplemented!(), + Kind::Data => { + let _ = bytes.split_to(frame::HEADER_LEN); + let frame = try!(frame::Data::load(head, bytes)); + frame.into() + } Kind::Headers => { let mut buf = Cursor::new(bytes); buf.set_position(frame::HEADER_LEN as u64); @@ -66,12 +70,21 @@ impl FramedRead { frame.into() } Kind::Priority => unimplemented!(), - Kind::Reset => unimplemented!(), + Kind::Reset => { + let frame = try!(frame::Reset::load(head, &bytes[frame::HEADER_LEN..])); + debug!("decoded; frame={:?}", frame); + // TODO: implement + return Ok(None); + } Kind::Settings => { let frame = try!(frame::Settings::load(head, &bytes[frame::HEADER_LEN..])); frame.into() } - Kind::PushPromise => unimplemented!(), + Kind::PushPromise => { + debug!("received PUSH_PROMISE"); + // TODO: implement + return Ok(None); + } Kind::Ping => unimplemented!(), Kind::GoAway => { let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..]));