From fa21970656056ca89e37c8941c0049b44049ad71 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 23 Jun 2017 13:13:50 -0700 Subject: [PATCH] Much work --- Cargo.toml | 11 ++- src/client.rs | 107 ++++++++++++++++++++ src/frame/head.rs | 2 +- src/frame/headers.rs | 49 +++++++++- src/frame/mod.rs | 2 +- src/frame/settings.rs | 1 + src/lib.rs | 43 +++++++- src/proto/connection.rs | 201 ++++++++++++++++++++++++++++---------- src/proto/framed_read.rs | 8 ++ src/proto/framed_write.rs | 38 +++++-- src/proto/mod.rs | 26 ++++- src/proto/ping_pong.rs | 11 +++ src/proto/ready.rs | 5 + src/proto/settings.rs | 32 ++++-- src/proto/state.rs | 72 ++++++++++++++ 15 files changed, 531 insertions(+), 77 deletions(-) create mode 100644 src/client.rs create mode 100644 src/proto/ready.rs create mode 100644 src/proto/state.rs diff --git a/Cargo.toml b/Cargo.toml index 4329e25..55955d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,11 +8,11 @@ futures = "0.1" tokio-io = "0.1" tokio-timer = { git = "https://github.com/tokio-rs/tokio-timer" } bytes = "0.4" -http = { path = "/Users/carllerche/Code/Oss/Tokio/tower/http" } +http = { git = "https://github.com/carllerche/http" } byteorder = "1.0" log = "0.3.8" -# tower = { path = "/Users/carllerche/Code/Oss/Tokio/tower/tower-http" } fnv = "1.0.5" +ordermap = "0.2.10" [dev-dependencies] hex = "0.2.0" @@ -21,3 +21,10 @@ serde = "1.0.0" serde_json = "1.0.0" quickcheck = "0.4.1" rand = "0.3.15" + +# Akamai example +tokio-core = "0.1" +openssl = { version = "0.9.14", "features" = ["v102"] } +tokio-openssl = "0.1.3" +env_logger = "0.4.3" +io-dump = { path = "/Users/carllerche/Code/Oss/Tokio/util/io-dump" } diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..5531e87 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,107 @@ +use {frame, proto, Frame, Peer, ConnectionError, StreamId}; + +use http; + +use futures::{Future, Poll}; + +use tokio_io::{AsyncRead, AsyncWrite}; + +/// In progress H2 connection binding +pub struct Handshake { + // TODO: unbox + inner: Box, Error = ConnectionError>>, +} + +/// Marker type indicating a client peer +pub struct Client; + +pub type Connection = super::Connection; + +/// Bind an H2 client connection. +/// +/// Returns a future which resolves to the connection value once the H2 +/// handshake has been completed. +pub fn bind(io: T) -> Handshake + where T: AsyncRead + AsyncWrite + 'static, +{ + use tokio_io::io; + + debug!("binding client connection"); + + let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + .map(|(io, _)| { + debug!("client connection bound"); + proto::new_connection(io) + }) + .map_err(ConnectionError::from); + + Handshake { inner: Box::new(handshake) } +} + +impl Peer for Client { + type Send = http::request::Head; + type Poll = http::response::Head; + + fn check_initiating_id(id: StreamId) -> Result<(), ConnectionError> { + if id % 2 == 0 { + // Client stream identifiers must be odd + unimplemented!(); + } + + // TODO: Ensure the `id` doesn't overflow u31 + + Ok(()) + } + + fn convert_send_message( + id: StreamId, + message: Self::Send, + body: bool) -> proto::SendMessage + { + use http::request::Head; + + // Extract the components of the HTTP request + let Head { method, uri, headers, .. } = message; + + // Build the set pseudo header set. All requests will include `method` + // and `path`. + let mut pseudo = frame::Pseudo::request(method, uri.path().into()); + + // If the URI includes a scheme component, add it to the pseudo headers + if let Some(scheme) = uri.scheme() { + pseudo.set_scheme(scheme.into()); + } + + // If the URI includes an authority component, add it to the pseudo + // headers + if let Some(authority) = uri.authority() { + pseudo.set_authority(authority.into()); + } + + // Create the HEADERS frame + let mut frame = frame::Headers::new(id, pseudo, headers); + + // TODO: Factor in trailers + if !body { + frame.set_end_stream(); + } else { + unimplemented!(); + } + + // Return the `SendMessage` + proto::SendMessage::new(frame) + } + + fn convert_poll_message(message: proto::PollMessage) -> Frame { + unimplemented!(); + } +} + +impl Future for Handshake { + type Item = Connection; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} diff --git a/src/frame/head.rs b/src/frame/head.rs index 2f29404..0dd57e4 100644 --- a/src/frame/head.rs +++ b/src/frame/head.rs @@ -66,7 +66,7 @@ impl Head { } pub fn encode(&self, payload_len: usize, dst: &mut T) { - debug_assert_eq!(self.encode_len(), dst.remaining_mut()); + debug_assert!(self.encode_len() <= dst.remaining_mut()); debug_assert!(self.stream_id & STREAM_ID_MASK == 0); dst.put_uint::(payload_len as u64, 3); diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 2817354..64119ae 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -34,7 +34,7 @@ pub struct Headers { flags: HeadersFlag, } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Default)] +#[derive(Debug, Copy, Clone, Eq, PartialEq)] pub struct HeadersFlag(u8); #[derive(Debug)] @@ -108,6 +108,16 @@ const ALL: u8 = END_STREAM // ===== impl Headers ===== impl Headers { + pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self { + Headers { + stream_id: stream_id, + stream_dep: None, + fields: fields, + pseudo: pseudo, + flags: HeadersFlag::default(), + } + } + pub fn load(head: Head, src: &mut Cursor, decoder: &mut hpack::Decoder) -> Result { @@ -162,6 +172,10 @@ impl Headers { }) } + pub fn is_end_headers(&self) -> bool { + self.flags.is_end_headers() + } + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { @@ -210,6 +224,28 @@ impl From for Frame { } } +// ===== impl Pseudo ===== + +impl Pseudo { + pub fn request(method: Method, path: ByteStr) -> Self { + Pseudo { + method: Some(method), + scheme: None, + authority: None, + path: Some(path), + status: None, + } + } + + pub fn set_scheme(&mut self, scheme: ByteStr) { + self.scheme = Some(scheme); + } + + pub fn set_authority(&mut self, authority: ByteStr) { + self.authority = Some(authority); + } +} + // ===== impl Iter ===== impl Iterator for Iter { @@ -264,6 +300,10 @@ impl HeadersFlag { self.0 & END_STREAM == END_STREAM } + pub fn set_end_stream(&mut self) { + self.0 |= END_STREAM + } + pub fn is_end_headers(&self) -> bool { self.0 & END_HEADERS == END_HEADERS } @@ -277,6 +317,13 @@ impl HeadersFlag { } } +impl Default for HeadersFlag { + /// Returns a `HeadersFlag` value with `END_HEADERS` set. + fn default() -> Self { + HeadersFlag(END_HEADERS) + } +} + impl From for u8 { fn from(src: HeadersFlag) -> u8 { src.0 diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 9b70fc9..841af54 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -34,7 +34,7 @@ mod util; pub use self::data::Data; pub use self::head::{Head, Kind, StreamId}; -pub use self::headers::{Headers, PushPromise, Continuation}; +pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; pub use self::settings::{Settings, SettingSet}; // Re-export some constants diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 361c412..99bdeb7 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -81,6 +81,7 @@ impl Settings { // Ensure the payload length is correct, each setting is 6 bytes long. if payload.len() % 6 != 0 { + debug!("invalid settings payload length; len={:?}", payload.len()); return Err(Error::InvalidPayloadAckSettings); } diff --git a/src/lib.rs b/src/lib.rs index 46e7ace..086da68 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,14 +13,17 @@ extern crate http; // Buffer utilities extern crate bytes; -// Hash function used for HPACK encoding +// Hash function used for HPACK encoding and tracking stream states. extern crate fnv; +extern crate ordermap; + extern crate byteorder; #[macro_use] extern crate log; +pub mod client; pub mod error; pub mod hpack; pub mod proto; @@ -29,4 +32,42 @@ pub mod frame; mod util; pub use error::{ConnectionError, StreamError, Reason}; +pub use frame::{StreamId}; pub use proto::Connection; + +/// An H2 connection frame +pub enum Frame { + Message { + id: StreamId, + message: T, + body: bool, + }, + Body { + id: StreamId, + chunk: Option<()>, + }, + Error { + id: StreamId, + error: (), + } +} + +/// Either a Client or a Server +pub trait Peer { + /// Message type sent into the transport + type Send; + + /// Message type polled from the transport + type Poll; + + fn check_initiating_id(id: StreamId) -> Result<(), ConnectionError>; + + #[doc(hidden)] + fn convert_send_message( + id: StreamId, + message: Self::Send, + body: bool) -> proto::SendMessage; + + #[doc(hidden)] + fn convert_poll_message(message: proto::PollMessage) -> Frame; +} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 3bd63b4..d9cd7ec 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,11 +1,24 @@ -use {frame, proto, ConnectionError}; +use {frame, Frame, ConnectionError, Peer, StreamId}; +use proto::{self, ReadySink, State}; + use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +use http; + use futures::*; -pub struct Connection { +use ordermap::OrderMap; +use fnv::FnvHasher; + +use std::marker::PhantomData; +use std::hash::BuildHasherDefault; + +/// An H2 connection +pub struct Connection { inner: Inner, + streams: StreamMap, + peer: PhantomData

, } type Inner = @@ -15,72 +28,154 @@ type Inner = proto::FramedRead< length_delimited::FramedRead>>>>; -impl Connection { - pub fn new(io: T) -> Connection { - // Delimit the frames - let framed_read = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(6) - .num_skip(0) // Don't skip the header - .new_read(io); +type StreamMap = OrderMap>; - // Map to `Frame` types - let framed_read = proto::FramedRead::new(framed_read); +/// Returns a new `Connection` backed by the given `io`. +pub fn new(io: T) -> Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ - // Frame encoder - let mut framed = proto::FramedWrite::new(framed_read); + // Delimit the frames + let framed_read = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + .new_read(io); - // Ok, so this is a **little** hacky, but it works for now. - // - // The ping/pong behavior SHOULD be given highest priority (6.7). - // However, the connection handshake requires the settings frame to be - // sent as the very first one. This needs special handling because - // otherwise there is a race condition where the peer could send its - // settings frame followed immediately by a Ping, in which case, we - // don't want to accidentally send the pong before finishing the - // connection hand shake. - // - // So, to ensure correct ordering, we write the settings frame here - // before fully constructing the connection struct. Technically, `Async` - // operations should not be performed in `new` because this might not - // happen on a task, however we have full control of the I/O and we know - // that the settings frame will get buffered and not actually perform an - // I/O op. - let initial_settings = frame::SettingSet::default(); - let frame = frame::Settings::new(initial_settings.clone()); - assert!(framed.start_send(frame.into()).unwrap().is_ready()); + // Map to `Frame` types + let framed_read = proto::FramedRead::new(framed_read); - // Add ping/pong handler - let ping_pong = proto::PingPong::new(framed); + // Frame encoder + let mut framed = proto::FramedWrite::new(framed_read); - // Add settings handler - let connection = proto::Settings::new(ping_pong, initial_settings); + // Ok, so this is a **little** hacky, but it works for now. + // + // The ping/pong behavior SHOULD be given highest priority (6.7). + // However, the connection handshake requires the settings frame to be + // sent as the very first one. This needs special handling because + // otherwise there is a race condition where the peer could send its + // settings frame followed immediately by a Ping, in which case, we + // don't want to accidentally send the pong before finishing the + // connection hand shake. + // + // So, to ensure correct ordering, we write the settings frame here + // before fully constructing the connection struct. Technically, `Async` + // operations should not be performed in `new` because this might not + // happen on a task, however we have full control of the I/O and we know + // that the settings frame will get buffered and not actually perform an + // I/O op. + let initial_settings = frame::SettingSet::default(); + let frame = frame::Settings::new(initial_settings.clone()); + assert!(framed.start_send(frame.into()).unwrap().is_ready()); - Connection { - inner: connection, + // Add ping/pong handler + let ping_pong = proto::PingPong::new(framed); + + // Add settings handler + let connection = proto::Settings::new(ping_pong, initial_settings); + + Connection { + inner: connection, + streams: StreamMap::default(), + peer: PhantomData, + } +} + +impl Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + /// Completes when the connection has terminated + pub fn poll_shutdown(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_complete()); + Ok(Async::NotReady) + } +} + +impl Stream for Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + use frame::Frame::*; + + match try_ready!(self.inner.poll()) { + Some(Headers(v)) => unimplemented!(), + Some(frame) => panic!("unexpected frame; frame={:?}", frame), + None => return Ok(Async::Ready(None)), + _ => unimplemented!(), } } } -impl Stream for Connection { - type Item = frame::Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - self.inner.poll() - } -} - -impl Sink for Connection { - type SinkItem = frame::Frame; +impl Sink for Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + type SinkItem = Frame; type SinkError = ConnectionError; - fn start_send(&mut self, item: frame::Frame) -> StartSend { - self.inner.start_send(item) + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend + { + // First ensure that the upstream can process a new item + if !try!(self.poll_ready()).is_ready() { + return Ok(AsyncSink::NotReady(item)); + } + + match item { + Frame::Message { id, message, body } => { + // Ensure ID is valid + try!(P::check_initiating_id(id)); + + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + + // Transition the stream state, creating a new entry if needed + try!(self.streams.entry(id) + .or_insert(State::default()) + .send_headers()); + + let message = P::convert_send_message(id, message, body); + + // TODO: Handle trailers and all that jazz + + // We already ensured that the upstream can handle the frame, so + // panic if it gets rejected. + let res = try!(self.inner.start_send(frame::Frame::Headers(message.frame))); + + // This is a one-way conversion. By checking `poll_ready` first, + // it's already been determined that the inner `Sink` can accept + // the item. If the item is rejected, then there is a bug. + assert!(res.is_ready()); + + Ok(AsyncSink::Ready) + } + Frame::Body { id, chunk } => { + unimplemented!(); + } + Frame::Error { id, error } => { + unimplemented!(); + } + } } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { self.inner.poll_complete() } } + +impl ReadySink for Connection + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_ready() + } +} diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 0cfc1ab..384f83d 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -54,6 +54,13 @@ impl FramedRead { // TODO: Change to drain: carllerche/bytes#130 let frame = try!(frame::Headers::load(head, &mut buf, &mut self.hpack)); + + if !frame.is_end_headers() { + // Wait for continuation frames + self.partial = Some(Partial::Headers(frame)); + return Ok(None); + } + frame.into() } Kind::Priority => unimplemented!(), @@ -88,6 +95,7 @@ impl Stream for FramedRead }; if let Some(frame) = try!(self.decode_frame(bytes)) { + debug!("poll; frame={:?}", frame); return Ok(Async::Ready(Some(frame))); } } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 85b2854..ca6dcbe 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,6 +1,6 @@ -use {ConnectionError, Reason}; +use {hpack, ConnectionError, Reason}; use frame::{self, Frame, Error}; -use hpack; +use proto::ReadySink; use futures::*; use tokio_io::AsyncWrite; @@ -67,7 +67,7 @@ impl FramedWrite { } fn is_empty(&self) -> bool { - self.next.is_none() && self.buf.has_remaining() + self.next.is_none() && !self.buf.has_remaining() } fn frame_len(&self, data: &frame::Data) -> usize { @@ -80,13 +80,10 @@ impl Sink for FramedWrite { type SinkError = ConnectionError; fn start_send(&mut self, item: Frame) -> StartSend { - if self.has_capacity() { - // Try flushing - try!(self.poll_complete()); + debug!("start_send; frame={:?}", item); - if self.has_capacity() { - return Ok(AsyncSink::NotReady(item)); - } + if !try!(self.poll_ready()).is_ready() { + return Ok(AsyncSink::NotReady(item)); } match item { @@ -117,6 +114,7 @@ impl Sink for FramedWrite { } Frame::Settings(v) => { v.encode(self.buf.get_mut()); + trace!("encoded settings; rem={:?}", self.buf.remaining()); } } @@ -124,6 +122,8 @@ impl Sink for FramedWrite { } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + trace!("FramedWrite::poll_complete"); + // TODO: implement match self.next { Some(Next::Data { .. }) => unimplemented!(), @@ -132,9 +132,14 @@ impl Sink for FramedWrite { // As long as there is data to write, try to write it! while !self.is_empty() { + trace!("writing buffer; next={:?}; rem={:?}", self.next, self.buf.remaining()); try_ready!(self.inner.write_buf(&mut self.buf)); } + trace!("flushing buffer"); + // Flush the upstream + try_nb!(self.inner.flush()); + // Clear internal buffer self.buf.set_position(0); self.buf.get_mut().clear(); @@ -148,6 +153,21 @@ impl Sink for FramedWrite { } } +impl ReadySink for FramedWrite { + fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { + if !self.has_capacity() { + // Try flushing + try!(self.poll_complete()); + + if !self.has_capacity() { + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } +} + impl Stream for FramedWrite { type Item = T::Item; type Error = T::Error; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index b64c6c4..8a17233 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -2,10 +2,34 @@ mod connection; mod framed_read; mod framed_write; mod ping_pong; +mod ready; mod settings; +mod state; -pub use self::connection::Connection; +pub use self::connection::{Connection, new as new_connection}; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use self::ping_pong::PingPong; +pub use self::ready::ReadySink; pub use self::settings::Settings; +pub use self::state::State; + +use frame; + +/// A request or response issued by the current process. +pub struct SendMessage { + frame: frame::Headers, +} + +/// A request or response received by the current process. +pub struct PollMessage { + frame: frame::Headers, +} + +impl SendMessage { + pub fn new(frame: frame::Headers) -> Self { + SendMessage { + frame: frame, + } + } +} diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 1f7c77c..2fba80d 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,5 +1,6 @@ use ConnectionError; use frame::Frame; +use proto::ReadySink; use futures::*; @@ -45,3 +46,13 @@ impl Sink for PingPong self.inner.poll_complete() } } + +impl ReadySink for PingPong + where T: Stream, + T: Sink, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } +} diff --git a/src/proto/ready.rs b/src/proto/ready.rs new file mode 100644 index 0000000..57e0d80 --- /dev/null +++ b/src/proto/ready.rs @@ -0,0 +1,5 @@ +use futures::{Sink, Poll}; + +pub trait ReadySink: Sink { + fn poll_ready(&mut self) -> Poll<(), Self::SinkError>; +} diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 2024331..fb1e8f3 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,16 +1,22 @@ use ConnectionError; use frame::{self, Frame}; +use proto::ReadySink; use futures::*; pub struct Settings { + // Upstream transport inner: T, + // Our settings local: frame::SettingSet, + // Peer settings remote: frame::SettingSet, + // Number of acks remaining to send to the peer remaining_acks: usize, + // True when the local settings must be flushed to the remote is_dirty: bool, } @@ -30,17 +36,13 @@ impl Settings local: local, remote: frame::SettingSet::default(), remaining_acks: 0, - is_dirty: false, + is_dirty: true, } } - fn has_pending_sends(&self) -> bool { - self.is_dirty || self.remaining_acks > 0 - } - fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { if self.is_dirty { - let frame = frame::Settings::new(self.local.clone()).into(); + let frame = frame::Settings::new(self.local.clone()); try_ready!(self.try_send(frame)); self.is_dirty = false; @@ -56,8 +58,8 @@ impl Settings Ok(Async::Ready(())) } - fn try_send(&mut self, item: frame::Frame) -> Poll<(), ConnectionError> { - if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item)) { + fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> { + if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item.into())) { // Ensure that call to `poll_complete` guarantee is called to satisfied try!(self.inner.poll_complete()); @@ -110,3 +112,17 @@ impl Sink for Settings self.inner.close() } } + +impl ReadySink for Settings + where T: Stream, + T: Sink, + T: ReadySink, +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + if try!(self.try_send_pending()).is_ready() { + return self.inner.poll_ready(); + } + + Ok(Async::NotReady) + } +} diff --git a/src/proto/state.rs b/src/proto/state.rs new file mode 100644 index 0000000..4e5a1c9 --- /dev/null +++ b/src/proto/state.rs @@ -0,0 +1,72 @@ +use ConnectionError; + +/// Represents the state of an H2 stream +/// +/// ```not_rust +/// +--------+ +/// send PP | | recv PP +/// ,--------| idle |--------. +/// / | | \ +/// v +--------+ v +/// +----------+ | +----------+ +/// | | | send H / | | +/// ,------| reserved | | recv H | reserved |------. +/// | | (local) | | | (remote) | | +/// | +----------+ v +----------+ | +/// | | +--------+ | | +/// | | recv ES | | send ES | | +/// | send H | ,-------| open |-------. | recv H | +/// | | / | | \ | | +/// | v v +--------+ v v | +/// | +----------+ | +----------+ | +/// | | half | | | half | | +/// | | closed | | send R / | closed | | +/// | | (remote) | | recv R | (local) | | +/// | +----------+ | +----------+ | +/// | | | | | +/// | | send ES / | recv ES / | | +/// | | send R / v send R / | | +/// | | recv R +--------+ recv R | | +/// | send R / `----------->| |<-----------' send R / | +/// | recv R | closed | recv R | +/// `----------------------->| |<----------------------' +/// +--------+ +/// +/// send: endpoint sends this frame +/// recv: endpoint receives this frame +/// +/// H: HEADERS frame (with implied CONTINUATIONs) +/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +/// ES: END_STREAM flag +/// R: RST_STREAM frame +/// ``` +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum State { + Idle, + ReservedLocal, + ReservedRemote, + Open, + HalfClosedLocal, + HalfClosedRemote, + Closed, +} + +impl State { + /// Transition the state to represent headers being sent. + /// + /// Returns an error if this is an invalid state transition. + pub fn send_headers(&mut self) -> Result<(), ConnectionError> { + if *self != State::Idle { + unimplemented!(); + } + + *self = State::Open; + Ok(()) + } +} + +impl Default for State { + fn default() -> State { + State::Idle + } +}