diff --git a/Cargo.toml b/Cargo.toml index de6457db..cafc0fac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,16 +38,18 @@ pin-utils = "0.1.0-alpha.4" time = "0.1" tokio = { git = "https://github.com/tokio-rs/tokio", optional = true, default-features = false, features = ["rt-full"] } tokio-buf = "0.1" +tokio-current-thread = { git = "https://github.com/tokio-rs/tokio" } tokio-executor = { git = "https://github.com/tokio-rs/tokio" } tokio-io = { git = "https://github.com/tokio-rs/tokio" } tokio-reactor = { git = "https://github.com/tokio-rs/tokio", optional = true } tokio-sync = { git = "https://github.com/tokio-rs/tokio" } -tokio-tcp = { git = "https://github.com/tokio-rs/tokio", optional = true } +tokio-tcp = { git = "https://github.com/tokio-rs/tokio", optional = true, features = ["async-traits"] } tokio-threadpool = { git = "https://github.com/tokio-rs/tokio", optional = true } tokio-timer = { git = "https://github.com/tokio-rs/tokio", optional = true } want = { git = "https://github.com/seanmonstar/want", branch = "std-future" } [dev-dependencies] +matches = "0.1" num_cpus = "1.0" pretty_env_logger = "0.3" spmc = "0.2" diff --git a/src/body/body.rs b/src/body/body.rs index f8319201..d4e01155 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -553,15 +553,16 @@ mod tests { // FIXME: re-implement tests with `async/await`, this import should // trigger a warning to remind us use crate::Error; + use futures_util::try_stream::TryStreamExt; + use tokio::runtime::current_thread::Runtime; - /* use super::*; #[test] fn test_body_stream_concat() { let body = Body::from("hello world"); - let total = body.concat2().wait().unwrap(); + let mut rt = Runtime::new().unwrap(); + let total = rt.block_on(body.try_concat()).unwrap(); assert_eq!(total.as_ref(), b"hello world"); } - */ } diff --git a/tests_disabled/client.rs b/tests/client.rs similarity index 83% rename from tests_disabled/client.rs rename to tests/client.rs index 54d7e1d6..1e07a3c2 100644 --- a/tests_disabled/client.rs +++ b/tests/client.rs @@ -1,31 +1,37 @@ +#![feature(async_await)] #![deny(warnings)] extern crate bytes; extern crate hyper; -extern crate futures; -extern crate futures_timer; +#[macro_use] extern crate matches; extern crate net2; +extern crate pretty_env_logger; extern crate tokio; extern crate tokio_io; extern crate tokio_tcp; -extern crate pretty_env_logger; +extern crate tokio_timer; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::thread; use std::time::Duration; use hyper::{Body, Client, Method, Request, StatusCode}; -use futures::{Future, Stream}; -use futures::sync::oneshot; +use futures_core::{Future, Stream, TryFuture}; +use futures_channel::oneshot; +use futures_util::future::{self, FutureExt}; +use futures_util::try_future::{self, TryFutureExt}; +use futures_util::try_stream::TryStreamExt; use tokio::runtime::current_thread::Runtime; -use tokio_tcp::{ConnectFuture, TcpListener as TkTcpListener, TcpStream}; +use tokio_tcp::TcpStream; fn s(buf: &[u8]) -> &str { ::std::str::from_utf8(buf).expect("from_utf8") } -fn tcp_connect(addr: &SocketAddr) -> ConnectFuture { +fn tcp_connect(addr: &SocketAddr) -> impl Future> { TcpStream::connect(addr) } @@ -147,9 +153,7 @@ macro_rules! test { ); )* - let body = rt.block_on(res - .into_body() - .concat2()) + let body = rt.block_on(res.into_body().try_concat()) .expect("body concat wait"); let expected_res_body = Option::<&[u8]>::from($response_body) @@ -254,12 +258,12 @@ macro_rules! test { assert_eq!(s(&buf[..n]), expected); inc.write_all($server_reply.as_ref()).expect("write_all"); - let _ = tx.send(()); + let _ = tx.send(Ok::<_, hyper::Error>(())); }).expect("thread spawn"); let rx = rx.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).map(move |mut resp| { + rt.block_on(try_future::try_join(res, rx).map_ok(|r| r.0)).map(move |mut resp| { // Always check that HttpConnector has set the "extra" info... let extra = resp .extensions_mut() @@ -759,14 +763,18 @@ mod dispatch_impl { use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; - use std::time::Duration; + use std::time::{Duration, Instant}; - use futures::{self, Future}; - use futures::sync::{mpsc, oneshot}; - use futures_timer::Delay; - use tokio_tcp::TcpStream; + use futures_core::{self, Future}; + use futures_channel::{mpsc, oneshot}; + use futures_util::future::FutureExt; + use futures_util::stream::StreamExt; + use futures_util::try_future::TryFutureExt; + use futures_util::try_stream::TryStreamExt; use tokio::runtime::current_thread::Runtime; use tokio_io::{AsyncRead, AsyncWrite}; + use tokio_tcp::TcpStream; + use tokio_timer::Delay; use hyper::client::connect::{Connect, Connected, Destination, HttpConnector}; use hyper::Client; @@ -804,15 +812,14 @@ mod dispatch_impl { .uri(&*format!("http://{}/a", addr)) .body(Body::empty()) .unwrap(); - let res = client.request(req).and_then(move |res| { + let res = client.request(req).map_ok(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - Delay::new(Duration::from_secs(1)) - .expect("timeout") + Delay::new(Instant::now() + Duration::from_secs(1)) }); let rx = rx1.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - rt.block_on(closes.into_future()).unwrap().0.expect("closes"); + rt.block_on(closes.into_future()).0.expect("closes"); } #[test] @@ -849,22 +856,23 @@ mod dispatch_impl { .unwrap(); client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() - }).and_then(|_| { - Delay::new(Duration::from_secs(1)) - .expect("timeout") + res.into_body().try_concat() + }).map_ok(|_| { + Delay::new(Instant::now() + Duration::from_secs(1)) }) }; // client is dropped let rx = rx1.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - rt.block_on(closes.into_future()).unwrap().0.expect("closes"); + rt.block_on(closes.into_future()).0.expect("closes"); } #[test] fn drop_client_closes_idle_connections() { + use futures_util::future; + let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -881,14 +889,14 @@ mod dispatch_impl { sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); let mut buf = [0; 4096]; sock.read(&mut buf).expect("read 1"); - let body =[b'x'; 64]; + let body = [b'x'; 64]; write!(sock, "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len()).expect("write head"); let _ = sock.write_all(&body); let _ = tx1.send(()); // prevent this thread from closing until end of test, so the connection // stays open and idle until Client is dropped - let _ = client_drop_rx.wait(); + Runtime::new().unwrap().block_on(client_drop_rx.into_future()) }); let client = Client::builder() @@ -900,31 +908,28 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); // not closed yet, just idle { - rt.block_on(futures::future::poll_fn(|| { - assert!(closes.poll()?.is_not_ready()); - Ok::<_, ()>(().into()) + rt.block_on(future::poll_fn(|ctx| { + assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); + Poll::Ready(Ok::<_, ()>(())) })).unwrap(); } drop(client); - let t = Delay::new(Duration::from_millis(100)) + let t = Delay::new(Instant::now() + Duration::from_millis(100)) .map(|_| panic!("time out")); - let close = closes.into_future() - .map(|(opt, _)| { - opt.expect("closes"); - }) - .map_err(|_| panic!("closes dropped")); - let _ = rt.block_on(t.select(close)); + let close = closes + .into_future() + .map(|(opt, _)| opt.expect("closes")); + let _ = rt.block_on(future::select(t, close)); } - #[test] fn drop_response_future_closes_in_progress_connection() { let _ = pretty_env_logger::try_init(); @@ -949,7 +954,7 @@ mod dispatch_impl { // prevent this thread from closing until end of test, so the connection // stays open and idle until Client is dropped - let _ = client_drop_rx.wait(); + Runtime::new().unwrap().block_on(client_drop_rx.into_future()).unwrap(); }); let res = { @@ -963,16 +968,15 @@ mod dispatch_impl { client.request(req) }; - rt.block_on(res.select2(rx1)).unwrap(); + rt.block_on(future::select(res, rx1)); + // res now dropped - let t = Delay::new(Duration::from_millis(100)) + let t = Delay::new(Instant::now() + Duration::from_millis(100)) .map(|_| panic!("time out")); - let close = closes.into_future() - .map(|(opt, _)| { - opt.expect("closes"); - }) - .map_err(|_| panic!("closes dropped")); - let _ = rt.block_on(t.select(close)); + let close = closes + .into_future() + .map(|(opt, _)| opt.expect("closes")); + let _ = rt.block_on(future::select(t, close)); } #[test] @@ -998,7 +1002,7 @@ mod dispatch_impl { // prevent this thread from closing until end of test, so the connection // stays open and idle until Client is dropped - let _ = client_drop_rx.wait(); + Runtime::new().unwrap().block_on(client_drop_rx.into_future()).unwrap(); }); let res = { @@ -1014,16 +1018,14 @@ mod dispatch_impl { }; let rx = rx1.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - let t = Delay::new(Duration::from_millis(100)) + let t = Delay::new(Instant::now() + Duration::from_millis(100)) .map(|_| panic!("time out")); - let close = closes.into_future() - .map(|(opt, _)| { - opt.expect("closes"); - }) - .map_err(|_| panic!("closes dropped")); - let _ = rt.block_on(t.select(close)); + let close = closes + .into_future() + .map(|(opt, _)| opt.expect("closes")); + let _ = rt.block_on(future::select(t, close)); } #[test] @@ -1047,7 +1049,7 @@ mod dispatch_impl { sock.read(&mut buf).expect("read 1"); sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); let _ = tx1.send(()); - let _ = rx2.wait(); + let _ = Runtime::new().unwrap().block_on(rx2).unwrap(); }); let client = Client::builder() @@ -1060,19 +1062,17 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - let t = Delay::new(Duration::from_millis(100)) + let t = Delay::new(Instant::now() + Duration::from_millis(100)) .map(|_| panic!("time out")); - let close = closes.into_future() - .map(|(opt, _)| { - opt.expect("closes"); - }) - .map_err(|_| panic!("closes dropped")); - let _ = rt.block_on(t.select(close)); + let close = closes + .into_future() + .map(|(opt, _)| opt.expect("closes")); + let _ = rt.block_on(future::select(t, close)); } #[test] @@ -1106,19 +1106,17 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); - let t = Delay::new(Duration::from_millis(100)) + let t = Delay::new(Instant::now() + Duration::from_millis(100)) .map(|_| panic!("time out")); - let close = closes.into_future() - .map(|(opt, _)| { - opt.expect("closes"); - }) - .map_err(|_| panic!("closes dropped")); - let _ = rt.block_on(t.select(close)); + let close = closes + .into_future() + .map(|(opt, _)| opt.expect("closes")); + let _ = rt.block_on(future::select(t, close)); } #[test] @@ -1186,7 +1184,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::SeqCst), 1); @@ -1200,7 +1198,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect"); drop(client); @@ -1251,7 +1249,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::Relaxed), 1); @@ -1261,14 +1259,15 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::Relaxed), 2); } #[test] fn client_keep_alive_when_response_before_request_body_ends() { - use futures_timer::Delay; + use tokio_timer::Delay; + let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); @@ -1308,10 +1307,10 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); let delayed_body = rx1 + .then(|_| Delay::new(Instant::now() + Duration::from_millis(200))) + .map(|_| Ok::<_, ()>("hello a")) .map_err(|_| -> hyper::Error { panic!("rx1") }) - .and_then(|_| Delay::new(Duration::from_millis(200)).map_err(|_| panic!("delay"))) - .into_stream() - .map(|_| "hello a"); + .into_stream(); let rx = rx2.expect("thread panicked"); let req = Request::builder() @@ -1322,20 +1321,17 @@ mod dispatch_impl { let client2 = client.clone(); // req 1 - let fut = client.request(req) - .join(rx) - .and_then(|_| Delay::new(Duration::from_millis(200)).expect("delay")) + let fut = future::join(client.request(req), rx) + .then(|_| Delay::new(Instant::now() + Duration::from_millis(200))) // req 2 - .and_then(move |()| { + .then(move |()| { let rx = rx3.expect("thread panicked"); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) .body(Body::empty()) .unwrap(); - client2 - .request(req) - .join(rx) - .map(|_| ()) + future::join(client2.request(req), rx) + .map(|r| r.0) }); rt.block_on(fut).unwrap(); @@ -1377,7 +1373,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } #[test] @@ -1415,12 +1411,13 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } #[test] fn client_upgrade() { - use tokio_io::io::{read_to_end, write_all}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); @@ -1460,7 +1457,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req); - let res = rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); assert_eq!(res.status(), 101); let upgraded = rt.block_on(res @@ -1471,17 +1468,18 @@ mod dispatch_impl { let parts = upgraded.downcast::().unwrap(); assert_eq!(s(&parts.read_buf), "foobar=ready"); - let io = parts.io; - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + let mut io = parts.io; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(vec, b"bar=foo"); } - #[test] + /*#[test] fn alpn_h2() { use hyper::Response; use hyper::server::conn::Http; - use hyper::service::service_fn_ok; + use hyper::service::service_fn; let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); @@ -1495,20 +1493,20 @@ mod dispatch_impl { .build::<_, ::hyper::Body>(connector); let srv = listener.incoming() - .into_future() + .try_next() .map_err(|_| unreachable!()) - .and_then(|(item, _incoming)| { + .and_then(|item| { let socket = item.unwrap(); Http::new() .http2_only(true) - .serve_connection(socket, service_fn_ok(|req| { + .serve_connection(socket, service_fn(|req| async move { assert_eq!(req.headers().get("host"), None); - Response::new(Body::empty()) + Ok(Response::new(Body::empty())) })) }) .map_err(|e| panic!("server error: {}", e)); - rt.spawn(srv); + rt.block_on(srv).unwrap(); assert_eq!(connects.load(Ordering::SeqCst), 0); @@ -1516,7 +1514,7 @@ mod dispatch_impl { let res1 = client.get(url.clone()); let res2 = client.get(url.clone()); let res3 = client.get(url.clone()); - rt.block_on(res1.join(res2).join(res3)).unwrap(); + rt.block_on(try_future::try_join3(res1, res2, res3)).unwrap(); // Since the client doesn't know it can ALPN at first, it will have // started 3 connections. But, the server above will only handle 1, @@ -1528,7 +1526,7 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::SeqCst), 3, "after ALPN, no more connects"); drop(client); - } + }*/ struct DebugConnector { @@ -1565,14 +1563,16 @@ mod dispatch_impl { impl Connect for DebugConnector { type Transport = DebugStream; type Error = io::Error; - type Future = Box + Send>; + type Future = Pin + > + Send>>; fn connect(&self, dst: Destination) -> Self::Future { self.connects.fetch_add(1, Ordering::SeqCst); let closes = self.closes.clone(); let is_proxy = self.is_proxy; let is_alpn_h2 = self.alpn_h2; - Box::new(self.http.connect(dst).map(move |(s, mut c)| { + Box::pin(self.http.connect(dst).map_ok(move |(s, mut c)| { if is_alpn_h2 { c = c.negotiated_h2(); } @@ -1589,48 +1589,51 @@ mod dispatch_impl { } } - impl Write for DebugStream { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.0.flush() - } - } - impl AsyncWrite for DebugStream { - fn shutdown(&mut self) -> futures::Poll<(), io::Error> { - AsyncWrite::shutdown(&mut self.0) + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) } - fn write_buf(&mut self, buf: &mut B) -> futures::Poll { - self.0.write_buf(buf) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) } } - impl Read for DebugStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) + impl AsyncRead for DebugStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) } } - - impl AsyncRead for DebugStream {} } mod conn { use std::io::{self, Read, Write}; use std::net::TcpListener; + use std::pin::Pin; + use std::task::{Context, Poll}; use std::thread; - use std::time::Duration; + use std::time::{Duration, Instant}; - use futures::{Async, Future, Poll, Stream}; - use futures::future::poll_fn; - use futures::sync::oneshot; - use futures_timer::Delay; + use futures_channel::oneshot; + use futures_util::future::{self, poll_fn, FutureExt}; + use futures_util::try_future::TryFutureExt; + use futures_util::try_stream::TryStreamExt; use tokio::runtime::current_thread::Runtime; - use tokio_tcp::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; + use tokio_tcp::TcpStream; + use tokio_timer::Delay; use hyper::{self, Request, Body, Method}; use hyper::client::conn; @@ -1666,7 +1669,7 @@ mod conn { let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); let req = Request::builder() .uri("/a") @@ -1674,13 +1677,11 @@ mod conn { .unwrap(); let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } #[test] @@ -1711,7 +1712,7 @@ mod conn { let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); let req = Request::builder() .uri("/") @@ -1721,23 +1722,13 @@ mod conn { assert_eq!(res.status(), hyper::StatusCode::OK); assert_eq!(res.body().content_length(), Some(5)); assert!(!res.body().is_end_stream()); - loop { - let chunk = res.body_mut().poll_data().unwrap(); - match chunk { - Async::Ready(Some(chunk)) => { - assert_eq!(chunk.len(), 5); - break; - } - _ => continue - } - } - res.into_body().concat2() + poll_fn(move |ctx| Pin::new(res.body_mut()).poll_data(ctx)).map(Option::unwrap) }); - let rx = rx1.expect("thread panicked"); - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + let rx = rx1.expect("thread panicked"); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + let chunk = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); + assert_eq!(chunk.len(), 5); } #[test] @@ -1766,12 +1757,12 @@ mod conn { let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); let (mut sender, body) = Body::channel(); let sender = thread::spawn(move || { sender.send_data("hello".into()).ok().unwrap(); - rx.wait().unwrap(); + Runtime::new().unwrap().block_on(rx).unwrap(); sender.abort(); }); @@ -1815,7 +1806,7 @@ mod conn { let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); let req = Request::builder() .uri("http://hyper.local/a") @@ -1824,13 +1815,11 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } #[test] @@ -1860,7 +1849,7 @@ mod conn { let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); let req = Request::builder() .uri("/a") @@ -1870,13 +1859,11 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); } #[test] @@ -1895,14 +1882,14 @@ mod conn { sock.read(&mut buf).expect("read 1"); sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); - let _ = tx1.send(()); + let _ = tx1.send(Ok::<_, ()>(())); }); let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map_err(|e| panic!("conn error: {}", e)).map(|_| ())); let req = Request::builder() .uri("/a") @@ -1910,7 +1897,7 @@ mod conn { .unwrap(); let res1 = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }); // pipelined request will hit NotReady, and thus should return an Error::Cancel @@ -1918,23 +1905,23 @@ mod conn { .uri("/b") .body(Default::default()) .unwrap(); - let res2 = client.send_request(req) - .then(|result| { + let res2 = client + .send_request(req) + .map(|result| { let err = result.expect_err("res2"); assert!(err.is_canceled(), "err not canceled, {:?}", err); - Ok(()) + Ok::<_, ()>(()) }); let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(res1.join(res2).join(rx).map(|r| r.0)).unwrap(); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + rt.block_on(future::join3(res1, res2, rx).map(|r| r.0)).unwrap(); } #[test] fn upgrade() { - use tokio_io::io::{read_to_end, write_all}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = ::pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -1972,8 +1959,8 @@ mod conn { let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); { - let until_upgrade = poll_fn(|| { - conn.poll_without_shutdown() + let until_upgrade = poll_fn(|ctx| { + conn.poll_without_shutdown(ctx) }); let req = Request::builder() @@ -1983,38 +1970,42 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); assert_eq!(res.headers()["Upgrade"], "foobar"); - res.into_body().concat2() + res.into_body().try_concat() }); let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(until_upgrade.join(res).join(rx).map(|r| r.0)).unwrap(); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + rt.block_on(future::join3(until_upgrade, res, rx).map(|r| r.0)).unwrap(); // should not be ready now - rt.block_on(poll_fn(|| { - assert!(client.poll_ready().unwrap().is_not_ready()); - Ok::<_, ()>(Async::Ready(())) + rt.block_on(poll_fn(|ctx| { + assert!(client.poll_ready(ctx).is_pending()); + Poll::Ready(Ok::<_, ()>(())) })).unwrap(); } let parts = conn.into_parts(); - let io = parts.io; + let mut io = parts.io; let buf = parts.read_buf; assert_eq!(buf, b"foobar=ready"[..]); assert!(!io.shutdown_called, "upgrade shouldn't shutdown AsyncWrite"); - assert!(client.poll_ready().is_err()); + rt.block_on(poll_fn(|ctx| { + let ready = client.poll_ready(ctx); + assert_matches!(ready, Poll::Ready(Err(_))); + ready + })).unwrap_err(); - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + let mut vec = vec![]; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(vec, b"bar=foo"); } #[test] fn connect_method() { - use tokio_io::io::{read_to_end, write_all}; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + let _ = ::pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); @@ -2034,7 +2025,7 @@ mod conn { \r\n\ foobar=ready\ ").unwrap(); - let _ = tx1.send(()); + let _ = tx1.send(Ok::<_, ()>(())); let n = sock.read(&mut buf).expect("read 2"); assert_eq!(&buf[..n], b"foo=bar", "sock read 2 bytes"); @@ -2051,8 +2042,8 @@ mod conn { let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); { - let until_tunneled = poll_fn(|| { - conn.poll_without_shutdown() + let until_tunneled = poll_fn(|ctx| { + conn.poll_without_shutdown(ctx) }); let req = Request::builder() @@ -2060,47 +2051,52 @@ mod conn { .uri(addr.to_string()) .body(Default::default()) .unwrap(); - let res = client.send_request(req) + let res = client + .send_request(req) .and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().try_concat() }) - .map(|body| { + .map_ok(|body| { assert_eq!(body.as_ref(), b""); }); let rx = rx1.expect("thread panicked"); - - let timeout = Delay::new(Duration::from_millis(200)); - let rx = rx.and_then(move |_| timeout.expect("timeout")); - rt.block_on(until_tunneled.join(res).join(rx).map(|r| r.0)).unwrap(); + let rx = rx.then(|_| Delay::new(Instant::now() + Duration::from_millis(200))); + rt.block_on(future::join3(until_tunneled, res, rx).map(|r| r.0)).unwrap(); // should not be ready now - rt.block_on(poll_fn(|| { - assert!(client.poll_ready().unwrap().is_not_ready()); - Ok::<_, ()>(Async::Ready(())) + rt.block_on(poll_fn(|ctx| { + assert!(client.poll_ready(ctx).is_pending()); + Poll::Ready(Ok::<_, ()>(())) })).unwrap(); } let parts = conn.into_parts(); - let io = parts.io; + let mut io = parts.io; let buf = parts.read_buf; assert_eq!(buf, b"foobar=ready"[..]); assert!(!io.shutdown_called, "tunnel shouldn't shutdown AsyncWrite"); - assert!(client.poll_ready().is_err()); - let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; - let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; + rt.block_on(poll_fn(|ctx| { + let ready = client.poll_ready(ctx); + assert_matches!(ready, Poll::Ready(Err(_))); + ready + })).unwrap_err(); + + let mut vec = vec![]; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + rt.block_on(io.read_to_end(&mut vec)).unwrap(); assert_eq!(vec, b"bar=foo"); } - - #[test] - fn http2_detect_conn_eof() { - use futures::future; + // DISABLED + // #[test] + /*fn http2_detect_conn_eof() { + use futures_util::future; use hyper::{Response, Server}; - use hyper::service::service_fn_ok; + use hyper::service::service_fn; use tokio::timer::Delay; let _ = pretty_env_logger::try_init(); @@ -2109,8 +2105,8 @@ mod conn { let server = Server::bind(&([127, 0, 0, 1], 0).into()) .http2_only(true) - .serve(|| service_fn_ok(|_req| { - Response::new(Body::empty()) + .serve(|| service_fn(|_req| { + Ok(Response::new(Body::empty())) })); let addr = server.local_addr(); let (shdn_tx, shdn_rx) = oneshot::channel(); @@ -2124,7 +2120,7 @@ mod conn { // Sanity check that client is ready - rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready sanity"); + rt.block_on(future::poll_fn(|ctx| client.poll_ready(ctx))).expect("client poll ready sanity"); let req = Request::builder() .uri(format!("http://{}/", addr)) @@ -2134,7 +2130,7 @@ mod conn { rt.block_on(client.send_request(req)).expect("req1 send"); // Sanity check that client is STILL ready - rt.block_on(future::poll_fn(|| client.poll_ready())).expect("client poll ready after"); + rt.block_on(future::poll_fn(|ctx| client.poll_ready(ctx))).expect("client poll ready after"); // Trigger the server shutdown... let _ = shdn_tx.send(()); @@ -2143,15 +2139,15 @@ mod conn { rt.block_on(Delay::new(::std::time::Instant::now() + Duration::from_millis(100)).map_err(|e| panic!("delay error: {:?}", e))).expect("delay"); // After graceful shutdown roundtrips, the client should be closed... - rt.block_on(future::poll_fn(|| client.poll_ready())).expect_err("client should be closed"); - } + rt.block_on(future::poll_fn(|ctx| client.poll_ready(ctx))).expect_err("client should be closed"); + }*/ struct DebugStream { tcp: TcpStream, shutdown_called: bool, } - impl Write for DebugStream { + /*impl Write for DebugStream { fn write(&mut self, buf: &[u8]) -> io::Result { self.tcp.write(buf) } @@ -2159,34 +2155,50 @@ mod conn { fn flush(&mut self) -> io::Result<()> { self.tcp.flush() } - } + }*/ impl AsyncWrite for DebugStream { - fn shutdown(&mut self) -> Poll<(), io::Error> { + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.shutdown_called = true; - AsyncWrite::shutdown(&mut self.tcp) + Pin::new(&mut self.tcp).poll_shutdown(cx) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.tcp).poll_flush(cx) + } + + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.tcp).poll_write(cx, buf) } } - impl Read for DebugStream { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.tcp.read(buf) + impl AsyncRead for DebugStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.tcp).poll_read(cx, buf) } } - - impl AsyncRead for DebugStream {} } -trait FutureHyperExt: Future { - fn expect(self, msg: &'static str) -> Box>; +trait FutureHyperExt: TryFuture { + fn expect(self, msg: &'static str) -> Pin>>; } impl FutureHyperExt for F where - F: Future + 'static, + F: TryFuture + 'static, F::Error: ::std::fmt::Debug, { - fn expect(self, msg: &'static str) -> Box> { - Box::new(self.map_err(move |e| panic!("expect: {}; error={:?}", msg, e))) + fn expect(self, msg: &'static str) -> Pin>> { + Box::pin(self + .inspect_err(move |e| panic!("expect: {}; error={:?}", msg, e)) + .map(Result::unwrap)) } } diff --git a/tests_disabled/support/mod.rs b/tests/support/mod.rs similarity index 98% rename from tests_disabled/support/mod.rs rename to tests/support/mod.rs index 11437f96..f5bc0bda 100644 --- a/tests_disabled/support/mod.rs +++ b/tests/support/mod.rs @@ -1,9 +1,8 @@ -pub extern crate futures; pub extern crate hyper; pub extern crate tokio; use std::sync::{Arc, Mutex, atomic::{AtomicUsize, Ordering}}; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::hyper::{Body, Client, Request, Response, Server, Version}; use crate::hyper::client::HttpConnector; @@ -11,7 +10,7 @@ use crate::hyper::service::service_fn; pub use std::net::SocketAddr; pub use self::futures::{future, Future, Stream}; -pub use self::futures::sync::oneshot; +pub use self::futures_channel::oneshot; pub use self::hyper::{HeaderMap, StatusCode}; pub use self::tokio::runtime::current_thread::Runtime; @@ -341,7 +340,7 @@ pub fn __run_test(cfg: __TestConfig) { } let sbody = sreq.body; req.into_body() - .concat2() + .try_concat() .map(move |body| { assert_eq!(body.as_ref(), sbody.as_slice(), "client body"); @@ -417,7 +416,7 @@ pub fn __run_test(cfg: __TestConfig) { for func in &cheaders { func(&res.headers()); } - res.into_body().concat2() + res.into_body().try_concat() }) .map(move |body| { assert_eq!(body.as_ref(), cbody.as_slice(), "server body");