FramedWrite

This commit is contained in:
Carl Lerche
2017-06-19 13:34:08 -07:00
parent 327ca79550
commit 29951da962
7 changed files with 159 additions and 49 deletions

View File

@@ -9,6 +9,7 @@ tokio-io = "0.1"
tokio-timer = { git = "https://github.com/tokio-rs/tokio-timer" } tokio-timer = { git = "https://github.com/tokio-rs/tokio-timer" }
bytes = "0.4" bytes = "0.4"
http = { path = "/Users/carllerche/Code/Oss/Tokio/tower/http" } http = { path = "/Users/carllerche/Code/Oss/Tokio/tower/http" }
byteorder = "1.0"
log = "0.3.8" log = "0.3.8"
# tower = { path = "/Users/carllerche/Code/Oss/Tokio/tower/tower-http" } # tower = { path = "/Users/carllerche/Code/Oss/Tokio/tower/tower-http" }
fnv = "1.0.5" fnv = "1.0.5"

View File

@@ -1,8 +1,13 @@
use super::StreamId; use super::StreamId;
use {frame, hpack};
use frame::{Head, Kind};
use util::byte_str::ByteStr; use util::byte_str::ByteStr;
use http::{Method, StatusCode}; use http::{Method, StatusCode};
use http::header::{self, HeaderMap, HeaderValue}; use http::header::{self, HeaderMap, HeaderName, HeaderValue};
use bytes::BytesMut;
use byteorder::{BigEndian, ByteOrder};
/// Header frame /// Header frame
/// ///
@@ -41,6 +46,18 @@ pub struct PushPromise {
flags: HeadersFlag, flags: HeadersFlag,
} }
#[derive(Debug)]
pub struct Continuation {
/// Stream ID of continuation frame
stream_id: StreamId,
/// Argument to pass to the HPACK encoder to resume encoding
hpack: hpack::EncodeState,
/// remaining headers to encode
headers: Iter,
}
#[derive(Debug)] #[derive(Debug)]
pub struct StreamDependency { pub struct StreamDependency {
/// The ID of the stream dependency target /// The ID of the stream dependency target
@@ -85,6 +102,90 @@ const ALL: u8 = END_STREAM
| PADDED | PADDED
| PRIORITY; | PRIORITY;
// ===== impl Headers =====
impl Headers {
pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut)
-> Option<Continuation>
{
let head = self.head();
let pos = dst.len();
// At this point, we don't know how big the h2 frame will be.
// So, we write the head with length 0, then write the body, and
// finally write the length once we know the size.
head.encode(0, dst);
// Encode the frame
let mut headers = Iter {
pseudo: Some(self.pseudo),
headers: self.headers.into_iter(),
};
let ret = match encoder.encode(None, &mut headers, dst) {
hpack::Encode::Full => None,
hpack::Encode::Partial(state) => {
Some(Continuation {
stream_id: self.stream_id,
hpack: state,
headers: headers,
})
}
};
// Compute the frame length
let len = (dst.len() - pos) - frame::HEADER_LEN;
// Write the frame length
BigEndian::write_u32(&mut dst[pos..pos+3], len as u32);
ret
}
fn head(&self) -> Head {
Head::new(Kind::Data, self.flags.into(), self.stream_id)
}
}
// ===== impl Iter =====
impl Iterator for Iter {
type Item = hpack::Header<Option<HeaderName>>;
fn next(&mut self) -> Option<Self::Item> {
use hpack::Header::*;
if let Some(ref mut pseudo) = self.pseudo {
if let Some(method) = pseudo.method.take() {
return Some(Method(method));
}
if let Some(scheme) = pseudo.scheme.take() {
return Some(Scheme(scheme));
}
if let Some(authority) = pseudo.authority.take() {
return Some(Authority(authority));
}
if let Some(path) = pseudo.path.take() {
return Some(Path(path));
}
if let Some(status) = pseudo.status.take() {
return Some(Status(status));
}
}
self.pseudo = None;
self.headers.next()
.map(|(name, value)| {
Field { name: name, value: value}
})
}
}
// ===== impl HeadersFlag ===== // ===== impl HeadersFlag =====
impl HeadersFlag { impl HeadersFlag {
@@ -112,3 +213,9 @@ impl HeadersFlag {
self.0 & PRIORITY == PRIORITY self.0 & PRIORITY == PRIORITY
} }
} }
impl From<HeadersFlag> for u8 {
fn from(src: HeadersFlag) -> u8 {
src.0
}
}

View File

@@ -32,7 +32,7 @@ mod util;
pub use self::data::Data; pub use self::data::Data;
pub use self::head::{Head, Kind, StreamId}; pub use self::head::{Head, Kind, StreamId};
pub use self::headers::{Headers, PushPromise}; pub use self::headers::{Headers, PushPromise, Continuation};
pub use self::settings::{Settings, SettingSet}; pub use self::settings::{Settings, SettingSet};
// Re-export some constants // Re-export some constants

View File

@@ -76,7 +76,7 @@ impl Encoder {
/// Encode a set of headers into the provide buffer /// Encode a set of headers into the provide buffer
pub fn encode<I>(&mut self, resume: Option<EncodeState>, headers: &mut I, dst: &mut BytesMut) pub fn encode<I>(&mut self, resume: Option<EncodeState>, headers: &mut I, dst: &mut BytesMut)
-> Result<Encode, EncoderError> -> Encode
where I: Iterator<Item=Header<Option<HeaderName>>>, where I: Iterator<Item=Header<Option<HeaderName>>>,
{ {
let len = dst.len(); let len = dst.len();
@@ -86,7 +86,7 @@ impl Encoder {
dst.truncate(len); dst.truncate(len);
} }
return Err(e); unreachable!();
} }
if let Some(resume) = resume { if let Some(resume) = resume {
@@ -104,9 +104,9 @@ impl Encoder {
} }
}; };
if try!(is_buffer_overflow(res)) { if res.is_err() {
dst.truncate(len); dst.truncate(len);
return Ok(Encode::Partial(resume)); return Encode::Partial(resume);
} }
} }
@@ -122,12 +122,12 @@ impl Encoder {
let index = self.table.index(header); let index = self.table.index(header);
let res = self.encode_header(&index, dst); let res = self.encode_header(&index, dst);
if try!(is_buffer_overflow(res)) { if res.is_err() {
dst.truncate(len); dst.truncate(len);
return Ok(Encode::Partial(EncodeState { return Encode::Partial(EncodeState {
index: index, index: index,
value: None, value: None,
})); });
} }
last_index = Some(index); last_index = Some(index);
@@ -142,18 +142,18 @@ impl Encoder {
&value, &value,
dst); dst);
if try!(is_buffer_overflow(res)) { if res.is_err() {
dst.truncate(len); dst.truncate(len);
return Ok(Encode::Partial(EncodeState { return Encode::Partial(EncodeState {
index: last_index.unwrap(), index: last_index.unwrap(),
value: Some(value), value: Some(value),
})); });
} }
} }
}; };
} }
Ok(Encode::Full) Encode::Full
} }
fn encode_size_updates(&mut self, dst: &mut BytesMut) -> Result<(), EncoderError> { fn encode_size_updates(&mut self, dst: &mut BytesMut) -> Result<(), EncoderError> {
@@ -417,14 +417,6 @@ fn encode_int_one_byte(value: usize, prefix_bits: usize) -> bool {
value < (1 << prefix_bits) - 1 value < (1 << prefix_bits) - 1
} }
fn is_buffer_overflow(res: Result<(), EncoderError>) -> Result<bool, EncoderError> {
match res {
Err(EncoderError::BufferOverflow) => Ok(true),
Err(e) => Err(e),
Ok(_) => Ok(false),
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@@ -789,7 +781,7 @@ mod test {
}, },
].into_iter(); ].into_iter();
let resume = match encoder.encode(None, &mut input, &mut dst).unwrap() { let resume = match encoder.encode(None, &mut input, &mut dst) {
Encode::Partial(r) => r, Encode::Partial(r) => r,
_ => panic!(), _ => panic!(),
}; };
@@ -801,7 +793,7 @@ mod test {
dst.clear(); dst.clear();
match encoder.encode(Some(resume), &mut input, &mut dst).unwrap() { match encoder.encode(Some(resume), &mut input, &mut dst) {
Encode::Full => {} Encode::Full => {}
_ => panic!(), _ => panic!(),
} }

View File

@@ -214,7 +214,7 @@ impl FuzzHpack {
} }
loop { loop {
match encoder.encode(index.take(), &mut input, &mut buf).unwrap() { match encoder.encode(index.take(), &mut input, &mut buf) {
Encode::Full => break, Encode::Full => break,
Encode::Partial(i) => { Encode::Partial(i) => {
index = Some(i); index = Some(i);
@@ -531,7 +531,7 @@ fn test_story(story: Value) {
Header::new(name.clone().into(), value.clone().into()).unwrap().into() Header::new(name.clone().into(), value.clone().into()).unwrap().into()
}).collect(); }).collect();
encoder.encode(None, &mut input.clone().into_iter(), &mut buf).unwrap(); encoder.encode(None, &mut input.clone().into_iter(), &mut buf);
decoder.decode(&buf.into(), |e| { decoder.decode(&buf.into(), |e| {
assert_eq!(e, input.remove(0).reify().unwrap()); assert_eq!(e, input.remove(0).reify().unwrap());

View File

@@ -16,6 +16,8 @@ extern crate bytes;
// Hash function used for HPACK encoding // Hash function used for HPACK encoding
extern crate fnv; extern crate fnv;
extern crate byteorder;
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@@ -1,5 +1,5 @@
use {ConnectionError, Reason}; use {ConnectionError, Reason};
use frame::{self, Data, Frame, Error, Headers, PushPromise, Settings}; use frame::{self, Frame, Error};
use hpack; use hpack;
use futures::*; use futures::*;
@@ -19,10 +19,7 @@ pub struct FramedWrite<T> {
hpack: hpack::Encoder, hpack: hpack::Encoder,
/// Write buffer /// Write buffer
buf: BytesMut, buf: Cursor<BytesMut>,
/// Position in the frame
pos: usize,
/// Next frame to encode /// Next frame to encode
next: Option<Next>, next: Option<Next>,
@@ -40,16 +37,7 @@ enum Next {
/// Data frame to encode /// Data frame to encode
data: frame::Data data: frame::Data
}, },
Continuation { Continuation(frame::Continuation),
/// Stream ID of continuation frame
stream_id: frame::StreamId,
/// Argument to pass to the HPACK encoder to resume encoding
resume: hpack::EncodeState,
/// remaining headers to encode
rem: header::IntoIter<HeaderValue>,
},
} }
/// Initialze the connection with this amount of write buffer. /// Initialze the connection with this amount of write buffer.
@@ -68,18 +56,21 @@ impl<T: AsyncWrite> FramedWrite<T> {
FramedWrite { FramedWrite {
inner: inner, inner: inner,
hpack: hpack::Encoder::default(), hpack: hpack::Encoder::default(),
buf: BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY), buf: Cursor::new(BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY)),
pos: 0,
next: None, next: None,
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
} }
} }
fn has_capacity(&self) -> bool { fn has_capacity(&self) -> bool {
self.next.is_none() && self.buf.remaining_mut() >= MIN_BUFFER_CAPACITY self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY
} }
fn frame_len(&self, data: &Data) -> usize { fn is_empty(&self) -> bool {
self.next.is_none() && self.buf.has_remaining()
}
fn frame_len(&self, data: &frame::Data) -> usize {
cmp::min(self.max_frame_size, data.len()) cmp::min(self.max_frame_size, data.len())
} }
} }
@@ -105,7 +96,7 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
let len = self.frame_len(&v); let len = self.frame_len(&v);
// Encode the frame head to the buffer // Encode the frame head to the buffer
head.encode(len, &mut self.buf); head.encode(len, self.buf.get_mut());
// Save the data frame // Save the data frame
self.next = Some(Next::Data { self.next = Some(Next::Data {
@@ -113,17 +104,19 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
data: v, data: v,
}); });
} else { } else {
v.encode(&mut self.buf); v.encode(self.buf.get_mut());
} }
} }
Frame::Headers(v) => { Frame::Headers(v) => {
unimplemented!(); if let Some(continuation) = v.encode(&mut self.hpack, self.buf.get_mut()) {
self.next = Some(Next::Continuation(continuation));
}
} }
Frame::PushPromise(v) => { Frame::PushPromise(v) => {
unimplemented!(); unimplemented!();
} }
Frame::Settings(v) => { Frame::Settings(v) => {
v.encode(&mut self.buf); v.encode(self.buf.get_mut());
} }
} }
@@ -131,7 +124,22 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
} }
fn poll_complete(&mut self) -> Poll<(), ConnectionError> { fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
unimplemented!(); // TODO: implement
match self.next {
Some(Next::Data { .. }) => unimplemented!(),
_ => {}
}
// As long as there is data to write, try to write it!
while !self.is_empty() {
try_ready!(self.inner.write_buf(&mut self.buf));
}
// Clear internal buffer
self.buf.set_position(0);
self.buf.get_mut().clear();
Ok(Async::Ready(()))
} }
fn close(&mut self) -> Poll<(), ConnectionError> { fn close(&mut self) -> Poll<(), ConnectionError> {