diff --git a/tests_disabled/integration.rs b/tests/integration.rs similarity index 100% rename from tests_disabled/integration.rs rename to tests/integration.rs diff --git a/tests_disabled/server.rs b/tests/server.rs similarity index 81% rename from tests_disabled/server.rs rename to tests/server.rs index f2a4f06c..8483d385 100644 --- a/tests_disabled/server.rs +++ b/tests/server.rs @@ -1,10 +1,8 @@ +#![feature(async_await, async_closure)] #![deny(warnings)] extern crate http; extern crate hyper; extern crate h2; -#[macro_use] -extern crate futures; -extern crate futures_timer; extern crate net2; extern crate spmc; extern crate pretty_env_logger; @@ -18,24 +16,32 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::sync::{Arc}; use std::net::{TcpListener as StdTcpListener}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; -use futures::{Future, Stream}; -use futures::future::{self, FutureResult, Either}; -use futures::sync::oneshot; -use futures_timer::Delay; +use futures_channel::oneshot; +use futures_core::ready; +use futures_core::future::BoxFuture; +use futures_util::future::{self, Either, FutureExt}; +use futures_util::stream::StreamExt; +use futures_util::try_future::{self, TryFutureExt}; +use futures_util::try_stream::TryStreamExt; use http::header::{HeaderName, HeaderValue}; use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream}; use tokio::runtime::current_thread::Runtime; use tokio::reactor::Handle; +use tokio::runtime::current_thread::Runtime; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_tcp::{TcpListener, TcpStream as TkTcpStream}; +use tokio_timer::Delay; use hyper::{Body, Request, Response, StatusCode, Version}; use hyper::client::Client; use hyper::server::conn::Http; use hyper::server::Server; -use hyper::service::{service_fn, service_fn_ok, Service}; +use hyper::service::{make_service_fn, service_fn, Service}; #[test] @@ -300,6 +306,7 @@ mod response_body_lengths { }); } + #[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_auto_response_with_known_length() { use hyper::body::Payload; @@ -309,7 +316,7 @@ mod response_body_lengths { server.reply().body("Hello, World!"); let mut rt = Runtime::new().expect("rt new"); - rt.block_on(hyper::rt::lazy(move || { + rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); @@ -319,16 +326,16 @@ mod response_body_lengths { client .get(uri) - .and_then(|res| { + .map_ok(|res| { assert_eq!(res.headers().get("content-length").unwrap(), "13"); assert_eq!(res.body().content_length(), Some(13)); - Ok(()) + () }) - .map(|_| ()) .map_err(|_e| ()) - })).unwrap(); + }).unwrap(); } + #[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_auto_response_with_conflicting_lengths() { use hyper::body::Payload; @@ -341,7 +348,7 @@ mod response_body_lengths { .body("Hello, World!"); let mut rt = Runtime::new().expect("rt new"); - rt.block_on(hyper::rt::lazy(move || { + rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); @@ -351,14 +358,13 @@ mod response_body_lengths { client .get(uri) - .and_then(|res| { + .map_ok(|res| { assert_eq!(res.headers().get("content-length").unwrap(), "10"); assert_eq!(res.body().content_length(), Some(10)); - Ok(()) + () }) - .map(|_| ()) .map_err(|_e| ()) - })).unwrap(); + }).unwrap(); } } @@ -847,35 +853,35 @@ fn disable_keep_alive_mid_request() { let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); + let (tx2, rx2) = mpsc::channel(); let child = thread::spawn(move || { let mut req = connect(&addr); req.write_all(b"GET / HTTP/1.1\r\n").unwrap(); tx1.send(()).unwrap(); - rx2.wait().unwrap(); + rx2.recv().unwrap(); req.write_all(b"Host: localhost\r\n\r\n").unwrap(); let mut buf = vec![]; req.read_to_end(&mut buf).unwrap(); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new().serve_connection(socket, HelloWorld) - .select2(rx1) + .and_then(|socket| { + let srv = Http::new().serve_connection(socket, HelloWorld); + try_future::try_select(srv, rx1) .then(|r| { match r { - Ok(Either::A(_)) => panic!("expected rx first"), - Ok(Either::B(((), mut conn))) => { - conn.graceful_shutdown(); + Ok(Either::Left(_)) => panic!("expected rx first"), + Ok(Either::Right(((), mut conn))) => { + Pin::new(&mut conn).graceful_shutdown(); tx2.send(()).unwrap(); conn } - Err(Either::A((e, _))) => panic!("unexpected error {}", e), - Err(Either::B((e, _))) => panic!("unexpected error {}", e), + Err(Either::Left((e, _))) => panic!("unexpected error {}", e), + Err(Either::Right((e, _))) => panic!("unexpected error {}", e), } }) }); @@ -914,26 +920,26 @@ fn disable_keep_alive_post_request() { let dropped = Dropped::new(); let dropped2 = dropped.clone(); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.expect("accepted socket"); + .and_then(|socket| { let transport = DebugStream { stream: socket, _debug: dropped2, }; - Http::new().serve_connection(transport, HelloWorld) - .select2(rx1) + let server = Http::new().serve_connection(transport, HelloWorld); + try_future::try_select(server, rx1) .then(|r| { match r { - Ok(Either::A(_)) => panic!("expected rx first"), - Ok(Either::B(((), mut conn))) => { - conn.graceful_shutdown(); + Ok(Either::Left(_)) => panic!("expected rx first"), + Ok(Either::Right(((), mut conn))) => { + Pin::new(&mut conn).graceful_shutdown(); conn } - Err(Either::A((e, _))) => panic!("unexpected error {}", e), - Err(Either::B((e, _))) => panic!("unexpected error {}", e), + Err(Either::Left((e, _))) => panic!("unexpected error {}", e), + Err(Either::Right((e, _))) => panic!("unexpected error {}", e), } }) }); @@ -944,8 +950,8 @@ fn disable_keep_alive_post_request() { // the read-blocked socket. // // See https://github.com/carllerche/mio/issues/776 - let timeout = Delay::new(Duration::from_millis(10)); - rt.block_on(timeout).unwrap(); + let timeout = Delay::new(Instant::now() + Duration::from_millis(10)); + rt.block_on(timeout); assert!(dropped.load()); child.join().unwrap(); } @@ -960,13 +966,11 @@ fn empty_parse_eof_does_not_return_error() { let _tcp = connect(&addr); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new().serve_connection(socket, HelloWorld) - }); + .and_then(|socket| Http::new().serve_connection(socket, HelloWorld)); rt.block_on(fut).expect("empty parse eof is ok"); } @@ -982,13 +986,11 @@ fn nonempty_parse_eof_returns_error() { tcp.write_all(b"GET / HTTP/1.1").unwrap(); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new().serve_connection(socket, HelloWorld) - }); + .and_then(|socket| Http::new().serve_connection(socket, HelloWorld)); rt.block_on(fut).expect_err("partial parse eof is error"); } @@ -1011,17 +1013,14 @@ fn socket_half_closed() { assert_eq!(s(&buf[..expected.len()]), expected); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new() - .serve_connection(socket, service_fn(|_| { - Delay::new(Duration::from_millis(500)) - .map(|_| { - Response::new(Body::empty()) - }) + .and_then(|socket| { + Http::new().serve_connection(socket, service_fn(|_| { + Delay::new(Instant::now() + Duration::from_millis(500)) + .map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty()))) })) }); @@ -1040,16 +1039,16 @@ fn disconnect_after_reading_request_before_responding() { tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); + .and_then(|socket| { Http::new() .http1_half_close(false) .serve_connection(socket, service_fn(|_| { - Delay::new(Duration::from_secs(2)) - .map(|_| -> Response { + Delay::new(Instant::now() + Duration::from_secs(2)) + .map(|_| -> Result, hyper::Error> { panic!("response future should have been dropped"); }) })) @@ -1074,13 +1073,13 @@ fn returning_1xx_response_is_error() { assert_eq!(s(&buf[..expected.len()]), expected); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); + .and_then(|socket| { Http::new() - .serve_connection(socket, service_fn(|_| { + .serve_connection(socket, service_fn(|_| async move { Ok::<_, hyper::Error>(Response::builder() .status(StatusCode::CONTINUE) .body(Body::empty()) @@ -1111,7 +1110,8 @@ fn header_name_too_long() { #[test] fn upgrades() { - use tokio_io::io::{read_to_end, write_all}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); @@ -1139,11 +1139,11 @@ fn upgrades() { tcp.write_all(b"bar=foo").expect("write 2"); }); - let fut = listener.incoming() - .into_future() - .map_err(|_| -> hyper::Error { unreachable!() }) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) + .map_err(|_| unreachable!()) + .and_then(|socket| { let conn = Http::new() .serve_connection(socket, service_fn(|_| { let res = Response::builder() @@ -1151,14 +1151,14 @@ fn upgrades() { .header("upgrade", "foobar") .body(hyper::Body::empty()) .unwrap(); - Ok::<_, hyper::Error>(res) + future::ready(Ok::<_, hyper::Error>(res)) })); let mut conn_opt = Some(conn); - future::poll_fn(move || { - try_ready!(conn_opt.as_mut().unwrap().poll_without_shutdown()); + future::poll_fn(move |ctx| { + ready!(conn_opt.as_mut().unwrap().poll_without_shutdown(ctx)).unwrap(); // conn is done with HTTP now - Ok(conn_opt.take().unwrap().into()) + Poll::Ready(Ok(conn_opt.take().unwrap())) }) }); @@ -1168,17 +1168,19 @@ fn upgrades() { rt.block_on(rx).unwrap(); let parts = conn.into_parts(); - let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + let mut io = parts.io; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(vec, b"bar=foo"); } #[test] fn http_connect() { - use tokio_io::io::{read_to_end, write_all}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); @@ -1204,25 +1206,25 @@ fn http_connect() { tcp.write_all(b"bar=foo").expect("write 2"); }); - let fut = listener.incoming() - .into_future() - .map_err(|_| -> hyper::Error { unreachable!() }) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) + .map_err(|_| unreachable!()) + .and_then(|socket| { let conn = Http::new() .serve_connection(socket, service_fn(|_| { let res = Response::builder() .status(200) .body(hyper::Body::empty()) .unwrap(); - Ok::<_, hyper::Error>(res) + future::ready(Ok::<_, hyper::Error>(res)) })); let mut conn_opt = Some(conn); - future::poll_fn(move || { - try_ready!(conn_opt.as_mut().unwrap().poll_without_shutdown()); + future::poll_fn(move |ctx| { + ready!(conn_opt.as_mut().unwrap().poll_without_shutdown(ctx)).unwrap(); // conn is done with HTTP now - Ok(conn_opt.take().unwrap().into()) + Poll::Ready(Ok(conn_opt.take().unwrap())) }) }); @@ -1232,17 +1234,19 @@ fn http_connect() { rt.block_on(rx).unwrap(); let parts = conn.into_parts(); - let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + let mut io = parts.io; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(vec, b"bar=foo"); } #[test] fn upgrades_new() { - use tokio_io::io::{read_to_end, write_all}; + use crate::tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); @@ -1271,26 +1275,24 @@ fn upgrades_new() { }); let (upgrades_tx, upgrades_rx) = mpsc::channel(); - let svc = service_fn_ok(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = req .into_body() .on_upgrade(); let _ = upgrades_tx.send(on_upgrade); - Response::builder() + future::ok::<_, hyper::Error>(Response::builder() .status(101) .header("upgrade", "foobar") .body(hyper::Body::empty()) - .unwrap() + .unwrap()) }); - let fut = listener.incoming() - .into_future() - .map_err(|_| -> hyper::Error { unreachable!() }) - .and_then(move |(item, _incoming)| { - let socket = item.unwrap(); - Http::new() - .serve_connection(socket, svc) - .with_upgrades() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) + .map_err(|_| unreachable!()) + .and_then(|socket| { + Http::new().serve_connection(socket, svc).with_upgrades() }); rt.block_on(fut).unwrap(); @@ -1301,17 +1303,19 @@ fn upgrades_new() { let upgraded = rt.block_on(on_upgrade).unwrap(); let parts = upgraded.downcast::().unwrap(); - let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + let mut io = parts.io; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(s(&vec), "bar=foo"); } #[test] fn http_connect_new() { - use tokio_io::io::{read_to_end, write_all}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); @@ -1338,25 +1342,23 @@ fn http_connect_new() { }); let (upgrades_tx, upgrades_rx) = mpsc::channel(); - let svc = service_fn_ok(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = req .into_body() .on_upgrade(); let _ = upgrades_tx.send(on_upgrade); - Response::builder() + future::ok::<_, hyper::Error>(Response::builder() .status(200) .body(hyper::Body::empty()) - .unwrap() + .unwrap()) }); - let fut = listener.incoming() - .into_future() - .map_err(|_| -> hyper::Error { unreachable!() }) - .and_then(move |(item, _incoming)| { - let socket = item.unwrap(); - Http::new() - .serve_connection(socket, svc) - .with_upgrades() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) + .map_err(|_| unreachable!()) + .and_then(|socket| { + Http::new().serve_connection(socket, svc).with_upgrades() }); rt.block_on(fut).unwrap(); @@ -1367,17 +1369,20 @@ fn http_connect_new() { let upgraded = rt.block_on(on_upgrade).unwrap(); let parts = upgraded.downcast::().unwrap(); - let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + let mut io = parts.io; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(s(&vec), "bar=foo"); } #[test] fn parse_errors_send_4xx_response() { + + let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -1392,20 +1397,19 @@ fn parse_errors_send_4xx_response() { assert_eq!(s(&buf[..expected.len()]), expected); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new() - .serve_connection(socket, HelloWorld) - }); + .and_then(|socket| Http::new().serve_connection(socket, HelloWorld)); rt.block_on(fut).expect_err("HTTP parse error"); } #[test] fn illegal_request_length_returns_400_response() { + + let mut rt = Runtime::new().unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -1420,14 +1424,11 @@ fn illegal_request_length_returns_400_response() { assert_eq!(s(&buf[..expected.len()]), expected); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); - Http::new() - .serve_connection(socket, HelloWorld) - }); + .and_then(|socket| Http::new().serve_connection(socket, HelloWorld)); rt.block_on(fut).expect_err("illegal Content-Length should error"); } @@ -1464,11 +1465,11 @@ fn max_buf_size() { assert_eq!(s(&buf[..expected.len()]), expected); }); - let fut = listener.incoming() - .into_future() + let mut incoming = listener.incoming(); + let fut = incoming.next() + .map(Option::unwrap) .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { - let socket = item.unwrap(); + .and_then(|socket| { Http::new() .max_buf_size(MAX) .serve_connection(socket, HelloWorld) @@ -1487,8 +1488,8 @@ fn streaming_body() { .serve(); static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; - let b = ::futures::stream::iter_ok::<_, String>(S.into_iter()) - .map(|&s| s); + let b = ::futures_util::stream::iter(S.into_iter()) + .map(|&s| Ok::<_, hyper::Error>(s)); let b = hyper::Body::wrap_stream(b); server .reply() @@ -1514,14 +1515,14 @@ fn http1_response_with_http2_version() { .reply() .version(hyper::Version::HTTP_2); - rt.block_on(hyper::rt::lazy(move || { + rt.block_on({ let client = Client::new(); let uri = addr_str.parse().expect("server addr should parse"); - client.get(uri) - })).unwrap(); + }).unwrap(); } +#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn try_h2() { let server = serve(); @@ -1529,21 +1530,22 @@ fn try_h2() { let mut rt = Runtime::new().expect("runtime new"); - rt.block_on(hyper::rt::lazy(move || { + rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); let uri = addr_str.parse().expect("server addr should parse"); - client.get(uri) - .and_then(|_res| { Ok(()) }) - .map(|_| { () }) + client + .get(uri) + .map_ok(|_| { () }) .map_err(|_e| { () }) - })).unwrap(); + }).unwrap(); assert_eq!(server.body(), b""); } +#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http1_only() { let server = serve_opts() @@ -1553,19 +1555,20 @@ fn http1_only() { let mut rt = Runtime::new().expect("runtime new"); - rt.block_on(hyper::rt::lazy(move || { + rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); let uri = addr_str.parse().expect("server addr should parse"); - client.get(uri) - })).unwrap_err(); + }).unwrap_err(); } +#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_service_error_sends_reset_reason() { use std::error::Error; + let server = serve(); let addr_str = format!("http://{}", server.addr()); @@ -1575,14 +1578,14 @@ fn http2_service_error_sends_reset_reason() { let mut rt = Runtime::new().expect("runtime new"); - let err = rt.block_on(hyper::rt::lazy(move || { + let err = rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); let uri = addr_str.parse().expect("server addr should parse"); client.get(uri) - })).unwrap_err(); + }).unwrap_err(); let h2_err = err .source() @@ -1593,15 +1596,16 @@ fn http2_service_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } +#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_body_user_error_sends_reset_reason() { use std::error::Error; let server = serve(); let addr_str = format!("http://{}", server.addr()); - let b = ::futures::stream::once::(Err( - h2::Error::from(h2::Reason::INADEQUATE_SECURITY) - )); + let b = ::futures_util::stream::once( + future::err::(h2::Error::from(h2::Reason::INADEQUATE_SECURITY)) + ); let b = hyper::Body::wrap_stream(b); server @@ -1610,7 +1614,7 @@ fn http2_body_user_error_sends_reset_reason() { let mut rt = Runtime::new().expect("runtime new"); - let err = rt.block_on(hyper::rt::lazy(move || { + let err = rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); @@ -1618,8 +1622,8 @@ fn http2_body_user_error_sends_reset_reason() { client .get(uri) - .and_then(|res| res.into_body().concat2()) - })).unwrap_err(); + .and_then(|res| res.into_body().try_concat()) + }).unwrap_err(); let h2_err = err .source() @@ -1630,52 +1634,51 @@ fn http2_body_user_error_sends_reset_reason() { assert_eq!(h2_err.reason(), Some(h2::Reason::INADEQUATE_SECURITY)); } +struct Svc; + +impl hyper::service::Service for Svc { + type ReqBody = hyper::Body; + type ResBody = hyper::Body; + type Error = h2::Error; + type Future = Box, Self::Error> + > + Send + Sync + Unpin>; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Err::<(), _>(h2::Error::from(h2::Reason::INADEQUATE_SECURITY))) + } + + fn call(&mut self, _: hyper::Request) -> Self::Future { + unreachable!("poll_ready error should have shutdown conn"); + } +} + +#[ignore] // Re-enable as soon as HTTP/2.0 is supported again. #[test] fn http2_service_poll_ready_error_sends_goaway() { use std::error::Error; - struct Svc; - - impl hyper::service::Service for Svc { - type ReqBody = hyper::Body; - type ResBody = hyper::Body; - type Error = h2::Error; - type Future = Box, - Error = Self::Error - > + Send + Sync>; - - fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { - Err(h2::Error::from(h2::Reason::INADEQUATE_SECURITY)) - } - - fn call(&mut self, _: hyper::Request) -> Self::Future { - unreachable!("poll_ready error should have shutdown conn"); - } - } - let _ = pretty_env_logger::try_init(); let server = hyper::Server::bind(&([127, 0, 0, 1], 0).into()) .http2_only(true) - .serve(|| Ok::<_, BoxError>(Svc)); + .serve(make_service_fn(|_| async move { Ok::<_, BoxError>(Svc) })); let addr_str = format!("http://{}", server.local_addr()); - let mut rt = Runtime::new().expect("runtime new"); - rt.spawn(server.map_err(|e| unreachable!("server shouldn't error: {:?}", e))); + rt.spawn(server + .map_err(|e| unreachable!("server shouldn't error: {:?}", e)) + .map(|_| ())); - let err = rt.block_on(hyper::rt::lazy(move || { + let err = rt.block_on({ let client = Client::builder() .http2_only(true) .build_http::(); let uri = addr_str.parse().expect("server addr should parse"); - - client - .get(uri) - })).unwrap_err(); + client.get(uri) + }).unwrap_err(); // client request should have gotten the specific GOAWAY error... let h2_err = err @@ -1811,6 +1814,7 @@ impl<'a> ReplyBuilder<'a> { self.tx.send(Reply::Body(body)).unwrap(); } + #[allow(dead_code)] fn error>(self, err: E) { self.tx.send(Reply::Error(err.into())).unwrap(); } @@ -1825,7 +1829,11 @@ impl<'a> Drop for ReplyBuilder<'a> { impl Drop for Serve { fn drop(&mut self) { drop(self.shutdown_signal.take()); - self.thread.take().unwrap().join().unwrap(); + let r = self.thread.take().unwrap().join(); + if let Err(ref e) = r { + println!("{:?}", e); + } + r.unwrap(); } } @@ -1852,26 +1860,38 @@ enum Msg { End, } -impl TestService { - fn call(&self, req: Request) -> Box, Error=BoxError> + Send> { +impl Service for TestService { + type ReqBody = Body; + type ResBody = Body; + type Error = BoxError; + type Future = BoxFuture<'static, Result, BoxError>>; + + fn call(&mut self, req: Request) -> Self::Future { let tx1 = self.tx.clone(); let tx2 = self.tx.clone(); let replies = self.reply.clone(); - Box::new(req.into_body().for_each(move |chunk| { - tx1.send(Msg::Chunk(chunk.to_vec())).unwrap(); - Ok(()) - }).then(move |result| { - let msg = match result { - Ok(()) => Msg::End, - Err(e) => Msg::Error(e), - }; - tx2.send(msg).unwrap(); - Ok(()) - }).and_then(move |_| { - TestService::build_reply(replies) - })) + req + .into_body() + .try_concat() + .map_ok(move |chunk| { + tx1.send(Msg::Chunk(chunk.to_vec())).unwrap(); + () + }) + .map(move |result| { + let msg = match result { + Ok(()) => Msg::End, + Err(e) => Msg::Error(e), + }; + tx2.send(msg).unwrap(); + }) + .map(move |_| { + TestService::build_reply(replies) + }) + .boxed() } +} +impl TestService { fn build_reply(replies: spmc::Receiver) -> Result, BoxError> { let mut res = Response::new(Body::empty()); while let Ok(reply) = replies.try_recv() { @@ -1904,11 +1924,11 @@ impl Service for HelloWorld { type ReqBody = Body; type ResBody = Body; type Error = hyper::Error; - type Future = FutureResult, Self::Error>; + type Future = BoxFuture<'static, Result, Self::Error>>; fn call(&mut self, _req: Request) -> Self::Future { let response = Response::new(HELLO.into()); - future::ok(response) + future::ok(response).boxed() } } @@ -1978,29 +1998,39 @@ impl ServeOptions { .name() .unwrap_or("") ); - let thread = thread::Builder::new().name(thread_name).spawn(move || { - let server = Server::bind(&addr) - .http1_only(options.http1_only) - .http1_keepalive(options.keep_alive) - .http1_pipeline_flush(options.pipeline) - .serve(move || { - let ts = TestService { + let thread = thread::Builder::new() + .name(thread_name) + .spawn(move || { + let service = make_service_fn(|_| { + let msg_tx = msg_tx.clone(); + let reply_rx = reply_rx.clone(); + future::ok::<_, BoxError>(TestService { tx: msg_tx.clone(), reply: reply_rx.clone(), - }; - service_fn(move |req| ts.call(req)) + }) }); - addr_tx.send( - server.local_addr() - ).expect("server addr tx"); + let server = Server::bind(&addr) + .http1_only(options.http1_only) + .http1_keepalive(options.keep_alive) + .http1_pipeline_flush(options.pipeline) + .serve(service); - let fut = server - .with_graceful_shutdown(shutdown_rx); + addr_tx.send( + server.local_addr() + ).expect("server addr tx"); - let mut rt = Runtime::new().expect("rt new"); - rt.block_on(fut).unwrap(); - }).expect("thread spawn"); + let fut = server + .with_graceful_shutdown(async { + shutdown_rx.await.ok(); + }); + + let mut rt = Runtime::new().expect("rt new"); + rt + .block_on(fut) + .unwrap(); + }) + .expect("thread spawn"); let addr = addr_rx.recv().expect("server addr rx"); @@ -2059,6 +2089,8 @@ struct DebugStream { _debug: D, } +impl Unpin for DebugStream {} + impl Read for DebugStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.read(buf) @@ -2076,14 +2108,34 @@ impl Write for DebugStream { } -impl AsyncWrite for DebugStream { - fn shutdown(&mut self) -> futures::Poll<(), io::Error> { - self.stream.shutdown() +impl AsyncWrite for DebugStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.stream).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.stream).poll_shutdown(cx) } } -impl AsyncRead for DebugStream {} +impl AsyncRead for DebugStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.stream).poll_read(cx, buf) + } +} #[derive(Clone)] struct Dropped(Arc);