test(server): Update and re-enable the tests/server.rs test
These tests were temporarily disabled during the migration to the `std::future::Future` type that's part of the stable Rust now. This commit updates the tests after the breaking changes and makes them pass again.
This commit is contained in:
committed by
Sean McArthur
parent
750ee95a7c
commit
b831ae1870
@@ -1,10 +1,8 @@
|
|||||||
|
#![feature(async_await, async_closure)]
|
||||||
#![deny(warnings)]
|
#![deny(warnings)]
|
||||||
extern crate http;
|
extern crate http;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate h2;
|
extern crate h2;
|
||||||
#[macro_use]
|
|
||||||
extern crate futures;
|
|
||||||
extern crate futures_timer;
|
|
||||||
extern crate net2;
|
extern crate net2;
|
||||||
extern crate spmc;
|
extern crate spmc;
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
@@ -18,24 +16,32 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::{Arc};
|
use std::sync::{Arc};
|
||||||
use std::net::{TcpListener as StdTcpListener};
|
use std::net::{TcpListener as StdTcpListener};
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use futures::{Future, Stream};
|
use futures_channel::oneshot;
|
||||||
use futures::future::{self, FutureResult, Either};
|
use futures_core::ready;
|
||||||
use futures::sync::oneshot;
|
use futures_core::future::BoxFuture;
|
||||||
use futures_timer::Delay;
|
use futures_util::future::{self, Either, FutureExt};
|
||||||
|
use futures_util::stream::StreamExt;
|
||||||
|
use futures_util::try_future::{self, TryFutureExt};
|
||||||
|
use futures_util::try_stream::TryStreamExt;
|
||||||
use http::header::{HeaderName, HeaderValue};
|
use http::header::{HeaderName, HeaderValue};
|
||||||
use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream};
|
use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream};
|
||||||
use tokio::runtime::current_thread::Runtime;
|
use tokio::runtime::current_thread::Runtime;
|
||||||
use tokio::reactor::Handle;
|
use tokio::reactor::Handle;
|
||||||
|
use tokio::runtime::current_thread::Runtime;
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_tcp::{TcpListener, TcpStream as TkTcpStream};
|
||||||
|
use tokio_timer::Delay;
|
||||||
|
|
||||||
use hyper::{Body, Request, Response, StatusCode, Version};
|
use hyper::{Body, Request, Response, StatusCode, Version};
|
||||||
use hyper::client::Client;
|
use hyper::client::Client;
|
||||||
use hyper::server::conn::Http;
|
use hyper::server::conn::Http;
|
||||||
use hyper::server::Server;
|
use hyper::server::Server;
|
||||||
use hyper::service::{service_fn, service_fn_ok, Service};
|
use hyper::service::{make_service_fn, service_fn, Service};
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -300,6 +306,7 @@ mod response_body_lengths {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
#[test]
|
#[test]
|
||||||
fn http2_auto_response_with_known_length() {
|
fn http2_auto_response_with_known_length() {
|
||||||
use hyper::body::Payload;
|
use hyper::body::Payload;
|
||||||
@@ -309,7 +316,7 @@ mod response_body_lengths {
|
|||||||
server.reply().body("Hello, World!");
|
server.reply().body("Hello, World!");
|
||||||
|
|
||||||
let mut rt = Runtime::new().expect("rt new");
|
let mut rt = Runtime::new().expect("rt new");
|
||||||
rt.block_on(hyper::rt::lazy(move || {
|
rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
@@ -319,16 +326,16 @@ mod response_body_lengths {
|
|||||||
|
|
||||||
client
|
client
|
||||||
.get(uri)
|
.get(uri)
|
||||||
.and_then(|res| {
|
.map_ok(|res| {
|
||||||
assert_eq!(res.headers().get("content-length").unwrap(), "13");
|
assert_eq!(res.headers().get("content-length").unwrap(), "13");
|
||||||
assert_eq!(res.body().content_length(), Some(13));
|
assert_eq!(res.body().content_length(), Some(13));
|
||||||
Ok(())
|
()
|
||||||
})
|
})
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|_e| ())
|
.map_err(|_e| ())
|
||||||
})).unwrap();
|
}).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
#[test]
|
#[test]
|
||||||
fn http2_auto_response_with_conflicting_lengths() {
|
fn http2_auto_response_with_conflicting_lengths() {
|
||||||
use hyper::body::Payload;
|
use hyper::body::Payload;
|
||||||
@@ -341,7 +348,7 @@ mod response_body_lengths {
|
|||||||
.body("Hello, World!");
|
.body("Hello, World!");
|
||||||
|
|
||||||
let mut rt = Runtime::new().expect("rt new");
|
let mut rt = Runtime::new().expect("rt new");
|
||||||
rt.block_on(hyper::rt::lazy(move || {
|
rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
@@ -351,14 +358,13 @@ mod response_body_lengths {
|
|||||||
|
|
||||||
client
|
client
|
||||||
.get(uri)
|
.get(uri)
|
||||||
.and_then(|res| {
|
.map_ok(|res| {
|
||||||
assert_eq!(res.headers().get("content-length").unwrap(), "10");
|
assert_eq!(res.headers().get("content-length").unwrap(), "10");
|
||||||
assert_eq!(res.body().content_length(), Some(10));
|
assert_eq!(res.body().content_length(), Some(10));
|
||||||
Ok(())
|
()
|
||||||
})
|
})
|
||||||
.map(|_| ())
|
|
||||||
.map_err(|_e| ())
|
.map_err(|_e| ())
|
||||||
})).unwrap();
|
}).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -847,35 +853,35 @@ fn disable_keep_alive_mid_request() {
|
|||||||
let addr = listener.local_addr().unwrap();
|
let addr = listener.local_addr().unwrap();
|
||||||
|
|
||||||
let (tx1, rx1) = oneshot::channel();
|
let (tx1, rx1) = oneshot::channel();
|
||||||
let (tx2, rx2) = oneshot::channel();
|
let (tx2, rx2) = mpsc::channel();
|
||||||
|
|
||||||
let child = thread::spawn(move || {
|
let child = thread::spawn(move || {
|
||||||
let mut req = connect(&addr);
|
let mut req = connect(&addr);
|
||||||
req.write_all(b"GET / HTTP/1.1\r\n").unwrap();
|
req.write_all(b"GET / HTTP/1.1\r\n").unwrap();
|
||||||
tx1.send(()).unwrap();
|
tx1.send(()).unwrap();
|
||||||
rx2.wait().unwrap();
|
rx2.recv().unwrap();
|
||||||
req.write_all(b"Host: localhost\r\n\r\n").unwrap();
|
req.write_all(b"Host: localhost\r\n\r\n").unwrap();
|
||||||
let mut buf = vec![];
|
let mut buf = vec![];
|
||||||
req.read_to_end(&mut buf).unwrap();
|
req.read_to_end(&mut buf).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| {
|
||||||
let socket = item.unwrap();
|
let srv = Http::new().serve_connection(socket, HelloWorld);
|
||||||
Http::new().serve_connection(socket, HelloWorld)
|
try_future::try_select(srv, rx1)
|
||||||
.select2(rx1)
|
|
||||||
.then(|r| {
|
.then(|r| {
|
||||||
match r {
|
match r {
|
||||||
Ok(Either::A(_)) => panic!("expected rx first"),
|
Ok(Either::Left(_)) => panic!("expected rx first"),
|
||||||
Ok(Either::B(((), mut conn))) => {
|
Ok(Either::Right(((), mut conn))) => {
|
||||||
conn.graceful_shutdown();
|
Pin::new(&mut conn).graceful_shutdown();
|
||||||
tx2.send(()).unwrap();
|
tx2.send(()).unwrap();
|
||||||
conn
|
conn
|
||||||
}
|
}
|
||||||
Err(Either::A((e, _))) => panic!("unexpected error {}", e),
|
Err(Either::Left((e, _))) => panic!("unexpected error {}", e),
|
||||||
Err(Either::B((e, _))) => panic!("unexpected error {}", e),
|
Err(Either::Right((e, _))) => panic!("unexpected error {}", e),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
@@ -914,26 +920,26 @@ fn disable_keep_alive_post_request() {
|
|||||||
|
|
||||||
let dropped = Dropped::new();
|
let dropped = Dropped::new();
|
||||||
let dropped2 = dropped.clone();
|
let dropped2 = dropped.clone();
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| {
|
||||||
let socket = item.expect("accepted socket");
|
|
||||||
let transport = DebugStream {
|
let transport = DebugStream {
|
||||||
stream: socket,
|
stream: socket,
|
||||||
_debug: dropped2,
|
_debug: dropped2,
|
||||||
};
|
};
|
||||||
Http::new().serve_connection(transport, HelloWorld)
|
let server = Http::new().serve_connection(transport, HelloWorld);
|
||||||
.select2(rx1)
|
try_future::try_select(server, rx1)
|
||||||
.then(|r| {
|
.then(|r| {
|
||||||
match r {
|
match r {
|
||||||
Ok(Either::A(_)) => panic!("expected rx first"),
|
Ok(Either::Left(_)) => panic!("expected rx first"),
|
||||||
Ok(Either::B(((), mut conn))) => {
|
Ok(Either::Right(((), mut conn))) => {
|
||||||
conn.graceful_shutdown();
|
Pin::new(&mut conn).graceful_shutdown();
|
||||||
conn
|
conn
|
||||||
}
|
}
|
||||||
Err(Either::A((e, _))) => panic!("unexpected error {}", e),
|
Err(Either::Left((e, _))) => panic!("unexpected error {}", e),
|
||||||
Err(Either::B((e, _))) => panic!("unexpected error {}", e),
|
Err(Either::Right((e, _))) => panic!("unexpected error {}", e),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
@@ -944,8 +950,8 @@ fn disable_keep_alive_post_request() {
|
|||||||
// the read-blocked socket.
|
// the read-blocked socket.
|
||||||
//
|
//
|
||||||
// See https://github.com/carllerche/mio/issues/776
|
// See https://github.com/carllerche/mio/issues/776
|
||||||
let timeout = Delay::new(Duration::from_millis(10));
|
let timeout = Delay::new(Instant::now() + Duration::from_millis(10));
|
||||||
rt.block_on(timeout).unwrap();
|
rt.block_on(timeout);
|
||||||
assert!(dropped.load());
|
assert!(dropped.load());
|
||||||
child.join().unwrap();
|
child.join().unwrap();
|
||||||
}
|
}
|
||||||
@@ -960,13 +966,11 @@ fn empty_parse_eof_does_not_return_error() {
|
|||||||
let _tcp = connect(&addr);
|
let _tcp = connect(&addr);
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| Http::new().serve_connection(socket, HelloWorld));
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new().serve_connection(socket, HelloWorld)
|
|
||||||
});
|
|
||||||
|
|
||||||
rt.block_on(fut).expect("empty parse eof is ok");
|
rt.block_on(fut).expect("empty parse eof is ok");
|
||||||
}
|
}
|
||||||
@@ -982,13 +986,11 @@ fn nonempty_parse_eof_returns_error() {
|
|||||||
tcp.write_all(b"GET / HTTP/1.1").unwrap();
|
tcp.write_all(b"GET / HTTP/1.1").unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| Http::new().serve_connection(socket, HelloWorld));
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new().serve_connection(socket, HelloWorld)
|
|
||||||
});
|
|
||||||
|
|
||||||
rt.block_on(fut).expect_err("partial parse eof is error");
|
rt.block_on(fut).expect_err("partial parse eof is error");
|
||||||
}
|
}
|
||||||
@@ -1011,17 +1013,14 @@ fn socket_half_closed() {
|
|||||||
assert_eq!(s(&buf[..expected.len()]), expected);
|
assert_eq!(s(&buf[..expected.len()]), expected);
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| {
|
||||||
let socket = item.unwrap();
|
Http::new().serve_connection(socket, service_fn(|_| {
|
||||||
Http::new()
|
Delay::new(Instant::now() + Duration::from_millis(500))
|
||||||
.serve_connection(socket, service_fn(|_| {
|
.map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty())))
|
||||||
Delay::new(Duration::from_millis(500))
|
|
||||||
.map(|_| {
|
|
||||||
Response::new(Body::empty())
|
|
||||||
})
|
|
||||||
}))
|
}))
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1040,16 +1039,16 @@ fn disconnect_after_reading_request_before_responding() {
|
|||||||
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
|
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| {
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new()
|
Http::new()
|
||||||
.http1_half_close(false)
|
.http1_half_close(false)
|
||||||
.serve_connection(socket, service_fn(|_| {
|
.serve_connection(socket, service_fn(|_| {
|
||||||
Delay::new(Duration::from_secs(2))
|
Delay::new(Instant::now() + Duration::from_secs(2))
|
||||||
.map(|_| -> Response<Body> {
|
.map(|_| -> Result<Response<Body>, hyper::Error> {
|
||||||
panic!("response future should have been dropped");
|
panic!("response future should have been dropped");
|
||||||
})
|
})
|
||||||
}))
|
}))
|
||||||
@@ -1074,13 +1073,13 @@ fn returning_1xx_response_is_error() {
|
|||||||
assert_eq!(s(&buf[..expected.len()]), expected);
|
assert_eq!(s(&buf[..expected.len()]), expected);
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| {
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new()
|
Http::new()
|
||||||
.serve_connection(socket, service_fn(|_| {
|
.serve_connection(socket, service_fn(|_| async move {
|
||||||
Ok::<_, hyper::Error>(Response::builder()
|
Ok::<_, hyper::Error>(Response::builder()
|
||||||
.status(StatusCode::CONTINUE)
|
.status(StatusCode::CONTINUE)
|
||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
@@ -1111,7 +1110,8 @@ fn header_name_too_long() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn upgrades() {
|
fn upgrades() {
|
||||||
use tokio_io::io::{read_to_end, write_all};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
let _ = pretty_env_logger::try_init();
|
let _ = pretty_env_logger::try_init();
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
@@ -1139,11 +1139,11 @@ fn upgrades() {
|
|||||||
tcp.write_all(b"bar=foo").expect("write 2");
|
tcp.write_all(b"bar=foo").expect("write 2");
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
.map_err(|_| -> hyper::Error { unreachable!() })
|
.map(Option::unwrap)
|
||||||
.and_then(|(item, _incoming)| {
|
.map_err(|_| unreachable!())
|
||||||
let socket = item.unwrap();
|
.and_then(|socket| {
|
||||||
let conn = Http::new()
|
let conn = Http::new()
|
||||||
.serve_connection(socket, service_fn(|_| {
|
.serve_connection(socket, service_fn(|_| {
|
||||||
let res = Response::builder()
|
let res = Response::builder()
|
||||||
@@ -1151,14 +1151,14 @@ fn upgrades() {
|
|||||||
.header("upgrade", "foobar")
|
.header("upgrade", "foobar")
|
||||||
.body(hyper::Body::empty())
|
.body(hyper::Body::empty())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Ok::<_, hyper::Error>(res)
|
future::ready(Ok::<_, hyper::Error>(res))
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let mut conn_opt = Some(conn);
|
let mut conn_opt = Some(conn);
|
||||||
future::poll_fn(move || {
|
future::poll_fn(move |ctx| {
|
||||||
try_ready!(conn_opt.as_mut().unwrap().poll_without_shutdown());
|
ready!(conn_opt.as_mut().unwrap().poll_without_shutdown(ctx)).unwrap();
|
||||||
// conn is done with HTTP now
|
// conn is done with HTTP now
|
||||||
Ok(conn_opt.take().unwrap().into())
|
Poll::Ready(Ok(conn_opt.take().unwrap()))
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1168,17 +1168,19 @@ fn upgrades() {
|
|||||||
rt.block_on(rx).unwrap();
|
rt.block_on(rx).unwrap();
|
||||||
|
|
||||||
let parts = conn.into_parts();
|
let parts = conn.into_parts();
|
||||||
let io = parts.io;
|
|
||||||
assert_eq!(parts.read_buf, "eagerly optimistic");
|
assert_eq!(parts.read_buf, "eagerly optimistic");
|
||||||
|
|
||||||
let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
|
let mut io = parts.io;
|
||||||
let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
|
rt.block_on(io.write_all(b"foo=bar")).unwrap();
|
||||||
|
let mut vec = vec![];
|
||||||
|
rt.block_on(io.read_to_end(&mut vec)).unwrap();
|
||||||
assert_eq!(vec, b"bar=foo");
|
assert_eq!(vec, b"bar=foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn http_connect() {
|
fn http_connect() {
|
||||||
use tokio_io::io::{read_to_end, write_all};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
let _ = pretty_env_logger::try_init();
|
let _ = pretty_env_logger::try_init();
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
@@ -1204,25 +1206,25 @@ fn http_connect() {
|
|||||||
tcp.write_all(b"bar=foo").expect("write 2");
|
tcp.write_all(b"bar=foo").expect("write 2");
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
.map_err(|_| -> hyper::Error { unreachable!() })
|
.map(Option::unwrap)
|
||||||
.and_then(|(item, _incoming)| {
|
.map_err(|_| unreachable!())
|
||||||
let socket = item.unwrap();
|
.and_then(|socket| {
|
||||||
let conn = Http::new()
|
let conn = Http::new()
|
||||||
.serve_connection(socket, service_fn(|_| {
|
.serve_connection(socket, service_fn(|_| {
|
||||||
let res = Response::builder()
|
let res = Response::builder()
|
||||||
.status(200)
|
.status(200)
|
||||||
.body(hyper::Body::empty())
|
.body(hyper::Body::empty())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
Ok::<_, hyper::Error>(res)
|
future::ready(Ok::<_, hyper::Error>(res))
|
||||||
}));
|
}));
|
||||||
|
|
||||||
let mut conn_opt = Some(conn);
|
let mut conn_opt = Some(conn);
|
||||||
future::poll_fn(move || {
|
future::poll_fn(move |ctx| {
|
||||||
try_ready!(conn_opt.as_mut().unwrap().poll_without_shutdown());
|
ready!(conn_opt.as_mut().unwrap().poll_without_shutdown(ctx)).unwrap();
|
||||||
// conn is done with HTTP now
|
// conn is done with HTTP now
|
||||||
Ok(conn_opt.take().unwrap().into())
|
Poll::Ready(Ok(conn_opt.take().unwrap()))
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1232,17 +1234,19 @@ fn http_connect() {
|
|||||||
rt.block_on(rx).unwrap();
|
rt.block_on(rx).unwrap();
|
||||||
|
|
||||||
let parts = conn.into_parts();
|
let parts = conn.into_parts();
|
||||||
let io = parts.io;
|
|
||||||
assert_eq!(parts.read_buf, "eagerly optimistic");
|
assert_eq!(parts.read_buf, "eagerly optimistic");
|
||||||
|
|
||||||
let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
|
let mut io = parts.io;
|
||||||
let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
|
rt.block_on(io.write_all(b"foo=bar")).unwrap();
|
||||||
|
let mut vec = vec![];
|
||||||
|
rt.block_on(io.read_to_end(&mut vec)).unwrap();
|
||||||
assert_eq!(vec, b"bar=foo");
|
assert_eq!(vec, b"bar=foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn upgrades_new() {
|
fn upgrades_new() {
|
||||||
use tokio_io::io::{read_to_end, write_all};
|
use crate::tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
let _ = pretty_env_logger::try_init();
|
let _ = pretty_env_logger::try_init();
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
@@ -1271,26 +1275,24 @@ fn upgrades_new() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
let (upgrades_tx, upgrades_rx) = mpsc::channel();
|
let (upgrades_tx, upgrades_rx) = mpsc::channel();
|
||||||
let svc = service_fn_ok(move |req: Request<Body>| {
|
let svc = service_fn(move |req: Request<Body>| {
|
||||||
let on_upgrade = req
|
let on_upgrade = req
|
||||||
.into_body()
|
.into_body()
|
||||||
.on_upgrade();
|
.on_upgrade();
|
||||||
let _ = upgrades_tx.send(on_upgrade);
|
let _ = upgrades_tx.send(on_upgrade);
|
||||||
Response::builder()
|
future::ok::<_, hyper::Error>(Response::builder()
|
||||||
.status(101)
|
.status(101)
|
||||||
.header("upgrade", "foobar")
|
.header("upgrade", "foobar")
|
||||||
.body(hyper::Body::empty())
|
.body(hyper::Body::empty())
|
||||||
.unwrap()
|
.unwrap())
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
.map_err(|_| -> hyper::Error { unreachable!() })
|
.map(Option::unwrap)
|
||||||
.and_then(move |(item, _incoming)| {
|
.map_err(|_| unreachable!())
|
||||||
let socket = item.unwrap();
|
.and_then(|socket| {
|
||||||
Http::new()
|
Http::new().serve_connection(socket, svc).with_upgrades()
|
||||||
.serve_connection(socket, svc)
|
|
||||||
.with_upgrades()
|
|
||||||
});
|
});
|
||||||
|
|
||||||
rt.block_on(fut).unwrap();
|
rt.block_on(fut).unwrap();
|
||||||
@@ -1301,17 +1303,19 @@ fn upgrades_new() {
|
|||||||
|
|
||||||
let upgraded = rt.block_on(on_upgrade).unwrap();
|
let upgraded = rt.block_on(on_upgrade).unwrap();
|
||||||
let parts = upgraded.downcast::<TkTcpStream>().unwrap();
|
let parts = upgraded.downcast::<TkTcpStream>().unwrap();
|
||||||
let io = parts.io;
|
|
||||||
assert_eq!(parts.read_buf, "eagerly optimistic");
|
assert_eq!(parts.read_buf, "eagerly optimistic");
|
||||||
|
|
||||||
let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
|
let mut io = parts.io;
|
||||||
let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
|
rt.block_on(io.write_all(b"foo=bar")).unwrap();
|
||||||
|
let mut vec = vec![];
|
||||||
|
rt.block_on(io.read_to_end(&mut vec)).unwrap();
|
||||||
assert_eq!(s(&vec), "bar=foo");
|
assert_eq!(s(&vec), "bar=foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn http_connect_new() {
|
fn http_connect_new() {
|
||||||
use tokio_io::io::{read_to_end, write_all};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
|
||||||
let _ = pretty_env_logger::try_init();
|
let _ = pretty_env_logger::try_init();
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
@@ -1338,25 +1342,23 @@ fn http_connect_new() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
let (upgrades_tx, upgrades_rx) = mpsc::channel();
|
let (upgrades_tx, upgrades_rx) = mpsc::channel();
|
||||||
let svc = service_fn_ok(move |req: Request<Body>| {
|
let svc = service_fn(move |req: Request<Body>| {
|
||||||
let on_upgrade = req
|
let on_upgrade = req
|
||||||
.into_body()
|
.into_body()
|
||||||
.on_upgrade();
|
.on_upgrade();
|
||||||
let _ = upgrades_tx.send(on_upgrade);
|
let _ = upgrades_tx.send(on_upgrade);
|
||||||
Response::builder()
|
future::ok::<_, hyper::Error>(Response::builder()
|
||||||
.status(200)
|
.status(200)
|
||||||
.body(hyper::Body::empty())
|
.body(hyper::Body::empty())
|
||||||
.unwrap()
|
.unwrap())
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
.map_err(|_| -> hyper::Error { unreachable!() })
|
.map(Option::unwrap)
|
||||||
.and_then(move |(item, _incoming)| {
|
.map_err(|_| unreachable!())
|
||||||
let socket = item.unwrap();
|
.and_then(|socket| {
|
||||||
Http::new()
|
Http::new().serve_connection(socket, svc).with_upgrades()
|
||||||
.serve_connection(socket, svc)
|
|
||||||
.with_upgrades()
|
|
||||||
});
|
});
|
||||||
|
|
||||||
rt.block_on(fut).unwrap();
|
rt.block_on(fut).unwrap();
|
||||||
@@ -1367,17 +1369,20 @@ fn http_connect_new() {
|
|||||||
|
|
||||||
let upgraded = rt.block_on(on_upgrade).unwrap();
|
let upgraded = rt.block_on(on_upgrade).unwrap();
|
||||||
let parts = upgraded.downcast::<TkTcpStream>().unwrap();
|
let parts = upgraded.downcast::<TkTcpStream>().unwrap();
|
||||||
let io = parts.io;
|
|
||||||
assert_eq!(parts.read_buf, "eagerly optimistic");
|
assert_eq!(parts.read_buf, "eagerly optimistic");
|
||||||
|
|
||||||
let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
|
let mut io = parts.io;
|
||||||
let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
|
rt.block_on(io.write_all(b"foo=bar")).unwrap();
|
||||||
|
let mut vec = vec![];
|
||||||
|
rt.block_on(io.read_to_end(&mut vec)).unwrap();
|
||||||
assert_eq!(s(&vec), "bar=foo");
|
assert_eq!(s(&vec), "bar=foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parse_errors_send_4xx_response() {
|
fn parse_errors_send_4xx_response() {
|
||||||
|
|
||||||
|
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
let addr = listener.local_addr().unwrap();
|
let addr = listener.local_addr().unwrap();
|
||||||
@@ -1392,20 +1397,19 @@ fn parse_errors_send_4xx_response() {
|
|||||||
assert_eq!(s(&buf[..expected.len()]), expected);
|
assert_eq!(s(&buf[..expected.len()]), expected);
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| Http::new().serve_connection(socket, HelloWorld));
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new()
|
|
||||||
.serve_connection(socket, HelloWorld)
|
|
||||||
});
|
|
||||||
|
|
||||||
rt.block_on(fut).expect_err("HTTP parse error");
|
rt.block_on(fut).expect_err("HTTP parse error");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn illegal_request_length_returns_400_response() {
|
fn illegal_request_length_returns_400_response() {
|
||||||
|
|
||||||
|
|
||||||
let mut rt = Runtime::new().unwrap();
|
let mut rt = Runtime::new().unwrap();
|
||||||
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
|
||||||
let addr = listener.local_addr().unwrap();
|
let addr = listener.local_addr().unwrap();
|
||||||
@@ -1420,14 +1424,11 @@ fn illegal_request_length_returns_400_response() {
|
|||||||
assert_eq!(s(&buf[..expected.len()]), expected);
|
assert_eq!(s(&buf[..expected.len()]), expected);
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| Http::new().serve_connection(socket, HelloWorld));
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new()
|
|
||||||
.serve_connection(socket, HelloWorld)
|
|
||||||
});
|
|
||||||
|
|
||||||
rt.block_on(fut).expect_err("illegal Content-Length should error");
|
rt.block_on(fut).expect_err("illegal Content-Length should error");
|
||||||
}
|
}
|
||||||
@@ -1464,11 +1465,11 @@ fn max_buf_size() {
|
|||||||
assert_eq!(s(&buf[..expected.len()]), expected);
|
assert_eq!(s(&buf[..expected.len()]), expected);
|
||||||
});
|
});
|
||||||
|
|
||||||
let fut = listener.incoming()
|
let mut incoming = listener.incoming();
|
||||||
.into_future()
|
let fut = incoming.next()
|
||||||
|
.map(Option::unwrap)
|
||||||
.map_err(|_| unreachable!())
|
.map_err(|_| unreachable!())
|
||||||
.and_then(|(item, _incoming)| {
|
.and_then(|socket| {
|
||||||
let socket = item.unwrap();
|
|
||||||
Http::new()
|
Http::new()
|
||||||
.max_buf_size(MAX)
|
.max_buf_size(MAX)
|
||||||
.serve_connection(socket, HelloWorld)
|
.serve_connection(socket, HelloWorld)
|
||||||
@@ -1487,8 +1488,8 @@ fn streaming_body() {
|
|||||||
.serve();
|
.serve();
|
||||||
|
|
||||||
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _;
|
static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _;
|
||||||
let b = ::futures::stream::iter_ok::<_, String>(S.into_iter())
|
let b = ::futures_util::stream::iter(S.into_iter())
|
||||||
.map(|&s| s);
|
.map(|&s| Ok::<_, hyper::Error>(s));
|
||||||
let b = hyper::Body::wrap_stream(b);
|
let b = hyper::Body::wrap_stream(b);
|
||||||
server
|
server
|
||||||
.reply()
|
.reply()
|
||||||
@@ -1514,14 +1515,14 @@ fn http1_response_with_http2_version() {
|
|||||||
.reply()
|
.reply()
|
||||||
.version(hyper::Version::HTTP_2);
|
.version(hyper::Version::HTTP_2);
|
||||||
|
|
||||||
rt.block_on(hyper::rt::lazy(move || {
|
rt.block_on({
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
let uri = addr_str.parse().expect("server addr should parse");
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
|
||||||
client.get(uri)
|
client.get(uri)
|
||||||
})).unwrap();
|
}).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
#[test]
|
#[test]
|
||||||
fn try_h2() {
|
fn try_h2() {
|
||||||
let server = serve();
|
let server = serve();
|
||||||
@@ -1529,21 +1530,22 @@ fn try_h2() {
|
|||||||
|
|
||||||
let mut rt = Runtime::new().expect("runtime new");
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
rt.block_on(hyper::rt::lazy(move || {
|
rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
let uri = addr_str.parse().expect("server addr should parse");
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
|
||||||
client.get(uri)
|
client
|
||||||
.and_then(|_res| { Ok(()) })
|
.get(uri)
|
||||||
.map(|_| { () })
|
.map_ok(|_| { () })
|
||||||
.map_err(|_e| { () })
|
.map_err(|_e| { () })
|
||||||
})).unwrap();
|
}).unwrap();
|
||||||
|
|
||||||
assert_eq!(server.body(), b"");
|
assert_eq!(server.body(), b"");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
#[test]
|
#[test]
|
||||||
fn http1_only() {
|
fn http1_only() {
|
||||||
let server = serve_opts()
|
let server = serve_opts()
|
||||||
@@ -1553,19 +1555,20 @@ fn http1_only() {
|
|||||||
|
|
||||||
let mut rt = Runtime::new().expect("runtime new");
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
rt.block_on(hyper::rt::lazy(move || {
|
rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
let uri = addr_str.parse().expect("server addr should parse");
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
|
||||||
client.get(uri)
|
client.get(uri)
|
||||||
})).unwrap_err();
|
}).unwrap_err();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
#[test]
|
#[test]
|
||||||
fn http2_service_error_sends_reset_reason() {
|
fn http2_service_error_sends_reset_reason() {
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
|
|
||||||
let server = serve();
|
let server = serve();
|
||||||
let addr_str = format!("http://{}", server.addr());
|
let addr_str = format!("http://{}", server.addr());
|
||||||
|
|
||||||
@@ -1575,14 +1578,14 @@ fn http2_service_error_sends_reset_reason() {
|
|||||||
|
|
||||||
let mut rt = Runtime::new().expect("runtime new");
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
let err = rt.block_on(hyper::rt::lazy(move || {
|
let err = rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
let uri = addr_str.parse().expect("server addr should parse");
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
|
||||||
client.get(uri)
|
client.get(uri)
|
||||||
})).unwrap_err();
|
}).unwrap_err();
|
||||||
|
|
||||||
let h2_err = err
|
let h2_err = err
|
||||||
.source()
|
.source()
|
||||||
@@ -1593,15 +1596,16 @@ fn http2_service_error_sends_reset_reason() {
|
|||||||
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
|
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
#[test]
|
#[test]
|
||||||
fn http2_body_user_error_sends_reset_reason() {
|
fn http2_body_user_error_sends_reset_reason() {
|
||||||
use std::error::Error;
|
use std::error::Error;
|
||||||
let server = serve();
|
let server = serve();
|
||||||
let addr_str = format!("http://{}", server.addr());
|
let addr_str = format!("http://{}", server.addr());
|
||||||
|
|
||||||
let b = ::futures::stream::once::<String, h2::Error>(Err(
|
let b = ::futures_util::stream::once(
|
||||||
h2::Error::from(h2::Reason::INADEQUATE_SECURITY)
|
future::err::<String, _>(h2::Error::from(h2::Reason::INADEQUATE_SECURITY))
|
||||||
));
|
);
|
||||||
let b = hyper::Body::wrap_stream(b);
|
let b = hyper::Body::wrap_stream(b);
|
||||||
|
|
||||||
server
|
server
|
||||||
@@ -1610,7 +1614,7 @@ fn http2_body_user_error_sends_reset_reason() {
|
|||||||
|
|
||||||
let mut rt = Runtime::new().expect("runtime new");
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
let err = rt.block_on(hyper::rt::lazy(move || {
|
let err = rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
@@ -1618,8 +1622,8 @@ fn http2_body_user_error_sends_reset_reason() {
|
|||||||
|
|
||||||
client
|
client
|
||||||
.get(uri)
|
.get(uri)
|
||||||
.and_then(|res| res.into_body().concat2())
|
.and_then(|res| res.into_body().try_concat())
|
||||||
})).unwrap_err();
|
}).unwrap_err();
|
||||||
|
|
||||||
let h2_err = err
|
let h2_err = err
|
||||||
.source()
|
.source()
|
||||||
@@ -1630,23 +1634,18 @@ fn http2_body_user_error_sends_reset_reason() {
|
|||||||
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
|
assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn http2_service_poll_ready_error_sends_goaway() {
|
|
||||||
use std::error::Error;
|
|
||||||
|
|
||||||
struct Svc;
|
struct Svc;
|
||||||
|
|
||||||
impl hyper::service::Service for Svc {
|
impl hyper::service::Service for Svc {
|
||||||
type ReqBody = hyper::Body;
|
type ReqBody = hyper::Body;
|
||||||
type ResBody = hyper::Body;
|
type ResBody = hyper::Body;
|
||||||
type Error = h2::Error;
|
type Error = h2::Error;
|
||||||
type Future = Box<dyn Future<
|
type Future = Box<dyn futures_core::Future<
|
||||||
Item = hyper::Response<Self::ResBody>,
|
Output = Result<hyper::Response<Self::ResBody>, Self::Error>
|
||||||
Error = Self::Error
|
> + Send + Sync + Unpin>;
|
||||||
> + Send + Sync>;
|
|
||||||
|
|
||||||
fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> {
|
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
Err(h2::Error::from(h2::Reason::INADEQUATE_SECURITY))
|
Poll::Ready(Err::<(), _>(h2::Error::from(h2::Reason::INADEQUATE_SECURITY)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn call(&mut self, _: hyper::Request<Self::ResBody>) -> Self::Future {
|
fn call(&mut self, _: hyper::Request<Self::ResBody>) -> Self::Future {
|
||||||
@@ -1654,28 +1653,32 @@ fn http2_service_poll_ready_error_sends_goaway() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[ignore] // Re-enable as soon as HTTP/2.0 is supported again.
|
||||||
|
#[test]
|
||||||
|
fn http2_service_poll_ready_error_sends_goaway() {
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
let _ = pretty_env_logger::try_init();
|
let _ = pretty_env_logger::try_init();
|
||||||
|
|
||||||
let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into())
|
let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into())
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.serve(|| Ok::<_, BoxError>(Svc));
|
.serve(make_service_fn(|_| async move { Ok::<_, BoxError>(Svc) }));
|
||||||
|
|
||||||
let addr_str = format!("http://{}", server.local_addr());
|
let addr_str = format!("http://{}", server.local_addr());
|
||||||
|
|
||||||
|
|
||||||
let mut rt = Runtime::new().expect("runtime new");
|
let mut rt = Runtime::new().expect("runtime new");
|
||||||
|
|
||||||
rt.spawn(server.map_err(|e| unreachable!("server shouldn't error: {:?}", e)));
|
rt.spawn(server
|
||||||
|
.map_err(|e| unreachable!("server shouldn't error: {:?}", e))
|
||||||
|
.map(|_| ()));
|
||||||
|
|
||||||
let err = rt.block_on(hyper::rt::lazy(move || {
|
let err = rt.block_on({
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.http2_only(true)
|
.http2_only(true)
|
||||||
.build_http::<hyper::Body>();
|
.build_http::<hyper::Body>();
|
||||||
let uri = addr_str.parse().expect("server addr should parse");
|
let uri = addr_str.parse().expect("server addr should parse");
|
||||||
|
client.get(uri)
|
||||||
client
|
}).unwrap_err();
|
||||||
.get(uri)
|
|
||||||
})).unwrap_err();
|
|
||||||
|
|
||||||
// client request should have gotten the specific GOAWAY error...
|
// client request should have gotten the specific GOAWAY error...
|
||||||
let h2_err = err
|
let h2_err = err
|
||||||
@@ -1811,6 +1814,7 @@ impl<'a> ReplyBuilder<'a> {
|
|||||||
self.tx.send(Reply::Body(body)).unwrap();
|
self.tx.send(Reply::Body(body)).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
fn error<E: Into<BoxError>>(self, err: E) {
|
fn error<E: Into<BoxError>>(self, err: E) {
|
||||||
self.tx.send(Reply::Error(err.into())).unwrap();
|
self.tx.send(Reply::Error(err.into())).unwrap();
|
||||||
}
|
}
|
||||||
@@ -1825,7 +1829,11 @@ impl<'a> Drop for ReplyBuilder<'a> {
|
|||||||
impl Drop for Serve {
|
impl Drop for Serve {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
drop(self.shutdown_signal.take());
|
drop(self.shutdown_signal.take());
|
||||||
self.thread.take().unwrap().join().unwrap();
|
let r = self.thread.take().unwrap().join();
|
||||||
|
if let Err(ref e) = r {
|
||||||
|
println!("{:?}", e);
|
||||||
|
}
|
||||||
|
r.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1852,26 +1860,38 @@ enum Msg {
|
|||||||
End,
|
End,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestService {
|
impl Service for TestService {
|
||||||
fn call(&self, req: Request<Body>) -> Box<dyn Future<Item=Response<Body>, Error=BoxError> + Send> {
|
type ReqBody = Body;
|
||||||
|
type ResBody = Body;
|
||||||
|
type Error = BoxError;
|
||||||
|
type Future = BoxFuture<'static, Result<Response<Body>, BoxError>>;
|
||||||
|
|
||||||
|
fn call(&mut self, req: Request<Body>) -> Self::Future {
|
||||||
let tx1 = self.tx.clone();
|
let tx1 = self.tx.clone();
|
||||||
let tx2 = self.tx.clone();
|
let tx2 = self.tx.clone();
|
||||||
let replies = self.reply.clone();
|
let replies = self.reply.clone();
|
||||||
Box::new(req.into_body().for_each(move |chunk| {
|
req
|
||||||
|
.into_body()
|
||||||
|
.try_concat()
|
||||||
|
.map_ok(move |chunk| {
|
||||||
tx1.send(Msg::Chunk(chunk.to_vec())).unwrap();
|
tx1.send(Msg::Chunk(chunk.to_vec())).unwrap();
|
||||||
Ok(())
|
()
|
||||||
}).then(move |result| {
|
})
|
||||||
|
.map(move |result| {
|
||||||
let msg = match result {
|
let msg = match result {
|
||||||
Ok(()) => Msg::End,
|
Ok(()) => Msg::End,
|
||||||
Err(e) => Msg::Error(e),
|
Err(e) => Msg::Error(e),
|
||||||
};
|
};
|
||||||
tx2.send(msg).unwrap();
|
tx2.send(msg).unwrap();
|
||||||
Ok(())
|
})
|
||||||
}).and_then(move |_| {
|
.map(move |_| {
|
||||||
TestService::build_reply(replies)
|
TestService::build_reply(replies)
|
||||||
}))
|
})
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl TestService {
|
||||||
fn build_reply(replies: spmc::Receiver<Reply>) -> Result<Response<Body>, BoxError> {
|
fn build_reply(replies: spmc::Receiver<Reply>) -> Result<Response<Body>, BoxError> {
|
||||||
let mut res = Response::new(Body::empty());
|
let mut res = Response::new(Body::empty());
|
||||||
while let Ok(reply) = replies.try_recv() {
|
while let Ok(reply) = replies.try_recv() {
|
||||||
@@ -1904,11 +1924,11 @@ impl Service for HelloWorld {
|
|||||||
type ReqBody = Body;
|
type ReqBody = Body;
|
||||||
type ResBody = Body;
|
type ResBody = Body;
|
||||||
type Error = hyper::Error;
|
type Error = hyper::Error;
|
||||||
type Future = FutureResult<Response<Body>, Self::Error>;
|
type Future = BoxFuture<'static, Result<hyper::Response<Self::ResBody>, Self::Error>>;
|
||||||
|
|
||||||
fn call(&mut self, _req: Request<Body>) -> Self::Future {
|
fn call(&mut self, _req: Request<Body>) -> Self::Future {
|
||||||
let response = Response::new(HELLO.into());
|
let response = Response::new(HELLO.into());
|
||||||
future::ok(response)
|
future::ok(response).boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1978,29 +1998,39 @@ impl ServeOptions {
|
|||||||
.name()
|
.name()
|
||||||
.unwrap_or("<unknown test case name>")
|
.unwrap_or("<unknown test case name>")
|
||||||
);
|
);
|
||||||
let thread = thread::Builder::new().name(thread_name).spawn(move || {
|
let thread = thread::Builder::new()
|
||||||
|
.name(thread_name)
|
||||||
|
.spawn(move || {
|
||||||
|
let service = make_service_fn(|_| {
|
||||||
|
let msg_tx = msg_tx.clone();
|
||||||
|
let reply_rx = reply_rx.clone();
|
||||||
|
future::ok::<_, BoxError>(TestService {
|
||||||
|
tx: msg_tx.clone(),
|
||||||
|
reply: reply_rx.clone(),
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
let server = Server::bind(&addr)
|
let server = Server::bind(&addr)
|
||||||
.http1_only(options.http1_only)
|
.http1_only(options.http1_only)
|
||||||
.http1_keepalive(options.keep_alive)
|
.http1_keepalive(options.keep_alive)
|
||||||
.http1_pipeline_flush(options.pipeline)
|
.http1_pipeline_flush(options.pipeline)
|
||||||
.serve(move || {
|
.serve(service);
|
||||||
let ts = TestService {
|
|
||||||
tx: msg_tx.clone(),
|
|
||||||
reply: reply_rx.clone(),
|
|
||||||
};
|
|
||||||
service_fn(move |req| ts.call(req))
|
|
||||||
});
|
|
||||||
|
|
||||||
addr_tx.send(
|
addr_tx.send(
|
||||||
server.local_addr()
|
server.local_addr()
|
||||||
).expect("server addr tx");
|
).expect("server addr tx");
|
||||||
|
|
||||||
let fut = server
|
let fut = server
|
||||||
.with_graceful_shutdown(shutdown_rx);
|
.with_graceful_shutdown(async {
|
||||||
|
shutdown_rx.await.ok();
|
||||||
|
});
|
||||||
|
|
||||||
let mut rt = Runtime::new().expect("rt new");
|
let mut rt = Runtime::new().expect("rt new");
|
||||||
rt.block_on(fut).unwrap();
|
rt
|
||||||
}).expect("thread spawn");
|
.block_on(fut)
|
||||||
|
.unwrap();
|
||||||
|
})
|
||||||
|
.expect("thread spawn");
|
||||||
|
|
||||||
let addr = addr_rx.recv().expect("server addr rx");
|
let addr = addr_rx.recv().expect("server addr rx");
|
||||||
|
|
||||||
@@ -2059,6 +2089,8 @@ struct DebugStream<T, D> {
|
|||||||
_debug: D,
|
_debug: D,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T: Unpin, D> Unpin for DebugStream<T, D> {}
|
||||||
|
|
||||||
impl<T: Read, D> Read for DebugStream<T, D> {
|
impl<T: Read, D> Read for DebugStream<T, D> {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
self.stream.read(buf)
|
self.stream.read(buf)
|
||||||
@@ -2076,14 +2108,34 @@ impl<T: Write, D> Write for DebugStream<T, D> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<T: AsyncWrite, D> AsyncWrite for DebugStream<T, D> {
|
impl<T: AsyncWrite + Unpin, D> AsyncWrite for DebugStream<T, D> {
|
||||||
fn shutdown(&mut self) -> futures::Poll<(), io::Error> {
|
fn poll_write(
|
||||||
self.stream.shutdown()
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<Result<usize, io::Error>> {
|
||||||
|
Pin::new(&mut self.stream).poll_write(cx, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||||
|
Pin::new(&mut self.stream).poll_flush(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||||
|
Pin::new(&mut self.stream).poll_shutdown(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
impl<T: AsyncRead, D> AsyncRead for DebugStream<T, D> {}
|
impl<T: AsyncRead + Unpin, D: Unpin> AsyncRead for DebugStream<T, D> {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
Pin::new(&mut self.stream).poll_read(cx, buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct Dropped(Arc<AtomicBool>);
|
struct Dropped(Arc<AtomicBool>);
|
||||||
Reference in New Issue
Block a user