feat(client,server): remove tcp feature and code (#2929)

This removes the `tcp` feature from hyper's `Cargo.toml`, and the code it enabled:

- `HttpConnector`
- `GaiResolver`
- `AddrStream`

And parts of `Client` and `Server` that used those types. Alternatives will be available in the `hyper-util` crate.

Closes #2856 
Co-authored-by: MrGunflame <mrgunflame@protonmail.com>
This commit is contained in:
Sean McArthur
2022-07-29 10:07:09 -07:00
committed by GitHub
parent d4b5bd4ee6
commit 0c8ee93d7f
40 changed files with 1565 additions and 2676 deletions

View File

@@ -5,6 +5,7 @@
extern crate matches;
use std::convert::Infallible;
use std::fmt;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::pin::Pin;
@@ -12,9 +13,11 @@ use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use http::uri::PathAndQuery;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::to_bytes as concat;
use hyper::{Body, Client, Method, Request, StatusCode};
use hyper::header::HeaderValue;
use hyper::{Body, Method, Request, StatusCode, Uri, Version};
use bytes::Bytes;
use futures_channel::oneshot;
@@ -31,6 +34,71 @@ fn tcp_connect(addr: &SocketAddr) -> impl Future<Output = std::io::Result<TcpStr
TcpStream::connect(*addr)
}
struct HttpInfo {
remote_addr: SocketAddr,
}
#[derive(Debug)]
enum Error {
Io(std::io::Error),
Hyper(hyper::Error),
AbsoluteUriRequired,
UnsupportedVersion,
}
impl Error {
fn is_incomplete_message(&self) -> bool {
match self {
Self::Hyper(err) => err.is_incomplete_message(),
_ => false,
}
}
fn is_parse(&self) -> bool {
match self {
Self::Hyper(err) => err.is_parse(),
_ => false,
}
}
fn is_parse_too_large(&self) -> bool {
match self {
Self::Hyper(err) => err.is_parse_too_large(),
_ => false,
}
}
fn is_parse_status(&self) -> bool {
match self {
Self::Hyper(err) => err.is_parse_status(),
_ => false,
}
}
}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(err) => err.fmt(fmt),
Self::Hyper(err) => err.fmt(fmt),
Self::AbsoluteUriRequired => write!(fmt, "client requires absolute-form URIs"),
Self::UnsupportedVersion => write!(fmt, "request has unsupported HTTP version"),
}
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
impl From<hyper::Error> for Error {
fn from(err: hyper::Error) -> Self {
Self::Hyper(err)
}
}
macro_rules! test {
(
name: $name:ident,
@@ -110,7 +178,7 @@ macro_rules! test {
let _ = pretty_env_logger::try_init();
let rt = support::runtime();
let err: ::hyper::Error = test! {
let err: Error = test! {
INNER;
name: $name,
runtime: &rt,
@@ -123,7 +191,7 @@ macro_rules! test {
)*},
}.unwrap_err();
fn infer_closure<F: FnOnce(&::hyper::Error) -> bool>(f: F) -> F { f }
fn infer_closure<F: FnOnce(&Error) -> bool>(f: F) -> F { f }
let closure = infer_closure($err);
if !closure(&err) {
@@ -151,22 +219,123 @@ macro_rules! test {
let addr = server.local_addr().expect("local_addr");
let rt = $runtime;
let connector = ::hyper::client::HttpConnector::new();
let client = Client::builder()
$($(.$c_opt_prop($c_opt_val))*)?
.build(connector);
#[allow(unused_assignments, unused_mut)]
let mut body = BodyExt::boxed(http_body_util::Empty::<bytes::Bytes>::new());
let mut req_builder = Request::builder();
$(
test!(@client_request; req_builder, body, addr, $c_req_prop: $c_req_val);
)*
let req = req_builder
let mut req = req_builder
.body(body)
.expect("request builder");
let res = client.request(req);
let res = async move {
// Wrapper around hyper::client::conn::Builder with set_host field to mimic
// hyper::client::Builder.
struct Builder {
inner: hyper::client::conn::Builder,
set_host: bool,
http09_responses: bool,
http2_only: bool,
}
impl Builder {
fn new() -> Self {
Self {
inner: hyper::client::conn::Builder::new(),
set_host: true,
http09_responses: false,
http2_only: false,
}
}
#[allow(unused)]
fn set_host(&mut self, val: bool) -> &mut Self {
self.set_host = val;
self
}
#[allow(unused)]
fn http09_responses(&mut self, val: bool) -> &mut Self {
self.http09_responses = val;
self.inner.http09_responses(val);
self
}
#[allow(unused)]
fn http2_only(&mut self, val: bool) -> &mut Self {
self.http2_only = val;
self.inner.http2_only(val);
self
}
}
impl std::ops::Deref for Builder {
type Target = hyper::client::conn::Builder;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl std::ops::DerefMut for Builder {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[allow(unused_mut)]
let mut builder = Builder::new();
$(builder$(.$c_opt_prop($c_opt_val))*;)?
if req.version() == Version::HTTP_09 && !builder.http09_responses {
return Err(Error::UnsupportedVersion);
}
if req.version() == Version::HTTP_2 && !builder.http2_only {
return Err(Error::UnsupportedVersion);
}
let host = req.uri().host().ok_or(Error::AbsoluteUriRequired)?;
let port = req.uri().port_u16().unwrap_or(80);
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
let extra = HttpInfo {
remote_addr: stream.peer_addr().unwrap(),
};
if builder.set_host {
let host = req.uri().host().expect("no host in uri");
let port = req.uri().port_u16().expect("no port in uri");
let host = format!("{}:{}", host, port);
req.headers_mut().append("Host", HeaderValue::from_str(&host).unwrap());
}
let (mut sender, conn) = builder.handshake(stream).await?;
tokio::task::spawn(async move {
if let Err(err) = conn.await {
panic!("{}", err);
}
});
let mut builder = Uri::builder();
if req.method() == Method::CONNECT {
builder = builder.path_and_query(format!("{}:{}", req.uri().host().unwrap(), req.uri().port_u16().unwrap()));
} else {
builder = builder.path_and_query(req.uri().path_and_query().cloned().unwrap_or(PathAndQuery::from_static("/")));
}
*req.uri_mut() = builder.build().unwrap();
let mut resp = sender.send_request(req).await?;
resp.extensions_mut().insert(extra);
Ok(resp)
};
let (tx, rx) = oneshot::channel();
@@ -188,7 +357,7 @@ macro_rules! test {
assert_eq!(s(&buf[..n]), expected);
inc.write_all($server_reply.as_ref()).expect("write_all");
let _ = tx.send(Ok::<_, hyper::Error>(()));
let _ = tx.send(Ok::<_, Error>(()));
}).expect("thread spawn");
let rx = rx.expect("thread panicked");
@@ -197,10 +366,10 @@ macro_rules! test {
// Always check that HttpConnector has set the "extra" info...
let extra = resp
.extensions_mut()
.remove::<::hyper::client::connect::HttpInfo>()
.remove::<HttpInfo>()
.expect("HttpConnector should set HttpInfo");
assert_eq!(extra.remote_addr(), addr, "HttpInfo should have server addr");
assert_eq!(extra.remote_addr, addr, "HttpInfo should have server addr");
resp
})
@@ -1174,7 +1343,7 @@ mod dispatch_impl {
use super::support;
use hyper::body::HttpBody;
use hyper::client::connect::{Connected, Connection, HttpConnector};
use hyper::client::connect::{Connected, Connection};
use hyper::Client;
#[test]
@@ -1186,10 +1355,7 @@ mod dispatch_impl {
let addr = server.local_addr().unwrap();
let rt = support::runtime();
let (closes_tx, closes) = mpsc::channel(10);
let client = Client::builder().build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));
let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let (tx1, rx1) = oneshot::channel();
@@ -1259,10 +1425,7 @@ mod dispatch_impl {
});
let res = {
let client = Client::builder().build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));
let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
@@ -1322,10 +1485,7 @@ mod dispatch_impl {
support::runtime().block_on(client_drop_rx.into_future())
});
let client = Client::builder().build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));
let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
@@ -1385,10 +1545,7 @@ mod dispatch_impl {
});
let res = {
let client = Client::builder().build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));
let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
@@ -1438,10 +1595,7 @@ mod dispatch_impl {
let rx = rx1.expect("thread panicked");
let res = {
let client = Client::builder().build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));
let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
@@ -1490,9 +1644,9 @@ mod dispatch_impl {
let _ = rx2.recv();
});
let client = Client::builder().pool_max_idle_per_host(0).build(
DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx),
);
let client = Client::builder()
.pool_max_idle_per_host(0)
.build(DebugConnector::with_closes(closes_tx));
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
@@ -1536,10 +1690,7 @@ mod dispatch_impl {
let _ = tx1.send(());
});
let client = Client::builder().build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));
let client = Client::builder().build(DebugConnector::with_closes(closes_tx));
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
@@ -2085,7 +2236,6 @@ mod dispatch_impl {
#[derive(Clone)]
struct DebugConnector {
http: HttpConnector,
closes: mpsc::Sender<()>,
connects: Arc<AtomicUsize>,
is_proxy: bool,
@@ -2094,14 +2244,12 @@ mod dispatch_impl {
impl DebugConnector {
fn new() -> DebugConnector {
let http = HttpConnector::new();
let (tx, _) = mpsc::channel(10);
DebugConnector::with_http_and_closes(http, tx)
DebugConnector::with_closes(tx)
}
fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector {
fn with_closes(closes: mpsc::Sender<()>) -> DebugConnector {
DebugConnector {
http,
closes,
connects: Arc::new(AtomicUsize::new(0)),
is_proxy: false,
@@ -2117,12 +2265,11 @@ mod dispatch_impl {
impl hyper::service::Service<Uri> for DebugConnector {
type Response = DebugStream;
type Error = <HttpConnector as hyper::service::Service<Uri>>::Error;
type Error = std::io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// don't forget to check inner service is ready :)
hyper::service::Service::<Uri>::poll_ready(&mut self.http, cx)
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, dst: Uri) -> Self::Future {
@@ -2130,12 +2277,20 @@ mod dispatch_impl {
let closes = self.closes.clone();
let is_proxy = self.is_proxy;
let is_alpn_h2 = self.alpn_h2;
Box::pin(self.http.call(dst).map_ok(move |tcp| DebugStream {
tcp,
on_drop: closes,
is_alpn_h2,
is_proxy,
}))
Box::pin(async move {
let host = dst.host().expect("no host in uri");
let port = dst.port_u16().expect("no port in uri");
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
Ok(DebugStream {
tcp: stream,
on_drop: closes,
is_alpn_h2,
is_proxy,
})
})
}
}
@@ -2188,7 +2343,7 @@ mod dispatch_impl {
impl Connection for DebugStream {
fn connected(&self) -> Connected {
let connected = self.tcp.connected().proxy(self.is_proxy);
let connected = Connected::new().proxy(self.is_proxy);
if self.is_alpn_h2 {
connected.negotiated_h2()
@@ -2744,27 +2899,45 @@ mod conn {
#[tokio::test]
async fn http2_detect_conn_eof() {
use futures_util::future;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Response, Server};
let _ = pretty_env_logger::try_init();
let server = Server::bind(&([127, 0, 0, 1], 0).into())
.http2_only(true)
.serve(make_service_fn(|_| async move {
Ok::<_, hyper::Error>(service_fn(|_req| {
future::ok::<_, hyper::Error>(Response::new(Body::empty()))
}))
}));
let addr = server.local_addr();
let (shdn_tx, shdn_rx) = oneshot::channel();
let addr = SocketAddr::from(([127, 0, 0, 1], 0));
let listener = TkTcpListener::bind(addr).await.unwrap();
let addr = listener.local_addr().unwrap();
let (shdn_tx, mut shdn_rx) = tokio::sync::watch::channel(false);
tokio::task::spawn(async move {
server
.with_graceful_shutdown(async move {
let _ = shdn_rx.await;
})
.await
.expect("server")
use hyper::server::conn::Http;
use hyper::service::service_fn;
loop {
tokio::select! {
res = listener.accept() => {
let (stream, _) = res.unwrap();
let service = service_fn(|_:Request<Body>| future::ok::<Response<Body>, hyper::Error>(Response::new(Body::empty())));
let mut shdn_rx = shdn_rx.clone();
tokio::task::spawn(async move {
let mut conn = Http::new().http2_only(true).serve_connection(stream, service);
tokio::select! {
res = &mut conn => {
res.unwrap();
}
_ = shdn_rx.changed() => {
Pin::new(&mut conn).graceful_shutdown();
conn.await.unwrap();
}
}
});
}
_ = shdn_rx.changed() => {
break;
}
}
}
});
let io = tcp_connect(&addr).await.expect("tcp connect");
@@ -2796,7 +2969,7 @@ mod conn {
.expect("client poll ready after");
// Trigger the server shutdown...
let _ = shdn_tx.send(());
let _ = shdn_tx.send(true);
// Allow time for graceful shutdown roundtrips...
tokio::time::sleep(Duration::from_millis(100)).await;

View File

@@ -21,15 +21,14 @@ use h2::client::SendRequest;
use h2::{RecvStream, SendStream};
use http::header::{HeaderName, HeaderValue};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio::net::{TcpListener, TcpStream as TkTcpStream};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream};
use hyper::body::HttpBody as _;
use hyper::client::Client;
use hyper::body::HttpBody;
use hyper::server::conn::Http;
use hyper::server::Server;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, StatusCode, Version};
use hyper::service::service_fn;
use hyper::{Body, Method, Request, Response, StatusCode, Uri, Version};
mod support;
@@ -320,15 +319,11 @@ mod response_body_lengths {
#[tokio::test]
async fn http2_auto_response_with_known_length() {
use http_body::Body;
let server = serve();
let addr_str = format!("http://{}", server.addr());
server.reply().body("Hello, World!");
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let client = TestClient::new().http2_only();
let uri = addr_str
.parse::<hyper::Uri>()
.expect("server addr should parse");
@@ -340,8 +335,6 @@ mod response_body_lengths {
#[tokio::test]
async fn http2_auto_response_with_conflicting_lengths() {
use http_body::Body;
let server = serve();
let addr_str = format!("http://{}", server.addr());
server
@@ -349,9 +342,7 @@ mod response_body_lengths {
.header("content-length", "10")
.body("Hello, World!");
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let client = TestClient::new().http2_only();
let uri = addr_str
.parse::<hyper::Uri>()
.expect("server addr should parse");
@@ -363,15 +354,11 @@ mod response_body_lengths {
#[tokio::test]
async fn http2_implicit_empty_size_hint() {
use http_body::Body;
let server = serve();
let addr_str = format!("http://{}", server.addr());
server.reply();
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let client = TestClient::new().http2_only();
let uri = addr_str
.parse::<hyper::Uri>()
.expect("server addr should parse");
@@ -1480,8 +1467,6 @@ async fn header_read_timeout_slow_writes_multiple_requests() {
#[tokio::test]
async fn upgrades() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
@@ -1539,8 +1524,6 @@ async fn upgrades() {
#[tokio::test]
async fn http_connect() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
@@ -1675,15 +1658,19 @@ async fn upgrades_ignored() {
future::ok::<_, hyper::Error>(Response::new(hyper::Body::empty()))
});
let (socket, _) = listener.accept().await.unwrap();
Http::new()
.serve_connection(socket, svc)
.with_upgrades()
.await
.expect("server task");
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::task::spawn(async move {
Http::new()
.serve_connection(socket, svc)
.with_upgrades()
.await
.expect("server task");
});
}
});
let client = hyper::Client::new();
let client = TestClient::new();
let url = format!("http://{}/", addr);
let make_req = || {
@@ -1705,8 +1692,6 @@ async fn upgrades_ignored() {
#[tokio::test]
async fn http_connect_new() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
@@ -1771,8 +1756,6 @@ async fn http_connect_new() {
#[tokio::test]
async fn h2_connect() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
@@ -1843,7 +1826,6 @@ async fn h2_connect() {
async fn h2_connect_multiplex() {
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
@@ -1954,8 +1936,6 @@ async fn h2_connect_multiplex() {
#[tokio::test]
async fn h2_connect_large_body() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
@@ -2031,8 +2011,6 @@ async fn h2_connect_large_body() {
#[tokio::test]
async fn h2_connect_empty_frames() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let _ = pretty_env_logger::try_init();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
@@ -2225,8 +2203,8 @@ fn http1_response_with_http2_version() {
server.reply().version(hyper::Version::HTTP_2);
let client = TestClient::new();
rt.block_on({
let client = Client::new();
let uri = addr_str.parse().expect("server addr should parse");
client.get(uri)
})
@@ -2240,10 +2218,8 @@ fn try_h2() {
let rt = support::runtime();
let client = TestClient::new().http2_only();
rt.block_on({
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let uri = addr_str.parse().expect("server addr should parse");
client.get(uri).map_ok(|_| ()).map_err(|_e| ())
@@ -2260,10 +2236,8 @@ fn http1_only() {
let rt = support::runtime();
let client = TestClient::new().http2_only();
rt.block_on({
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let uri = addr_str.parse().expect("server addr should parse");
client.get(uri)
})
@@ -2283,9 +2257,8 @@ async fn http2_service_error_sends_reset_reason() {
let uri = addr_str.parse().expect("server addr should parse");
dbg!("start");
let err = dbg!(Client::builder()
.http2_only(true)
.build_http::<hyper::Body>()
let err = dbg!(TestClient::new()
.http2_only()
.get(uri)
.await
.expect_err("client.get"));
@@ -2314,9 +2287,8 @@ fn http2_body_user_error_sends_reset_reason() {
let err: hyper::Error = rt
.block_on(async move {
let client = Client::builder()
.http2_only(true)
.build_http::<hyper::Body>();
let client = TestClient::new().http2_only();
let uri = addr_str.parse().expect("server addr should parse");
let mut res = client.get(uri).await?;
@@ -2363,22 +2335,33 @@ async fn http2_service_poll_ready_error_sends_goaway() {
let _ = pretty_env_logger::try_init();
let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into())
.http2_only(true)
.serve(make_service_fn(|_| async move {
Ok::<_, BoxError>(Http2ReadyErrorSvc)
}));
let listener = TkTcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let addr_str = format!("http://{}", server.local_addr());
let addr_str = format!("http://{}", listener.local_addr().unwrap());
tokio::task::spawn(async move {
server.await.expect("server");
loop {
tokio::select! {
res = listener.accept() => {
let (stream, _) = res.unwrap();
tokio::task::spawn(async move {
let mut http = Http::new();
http.http2_only(true);
let service = Http2ReadyErrorSvc;
http.serve_connection(stream, service).await.unwrap();
});
}
}
}
});
let uri = addr_str.parse().expect("server addr should parse");
let err = dbg!(Client::builder()
.http2_only(true)
.build_http::<hyper::Body>()
let err = dbg!(TestClient::new()
.http2_only()
.get(uri)
.await
.expect_err("client.get should fail"));
@@ -2948,9 +2931,9 @@ impl ServeOptions {
let (addr_tx, addr_rx) = mpsc::channel();
let (msg_tx, msg_rx) = mpsc::channel();
let (reply_tx, reply_rx) = spmc::channel();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let addr = ([127, 0, 0, 1], 0).into();
let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
let thread_name = format!(
"test-server-{}",
@@ -2961,36 +2944,46 @@ impl ServeOptions {
let thread = thread::Builder::new()
.name(thread_name)
.spawn(move || {
support::runtime()
.block_on(async move {
let service = make_service_fn(|_| {
let msg_tx = msg_tx.clone();
let reply_rx = reply_rx.clone();
future::ok::<_, BoxError>(TestService {
tx: msg_tx,
reply: reply_rx,
})
});
support::runtime().block_on(async move {
let listener = TkTcpListener::bind(addr).await.unwrap();
let builder = Server::bind(&addr);
addr_tx
.send(listener.local_addr().unwrap())
.expect("server addr tx");
#[cfg(feature = "http1")]
let builder = builder
.http1_only(_options.http1_only)
.http1_keepalive(_options.keep_alive)
.http1_pipeline_flush(_options.pipeline);
loop {
let msg_tx = msg_tx.clone();
let reply_rx = reply_rx.clone();
let server = builder.serve(service);
tokio::select! {
res = listener.accept() => {
let (stream, _) = res.unwrap();
addr_tx.send(server.local_addr()).expect("server addr tx");
tokio::task::spawn(async move {
let mut http = Http::new();
server
.with_graceful_shutdown(async {
let _ = shutdown_rx.await;
})
.await
})
.expect("serve()");
#[cfg(feature = "http1")]
let http = http
.http1_only(_options.http1_only)
.http1_keep_alive(_options.keep_alive)
.pipeline_flush(_options.pipeline);
let msg_tx = msg_tx.clone();
let reply_rx = reply_rx.clone();
let service = TestService {
tx: msg_tx,
reply: reply_rx,
};
http.serve_connection(stream, service).await.unwrap();
});
}
_ = &mut shutdown_rx => {
break;
}
}
}
})
})
.expect("thread spawn");
@@ -3119,3 +3112,49 @@ impl Drop for Dropped {
self.0.store(true, Ordering::SeqCst);
}
}
struct TestClient {
http2_only: bool,
}
impl TestClient {
fn new() -> Self {
Self { http2_only: false }
}
fn http2_only(mut self) -> Self {
self.http2_only = true;
self
}
async fn get(&self, uri: Uri) -> Result<Response<Body>, hyper::Error> {
self.request(
Request::builder()
.uri(uri)
.method(Method::GET)
.body(Body::empty())
.unwrap(),
)
.await
}
async fn request(&self, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
let host = req.uri().host().expect("uri has no host");
let port = req.uri().port_u16().expect("uri has no port");
let mut builder = hyper::client::conn::Builder::new();
builder.http2_only(self.http2_only);
let stream = TkTcpStream::connect(format!("{}:{}", host, port))
.await
.unwrap();
let (mut sender, conn) = builder.handshake(stream).await.unwrap();
tokio::task::spawn(async move {
conn.await.unwrap();
});
sender.send_request(req).await
}
}

View File

@@ -6,9 +6,12 @@ use std::sync::{
Arc, Mutex,
};
use hyper::client::HttpConnector;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client, Request, Response, Server, Version};
use hyper::client::conn::Builder;
use hyper::server::conn::Http;
use tokio::net::{TcpListener, TcpStream};
use hyper::service::service_fn;
use hyper::{Body, Request, Response, Version};
pub use futures_util::{
future, FutureExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _,
@@ -326,16 +329,20 @@ async fn async_test(cfg: __TestConfig) {
Version::HTTP_11
};
let connector = HttpConnector::new();
let client = Client::builder()
.http2_only(cfg.client_version == 2)
.build::<_, Body>(connector);
let http2_only = cfg.server_version == 2;
let serve_handles = Arc::new(Mutex::new(cfg.server_msgs));
let listener = TcpListener::bind(&SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let mut addr = listener.local_addr().unwrap();
let expected_connections = cfg.connections;
let mut cnt = 0;
let new_service = make_service_fn(move |_| {
tokio::task::spawn(async move {
let mut cnt = 0;
cnt += 1;
assert!(
cnt <= expected_connections,
@@ -344,98 +351,108 @@ async fn async_test(cfg: __TestConfig) {
cnt
);
// Move a clone into the service_fn
let serve_handles = serve_handles.clone();
future::ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
let (sreq, sres) = serve_handles.lock().unwrap().remove(0);
loop {
let (stream, _) = listener.accept().await.expect("server error");
assert_eq!(req.uri().path(), sreq.uri, "client path");
assert_eq!(req.method(), &sreq.method, "client method");
assert_eq!(req.version(), version, "client version");
for func in &sreq.headers {
func(&req.headers());
}
let sbody = sreq.body;
hyper::body::to_bytes(req).map_ok(move |body| {
assert_eq!(body.as_ref(), sbody.as_slice(), "client body");
// Move a clone into the service_fn
let serve_handles = serve_handles.clone();
let service = service_fn(move |req: Request<Body>| {
let (sreq, sres) = serve_handles.lock().unwrap().remove(0);
let mut res = Response::builder()
.status(sres.status)
.body(Body::from(sres.body))
.expect("Response::build");
*res.headers_mut() = sres.headers;
res
})
}))
assert_eq!(req.uri().path(), sreq.uri, "client path");
assert_eq!(req.method(), &sreq.method, "client method");
assert_eq!(req.version(), version, "client version");
for func in &sreq.headers {
func(&req.headers());
}
let sbody = sreq.body;
hyper::body::to_bytes(req).map_ok(move |body| {
assert_eq!(body.as_ref(), sbody.as_slice(), "client body");
let mut res = Response::builder()
.status(sres.status)
.body(Body::from(sres.body))
.expect("Response::build");
*res.headers_mut() = sres.headers;
res
})
});
tokio::task::spawn(async move {
Http::new()
.http2_only(http2_only)
.serve_connection(stream, service)
.await
.expect("server error");
});
}
});
let server = hyper::Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0)))
.http2_only(cfg.server_version == 2)
.serve(new_service);
let mut addr = server.local_addr();
tokio::task::spawn(server.map(|result| {
result.expect("server error");
}));
if cfg.proxy {
let (proxy_addr, proxy) = naive_proxy(ProxyConfig {
connections: cfg.connections,
dst: addr,
version: cfg.server_version,
});
})
.await;
tokio::task::spawn(proxy);
addr = proxy_addr;
}
let make_request = Arc::new(
move |client: &Client<HttpConnector>, creq: __CReq, cres: __CRes| {
let uri = format!("http://{}{}", addr, creq.uri);
let mut req = Request::builder()
.method(creq.method)
.uri(uri)
//.headers(creq.headers)
.body(creq.body.into())
.expect("Request::build");
*req.headers_mut() = creq.headers;
let cstatus = cres.status;
let cheaders = cres.headers;
let cbody = cres.body;
let make_request = Arc::new(move |creq: __CReq, cres: __CRes| {
let uri = format!("http://{}{}", addr, creq.uri);
let mut req = Request::builder()
.method(creq.method)
.uri(uri)
//.headers(creq.headers)
.body(creq.body.into())
.expect("Request::build");
*req.headers_mut() = creq.headers;
let cstatus = cres.status;
let cheaders = cres.headers;
let cbody = cres.body;
client
.request(req)
.and_then(move |res| {
assert_eq!(res.status(), cstatus, "server status");
assert_eq!(res.version(), version, "server version");
for func in &cheaders {
func(&res.headers());
}
hyper::body::to_bytes(res)
})
.map_ok(move |body| {
assert_eq!(body.as_ref(), cbody.as_slice(), "server body");
})
.map(|res| res.expect("client error"))
},
);
async move {
let stream = TcpStream::connect(addr).await.unwrap();
let (mut sender, conn) = hyper::client::conn::Builder::new()
.http2_only(http2_only)
.handshake::<TcpStream, Body>(stream)
.await
.unwrap();
tokio::task::spawn(async move {
if let Err(err) = conn.await {
panic!("{:?}", err);
}
});
let res = sender.send_request(req).await.unwrap();
assert_eq!(res.status(), cstatus, "server status");
assert_eq!(res.version(), version, "server version");
for func in &cheaders {
func(&res.headers());
}
let body = hyper::body::to_bytes(res).await.unwrap();
assert_eq!(body.as_ref(), cbody.as_slice(), "server body");
}
});
let client_futures: Pin<Box<dyn Future<Output = ()> + Send>> = if cfg.parallel {
let mut client_futures = vec![];
for (creq, cres) in cfg.client_msgs {
client_futures.push(make_request(&client, creq, cres));
client_futures.push(make_request(creq, cres));
}
drop(client);
Box::pin(future::join_all(client_futures).map(|_| ()))
} else {
let mut client_futures: Pin<Box<dyn Future<Output = Client<HttpConnector>> + Send>> =
Box::pin(future::ready(client));
let mut client_futures: Pin<Box<dyn Future<Output = ()> + Send>> =
Box::pin(future::ready(()));
for (creq, cres) in cfg.client_msgs {
let mk_request = make_request.clone();
client_futures = Box::pin(client_futures.then(move |client| {
let fut = mk_request(&client, creq, cres);
fut.map(move |()| client)
}));
client_futures = Box::pin(client_futures.then(move |_| mk_request(creq, cres)));
}
Box::pin(client_futures.map(|_| ()))
};
@@ -449,27 +466,75 @@ struct ProxyConfig {
version: usize,
}
fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>) {
let client = Client::builder()
.http2_only(cfg.version == 2)
.build_http::<Body>();
async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future<Output = ()>) {
let dst_addr = cfg.dst;
let max_connections = cfg.connections;
let counter = AtomicUsize::new(0);
let http2_only = cfg.version == 2;
let srv = Server::bind(&([127, 0, 0, 1], 0).into()).serve(make_service_fn(move |_| {
let prev = counter.fetch_add(1, Ordering::Relaxed);
assert!(max_connections > prev, "proxy max connections");
let client = client.clone();
future::ok::<_, hyper::Error>(service_fn(move |mut req| {
let uri = format!("http://{}{}", dst_addr, req.uri().path())
.parse()
.expect("proxy new uri parse");
*req.uri_mut() = uri;
client.request(req)
}))
}));
let proxy_addr = srv.local_addr();
(proxy_addr, srv.map(|res| res.expect("proxy error")))
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
.await
.unwrap();
let proxy_addr = listener.local_addr().unwrap();
let fut = async move {
tokio::task::spawn(async move {
let prev = counter.fetch_add(1, Ordering::Relaxed);
assert!(max_connections > prev, "proxy max connections");
loop {
let (stream, _) = listener.accept().await.unwrap();
let service = service_fn(move |mut req| {
async move {
let uri = format!("http://{}{}", dst_addr, req.uri().path())
.parse()
.expect("proxy new uri parse");
*req.uri_mut() = uri;
// Make the client request
let uri = req.uri().host().expect("uri has no host");
let port = req.uri().port_u16().expect("uri has no port");
let stream = TcpStream::connect(format!("{}:{}", uri, port))
.await
.unwrap();
let mut builder = Builder::new();
builder.http2_only(http2_only);
let (mut sender, conn) = builder.handshake(stream).await.unwrap();
tokio::task::spawn(async move {
if let Err(err) = conn.await {
panic!("{:?}", err);
}
});
let resp = sender.send_request(req).await?;
let (mut parts, body) = resp.into_parts();
// Remove the Connection header for HTTP/1.1 proxy connections.
if !http2_only {
parts.headers.remove("Connection");
}
let mut builder = Response::builder().status(parts.status);
*builder.headers_mut().unwrap() = parts.headers;
Result::<Response<Body>, hyper::Error>::Ok(builder.body(body).unwrap())
}
});
Http::new()
.http2_only(http2_only)
.serve_connection(stream, service)
.await
.unwrap();
}
});
};
(proxy_addr, fut)
}