diff --git a/src/client.rs b/src/client.rs index d0f8d85..c01e577 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,6 +13,7 @@ pub struct Handshake { } /// Marker type indicating a client peer +#[derive(Debug)] pub struct Client; pub type Connection = super::Connection; @@ -29,9 +30,12 @@ pub fn bind(io: T) -> Handshake debug!("binding client connection"); let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - .map(|(io, _)| { + .then(|res| { + let (io, _) = res.unwrap(); debug!("client connection bound"); - proto::new_connection(io) + + // Use default local settings for now + proto::Handshake::new(io, Default::default()) }) .map_err(ConnectionError::from); @@ -55,19 +59,23 @@ impl Peer for Client { fn convert_send_message( id: StreamId, - message: Self::Send, - body: bool) -> proto::SendMessage + headers: Self::Send, + end_of_stream: bool) -> frame::Headers { use http::request::Head; // Extract the components of the HTTP request - let Head { method, uri, headers, .. } = message; + let Head { method, uri, headers, .. } = headers; + + // TODO: Ensure that the version is set to H2 // Build the set pseudo header set. All requests will include `method` // and `path`. let mut pseudo = frame::Pseudo::request(method, uri.path().into()); // If the URI includes a scheme component, add it to the pseudo headers + // + // TODO: Scheme must be set... if let Some(scheme) = uri.scheme() { pseudo.set_scheme(scheme.into()); } @@ -81,18 +89,14 @@ impl Peer for Client { // Create the HEADERS frame let mut frame = frame::Headers::new(id, pseudo, headers); - // TODO: Factor in trailers - if !body { - // frame.set_end_stream(); - } else { - unimplemented!(); + if end_of_stream { + frame.set_end_stream() } - // Return the `SendMessage` - proto::SendMessage::new(frame) + frame } - fn convert_poll_message(message: proto::PollMessage) -> Frame { + fn convert_poll_message(headers: frame::Headers) -> Frame { unimplemented!(); } } diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs new file mode 100644 index 0000000..4d0145a --- /dev/null +++ b/src/frame/go_away.rs @@ -0,0 +1,26 @@ +use frame::Error; +use super::{head, StreamId}; + +#[derive(Debug)] +pub struct GoAway { + last_stream_id: StreamId, + error_code: u32, +} + +impl GoAway { + pub fn load(payload: &[u8]) -> Result { + if payload.len() < 8 { + // Invalid payload len + // TODO: Handle error + unimplemented!(); + } + + let last_stream_id = head::parse_stream_id(&payload[..4]); + let error_code = unpack_octets_4!(payload, 4, u32); + + Ok(GoAway { + last_stream_id: last_stream_id, + error_code: error_code, + }) + } +} diff --git a/src/frame/head.rs b/src/frame/head.rs index 0dd57e4..b7b0c6a 100644 --- a/src/frame/head.rs +++ b/src/frame/head.rs @@ -81,7 +81,8 @@ impl Head { /// octet is ignored and the rest interpreted as a network-endian 31-bit /// integer. #[inline] -fn parse_stream_id(buf: &[u8]) -> StreamId { +pub fn parse_stream_id(buf: &[u8]) -> StreamId { + /// TODO: Move this onto the StreamId type? let unpacked = unpack_octets_4!(buf, 0, u32); // Now clear the most significant bit, as that is reserved and MUST be ignored when received. unpacked & !STREAM_ID_MASK diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 64119ae..4998069 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -176,6 +176,10 @@ impl Headers { self.flags.is_end_headers() } + pub fn set_end_stream(&mut self) { + self.flags.set_end_stream() + } + pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut) -> Option { @@ -208,13 +212,13 @@ impl Headers { let len = (dst.len() - pos) - frame::HEADER_LEN; // Write the frame length - BigEndian::write_u32(&mut dst[pos..pos+3], len as u32); + BigEndian::write_uint(&mut dst[pos..pos+3], len as u64, 3); ret } fn head(&self) -> Head { - Head::new(Kind::Data, self.flags.into(), self.stream_id) + Head::new(Kind::Headers, self.flags.into(), self.stream_id) } } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 841af54..48cc089 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -18,6 +18,7 @@ use std::io; /// ``` #[macro_escape] macro_rules! unpack_octets_4 { + // TODO: Get rid of this macro ($buf:expr, $offset:expr, $tip:ty) => ( (($buf[$offset + 0] as $tip) << 24) | (($buf[$offset + 1] as $tip) << 16) | @@ -27,12 +28,14 @@ macro_rules! unpack_octets_4 { } mod data; +mod go_away; mod head; mod headers; mod settings; mod util; pub use self::data::Data; +pub use self::go_away::GoAway; pub use self::head::{Head, Kind, StreamId}; pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; pub use self::settings::{Settings, SettingSet}; diff --git a/src/frame/settings.rs b/src/frame/settings.rs index c6507f3..0d81d8f 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -22,6 +22,7 @@ pub struct SettingSet { /// frame. /// /// Each setting has a value that is a 32 bit unsigned integer (6.5.1.). +#[derive(Debug)] pub enum Setting { HeaderTableSize(u32), EnablePush(u32), @@ -134,10 +135,15 @@ impl Settings { let head = Head::new(Kind::Settings, self.flags.into(), 0); let payload_len = self.payload_len(); + trace!("encoding SETTINGS; len={}", payload_len); + head.encode(payload_len, dst); // Encode the settings - self.for_each(|setting| setting.encode(dst)); + self.for_each(|setting| { + trace!("encoding setting; val={:?}", setting); + setting.encode(dst) + }); } fn for_each(&self, mut f: F) { diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index 91941e9..4faf690 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -10,6 +10,7 @@ use std::io::Cursor; use std::collections::VecDeque; /// Decodes headers using HPACK +#[derive(Debug)] pub struct Decoder { // Protocol indicated that the max table size will update max_size_update: Option, @@ -127,6 +128,7 @@ enum Representation { SizeUpdate, } +#[derive(Debug)] struct Table { entries: VecDeque
, size: usize, diff --git a/src/lib.rs b/src/lib.rs index 5f29153..612fe58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![allow(warnings)] +#[macro_use] extern crate futures; #[macro_use] @@ -38,19 +39,28 @@ pub use proto::Connection; /// An H2 connection frame #[derive(Debug)] pub enum Frame { - Message { + Headers { id: StreamId, - message: T, - body: bool, + headers: T, + end_of_stream: bool, }, Body { id: StreamId, - chunk: Option<()>, + chunk: (), + end_of_stream: bool, + }, + Trailers { + id: StreamId, + headers: (), + }, + PushPromise { + id: StreamId, + promise: (), }, Error { id: StreamId, error: (), - } + }, } /// Either a Client or a Server @@ -66,9 +76,9 @@ pub trait Peer { #[doc(hidden)] fn convert_send_message( id: StreamId, - message: Self::Send, - body: bool) -> proto::SendMessage; + headers: Self::Send, + end_of_stream: bool) -> frame::Headers; #[doc(hidden)] - fn convert_poll_message(message: proto::PollMessage) -> Frame; + fn convert_poll_message(headers: frame::Headers) -> Frame; } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index d9cd7ec..cdf6451 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,10 +1,10 @@ use {frame, Frame, ConnectionError, Peer, StreamId}; +use client::Client; use proto::{self, ReadySink, State}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; -use http; +use http::{self, request, response}; use futures::*; @@ -15,74 +15,28 @@ use std::marker::PhantomData; use std::hash::BuildHasherDefault; /// An H2 connection +#[derive(Debug)] pub struct Connection { - inner: Inner, + inner: proto::Inner, streams: StreamMap, peer: PhantomData

, } -type Inner = - proto::Settings< - proto::PingPong< - proto::FramedWrite< - proto::FramedRead< - length_delimited::FramedRead>>>>; - -type StreamMap = OrderMap>; - -/// Returns a new `Connection` backed by the given `io`. -pub fn new(io: T) -> Connection +impl From> for Connection where T: AsyncRead + AsyncWrite, P: Peer, { - - // Delimit the frames - let framed_read = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(9) - .num_skip(0) // Don't skip the header - .new_read(io); - - // Map to `Frame` types - let framed_read = proto::FramedRead::new(framed_read); - - // Frame encoder - let mut framed = proto::FramedWrite::new(framed_read); - - // Ok, so this is a **little** hacky, but it works for now. - // - // The ping/pong behavior SHOULD be given highest priority (6.7). - // However, the connection handshake requires the settings frame to be - // sent as the very first one. This needs special handling because - // otherwise there is a race condition where the peer could send its - // settings frame followed immediately by a Ping, in which case, we - // don't want to accidentally send the pong before finishing the - // connection hand shake. - // - // So, to ensure correct ordering, we write the settings frame here - // before fully constructing the connection struct. Technically, `Async` - // operations should not be performed in `new` because this might not - // happen on a task, however we have full control of the I/O and we know - // that the settings frame will get buffered and not actually perform an - // I/O op. - let initial_settings = frame::SettingSet::default(); - let frame = frame::Settings::new(initial_settings.clone()); - assert!(framed.start_send(frame.into()).unwrap().is_ready()); - - // Add ping/pong handler - let ping_pong = proto::PingPong::new(framed); - - // Add settings handler - let connection = proto::Settings::new(ping_pong, initial_settings); - - Connection { - inner: connection, - streams: StreamMap::default(), - peer: PhantomData, + fn from(src: proto::Inner) -> Self { + Connection { + inner: src, + streams: StreamMap::default(), + peer: PhantomData, + } } } +type StreamMap = OrderMap>; + impl Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -94,6 +48,23 @@ impl Connection } } +impl Connection + where T: AsyncRead + AsyncWrite, +{ + pub fn send_request(self, + id: StreamId, // TODO: Generate one internally? + request: request::Head, + end_of_stream: bool) + -> sink::Send + { + self.send(Frame::Headers { + id: id, + headers: request, + end_of_stream: end_of_stream, + }) + } +} + impl Stream for Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -104,8 +75,15 @@ impl Stream for Connection fn poll(&mut self) -> Poll, ConnectionError> { use frame::Frame::*; + // Because receiving new frames may depend on ensuring that the write + // buffer is clear, `poll_complete` is called here. + let _ = try!(self.poll_complete()); + match try_ready!(self.inner.poll()) { - Some(Headers(v)) => unimplemented!(), + Some(Headers(v)) => { + debug!("poll; frame={:?}", v); + unimplemented!(); + } Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)), _ => unimplemented!(), @@ -129,7 +107,7 @@ impl Sink for Connection } match item { - Frame::Message { id, message, body } => { + Frame::Headers { id, headers, end_of_stream } => { // Ensure ID is valid try!(P::check_initiating_id(id)); @@ -138,17 +116,18 @@ impl Sink for Connection // connections should not be factored. // Transition the stream state, creating a new entry if needed + // + // TODO: Response can send multiple headers frames before body + // (1xx responses). try!(self.streams.entry(id) .or_insert(State::default()) .send_headers()); - let message = P::convert_send_message(id, message, body); - - // TODO: Handle trailers and all that jazz + let frame = P::convert_send_message(id, headers, end_of_stream); // We already ensured that the upstream can handle the frame, so // panic if it gets rejected. - let res = try!(self.inner.start_send(frame::Frame::Headers(message.frame))); + let res = try!(self.inner.start_send(frame::Frame::Headers(frame))); // This is a one-way conversion. By checking `poll_ready` first, // it's already been determined that the inner `Sink` can accept @@ -157,7 +136,13 @@ impl Sink for Connection Ok(AsyncSink::Ready) } - Frame::Body { id, chunk } => { + Frame::Trailers { id, headers } => { + unimplemented!(); + } + Frame::Body { id, chunk, end_of_stream } => { + unimplemented!(); + } + Frame::PushPromise { id, promise } => { unimplemented!(); } Frame::Error { id, error } => { diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 384f83d..7ff30b2 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -9,6 +9,7 @@ use bytes::{Bytes, BytesMut, Buf}; use std::io::{self, Write, Cursor}; +#[derive(Debug)] pub struct FramedRead { inner: T, @@ -19,6 +20,7 @@ pub struct FramedRead { } /// Partially loaded headers frame +#[derive(Debug)] enum Partial { Headers(frame::Headers), PushPromise(frame::PushPromise), @@ -71,9 +73,15 @@ impl FramedRead { } Kind::PushPromise => unimplemented!(), Kind::Ping => unimplemented!(), - Kind::GoAway => unimplemented!(), + Kind::GoAway => { + let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); + debug!("decoded; frame={:?}", frame); + unimplemented!(); + } Kind::WindowUpdate => unimplemented!(), - Kind::Continuation => unimplemented!(), + Kind::Continuation => { + unimplemented!(); + } Kind::Unknown => return Ok(None), }; diff --git a/src/proto/handshake.rs b/src/proto/handshake.rs new file mode 100644 index 0000000..9797189 --- /dev/null +++ b/src/proto/handshake.rs @@ -0,0 +1,131 @@ +use {ConnectionError, Peer}; +use frame::{self, Frame}; +use proto::{self, Connection}; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; + +use futures::{Future, Sink, Stream, Poll, Async, AsyncSink}; + +use std::marker::PhantomData; + +/// Implements the settings component of the initial H2 handshake +pub struct Handshake { + // Upstream transport + inner: Option>, + + // True when the local settings have been sent + settings_sent: bool, + + // Peer + peer: PhantomData

, +} + +struct Inner { + // Upstream transport + framed: proto::Framed, + + // Our settings + local: frame::SettingSet, +} + +impl Handshake + where T: AsyncRead + AsyncWrite, +{ + /// Initiate an HTTP/2.0 handshake. + pub fn new(io: T, local: frame::SettingSet) -> Self { + // Delimit the frames + let framed_read = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + .new_read(io); + + // Map to `Frame` types + let framed_read = proto::FramedRead::new(framed_read); + + // Frame encoder + let mut framed = proto::FramedWrite::new(framed_read); + + Handshake { + inner: Some(Inner { + framed: framed, + local: local, + }), + settings_sent: false, + peer: PhantomData, + } + } + + /// Returns a reference to the local settings. + /// + /// # Panics + /// + /// Panics if `HandshakeInner` has already been consumed. + fn local(&self) -> &frame::SettingSet { + &self.inner.as_ref().unwrap().local + } + + /// Returns a mutable reference to `HandshakeInner`. + /// + /// # Panics + /// + /// Panics if `HandshakeInner` has already been consumed. + fn inner_mut(&mut self) -> &mut proto::Framed { + &mut self.inner.as_mut().unwrap().framed + } +} + +// Either a client or server. satisfied when we have sent a SETTINGS frame and +// have sent an ACK for the remote's settings. +impl Future for Handshake + where T: AsyncRead + AsyncWrite, + P: Peer, +{ + type Item = Connection; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + if !self.settings_sent { + let frame = frame::Settings::new(self.local().clone()).into(); + + if let AsyncSink::NotReady(_) = try!(self.inner_mut().start_send(frame)) { + // This shouldn't really happen, but if it does, try again + // later. + return Ok(Async::NotReady); + } + + // Try flushing... + try!(self.inner_mut().poll_complete()); + + self.settings_sent = true; + } + + match try_ready!(self.inner_mut().poll()) { + Some(Frame::Settings(v)) => { + if v.is_ack() { + // TODO: unexpected ACK, protocol error + unimplemented!(); + } else { + let remote = v.into_set(); + let inner = self.inner.take().unwrap(); + + // Add ping/pong handler + let ping_pong = proto::PingPong::new(inner.framed); + + // Add settings handler + let settings = proto::Settings::new( + ping_pong, inner.local, remote); + + // Finally, convert to the `Connection` + let connection = settings.into(); + + return Ok(Async::Ready(connection)); + } + } + // TODO: handle handshake failure + _ => unimplemented!(), + } + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 8a17233..91f51c9 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,35 +1,30 @@ mod connection; mod framed_read; mod framed_write; +mod handshake; mod ping_pong; mod ready; mod settings; mod state; -pub use self::connection::{Connection, new as new_connection}; +pub use self::connection::{Connection}; pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; +pub use self::handshake::Handshake; pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; pub use self::state::State; -use frame; +use tokio_io::codec::length_delimited; -/// A request or response issued by the current process. -pub struct SendMessage { - frame: frame::Headers, -} +/// Base HTTP/2.0 transport. Only handles framing. +type Framed = + FramedWrite< + FramedRead< + length_delimited::FramedRead>>; -/// A request or response received by the current process. -pub struct PollMessage { - frame: frame::Headers, -} - -impl SendMessage { - pub fn new(frame: frame::Headers) -> Self { - SendMessage { - frame: frame, - } - } -} +type Inner = + Settings< + PingPong< + Framed>>; diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 2fba80d..ea4b4fe 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -4,6 +4,7 @@ use proto::ReadySink; use futures::*; +#[derive(Debug)] pub struct PingPong { inner: T, } diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 4ef687c..17de84f 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -4,6 +4,7 @@ use proto::ReadySink; use futures::*; +#[derive(Debug)] pub struct Settings { // Upstream transport inner: T, @@ -21,22 +22,17 @@ pub struct Settings { is_dirty: bool, } -/* - * TODO: - * - Settings ack timeout for connection error - */ - impl Settings where T: Stream, T: Sink, { - pub fn new(inner: T, local: frame::SettingSet) -> Settings { + pub fn new(inner: T, local: frame::SettingSet, remote: frame::SettingSet) -> Settings { Settings { inner: inner, local: local, - remote: frame::SettingSet::default(), - remaining_acks: 0, - is_dirty: true, + remote: remote, + remaining_acks: 1, + is_dirty: false, } } @@ -60,7 +56,8 @@ impl Settings fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> { if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item.into())) { - // Ensure that call to `poll_complete` guarantee is called to satisfied + // TODO: I don't think this is needed actually... It was originally + // done to "satisfy the start_send" contract... try!(self.inner.poll_complete()); return Ok(Async::NotReady); @@ -117,14 +114,12 @@ impl Sink for Settings } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.try_send_pending()); self.inner.poll_complete() } fn close(&mut self) -> Poll<(), ConnectionError> { - if !try!(self.try_send_pending()).is_ready() { - return Ok(Async::NotReady); - } - + try_ready!(self.try_send_pending()); self.inner.close() } } diff --git a/src/settings.rs b/src/settings.rs deleted file mode 100644 index e69de29..0000000