Bunch of work

This commit is contained in:
Carl Lerche
2017-06-16 16:37:51 -07:00
parent c12a9a86ae
commit f6fd6a6d6e
10 changed files with 204 additions and 127 deletions

View File

@@ -1,6 +1,7 @@
use frame::{util, Head, Error, StreamId};
use bytes::Bytes;
use frame::{util, Head, Error, StreamId, Kind};
use bytes::{BufMut, Bytes};
#[derive(Debug)]
pub struct Data {
stream_id: StreamId,
data: Bytes,
@@ -33,6 +34,23 @@ impl Data {
pad_len: pad_len,
})
}
pub fn len(&self) -> usize {
self.data.len()
}
pub fn encode<T: BufMut>(&self, dst: &mut T) {
self.head().encode(self.len(), dst);
dst.put(&self.data);
}
pub fn head(&self) -> Head {
Head::new(Kind::Data, self.flags.into(), self.stream_id)
}
pub fn into_payload(self) -> Bytes {
self.data
}
}
@@ -57,3 +75,9 @@ impl DataFlag {
self.0 & PADDED == PADDED
}
}
impl From<DataFlag> for u8 {
fn from(src: DataFlag) -> u8 {
src.0
}
}

View File

@@ -65,7 +65,7 @@ impl Head {
super::HEADER_LEN
}
pub fn encode<T: BufMut>(&self, payload_len: usize, dst: &mut T) -> Result<(), Error> {
pub fn encode<T: BufMut>(&self, payload_len: usize, dst: &mut T) {
debug_assert_eq!(self.encode_len(), dst.remaining_mut());
debug_assert!(self.stream_id & STREAM_ID_MASK == 0);
@@ -73,7 +73,6 @@ impl Head {
dst.put_u8(self.kind as u8);
dst.put_u8(self.flag);
dst.put_u32::<BigEndian>(self.stream_id);
Ok(())
}
}

View File

@@ -1,43 +1,79 @@
use super::StreamId;
use util::byte_str::ByteStr;
use http::{Method, StatusCode};
use http::header::{self, HeaderMap, HeaderValue};
/// Header frame
///
/// This could be either a request or a response.
#[derive(Debug)]
pub struct Headers {
/// The ID of the stream with which this frame is associated.
stream_id: StreamId,
/// The stream dependency information, if any.
stream_dep: Option<StreamDependency>,
/// The decoded headers
headers: HeaderMap,
headers: HeaderMap<HeaderValue>,
/// Pseudo headers, these are broken out as they must be sent as part of the
/// headers frame.
pseudo: Pseudo,
flags: HeaderFlag,
/// The associated flags
flags: HeadersFlag,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)]
pub struct HeadersFlag(u8);
pub struct PushPromise;
#[derive(Debug)]
pub struct PushPromise {
/// The ID of the stream with which this frame is associated.
stream_id: StreamId,
/// The ID of the stream being reserved by this PushPromise.
promised_id: StreamId,
/// The associated flags
flags: HeadersFlag,
}
#[derive(Debug)]
pub struct StreamDependency {
/// The ID of the stream dependency target
stream_id: StreamId,
/// The weight for the stream. The value exposed (and set) here is always in
/// the range [0, 255], instead of [1, 256] (as defined in section 5.3.2.)
/// so that the value fits into a `u8`.
weight: u8,
/// True if the stream dependency is exclusive.
is_exclusive: bool,
}
#[derive(Debug)]
pub struct Pseudo {
// Request
method: Option<()>,
scheme: Option<()>,
authority: Option<()>,
path: Option<()>,
method: Option<Method>,
scheme: Option<ByteStr>,
authority: Option<ByteStr>,
path: Option<ByteStr>,
// Response
status: Option<()>,
status: Option<StatusCode>,
}
#[derive(Debug)]
pub struct Iter {
/// Pseudo headers
pseudo: Option<Pseudo>,
/// Headers
headers: header::IntoIter<HeaderValue>,
}
const END_STREAM: u8 = 0x1;
@@ -52,7 +88,7 @@ const ALL: u8 = END_STREAM
// ===== impl HeadersFlag =====
impl HeadersFlag {
pub empty() -> HeadersFlag {
pub fn empty() -> HeadersFlag {
HeadersFlag(0)
}

View File

@@ -26,31 +26,29 @@ macro_rules! unpack_octets_4 {
mod data;
mod head;
mod headers;
mod settings;
mod unknown;
mod util;
pub use self::data::Data;
pub use self::head::{Head, Kind, StreamId};
pub use self::headers::{Headers, PushPromise};
pub use self::settings::{Settings, SettingSet};
pub use self::unknown::Unknown;
// Re-export some constants
pub use self::settings::{
DEFAULT_SETTINGS_HEADER_TABLE_SIZE,
DEFAULT_MAX_FRAME_SIZE,
};
pub const HEADER_LEN: usize = 9;
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug /*, Clone, PartialEq */)]
pub enum Frame {
/*
Data(DataFrame<'a>),
HeadersFrame(HeadersFrame<'a>),
RstStreamFrame(RstStreamFrame),
SettingsFrame(SettingsFrame),
PingFrame(PingFrame),
GoawayFrame(GoawayFrame<'a>),
WindowUpdateFrame(WindowUpdateFrame),
UnknownFrame(RawFrame<'a>),
*/
Data(Data),
Headers(Headers),
PushPromise(PushPromise),
Settings(Settings),
Unknown(Unknown),
}
/// Errors that can occur during parsing an HTTP/2 frame.
@@ -99,40 +97,6 @@ pub enum Error {
// ===== impl Frame ======
impl Frame {
pub fn load(mut frame: Bytes) -> Result<Frame, Error> {
let head = Head::parse(&frame);
// Extract the payload from the frame
let _ = frame.drain_to(HEADER_LEN);
match head.kind() {
Kind::Unknown => {
let unknown = Unknown::new(head, frame);
Ok(Frame::Unknown(unknown))
}
_ => unimplemented!(),
}
}
pub fn encode_len(&self) -> usize {
use self::Frame::*;
match *self {
Settings(ref frame) => frame.encode_len(),
Unknown(ref frame) => frame.encode_len(),
}
}
pub fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> {
use self::Frame::*;
debug_assert!(dst.remaining_mut() >= self.encode_len());
match *self {
Settings(ref frame) => frame.encode(dst),
Unknown(ref frame) => frame.encode(dst),
}
}
}
// ===== impl Error =====

View File

@@ -37,6 +37,9 @@ pub struct SettingsFlag(u8);
const ACK: u8 = 0x1;
const ALL: u8 = ACK;
pub const DEFAULT_SETTINGS_HEADER_TABLE_SIZE: usize = 4_096;
pub const DEFAULT_MAX_FRAME_SIZE: usize = 16_384;
// ===== impl Settings =====
impl Settings {
@@ -111,27 +114,21 @@ impl Settings {
Ok(settings)
}
pub fn encode_len(&self) -> usize {
super::HEADER_LEN + self.payload_len()
}
fn payload_len(&self) -> usize {
let mut len = 0;
self.for_each(|_| len += 6);
len
}
pub fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> {
pub fn encode(&self, dst: &mut BytesMut) {
// Create & encode an appropriate frame head
let head = Head::new(Kind::Settings, self.flag.into(), 0);
let payload_len = self.payload_len();
try!(head.encode(payload_len, dst));
head.encode(payload_len, dst);
// Encode the settings
self.for_each(|setting| setting.encode(dst));
Ok(())
}
fn for_each<F: FnMut(Setting)>(&self, mut f: F) {

View File

@@ -4,16 +4,21 @@ use super::table::{Table, Index};
use http::header::{HeaderName, HeaderValue};
use bytes::{BytesMut, BufMut};
#[derive(Debug)]
pub struct Encoder {
table: Table,
size_update: Option<SizeUpdate>,
}
#[derive(Debug)]
pub enum Encode {
Full,
Partial(Index),
Partial(EncodeState),
}
#[derive(Debug)]
pub struct EncodeState(Index);
#[derive(Debug, PartialEq, Eq)]
pub enum EncoderError {
BufferOverflow,
@@ -67,7 +72,7 @@ impl Encoder {
}
/// Encode a set of headers into the provide buffer
pub fn encode<I>(&mut self, resume: Option<Index>, headers: &mut I, dst: &mut BytesMut)
pub fn encode<I>(&mut self, resume: Option<EncodeState>, headers: &mut I, dst: &mut BytesMut)
-> Result<Encode, EncoderError>
where I: Iterator<Item=Header>,
{
@@ -81,13 +86,13 @@ impl Encoder {
return Err(e);
}
if let Some(index) = resume {
if let Some(resume) = resume {
let len = dst.len();
match self.encode_header(&index, dst) {
match self.encode_header(&resume.0, dst) {
Err(EncoderError::BufferOverflow) => {
dst.truncate(len);
return Ok(Encode::Partial(index));
return Ok(Encode::Partial(resume));
}
Err(e) => return Err(e),
Ok(_) => {}
@@ -101,7 +106,7 @@ impl Encoder {
match self.encode_header(&index, dst) {
Err(EncoderError::BufferOverflow) => {
dst.truncate(len);
return Ok(Encode::Partial(index));
return Ok(Encode::Partial(EncodeState(index)));
}
Err(e) => return Err(e),
Ok(_) => {}

View File

@@ -7,7 +7,6 @@ mod table;
#[cfg(test)]
mod test;
pub use self::encoder::{Encoder, Encode, EncoderError};
pub use self::encoder::{Encoder, Encode, EncoderError, EncodeState};
pub use self::header::Header;
pub use self::decoder::{Decoder, DecoderError};
pub use self::table::Index;

View File

@@ -9,6 +9,7 @@ use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
/// HPACK encoder table
#[derive(Debug)]
pub struct Table {
mask: usize,
indices: Vec<Option<Pos>>,
@@ -37,6 +38,7 @@ pub enum Index {
NotIndexed(Header),
}
#[derive(Debug)]
struct Slot {
hash: HashValue,
header: Header,

View File

@@ -1,5 +1,6 @@
use ConnectionError;
use {hpack, ConnectionError};
use frame::{self, Frame, Kind};
use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE;
use tokio_io::AsyncWrite;
@@ -12,7 +13,7 @@ pub struct FramedRead<T> {
inner: T,
// hpack decoder state
// hpack: hpack::Decoder,
hpack: hpack::Decoder,
}
@@ -23,6 +24,7 @@ impl<T> FramedRead<T>
pub fn new(inner: T) -> FramedRead<T> {
FramedRead {
inner: inner,
hpack: hpack::Decoder::new(DEFAULT_SETTINGS_HEADER_TABLE_SIZE),
}
}
}
@@ -46,10 +48,7 @@ impl<T> FramedRead<T> {
Kind::GoAway => unimplemented!(),
Kind::WindowUpdate => unimplemented!(),
Kind::Continuation => unimplemented!(),
Kind::Unknown => {
let _ = bytes.split_to(frame::HEADER_LEN);
frame::Unknown::new(head, bytes).into()
}
Kind::Unknown => return Ok(None),
};
Ok(Some(frame))

View File

@@ -1,33 +1,86 @@
use {ConnectionError, Reason};
use frame::{Frame, Error};
use frame::{self, Data, Frame, Error, Headers, PushPromise, Settings};
use hpack;
use tokio_io::AsyncWrite;
use futures::*;
use bytes::{BytesMut, Buf, BufMut};
use tokio_io::AsyncWrite;
use bytes::{Bytes, BytesMut, Buf, BufMut};
use http::header::{self, HeaderValue};
use std::cmp;
use std::io::{self, Cursor};
#[derive(Debug)]
pub struct FramedWrite<T> {
/// Upstream `AsyncWrite`
inner: T,
buf: Cursor<BytesMut>,
/// HPACK encoder
hpack: hpack::Encoder,
/// Write buffer
buf: BytesMut,
/// Position in the frame
pos: usize,
/// Next frame to encode
next: Option<Next>,
/// Max frame size, this is specified by the peer
max_frame_size: usize,
}
const DEFAULT_BUFFER_CAPACITY: usize = 8 * 1_024;
const MAX_BUFFER_CAPACITY: usize = 16 * 1_024;
#[derive(Debug)]
enum Next {
Data {
/// Length of the current frame being written
frame_len: usize,
/// Data frame to encode
data: frame::Data
},
Continuation {
/// Stream ID of continuation frame
stream_id: frame::StreamId,
/// Argument to pass to the HPACK encoder to resume encoding
resume: hpack::EncodeState,
/// remaining headers to encode
rem: header::IntoIter<HeaderValue>,
},
}
/// Initialze the connection with this amount of write buffer.
const DEFAULT_BUFFER_CAPACITY: usize = 4 * 1_024;
/// Min buffer required to attempt to write a frame
const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD;
/// Chain payloads bigger than this. The remote will never advertise a max frame
/// size less than this (well, the spec says the max frame size can't be less
/// than 16kb, so not even close).
const CHAIN_THRESHOLD: usize = 256;
impl<T: AsyncWrite> FramedWrite<T> {
pub fn new(inner: T) -> FramedWrite<T> {
let buf = BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY);
FramedWrite {
inner: inner,
buf: Cursor::new(buf),
hpack: hpack::Encoder::default(),
buf: BytesMut::with_capacity(DEFAULT_BUFFER_CAPACITY),
pos: 0,
next: None,
max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE,
}
}
fn write_buf(&mut self) -> &mut BytesMut {
self.buf.get_mut()
fn has_capacity(&self) -> bool {
self.next.is_none() && self.buf.remaining_mut() >= MIN_BUFFER_CAPACITY
}
fn frame_len(&self, data: &Data) -> usize {
cmp::min(self.max_frame_size, data.len())
}
}
@@ -36,50 +89,49 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
type SinkError = ConnectionError;
fn start_send(&mut self, item: Frame) -> StartSend<Frame, ConnectionError> {
let len = item.encode_len();
if len > MAX_BUFFER_CAPACITY {
// This case should never happen. Large frames should be chunked at
// a higher level, so this is an internal error.
return Err(ConnectionError::Proto(Reason::InternalError));
}
if self.write_buf().remaining_mut() <= len {
// Try flushing the buffer
if self.has_capacity() {
// Try flushing
try!(self.poll_complete());
let rem = self.write_buf().remaining_mut();
let additional = len - rem;
if self.write_buf().capacity() + additional > MAX_BUFFER_CAPACITY {
if self.has_capacity() {
return Ok(AsyncSink::NotReady(item));
}
// Grow the buffer
self.write_buf().reserve(additional);
}
// At this point, the buffer contains enough space
item.encode(self.write_buf());
match item {
Frame::Data(v) => {
if v.len() >= CHAIN_THRESHOLD {
let head = v.head();
let len = self.frame_len(&v);
// Encode the frame head to the buffer
head.encode(len, &mut self.buf);
// Save the data frame
self.next = Some(Next::Data {
frame_len: len,
data: v,
});
} else {
v.encode(&mut self.buf);
}
}
Frame::Headers(v) => {
unimplemented!();
}
Frame::PushPromise(v) => {
unimplemented!();
}
Frame::Settings(v) => {
v.encode(&mut self.buf);
}
}
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
while self.buf.has_remaining() {
try_ready!(self.inner.write_buf(&mut self.buf));
if !self.buf.has_remaining() {
// Reset the buffer
self.write_buf().clear();
self.buf.set_position(0);
}
}
// Try flushing the underlying IO
try_nb!(self.inner.flush());
return Ok(Async::Ready(()));
unimplemented!();
}
fn close(&mut self) -> Poll<(), ConnectionError> {