test(client,server): add back tests around streaming bodies (#2905)

This commit is contained in:
Sean McArthur
2022-06-29 06:39:21 -07:00
committed by GitHub
parent ce72f73464
commit 3660443108
2 changed files with 249 additions and 234 deletions

View File

@@ -4,6 +4,7 @@
#[macro_use] #[macro_use]
extern crate matches; extern crate matches;
use std::convert::Infallible;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener}; use std::net::{SocketAddr, TcpListener};
use std::pin::Pin; use std::pin::Pin;
@@ -11,9 +12,11 @@ use std::task::{Context, Poll};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::to_bytes as concat; use hyper::body::to_bytes as concat;
use hyper::{Body, Client, Method, Request, StatusCode}; use hyper::{Body, Client, Method, Request, StatusCode};
use bytes::Bytes;
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_core::{Future, Stream, TryFuture}; use futures_core::{Future, Stream, TryFuture};
use futures_util::future::{self, FutureExt, TryFutureExt}; use futures_util::future::{self, FutureExt, TryFutureExt};
@@ -154,7 +157,7 @@ macro_rules! test {
.build(connector); .build(connector);
#[allow(unused_assignments, unused_mut)] #[allow(unused_assignments, unused_mut)]
let mut body = Body::empty(); let mut body = BodyExt::boxed(http_body_util::Empty::<bytes::Bytes>::new());
let mut req_builder = Request::builder(); let mut req_builder = Request::builder();
$( $(
test!(@client_request; req_builder, body, addr, $c_req_prop: $c_req_val); test!(@client_request; req_builder, body, addr, $c_req_prop: $c_req_val);
@@ -232,7 +235,11 @@ macro_rules! __client_req_prop {
}}; }};
($req_builder:ident, $body:ident, $addr:ident, body: $body_e:expr) => {{ ($req_builder:ident, $body:ident, $addr:ident, body: $body_e:expr) => {{
$body = $body_e.into(); $body = BodyExt::boxed(http_body_util::Full::from($body_e));
}};
($req_builder:ident, $body:ident, $addr:ident, body_stream: $body_e:expr) => {{
$body = BodyExt::boxed(StreamBody::new($body_e));
}}; }};
} }
@@ -457,95 +464,100 @@ test! {
body: &b"hello"[..], body: &b"hello"[..],
} }
// TODO: Fix this, broken in PR #2896 test! {
// test! { name: client_get_req_body_sized,
// name: client_get_req_body_sized,
// server: server:
// expected: "\ expected: "\
// GET / HTTP/1.1\r\n\ GET / HTTP/1.1\r\n\
// content-length: 5\r\n\ content-length: 5\r\n\
// host: {addr}\r\n\ host: {addr}\r\n\
// \r\n\ \r\n\
// hello\ hello\
// ", ",
// reply: REPLY_OK, reply: REPLY_OK,
// client: client:
// request: { request: {
// method: GET, method: GET,
// url: "http://{addr}/", url: "http://{addr}/",
// headers: { headers: {
// "Content-Length" => "5", "Content-Length" => "5",
// }, },
// body: (Body::wrap_stream(Body::from("hello"))), // use a "stream" (where Body doesn't know length) with a
// }, // content-length header
// response: body_stream: (futures_util::stream::once(async {
// status: OK, Ok::<_, Infallible>(Bytes::from("hello"))
// headers: {}, })),
// body: None, },
// } response:
status: OK,
headers: {},
body: None,
}
// TODO: Fix this, broken in PR #2896 test! {
// test! { name: client_get_req_body_unknown,
// name: client_get_req_body_unknown,
// server: server:
// expected: "\ expected: "\
// GET / HTTP/1.1\r\n\ GET / HTTP/1.1\r\n\
// host: {addr}\r\n\ host: {addr}\r\n\
// \r\n\ \r\n\
// ", ",
// reply: REPLY_OK, reply: REPLY_OK,
// client: client:
// request: { request: {
// method: GET, method: GET,
// url: "http://{addr}/", url: "http://{addr}/",
// // wrap_steam means we don't know the content-length, // steam means we don't know the content-length,
// // but we're wrapping a non-empty stream. // but we're wrapping a non-empty stream.
// // //
// // But since the headers cannot tell us, and the method typically // But since the headers cannot tell us, and the method typically
// // doesn't have a body, the body must be ignored. // doesn't have a body, the body must be ignored.
// body: (Body::from("hello")), body_stream: (futures_util::stream::once(async {
// }, Ok::<_, Infallible>(Bytes::from("hello"))
// response: })),
// status: OK, },
// headers: {}, response:
// body: None, status: OK,
// } headers: {},
body: None,
}
// TODO: Fix this, broken in PR #2896 test! {
// test! { name: client_get_req_body_unknown_http10,
// name: client_get_req_body_unknown_http10,
// server: server:
// expected: "\ expected: "\
// GET / HTTP/1.0\r\n\ GET / HTTP/1.0\r\n\
// host: {addr}\r\n\ host: {addr}\r\n\
// \r\n\ \r\n\
// ", ",
// reply: "HTTP/1.0 200 OK\r\ncontent-length: 0\r\n\r\n", reply: "HTTP/1.0 200 OK\r\ncontent-length: 0\r\n\r\n",
// client: client:
// request: { request: {
// method: GET, method: GET,
// url: "http://{addr}/", url: "http://{addr}/",
// headers: { headers: {
// "transfer-encoding" => "chunked", "transfer-encoding" => "chunked",
// }, },
// version: HTTP_10, version: HTTP_10,
// // wrap_steam means we don't know the content-length, // steam means we don't know the content-length,
// // but we're wrapping a non-empty stream. // but we're wrapping a non-empty stream.
// // //
// // But since the headers cannot tell us, the body must be ignored. // But since the headers cannot tell us, the body must be ignored.
// body: (Body::from("hello")), body_stream: (futures_util::stream::once(async {
// }, Ok::<_, Infallible>(Bytes::from("hello"))
// response: })),
// status: OK, },
// headers: {}, response:
// body: None, status: OK,
// } headers: {},
body: None,
}
test! { test! {
name: client_post_sized, name: client_post_sized,
@@ -605,33 +617,35 @@ test! {
body: None, body: None,
} }
// TODO: Fix this, broken in PR #2896 test! {
// test! { name: client_post_unknown,
// name: client_post_unknown,
// server: server:
// expected: "\ expected: "\
// POST /chunks HTTP/1.1\r\n\ POST /chunks HTTP/1.1\r\n\
// host: {addr}\r\n\ host: {addr}\r\n\
// transfer-encoding: chunked\r\n\ transfer-encoding: chunked\r\n\
// \r\n\ \r\n\
// B\r\n\ B\r\n\
// foo bar baz\r\n\ foo bar baz\r\n\
// 0\r\n\r\n\ 0\r\n\r\n\
// ", ",
// reply: REPLY_OK, reply: REPLY_OK,
// client: client:
// request: { request: {
// method: POST, method: POST,
// url: "http://{addr}/chunks", url: "http://{addr}/chunks",
// body: (Body::from("foo bar baz")), // use a stream to "hide" that the full amount is known
// }, body_stream: (futures_util::stream::once(async {
// response: Ok::<_, Infallible>(Bytes::from("foo bar baz"))
// status: OK, })),
// headers: {}, },
// body: None, response:
// } status: OK,
headers: {},
body: None,
}
test! { test! {
name: client_post_empty, name: client_post_empty,
@@ -1665,79 +1679,78 @@ mod dispatch_impl {
assert_eq!(connects.load(Ordering::Relaxed), 2); assert_eq!(connects.load(Ordering::Relaxed), 2);
} }
// TODO: Fix this, broken in PR #2896 #[test]
// #[test] fn client_keep_alive_when_response_before_request_body_ends() {
// fn client_keep_alive_when_response_before_request_body_ends() { let _ = pretty_env_logger::try_init();
// let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
// let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap();
// let addr = server.local_addr().unwrap(); let rt = support::runtime();
// let rt = support::runtime();
// let connector = DebugConnector::new(); let connector = DebugConnector::new();
// let connects = connector.connects.clone(); let connects = connector.connects.clone();
// let client = Client::builder().build(connector); let client = Client::builder().build(connector);
// let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
// let (tx2, rx2) = oneshot::channel(); let (tx2, rx2) = oneshot::channel();
// let (tx3, rx3) = oneshot::channel(); let (tx3, rx3) = oneshot::channel();
// thread::spawn(move || { thread::spawn(move || {
// let mut sock = server.accept().unwrap().0; let mut sock = server.accept().unwrap().0;
// sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
// sock.set_write_timeout(Some(Duration::from_secs(5))) sock.set_write_timeout(Some(Duration::from_secs(5)))
// .unwrap(); .unwrap();
// let mut buf = [0; 4096]; let mut buf = [0; 4096];
// sock.read(&mut buf).expect("read 1"); sock.read(&mut buf).expect("read 1");
// sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
// .expect("write 1"); .expect("write 1");
// // after writing the response, THEN stream the body // after writing the response, THEN stream the body
// let _ = tx1.send(()); let _ = tx1.send(());
// sock.read(&mut buf).expect("read 2"); sock.read(&mut buf).expect("read 2");
// let _ = tx2.send(()); let _ = tx2.send(());
// let n2 = sock.read(&mut buf).expect("read 3"); let n2 = sock.read(&mut buf).expect("read 3");
// assert_ne!(n2, 0); assert_ne!(n2, 0);
// let second_get = "GET /b HTTP/1.1\r\n"; let second_get = "GET /b HTTP/1.1\r\n";
// assert_eq!(s(&buf[..second_get.len()]), second_get); assert_eq!(s(&buf[..second_get.len()]), second_get);
// sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
// .expect("write 2"); .expect("write 2");
// let _ = tx3.send(()); let _ = tx3.send(());
// }); });
// assert_eq!(connects.load(Ordering::Relaxed), 0); assert_eq!(connects.load(Ordering::Relaxed), 0);
// let delayed_body = rx1 let delayed_body = rx1
// .then(|_| tokio::time::sleep(Duration::from_millis(200))) .then(|_| tokio::time::sleep(Duration::from_millis(200)))
// .map(|_| Ok::<_, ()>("hello a")) .map(|_| Ok::<_, ()>(Bytes::from("hello a")))
// .map_err(|_| -> hyper::Error { panic!("rx1") }) .map_err(|_| -> std::convert::Infallible { panic!("rx1") })
// .into_stream(); .into_stream();
// let rx = rx2.expect("thread panicked"); let rx = rx2.expect("thread panicked");
// let req = Request::builder() let req = Request::builder()
// .method("POST") .method("POST")
// .uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
// .body(Body::wrap_stream(delayed_body)) .body(BodyExt::boxed(StreamBody::new(delayed_body)))
// .unwrap(); .unwrap();
// let client2 = client.clone(); let client2 = client.clone();
// // req 1 // req 1
// let fut = future::join(client.request(req), rx) let fut = future::join(client.request(req), rx)
// .then(|_| tokio::time::sleep(Duration::from_millis(200))) .then(|_| tokio::time::sleep(Duration::from_millis(200)))
// // req 2 // req 2
// .then(move |()| { .then(move |()| {
// let rx = rx3.expect("thread panicked"); let rx = rx3.expect("thread panicked");
// let req = Request::builder() let req = Request::builder()
// .uri(&*format!("http://{}/b", addr)) .uri(&*format!("http://{}/b", addr))
// .body(Body::empty()) .body(BodyExt::boxed(http_body_util::Empty::new()))
// .unwrap(); .unwrap();
// future::join(client2.request(req), rx).map(|r| r.0) future::join(client2.request(req), rx).map(|r| r.0)
// }); });
// rt.block_on(fut).unwrap(); rt.block_on(fut).unwrap();
// assert_eq!(connects.load(Ordering::Relaxed), 1); assert_eq!(connects.load(Ordering::Relaxed), 1);
// } }
#[tokio::test] #[tokio::test]
async fn client_keep_alive_eager_when_chunked() { async fn client_keep_alive_eager_when_chunked() {

View File

@@ -20,6 +20,7 @@ use futures_util::future::{self, Either, FutureExt, TryFutureExt};
use h2::client::SendRequest; use h2::client::SendRequest;
use h2::{RecvStream, SendStream}; use h2::{RecvStream, SendStream};
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::{TcpListener, TcpStream as TkTcpStream}; use tokio::net::{TcpListener, TcpStream as TkTcpStream};
@@ -108,8 +109,7 @@ mod response_body_lengths {
b b
} }
Bd::Unknown(b) => { Bd::Unknown(b) => {
let (mut tx, body) = hyper::Body::channel(); let body = futures_util::stream::once(async move { Ok(b.into()) });
tx.try_send_data(b.into()).expect("try_send_data");
reply.body_stream(body); reply.body_stream(body);
b b
} }
@@ -2191,31 +2191,30 @@ async fn max_buf_size() {
.expect_err("should TooLarge error"); .expect_err("should TooLarge error");
} }
// TODO: Broken in PR #2896. Fix this if we don't have other tests to verify that the #[test]
// HTTP/1 server dispatcher properly handles a streaming body fn streaming_body() {
// #[test] use futures_util::StreamExt;
// fn streaming_body() { let _ = pretty_env_logger::try_init();
// let _ = pretty_env_logger::try_init();
// // disable keep-alive so we can use read_to_end // disable keep-alive so we can use read_to_end
// let server = serve_opts().keep_alive(false).serve(); let server = serve_opts().keep_alive(false).serve();
// static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 100] as _;
// let b = futures_util::stream::iter(S.iter()).map(|&s| Ok::<_, hyper::Error>(s)); let b =
// let b = hyper::Body::wrap_stream(b); futures_util::stream::iter(S.iter()).map(|&s| Ok::<_, BoxError>(Bytes::copy_from_slice(s)));
// server.reply().body_stream(b); server.reply().body_stream(b);
// let mut tcp = connect(server.addr()); let mut tcp = connect(server.addr());
// tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
// let mut buf = Vec::new(); let mut buf = Vec::new();
// tcp.read_to_end(&mut buf).expect("read 1"); tcp.read_to_end(&mut buf).expect("read 1");
// assert!( assert!(
// buf.starts_with(b"HTTP/1.1 200 OK\r\n"), buf.starts_with(b"HTTP/1.1 200 OK\r\n"),
// "response is 200 OK" "response is 200 OK"
// ); );
// assert_eq!(buf.len(), 100_789, "full streamed body read"); assert_eq!(buf.len(), 100_789, "full streamed body read");
// } }
#[test] #[test]
fn http1_response_with_http2_version() { fn http1_response_with_http2_version() {
@@ -2300,42 +2299,39 @@ async fn http2_service_error_sends_reset_reason() {
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
} }
// TODO: Fix this, broken in PR #2896 #[test]
// #[test] fn http2_body_user_error_sends_reset_reason() {
// fn http2_body_user_error_sends_reset_reason() { use std::error::Error;
// use std::error::Error; let server = serve();
// let server = serve(); let addr_str = format!("http://{}", server.addr());
// let addr_str = format!("http://{}", server.addr());
// let b = futures_util::stream::once(future::err::<String, _>(h2::Error::from( let b = futures_util::stream::once(future::err::<Bytes, BoxError>(Box::new(h2::Error::from(
// h2::Reason::INADEQUATE_SECURITY, h2::Reason::INADEQUATE_SECURITY,
// ))); ))));
// let b = hyper::Body::wrap_stream(b); server.reply().body_stream(b);
// server.reply().body_stream(b); let rt = support::runtime();
// let rt = support::runtime(); let err: hyper::Error = rt
.block_on(async move {
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let uri = addr_str.parse().expect("server addr should parse");
// let err: hyper::Error = rt let mut res = client.get(uri).await?;
// .block_on(async move {
// let client = Client::builder()
// .http2_only(true)
// .build_http::<hyper::Body>();
// let uri = addr_str.parse().expect("server addr should parse");
// let mut res = client.get(uri).await?; while let Some(chunk) = res.body_mut().data().await {
chunk?;
}
Ok(())
})
.unwrap_err();
// while let Some(chunk) = res.body_mut().next().await { let h2_err = err.source().unwrap().downcast_ref::<h2::Error>().unwrap();
// chunk?;
// }
// Ok(())
// })
// .unwrap_err();
// let h2_err = err.source().unwrap().downcast_ref::<h2::Error>().unwrap(); assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
}
// assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
// }
struct Http2ReadyErrorSvc; struct Http2ReadyErrorSvc;
@@ -2687,7 +2683,7 @@ impl Serve {
} }
type BoxError = Box<dyn std::error::Error + Send + Sync>; type BoxError = Box<dyn std::error::Error + Send + Sync>;
type BoxFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, BoxError>> + Send>>; type BoxFuture = Pin<Box<dyn Future<Output = Result<Response<ReplyBody>, BoxError>> + Send>>;
struct ReplyBuilder<'a> { struct ReplyBuilder<'a> {
tx: &'a Mutex<spmc::Sender<Reply>>, tx: &'a Mutex<spmc::Sender<Reply>>,
@@ -2731,14 +2727,16 @@ impl<'a> ReplyBuilder<'a> {
} }
fn body<T: AsRef<[u8]>>(self, body: T) { fn body<T: AsRef<[u8]>>(self, body: T) {
self.tx let chunk = Bytes::copy_from_slice(body.as_ref());
.lock() let body = BodyExt::boxed(http_body_util::Full::new(chunk).map_err(|e| match e {}));
.unwrap() self.tx.lock().unwrap().send(Reply::Body(body)).unwrap();
.send(Reply::Body(body.as_ref().to_vec().into()))
.unwrap();
} }
fn body_stream(self, body: Body) { fn body_stream<S>(self, stream: S)
where
S: futures_util::Stream<Item = Result<Bytes, BoxError>> + Send + Sync + 'static,
{
let body = BodyExt::boxed(StreamBody::new(stream));
self.tx.lock().unwrap().send(Reply::Body(body)).unwrap(); self.tx.lock().unwrap().send(Reply::Body(body)).unwrap();
} }
@@ -2780,13 +2778,15 @@ struct TestService {
reply: spmc::Receiver<Reply>, reply: spmc::Receiver<Reply>,
} }
type ReplyBody = BoxBody<Bytes, BoxError>;
#[derive(Debug)] #[derive(Debug)]
enum Reply { enum Reply {
Status(hyper::StatusCode), Status(hyper::StatusCode),
ReasonPhrase(hyper::ext::ReasonPhrase), ReasonPhrase(hyper::ext::ReasonPhrase),
Version(hyper::Version), Version(hyper::Version),
Header(HeaderName, HeaderValue), Header(HeaderName, HeaderValue),
Body(hyper::Body), Body(ReplyBody),
Error(BoxError), Error(BoxError),
End, End,
} }
@@ -2799,7 +2799,7 @@ enum Msg {
} }
impl tower_service::Service<Request<Body>> for TestService { impl tower_service::Service<Request<Body>> for TestService {
type Response = Response<Body>; type Response = Response<ReplyBody>;
type Error = BoxError; type Error = BoxError;
type Future = BoxFuture; type Future = BoxFuture;
@@ -2832,8 +2832,10 @@ impl tower_service::Service<Request<Body>> for TestService {
} }
impl TestService { impl TestService {
fn build_reply(replies: spmc::Receiver<Reply>) -> Result<Response<Body>, BoxError> { fn build_reply(replies: spmc::Receiver<Reply>) -> Result<Response<ReplyBody>, BoxError> {
let mut res = Response::new(Body::empty()); let empty =
BodyExt::boxed(http_body_util::Empty::new().map_err(|e| -> BoxError { match e {} }));
let mut res = Response::new(empty);
while let Ok(reply) = replies.try_recv() { while let Ok(reply) = replies.try_recv() {
match reply { match reply {
Reply::Status(s) => { Reply::Status(s) => {
@@ -2880,7 +2882,7 @@ impl tower_service::Service<Request<Body>> for HelloWorld {
fn unreachable_service() -> impl tower_service::Service< fn unreachable_service() -> impl tower_service::Service<
http::Request<hyper::Body>, http::Request<hyper::Body>,
Response = http::Response<hyper::Body>, Response = http::Response<ReplyBody>,
Error = BoxError, Error = BoxError,
Future = BoxFuture, Future = BoxFuture,
> { > {