diff --git a/examples/client.rs b/examples/client.rs index 130d9c4..7e79c51 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -8,7 +8,7 @@ extern crate env_logger; use h2::client; -use http::request; +use http::*; use futures::*; @@ -34,11 +34,19 @@ pub fn main() { println!("sending request"); let mut request = request::Head::default(); + request.method = method::POST; request.uri = "https://http2.akamai.com/".parse().unwrap(); // request.version = version::H2; conn.send_request(1.into(), request, true) }) + /* + .then(|res| { + let conn = res.unwrap(); + + conn.send_data(1.into(), "hello".into(), true) + }) + */ .then(|res| { let conn = res.unwrap(); // Get the next message diff --git a/examples/server.rs b/examples/server.rs index bcd1f54..d826595 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -38,6 +38,12 @@ pub fn main() { // Receive a request conn.into_future() + .then(|res| { + let (frame, conn) = res.unwrap(); + println!("Zomg frame; {:?}", frame); + + conn.into_future() + }) .then(|res| { let (frame, conn) = res.unwrap(); println!("Zomg frame; {:?}", frame); @@ -47,12 +53,6 @@ pub fn main() { conn.send_response(1.into(), response, false) }) - .then(|res| { - let conn = res.unwrap(); - println!("... sending data frame"); - - conn.send_data(1.into(), "hello".into(), false) - }) .then(|res| { let conn = res.unwrap(); println!("... sending next frame"); diff --git a/src/client.rs b/src/client.rs index bea78b3..eaaf601 100644 --- a/src/client.rs +++ b/src/client.rs @@ -56,8 +56,8 @@ impl Peer for Client { id.is_client_initiated() } - fn is_valid_remote_stream_id(id: StreamId) -> bool { - id.is_server_initiated() + fn is_valid_remote_stream_id(_id: StreamId) -> bool { + false } fn convert_send_message( diff --git a/src/error.rs b/src/error.rs index 5a84bc2..16c3ca0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -62,6 +62,9 @@ pub enum User { /// transmit a Data frame to the remote. FlowControlViolation, + /// The connection state is corrupt and the connection should be dropped. + Corrupt, + // TODO: reserve additional variants } @@ -100,6 +103,7 @@ macro_rules! user_desc { InactiveStreamId => concat!($prefix, "inactive stream ID"), UnexpectedFrameType => concat!($prefix, "unexpected frame type"), FlowControlViolation => concat!($prefix, "flow control violation"), + Corrupt => concat!($prefix, "connection state corrupt"), } }); } diff --git a/src/frame/data.rs b/src/frame/data.rs index 84deb62..cfe5516 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -5,7 +5,7 @@ use bytes::{BufMut, Bytes, Buf}; #[derive(Debug)] pub struct Data { stream_id: StreamId, - data_len: FrameSize, + //data_len: FrameSize, data: T, flags: DataFlag, pad_len: Option, @@ -30,7 +30,7 @@ impl Data { }; Ok(Data { stream_id: head.stream_id(), - data_len: payload.len() as FrameSize, + //data_len: payload.len() as FrameSize, data: payload, flags: flags, pad_len: pad_len, @@ -55,16 +55,16 @@ impl Data { Head::new(Kind::Data, self.flags.into(), self.stream_id) } - pub fn len(&self) -> FrameSize { - self.data_len - } - pub fn into_payload(self) -> T { self.data } } impl Data { + pub fn len(&self) -> usize { + self.data.remaining() + } + pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self { let mut flags = DataFlag::default(); if eos { @@ -72,7 +72,7 @@ impl Data { } Data { stream_id, - data_len: data.remaining() as FrameSize, + //data_len: data.remaining() as FrameSize, data, flags, pad_len: None, diff --git a/src/lib.rs b/src/lib.rs index 841a497..83352bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ pub enum Frame { id: StreamId, data: B, /// TODO figure out how to make this a requirement on `B` - data_len: FrameSize, + //data_len: FrameSize, end_of_stream: bool, }, Trailers { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 3aa6564..8d9fef1 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,23 +1,23 @@ use {ConnectionError, Frame, FrameSize}; use client::Client; +use error; use frame::{self, SettingSet, StreamId}; use proto::{self, ControlFlow, ControlPing, ControlSettings, Peer, PingPayload, ReadySink, WindowSize}; use server::Server; -use tokio_io::{AsyncRead, AsyncWrite}; - -use http::{request, response}; use bytes::{Bytes, IntoBuf}; - +use http::{request, response}; use futures::*; - +use tokio_io::{AsyncRead, AsyncWrite}; use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub struct Connection { inner: proto::Transport, - peer: PhantomData

, + // Set to `true` as long as the connection is in a valid state. + active: bool, + _phantom: PhantomData<(P, B)>, } pub fn new(transport: proto::Transport) @@ -28,13 +28,14 @@ pub fn new(transport: proto::Transport) { Connection { inner: transport, - peer: PhantomData, + active: true, + _phantom: PhantomData, } } impl ControlSettings for Connection - where T: ControlSettings, + where T: AsyncRead + AsyncWrite, B: IntoBuf, { fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { @@ -51,7 +52,7 @@ impl ControlSettings for Connection } impl ControlFlow for Connection - where T: ControlFlow, + where T: AsyncRead + AsyncWrite, B: IntoBuf, { fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { @@ -65,7 +66,6 @@ impl ControlFlow for Connection impl ControlPing for Connection where T: AsyncRead + AsyncWrite, - T: ControlPing, P: Peer, B: IntoBuf, { @@ -146,6 +146,10 @@ impl Stream for Connection use frame::Frame::*; trace!("poll"); + if !self.active { + return Err(error::User::Corrupt.into()); + } + loop { let frame = match try!(self.inner.poll()) { Async::Ready(f) => f, @@ -153,7 +157,7 @@ impl Stream for Connection // Receiving new frames may depend on ensuring that the write buffer // is clear (e.g. if window updates need to be sent), so `poll_complete` // is called here. - try_ready!(self.inner.poll_complete()); + try_ready!(self.poll_complete()); // If the write buffer is cleared, attempt to poll the underlying // stream once more because it, may have been made ready. @@ -172,7 +176,7 @@ impl Stream for Connection Some(Data(v)) => Frame::Data { id: v.stream_id(), end_of_stream: v.is_end_stream(), - data_len: v.len(), + //data_len: v.len(), data: v.into_payload(), }, @@ -199,9 +203,13 @@ impl Sink for Connection { trace!("start_send"); + if !self.active { + return Err(error::User::Corrupt.into()); + } + // Ensure the transport is ready to send a frame before we transform the external - // `Frame` into an internal `frame::Framme`. - if self.inner.poll_ready()? == Async::NotReady { + // `Frame` into an internal `frame::Frame`. + if !try!(self.poll_ready()).is_ready() { return Ok(AsyncSink::NotReady(item)); } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index 4ea6bf8..f8f89f1 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -47,7 +47,7 @@ impl FlowControl initial_local_window_size, initial_remote_window_size, connection_local_flow_controller: FlowControlState::with_initial_size(initial_local_window_size), - connection_remote_flow_controller: FlowControlState::with_next_update(initial_remote_window_size), + connection_remote_flow_controller: FlowControlState::with_initial_size(initial_remote_window_size), blocked_remote_window_update: None, sending_local_window_update: None, pending_local_window_updates: VecDeque::new(), diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index 85d05ec..c7842e3 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -1,6 +1,6 @@ use ConnectionError; -use error::Reason::ProtocolError; -use error::User::InvalidStreamId; +use error::Reason; +use error::User; use frame::{self, Frame}; use proto::*; @@ -125,7 +125,7 @@ impl Stream for StreamTracker // connections should not be factored. if !P::is_valid_remote_stream_id(id) { - unimplemented!(); + return Err(Reason::ProtocolError.into()); } } @@ -134,7 +134,7 @@ impl Stream for StreamTracker Some(Data(v)) => { match self.streams.get_mut(&v.stream_id()) { - None => return Err(ProtocolError.into()), + None => return Err(Reason::ProtocolError.into()), Some(state) => state.recv_data(v.is_end_stream())?, } Ok(Async::Ready(Some(Data(v)))) @@ -179,14 +179,14 @@ impl Sink for StreamTracker // connections should not be factored. if !P::is_valid_local_stream_id(id) { // TODO: clear state - return Err(InvalidStreamId.into()); + return Err(User::InvalidStreamId.into()); } } } &Data(ref v) => { match self.streams.get_mut(&v.stream_id()) { - None => return Err(ProtocolError.into()), + None => return Err(User::InactiveStreamId.into()), Some(state) => state.send_data(v.is_end_stream())?, } } diff --git a/src/server.rs b/src/server.rs index fe40040..af8bbc2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -110,8 +110,8 @@ impl Peer for Server { type Send = http::response::Head; type Poll = http::request::Head; - fn is_valid_local_stream_id(id: StreamId) -> bool { - id.is_server_initiated() + fn is_valid_local_stream_id(_id: StreamId) -> bool { + false } fn is_valid_remote_stream_id(id: StreamId) -> bool { diff --git a/tests/client_request.rs b/tests/client_request.rs index 6557135..4407266 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -5,198 +5,472 @@ extern crate mock_io; extern crate env_logger; extern crate bytes; -use h2::client; -use http::request; -use bytes::Bytes; +// scoped so `cargo test client_request` dtrt. +mod client_request { + use h2::{client, Frame}; + use http::*; -use futures::*; + use futures::*; + use bytes::Bytes; + use mock_io; -// TODO: move into another file -macro_rules! assert_user_err { - ($actual:expr, $err:ident) => {{ - use h2::error::{ConnectionError, User}; + // TODO: move into another file + macro_rules! assert_user_err { + ($actual:expr, $err:ident) => {{ + use h2::error::{ConnectionError, User}; - match $actual { - ConnectionError::User(e) => assert_eq!(e, User::$err), - _ => panic!("unexpected connection error type"), + match $actual { + ConnectionError::User(e) => assert_eq!(e, User::$err), + _ => panic!("unexpected connection error type"), + } + }}; + } + + macro_rules! assert_proto_err { + ($actual:expr, $err:ident) => {{ + use h2::error::{ConnectionError, Reason}; + + match $actual { + ConnectionError::Proto(e) => assert_eq!(e, Reason::$err), + _ => panic!("unexpected connection error type"), + } + }}; + } + + #[test] + fn handshake() { + let _ = ::env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + .write(SETTINGS_ACK) + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // At this point, the connection should be closed + assert!(Stream::wait(h2).next().is_none()); + } + + #[test] + fn get_with_204_response() { + let _ = ::env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + + // Get the response + let (resp, h2) = h2.into_future().wait().unwrap(); + + match resp.unwrap() { + Frame::Headers { headers, .. } => { + assert_eq!(headers.status, status::NO_CONTENT); + } + _ => panic!("unexpected frame"), } - }}; -} -#[test] -fn handshake() { - let _ = ::env_logger::init(); + // No more frames + assert!(Stream::wait(h2).next().is_none()); + } - let mock = mock_io::Builder::new() - .handshake() - .write(SETTINGS_ACK) - .build(); + #[test] + fn get_with_200_response() { + let _ = ::env_logger::init(); - let mut h2 = client::handshake(mock) - .wait().unwrap(); + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 16, 1, 5, 0, 0, 0, 1, 130, 135, 65, 139, 157, 41, 172, 75, + 143, 168, 233, 25, 151, 33, 233, 132 + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[ + 0, 0, 1, 1, 4, 0, 0, 0, 1, 136, 0, 0, 5, 0, 0, 0, 0, 0, 1, 104, 101, + 108, 108, 111, 0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100, + ]) + .build(); - // At this point, the connection should be closed - assert!(Stream::wait(h2).next().is_none()); -} + let h2 = client::handshake(mock) + .wait().unwrap(); -#[test] -fn get_with_204_response() { - let _ = ::env_logger::init(); + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); - let mock = mock_io::Builder::new() - .handshake() - // Write GET / - .write(&[ - 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, - 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, - ]) - .write(SETTINGS_ACK) - // Read response - .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) - .build(); + // Get the response headers + let (resp, h2) = h2.into_future().wait().unwrap(); - let mut h2 = client::handshake(mock) - .wait().unwrap(); + match resp.unwrap() { + Frame::Headers { headers, .. } => { + assert_eq!(headers.status, status::OK); + } + _ => panic!("unexpected frame"), + } - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + // Get the response body + let (data, h2) = h2.into_future().wait().unwrap(); - // Get the response - let (resp, h2) = h2.into_future().wait().unwrap(); + match data.unwrap() { + Frame::Data { id, data, end_of_stream, .. } => { + assert_eq!(id, 1.into()); + assert_eq!(data, &b"hello"[..]); + assert!(!end_of_stream); + } + _ => panic!("unexpected frame"), + } - assert!(Stream::wait(h2).next().is_none()); -} + // Get the response body + let (data, h2) = h2.into_future().wait().unwrap(); -#[test] -#[ignore] -fn get_with_200_response() { - let _ = ::env_logger::init(); + match data.unwrap() { + Frame::Data { id, data, end_of_stream, .. } => { + assert_eq!(id, 1.into()); + assert_eq!(data, &b"world"[..]); + assert!(end_of_stream); + } + _ => panic!("unexpected frame"), + } - let mock = mock_io::Builder::new() - .handshake() - // Write GET / - .write(&[ - 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, - 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, - ]) - .write(SETTINGS_ACK) - // Read response - .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) - .build(); + assert!(Stream::wait(h2).next().is_none()); + } - let mut h2 = client::handshake(mock) - .wait().unwrap(); + #[test] + #[ignore] + fn post_with_large_body() { + let _ = ::env_logger::init(); - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + let mock = mock_io::Builder::new() + .handshake() + .write(&[ + // POST / + 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, + 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, + ]) + .write(&[ + // DATA + 0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[ + // HEADERS + 0, 0, 1, 1, 4, 0, 0, 0, 1, 136, + // DATA + 0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100 + ]) + .build(); - // Get the response - let (resp, h2) = h2.into_future().wait().unwrap(); + let h2 = client::handshake(mock) + .wait().unwrap(); - assert!(Stream::wait(h2).next().is_none()); -} + // Send the request + let mut request = request::Head::default(); + request.method = method::POST; + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, false).wait().unwrap(); -#[test] -fn request_with_zero_stream_id() { - let mock = mock_io::Builder::new() - .handshake() - .build(); + // Send the data + let b = [0; 300]; + let h2 = h2.send_data(1.into(), (&b[..]).into(), true).wait().unwrap(); - let h2 = client::handshake(mock) - .wait().unwrap(); + // Get the response headers + let (resp, h2) = h2.into_future().wait().unwrap(); - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); + match resp.unwrap() { + Frame::Headers { headers, .. } => { + assert_eq!(headers.status, status::OK); + } + _ => panic!("unexpected frame"), + } - let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); -} + // Get the response body + let (data, h2) = h2.into_future().wait().unwrap(); -#[test] -fn request_with_server_stream_id() { - let mock = mock_io::Builder::new() - .handshake() - .build(); + match data.unwrap() { + Frame::Data { id, data, end_of_stream, .. } => { + assert_eq!(id, 1.into()); + assert_eq!(data, &b"world"[..]); + assert!(end_of_stream); + } + _ => panic!("unexpected frame"), + } - let h2 = client::handshake(mock) - .wait().unwrap(); + assert!(Stream::wait(h2).next().is_none()); + } - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); + #[test] + fn request_with_zero_stream_id() { + let mock = mock_io::Builder::new() + .handshake() + .build(); - let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); -} + let h2 = client::handshake(mock) + .wait().unwrap(); -#[test] -#[ignore] -fn send_data_without_headers() { - let mock = mock_io::Builder::new() - .handshake() - .build(); + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = client::handshake(mock) - .wait().unwrap(); + let err = h2.send_request(0.into(), request, true).wait().unwrap_err(); + assert_user_err!(err, InvalidStreamId); + } - // Send the request - let mut request = request::Head::default(); - request.uri = "https://http2.akamai.com/".parse().unwrap(); + #[test] + fn post_with_200_response() { + let _ = ::env_logger::init(); - /* - let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); - assert_user_err!(err, InvalidStreamId); - */ -} + let mock = mock_io::Builder::new() + .handshake() + .write(&[ + // POST / + 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, + 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, + ]) + .write(&[ + // DATA + 0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[ + // HEADERS + 0, 0, 1, 1, 4, 0, 0, 0, 1, 136, + // DATA + 0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100 + ]) + .build(); -#[test] -#[ignore] -fn send_data_after_headers_eos() { -} + let h2 = client::handshake(mock).wait().expect("handshake"); -#[test] -#[ignore] -fn request_without_scheme() { -} + // Send the request + let mut request = request::Head::default(); + request.method = method::POST; + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, false).wait().expect("send request"); -#[test] -#[ignore] -fn request_with_h1_version() { -} + let b = "hello"; -#[test] -#[ignore] -fn invalid_client_stream_id() { -} + // Send the data + let h2 = h2.send_data(1.into(), b.into(), true).wait().expect("send data"); -#[test] -#[ignore] -fn invalid_server_stream_id() { -} + // Get the response headers + let (resp, h2) = h2.into_future().wait().expect("into future"); -#[test] -#[ignore] -fn exceed_max_streams() { -} + match resp.expect("response headers") { + Frame::Headers { headers, .. } => { + assert_eq!(headers.status, status::OK); + } + _ => panic!("unexpected frame"), + } -const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; -const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; + // Get the response body + let (data, h2) = h2.into_future().wait().expect("into future"); -trait MockH2 { - fn handshake(&mut self) -> &mut Self; -} + match data.expect("response data") { + Frame::Data { id, data, end_of_stream, .. } => { + assert_eq!(id, 1.into()); + assert_eq!(data, &b"world"[..]); + assert!(end_of_stream); + } + _ => panic!("unexpected frame"), + } -impl MockH2 for mock_io::Builder { - fn handshake(&mut self) -> &mut Self { - self.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - // Settings frame - .write(SETTINGS) - .read(SETTINGS) - .read(SETTINGS_ACK) + assert!(Stream::wait(h2).next().is_none()); + } + + #[test] + fn request_with_server_stream_id() { + let mock = mock_io::Builder::new() + .handshake() + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + + let err = h2.send_request(2.into(), request, true).wait().unwrap_err(); + assert_user_err!(err, InvalidStreamId); + } + + #[test] + fn send_headers_twice_with_same_stream_id() { + let _ = ::env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + + // Send another request with the same stream ID + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let err = h2.send_request(1.into(), request, true).wait().unwrap_err(); + + assert_user_err!(err, UnexpectedFrameType); + } + + #[test] + fn send_data_without_headers() { + let mock = mock_io::Builder::new() + .handshake() + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + let b = Bytes::from_static(b"hello world"); + let err = h2.send_data(1.into(), b, true).wait().unwrap_err(); + + assert_user_err!(err, InactiveStreamId); + } + + #[test] + fn send_data_after_headers_eos() { + let _ = ::env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + // GET /, no EOS + 0, 0, 16, 1, 5, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, + 75, 143, 168, 233, 25, 151, 33, 233, 132 + ]) + .build(); + + let h2 = client::handshake(mock) + .wait().expect("handshake"); + + // Send the request + let mut request = request::Head::default(); + request.method = method::POST; + request.uri = "https://http2.akamai.com/".parse().unwrap(); + + let id = 1.into(); + let h2 = h2.send_request(id, request, true).wait().expect("send request"); + + let body = "hello"; + + // Send the data + let err = h2.send_data(id, body.into(), true).wait().unwrap_err(); + assert_user_err!(err, UnexpectedFrameType); + } + + #[test] + #[ignore] + fn request_without_scheme() { + } + + #[test] + #[ignore] + fn request_with_h1_version() { + } + + #[test] + fn invalid_client_stream_id() { + let _ = ::env_logger::init(); + + for &id in &[0, 2] { + let mock = mock_io::Builder::new() + .handshake() + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let err = h2.send_request(id.into(), request, true).wait().unwrap_err(); + + assert_user_err!(err, InvalidStreamId); + } + } + + #[test] + fn invalid_server_stream_id() { + let _ = ::env_logger::init(); + + let mock = mock_io::Builder::new() + .handshake() + // Write GET / + .write(&[ + 0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, + 0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, + ]) + .write(SETTINGS_ACK) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137]) + .build(); + + let h2 = client::handshake(mock) + .wait().unwrap(); + + // Send the request + let mut request = request::Head::default(); + request.uri = "https://http2.akamai.com/".parse().unwrap(); + let h2 = h2.send_request(1.into(), request, true).wait().unwrap(); + + // Get the response + let (err, _) = h2.into_future().wait().unwrap_err(); + assert_proto_err!(err, ProtocolError); + } + + #[test] + #[ignore] + fn exceed_max_streams() { + } + + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; + const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; + + trait MockH2 { + fn handshake(&mut self) -> &mut Self; + } + + impl MockH2 for mock_io::Builder { + fn handshake(&mut self) -> &mut Self { + self.write(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") + // Settings frame + .write(SETTINGS) + .read(SETTINGS) + .read(SETTINGS_ACK) + } } }