Much work

This commit is contained in:
Carl Lerche
2017-06-23 13:13:50 -07:00
parent a3950354aa
commit fa21970656
15 changed files with 531 additions and 77 deletions

View File

@@ -1,11 +1,24 @@
use {frame, proto, ConnectionError};
use {frame, Frame, ConnectionError, Peer, StreamId};
use proto::{self, ReadySink, State};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use http;
use futures::*;
pub struct Connection<T> {
use ordermap::OrderMap;
use fnv::FnvHasher;
use std::marker::PhantomData;
use std::hash::BuildHasherDefault;
/// An H2 connection
pub struct Connection<T, P> {
inner: Inner<T>,
streams: StreamMap<State>,
peer: PhantomData<P>,
}
type Inner<T> =
@@ -15,72 +28,154 @@ type Inner<T> =
proto::FramedRead<
length_delimited::FramedRead<T>>>>>;
impl<T: AsyncRead + AsyncWrite> Connection<T> {
pub fn new(io: T) -> Connection<T> {
// Delimit the frames
let framed_read = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(6)
.num_skip(0) // Don't skip the header
.new_read(io);
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
// Map to `Frame` types
let framed_read = proto::FramedRead::new(framed_read);
/// Returns a new `Connection` backed by the given `io`.
pub fn new<T, P>(io: T) -> Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
// Frame encoder
let mut framed = proto::FramedWrite::new(framed_read);
// Delimit the frames
let framed_read = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(io);
// Ok, so this is a **little** hacky, but it works for now.
//
// The ping/pong behavior SHOULD be given highest priority (6.7).
// However, the connection handshake requires the settings frame to be
// sent as the very first one. This needs special handling because
// otherwise there is a race condition where the peer could send its
// settings frame followed immediately by a Ping, in which case, we
// don't want to accidentally send the pong before finishing the
// connection hand shake.
//
// So, to ensure correct ordering, we write the settings frame here
// before fully constructing the connection struct. Technically, `Async`
// operations should not be performed in `new` because this might not
// happen on a task, however we have full control of the I/O and we know
// that the settings frame will get buffered and not actually perform an
// I/O op.
let initial_settings = frame::SettingSet::default();
let frame = frame::Settings::new(initial_settings.clone());
assert!(framed.start_send(frame.into()).unwrap().is_ready());
// Map to `Frame` types
let framed_read = proto::FramedRead::new(framed_read);
// Add ping/pong handler
let ping_pong = proto::PingPong::new(framed);
// Frame encoder
let mut framed = proto::FramedWrite::new(framed_read);
// Add settings handler
let connection = proto::Settings::new(ping_pong, initial_settings);
// Ok, so this is a **little** hacky, but it works for now.
//
// The ping/pong behavior SHOULD be given highest priority (6.7).
// However, the connection handshake requires the settings frame to be
// sent as the very first one. This needs special handling because
// otherwise there is a race condition where the peer could send its
// settings frame followed immediately by a Ping, in which case, we
// don't want to accidentally send the pong before finishing the
// connection hand shake.
//
// So, to ensure correct ordering, we write the settings frame here
// before fully constructing the connection struct. Technically, `Async`
// operations should not be performed in `new` because this might not
// happen on a task, however we have full control of the I/O and we know
// that the settings frame will get buffered and not actually perform an
// I/O op.
let initial_settings = frame::SettingSet::default();
let frame = frame::Settings::new(initial_settings.clone());
assert!(framed.start_send(frame.into()).unwrap().is_ready());
Connection {
inner: connection,
// Add ping/pong handler
let ping_pong = proto::PingPong::new(framed);
// Add settings handler
let connection = proto::Settings::new(ping_pong, initial_settings);
Connection {
inner: connection,
streams: StreamMap::default(),
peer: PhantomData,
}
}
impl<T, P> Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
/// Completes when the connection has terminated
pub fn poll_shutdown(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.poll_complete());
Ok(Async::NotReady)
}
}
impl<T, P> Stream for Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
type Item = Frame<P::Poll>;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
use frame::Frame::*;
match try_ready!(self.inner.poll()) {
Some(Headers(v)) => unimplemented!(),
Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)),
_ => unimplemented!(),
}
}
}
impl<T: AsyncRead + AsyncWrite> Stream for Connection<T> {
type Item = frame::Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<frame::Frame>, ConnectionError> {
self.inner.poll()
}
}
impl<T: AsyncRead + AsyncWrite> Sink for Connection<T> {
type SinkItem = frame::Frame;
impl<T, P> Sink for Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
type SinkItem = Frame<P::Send>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: frame::Frame) -> StartSend<frame::Frame, ConnectionError> {
self.inner.start_send(item)
fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, Self::SinkError>
{
// First ensure that the upstream can process a new item
if !try!(self.poll_ready()).is_ready() {
return Ok(AsyncSink::NotReady(item));
}
match item {
Frame::Message { id, message, body } => {
// Ensure ID is valid
try!(P::check_initiating_id(id));
// TODO: Ensure available capacity for a new stream
// This won't be as simple as self.streams.len() as closed
// connections should not be factored.
// Transition the stream state, creating a new entry if needed
try!(self.streams.entry(id)
.or_insert(State::default())
.send_headers());
let message = P::convert_send_message(id, message, body);
// TODO: Handle trailers and all that jazz
// We already ensured that the upstream can handle the frame, so
// panic if it gets rejected.
let res = try!(self.inner.start_send(frame::Frame::Headers(message.frame)));
// This is a one-way conversion. By checking `poll_ready` first,
// it's already been determined that the inner `Sink` can accept
// the item. If the item is rejected, then there is a bug.
assert!(res.is_ready());
Ok(AsyncSink::Ready)
}
Frame::Body { id, chunk } => {
unimplemented!();
}
Frame::Error { id, error } => {
unimplemented!();
}
}
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
impl<T, P> ReadySink for Connection<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
self.inner.poll_ready()
}
}

View File

@@ -54,6 +54,13 @@ impl<T> FramedRead<T> {
// TODO: Change to drain: carllerche/bytes#130
let frame = try!(frame::Headers::load(head, &mut buf, &mut self.hpack));
if !frame.is_end_headers() {
// Wait for continuation frames
self.partial = Some(Partial::Headers(frame));
return Ok(None);
}
frame.into()
}
Kind::Priority => unimplemented!(),
@@ -88,6 +95,7 @@ impl<T> Stream for FramedRead<T>
};
if let Some(frame) = try!(self.decode_frame(bytes)) {
debug!("poll; frame={:?}", frame);
return Ok(Async::Ready(Some(frame)));
}
}

View File

@@ -1,6 +1,6 @@
use {ConnectionError, Reason};
use {hpack, ConnectionError, Reason};
use frame::{self, Frame, Error};
use hpack;
use proto::ReadySink;
use futures::*;
use tokio_io::AsyncWrite;
@@ -67,7 +67,7 @@ impl<T: AsyncWrite> FramedWrite<T> {
}
fn is_empty(&self) -> bool {
self.next.is_none() && self.buf.has_remaining()
self.next.is_none() && !self.buf.has_remaining()
}
fn frame_len(&self, data: &frame::Data) -> usize {
@@ -80,13 +80,10 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
type SinkError = ConnectionError;
fn start_send(&mut self, item: Frame) -> StartSend<Frame, ConnectionError> {
if self.has_capacity() {
// Try flushing
try!(self.poll_complete());
debug!("start_send; frame={:?}", item);
if self.has_capacity() {
return Ok(AsyncSink::NotReady(item));
}
if !try!(self.poll_ready()).is_ready() {
return Ok(AsyncSink::NotReady(item));
}
match item {
@@ -117,6 +114,7 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
}
Frame::Settings(v) => {
v.encode(self.buf.get_mut());
trace!("encoded settings; rem={:?}", self.buf.remaining());
}
}
@@ -124,6 +122,8 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
trace!("FramedWrite::poll_complete");
// TODO: implement
match self.next {
Some(Next::Data { .. }) => unimplemented!(),
@@ -132,9 +132,14 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
// As long as there is data to write, try to write it!
while !self.is_empty() {
trace!("writing buffer; next={:?}; rem={:?}", self.next, self.buf.remaining());
try_ready!(self.inner.write_buf(&mut self.buf));
}
trace!("flushing buffer");
// Flush the upstream
try_nb!(self.inner.flush());
// Clear internal buffer
self.buf.set_position(0);
self.buf.get_mut().clear();
@@ -148,6 +153,21 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
}
}
impl<T: AsyncWrite> ReadySink for FramedWrite<T> {
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
if !self.has_capacity() {
// Try flushing
try!(self.poll_complete());
if !self.has_capacity() {
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
}
impl<T: Stream> Stream for FramedWrite<T> {
type Item = T::Item;
type Error = T::Error;

View File

@@ -2,10 +2,34 @@ mod connection;
mod framed_read;
mod framed_write;
mod ping_pong;
mod ready;
mod settings;
mod state;
pub use self::connection::Connection;
pub use self::connection::{Connection, new as new_connection};
pub use self::framed_read::FramedRead;
pub use self::framed_write::FramedWrite;
pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink;
pub use self::settings::Settings;
pub use self::state::State;
use frame;
/// A request or response issued by the current process.
pub struct SendMessage {
frame: frame::Headers,
}
/// A request or response received by the current process.
pub struct PollMessage {
frame: frame::Headers,
}
impl SendMessage {
pub fn new(frame: frame::Headers) -> Self {
SendMessage {
frame: frame,
}
}
}

View File

@@ -1,5 +1,6 @@
use ConnectionError;
use frame::Frame;
use proto::ReadySink;
use futures::*;
@@ -45,3 +46,13 @@ impl<T> Sink for PingPong<T>
self.inner.poll_complete()
}
}
impl<T> ReadySink for PingPong<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}

5
src/proto/ready.rs Normal file
View File

@@ -0,0 +1,5 @@
use futures::{Sink, Poll};
pub trait ReadySink: Sink {
fn poll_ready(&mut self) -> Poll<(), Self::SinkError>;
}

View File

@@ -1,16 +1,22 @@
use ConnectionError;
use frame::{self, Frame};
use proto::ReadySink;
use futures::*;
pub struct Settings<T> {
// Upstream transport
inner: T,
// Our settings
local: frame::SettingSet,
// Peer settings
remote: frame::SettingSet,
// Number of acks remaining to send to the peer
remaining_acks: usize,
// True when the local settings must be flushed to the remote
is_dirty: bool,
}
@@ -30,17 +36,13 @@ impl<T> Settings<T>
local: local,
remote: frame::SettingSet::default(),
remaining_acks: 0,
is_dirty: false,
is_dirty: true,
}
}
fn has_pending_sends(&self) -> bool {
self.is_dirty || self.remaining_acks > 0
}
fn try_send_pending(&mut self) -> Poll<(), ConnectionError> {
if self.is_dirty {
let frame = frame::Settings::new(self.local.clone()).into();
let frame = frame::Settings::new(self.local.clone());
try_ready!(self.try_send(frame));
self.is_dirty = false;
@@ -56,8 +58,8 @@ impl<T> Settings<T>
Ok(Async::Ready(()))
}
fn try_send(&mut self, item: frame::Frame) -> Poll<(), ConnectionError> {
if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item)) {
fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> {
if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item.into())) {
// Ensure that call to `poll_complete` guarantee is called to satisfied
try!(self.inner.poll_complete());
@@ -110,3 +112,17 @@ impl<T> Sink for Settings<T>
self.inner.close()
}
}
impl<T> ReadySink for Settings<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if try!(self.try_send_pending()).is_ready() {
return self.inner.poll_ready();
}
Ok(Async::NotReady)
}
}

72
src/proto/state.rs Normal file
View File

@@ -0,0 +1,72 @@
use ConnectionError;
/// Represents the state of an H2 stream
///
/// ```not_rust
/// +--------+
/// send PP | | recv PP
/// ,--------| idle |--------.
/// / | | \
/// v +--------+ v
/// +----------+ | +----------+
/// | | | send H / | |
/// ,------| reserved | | recv H | reserved |------.
/// | | (local) | | | (remote) | |
/// | +----------+ v +----------+ |
/// | | +--------+ | |
/// | | recv ES | | send ES | |
/// | send H | ,-------| open |-------. | recv H |
/// | | / | | \ | |
/// | v v +--------+ v v |
/// | +----------+ | +----------+ |
/// | | half | | | half | |
/// | | closed | | send R / | closed | |
/// | | (remote) | | recv R | (local) | |
/// | +----------+ | +----------+ |
/// | | | | |
/// | | send ES / | recv ES / | |
/// | | send R / v send R / | |
/// | | recv R +--------+ recv R | |
/// | send R / `----------->| |<-----------' send R / |
/// | recv R | closed | recv R |
/// `----------------------->| |<----------------------'
/// +--------+
///
/// send: endpoint sends this frame
/// recv: endpoint receives this frame
///
/// H: HEADERS frame (with implied CONTINUATIONs)
/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
/// ES: END_STREAM flag
/// R: RST_STREAM frame
/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum State {
Idle,
ReservedLocal,
ReservedRemote,
Open,
HalfClosedLocal,
HalfClosedRemote,
Closed,
}
impl State {
/// Transition the state to represent headers being sent.
///
/// Returns an error if this is an invalid state transition.
pub fn send_headers(&mut self) -> Result<(), ConnectionError> {
if *self != State::Idle {
unimplemented!();
}
*self = State::Open;
Ok(())
}
}
impl Default for State {
fn default() -> State {
State::Idle
}
}