Update h2-tests to std-future

This commit is contained in:
Gurwinder Singh
2019-08-15 08:26:12 +05:30
committed by Sean McArthur
parent 529ef4cd40
commit 97a4c8049c
12 changed files with 3610 additions and 3779 deletions

View File

@@ -10,4 +10,5 @@ edition = "2018"
[dev-dependencies]
h2-support = { path = "../h2-support" }
log = "0.4.1"
tokio = "0.1.8"
futures-preview = "0.3.0-alpha.18"
tokio = { git = "https://github.com/tokio-rs/tokio" }

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,11 @@
use h2_support::prelude::*;
#![feature(async_await)]
use futures::future::join;
use h2_support::prelude::*;
use std::error::Error;
#[test]
fn read_none() {
#[tokio::test]
async fn read_none() {
let mut codec = Codec::from(mock_io::Builder::new().build());
assert_closed!(codec);
@@ -15,8 +17,8 @@ fn read_frame_too_big() {}
// ===== DATA =====
#[test]
fn read_data_no_padding() {
#[tokio::test]
async fn read_data_no_padding() {
let mut codec = raw_codec! {
read => [
0, 0, 5, 0, 0, 0, 0, 0, 1,
@@ -32,8 +34,8 @@ fn read_data_no_padding() {
assert_closed!(codec);
}
#[test]
fn read_data_empty_payload() {
#[tokio::test]
async fn read_data_empty_payload() {
let mut codec = raw_codec! {
read => [
0, 0, 0, 0, 0, 0, 0, 0, 1,
@@ -48,8 +50,8 @@ fn read_data_empty_payload() {
assert_closed!(codec);
}
#[test]
fn read_data_end_stream() {
#[tokio::test]
async fn read_data_end_stream() {
let mut codec = raw_codec! {
read => [
0, 0, 5, 0, 1, 0, 0, 0, 1,
@@ -61,12 +63,11 @@ fn read_data_end_stream() {
assert_eq!(data.stream_id(), 1);
assert_eq!(data.payload(), &b"hello"[..]);
assert!(data.is_end_stream());
assert_closed!(codec);
}
#[test]
fn read_data_padding() {
#[tokio::test]
async fn read_data_padding() {
let mut codec = raw_codec! {
read => [
0, 0, 16, 0, 0x8, 0, 0, 0, 1,
@@ -84,8 +85,8 @@ fn read_data_padding() {
assert_closed!(codec);
}
#[test]
fn read_push_promise() {
#[tokio::test]
async fn read_push_promise() {
let mut codec = raw_codec! {
read => [
0, 0, 0x5,
@@ -104,8 +105,8 @@ fn read_push_promise() {
assert_closed!(codec);
}
#[test]
fn read_data_stream_id_zero() {
#[tokio::test]
async fn read_data_stream_id_zero() {
let mut codec = raw_codec! {
read => [
0, 0, 5, 0, 0, 0, 0, 0, 0,
@@ -130,64 +131,70 @@ fn read_headers_with_pseudo() {}
#[ignore]
fn read_headers_empty_payload() {}
#[test]
fn read_continuation_frames() {
#[tokio::test]
async fn read_continuation_frames() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let (io, mut srv) = mock::new();
let large = build_large_headers();
let frame = large.iter().fold(
frames::headers(1).response(200),
|frame, &(name, ref value)| frame.field(name, &value[..]),
).eos();
let frame = large
.iter()
.fold(
frames::headers(1).response(200),
|frame, &(name, ref value)| frame.field(name, &value[..]),
)
.eos();
let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frame)
.close();
.await;
srv.send_frame(frame).await;
};
let client = client::handshake(io)
.expect("handshake")
.and_then(|(mut client, conn)| {
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let client = async move {
let (mut client, mut conn) = client::handshake(io).await.expect("handshake");
let req = client
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = async {
let res = client
.send_request(request, true)
.expect("send_request")
.0
.expect("response")
.map(move |res| {
assert_eq!(res.status(), StatusCode::OK);
let (head, _body) = res.into_parts();
let expected = large.iter().fold(HeaderMap::new(), |mut map, &(name, ref value)| {
use h2_support::frames::HttpTryInto;
map.append(name, value.as_str().try_into().unwrap());
map
});
assert_eq!(head.headers, expected);
.await
.expect("response");
assert_eq!(res.status(), StatusCode::OK);
let (head, _body) = res.into_parts();
let expected = large
.iter()
.fold(HeaderMap::new(), |mut map, &(name, ref value)| {
use h2_support::frames::HttpTryInto;
map.append(name, value.as_str().try_into().unwrap());
map
});
assert_eq!(head.headers, expected);
};
conn.drive(req)
.and_then(move |(h2, _)| {
h2.expect("client")
}).map(|c| (client, c))
});
client.join(srv).wait().expect("wait");
conn.drive(req).await;
conn.await.expect("client");
};
join(srv, client).await;
}
#[test]
fn update_max_frame_len_at_rest() {
#[tokio::test]
async fn update_max_frame_len_at_rest() {
use futures::StreamExt;
let _ = env_logger::try_init();
// TODO: add test for updating max frame length in flight as well?
let mut codec = raw_codec! {
@@ -205,7 +212,7 @@ fn update_max_frame_len_at_rest() {
assert_eq!(codec.max_recv_frame_size(), 16_384);
assert_eq!(
codec.poll().unwrap_err().description(),
codec.next().await.unwrap().unwrap_err().description(),
"frame with invalid size"
);
}

View File

@@ -1,61 +1,55 @@
#![feature(async_await)]
use futures::future::join;
use h2_support::prelude::*;
#[test]
fn write_continuation_frames() {
#[tokio::test]
async fn write_continuation_frames() {
// An invalid dependency ID results in a stream level error. The hpack
// payload should still be decoded.
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let (io, mut srv) = mock::new();
let large = build_large_headers();
// Build the large request frame
let frame = large.iter().fold(
frames::headers(1).request("GET", "https://http2.akamai.com/"),
|frame, &(name, ref value)| frame.field(name, &value[..]));
|frame, &(name, ref value)| frame.field(name, &value[..]),
);
let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(frame.eos())
.send_frame(
frames::headers(1)
.response(204)
.eos(),
)
.close();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(frame.eos()).await;
srv.send_frame(frames::headers(1).response(204).eos()).await;
};
let client = client::handshake(io)
.expect("handshake")
.and_then(|(mut client, conn)| {
let mut request = Request::builder();
request.uri("https://http2.akamai.com/");
let client = async move {
let (mut client, mut conn) = client::handshake(io).await.expect("handshake");
for &(name, ref value) in &large {
request.header(name, &value[..]);
}
let mut request = Request::builder();
request.uri("https://http2.akamai.com/");
let request = request
.body(())
.unwrap();
for &(name, ref value) in &large {
request.header(name, &value[..]);
}
let req = client
let request = request.body(()).unwrap();
let req = async {
let res = client
.send_request(request, true)
.expect("send_request1")
.0
.then(|res| {
let response = res.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
Ok::<_, ()>(())
});
.await;
let response = res.unwrap();
assert_eq!(response.status(), StatusCode::NO_CONTENT);
};
conn.drive(req)
.and_then(move |(h2, _)| {
h2.unwrap()
}).map(|c| {
(c, client)
})
});
conn.drive(req).await;
conn.await.unwrap();
};
client.join(srv).wait().expect("wait");
join(srv, client).await;
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,13 +1,26 @@
use h2_support::prelude::*;
use futures::{Async, Poll};
#![feature(async_await)]
use futures::{ready, FutureExt, StreamExt, TryFutureExt};
use h2_support::prelude::*;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::io;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread,
};
use tokio::net::{TcpListener, TcpStream};
use std::{net::SocketAddr, thread, sync::{atomic::{AtomicUsize, Ordering}, Arc}};
struct Server {
addr: SocketAddr,
reqs: Arc<AtomicUsize>,
join: Option<thread::JoinHandle<()>>,
_join: Option<thread::JoinHandle<()>>,
}
impl Server {
@@ -23,32 +36,27 @@ impl Server {
let reqs = Arc::new(AtomicUsize::new(0));
let reqs2 = reqs.clone();
let join = thread::spawn(move || {
let server = listener.incoming().for_each(move |socket| {
let reqs = reqs2.clone();
let mk_data = mk_data.clone();
let connection = server::handshake(socket)
.and_then(move |conn| {
conn.for_each(move |(_, mut respond)| {
reqs.fetch_add(1, Ordering::Release);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
let mut send = respond.send_response(response, false)?;
send.send_data(mk_data(), true).map(|_|())
})
})
.map_err(|e| eprintln!("serve conn error: {:?}", e));
let server = async move {
let mut incoming = listener.incoming();
while let Some(socket) = incoming.next().await {
let reqs = reqs2.clone();
let mk_data = mk_data.clone();
tokio::spawn(async move {
if let Err(e) = handle_request(socket, reqs, mk_data).await {
eprintln!("serve conn error: {:?}", e)
}
});
}
};
tokio::spawn(Box::new(connection));
Ok(())
})
.map_err(|e| eprintln!("serve error: {:?}", e));
tokio::run(server);
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(server);
});
Self {
addr,
join: Some(join),
reqs
_join: Some(join),
reqs,
}
}
@@ -61,6 +69,25 @@ impl Server {
}
}
async fn handle_request<F>(
socket: io::Result<TcpStream>,
reqs: Arc<AtomicUsize>,
mk_data: Arc<F>,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Fn() -> Bytes,
F: Send + Sync + 'static,
{
let mut conn = server::handshake(socket?).await?;
while let Some(result) = conn.next().await {
let (_, mut respond) = result?;
reqs.fetch_add(1, Ordering::Release);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
let mut send = respond.send_response(response, false)?;
send.send_data(mk_data(), true)?;
}
Ok(())
}
struct Process {
body: RecvStream,
@@ -68,30 +95,25 @@ struct Process {
}
impl Future for Process {
type Item = ();
type Error = h2::Error;
type Output = Result<(), h2::Error>;
fn poll(&mut self) -> Poll<(), h2::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if self.trailers {
return match self.body.poll_trailers()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(_) => Ok(().into()),
};
ready!(self.body.poll_trailers(cx));
return Poll::Ready(Ok(()));
} else {
match self.body.poll()? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => {
match ready!(Pin::new(&mut self.body).poll_next(cx)) {
None => {
self.trailers = true;
},
_ => {},
}
_ => {}
}
}
}
}
}
#[test]
fn hammer_client_concurrency() {
// This reproduces issue #326.
@@ -106,10 +128,12 @@ fn hammer_client_concurrency() {
print!("sending {}", i);
let rsps = rsps.clone();
let tcp = TcpStream::connect(&addr);
let tcp = tcp.then(|res| {
let tcp = res.unwrap();
client::handshake(tcp)
}).then(move |res| {
let tcp = tcp
.then(|res| {
let tcp = res.unwrap();
client::handshake(tcp)
})
.then(move |res| {
let rsps = rsps;
let (mut client, h2) = res.unwrap();
let request = Request::builder()
@@ -120,7 +144,9 @@ fn hammer_client_concurrency() {
let (response, mut stream) = client.send_request(request, false).unwrap();
stream.send_trailers(HeaderMap::new()).unwrap();
tokio::spawn(h2.map_err(|e| panic!("client conn error: {:?}", e)));
tokio::spawn(async move {
h2.await.unwrap();
});
response
.and_then(|response| {
@@ -139,7 +165,8 @@ fn hammer_client_concurrency() {
})
});
tokio::run(tcp);
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(tcp);
println!("...done");
}

View File

@@ -1,205 +1,188 @@
use h2_support::prelude::*;
use h2_support::assert_ping;
#![feature(async_await)]
#[test]
fn recv_single_ping() {
use futures::channel::oneshot;
use futures::future::join;
use futures::{StreamExt, TryStreamExt};
use h2_support::assert_ping;
use h2_support::prelude::*;
#[tokio::test]
async fn recv_single_ping() {
let _ = env_logger::try_init();
let (m, mock) = mock::new();
let (m, mut mock) = mock::new();
// Create the handshake
let h2 = client::handshake(m)
.unwrap()
.and_then(|(client, conn)| {
conn.unwrap()
.map(|c| (client, c))
});
let h2 = async move {
let (client, conn) = client::handshake(m).await.unwrap();
let c = conn.await.unwrap();
(client, c)
};
let mock = mock.assert_client_handshake()
.unwrap()
.and_then(|(_, mut mock)| {
let frame = frame::Ping::new(Default::default());
mock.send(frame.into()).unwrap();
let mock = async move {
let _ = mock.assert_client_handshake().await;
let frame = frame::Ping::new(Default::default());
mock.send(frame.into()).await.unwrap();
let frame = mock.next().await.unwrap();
mock.into_future().unwrap()
})
.and_then(|(frame, _)| {
let pong = assert_ping!(frame.unwrap());
let pong = assert_ping!(frame.unwrap());
// Payload is correct
assert_eq!(*pong.payload(), <[u8; 8]>::default());
// Payload is correct
assert_eq!(*pong.payload(), <[u8; 8]>::default());
// Is ACK
assert!(pong.is_ack());
// Is ACK
assert!(pong.is_ack());
};
Ok(())
});
let _ = h2.join(mock).wait().unwrap();
join(mock, h2).await;
}
#[test]
fn recv_multiple_pings() {
#[tokio::test]
async fn recv_multiple_pings() {
let _ = env_logger::try_init();
let (io, client) = mock::new();
let (io, mut client) = mock::new();
let client = client.assert_server_handshake()
.expect("client handshake")
.recv_settings()
.send_frame(frames::ping([1; 8]))
.send_frame(frames::ping([2; 8]))
.recv_frame(frames::ping([1; 8]).pong())
.recv_frame(frames::ping([2; 8]).pong())
.close();
let client = async move {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
client.send_frame(frames::ping([1; 8])).await;
client.send_frame(frames::ping([2; 8])).await;
client.recv_frame(frames::ping([1; 8]).pong()).await;
client.recv_frame(frames::ping([2; 8]).pong()).await;
};
let srv = server::handshake(io)
.expect("handshake")
.and_then(|srv| {
// future of first request, which never comes
srv.into_future().unwrap()
});
let srv = async move {
let mut s = server::handshake(io).await.expect("handshake");
assert!(s.next().await.is_none());
};
srv.join(client).wait().expect("wait");
join(client, srv).await;
}
#[test]
fn pong_has_highest_priority() {
#[tokio::test]
async fn pong_has_highest_priority() {
let _ = env_logger::try_init();
let (io, client) = mock::new();
let (io, mut client) = mock::new();
let data = Bytes::from(vec![0; 16_384]);
let data_clone = data.clone();
let client = client.assert_server_handshake()
.expect("client handshake")
.recv_settings()
.send_frame(
frames::headers(1)
.request("POST", "https://http2.akamai.com/")
)
.send_frame(frames::data(1, data.clone()).eos())
.send_frame(frames::ping([1; 8]))
.recv_frame(frames::ping([1; 8]).pong())
.recv_frame(frames::headers(1).response(200).eos())
.close();
let client = async move {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
client
.send_frame(frames::headers(1).request("POST", "https://http2.akamai.com/"))
.await;
client.send_frame(frames::data(1, data_clone).eos()).await;
client.send_frame(frames::ping([1; 8])).await;
client.recv_frame(frames::ping([1; 8]).pong()).await;
client
.recv_frame(frames::headers(1).response(200).eos())
.await;
};
let srv = server::handshake(io)
.expect("handshake")
.and_then(|srv| {
// future of first request
srv.into_future().unwrap()
}).and_then(move |(reqstream, srv)| {
let (req, mut stream) = reqstream.expect("request");
assert_eq!(req.method(), "POST");
let body = req.into_parts().1;
let srv = async move {
let mut s = server::handshake(io).await.expect("handshake");
let (req, mut stream) = s.next().await.unwrap().unwrap();
assert_eq!(req.method(), "POST");
let body = req.into_parts().1;
body.concat2()
.expect("body")
.and_then(move |body| {
assert_eq!(body.len(), data.len());
let res = Response::builder()
.status(200)
.body(())
.unwrap();
stream.send_response(res, true).expect("response");
srv.into_future().unwrap()
})
});
let body = body.try_concat().await.expect("body");
assert_eq!(body.len(), data.len());
let res = Response::builder().status(200).body(()).unwrap();
stream.send_response(res, true).expect("response");
assert!(s.next().await.is_none());
};
srv.join(client).wait().expect("wait");
join(client, srv).await;
}
#[test]
fn user_ping_pong() {
#[tokio::test]
async fn user_ping_pong() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let (io, mut srv) = mock::new();
let srv = srv.assert_client_handshake()
.expect("srv handshake")
.recv_settings()
.recv_frame(frames::ping(frame::Ping::USER))
.send_frame(frames::ping(frame::Ping::USER).pong())
.recv_frame(frames::go_away(0))
.recv_eof();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(frames::ping(frame::Ping::USER)).await;
srv.send_frame(frames::ping(frame::Ping::USER).pong()).await;
srv.recv_frame(frames::go_away(0)).await;
srv.recv_eof().await;
};
let client = client::handshake(io)
.expect("client handshake")
.and_then(|(client, conn)| {
// yield once so we can ack server settings
conn
.drive(util::yield_once())
.map(move |(conn, ())| (client, conn))
})
.and_then(|(client, mut conn)| {
// `ping_pong()` method conflict with mock future ext trait.
let mut ping_pong = client::Connection::ping_pong(&mut conn)
.expect("taking ping_pong");
let client = async move {
let (client, mut conn) = client::handshake(io).await.expect("client handshake");
// yield once so we can ack server settings
conn.drive(util::yield_once()).await;
// `ping_pong()` method conflict with mock future ext trait.
let mut ping_pong = client::Connection::ping_pong(&mut conn).expect("taking ping_pong");
ping_pong.send_ping(Ping::opaque()).expect("send ping");
// multiple pings results in a user error...
assert_eq!(
ping_pong
.send_ping(Ping::opaque())
.expect("send ping");
.expect_err("ping 2")
.to_string(),
"user error: send_ping before received previous pong",
"send_ping while ping pending is a user error",
);
// multiple pings results in a user error...
assert_eq!(
ping_pong.send_ping(Ping::opaque()).expect_err("ping 2").to_string(),
"user error: send_ping before received previous pong",
"send_ping while ping pending is a user error",
);
conn.drive(futures::future::poll_fn(move |cx| ping_pong.poll_pong(cx)))
.await
.unwrap();
drop(client);
conn.await.expect("client");
};
conn
.drive(futures::future::poll_fn(move || {
ping_pong.poll_pong()
}))
.and_then(move |(conn, _pong)| {
drop(client);
conn.expect("client")
})
});
client.join(srv).wait().expect("wait");
join(srv, client).await;
}
#[test]
fn user_notifies_when_connection_closes() {
#[tokio::test]
async fn user_notifies_when_connection_closes() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let (io, mut srv) = mock::new();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv
};
let srv = srv.assert_client_handshake()
.expect("srv handshake")
.recv_settings();
let client = async move {
let (_client, mut conn) = client::handshake(io).await.expect("client handshake");
// yield once so we can ack server settings
conn.drive(util::yield_once()).await;
conn
};
let client = client::handshake(io)
.expect("client handshake")
.and_then(|(client, conn)| {
// yield once so we can ack server settings
conn
.drive(util::yield_once())
.map(move |(conn, ())| (client, conn))
})
.map(|(_client, conn)| conn);
let (mut client, srv) = client.join(srv).wait().expect("wait");
let (srv, mut client) = join(srv, client).await;
// `ping_pong()` method conflict with mock future ext trait.
let mut ping_pong = client::Connection::ping_pong(&mut client)
.expect("taking ping_pong");
let mut ping_pong = client::Connection::ping_pong(&mut client).expect("taking ping_pong");
// Spawn a thread so we can park a task waiting on `poll_pong`, and then
// drop the client and be sure the parked task is notified...
let t = thread::spawn(move || {
poll_fn(|| { ping_pong.poll_pong() })
.wait()
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
poll_fn(|cx| ping_pong.poll_pong(cx))
.await
.expect_err("poll_pong should error");
ping_pong
tx.send(ping_pong).unwrap();
});
// Sleep to let the ping thread park its task...
thread::sleep(Duration::from_millis(50));
idle_ms(50).await;
drop(client);
drop(srv);
let mut ping_pong = t.join().expect("ping pong thread join");
let mut ping_pong = rx.await.expect("ping pong spawn join");
// Now that the connection is closed, also test `send_ping` errors...
assert_eq!(
ping_pong.send_ping(Ping::opaque()).expect_err("send_ping").to_string(),
ping_pong
.send_ping(Ping::opaque())
.expect_err("send_ping")
.to_string(),
"broken pipe",
);
}

View File

@@ -1,8 +1,13 @@
use h2_support::{DEFAULT_WINDOW_SIZE};
use h2_support::prelude::*;
#![feature(async_await)]
#[test]
fn single_stream_send_large_body() {
use futures::future::join;
use futures::{FutureExt, StreamExt};
use h2_support::prelude::*;
use h2_support::DEFAULT_WINDOW_SIZE;
use std::task::Context;
#[tokio::test]
async fn single_stream_send_large_body() {
let _ = env_logger::try_init();
let payload = [0; 1024];
@@ -12,8 +17,8 @@ fn single_stream_send_large_body() {
.write(frames::SETTINGS_ACK)
.write(&[
// POST /
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41,
172, 75, 143, 168, 233, 25, 151, 33, 233, 132,
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
33, 233, 132,
])
.write(&[
// DATA
@@ -24,16 +29,15 @@ fn single_stream_send_large_body() {
.read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89])
.build();
let notify = MockNotify::new();
let (mut client, mut h2) = client::handshake(mock).wait().unwrap();
let (mut client, mut h2) = client::handshake(mock).await.unwrap();
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
// Poll h2 once to get notifications
loop {
// Run the connection until all work is done, this handles processing
// the handshake.
notify.with(|| h2.poll()).unwrap();
if !notify.is_notified() {
if !h2.poll_unpin(&mut cx).is_ready() {
break;
}
}
@@ -55,80 +59,78 @@ fn single_stream_send_large_body() {
// Send the data
stream.send_data(payload[..].into(), true).unwrap();
assert!(notify.is_notified());
// Get the response
let resp = h2.run(response).unwrap();
let resp = h2.run(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
h2.wait().unwrap();
h2.await.unwrap();
}
#[test]
fn multiple_streams_with_payload_greater_than_default_window() {
#[tokio::test]
async fn multiple_streams_with_payload_greater_than_default_window() {
let _ = env_logger::try_init();
let payload = vec![0; 16384*5-1];
let payload = vec![0; 16384 * 5 - 1];
let payload_clone = payload.clone();
let (io, srv) = mock::new();
let (io, mut srv) = mock::new();
let srv = srv.assert_client_handshake().unwrap()
.recv_settings()
.recv_frame(
frames::headers(1).request("POST", "https://http2.akamai.com/")
)
.recv_frame(
frames::headers(3).request("POST", "https://http2.akamai.com/")
)
.recv_frame(
frames::headers(5).request("POST", "https://http2.akamai.com/")
)
.recv_frame(frames::data(1, &payload[0..16_384]))
.recv_frame(frames::data(1, &payload[16_384..(16_384*2)]))
.recv_frame(frames::data(1, &payload[(16_384*2)..(16_384*3)]))
.recv_frame(frames::data(1, &payload[(16_384*3)..(16_384*4-1)]))
.send_frame(frames::settings())
.recv_frame(frames::settings_ack())
.send_frame(frames::headers(1).response(200).eos())
.send_frame(frames::headers(3).response(200).eos())
.send_frame(frames::headers(5).response(200).eos())
.close();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(frames::headers(1).request("POST", "https://http2.akamai.com/"))
.await;
srv.recv_frame(frames::headers(3).request("POST", "https://http2.akamai.com/"))
.await;
srv.recv_frame(frames::headers(5).request("POST", "https://http2.akamai.com/"))
.await;
srv.recv_frame(frames::data(1, &payload[0..16_384])).await;
srv.recv_frame(frames::data(1, &payload[16_384..(16_384 * 2)]))
.await;
srv.recv_frame(frames::data(1, &payload[(16_384 * 2)..(16_384 * 3)]))
.await;
srv.recv_frame(frames::data(1, &payload[(16_384 * 3)..(16_384 * 4 - 1)]))
.await;
srv.send_frame(frames::settings()).await;
srv.recv_frame(frames::settings_ack()).await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
srv.send_frame(frames::headers(3).response(200).eos()).await;
srv.send_frame(frames::headers(5).response(200).eos()).await;
};
let client = client::handshake(io).unwrap()
.and_then(|(mut client, conn)| {
let request1 = Request::post("https://http2.akamai.com/").body(()).unwrap();
let request2 = Request::post("https://http2.akamai.com/").body(()).unwrap();
let request3 = Request::post("https://http2.akamai.com/").body(()).unwrap();
let (response1, mut stream1) = client.send_request(request1, false).unwrap();
let (_response2, mut stream2) = client.send_request(request2, false).unwrap();
let (_response3, mut stream3) = client.send_request(request3, false).unwrap();
let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
let request1 = Request::post("https://http2.akamai.com/").body(()).unwrap();
let request2 = Request::post("https://http2.akamai.com/").body(()).unwrap();
let request3 = Request::post("https://http2.akamai.com/").body(()).unwrap();
let (response1, mut stream1) = client.send_request(request1, false).unwrap();
let (_response2, mut stream2) = client.send_request(request2, false).unwrap();
let (_response3, mut stream3) = client.send_request(request3, false).unwrap();
// The capacity should be immediately
// allocated to default window size (smaller than payload)
stream1.reserve_capacity(payload.len());
assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE);
// The capacity should be immediately
// allocated to default window size (smaller than payload)
stream1.reserve_capacity(payload_clone.len());
assert_eq!(stream1.capacity(), DEFAULT_WINDOW_SIZE);
stream2.reserve_capacity(payload.len());
assert_eq!(stream2.capacity(), 0);
stream2.reserve_capacity(payload_clone.len());
assert_eq!(stream2.capacity(), 0);
stream3.reserve_capacity(payload.len());
assert_eq!(stream3.capacity(), 0);
stream3.reserve_capacity(payload_clone.len());
assert_eq!(stream3.capacity(), 0);
stream1.send_data(payload[..].into(), true).unwrap();
stream1.send_data(payload_clone[..].into(), true).unwrap();
// hold onto streams so they don't close
// stream1 doesn't close because response1 is used
conn.drive(response1.expect("response")).map(|c| (c, client, stream2, stream3))
})
.and_then(|((conn, _res), client, stream2, stream3)| {
conn.expect("client").map(|c| (c, client, stream2, stream3))
});
// hold onto streams so they don't close
// stream1 doesn't close because response1 is used
let _res = conn.drive(response1).await.expect("response");
conn.await.expect("client");
};
srv.join(client).wait().unwrap();
join(srv, client).await;
}
#[test]
fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
#[tokio::test]
async fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
let _ = env_logger::try_init();
let payload = vec![0; 32_768];
@@ -138,8 +140,8 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
.write(frames::SETTINGS_ACK)
.write(&[
// POST /
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41,
172, 75, 143, 168, 233, 25, 151, 33, 233, 132,
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
33, 233, 132,
])
.write(&[
// DATA
@@ -155,16 +157,15 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
.read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89])
.build();
let notify = MockNotify::new();
let (mut client, mut h2) = client::handshake(mock).wait().unwrap();
let (mut client, mut h2) = client::handshake(mock).await.unwrap();
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
// Poll h2 once to get notifications
loop {
// Run the connection until all work is done, this handles processing
// the handshake.
notify.with(|| h2.poll()).unwrap();
if !notify.is_notified() {
if !h2.poll_unpin(&mut cx).is_ready() {
break;
}
}
@@ -185,28 +186,26 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() {
// Send the data
stream.send_data(payload.into(), true).unwrap();
assert!(notify.is_notified());
// Get the response
let resp = h2.run(response).unwrap();
let resp = h2.run(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
h2.wait().unwrap();
h2.await.unwrap();
}
#[test]
fn single_stream_send_body_greater_than_default_window() {
#[tokio::test]
async fn single_stream_send_body_greater_than_default_window() {
let _ = env_logger::try_init();
let payload = vec![0; 16384*5-1];
let payload = vec![0; 16384 * 5 - 1];
let mock = mock_io::Builder::new()
.handshake()
.write(frames::SETTINGS_ACK)
.write(&[
// POST /
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41,
172, 75, 143, 168, 233, 25, 151, 33, 233, 132,
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
33, 233, 132,
])
.write(&[
// DATA
@@ -217,41 +216,38 @@ fn single_stream_send_body_greater_than_default_window() {
// DATA
0, 64, 0, 0, 0, 0, 0, 0, 1,
])
.write(&payload[16_384..(16_384*2)])
.write(&payload[16_384..(16_384 * 2)])
.write(&[
// DATA
0, 64, 0, 0, 0, 0, 0, 0, 1,
])
.write(&payload[(16_384*2)..(16_384*3)])
.write(&payload[(16_384 * 2)..(16_384 * 3)])
.write(&[
// DATA
0, 63, 255, 0, 0, 0, 0, 0, 1,
])
.write(&payload[(16_384*3)..(16_384*4-1)])
.write(&payload[(16_384 * 3)..(16_384 * 4 - 1)])
// Read window update
.read(&[0, 0, 4, 8, 0, 0, 0, 0, 0, 0, 0, 64, 0])
.read(&[0, 0, 4, 8, 0, 0, 0, 0, 1, 0, 0, 64, 0])
.write(&[
// DATA
0, 64, 0, 0, 1, 0, 0, 0, 1,
])
.write(&payload[(16_384*4-1)..(16_384*5-1)])
.write(&payload[(16_384 * 4 - 1)..(16_384 * 5 - 1)])
// Read response
.read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89])
.build();
let notify = MockNotify::new();
let (mut client, mut h2) = client::handshake(mock).wait().unwrap();
let (mut client, mut h2) = client::handshake(mock).await.unwrap();
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
// Poll h2 once to get notifications
loop {
// Run the connection until all work is done, this handles processing
// the handshake.
notify.with(|| h2.poll()).unwrap();
if !notify.is_notified() {
if !h2.poll_unpin(&mut cx).is_ready() {
break;
}
}
@@ -268,9 +264,7 @@ fn single_stream_send_body_greater_than_default_window() {
loop {
// Run the connection until all work is done, this handles processing
// the handshake.
notify.with(|| h2.poll()).unwrap();
if !notify.is_notified() {
if !h2.poll_unpin(&mut cx).is_ready() {
break;
}
}
@@ -278,17 +272,15 @@ fn single_stream_send_body_greater_than_default_window() {
// Send the data
stream.send_data(payload.into(), true).unwrap();
assert!(notify.is_notified());
// Get the response
let resp = h2.run(response).unwrap();
let resp = h2.run(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
h2.wait().unwrap();
h2.await.unwrap();
}
#[test]
fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
#[tokio::test]
async fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
let _ = env_logger::try_init();
let payload = vec![0; 32_768];
@@ -300,24 +292,20 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
.read(frames::SETTINGS)
// Add wait to force the data writes to chill
.wait(Duration::from_millis(10))
// Rest
.write(&[
// POST /
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41,
172, 75, 143, 168, 233, 25, 151, 33, 233, 132,
0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, 172, 75, 143, 168, 233, 25, 151,
33, 233, 132,
])
.write(&[
// DATA
0, 64, 0, 0, 0, 0, 0, 0, 1,
])
.write(&payload[0..16_384])
.write(frames::SETTINGS_ACK)
.read(frames::SETTINGS_ACK)
.wait(Duration::from_millis(10))
.write(&[
// DATA
0, 64, 0, 0, 1, 0, 0, 0, 1,
@@ -327,7 +315,7 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
.read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89])
.build();
let (mut client, mut h2) = client::handshake(mock).wait().unwrap();
let (mut client, mut h2) = client::handshake(mock).await.unwrap();
let request = Request::builder()
.method(Method::POST)
@@ -346,92 +334,79 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() {
stream.send_data(payload.into(), true).unwrap();
// Get the response
let resp = h2.run(response).unwrap();
let resp = h2.run(response).await.unwrap();
assert_eq!(resp.status(), StatusCode::NO_CONTENT);
h2.wait().unwrap();
h2.await.unwrap();
}
#[test]
fn send_data_receive_window_update() {
#[tokio::test]
async fn send_data_receive_window_update() {
let _ = env_logger::try_init();
let (m, mock) = mock::new();
let (m, mut mock) = mock::new();
let h2 = client::handshake(m)
.unwrap()
.and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::POST)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let h2 = async move {
let (mut client, mut h2) = client::handshake(m).await.unwrap();
let request = Request::builder()
.method(Method::POST)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
// Send request
let (response, mut stream) = client.send_request(request, false).unwrap();
// Send request
let (_response, mut stream) = client.send_request(request, false).unwrap();
// Send data frame
stream.send_data("hello".into(), false).unwrap();
// Send data frame
stream.send_data("hello".into(), false).unwrap();
stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize);
// Wait for capacity
h2.drive(util::wait_for_capacity(
// Wait for capacity
let mut stream = h2
.drive(util::wait_for_capacity(
stream,
frame::DEFAULT_INITIAL_WINDOW_SIZE as usize,
).map(|s| (response, s)))
})
.and_then(|(h2, (_r, mut stream))| {
let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];
stream.send_data(payload.into(), true).unwrap();
))
.await;
let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];
stream.send_data(payload.into(), true).unwrap();
// keep `stream` from being dropped in order to prevent
// it from sending an RST_STREAM frame.
std::mem::forget(stream);
h2.unwrap()
});
// keep `stream` from being dropped in order to prevent
// it from sending an RST_STREAM frame.
std::mem::forget(stream);
h2.await.unwrap();
};
let mock = mock.assert_client_handshake().unwrap()
.and_then(|(_, mock)| mock.into_future().unwrap())
.and_then(|(frame, mock)| {
let request = assert_headers!(frame.unwrap());
assert!(!request.is_end_stream());
mock.into_future().unwrap()
})
.and_then(|(frame, mut mock)| {
let data = assert_data!(frame.unwrap());
let mock = async move {
let _ = mock.assert_client_handshake().await;
// Update the windows
let len = data.payload().len();
let f = frame::WindowUpdate::new(StreamId::zero(), len as u32);
mock.send(f.into()).unwrap();
let frame = mock.next().await.unwrap();
let request = assert_headers!(frame.unwrap());
assert!(!request.is_end_stream());
let frame = mock.next().await.unwrap();
let data = assert_data!(frame.unwrap());
let f = frame::WindowUpdate::new(data.stream_id(), len as u32);
mock.send(f.into()).unwrap();
// Update the windows
let len = data.payload().len();
let f = frame::WindowUpdate::new(StreamId::zero(), len as u32);
mock.send(f.into()).await.unwrap();
mock.into_future().unwrap()
})
// TODO: Dedup the following lines
.and_then(|(frame, mock)| {
let f = frame::WindowUpdate::new(data.stream_id(), len as u32);
mock.send(f.into()).await.unwrap();
for _ in 0..3usize {
let frame = mock.next().await.unwrap();
let data = assert_data!(frame.unwrap());
assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize);
mock.into_future().unwrap()
})
.and_then(|(frame, mock)| {
let data = assert_data!(frame.unwrap());
assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize);
mock.into_future().unwrap()
})
.and_then(|(frame, mock)| {
let data = assert_data!(frame.unwrap());
assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize);
mock.into_future().unwrap()
})
.and_then(|(frame, _)| {
let data = assert_data!(frame.unwrap());
assert_eq!(data.payload().len(), (frame::DEFAULT_MAX_FRAME_SIZE-1) as usize);
Ok(())
});
}
let frame = mock.next().await.unwrap();
let data = assert_data!(frame.unwrap());
assert_eq!(
data.payload().len(),
(frame::DEFAULT_MAX_FRAME_SIZE - 1) as usize
);
};
let _ = h2.join(mock).wait().unwrap();
join(mock, h2).await;
}

View File

@@ -1,310 +1,346 @@
#![feature(async_await)]
use futures::future::join;
use futures::{StreamExt, TryStreamExt};
use h2_support::prelude::*;
#[test]
fn recv_push_works() {
#[tokio::test]
async fn recv_push_works() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
let (io, mut srv) = mock::new();
let mock = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::headers(1).response(404))
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::data(1, "").eos())
.send_frame(frames::headers(2).response(200))
.send_frame(frames::data(2, "promised_data").eos());
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
.await;
srv.send_frame(frames::headers(1).response(404)).await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.send_frame(frames::data(1, "").eos()).await;
srv.send_frame(frames::headers(2).response(200)).await;
srv.send_frame(frames::data(2, "promised_data").eos()).await;
};
let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let (mut resp, _) = client
.send_request(request, true)
.unwrap();
let (mut resp, _) = client.send_request(request, true).unwrap();
let pushed = resp.push_promises();
let check_resp_status = resp.unwrap().map(|resp| {
assert_eq!(resp.status(), StatusCode::NOT_FOUND)
});
let check_pushed_request = pushed.and_then(|headers| {
let (request, response) = headers.into_parts();
assert_eq!(request.into_parts().0.method, Method::GET);
response
});
let check_pushed_response = check_pushed_request.and_then(
|resp| {
assert_eq!(resp.status(), StatusCode::OK);
resp.into_body().concat2().map(|b| assert_eq!(b, "promised_data"))
}
).collect().unwrap().map(|ps| {
let check_resp_status = async move {
let resp = resp.await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
};
let check_pushed_response = async move {
let p = pushed.and_then(|headers| {
async move {
let (request, response) = headers.into_parts();
assert_eq!(request.into_parts().0.method, Method::GET);
let resp = response.await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let b = resp.into_body().try_concat().await.unwrap();
assert_eq!(b, "promised_data");
Ok(())
}
});
let ps: Vec<_> = p.collect().await;
assert_eq!(1, ps.len())
});
h2.drive(check_resp_status.join(check_pushed_response))
});
};
h2.join(mock).wait().unwrap();
h2.drive(join(check_resp_status, check_pushed_response))
.await;
};
join(mock, h2).await;
}
#[test]
fn pushed_streams_arent_dropped_too_early() {
#[tokio::test]
async fn pushed_streams_arent_dropped_too_early() {
// tests that by default, received push promises work
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
let (io, mut srv) = mock::new();
let mock = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::headers(1).response(404))
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style2.css"))
.send_frame(frames::data(1, "").eos())
.idle_ms(10)
.send_frame(frames::headers(2).response(200))
.send_frame(frames::headers(4).response(200).eos())
.send_frame(frames::data(2, "").eos())
.recv_frame(frames::go_away(4));
.await;
srv.send_frame(frames::headers(1).response(404)).await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.send_frame(
frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style2.css"),
)
.await;
srv.send_frame(frames::data(1, "").eos()).await;
idle_ms(10).await;
srv.send_frame(frames::headers(2).response(200)).await;
srv.send_frame(frames::headers(4).response(200).eos()).await;
srv.send_frame(frames::data(2, "").eos()).await;
srv.recv_frame(frames::go_away(4)).await;
};
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let (mut resp, _) = client
.send_request(request, true)
.unwrap();
let pushed = resp.push_promises();
let check_status = resp.unwrap().and_then(|resp| {
let (mut resp, _) = client.send_request(request, true).unwrap();
let mut pushed = resp.push_promises();
let check_status = async move {
let resp = resp.await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
Ok(())
});
let check_pushed_headers = pushed.and_then(|headers| {
let (request, response) = headers.into_parts();
assert_eq!(request.into_parts().0.method, Method::GET);
response
});
let check_pushed = check_pushed_headers.map(
|resp| assert_eq!(resp.status(), StatusCode::OK)
).collect().unwrap().and_then(|ps| {
assert_eq!(2, ps.len());
Ok(())
});
h2.drive(check_status.join(check_pushed)).and_then(|(conn, _)| conn.expect("client"))
});
};
h2.join(mock).wait().unwrap();
let check_pushed = async move {
let mut count = 0;
while let Some(headers) = pushed.next().await {
let (request, response) = headers.unwrap().into_parts();
assert_eq!(request.into_parts().0.method, Method::GET);
let resp = response.await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
count += 1;
}
assert_eq!(2, count);
};
drop(client);
h2.drive(join(check_pushed, check_status)).await;
h2.await.expect("client");
};
join(mock, h2).await;
}
#[test]
fn recv_push_when_push_disabled_is_conn_error() {
#[tokio::test]
async fn recv_push_when_push_disabled_is_conn_error() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.ignore_settings()
.recv_frame(
let (io, mut srv) = mock::new();
let mock = async move {
let _ = srv.assert_client_handshake().await;
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::push_promise(1, 3).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::headers(1).response(200).eos())
.recv_frame(frames::go_away(0).protocol_error());
let h2 = client::Builder::new()
.enable_push(false)
.handshake::<_, Bytes>(io)
.unwrap()
.and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client.send_request(request, true).unwrap().0.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
Ok::<(), ()>(())
});
// client should see a protocol error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
Ok::<(), ()>(())
});
conn.unwrap().join(req)
});
h2.join(mock).wait().unwrap();
}
#[test]
fn pending_push_promises_reset_when_dropped() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
.await;
srv.send_frame(
frames::push_promise(1, 3).request("GET", "https://http2.akamai.com/style.css"),
)
.send_frame(
frames::push_promise(1, 2)
.request("GET", "https://http2.akamai.com/style.css")
)
.send_frame(frames::headers(1).response(200).eos())
.recv_frame(frames::reset(2).cancel())
.close();
.await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
srv.recv_frame(frames::go_away(0).protocol_error()).await;
};
let client = client::handshake(io).unwrap().and_then(|(mut client, conn)| {
let h2 = async move {
let (mut client, h2) = client::Builder::new()
.enable_push(false)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.unwrap()
.0.expect("response")
.and_then(|resp| {
assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});
conn.drive(req)
.and_then(move |(conn, _)| conn.expect("client").map(move |()| drop(client)))
});
let req = async move {
let res = client.send_request(request, true).unwrap().0.await;
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
};
client.join(srv).wait().expect("wait");
// client should see a protocol error
let conn = async move {
let res = h2.await;
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
};
join(conn, req).await;
};
join(mock, h2).await;
}
#[test]
fn recv_push_promise_over_max_header_list_size() {
#[tokio::test]
async fn pending_push_promises_reset_when_dropped() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.unwrap()
.recv_custom_settings(
frames::settings()
.max_header_list_size(10)
)
.recv_frame(
let (io, mut srv) = mock::new();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.recv_frame(frames::reset(2).refused())
.send_frame(frames::headers(1).response(200).eos())
.idle_ms(10)
.close();
.await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
srv.recv_frame(frames::reset(2).cancel()).await;
};
let client = client::Builder::new()
.max_header_list_size(10)
.handshake::<_, Bytes>(io)
.expect("handshake")
.and_then(|(mut client, conn)| {
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let client = async move {
let (mut client, mut conn) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = async {
let resp = client
.send_request(request, true)
.unwrap()
.0
.await
.expect("response");
assert_eq!(resp.status(), StatusCode::OK);
};
let req = client
let _ = conn.drive(req).await;
conn.await.expect("client");
drop(client);
};
join(srv, client).await;
}
#[tokio::test]
async fn recv_push_promise_over_max_header_list_size() {
let _ = env_logger::try_init();
let (io, mut srv) = mock::new();
let srv = async move {
let settings = srv.assert_client_handshake().await;
assert_frame_eq(settings, frames::settings().max_header_list_size(10));
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.recv_frame(frames::reset(2).refused()).await;
srv.send_frame(frames::headers(1).response(200).eos()).await;
idle_ms(10).await;
};
let client = async move {
let (mut client, mut conn) = client::Builder::new()
.max_header_list_size(10)
.handshake::<_, Bytes>(io)
.await
.expect("handshake");
let request = Request::builder()
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = async move {
let err = client
.send_request(request, true)
.expect("send_request")
.0
.expect_err("response")
.map(|err| {
assert_eq!(
err.reason(),
Some(Reason::REFUSED_STREAM)
);
});
.await
.expect_err("response");
assert_eq!(err.reason(), Some(Reason::REFUSED_STREAM));
};
conn.drive(req)
.and_then(|(conn, _)| conn.expect("client"))
});
client.join(srv).wait().expect("wait");
conn.drive(req).await;
conn.await.expect("client");
};
join(srv, client).await;
}
#[test]
fn recv_invalid_push_promise_headers_is_stream_protocol_error() {
#[tokio::test]
async fn recv_invalid_push_promise_headers_is_stream_protocol_error() {
// Unsafe method or content length is stream protocol error
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
let (io, mut srv) = mock::new();
let mock = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::headers(1).response(404))
.send_frame(frames::push_promise(1, 2).request("POST", "https://http2.akamai.com/style.css"))
.send_frame(
.await;
srv.send_frame(frames::headers(1).response(404)).await;
srv.send_frame(
frames::push_promise(1, 2).request("POST", "https://http2.akamai.com/style.css"),
)
.await;
srv.send_frame(
frames::push_promise(1, 4)
.request("GET", "https://http2.akamai.com/style.css")
.field(http::header::CONTENT_LENGTH, 1)
.field(http::header::CONTENT_LENGTH, 1),
)
.send_frame(
.await;
srv.send_frame(
frames::push_promise(1, 6)
.request("GET", "https://http2.akamai.com/style.css")
.field(http::header::CONTENT_LENGTH, 0)
.field(http::header::CONTENT_LENGTH, 0),
)
.send_frame(frames::headers(1).response(404).eos())
.recv_frame(frames::reset(2).protocol_error())
.recv_frame(frames::reset(4).protocol_error())
.send_frame(frames::headers(6).response(200).eos())
.close();
.await;
srv.send_frame(frames::headers(1).response(404).eos()).await;
srv.recv_frame(frames::reset(2).protocol_error()).await;
srv.recv_frame(frames::reset(4).protocol_error()).await;
srv.send_frame(frames::headers(6).response(200).eos()).await;
};
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let (mut resp, _) = client
.send_request(request, true)
.unwrap();
let check_pushed_request = resp.push_promises().and_then(|headers| {
headers.into_parts().1
});
let check_pushed_response = check_pushed_request
.collect().unwrap().map(|ps| {
// CONTENT_LENGTH = 0 is ok
assert_eq!(1, ps.len())
});
h2.drive(check_pushed_response)
});
let (mut resp, _) = client.send_request(request, true).unwrap();
let check_pushed_response = async move {
let pushed = resp.push_promises();
let p = pushed.and_then(|headers| headers.into_parts().1);
let ps: Vec<_> = p.collect().await;
// CONTENT_LENGTH = 0 is ok
assert_eq!(1, ps.len());
};
h2.drive(check_pushed_response).await;
};
h2.join(mock).wait().unwrap();
join(mock, h2).await;
}
#[test]
@@ -313,102 +349,110 @@ fn recv_push_promise_with_wrong_authority_is_stream_error() {
// if server is foo.com, :authority = bar.com is stream error
}
#[test]
fn recv_push_promise_skipped_stream_id() {
#[tokio::test]
async fn recv_push_promise_skipped_stream_id() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
let (io, mut srv) = mock::new();
let mock = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.recv_frame(frames::go_away(0).protocol_error())
.close();
.await;
srv.send_frame(
frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.recv_frame(frames::go_away(0).protocol_error()).await;
};
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
let h2 = async move {
let (mut client, h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.unwrap()
.0
.then(|res| {
assert!(res.is_err());
Ok::<_, ()>(())
});
let req = async move {
let res = client.send_request(request, true).unwrap().0.await;
assert!(res.is_err());
};
// client should see a protocol error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
Ok::<(), ()>(())
});
// client should see a protocol error
let conn = async move {
let res = h2.await;
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
};
conn.unwrap().join(req)
});
join(conn, req).await;
};
h2.join(mock).wait().unwrap();
join(mock, h2).await;
}
#[test]
fn recv_push_promise_dup_stream_id() {
#[tokio::test]
async fn recv_push_promise_dup_stream_id() {
let _ = env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
let (io, mut srv) = mock::new();
let mock = async move {
let settings = srv.assert_client_handshake().await;
assert_default_settings!(settings);
srv.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.recv_frame(frames::go_away(0).protocol_error())
.close();
.await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.send_frame(
frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"),
)
.await;
srv.recv_frame(frames::go_away(0).protocol_error()).await;
};
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
let h2 = async move {
let (mut client, h2) = client::handshake(io).await.unwrap();
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.unwrap()
.0
.then(|res| {
assert!(res.is_err());
Ok::<_, ()>(())
});
let req = async move {
let res = client.send_request(request, true).unwrap().0.await;
assert!(res.is_err());
};
// client should see a protocol error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
Ok::<(), ()>(())
});
// client should see a protocol error
let conn = async move {
let res = h2.await;
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
};
conn.unwrap().join(req)
});
join(conn, req).await;
};
h2.join(mock).wait().unwrap();
join(mock, h2).await;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,25 +1,28 @@
#![feature(async_await)]
use futures::StreamExt;
use h2_support::prelude::*;
#[test]
fn recv_trailers_only() {
#[tokio::test]
async fn recv_trailers_only() {
let _ = env_logger::try_init();
let mock = mock_io::Builder::new()
.handshake()
// Write GET /
.write(&[
0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29,
0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84,
0, 0, 0x10, 1, 5, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, 0xAC, 0x4B, 0x8F,
0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84,
])
.write(frames::SETTINGS_ACK)
// Read response
.read(&[
0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88, 0, 0, 9, 1, 5, 0, 0, 0, 1,
0x40, 0x84, 0x42, 0x46, 0x9B, 0x51, 0x82, 0x3F, 0x5F,
0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88, 0, 0, 9, 1, 5, 0, 0, 0, 1, 0x40, 0x84, 0x42, 0x46,
0x9B, 0x51, 0x82, 0x3F, 0x5F,
])
.build();
let (mut client, mut h2) = client::handshake(mock).wait().unwrap();
let (mut client, mut h2) = client::handshake(mock).await.unwrap();
// Send the request
let request = Request::builder()
@@ -30,44 +33,47 @@ fn recv_trailers_only() {
log::info!("sending request");
let (response, _) = client.send_request(request, true).unwrap();
let response = h2.run(response).unwrap();
let response = h2.run(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let (_, mut body) = response.into_parts();
// Make sure there is no body
let chunk = h2.run(poll_fn(|| body.poll())).unwrap();
let chunk = h2.run(Box::pin(body.next())).await;
assert!(chunk.is_none());
let trailers = h2.run(poll_fn(|| body.poll_trailers())).unwrap().unwrap();
let trailers = h2
.run(poll_fn(|cx| body.poll_trailers(cx)))
.await
.unwrap()
.unwrap();
assert_eq!(1, trailers.len());
assert_eq!(trailers["status"], "ok");
h2.wait().unwrap();
h2.await.unwrap();
}
#[test]
fn send_trailers_immediately() {
#[tokio::test]
async fn send_trailers_immediately() {
let _ = env_logger::try_init();
let mock = mock_io::Builder::new()
.handshake()
// Write GET /
.write(&[
0, 0, 0x10, 1, 4, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29,
0xAC, 0x4B, 0x8F, 0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, 0, 0,
0x0A, 1, 5, 0, 0, 0, 1, 0x40, 0x83, 0xF6, 0x7A, 0x66, 0x84, 0x9C,
0xB4, 0x50, 0x7F,
0, 0, 0x10, 1, 4, 0, 0, 0, 1, 0x82, 0x87, 0x41, 0x8B, 0x9D, 0x29, 0xAC, 0x4B, 0x8F,
0xA8, 0xE9, 0x19, 0x97, 0x21, 0xE9, 0x84, 0, 0, 0x0A, 1, 5, 0, 0, 0, 1, 0x40, 0x83,
0xF6, 0x7A, 0x66, 0x84, 0x9C, 0xB4, 0x50, 0x7F,
])
.write(frames::SETTINGS_ACK)
// Read response
.read(&[
0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88, 0, 0, 0x0B, 0, 1, 0, 0, 0, 1,
0x68, 0x65, 0x6C, 0x6C, 0x6F, 0x20, 0x77, 0x6F, 0x72, 0x6C, 0x64,
0, 0, 1, 1, 4, 0, 0, 0, 1, 0x88, 0, 0, 0x0B, 0, 1, 0, 0, 0, 1, 0x68, 0x65, 0x6C, 0x6C,
0x6F, 0x20, 0x77, 0x6F, 0x72, 0x6C, 0x64,
])
.build();
let (mut client, mut h2) = client::handshake(mock).wait().unwrap();
let (mut client, mut h2) = client::handshake(mock).await.unwrap();
// Send the request
let request = Request::builder()
@@ -83,22 +89,21 @@ fn send_trailers_immediately() {
stream.send_trailers(trailers).unwrap();
let response = h2.run(response).unwrap();
let response = h2.run(response).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let (_, mut body) = response.into_parts();
// There is a data chunk
let chunk = h2.run(poll_fn(|| body.poll())).unwrap();
assert!(chunk.is_some());
let _ = h2.run(body.next()).await.unwrap().unwrap();
let chunk = h2.run(poll_fn(|| body.poll())).unwrap();
let chunk = h2.run(body.next()).await;
assert!(chunk.is_none());
let trailers = h2.run(poll_fn(|| body.poll_trailers())).unwrap();
let trailers = h2.run(poll_fn(|cx| body.poll_trailers(cx))).await;
assert!(trailers.is_none());
h2.wait().unwrap();
h2.await.unwrap();
}
#[test]