diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 9ff672a..bdeb7ec 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -53,90 +53,75 @@ fn send_data_without_requesting_capacity() { fn release_capacity_sends_window_update() { let _ = ::env_logger::init(); - let payload = vec![0; 65_535]; + let payload = vec![0u8; 65_535]; - let mock = mock_io::Builder::new() - .handshake() - .write(&[ - // GET / - 0, 0, 16, 1, 5, 0, 0, 0, 1, 130, 135, 65, 139, 157, 41, - 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, - ]) - .write(frames::SETTINGS_ACK) - // Read response - .read(&[0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88]) - .read(&[ - // DATA - 0, 64, 0, 0, 0, 0, 0, 0, 1, - ]) - .read(&payload[0..16_384]) - .read(&[ - // DATA - 0, 64, 0, 0, 0, 0, 0, 0, 1, - ]) - .read(&payload[16_384..16_384*2]) - .read(&[ - // DATA - 0, 64, 0, 0, 0, 0, 0, 0, 1, - ]) - .read(&payload[16_384*2..16_384*3]) - .write(&[0, 0, 4, 8, 0, 0, 0, 0, 0, 0, 0, 128, 0]) - .write(&[0, 0, 4, 8, 0, 0, 0, 0, 1, 0, 0, 128, 0]) - .read(&[ - // DATA - 0, 63, 255, 0, 1, 0, 0, 0, 1, - ]) - .read(&payload[16_384*3..16_384*4 - 1]) + let (io, srv) = mock::new(); - // we send a 2nd stream in order to test the window update is - // is actually written to the socket - .write(&[ - // GET / - 0, 0, 4, 1, 5, 0, 0, 0, 3, 130, 135, 190, 132, - ]) - .read(&[0, 0, 1, 1, 5, 0, 0, 0, 3, 0x88]) - .build(); + let mock = srv.assert_client_handshake().unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame( + frames::headers(1) + .response(200) + ) + .send_frame(frames::data(1, &payload[0..16_384])) + .send_frame(frames::data(1, &payload[16_384..16_384 * 2])) + .send_frame(frames::data(1, &payload[16_384 * 2..16_384 * 3])) + .recv_frame( + frames::window_update(0, 32_768) + ) + .recv_frame( + frames::window_update(1, 32_768) + ) + .send_frame( + frames::data(1, &payload[16_384 * 3..]) + .eos() + ) + // gotta end the connection + .map(drop); - let mut h2 = Client::handshake(mock) - .wait().unwrap(); + let h2 = Client::handshake(io).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - let request = Request::builder() - .method(Method::GET) - .uri("https://http2.akamai.com/") - .body(()).unwrap(); + let req = h2.request(request, true).unwrap() + .unwrap() + // Get the response + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_parts().1; + body.into_future().unwrap() + }) - let mut stream = h2.request(request, true).unwrap(); - - // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); - assert_eq!(resp.status(), StatusCode::OK); - - // read some body to use up window size to below half - let mut body = resp.into_parts().1; - let buf = h2.run(poll_fn(|| body.poll())).unwrap().unwrap(); - assert_eq!(buf.len(), 16_384); - let buf = h2.run(poll_fn(|| body.poll())).unwrap().unwrap(); - assert_eq!(buf.len(), 16_384); - let buf = h2.run(poll_fn(|| body.poll())).unwrap().unwrap(); - assert_eq!(buf.len(), 16_384); - - // release some capacity to send a window_update - body.release_capacity(buf.len() * 2).unwrap(); - - let buf = h2.run(poll_fn(|| body.poll())).unwrap().unwrap(); - assert_eq!(buf.len(), 16_383); - - - // send a 2nd stream to force flushing of window updates - let request = Request::builder() - .method(Method::GET) - .uri("https://http2.akamai.com/") - .body(()).unwrap(); - let mut stream = h2.request(request, true).unwrap(); - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); - assert_eq!(resp.status(), StatusCode::OK); - - h2.wait().unwrap(); + // read some body to use up window size to below half + .and_then(|(buf, body)| { + assert_eq!(buf.unwrap().len(), 16_384); + body.into_future().unwrap() + }) + .and_then(|(buf, body)| { + assert_eq!(buf.unwrap().len(), 16_384); + body.into_future().unwrap() + }) + .and_then(|(buf, mut body)| { + let buf = buf.unwrap(); + assert_eq!(buf.len(), 16_384); + body.release_capacity(buf.len() * 2).unwrap(); + body.into_future().unwrap() + }) + .and_then(|(buf, _)| { + assert_eq!(buf.unwrap().len(), 16_383); + Ok(()) + }); + h2.unwrap().join(req) + }); + h2.join(mock).wait().unwrap(); } #[test] @@ -145,63 +130,52 @@ fn release_capacity_of_small_amount_does_not_send_window_update() { let payload = [0; 16]; - let mock = mock_io::Builder::new() - .handshake() - .write(&[ - // GET / - 0, 0, 16, 1, 5, 0, 0, 0, 1, 130, 135, 65, 139, 157, 41, - 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, - ]) - .write(frames::SETTINGS_ACK) - // Read response - .read(&[0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88]) - .read(&[ - // DATA - 0, 0, 16, 0, 1, 0, 0, 0, 1, - ]) - .read(&payload[..]) - // write() or WINDOW_UPDATE purposefully excluded + let (io, srv) = mock::new(); - // we send a 2nd stream in order to test the window update is - // is actually written to the socket - .write(&[ - // GET / - 0, 0, 4, 1, 5, 0, 0, 0, 3, 130, 135, 190, 132, - ]) - .read(&[0, 0, 1, 1, 5, 0, 0, 0, 3, 0x88]) - .build(); + let mock = srv.assert_client_handshake().unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos() + ) + .send_frame( + frames::headers(1) + .response(200) + ) + .send_frame(frames::data(1, &payload[..]).eos()) + // gotta end the connection + .map(drop); - let mut h2 = Client::handshake(mock) - .wait().unwrap(); + let h2 = Client::handshake(io).unwrap() + .and_then(|mut h2| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); - let request = Request::builder() - .method(Method::GET) - .uri("https://http2.akamai.com/") - .body(()).unwrap(); - - let mut stream = h2.request(request, true).unwrap(); - - // Get the response - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); - assert_eq!(resp.status(), StatusCode::OK); - - let mut body = resp.into_parts().1; - let buf = h2.run(poll_fn(|| body.poll())).unwrap().unwrap(); - - // release some capacity to send a window_update - body.release_capacity(buf.len()).unwrap(); - - // send a 2nd stream to force flushing of window updates - let request = Request::builder() - .method(Method::GET) - .uri("https://http2.akamai.com/") - .body(()).unwrap(); - let mut stream = h2.request(request, true).unwrap(); - let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); - assert_eq!(resp.status(), StatusCode::OK); - - - h2.wait().unwrap(); + let req = h2.request(request, true).unwrap() + .unwrap() + // Get the response + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_parts().1; + body.into_future().unwrap() + }) + // read the small body and then release it + .and_then(|(buf, mut body)| { + let buf = buf.unwrap(); + assert_eq!(buf.len(), 16); + body.release_capacity(buf.len()).unwrap(); + body.into_future().unwrap() + }) + .and_then(|(buf, _)| { + assert!(buf.is_none()); + Ok(()) + }); + h2.unwrap().join(req) + }); + h2.join(mock).wait().unwrap(); } #[test] diff --git a/tests/support/src/frames.rs b/tests/support/src/frames.rs new file mode 100644 index 0000000..3348ee4 --- /dev/null +++ b/tests/support/src/frames.rs @@ -0,0 +1,159 @@ +use std::fmt; + +use bytes::{Bytes, IntoBuf}; +use http::{self, HeaderMap, HttpTryFrom}; + +use h2::frame::{self, Frame, StreamId}; +use super::SendFrame; + +pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; +pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; + +// ==== helper functions to easily construct h2 Frames ==== + +pub fn headers(id: T) -> MockHeaders + where T: Into, +{ + MockHeaders(frame::Headers::new( + id.into(), + frame::Pseudo::default(), + HeaderMap::default(), + )) +} + +pub fn data(id: T, buf: B) -> MockData + where T: Into, + B: Into, +{ + MockData(frame::Data::new(id.into(), buf.into())) +} + +pub fn window_update(id: T, sz: u32) -> frame::WindowUpdate + where T: Into, +{ + frame::WindowUpdate::new(id.into(), sz) +} + +// Headers helpers + +pub struct MockHeaders(frame::Headers); + +impl MockHeaders { + pub fn request(self, method: M, uri: U) -> Self + where M: HttpTryInto, + U: HttpTryInto, + { + let method = method.try_into().unwrap(); + let uri = uri.try_into().unwrap(); + let (id, _, fields) = self.into_parts(); + let frame = frame::Headers::new( + id, + frame::Pseudo::request(method, uri), + fields + ); + MockHeaders(frame) + } + + pub fn response(self, status: S) -> Self + where S: HttpTryInto, + { + let status = status.try_into().unwrap(); + let (id, _, fields) = self.into_parts(); + let frame = frame::Headers::new( + id, + frame::Pseudo::response(status), + fields + ); + MockHeaders(frame) + } + + pub fn fields(self, fields: HeaderMap) -> Self { + let (id, pseudo, _) = self.into_parts(); + let frame = frame::Headers::new(id, pseudo, fields); + MockHeaders(frame) + } + + pub fn eos(mut self) -> Self { + self.0.set_end_stream(); + self + } + + fn into_parts(self) -> (StreamId, frame::Pseudo, HeaderMap) { + assert!(!self.0.is_end_stream(), "eos flag will be lost"); + assert!(self.0.is_end_headers(), "unset eoh will be lost"); + let id = self.0.stream_id(); + let parts = self.0.into_parts(); + (id, parts.0, parts.1) + } +} + +impl fmt::Debug for MockHeaders { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} + +impl From for Frame { + fn from(src: MockHeaders) -> Self { + Frame::Headers(src.0) + } +} + +impl From for SendFrame { + fn from(src: MockHeaders) -> Self { + Frame::Headers(src.0) + } +} + +// Data helpers + +pub struct MockData(frame::Data); + +impl MockData { + pub fn eos(mut self) -> Self { + self.0.set_end_stream(true); + self + } +} + +impl fmt::Debug for MockData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&self.0, f) + } +} + +impl From for Frame { + fn from(src: MockData) -> Self { + Frame::Data(src.0) + } +} + +impl From for SendFrame { + fn from(src: MockData) -> Self { + let id = src.0.stream_id(); + let eos = src.0.is_end_stream(); + let payload = src.0.into_payload(); + let mut frame = frame::Data::new(id, payload.into_buf()); + frame.set_end_stream(eos); + Frame::Data(frame) + } +} + +// ==== "trait alias" for types that are HttpTryFrom and have Debug Errors ==== + +pub trait HttpTryInto { + type Error: fmt::Debug; + + fn try_into(self) -> Result; +} + +impl HttpTryInto for U + where T: HttpTryFrom, + T::Error: fmt::Debug, +{ + type Error = T::Error; + + fn try_into(self) -> Result { + T::try_from(self) + } +} diff --git a/tests/support/src/lib.rs b/tests/support/src/lib.rs index 9fe0814..d060fbd 100644 --- a/tests/support/src/lib.rs +++ b/tests/support/src/lib.rs @@ -16,6 +16,7 @@ mod assert; #[macro_use] pub mod raw; +pub mod frames; pub mod prelude; pub mod mock; diff --git a/tests/support/src/mock.rs b/tests/support/src/mock.rs index cc53f76..b4e4eb5 100644 --- a/tests/support/src/mock.rs +++ b/tests/support/src/mock.rs @@ -3,13 +3,13 @@ use {FutureExt, SendFrame}; use h2::{self, SendError, RecvError}; use h2::frame::{self, Frame}; -use futures::{Future, Stream, Poll}; +use futures::{Async, Future, Stream, Poll}; use futures::task::{self, Task}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::read_exact; -use std::{cmp, io}; +use std::{cmp, fmt, io}; use std::io::ErrorKind::WouldBlock; use std::sync::{Arc, Mutex}; @@ -303,3 +303,118 @@ impl AsyncWrite for Pipe { Ok(().into()) } } + +pub trait HandleFutureExt { + fn recv_settings(self) -> RecvFrame, Handle), Error=()>>> + where Self: Sized + 'static, + Self: Future, + Self::Error: fmt::Debug, + { + let map = self.map(|(settings, handle)| (Some(settings.into()), handle)).unwrap(); + + let boxed: Box, Handle), Error=()>> = Box::new(map); + RecvFrame { + inner: boxed, + frame: frame::Settings::default().into(), + } + } + + fn recv_frame(self, frame: T) -> RecvFrame<::Future> + where Self: IntoRecvFrame + Sized, + T: Into, + { + self.into_recv_frame(frame.into()) + } + + fn send_frame(self, frame: T) -> SendFrameFut + where Self: Sized, + T: Into, + { + SendFrameFut { + inner: self, + frame: Some(frame.into()), + } + } +} + +pub struct RecvFrame { + inner: T, + frame: Frame, +} + +impl Future for RecvFrame + where T: Future, Handle)>, + T::Error: fmt::Debug, +{ + type Item = Handle; + type Error = (); + + fn poll(&mut self) -> Poll { + let (frame, handle) = match self.inner.poll().unwrap() { + Async::Ready((frame, handle)) => (frame, handle), + Async::NotReady => return Ok(Async::NotReady), + }; + assert_eq!(frame.unwrap(), self.frame); + Ok(Async::Ready(handle)) + } +} + +pub struct SendFrameFut { + inner: T, + frame: Option, +} + +impl Future for SendFrameFut + where T: Future, + T::Error: fmt::Debug, +{ + type Item = Handle; + type Error = (); + + fn poll(&mut self) -> Poll { + let mut handle = match self.inner.poll().unwrap() { + Async::Ready(handle) => handle, + Async::NotReady => return Ok(Async::NotReady), + }; + handle.send(self.frame.take().unwrap()).unwrap(); + Ok(Async::Ready(handle)) + } +} + +impl HandleFutureExt for T + where T: Future + 'static, +{ +} + +pub trait IntoRecvFrame { + type Future: Future; + fn into_recv_frame(self, frame: Frame) -> RecvFrame; +} + +impl IntoRecvFrame for Handle { + type Future = ::futures::stream::StreamFuture; + + fn into_recv_frame(self, frame: Frame) -> RecvFrame { + RecvFrame { + inner: self.into_future(), + frame: frame, + } + } +} + +impl IntoRecvFrame for T + where T: Future + 'static, + T::Error: fmt::Debug, +{ + type Future = Box, Handle), Error=()>>; + + fn into_recv_frame(self, frame: Frame) -> RecvFrame { + let into_fut = Box::new(self.unwrap() + .and_then(|handle| handle.into_future().unwrap()) + ); + RecvFrame { + inner: into_fut, + frame: frame, + } + } +} diff --git a/tests/support/src/prelude.rs b/tests/support/src/prelude.rs index d4c662f..f8d7c87 100644 --- a/tests/support/src/prelude.rs +++ b/tests/support/src/prelude.rs @@ -8,7 +8,10 @@ pub use self::h2::client::{self, Client}; pub use self::h2::server::{self, Server}; // Re-export mock -pub use super::mock; +pub use super::mock::{self, HandleFutureExt}; + +// Re-export frames helpers +pub use super::frames; // Re-export some type defines pub use super::{Codec, SendFrame}; @@ -100,9 +103,3 @@ impl ClientExt for Client } } -pub mod frames { - //! Some useful frames - - pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; - pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; -}