diff --git a/examples/akamai.rs b/examples/akamai.rs new file mode 100644 index 0000000..863f249 --- /dev/null +++ b/examples/akamai.rs @@ -0,0 +1,71 @@ +extern crate h2; +extern crate http; +extern crate futures; +extern crate tokio_io; +extern crate tokio_core; +extern crate tokio_openssl; +extern crate openssl; +extern crate io_dump; +extern crate env_logger; + +use h2::client; + +use http::request; + +use futures::*; + +use tokio_core::reactor; +use tokio_core::net::TcpStream; + +pub fn main() { + let _ = env_logger::init(); + + let mut core = reactor::Core::new().unwrap();; + + let tcp = TcpStream::connect( + &"23.39.23.98:443".parse().unwrap(), + &core.handle()); + + let tcp = tcp.then(|res| { + use openssl::ssl::{SslMethod, SslConnectorBuilder}; + use tokio_openssl::SslConnectorExt; + + let tcp = res.unwrap(); + + // Does anyone know how TLS even works? + + let mut b = SslConnectorBuilder::new(SslMethod::tls()).unwrap(); + b.builder_mut().set_alpn_protocols(&[b"h2"]).unwrap(); + + let connector = b.build(); + connector.connect_async("http2.akamai.com", tcp) + .then(|res| { + let tls = res.unwrap(); + assert_eq!(Some(&b"h2"[..]), tls.get_ref().ssl().selected_alpn_protocol()); + + // Dump output to stdout + let tls = io_dump::Dump::to_stdout(tls); + + client::handshake(tls) + }) + .then(|res| { + let conn = res.unwrap(); + + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + // request.version = version::H2; + + conn.send_request(1.into(), request, true) + }) + .then(|res| { + let conn = res.unwrap(); + // Get the next message + conn.for_each(|frame| { + println!("RX: {:?}", frame); + Ok(()) + }) + }) + }); + + core.run(tcp).unwrap(); +} diff --git a/examples/client.rs b/examples/client.rs index fc1c64c..130d9c4 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -37,7 +37,7 @@ pub fn main() { request.uri = "https://http2.akamai.com/".parse().unwrap(); // request.version = version::H2; - conn.send_request(1, request, true) + conn.send_request(1.into(), request, true) }) .then(|res| { let conn = res.unwrap(); diff --git a/examples/server.rs b/examples/server.rs index 1f19252..373e065 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -45,7 +45,7 @@ pub fn main() { let mut response = response::Head::default(); response.status = status::NO_CONTENT; - conn.send_response(1, response, true) + conn.send_response(1.into(), response, true) }) }) .then(|res| { diff --git a/src/client.rs b/src/client.rs index 7b5f3cb..4568885 100644 --- a/src/client.rs +++ b/src/client.rs @@ -45,15 +45,12 @@ impl Peer for Client { type Send = http::request::Head; type Poll = http::response::Head; - fn check_initiating_id(id: StreamId) -> Result<(), ConnectionError> { - if id % 2 == 0 { - // Client stream identifiers must be odd - unimplemented!(); - } + fn is_valid_local_stream_id(id: StreamId) -> bool { + id.is_client_initiated() + } - // TODO: Ensure the `id` doesn't overflow u31 - - Ok(()) + fn is_valid_remote_stream_id(id: StreamId) -> bool { + id.is_server_initiated() } fn convert_send_message( diff --git a/src/error.rs b/src/error.rs index 3fc594b..8028054 100644 --- a/src/error.rs +++ b/src/error.rs @@ -62,6 +62,12 @@ impl From for ConnectionError { } } +impl From for ConnectionError { + fn from(src: Reason) -> ConnectionError { + ConnectionError::Proto(src) + } +} + impl From for io::Error { fn from(src: ConnectionError) -> io::Error { io::Error::new(io::ErrorKind::Other, src) diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 4d0145a..f7fc8df 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -1,5 +1,4 @@ -use frame::Error; -use super::{head, StreamId}; +use frame::{Error, StreamId}; #[derive(Debug)] pub struct GoAway { @@ -15,7 +14,7 @@ impl GoAway { unimplemented!(); } - let last_stream_id = head::parse_stream_id(&payload[..4]); + let last_stream_id = StreamId::parse(&payload[..4]); let error_code = unpack_octets_4!(payload, 4, u32); Ok(GoAway { diff --git a/src/frame/head.rs b/src/frame/head.rs index 9b76282..899dd18 100644 --- a/src/frame/head.rs +++ b/src/frame/head.rs @@ -1,3 +1,5 @@ +use super::StreamId; + use bytes::{BufMut, BigEndian}; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -23,10 +25,6 @@ pub enum Kind { Unknown, } -pub type StreamId = u32; - -const STREAM_ID_MASK: StreamId = 0x80000000; - // ===== impl Head ===== impl Head { @@ -43,11 +41,11 @@ impl Head { Head { kind: Kind::new(header[3]), flag: header[4], - stream_id: parse_stream_id(&header[5..]), + stream_id: StreamId::parse(&header[5..]), } } - pub fn stream_id(&self) -> u32 { + pub fn stream_id(&self) -> StreamId { self.stream_id } @@ -65,27 +63,14 @@ impl Head { pub fn encode(&self, payload_len: usize, dst: &mut T) { debug_assert!(self.encode_len() <= dst.remaining_mut()); - debug_assert!(self.stream_id & STREAM_ID_MASK == 0); dst.put_uint::(payload_len as u64, 3); dst.put_u8(self.kind as u8); dst.put_u8(self.flag); - dst.put_u32::(self.stream_id); + dst.put_u32::(self.stream_id.into()); } } -/// Parse the next 4 octets in the given buffer, assuming they represent an -/// HTTP/2 stream ID. This means that the most significant bit of the first -/// octet is ignored and the rest interpreted as a network-endian 31-bit -/// integer. -#[inline] -pub fn parse_stream_id(buf: &[u8]) -> StreamId { - /// TODO: Move this onto the StreamId type? - let unpacked = unpack_octets_4!(buf, 0, u32); - // Now clear the most significant bit, as that is reserved and MUST be ignored when received. - unpacked & !STREAM_ID_MASK -} - // ===== impl Kind ===== impl Kind { diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 3b3e5bc..c31866c 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -30,15 +30,17 @@ mod headers; mod ping; mod reset; mod settings; +mod stream_id; mod util; pub use self::data::Data; pub use self::go_away::GoAway; -pub use self::head::{Head, Kind, StreamId}; +pub use self::head::{Head, Kind}; pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; pub use self::ping::Ping; pub use self::reset::Reset; pub use self::settings::{Settings, SettingSet}; +pub use self::stream_id::StreamId; // Re-export some constants pub use self::settings::{ diff --git a/src/frame/ping.rs b/src/frame/ping.rs index 09b8558..46624ac 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -1,5 +1,5 @@ use bytes::{Buf, BufMut, IntoBuf}; -use frame::{Frame, Head, Kind, Error}; +use frame::{Frame, Head, Kind, Error, StreamId}; const ACK_FLAG: u8 = 0x1; @@ -36,7 +36,7 @@ impl Ping { // frame is received with a stream identifier field value other than // 0x0, the recipient MUST respond with a connection error // (Section 5.4.1) of type PROTOCOL_ERROR. - if head.stream_id() != 0 { + if !head.stream_id().is_zero() { return Err(Error::InvalidStreamId); } @@ -45,6 +45,7 @@ impl Ping { if bytes.len() != 8 { return Err(Error::BadFrameSize); } + let mut payload = [0; 8]; bytes.into_buf().copy_to_slice(&mut payload); @@ -63,7 +64,7 @@ impl Ping { trace!("encoding PING; ack={} len={}", self.ack, sz); let flags = if self.ack { ACK_FLAG } else { 0 }; - let head = Head::new(Kind::Ping, flags, 0); + let head = Head::new(Kind::Ping, flags, StreamId::zero()); head.encode(sz, dst); dst.put_slice(&self.payload); diff --git a/src/frame/settings.rs b/src/frame/settings.rs index e206529..8edb0f3 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -1,4 +1,4 @@ -use frame::{Frame, Error, Head, Kind}; +use frame::{Frame, Error, Head, Kind, StreamId}; use bytes::{BytesMut, BufMut, BigEndian}; #[derive(Debug, Clone, Default, Eq, PartialEq)] @@ -71,7 +71,7 @@ impl Settings { debug_assert_eq!(head.kind(), ::frame::Kind::Settings); - if head.stream_id() != 0 { + if !head.stream_id().is_zero() { return Err(Error::InvalidStreamId); } @@ -132,7 +132,7 @@ impl Settings { pub fn encode(&self, dst: &mut BytesMut) { // Create & encode an appropriate frame head - let head = Head::new(Kind::Settings, self.flags.into(), 0); + let head = Head::new(Kind::Settings, self.flags.into(), StreamId::zero()); let payload_len = self.payload_len(); trace!("encoding SETTINGS; len={}", payload_len); diff --git a/src/frame/stream_id.rs b/src/frame/stream_id.rs new file mode 100644 index 0000000..6150bef --- /dev/null +++ b/src/frame/stream_id.rs @@ -0,0 +1,55 @@ +use byteorder::{BigEndian, ByteOrder}; +use std::u32; + +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct StreamId(u32); + +const STREAM_ID_MASK: u32 = 1 << 31; + +impl StreamId { + /// Parse the stream ID + #[inline] + pub fn parse(buf: &[u8]) -> StreamId { + let unpacked = BigEndian::read_u32(buf); + // Now clear the most significant bit, as that is reserved and MUST be + // ignored when received. + StreamId(unpacked & !STREAM_ID_MASK) + } + + pub fn is_client_initiated(&self) -> bool { + let id = self.0; + id != 0 && id % 2 == 1 + } + + pub fn is_server_initiated(&self) -> bool { + let id = self.0; + id != 0 && id % 2 == 0 + } + + #[inline] + pub fn zero() -> StreamId { + StreamId(0) + } + + #[inline] + pub fn max() -> StreamId { + StreamId(u32::MAX >> 1) + } + + pub fn is_zero(&self) -> bool { + self.0 == 0 + } +} + +impl From for StreamId { + fn from(src: u32) -> Self { + assert_eq!(src & STREAM_ID_MASK, 0, "invalid stream ID -- MSB is set"); + StreamId(src) + } +} + +impl From for u32 { + fn from(src: StreamId) -> Self { + src.0 + } +} diff --git a/src/lib.rs b/src/lib.rs index acdad1c..18b2666 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,7 +75,13 @@ pub trait Peer { /// Message type polled from the transport type Poll; - fn check_initiating_id(id: StreamId) -> Result<(), ConnectionError>; + /// Returns `true` if `id` is a valid StreamId for a stream initiated by the + /// local node. + fn is_valid_local_stream_id(id: StreamId) -> bool; + + /// Returns `true` if `id` is a valid StreamId for a stream initiated by the + /// remote node. + fn is_valid_remote_stream_id(id: StreamId) -> bool; #[doc(hidden)] fn convert_send_message( diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 7ec1666..ec518f3 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -100,6 +100,20 @@ impl Stream for Connection let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); + let stream_initialized = try!(self.streams.entry(stream_id) + .or_insert(State::default()) + .recv_headers::

(end_of_stream)); + + if stream_initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + + if !P::is_valid_remote_stream_id(stream_id) { + unimplemented!(); + } + } + Frame::Headers { id: stream_id, headers: P::convert_poll_message(v), @@ -143,22 +157,23 @@ impl Sink for Connection match item { Frame::Headers { id, headers, end_of_stream } => { - // Ensure ID is valid - // TODO: This check should only be done **if** this is a new - // stream ID - // try!(P::check_initiating_id(id)); - - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - // Transition the stream state, creating a new entry if needed // // TODO: Response can send multiple headers frames before body // (1xx responses). - try!(self.streams.entry(id) + let stream_initialized = try!(self.streams.entry(id) .or_insert(State::default()) - .send_headers()); + .send_headers::

(end_of_stream)); + + if stream_initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + // + if !P::is_valid_local_stream_id(id) { + unimplemented!(); + } + } let frame = P::convert_send_message(id, headers, end_of_stream); diff --git a/src/proto/state.rs b/src/proto/state.rs index 4e5a1c9..31700ad 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -1,4 +1,4 @@ -use ConnectionError; +use {ConnectionError, Reason, Peer}; /// Represents the state of an H2 stream /// @@ -45,23 +45,134 @@ pub enum State { Idle, ReservedLocal, ReservedRemote, - Open, - HalfClosedLocal, - HalfClosedRemote, + Open { + local: PeerState, + remote: PeerState, + }, + HalfClosedLocal(PeerState), + HalfClosedRemote(PeerState), Closed, } +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum PeerState { + Headers, + Data, +} + impl State { + /// Transition the state to represent headers being received. + /// + /// Returns true if this state transition results in iniitializing the + /// stream id. `Err` is returned if this is an invalid state transition. + pub fn recv_headers(&mut self, eos: bool) -> Result { + use self::State::*; + use self::PeerState::*; + + match *self { + Idle => { + *self = if eos { + HalfClosedRemote(Headers) + } else { + Open { + local: Headers, + remote: Data, + } + }; + + Ok(true) + } + Open { local, remote } => { + try!(remote.check_is_headers(Reason::ProtocolError)); + + *self = if eos { + HalfClosedRemote(local) + } else { + let remote = Data; + Open { local, remote } + }; + + Ok(false) + } + HalfClosedLocal(remote) => { + try!(remote.check_is_headers(Reason::ProtocolError)); + + *self = if eos { + Closed + } else { + HalfClosedLocal(Data) + }; + + Ok(false) + } + Closed | HalfClosedRemote(..) => { + Err(Reason::ProtocolError.into()) + } + _ => unimplemented!(), + } + } + /// Transition the state to represent headers being sent. /// - /// Returns an error if this is an invalid state transition. - pub fn send_headers(&mut self) -> Result<(), ConnectionError> { - if *self != State::Idle { - unimplemented!(); - } + /// Returns true if this state transition results in initializing the stream + /// id. `Err` is returned if this is an invalid state transition. + pub fn send_headers(&mut self, eos: bool) -> Result { + use self::State::*; + use self::PeerState::*; - *self = State::Open; - Ok(()) + match *self { + Idle => { + *self = if eos { + HalfClosedLocal(Headers) + } else { + Open { + local: Data, + remote: Headers, + } + }; + + Ok(true) + } + Open { local, remote } => { + try!(local.check_is_headers(Reason::InternalError)); + + *self = if eos { + HalfClosedLocal(remote) + } else { + let local = Data; + Open { local, remote } + }; + + Ok(false) + } + HalfClosedRemote(local) => { + try!(local.check_is_headers(Reason::InternalError)); + + *self = if eos { + Closed + } else { + HalfClosedRemote(Data) + }; + + Ok(false) + } + Closed | HalfClosedLocal(..) => { + Err(Reason::InternalError.into()) + } + _ => unimplemented!(), + } + } +} + +impl PeerState { + #[inline] + fn check_is_headers(&self, err: Reason) -> Result<(), ConnectionError> { + use self::PeerState::*; + + match *self { + Headers => Ok(()), + _ => Err(err.into()), + } } } diff --git a/src/server.rs b/src/server.rs index 4eb3e6c..9991708 100644 --- a/src/server.rs +++ b/src/server.rs @@ -102,15 +102,12 @@ impl Peer for Server { type Send = http::response::Head; type Poll = http::request::Head; - fn check_initiating_id(id: StreamId) -> Result<(), ConnectionError> { - if id % 2 == 1 { - // Server stream identifiers must be even - unimplemented!(); - } + fn is_valid_local_stream_id(id: StreamId) -> bool { + id.is_server_initiated() + } - // TODO: Ensure the `id` doesn't overflow u31 - - Ok(()) + fn is_valid_remote_stream_id(id: StreamId) -> bool { + id.is_client_initiated() } fn convert_send_message( diff --git a/tests/client_request.rs b/tests/client_request.rs index 78f5126..247d44c 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -11,10 +11,11 @@ use futures::*; #[test] fn handshake() { - let _ = ::env_logger::init().unwrap(); + let _ = ::env_logger::init(); let mock = mock_io::Builder::new() - .client_handshake() + .handshake() + .write(SETTINGS_ACK) .build(); let mut h2 = client::handshake(mock) @@ -25,36 +26,35 @@ fn handshake() { } #[test] -#[ignore] // Not working yet -fn hello_world() { - let _ = ::env_logger::init().unwrap(); +fn get_with_204_response() { + let _ = ::env_logger::init(); let mock = mock_io::Builder::new() - .client_handshake() - // GET https://example.com/ HEADERS frame - .write(&[0, 0, 13, 1, 5, 0, 0, 0, 1, 130, 135, 65, 136, 47, 145, 211, 93, 5, 92, 135, 167, 132]) - // .read(&[0, 0, 0, 1, 5, 0, 0, 0, 1]) + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); let mut h2 = client::handshake(mock) .wait().unwrap(); - let mut request = request::Head::default(); - request.uri = "https://example.com/".parse().unwrap(); - // request.version = version::H2; - - println!("~~~ SEND REQUEST ~~~"); // Send the request - let mut h2 = h2.send_request(1, request, true).wait().unwrap(); + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); - println!("~~~ WAIT FOR RESPONSE ~~~"); - // Iterate the response frames - let mut h2 = Stream::wait(h2); + // Get the response + let (resp, h2) = h2.into_future().wait().unwrap(); - let headers = h2.next().unwrap(); + println!("RESP: {:?}", resp); - // At this point, the connection should be closed - assert!(h2.next().is_none()); + assert!(Stream::wait(h2).next().is_none()); } #[test] @@ -67,23 +67,29 @@ fn request_without_scheme() { fn request_with_h1_version() { } +#[test] +#[ignore] +fn invalid_client_stream_id() { +} + +#[test] +#[ignore] +fn invalid_server_stream_id() { +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; trait MockH2 { - fn client_handshake(&mut self) -> &mut Self; + fn handshake(&mut self) -> &mut Self; } impl MockH2 for mock_io::Builder { - fn client_handshake(&mut self) -> &mut Self { + fn handshake(&mut self) -> &mut Self { self.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") // Settings frame .write(SETTINGS) - /* - .read(&[0, 0, 0, 4, 0, 0, 0, 0, 0]) - // Settings ACK - .write(&[0, 0, 0, 4, 1, 0, 0, 0, 0]) - .read(&[0, 0, 0, 4, 1, 0, 0, 0, 0]) - */ + .read(SETTINGS) + .read(SETTINGS_ACK) } }