diff --git a/Cargo.toml b/Cargo.toml index bfb011c..4329e25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ tokio-io = "0.1" tokio-timer = { git = "https://github.com/tokio-rs/tokio-timer" } bytes = "0.4" http = { path = "/Users/carllerche/Code/Oss/Tokio/tower/http" } +byteorder = "1.0" log = "0.3.8" # tower = { path = "/Users/carllerche/Code/Oss/Tokio/tower/tower-http" } fnv = "1.0.5" diff --git a/src/frame/headers.rs b/src/frame/headers.rs index a507b88..c059338 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,8 +1,13 @@ use super::StreamId; +use {frame, hpack}; +use frame::{Head, Kind}; use util::byte_str::ByteStr; use http::{Method, StatusCode}; -use http::header::{self, HeaderMap, HeaderValue}; +use http::header::{self, HeaderMap, HeaderName, HeaderValue}; + +use bytes::BytesMut; +use byteorder::{BigEndian, ByteOrder}; /// Header frame /// @@ -41,6 +46,18 @@ pub struct PushPromise { flags: HeadersFlag, } +#[derive(Debug)] +pub struct Continuation { + /// Stream ID of continuation frame + stream_id: StreamId, + + /// Argument to pass to the HPACK encoder to resume encoding + hpack: hpack::EncodeState, + + /// remaining headers to encode + headers: Iter, +} + #[derive(Debug)] pub struct StreamDependency { /// The ID of the stream dependency target @@ -85,6 +102,90 @@ const ALL: u8 = END_STREAM | PADDED | PRIORITY; +// ===== impl Headers ===== + +impl Headers { + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) + -> Option + { + let head = self.head(); + let pos = dst.len(); + + // At this point, we don't know how big the h2 frame will be. + // So, we write the head with length 0, then write the body, and + // finally write the length once we know the size. + head.encode(0, dst); + + // Encode the frame + let mut headers = Iter { + pseudo: Some(self.pseudo), + headers: self.headers.into_iter(), + }; + + let ret = match encoder.encode(None, &mut headers, dst) { + hpack::Encode::Full => None, + hpack::Encode::Partial(state) => { + Some(Continuation { + stream_id: self.stream_id, + hpack: state, + headers: headers, + }) + } + }; + + // Compute the frame length + let len = (dst.len() - pos) - frame::HEADER_LEN; + + // Write the frame length + BigEndian::write_u32(&mut dst[pos..pos+3], len as u32); + + ret + } + + fn head(&self) -> Head { + Head::new(Kind::Data, self.flags.into(), self.stream_id) + } +} + +// ===== impl Iter ===== + +impl Iterator for Iter { + type Item = hpack::Header>; + + fn next(&mut self) -> Option { + use hpack::Header::*; + + if let Some(ref mut pseudo) = self.pseudo { + if let Some(method) = pseudo.method.take() { + return Some(Method(method)); + } + + if let Some(scheme) = pseudo.scheme.take() { + return Some(Scheme(scheme)); + } + + if let Some(authority) = pseudo.authority.take() { + return Some(Authority(authority)); + } + + if let Some(path) = pseudo.path.take() { + return Some(Path(path)); + } + + if let Some(status) = pseudo.status.take() { + return Some(Status(status)); + } + } + + self.pseudo = None; + + self.headers.next() + .map(|(name, value)| { + Field { name: name, value: value} + }) + } +} + // ===== impl HeadersFlag ===== impl HeadersFlag { @@ -112,3 +213,9 @@ impl HeadersFlag { self.0 & PRIORITY == PRIORITY } } + +impl From for u8 { + fn from(src: HeadersFlag) -> u8 { + src.0 + } +} diff --git a/src/frame/mod.rs b/src/frame/mod.rs index bec36e6..052b0c1 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -32,7 +32,7 @@ mod util; pub use self::data::Data; pub use self::head::{Head, Kind, StreamId}; -pub use self::headers::{Headers, PushPromise}; +pub use self::headers::{Headers, PushPromise, Continuation}; pub use self::settings::{Settings, SettingSet}; // Re-export some constants diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index da2322b..c496136 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -76,7 +76,7 @@ impl Encoder { /// Encode a set of headers into the provide buffer pub fn encode(&mut self, resume: Option, headers: &mut I, dst: &mut BytesMut) - -> Result + -> Encode where I: Iterator>>, { let len = dst.len(); @@ -86,7 +86,7 @@ impl Encoder { dst.truncate(len); } - return Err(e); + unreachable!(); } if let Some(resume) = resume { @@ -104,9 +104,9 @@ impl Encoder { } }; - if try!(is_buffer_overflow(res)) { + if res.is_err() { dst.truncate(len); - return Ok(Encode::Partial(resume)); + return Encode::Partial(resume); } } @@ -122,12 +122,12 @@ impl Encoder { let index = self.table.index(header); let res = self.encode_header(&index, dst); - if try!(is_buffer_overflow(res)) { + if res.is_err() { dst.truncate(len); - return Ok(Encode::Partial(EncodeState { + return Encode::Partial(EncodeState { index: index, value: None, - })); + }); } last_index = Some(index); @@ -142,18 +142,18 @@ impl Encoder { &value, dst); - if try!(is_buffer_overflow(res)) { + if res.is_err() { dst.truncate(len); - return Ok(Encode::Partial(EncodeState { + return Encode::Partial(EncodeState { index: last_index.unwrap(), value: Some(value), - })); + }); } } }; } - Ok(Encode::Full) + Encode::Full } fn encode_size_updates(&mut self, dst: &mut BytesMut) -> Result<(), EncoderError> { @@ -417,14 +417,6 @@ fn encode_int_one_byte(value: usize, prefix_bits: usize) -> bool { value < (1 << prefix_bits) - 1 } -fn is_buffer_overflow(res: Result<(), EncoderError>) -> Result { - match res { - Err(EncoderError::BufferOverflow) => Ok(true), - Err(e) => Err(e), - Ok(_) => Ok(false), - } -} - #[cfg(test)] mod test { use super::*; @@ -789,7 +781,7 @@ mod test { }, ].into_iter(); - let resume = match encoder.encode(None, &mut input, &mut dst).unwrap() { + let resume = match encoder.encode(None, &mut input, &mut dst) { Encode::Partial(r) => r, _ => panic!(), }; @@ -801,7 +793,7 @@ mod test { dst.clear(); - match encoder.encode(Some(resume), &mut input, &mut dst).unwrap() { + match encoder.encode(Some(resume), &mut input, &mut dst) { Encode::Full => {} _ => panic!(), } diff --git a/src/hpack/test.rs b/src/hpack/test.rs index 50e4f8f..55f41c6 100644 --- a/src/hpack/test.rs +++ b/src/hpack/test.rs @@ -214,7 +214,7 @@ impl FuzzHpack { } loop { - match encoder.encode(index.take(), &mut input, &mut buf).unwrap() { + match encoder.encode(index.take(), &mut input, &mut buf) { Encode::Full => break, Encode::Partial(i) => { index = Some(i); @@ -531,7 +531,7 @@ fn test_story(story: Value) { Header::new(name.clone().into(), value.clone().into()).unwrap().into() }).collect(); - encoder.encode(None, &mut input.clone().into_iter(), &mut buf).unwrap(); + encoder.encode(None, &mut input.clone().into_iter(), &mut buf); decoder.decode(&buf.into(), |e| { assert_eq!(e, input.remove(0).reify().unwrap()); diff --git a/src/lib.rs b/src/lib.rs index 687bae8..46e7ace 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,8 @@ extern crate bytes; // Hash function used for HPACK encoding extern crate fnv; +extern crate byteorder; + #[macro_use] extern crate log; diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index a2ae50d..85b2854 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,5 +1,5 @@ use {ConnectionError, Reason}; -use frame::{self, Data, Frame, Error, Headers, PushPromise, Settings}; +use frame::{self, Frame, Error}; use hpack; use futures::*; @@ -19,10 +19,7 @@ pub struct FramedWrite { hpack: hpack::Encoder, /// Write buffer - buf: BytesMut, - - /// Position in the frame - pos: usize, + buf: Cursor, /// Next frame to encode next: Option, @@ -40,16 +37,7 @@ enum Next { /// Data frame to encode data: frame::Data }, - Continuation { - /// Stream ID of continuation frame - stream_id: frame::StreamId, - - /// Argument to pass to the HPACK encoder to resume encoding - resume: hpack::EncodeState, - - /// remaining headers to encode - rem: header::IntoIter, - }, + Continuation(frame::Continuation), } /// Initialze the connection with this amount of write buffer. @@ -68,18 +56,21 @@ impl FramedWrite { FramedWrite { inner: inner, hpack: hpack::Encoder::default(), - buf: BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY), - pos: 0, + buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)), next: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, } } fn has_capacity(&self) -> bool { - self.next.is_none() && self.buf.remaining_mut() >= MIN_BUFFER_CAPACITY + self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY } - fn frame_len(&self, data: &Data) -> usize { + fn is_empty(&self) -> bool { + self.next.is_none() && self.buf.has_remaining() + } + + fn frame_len(&self, data: &frame::Data) -> usize { cmp::min(self.max_frame_size, data.len()) } } @@ -105,7 +96,7 @@ impl Sink for FramedWrite { let len = self.frame_len(&v); // Encode the frame head to the buffer - head.encode(len, &mut self.buf); + head.encode(len, self.buf.get_mut()); // Save the data frame self.next = Some(Next::Data { @@ -113,17 +104,19 @@ impl Sink for FramedWrite { data: v, }); } else { - v.encode(&mut self.buf); + v.encode(self.buf.get_mut()); } } Frame::Headers(v) => { - unimplemented!(); + if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) { + self.next = Some(Next::Continuation(continuation)); + } } Frame::PushPromise(v) => { unimplemented!(); } Frame::Settings(v) => { - v.encode(&mut self.buf); + v.encode(self.buf.get_mut()); } } @@ -131,7 +124,22 @@ impl Sink for FramedWrite { } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - unimplemented!(); + // TODO: implement + match self.next { + Some(Next::Data { .. }) => unimplemented!(), + _ => {} + } + + // As long as there is data to write, try to write it! + while !self.is_empty() { + try_ready!(self.inner.write_buf(&mut self.buf)); + } + + // Clear internal buffer + self.buf.set_position(0); + self.buf.get_mut().clear(); + + Ok(Async::Ready(())) } fn close(&mut self) -> Poll<(), ConnectionError> {