From 981af88838b7e080a399cb2c0ac48506192f121e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 8 Jul 2017 12:34:29 -0700 Subject: [PATCH] Get data frames working --- examples/server.rs | 16 +++++- src/client.rs | 24 ++++++--- src/error.rs | 99 ++++++++++++++++++++++++++++------ src/frame/data.rs | 62 ++++++++++++++++------ src/frame/mod.rs | 6 ++- src/frame/ping.rs | 4 +- src/frame/settings.rs | 4 +- src/lib.rs | 8 +-- src/proto/connection.rs | 84 +++++++++++++++++++++++------ src/proto/framed_read.rs | 7 +-- src/proto/framed_write.rs | 53 +++++++++++------- src/proto/mod.rs | 28 ++++++---- src/proto/ping_pong.rs | 28 +++++----- src/proto/settings.rs | 24 +++++---- src/proto/state.rs | 88 ++++++++++++++++++++++++++---- src/server.rs | 25 ++++++--- tests/client_request.rs | 109 +++++++++++++++++++++++++++++++++++++- 17 files changed, 523 insertions(+), 146 deletions(-) diff --git a/examples/server.rs b/examples/server.rs index 373e065..bcd1f54 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -43,9 +43,21 @@ pub fn main() { println!("Zomg frame; {:?}", frame); let mut response = response::Head::default(); - response.status = status::NO_CONTENT; + response.status = status::OK; - conn.send_response(1.into(), response, true) + conn.send_response(1.into(), response, false) + }) + .then(|res| { + let conn = res.unwrap(); + println!("... sending data frame"); + + conn.send_data(1.into(), "hello".into(), false) + }) + .then(|res| { + let conn = res.unwrap(); + println!("... sending next frame"); + + conn.send_data(1.into(), "world".into(), true) }) }) .then(|res| { diff --git a/src/client.rs b/src/client.rs index 4568885..bea78b3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,26 +3,33 @@ use {frame, proto, Peer, ConnectionError, StreamId}; use http; use futures::{Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; +use bytes::{Bytes, IntoBuf}; use std::fmt; /// In progress H2 connection binding -pub struct Handshake { +pub struct Handshake { // TODO: unbox - inner: Box, Error = ConnectionError>>, + inner: Box, Error = ConnectionError>>, } /// Marker type indicating a client peer #[derive(Debug)] pub struct Client; -pub type Connection = super::Connection; +pub type Connection = super::Connection; + +pub fn handshake(io: T) -> Handshake + where T: AsyncRead + AsyncWrite + 'static, +{ + handshake2(io) +} /// Bind an H2 client connection. /// /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. -pub fn handshake(io: T) -> Handshake +pub fn handshake2(io: T) -> Handshake where T: AsyncRead + AsyncWrite + 'static, { use tokio_io::io; @@ -97,8 +104,8 @@ impl Peer for Client { } } -impl Future for Handshake { - type Item = Connection; +impl Future for Handshake { + type Item = Connection; type Error = ConnectionError; fn poll(&mut self) -> Poll { @@ -106,7 +113,10 @@ impl Future for Handshake { } } -impl fmt::Debug for Handshake { +impl fmt::Debug for Handshake + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, +{ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "client::Handshake") } diff --git a/src/error.rs b/src/error.rs index 8028054..606c3d9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,14 +3,24 @@ use std::{error, fmt, io}; /// The error type for HTTP/2 operations #[derive(Debug)] pub enum ConnectionError { - /// The HTTP/2 stream was reset + /// An error caused by an action taken by the remote peer. + /// + /// This is either an error received by the peer or caused by an invalid + /// action taken by the peer (i.e. a protocol error). Proto(Reason), + /// An `io::Error` occurred while trying to read or write. Io(io::Error), + + /// An error resulting from an invalid action taken by the user of this + /// library. + User(User), + + // TODO: reserve additional variants } #[derive(Debug)] -pub struct StreamError(Reason); +pub struct Stream(Reason); #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Reason { @@ -29,27 +39,60 @@ pub enum Reason { InadequateSecurity, Http11Required, Other(u32), + // TODO: reserve additional variants +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum User { + /// The specified stream ID is invalid. + /// + /// For example, using a stream ID reserved for a push promise from the + /// client or using a non-zero stream ID for settings. + InvalidStreamId, + + /// The stream ID is no longer accepting frames. + InactiveStreamId, + + /// The stream is not currently expecting a frame of this type. + UnexpectedFrameType, + + // TODO: reserve additional variants } macro_rules! reason_desc { ($reason:expr) => (reason_desc!($reason, "")); ($reason:expr, $prefix:expr) => ({ + use self::Reason::*; + match $reason { - Reason::NoError => concat!($prefix, "not a result of an error"), - Reason::ProtocolError => concat!($prefix, "unspecific protocol error detected"), - Reason::InternalError => concat!($prefix, "unexpected internal error encountered"), - Reason::FlowControlError => concat!($prefix, "flow-control protocol violated"), - Reason::SettingsTimeout => concat!($prefix, "settings ACK not received in timely manner"), - Reason::StreamClosed => concat!($prefix, "received frame when stream half-closed"), - Reason::FrameSizeError => concat!($prefix, "frame sent with invalid size"), - Reason::RefusedStream => concat!($prefix, "refused stream before processing any application logic"), - Reason::Cancel => concat!($prefix, "stream no longer needed"), - Reason::CompressionError => concat!($prefix, "unable to maintain the header compression context"), - Reason::ConnectError => concat!($prefix, "connection established in response to a CONNECT request was reset or abnormally closed"), - Reason::EnhanceYourCalm => concat!($prefix, "detected excessive load generating behavior"), - Reason::InadequateSecurity => concat!($prefix, "security properties do not meet minimum requirements"), - Reason::Http11Required => concat!($prefix, "endpoint requires HTTP/1.1"), - Reason::Other(_) => concat!($prefix, "other reason"), + NoError => concat!($prefix, "not a result of an error"), + ProtocolError => concat!($prefix, "unspecific protocol error detected"), + InternalError => concat!($prefix, "unexpected internal error encountered"), + FlowControlError => concat!($prefix, "flow-control protocol violated"), + SettingsTimeout => concat!($prefix, "settings ACK not received in timely manner"), + StreamClosed => concat!($prefix, "received frame when stream half-closed"), + FrameSizeError => concat!($prefix, "frame sent with invalid size"), + RefusedStream => concat!($prefix, "refused stream before processing any application logic"), + Cancel => concat!($prefix, "stream no longer needed"), + CompressionError => concat!($prefix, "unable to maintain the header compression context"), + ConnectError => concat!($prefix, "connection established in response to a CONNECT request was reset or abnormally closed"), + EnhanceYourCalm => concat!($prefix, "detected excessive load generating behavior"), + InadequateSecurity => concat!($prefix, "security properties do not meet minimum requirements"), + Http11Required => concat!($prefix, "endpoint requires HTTP/1.1"), + Other(_) => concat!($prefix, "other reason (ain't no tellin')"), + } + }); +} + +macro_rules! user_desc { + ($reason:expr) => (user_desc!($reason, "")); + ($reason:expr, $prefix:expr) => ({ + use self::User::*; + + match $reason { + InvalidStreamId => concat!($prefix, "invalid stream ID"), + InactiveStreamId => concat!($prefix, "inactive stream ID"), + UnexpectedFrameType => concat!($prefix, "unexpected frame type"), } }); } @@ -68,6 +111,12 @@ impl From for ConnectionError { } } +impl From for ConnectionError { + fn from(src: User) -> ConnectionError { + ConnectionError::User(src) + } +} + impl From for io::Error { fn from(src: ConnectionError) -> io::Error { io::Error::new(io::ErrorKind::Other, src) @@ -81,6 +130,7 @@ impl fmt::Display for ConnectionError { match *self { Proto(reason) => write!(fmt, "protocol error: {}", reason), Io(ref e) => fmt::Display::fmt(e, fmt), + User(e) => write!(fmt, "user error: {}", e), } } } @@ -92,6 +142,7 @@ impl error::Error for ConnectionError { match *self { Io(ref e) => error::Error::description(e), Proto(reason) => reason_desc!(reason, "protocol error: "), + User(user) => user_desc!(user, "user error: "), } } } @@ -157,3 +208,17 @@ impl fmt::Display for Reason { write!(fmt, "{}", self.description()) } } + +// ===== impl User ===== + +impl User { + pub fn description(&self) -> &str { + user_desc!(*self) + } +} + +impl fmt::Display for User { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} diff --git a/src/frame/data.rs b/src/frame/data.rs index 3aef88f..6e3eea3 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,10 +1,10 @@ use frame::{util, Frame, Head, Error, StreamId, Kind}; -use bytes::{BufMut, Bytes}; +use bytes::{BufMut, Bytes, Buf}; #[derive(Debug)] -pub struct Data { +pub struct Data { stream_id: StreamId, - data: Bytes, + data: T, flags: DataFlag, pad_len: Option, } @@ -16,8 +16,8 @@ const END_STREAM: u8 = 0x1; const PADDED: u8 = 0x8; const ALL: u8 = END_STREAM | PADDED; -impl Data { - pub fn load(head: Head, mut payload: Bytes) -> Result { +impl Data { + pub fn load(head: Head, mut payload: Bytes) -> Result { let flags = DataFlag::load(head.flag()); let pad_len = if flags.is_padded() { @@ -34,35 +34,56 @@ impl Data { pad_len: pad_len, }) } +} + +impl Data { + pub fn new(stream_id: StreamId, data: T) -> Self { + Data { + stream_id: stream_id, + data: data, + flags: DataFlag::default(), + pad_len: None, + } + } pub fn stream_id(&self) -> StreamId { self.stream_id } - pub fn len(&self) -> usize { - self.data.len() - } - pub fn is_end_stream(&self) -> bool { self.flags.is_end_stream() } - pub fn encode(&self, dst: &mut T) { - self.head().encode(self.len(), dst); - dst.put(&self.data); + pub fn set_end_stream(&mut self) { + self.flags.set_end_stream() } pub fn head(&self) -> Head { Head::new(Kind::Data, self.flags.into(), self.stream_id) } - pub fn into_payload(self) -> Bytes { + pub fn into_payload(self) -> T { self.data } } -impl From for Frame { - fn from(src: Data) -> Frame { +impl Data { + pub fn len(&self) -> usize { + self.data.remaining() + } + + pub fn encode_chunk(&mut self, dst: &mut U) { + if self.len() > dst.remaining_mut() { + unimplemented!(); + } + + self.head().encode(self.len(), dst); + dst.put(&mut self.data); + } +} + +impl From> for Frame { + fn from(src: Data) -> Self { Frame::Data(src) } } @@ -86,11 +107,22 @@ impl DataFlag { self.0 & END_STREAM == END_STREAM } + pub fn set_end_stream(&mut self) { + self.0 |= END_STREAM + } + pub fn is_padded(&self) -> bool { self.0 & PADDED == PADDED } } +impl Default for DataFlag { + /// Returns a `HeadersFlag` value with `END_HEADERS` set. + fn default() -> Self { + DataFlag(0) + } +} + impl From for u8 { fn from(src: DataFlag) -> u8 { src.0 diff --git a/src/frame/mod.rs b/src/frame/mod.rs index c31866c..ba95bdd 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,6 +1,8 @@ use hpack; use error::{ConnectionError, Reason}; +use bytes::Bytes; + /// A helper macro that unpacks a sequence of 4 bytes found in the buffer with /// the given identifier, starting at the given offset, into the given integer /// type. Obviously, the integer type should be able to support at least 4 @@ -51,8 +53,8 @@ pub use self::settings::{ pub const HEADER_LEN: usize = 9; #[derive(Debug /*, Clone, PartialEq */)] -pub enum Frame { - Data(Data), +pub enum Frame { + Data(Data), Headers(Headers), PushPromise(PushPromise), Settings(Settings), diff --git a/src/frame/ping.rs b/src/frame/ping.rs index 46624ac..b30a1c9 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -71,8 +71,8 @@ impl Ping { } } -impl From for Frame { - fn from(src: Ping) -> Frame { +impl From for Frame { + fn from(src: Ping) -> Frame { Frame::Ping(src) } } diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 8edb0f3..6a43013 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -175,8 +175,8 @@ impl Settings { } } -impl From for Frame { - fn from(src: Settings) -> Frame { +impl From for Frame { + fn from(src: Settings) -> Frame { Frame::Settings(src) } } diff --git a/src/lib.rs b/src/lib.rs index 18b2666..bf05683 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ pub mod server; mod util; -pub use error::{ConnectionError, StreamError, Reason}; +pub use error::ConnectionError; pub use frame::{StreamId}; pub use proto::Connection; @@ -42,15 +42,15 @@ use bytes::Bytes; /// An H2 connection frame #[derive(Debug)] -pub enum Frame { +pub enum Frame { Headers { id: StreamId, headers: T, end_of_stream: bool, }, - Body { + Data { id: StreamId, - chunk: Bytes, + data: B, end_of_stream: bool, }, Trailers { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index ec518f3..f480ebf 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,11 +1,14 @@ -use {frame, Frame, ConnectionError, Peer, StreamId}; +use {Frame, Peer}; use client::Client; use server::Server; +use frame::{self, StreamId}; use proto::{self, ReadySink, State}; +use error::{self, ConnectionError}; use tokio_io::{AsyncRead, AsyncWrite}; use http::{request, response}; +use bytes::{Bytes, IntoBuf}; use futures::*; @@ -17,17 +20,18 @@ use std::hash::BuildHasherDefault; /// An H2 connection #[derive(Debug)] -pub struct Connection { - inner: proto::Inner, +pub struct Connection { + inner: proto::Inner, streams: StreamMap, - peer: PhantomData

, + peer: PhantomData<(P, B)>, } type StreamMap = OrderMap>; -pub fn new(transport: proto::Inner) -> Connection +pub fn new(transport: proto::Inner) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, + B: IntoBuf, { Connection { inner: transport, @@ -36,8 +40,28 @@ pub fn new(transport: proto::Inner) -> Connection } } -impl Connection +impl Connection where T: AsyncRead + AsyncWrite, + P: Peer, + B: IntoBuf, +{ + pub fn send_data(self, + id: StreamId, + data: B, + end_of_stream: bool) + -> sink::Send + { + self.send(Frame::Data { + id: id, + data: data, + end_of_stream: end_of_stream, + }) + } +} + +impl Connection + where T: AsyncRead + AsyncWrite, + B: IntoBuf, { pub fn send_request(self, id: StreamId, // TODO: Generate one internally? @@ -53,8 +77,9 @@ impl Connection } } -impl Connection +impl Connection where T: AsyncRead + AsyncWrite, + B: IntoBuf, { pub fn send_response(self, id: StreamId, // TODO: Generate one internally? @@ -70,9 +95,10 @@ impl Connection } } -impl Stream for Connection +impl Stream for Connection where T: AsyncRead + AsyncWrite, P: Peer, + B: IntoBuf, { type Item = Frame; type Error = ConnectionError; @@ -96,7 +122,6 @@ impl Stream for Connection let frame = match frame { Some(Headers(v)) => { - // TODO: Update stream state let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); @@ -121,14 +146,17 @@ impl Stream for Connection } } Some(Data(v)) => { - // TODO: Validate frame - let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); - Frame::Body { + match self.streams.get_mut(&stream_id) { + None => return Err(error::Reason::ProtocolError.into()), + Some(state) => try!(state.recv_data(end_of_stream)), + } + + Frame::Data { id: stream_id, - chunk: v.into_payload(), + data: v.into_payload(), end_of_stream: end_of_stream, } } @@ -140,11 +168,12 @@ impl Stream for Connection } } -impl Sink for Connection +impl Sink for Connection where T: AsyncRead + AsyncWrite, P: Peer, + B: IntoBuf, { - type SinkItem = Frame; + type SinkItem = Frame; type SinkError = ConnectionError; fn start_send(&mut self, item: Self::SinkItem) @@ -171,7 +200,8 @@ impl Sink for Connection // connections should not be factored. // if !P::is_valid_local_stream_id(id) { - unimplemented!(); + // TODO: clear state + return Err(error::User::InvalidStreamId.into()); } } @@ -188,6 +218,25 @@ impl Sink for Connection Ok(AsyncSink::Ready) } + Frame::Data { id, data, end_of_stream } => { + // The stream must be initialized at this point + match self.streams.get_mut(&id) { + None => return Err(error::User::InactiveStreamId.into()), + Some(state) => try!(state.send_data(end_of_stream)), + } + + let mut frame = frame::Data::new(id, data.into_buf()); + + if end_of_stream { + frame.set_end_stream(); + } + + let res = try!(self.inner.start_send(frame.into())); + + assert!(res.is_ready()); + + Ok(AsyncSink::Ready) + } /* Frame::Trailers { id, headers } => { unimplemented!(); @@ -211,9 +260,10 @@ impl Sink for Connection } } -impl ReadySink for Connection +impl ReadySink for Connection where T: AsyncRead + AsyncWrite, P: Peer, + B: IntoBuf, { fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { self.inner.poll_ready() diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index ee476d4..469e4a1 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -29,10 +29,7 @@ enum Partial { // PushPromise(frame::PushPromise), } -impl FramedRead - where T: AsyncRead, - T: Sink, -{ +impl FramedRead { pub fn new(inner: length_delimited::FramedRead) -> FramedRead { FramedRead { inner: inner, @@ -40,9 +37,7 @@ impl FramedRead partial: None, } } -} -impl FramedRead { fn decode_frame(&mut self, mut bytes: Bytes) -> Result, ConnectionError> { // Parse the head let head = frame::Head::parse(&bytes); diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 07bd934..33209f8 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -10,7 +10,7 @@ use std::cmp; use std::io::{self, Cursor}; #[derive(Debug)] -pub struct FramedWrite { +pub struct FramedWrite { /// Upstream `AsyncWrite` inner: T, @@ -21,20 +21,20 @@ pub struct FramedWrite { buf: Cursor, /// Next frame to encode - next: Option, + next: Option>, /// Max frame size, this is specified by the peer max_frame_size: usize, } #[derive(Debug)] -enum Next { +enum Next { Data { /// Length of the current frame being written frame_len: usize, /// Data frame to encode - data: frame::Data + data: frame::Data, }, Continuation(frame::Continuation), } @@ -50,8 +50,12 @@ const MIN_BUFFER_CAPACITY: usize = frame::HEADER_LEN + CHAIN_THRESHOLD; /// than 16kb, so not even close). const CHAIN_THRESHOLD: usize = 256; -impl FramedWrite { - pub fn new(inner: T) -> FramedWrite { +// TODO: Make generic +impl FramedWrite + where T: AsyncWrite, + B: Buf, +{ + pub fn new(inner: T) -> FramedWrite { FramedWrite { inner: inner, hpack: hpack::Encoder::default(), @@ -69,24 +73,27 @@ impl FramedWrite { self.next.is_none() && !self.buf.has_remaining() } - fn frame_len(&self, data: &frame::Data) -> usize { + fn frame_len(&self, data: &frame::Data) -> usize { cmp::min(self.max_frame_size, data.len()) } } -impl Sink for FramedWrite { - type SinkItem = Frame; +impl Sink for FramedWrite + where T: AsyncWrite, + B: Buf, +{ + type SinkItem = Frame; type SinkError = ConnectionError; - fn start_send(&mut self, item: Frame) -> StartSend { - debug!("start_send; frame={:?}", item); - + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend + { if !try!(self.poll_ready()).is_ready() { return Ok(AsyncSink::NotReady(item)); } match item { - Frame::Data(v) => { + Frame::Data(mut v) => { if v.len() >= CHAIN_THRESHOLD { let head = v.head(); let len = self.frame_len(&v); @@ -100,7 +107,11 @@ impl Sink for FramedWrite { data: v, }); } else { - v.encode(self.buf.get_mut()); + v.encode_chunk(self.buf.get_mut()); + + // The chunk has been fully encoded, so there is no need to + // keep it around + assert_eq!(v.len(), 0, "chunk not fully encoded"); } } Frame::Headers(v) => { @@ -136,7 +147,6 @@ impl Sink for FramedWrite { // As long as there is data to write, try to write it! while !self.is_empty() { - trace!("writing buffer; next={:?}; rem={:?}", self.next, self.buf.remaining()); try_ready!(self.inner.write_buf(&mut self.buf)); } @@ -157,7 +167,10 @@ impl Sink for FramedWrite { } } -impl ReadySink for FramedWrite { +impl ReadySink for FramedWrite + where T: AsyncWrite, + B: Buf, +{ fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { if !self.has_capacity() { // Try flushing @@ -172,7 +185,7 @@ impl ReadySink for FramedWrite { } } -impl Stream for FramedWrite { +impl Stream for FramedWrite { type Item = T::Item; type Error = T::Error; @@ -181,14 +194,14 @@ impl Stream for FramedWrite { } } -impl io::Read for FramedWrite { +impl io::Read for FramedWrite { fn read(&mut self, dst: &mut [u8]) -> io::Result { self.inner.read(dst) } } -impl AsyncRead for FramedWrite { - fn read_buf(&mut self, buf: &mut B) -> Poll +impl AsyncRead for FramedWrite { + fn read_buf(&mut self, buf: &mut B2) -> Poll where Self: Sized, { self.inner.read_buf(buf) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 923a284..bffa97f 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -19,24 +19,28 @@ use {frame, Peer}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; -type Inner = +use bytes::{Buf, IntoBuf}; + +type Inner = Settings< PingPong< - Framed>>; + Framed, + B>>; -type Framed = +type Framed = FramedRead< - FramedWrite>; + FramedWrite>; /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. -pub fn from_io(io: T, settings: frame::SettingSet) - -> Connection +pub fn from_io(io: T, settings: frame::SettingSet) + -> Connection where T: AsyncRead + AsyncWrite, P: Peer, + B: IntoBuf, { - let framed_write = FramedWrite::new(io); + let framed_write: FramedWrite<_, B::Buf> = FramedWrite::new(io); // To avoid code duplication, we're going to go this route. It is a bit // weird, but oh well... @@ -51,9 +55,10 @@ pub fn from_io(io: T, settings: frame::SettingSet) /// When the server is performing the handshake, it is able to only send /// `Settings` frames and is expected to receive the client preface as a byte /// stream. To represent this, `Settings>` is returned. -pub fn server_handshaker(io: T, settings: frame::SettingSet) - -> Settings> +pub fn server_handshaker(io: T, settings: frame::SettingSet) + -> Settings> where T: AsyncRead + AsyncWrite, + B: Buf, { let framed_write = FramedWrite::new(io); @@ -61,10 +66,11 @@ pub fn server_handshaker(io: T, settings: frame::SettingSet) } /// Create a full H2 transport from the server handshaker -pub fn from_server_handshaker(transport: Settings>) - -> Connection +pub fn from_server_handshaker(transport: Settings>) + -> Connection where T: AsyncRead + AsyncWrite, P: Peer, + B: IntoBuf, { let settings = transport.swap_inner(|io| { // Delimit the frames diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index d23f9ca..8739e81 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -5,16 +5,16 @@ use proto::ReadySink; /// Acknowledges ping requests from the remote. #[derive(Debug)] -pub struct PingPong { +pub struct PingPong { inner: T, - pong: Option, + pong: Option>, } -impl PingPong +impl PingPong where T: Stream, - T: Sink, + T: Sink, SinkError = ConnectionError>, { - pub fn new(inner: T) -> PingPong { + pub fn new(inner: T) -> Self { PingPong { inner, pong: None, @@ -38,9 +38,9 @@ impl PingPong /// > a PING frame with the ACK flag set in response, with an identical /// > payload. PING responses SHOULD be given higher priority than any /// > other frame. -impl Stream for PingPong +impl Stream for PingPong where T: Stream, - T: Sink, + T: Sink, SinkError = ConnectionError>, { type Item = Frame; type Error = ConnectionError; @@ -76,14 +76,16 @@ impl Stream for PingPong } } -impl Sink for PingPong +impl Sink for PingPong where T: Stream, - T: Sink, + T: Sink, SinkError = ConnectionError>, { - type SinkItem = Frame; + type SinkItem = Frame; type SinkError = ConnectionError; - fn start_send(&mut self, item: Frame) -> StartSend { + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend + { // Pings _SHOULD_ have priority over other messages, so attempt to send pending // ping frames before attempting to send `item`. if self.try_send_pong()?.is_not_ready() { @@ -100,9 +102,9 @@ impl Sink for PingPong } } -impl ReadySink for PingPong +impl ReadySink for PingPong where T: Stream, - T: Sink, + T: Sink, SinkError = ConnectionError>, T: ReadySink, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 959bec9..5a7d485 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -29,8 +29,8 @@ pub struct Settings { received_remote: bool, } -impl Settings - where T: Sink, +impl Settings + where T: Sink, SinkError = ConnectionError>, { pub fn new(inner: T, local: frame::SettingSet) -> Settings { Settings { @@ -44,7 +44,7 @@ impl Settings } /// Swap the inner transport while maintaining the current state. - pub fn swap_inner U>(self, f: F) -> Settings { + pub fn swap_inner T2>(self, f: F) -> Settings { let inner = f(self.inner); Settings { @@ -88,9 +88,9 @@ impl Settings } } -impl Stream for Settings +impl Stream for Settings where T: Stream, - T: Sink, + T: Sink, SinkError = ConnectionError>, { type Item = Frame; type Error = ConnectionError; @@ -118,13 +118,15 @@ impl Stream for Settings } } -impl Sink for Settings - where T: Sink, +impl Sink for Settings + where T: Sink, SinkError = ConnectionError>, { - type SinkItem = Frame; + type SinkItem = Frame; type SinkError = ConnectionError; - fn start_send(&mut self, item: Frame) -> StartSend { + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend + { // Settings frames take priority, so `item` cannot be sent if there are // any pending acks OR the local settings have been changed w/o sending // an associated frame. @@ -147,8 +149,8 @@ impl Sink for Settings } } -impl ReadySink for Settings - where T: Sink, +impl ReadySink for Settings + where T: Sink, SinkError = ConnectionError>, T: ReadySink, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { diff --git a/src/proto/state.rs b/src/proto/state.rs index 31700ad..4bb2323 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,4 +1,7 @@ -use {ConnectionError, Reason, Peer}; +use Peer; +use error::ConnectionError; +use error::Reason::*; +use error::User::*; /// Represents the state of an H2 stream /// @@ -83,7 +86,7 @@ impl State { Ok(true) } Open { local, remote } => { - try!(remote.check_is_headers(Reason::ProtocolError)); + try!(remote.check_is_headers(ProtocolError.into())); *self = if eos { HalfClosedRemote(local) @@ -95,7 +98,7 @@ impl State { Ok(false) } HalfClosedLocal(remote) => { - try!(remote.check_is_headers(Reason::ProtocolError)); + try!(remote.check_is_headers(ProtocolError.into())); *self = if eos { Closed @@ -106,7 +109,36 @@ impl State { Ok(false) } Closed | HalfClosedRemote(..) => { - Err(Reason::ProtocolError.into()) + Err(ProtocolError.into()) + } + _ => unimplemented!(), + } + } + + pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> { + use self::State::*; + + match *self { + Open { local, remote } => { + try!(remote.check_is_data(ProtocolError.into())); + + if eos { + *self = HalfClosedRemote(local); + } + + Ok(()) + } + HalfClosedLocal(remote) => { + try!(remote.check_is_data(ProtocolError.into())); + + if eos { + *self = Closed; + } + + Ok(()) + } + Closed | HalfClosedRemote(..) => { + Err(ProtocolError.into()) } _ => unimplemented!(), } @@ -134,7 +166,7 @@ impl State { Ok(true) } Open { local, remote } => { - try!(local.check_is_headers(Reason::InternalError)); + try!(local.check_is_headers(UnexpectedFrameType.into())); *self = if eos { HalfClosedLocal(remote) @@ -146,7 +178,7 @@ impl State { Ok(false) } HalfClosedRemote(local) => { - try!(local.check_is_headers(Reason::InternalError)); + try!(local.check_is_headers(UnexpectedFrameType.into())); *self = if eos { Closed @@ -157,7 +189,36 @@ impl State { Ok(false) } Closed | HalfClosedLocal(..) => { - Err(Reason::InternalError.into()) + Err(UnexpectedFrameType.into()) + } + _ => unimplemented!(), + } + } + + pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> { + use self::State::*; + + match *self { + Open { local, remote } => { + try!(local.check_is_data(UnexpectedFrameType.into())); + + if eos { + *self = HalfClosedLocal(remote); + } + + Ok(()) + } + HalfClosedRemote(local) => { + try!(local.check_is_data(UnexpectedFrameType.into())); + + if eos { + *self = Closed; + } + + Ok(()) + } + Closed | HalfClosedLocal(..) => { + Err(UnexpectedFrameType.into()) } _ => unimplemented!(), } @@ -166,12 +227,21 @@ impl State { impl PeerState { #[inline] - fn check_is_headers(&self, err: Reason) -> Result<(), ConnectionError> { + fn check_is_headers(&self, err: ConnectionError) -> Result<(), ConnectionError> { use self::PeerState::*; match *self { Headers => Ok(()), - _ => Err(err.into()), + _ => Err(err), + } + } + + fn check_is_data(&self, err: ConnectionError) -> Result<(), ConnectionError> { + use self::PeerState::*; + + match *self { + Data => Ok(()), + _ => Err(err), } } } diff --git a/src/server.rs b/src/server.rs index 9991708..fe40040 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,20 +3,21 @@ use {frame, proto, Peer, ConnectionError, StreamId}; use http; use futures::{Future, Sink, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; +use bytes::{Bytes, IntoBuf}; use std::fmt; /// In progress H2 connection binding -pub struct Handshake { +pub struct Handshake { // TODO: unbox - inner: Box, Error = ConnectionError>>, + inner: Box, Error = ConnectionError>>, } /// Marker type indicating a client peer #[derive(Debug)] pub struct Server; -pub type Connection = super::Connection; +pub type Connection = super::Connection; /// Flush a Sink struct Flush { @@ -31,12 +32,19 @@ struct ReadPreface { const PREFACE: [u8; 24] = *b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; +pub fn handshake(io: T) -> Handshake + where T: AsyncRead + AsyncWrite + 'static, +{ + handshake2(io) +} + /// Bind an H2 server connection. /// /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. -pub fn handshake(io: T) -> Handshake +pub fn handshake2(io: T) -> Handshake where T: AsyncRead + AsyncWrite + 'static, + B: 'static, // TODO: Why is this required but not in client? { let transport = proto::server_handshaker(io, Default::default()); @@ -141,8 +149,8 @@ impl Peer for Server { } } -impl Future for Handshake { - type Item = Connection; +impl Future for Handshake { + type Item = Connection; type Error = ConnectionError; fn poll(&mut self) -> Poll { @@ -150,7 +158,10 @@ impl Future for Handshake { } } -impl fmt::Debug for Handshake { +impl fmt::Debug for Handshake + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, +{ fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "server::Handshake") } diff --git a/tests/client_request.rs b/tests/client_request.rs index 247d44c..6557135 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -3,12 +3,26 @@ extern crate http; extern crate futures; extern crate mock_io; extern crate env_logger; +extern crate bytes; use h2::client; use http::request; +use bytes::Bytes; use futures::*; +// TODO: move into another file +macro_rules! assert_user_err { + ($actual:expr, $err:ident) => {{ + use h2::error::{ConnectionError, User}; + + match $actual { + ConnectionError::User(e) => assert_eq!(e, User::$err), + _ => panic!("unexpected connection error type"), + } + }}; +} + #[test] fn handshake() { let _ = ::env_logger::init(); @@ -52,11 +66,99 @@ fn get_with_204_response() { // Get the response let (resp, h2) = h2.into_future().wait().unwrap(); - println!("RESP: {:?}", resp); + assert!(Stream::wait(h2).next().is_none()); +} + +#[test] +#[ignore] +fn get_with_200_response() { + let _ = ::env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) + .build(); + + let mut h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + + // Get the response + let (resp, h2) = h2.into_future().wait().unwrap(); assert!(Stream::wait(h2).next().is_none()); } +#[test] +fn request_with_zero_stream_id() { + let mock = mock_io::Builder::new() + .handshake() + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + + let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); + assert_user_err!(err, InvalidStreamId); +} + +#[test] +fn request_with_server_stream_id() { + let mock = mock_io::Builder::new() + .handshake() + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + + let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); + assert_user_err!(err, InvalidStreamId); +} + +#[test] +#[ignore] +fn send_data_without_headers() { + let mock = mock_io::Builder::new() + .handshake() + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + + /* + let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); + assert_user_err!(err, InvalidStreamId); + */ +} + +#[test] +#[ignore] +fn send_data_after_headers_eos() { +} + #[test] #[ignore] fn request_without_scheme() { @@ -77,6 +179,11 @@ fn invalid_client_stream_id() { fn invalid_server_stream_id() { } +#[test] +#[ignore] +fn exceed_max_streams() { +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];