From e4ebf4482372497f0cd5fa63d6492bd1c51d7b21 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 18 Jun 2018 12:30:46 -0700 Subject: [PATCH] chore(tests): change tests to use current_thread runtime --- benches/end_to_end.rs | 19 ++-- src/client/pool.rs | 25 +++-- src/client/tests.rs | 23 ++-- tests/client.rs | 241 ++++++++++++++++++------------------------ tests/server.rs | 102 +++++++++--------- tests/support/mod.rs | 27 ++--- 6 files changed, 200 insertions(+), 237 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index f62ce86a..aedfaec2 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -9,7 +9,7 @@ extern crate tokio; use std::net::SocketAddr; use futures::{Future, Stream}; -use tokio::runtime::Runtime; +use tokio::runtime::current_thread::Runtime; use tokio::net::TcpListener; use hyper::{Body, Method, Request, Response}; @@ -22,22 +22,21 @@ fn get_one_at_a_time(b: &mut test::Bencher) { let mut rt = Runtime::new().unwrap(); let addr = spawn_hello(&mut rt); - let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); + let connector = HttpConnector::new(1); let client = hyper::Client::builder() - .executor(rt.executor()) .build::<_, Body>(connector); let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap(); b.bytes = 160 * 2 + PHRASE.len() as u64; b.iter(move || { - client.get(url.clone()) + rt.block_on(client.get(url.clone()) .and_then(|res| { res.into_body().for_each(|_chunk| { Ok(()) }) - }) - .wait().expect("client wait"); + })) + .expect("client wait"); }); } @@ -46,9 +45,8 @@ fn post_one_at_a_time(b: &mut test::Bencher) { let mut rt = Runtime::new().unwrap(); let addr = spawn_hello(&mut rt); - let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); + let connector = HttpConnector::new(1); let client = hyper::Client::builder() - .executor(rt.executor()) .build::<_, Body>(connector); let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap(); @@ -59,12 +57,11 @@ fn post_one_at_a_time(b: &mut test::Bencher) { let mut req = Request::new(post.into()); *req.method_mut() = Method::POST; *req.uri_mut() = url.clone(); - client.request(req).and_then(|res| { + rt.block_on(client.request(req).and_then(|res| { res.into_body().for_each(|_chunk| { Ok(()) }) - }).wait().expect("client wait"); - + })).expect("client wait"); }); } diff --git a/src/client/pool.rs b/src/client/pool.rs index c86577c6..74eb7d63 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -805,20 +805,29 @@ mod tests { #[cfg(feature = "runtime")] #[test] fn test_pool_timer_removes_expired() { - use std::sync::Arc; - let runtime = ::tokio::runtime::Runtime::new().unwrap(); - let executor = runtime.executor(); - let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Executor(Arc::new(executor))); + use std::time::Instant; + use tokio_timer::Delay; + let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap(); + let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default); let key = (Arc::new("foo".to_string()), Ver::Http1); - pool.pooled(c(key.clone()), Uniq(41)); - pool.pooled(c(key.clone()), Uniq(5)); - pool.pooled(c(key.clone()), Uniq(99)); + // Since pool.pooled() will be calling spawn on drop, need to be sure + // those drops are called while `rt` is the current executor. To do so, + // call those inside a future. + rt.block_on(::futures::future::lazy(|| { + pool.pooled(c(key.clone()), Uniq(41)); + pool.pooled(c(key.clone()), Uniq(5)); + pool.pooled(c(key.clone()), Uniq(99)); + Ok::<_, ()>(()) + })).unwrap(); assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); - ::std::thread::sleep(Duration::from_millis(400)); // allow for too-good resolution + // Let the timer tick passed the expiration... + rt + .block_on(Delay::new(Instant::now() + Duration::from_millis(200))) + .expect("rt block_on 200ms"); assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none()); } diff --git a/src/client/tests.rs b/src/client/tests.rs index 18a48a1f..7c59e3cf 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -1,12 +1,9 @@ #![cfg(feature = "runtime")] extern crate pretty_env_logger; -use std::thread; -use std::time::Duration; - use futures::Async; use futures::future::poll_fn; -use tokio::executor::thread_pool::{Builder as ThreadPoolBuilder}; +use tokio::runtime::current_thread::Runtime; use mock::MockConnector; use super::*; @@ -15,14 +12,13 @@ use super::*; fn retryable_request() { let _ = pretty_env_logger::try_init(); - let executor = ThreadPoolBuilder::new().pool_size(1).build(); + let mut rt = Runtime::new().expect("new rt"); let mut connector = MockConnector::new(); let sock1 = connector.mock("http://mock.local"); let sock2 = connector.mock("http://mock.local"); let client = Client::builder() - .executor(executor.sender().clone()) .build::<_, ::Body>(connector); client.pool.no_timer(); @@ -39,7 +35,7 @@ fn retryable_request() { try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); - res1.join(srv1).wait().expect("res1"); + rt.block_on(res1.join(srv1)).expect("res1"); } drop(sock1); @@ -57,20 +53,19 @@ fn retryable_request() { Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); - res2.join(srv2).wait().expect("res2"); + rt.block_on(res2.join(srv2)).expect("res2"); } #[test] fn conn_reset_after_write() { let _ = pretty_env_logger::try_init(); - let executor = ThreadPoolBuilder::new().pool_size(1).build(); + let mut rt = Runtime::new().expect("new rt"); let mut connector = MockConnector::new(); let sock1 = connector.mock("http://mock.local"); let client = Client::builder() - .executor(executor.sender().clone()) .build::<_, ::Body>(connector); client.pool.no_timer(); @@ -88,12 +83,9 @@ fn conn_reset_after_write() { try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); - res1.join(srv1).wait().expect("res1"); + rt.block_on(res1.join(srv1)).expect("res1"); } - // sleep to allow some time for the connection to return to the pool - thread::sleep(Duration::from_millis(10)); - let req = Request::builder() .uri("http://mock.local/a") .body(Default::default()) @@ -111,9 +103,10 @@ fn conn_reset_after_write() { sock1.take(); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); - let err = res2.join(srv2).wait().expect_err("res2"); + let err = rt.block_on(res2.join(srv2)).expect_err("res2"); match err.kind() { &::error::Kind::Incomplete => (), other => panic!("expected Incomplete, found {:?}", other) } } + diff --git a/tests/client.rs b/tests/client.rs index 119533f4..93eb77cd 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -18,7 +18,7 @@ use hyper::{Body, Client, Method, Request, StatusCode}; use futures::{Future, Stream}; use futures::sync::oneshot; use tokio::reactor::Handle; -use tokio::runtime::Runtime; +use tokio::runtime::current_thread::Runtime; use tokio::net::{ConnectFuture, TcpStream}; fn s(buf: &[u8]) -> &str { @@ -126,12 +126,12 @@ macro_rules! test { #[test] fn $name() { let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().expect("runtime new"); + let mut rt = Runtime::new().expect("runtime new"); let res = test! { INNER; name: $name, - runtime: &runtime, + runtime: &mut rt, server: expected: $server_expected, reply: $server_reply, @@ -151,17 +151,14 @@ macro_rules! test { assert_eq!(res.headers()[$response_header_name], $response_header_val); )* - let body = res + let body = rt.block_on(res .into_body() - .concat2() - .wait() + .concat2()) .expect("body concat wait"); let expected_res_body = Option::<&[u8]>::from($response_body) .unwrap_or_default(); assert_eq!(body.as_ref(), expected_res_body); - - runtime.shutdown_on_idle().wait().expect("rt shutdown"); } ); ( @@ -181,12 +178,12 @@ macro_rules! test { #[test] fn $name() { let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().expect("runtime new"); + let mut rt = Runtime::new().expect("runtime new"); let err: ::hyper::Error = test! { INNER; name: $name, - runtime: &runtime, + runtime: &mut rt, server: expected: $server_expected, reply: $server_reply, @@ -206,8 +203,6 @@ macro_rules! test { if !closure(&err) { panic!("expected error, unexpected variant: {:?}", err); } - - runtime.shutdown_on_idle().wait().expect("rt shutdown"); } ); @@ -229,13 +224,12 @@ macro_rules! test { ) => ({ let server = TcpListener::bind("127.0.0.1:0").expect("bind"); let addr = server.local_addr().expect("local_addr"); - let runtime = $runtime; + let mut rt = $runtime; - let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone()); + let connector = ::hyper::client::HttpConnector::new_with_handle(1, Handle::default()); let client = Client::builder() .set_host($set_host) .http1_title_case_headers($title_case_headers) - .executor(runtime.executor()) .build(connector); let body = if let Some(body) = $request_body { @@ -280,7 +274,7 @@ macro_rules! test { let rx = rx.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait() + rt.block_on(res.join(rx).map(|r| r.0)) }); } @@ -690,7 +684,7 @@ mod dispatch_impl { use futures::sync::{mpsc, oneshot}; use futures_timer::Delay; use tokio::net::TcpStream; - use tokio::runtime::Runtime; + use tokio::runtime::current_thread::Runtime; use tokio_io::{AsyncRead, AsyncWrite}; use hyper::client::connect::{Connect, Connected, Destination, HttpConnector}; @@ -706,11 +700,10 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, runtime.reactor().clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let (tx1, rx1) = oneshot::channel(); @@ -736,9 +729,9 @@ mod dispatch_impl { .expect("timeout") }); let rx = rx1.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); - closes.into_future().wait().unwrap().0.expect("closes"); + rt.block_on(closes.into_future()).unwrap().0.expect("closes"); } #[test] @@ -748,8 +741,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -768,8 +760,7 @@ mod dispatch_impl { let res = { let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -785,9 +776,9 @@ mod dispatch_impl { }; // client is dropped let rx = rx1.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); - closes.into_future().wait().unwrap().0.expect("closes"); + rt.block_on(closes.into_future()).unwrap().0.expect("closes"); } @@ -797,8 +788,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, mut closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -821,8 +811,7 @@ mod dispatch_impl { }); let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -833,14 +822,14 @@ mod dispatch_impl { res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); // not closed yet, just idle { - futures::future::poll_fn(|| { + rt.block_on(futures::future::poll_fn(|| { assert!(closes.poll()?.is_not_ready()); Ok::<_, ()>(().into()) - }).wait().unwrap(); + })).unwrap(); } drop(client); @@ -851,7 +840,7 @@ mod dispatch_impl { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = t.select(close).wait(); + let _ = rt.block_on(t.select(close)); } @@ -861,8 +850,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -885,8 +873,7 @@ mod dispatch_impl { let res = { let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -895,7 +882,7 @@ mod dispatch_impl { client.request(req) }; - res.select2(rx1).wait().unwrap(); + rt.block_on(res.select2(rx1)).unwrap(); // res now dropped let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); @@ -904,7 +891,7 @@ mod dispatch_impl { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = t.select(close).wait(); + let _ = rt.block_on(t.select(close)); } #[test] @@ -913,8 +900,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -936,8 +922,7 @@ mod dispatch_impl { let res = { let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -948,7 +933,7 @@ mod dispatch_impl { }; let rx = rx1.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); @@ -957,7 +942,7 @@ mod dispatch_impl { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = t.select(close).wait(); + let _ = rt.block_on(t.select(close)); } #[test] @@ -967,8 +952,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -987,8 +971,7 @@ mod dispatch_impl { let client = Client::builder() .keep_alive(false) - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -999,7 +982,7 @@ mod dispatch_impl { res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); @@ -1008,7 +991,7 @@ mod dispatch_impl { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = t.select(close).wait(); + let _ = rt.block_on(t.select(close)); } #[test] @@ -1018,8 +1001,7 @@ mod dispatch_impl { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); let (closes_tx, closes) = mpsc::channel(10); let (tx1, rx1) = oneshot::channel(); @@ -1035,8 +1017,7 @@ mod dispatch_impl { }); let client = Client::builder() - .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1047,7 +1028,7 @@ mod dispatch_impl { res.into_body().concat2() }); let rx = rx1.expect("thread panicked"); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); let t = Delay::new(Duration::from_millis(100)) .map(|_| panic!("time out")); @@ -1056,7 +1037,7 @@ mod dispatch_impl { opt.expect("closes"); }) .map_err(|_| panic!("closes dropped")); - let _ = t.select(close).wait(); + let _ = rt.block_on(t.select(close)); } #[test] @@ -1065,13 +1046,11 @@ mod dispatch_impl { // idle connections that the Checkout would have found let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); - let connector = DebugConnector::new(&handle); + let _rt = Runtime::new().unwrap(); + let connector = DebugConnector::new(); let connects = connector.connects.clone(); let client = Client::builder() - .executor(runtime.executor()) .build(connector); assert_eq!(connects.load(Ordering::Relaxed), 0); @@ -1090,12 +1069,11 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let connector = DebugConnector::new(runtime.reactor()); + let mut rt = Runtime::new().unwrap(); + let connector = DebugConnector::new(); let connects = connector.connects.clone(); let client = Client::builder() - .executor(runtime.executor()) .build(connector); let (tx1, rx1) = oneshot::channel(); @@ -1127,7 +1105,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::SeqCst), 1); @@ -1141,12 +1119,10 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect"); drop(client); - - runtime.shutdown_on_idle().wait().expect("rt shutdown"); } #[test] @@ -1154,14 +1130,12 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); - let connector = DebugConnector::new(&handle); + let connector = DebugConnector::new(); let connects = connector.connects.clone(); let client = Client::builder() - .executor(runtime.executor()) .build(connector); let (tx1, rx1) = oneshot::channel(); @@ -1196,7 +1170,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::Relaxed), 1); @@ -1206,7 +1180,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); assert_eq!(connects.load(Ordering::Relaxed), 2); } @@ -1216,13 +1190,11 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); - let connector = DebugConnector::new(&handle) + let mut rt = Runtime::new().unwrap(); + let connector = DebugConnector::new() .proxy(); let client = Client::builder() - .executor(runtime.executor()) .build(connector); let (tx1, rx1) = oneshot::channel(); @@ -1247,7 +1219,7 @@ mod dispatch_impl { .body(Body::empty()) .unwrap(); let res = client.request(req); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); } #[test] @@ -1256,13 +1228,11 @@ mod dispatch_impl { let _ = pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let runtime = Runtime::new().unwrap(); - let handle = runtime.reactor(); + let mut rt = Runtime::new().unwrap(); - let connector = DebugConnector::new(&handle); + let connector = DebugConnector::new(); let client = Client::builder() - .executor(runtime.executor()) .build(connector); let (tx1, rx1) = oneshot::channel(); @@ -1294,21 +1264,20 @@ mod dispatch_impl { .unwrap(); let res = client.request(req); - let res = res.join(rx).map(|r| r.0).wait().unwrap(); + let res = rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); assert_eq!(res.status(), 101); - let upgraded = res + let upgraded = rt.block_on(res .into_body() - .on_upgrade() - .wait() + .on_upgrade()) .expect("on_upgrade"); let parts = upgraded.downcast::().unwrap(); assert_eq!(s(&parts.read_buf), "foobar=ready"); let io = parts.io; - let io = write_all(io, b"foo=bar").wait().unwrap().0; - let vec = read_to_end(io, vec![]).wait().unwrap().1; + let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; + let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; assert_eq!(vec, b"bar=foo"); } @@ -1321,8 +1290,8 @@ mod dispatch_impl { } impl DebugConnector { - fn new(handle: &Handle) -> DebugConnector { - let http = HttpConnector::new_with_handle(1, handle.clone()); + fn new() -> DebugConnector { + let http = HttpConnector::new(1); let (tx, _) = mpsc::channel(10); DebugConnector::with_http_and_closes(http, tx) } @@ -1404,7 +1373,7 @@ mod conn { use futures::future::poll_fn; use futures::sync::oneshot; use futures_timer::Delay; - use tokio::runtime::Runtime; + use tokio::runtime::current_thread::Runtime; use tokio::net::TcpStream; use tokio_io::{AsyncRead, AsyncWrite}; @@ -1417,7 +1386,7 @@ mod conn { fn get() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1438,11 +1407,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); + let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("/a") @@ -1456,7 +1425,7 @@ mod conn { let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.expect("timeout")); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); } #[test] @@ -1465,7 +1434,7 @@ mod conn { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1483,11 +1452,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); + let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("/") @@ -1513,7 +1482,7 @@ mod conn { let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.expect("timeout")); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); } #[test] @@ -1521,7 +1490,7 @@ mod conn { let _ = ::pretty_env_logger::try_init(); let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx, rx) = oneshot::channel(); let server = thread::spawn(move || { @@ -1538,11 +1507,11 @@ mod conn { assert_eq!(sock.read(&mut buf).expect("read 2"), 0); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); + let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let (mut sender, body) = Body::channel(); let sender = thread::spawn(move || { @@ -1557,7 +1526,7 @@ mod conn { .body(body) .unwrap(); let res = client.send_request(req); - res.wait().unwrap_err(); + rt.block_on(res).unwrap_err(); server.join().expect("server thread panicked"); sender.join().expect("sender thread panicked"); @@ -1567,7 +1536,7 @@ mod conn { fn uri_absolute_form() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1587,11 +1556,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); + let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("http://hyper.local/a") @@ -1606,14 +1575,14 @@ mod conn { let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.expect("timeout")); - res.join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res.join(rx).map(|r| r.0)).unwrap(); } #[test] fn pipeline() { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let mut runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1628,11 +1597,11 @@ mod conn { let _ = tx1.send(()); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); - let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); + let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap(); - runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); + rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); let req = Request::builder() .uri("/a") @@ -1659,7 +1628,7 @@ mod conn { let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.expect("timeout")); - res1.join(res2).join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(res1.join(res2).join(rx).map(|r| r.0)).unwrap(); } #[test] @@ -1669,7 +1638,7 @@ mod conn { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let _runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1692,14 +1661,14 @@ mod conn { sock.write_all(b"bar=foo").expect("write 2"); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); let io = DebugStream { tcp: tcp, shutdown_called: false, }; - let (mut client, mut conn) = conn::handshake(io).wait().unwrap(); + let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); { let until_upgrade = poll_fn(|| { @@ -1720,13 +1689,13 @@ mod conn { let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.expect("timeout")); - until_upgrade.join(res).join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(until_upgrade.join(res).join(rx).map(|r| r.0)).unwrap(); // should not be ready now - poll_fn(|| { + rt.block_on(poll_fn(|| { assert!(client.poll_ready().unwrap().is_not_ready()); Ok::<_, ()>(Async::Ready(())) - }).wait().unwrap(); + })).unwrap(); } let parts = conn.into_parts(); @@ -1737,8 +1706,8 @@ mod conn { assert!(!io.shutdown_called, "upgrade shouldn't shutdown AsyncWrite"); assert!(client.poll_ready().is_err()); - let io = write_all(io, b"foo=bar").wait().unwrap().0; - let vec = read_to_end(io, vec![]).wait().unwrap().1; + let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; + let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; assert_eq!(vec, b"bar=foo"); } @@ -1749,7 +1718,7 @@ mod conn { let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); - let _runtime = Runtime::new().unwrap(); + let mut rt = Runtime::new().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1771,14 +1740,14 @@ mod conn { sock.write_all(b"bar=foo").expect("write 2"); }); - let tcp = tcp_connect(&addr).wait().unwrap(); + let tcp = rt.block_on(tcp_connect(&addr)).unwrap(); let io = DebugStream { tcp: tcp, shutdown_called: false, }; - let (mut client, mut conn) = conn::handshake(io).wait().unwrap(); + let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap(); { let until_tunneled = poll_fn(|| { @@ -1803,13 +1772,13 @@ mod conn { let timeout = Delay::new(Duration::from_millis(200)); let rx = rx.and_then(move |_| timeout.expect("timeout")); - until_tunneled.join(res).join(rx).map(|r| r.0).wait().unwrap(); + rt.block_on(until_tunneled.join(res).join(rx).map(|r| r.0)).unwrap(); // should not be ready now - poll_fn(|| { + rt.block_on(poll_fn(|| { assert!(client.poll_ready().unwrap().is_not_ready()); Ok::<_, ()>(Async::Ready(())) - }).wait().unwrap(); + })).unwrap(); } let parts = conn.into_parts(); @@ -1820,8 +1789,8 @@ mod conn { assert!(!io.shutdown_called, "tunnel shouldn't shutdown AsyncWrite"); assert!(client.poll_ready().is_err()); - let io = write_all(io, b"foo=bar").wait().unwrap().0; - let vec = read_to_end(io, vec![]).wait().unwrap().1; + let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; + let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; assert_eq!(vec, b"bar=foo"); } diff --git a/tests/server.rs b/tests/server.rs index 71f66ec0..2dcb361f 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -25,7 +25,7 @@ use futures::sync::oneshot; use futures_timer::Delay; use http::header::{HeaderName, HeaderValue}; use tokio::net::{TcpListener, TcpStream as TkTcpStream}; -use tokio::runtime::Runtime; +use tokio::runtime::current_thread::Runtime; use tokio::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; @@ -35,9 +35,9 @@ use hyper::client::Client; use hyper::server::conn::Http; use hyper::service::{service_fn, service_fn_ok, Service}; -fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result { +fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result { let std_listener = StdTcpListener::bind(addr).unwrap(); - TcpListener::from_std(std_listener, handle) + TcpListener::from_std(std_listener, &Handle::default()) } #[test] @@ -45,7 +45,9 @@ fn try_h2() { let server = serve(); let addr_str = format!("http://{}", server.addr()); - hyper::rt::run(hyper::rt::lazy(move || { + let mut rt = Runtime::new().expect("runtime new"); + + rt.block_on(hyper::rt::lazy(move || { let client: Client<_, hyper::Body> = Client::builder().http2_only(true).build_http(); let uri = addr_str.parse::().expect("server addr should parse"); @@ -53,7 +55,7 @@ fn try_h2() { .and_then(|_res| { Ok(()) }) .map(|_| { () }) .map_err(|_e| { () }) - })); + })).unwrap(); assert_eq!(server.body(), b""); } @@ -96,8 +98,8 @@ fn get_with_body() { #[test] fn get_implicitly_empty() { // See https://github.com/hyperium/hyper/issues/1373 - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -124,7 +126,7 @@ fn get_implicitly_empty() { })) }); - fut.wait().unwrap(); + rt.block_on(fut).unwrap(); } mod response_body_lengths { @@ -945,8 +947,8 @@ fn http_10_request_receives_http_10_response() { #[test] fn disable_keep_alive_mid_request() { - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -983,15 +985,15 @@ fn disable_keep_alive_mid_request() { }) }); - fut.wait().unwrap(); + rt.block_on(fut).unwrap(); child.join().unwrap(); } #[test] fn disable_keep_alive_post_request() { let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx1, rx1) = oneshot::channel(); @@ -1048,21 +1050,21 @@ fn disable_keep_alive_post_request() { }); assert!(!dropped.load()); - fut.wait().unwrap(); + rt.block_on(fut).unwrap(); // we must poll the Core one more time in order for Windows to drop // the read-blocked socket. // // See https://github.com/carllerche/mio/issues/776 let timeout = Delay::new(Duration::from_millis(10)); - timeout.wait().unwrap(); + rt.block_on(timeout).unwrap(); assert!(dropped.load()); child.join().unwrap(); } #[test] fn empty_parse_eof_does_not_return_error() { - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1077,13 +1079,13 @@ fn empty_parse_eof_does_not_return_error() { Http::new().serve_connection(socket, HelloWorld) }); - fut.wait().unwrap(); + rt.block_on(fut).unwrap(); } #[test] fn nonempty_parse_eof_returns_error() { - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1099,13 +1101,13 @@ fn nonempty_parse_eof_returns_error() { Http::new().serve_connection(socket, HelloWorld) }); - fut.wait().unwrap_err(); + rt.block_on(fut).unwrap_err(); } #[test] fn returning_1xx_response_is_error() { - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1132,15 +1134,15 @@ fn returning_1xx_response_is_error() { })) }); - fut.wait().unwrap_err(); + rt.block_on(fut).unwrap_err(); } #[test] fn upgrades() { use tokio_io::io::{read_to_end, write_all}; let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -1188,17 +1190,17 @@ fn upgrades() { }) }); - let conn = fut.wait().unwrap(); + let conn = rt.block_on(fut).unwrap(); // wait so that we don't write until other side saw 101 response - rx.wait().unwrap(); + rt.block_on(rx).unwrap(); let parts = conn.into_parts(); let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = write_all(io, b"foo=bar").wait().unwrap().0; - let vec = read_to_end(io, vec![]).wait().unwrap().1; + let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; + let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; assert_eq!(vec, b"bar=foo"); } @@ -1206,8 +1208,8 @@ fn upgrades() { fn http_connect() { use tokio_io::io::{read_to_end, write_all}; let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -1252,17 +1254,17 @@ fn http_connect() { }) }); - let conn = fut.wait().unwrap(); + let conn = rt.block_on(fut).unwrap(); // wait so that we don't write until other side saw 101 response - rx.wait().unwrap(); + rt.block_on(rx).unwrap(); let parts = conn.into_parts(); let io = parts.io; assert_eq!(parts.read_buf, "eagerly optimistic"); - let io = write_all(io, b"foo=bar").wait().unwrap().0; - let vec = read_to_end(io, vec![]).wait().unwrap().1; + let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0; + let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1; assert_eq!(vec, b"bar=foo"); } @@ -1271,7 +1273,7 @@ fn upgrades_new() { use tokio_io::io::{read_to_end, write_all}; let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &rt.reactor()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (read_101_tx, read_101_rx) = oneshot::channel(); @@ -1340,7 +1342,7 @@ fn http_connect_new() { use tokio_io::io::{read_to_end, write_all}; let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &rt.reactor()).unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (read_200_tx, read_200_rx) = oneshot::channel(); @@ -1404,8 +1406,8 @@ fn http_connect_new() { #[test] fn parse_errors_send_4xx_response() { - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1427,13 +1429,13 @@ fn parse_errors_send_4xx_response() { .serve_connection(socket, HelloWorld) }); - fut.wait().unwrap_err(); + rt.block_on(fut).unwrap_err(); } #[test] fn illegal_request_length_returns_400_response() { - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); thread::spawn(move || { @@ -1455,7 +1457,7 @@ fn illegal_request_length_returns_400_response() { .serve_connection(socket, HelloWorld) }); - fut.wait().unwrap_err(); + rt.block_on(fut).unwrap_err(); } #[test] @@ -1473,8 +1475,8 @@ fn max_buf_size_no_panic() { #[test] fn max_buf_size() { let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); const MAX: usize = 16_000; @@ -1500,14 +1502,14 @@ fn max_buf_size() { .serve_connection(socket, HelloWorld) }); - fut.wait().unwrap_err(); + rt.block_on(fut).unwrap_err(); } #[test] fn streaming_body() { let _ = pretty_env_logger::try_init(); - let runtime = Runtime::new().unwrap(); - let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); + let mut rt = Runtime::new().unwrap(); + let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); let addr = listener.local_addr().unwrap(); let (tx, rx) = oneshot::channel(); @@ -1549,7 +1551,7 @@ fn streaming_body() { })) }); - fut.join(rx).wait().unwrap(); + rt.block_on(fut.join(rx)).unwrap(); } // ------------------------------------------------- diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 683299e6..be60a959 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -6,7 +6,7 @@ pub use std::net::SocketAddr; pub use self::futures::{future, Future, Stream}; pub use self::futures::sync::oneshot; pub use self::hyper::{HeaderMap, StatusCode}; -pub use self::tokio::runtime::Runtime; +pub use self::tokio::runtime::current_thread::Runtime; macro_rules! t { ( @@ -194,8 +194,7 @@ pub fn __run_test(cfg: __TestConfig) { use std::sync::{Arc, Mutex}; use std::time::Duration; let _ = pretty_env_logger::try_init(); - let rt = Runtime::new().expect("new rt"); - let handle = rt.reactor().clone(); + let mut rt = Runtime::new().expect("new rt"); assert_eq!(cfg.client_version, cfg.server_version); @@ -205,11 +204,10 @@ pub fn __run_test(cfg: __TestConfig) { Version::HTTP_11 }; - let connector = HttpConnector::new_with_handle(1, handle.clone()); + let connector = HttpConnector::new(1); let client = Client::builder() .keep_alive_timeout(Duration::from_secs(10)) .http2_only(cfg.client_version == 2) - .executor(rt.executor()) .build::<_, Body>(connector); let serve_handles = Arc::new(Mutex::new( @@ -250,16 +248,13 @@ pub fn __run_test(cfg: __TestConfig) { let serve = hyper::server::conn::Http::new() .http2_only(cfg.server_version == 2) - .executor(rt.executor()) - .serve_addr_handle( + .serve_addr( &SocketAddr::from(([127, 0, 0, 1], 0)), - &handle, new_service, ) - .expect("serve_addr_handle"); + .expect("serve_addr"); let addr = serve.incoming_ref().local_addr(); - let exe = rt.executor(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (success_tx, success_rx) = oneshot::channel(); let expected_connections = cfg.connections; @@ -269,7 +264,7 @@ pub fn __run_test(cfg: __TestConfig) { .map_err(|never| -> hyper::Error { match never {} }) .flatten() .map_err(|e| panic!("server connection error: {}", e)); - exe.spawn(fut); + ::tokio::spawn(fut); Ok::<_, hyper::Error>(cnt + 1) }) .map(move |cnt| { @@ -282,7 +277,7 @@ pub fn __run_test(cfg: __TestConfig) { }) .map_err(|_| panic!("shutdown not ok")); - rt.executor().spawn(server); + rt.spawn(server); let make_request = Arc::new(move |client: &Client, creq: __CReq, cres: __CRes| { @@ -343,11 +338,9 @@ pub fn __run_test(cfg: __TestConfig) { let client_futures = client_futures.map(move |_| { let _ = shutdown_tx.send(()); }); - rt.executor().spawn(client_futures); - rt.shutdown_on_idle().wait().expect("rt"); - success_rx - .map_err(|_| "something panicked") - .wait() + rt.spawn(client_futures); + rt.block_on(success_rx + .map_err(|_| "something panicked")) .expect("shutdown succeeded"); }