diff --git a/tests/client.rs b/tests/client.rs index 8cbf6744..34b34907 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -4,6 +4,7 @@ #[macro_use] extern crate matches; +use std::convert::Infallible; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; use std::pin::Pin; @@ -11,9 +12,11 @@ use std::task::{Context, Poll}; use std::thread; use std::time::Duration; +use http_body_util::{BodyExt, StreamBody}; use hyper::body::to_bytes as concat; use hyper::{Body, Client, Method, Request, StatusCode}; +use bytes::Bytes; use futures_channel::oneshot; use futures_core::{Future, Stream, TryFuture}; use futures_util::future::{self, FutureExt, TryFutureExt}; @@ -154,7 +157,7 @@ macro_rules! test { .build(connector); #[allow(unused_assignments, unused_mut)] - let mut body = Body::empty(); + let mut body = BodyExt::boxed(http_body_util::Empty::::new()); let mut req_builder = Request::builder(); $( test!(@client_request; req_builder, body, addr, $c_req_prop: $c_req_val); @@ -232,7 +235,11 @@ macro_rules! __client_req_prop { }}; ($req_builder:ident, $body:ident, $addr:ident, body: $body_e:expr) => {{ - $body = $body_e.into(); + $body = BodyExt::boxed(http_body_util::Full::from($body_e)); + }}; + + ($req_builder:ident, $body:ident, $addr:ident, body_stream: $body_e:expr) => {{ + $body = BodyExt::boxed(StreamBody::new($body_e)); }}; } @@ -457,95 +464,100 @@ test! { body: &b"hello"[..], } -// TODO: Fix this, broken in PR #2896 -// test! { -// name: client_get_req_body_sized, +test! { + name: client_get_req_body_sized, -// server: -// expected: "\ -// GET / HTTP/1.1\r\n\ -// content-length: 5\r\n\ -// host: {addr}\r\n\ -// \r\n\ -// hello\ -// ", -// reply: REPLY_OK, + server: + expected: "\ + GET / HTTP/1.1\r\n\ + content-length: 5\r\n\ + host: {addr}\r\n\ + \r\n\ + hello\ + ", + reply: REPLY_OK, -// client: -// request: { -// method: GET, -// url: "http://{addr}/", -// headers: { -// "Content-Length" => "5", -// }, -// body: (Body::wrap_stream(Body::from("hello"))), -// }, -// response: -// status: OK, -// headers: {}, -// body: None, -// } + client: + request: { + method: GET, + url: "http://{addr}/", + headers: { + "Content-Length" => "5", + }, + // use a "stream" (where Body doesn't know length) with a + // content-length header + body_stream: (futures_util::stream::once(async { + Ok::<_, Infallible>(Bytes::from("hello")) + })), + }, + response: + status: OK, + headers: {}, + body: None, +} -// TODO: Fix this, broken in PR #2896 -// test! { -// name: client_get_req_body_unknown, +test! { + name: client_get_req_body_unknown, -// server: -// expected: "\ -// GET / HTTP/1.1\r\n\ -// host: {addr}\r\n\ -// \r\n\ -// ", -// reply: REPLY_OK, + server: + expected: "\ + GET / HTTP/1.1\r\n\ + host: {addr}\r\n\ + \r\n\ + ", + reply: REPLY_OK, -// client: -// request: { -// method: GET, -// url: "http://{addr}/", -// // wrap_steam means we don't know the content-length, -// // but we're wrapping a non-empty stream. -// // -// // But since the headers cannot tell us, and the method typically -// // doesn't have a body, the body must be ignored. -// body: (Body::from("hello")), -// }, -// response: -// status: OK, -// headers: {}, -// body: None, -// } + client: + request: { + method: GET, + url: "http://{addr}/", + // steam means we don't know the content-length, + // but we're wrapping a non-empty stream. + // + // But since the headers cannot tell us, and the method typically + // doesn't have a body, the body must be ignored. + body_stream: (futures_util::stream::once(async { + Ok::<_, Infallible>(Bytes::from("hello")) + })), + }, + response: + status: OK, + headers: {}, + body: None, +} -// TODO: Fix this, broken in PR #2896 -// test! { -// name: client_get_req_body_unknown_http10, +test! { + name: client_get_req_body_unknown_http10, -// server: -// expected: "\ -// GET / HTTP/1.0\r\n\ -// host: {addr}\r\n\ -// \r\n\ -// ", -// reply: "HTTP/1.0 200 OK\r\ncontent-length: 0\r\n\r\n", + server: + expected: "\ + GET / HTTP/1.0\r\n\ + host: {addr}\r\n\ + \r\n\ + ", + reply: "HTTP/1.0 200 OK\r\ncontent-length: 0\r\n\r\n", -// client: -// request: { -// method: GET, -// url: "http://{addr}/", -// headers: { -// "transfer-encoding" => "chunked", -// }, -// version: HTTP_10, -// // wrap_steam means we don't know the content-length, -// // but we're wrapping a non-empty stream. -// // -// // But since the headers cannot tell us, the body must be ignored. -// body: (Body::from("hello")), -// }, -// response: -// status: OK, -// headers: {}, -// body: None, -// } + client: + request: { + method: GET, + url: "http://{addr}/", + headers: { + "transfer-encoding" => "chunked", + }, + version: HTTP_10, + // steam means we don't know the content-length, + // but we're wrapping a non-empty stream. + // + // But since the headers cannot tell us, the body must be ignored. + body_stream: (futures_util::stream::once(async { + Ok::<_, Infallible>(Bytes::from("hello")) + })), + }, + response: + status: OK, + headers: {}, + body: None, +} test! { name: client_post_sized, @@ -605,33 +617,35 @@ test! { body: None, } -// TODO: Fix this, broken in PR #2896 -// test! { -// name: client_post_unknown, +test! { + name: client_post_unknown, -// server: -// expected: "\ -// POST /chunks HTTP/1.1\r\n\ -// host: {addr}\r\n\ -// transfer-encoding: chunked\r\n\ -// \r\n\ -// B\r\n\ -// foo bar baz\r\n\ -// 0\r\n\r\n\ -// ", -// reply: REPLY_OK, + server: + expected: "\ + POST /chunks HTTP/1.1\r\n\ + host: {addr}\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + B\r\n\ + foo bar baz\r\n\ + 0\r\n\r\n\ + ", + reply: REPLY_OK, -// client: -// request: { -// method: POST, -// url: "http://{addr}/chunks", -// body: (Body::from("foo bar baz")), -// }, -// response: -// status: OK, -// headers: {}, -// body: None, -// } + client: + request: { + method: POST, + url: "http://{addr}/chunks", + // use a stream to "hide" that the full amount is known + body_stream: (futures_util::stream::once(async { + Ok::<_, Infallible>(Bytes::from("foo bar baz")) + })), + }, + response: + status: OK, + headers: {}, + body: None, +} test! { name: client_post_empty, @@ -1665,79 +1679,78 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 2); } - // TODO: Fix this, broken in PR #2896 - // #[test] - // fn client_keep_alive_when_response_before_request_body_ends() { - // let _ = pretty_env_logger::try_init(); - // let server = TcpListener::bind("127.0.0.1:0").unwrap(); - // let addr = server.local_addr().unwrap(); - // let rt = support::runtime(); + #[test] + fn client_keep_alive_when_response_before_request_body_ends() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = support::runtime(); - // let connector = DebugConnector::new(); - // let connects = connector.connects.clone(); + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); - // let client = Client::builder().build(connector); + let client = Client::builder().build(connector); - // let (tx1, rx1) = oneshot::channel(); - // let (tx2, rx2) = oneshot::channel(); - // let (tx3, rx3) = oneshot::channel(); - // thread::spawn(move || { - // let mut sock = server.accept().unwrap().0; - // sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - // sock.set_write_timeout(Some(Duration::from_secs(5))) - // .unwrap(); - // let mut buf = [0; 4096]; - // sock.read(&mut buf).expect("read 1"); - // sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - // .expect("write 1"); - // // after writing the response, THEN stream the body - // let _ = tx1.send(()); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (tx3, rx3) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 1"); + // after writing the response, THEN stream the body + let _ = tx1.send(()); - // sock.read(&mut buf).expect("read 2"); - // let _ = tx2.send(()); + sock.read(&mut buf).expect("read 2"); + let _ = tx2.send(()); - // let n2 = sock.read(&mut buf).expect("read 3"); - // assert_ne!(n2, 0); - // let second_get = "GET /b HTTP/1.1\r\n"; - // assert_eq!(s(&buf[..second_get.len()]), second_get); - // sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") - // .expect("write 2"); - // let _ = tx3.send(()); - // }); + let n2 = sock.read(&mut buf).expect("read 3"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 2"); + let _ = tx3.send(()); + }); - // assert_eq!(connects.load(Ordering::Relaxed), 0); + assert_eq!(connects.load(Ordering::Relaxed), 0); - // let delayed_body = rx1 - // .then(|_| tokio::time::sleep(Duration::from_millis(200))) - // .map(|_| Ok::<_, ()>("hello a")) - // .map_err(|_| -> hyper::Error { panic!("rx1") }) - // .into_stream(); + let delayed_body = rx1 + .then(|_| tokio::time::sleep(Duration::from_millis(200))) + .map(|_| Ok::<_, ()>(Bytes::from("hello a"))) + .map_err(|_| -> std::convert::Infallible { panic!("rx1") }) + .into_stream(); - // let rx = rx2.expect("thread panicked"); - // let req = Request::builder() - // .method("POST") - // .uri(&*format!("http://{}/a", addr)) - // .body(Body::wrap_stream(delayed_body)) - // .unwrap(); - // let client2 = client.clone(); + let rx = rx2.expect("thread panicked"); + let req = Request::builder() + .method("POST") + .uri(&*format!("http://{}/a", addr)) + .body(BodyExt::boxed(StreamBody::new(delayed_body))) + .unwrap(); + let client2 = client.clone(); - // // req 1 - // let fut = future::join(client.request(req), rx) - // .then(|_| tokio::time::sleep(Duration::from_millis(200))) - // // req 2 - // .then(move |()| { - // let rx = rx3.expect("thread panicked"); - // let req = Request::builder() - // .uri(&*format!("http://{}/b", addr)) - // .body(Body::empty()) - // .unwrap(); - // future::join(client2.request(req), rx).map(|r| r.0) - // }); + // req 1 + let fut = future::join(client.request(req), rx) + .then(|_| tokio::time::sleep(Duration::from_millis(200))) + // req 2 + .then(move |()| { + let rx = rx3.expect("thread panicked"); + let req = Request::builder() + .uri(&*format!("http://{}/b", addr)) + .body(BodyExt::boxed(http_body_util::Empty::new())) + .unwrap(); + future::join(client2.request(req), rx).map(|r| r.0) + }); - // rt.block_on(fut).unwrap(); + rt.block_on(fut).unwrap(); - // assert_eq!(connects.load(Ordering::Relaxed), 1); - // } + assert_eq!(connects.load(Ordering::Relaxed), 1); + } #[tokio::test] async fn client_keep_alive_eager_when_chunked() { diff --git a/tests/server.rs b/tests/server.rs index 239e92c5..0e98b40f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -20,6 +20,7 @@ use futures_util::future::{self, Either, FutureExt, TryFutureExt}; use h2::client::SendRequest; use h2::{RecvStream, SendStream}; use http::header::{HeaderName, HeaderValue}; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::{TcpListener, TcpStream as TkTcpStream}; @@ -108,8 +109,7 @@ mod response_body_lengths { b } Bd::Unknown(b) => { - let (mut tx, body) = hyper::Body::channel(); - tx.try_send_data(b.into()).expect("try_send_data"); + let body = futures_util::stream::once(async move { Ok(b.into()) }); reply.body_stream(body); b } @@ -2191,31 +2191,30 @@ async fn max_buf_size() { .expect_err("should TooLarge error"); } -// TODO: Broken in PR #2896. Fix this if we don't have other tests to verify that the -// HTTP/1 server dispatcher properly handles a streaming body -// #[test] -// fn streaming_body() { -// let _ = pretty_env_logger::try_init(); +#[test] +fn streaming_body() { + use futures_util::StreamExt; + let _ = pretty_env_logger::try_init(); -// // disable keep-alive so we can use read_to_end -// let server = serve_opts().keep_alive(false).serve(); + // disable keep-alive so we can use read_to_end + let server = serve_opts().keep_alive(false).serve(); -// static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; -// let b = futures_util::stream::iter(S.iter()).map(|&s| Ok::<_, hyper::Error>(s)); -// let b = hyper::Body::wrap_stream(b); -// server.reply().body_stream(b); + static S: &[&[u8]] = &[&[b'x'; 1_000] as &[u8]; 100] as _; + let b = + futures_util::stream::iter(S.iter()).map(|&s| Ok::<_, BoxError>(Bytes::copy_from_slice(s))); + server.reply().body_stream(b); -// let mut tcp = connect(server.addr()); -// tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); -// let mut buf = Vec::new(); -// tcp.read_to_end(&mut buf).expect("read 1"); + let mut tcp = connect(server.addr()); + tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); + let mut buf = Vec::new(); + tcp.read_to_end(&mut buf).expect("read 1"); -// assert!( -// buf.starts_with(b"HTTP/1.1 200 OK\r\n"), -// "response is 200 OK" -// ); -// assert_eq!(buf.len(), 100_789, "full streamed body read"); -// } + assert!( + buf.starts_with(b"HTTP/1.1 200 OK\r\n"), + "response is 200 OK" + ); + assert_eq!(buf.len(), 100_789, "full streamed body read"); +} #[test] fn http1_response_with_http2_version() { @@ -2300,42 +2299,39 @@ async fn http2_service_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } -// TODO: Fix this, broken in PR #2896 -// #[test] -// fn http2_body_user_error_sends_reset_reason() { -// use std::error::Error; -// let server = serve(); -// let addr_str = format!("http://{}", server.addr()); +#[test] +fn http2_body_user_error_sends_reset_reason() { + use std::error::Error; + let server = serve(); + let addr_str = format!("http://{}", server.addr()); -// let b = futures_util::stream::once(future::err::(h2::Error::from( -// h2::Reason::INADEQUATE_SECURITY, -// ))); -// let b = hyper::Body::wrap_stream(b); + let b = futures_util::stream::once(future::err::(Box::new(h2::Error::from( + h2::Reason::INADEQUATE_SECURITY, + )))); + server.reply().body_stream(b); -// server.reply().body_stream(b); + let rt = support::runtime(); -// let rt = support::runtime(); + let err: hyper::Error = rt + .block_on(async move { + let client = Client::builder() + .http2_only(true) + .build_http::(); + let uri = addr_str.parse().expect("server addr should parse"); -// let err: hyper::Error = rt -// .block_on(async move { -// let client = Client::builder() -// .http2_only(true) -// .build_http::(); -// let uri = addr_str.parse().expect("server addr should parse"); + let mut res = client.get(uri).await?; -// let mut res = client.get(uri).await?; + while let Some(chunk) = res.body_mut().data().await { + chunk?; + } + Ok(()) + }) + .unwrap_err(); -// while let Some(chunk) = res.body_mut().next().await { -// chunk?; -// } -// Ok(()) -// }) -// .unwrap_err(); + let h2_err = err.source().unwrap().downcast_ref::().unwrap(); -// let h2_err = err.source().unwrap().downcast_ref::().unwrap(); - -// assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); -// } + assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); +} struct Http2ReadyErrorSvc; @@ -2687,7 +2683,7 @@ impl Serve { } type BoxError = Box; -type BoxFuture = Pin, BoxError>> + Send>>; +type BoxFuture = Pin, BoxError>> + Send>>; struct ReplyBuilder<'a> { tx: &'a Mutex>, @@ -2731,14 +2727,16 @@ impl<'a> ReplyBuilder<'a> { } fn body>(self, body: T) { - self.tx - .lock() - .unwrap() - .send(Reply::Body(body.as_ref().to_vec().into())) - .unwrap(); + let chunk = Bytes::copy_from_slice(body.as_ref()); + let body = BodyExt::boxed(http_body_util::Full::new(chunk).map_err(|e| match e {})); + self.tx.lock().unwrap().send(Reply::Body(body)).unwrap(); } - fn body_stream(self, body: Body) { + fn body_stream(self, stream: S) + where + S: futures_util::Stream> + Send + Sync + 'static, + { + let body = BodyExt::boxed(StreamBody::new(stream)); self.tx.lock().unwrap().send(Reply::Body(body)).unwrap(); } @@ -2780,13 +2778,15 @@ struct TestService { reply: spmc::Receiver, } +type ReplyBody = BoxBody; + #[derive(Debug)] enum Reply { Status(hyper::StatusCode), ReasonPhrase(hyper::ext::ReasonPhrase), Version(hyper::Version), Header(HeaderName, HeaderValue), - Body(hyper::Body), + Body(ReplyBody), Error(BoxError), End, } @@ -2799,7 +2799,7 @@ enum Msg { } impl tower_service::Service> for TestService { - type Response = Response; + type Response = Response; type Error = BoxError; type Future = BoxFuture; @@ -2832,8 +2832,10 @@ impl tower_service::Service> for TestService { } impl TestService { - fn build_reply(replies: spmc::Receiver) -> Result, BoxError> { - let mut res = Response::new(Body::empty()); + fn build_reply(replies: spmc::Receiver) -> Result, BoxError> { + let empty = + BodyExt::boxed(http_body_util::Empty::new().map_err(|e| -> BoxError { match e {} })); + let mut res = Response::new(empty); while let Ok(reply) = replies.try_recv() { match reply { Reply::Status(s) => { @@ -2880,7 +2882,7 @@ impl tower_service::Service> for HelloWorld { fn unreachable_service() -> impl tower_service::Service< http::Request, - Response = http::Response, + Response = http::Response, Error = BoxError, Future = BoxFuture, > {