chore(tests): change tests to use current_thread runtime

This commit is contained in:
Sean McArthur
2018-06-18 12:30:46 -07:00
parent 9a28268b98
commit e4ebf44823
6 changed files with 200 additions and 237 deletions

View File

@@ -9,7 +9,7 @@ extern crate tokio;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::{Future, Stream}; use futures::{Future, Stream};
use tokio::runtime::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use hyper::{Body, Method, Request, Response}; use hyper::{Body, Method, Request, Response};
@@ -22,22 +22,21 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt); let addr = spawn_hello(&mut rt);
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); let connector = HttpConnector::new(1);
let client = hyper::Client::builder() let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector); .build::<_, Body>(connector);
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap(); let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
b.bytes = 160 * 2 + PHRASE.len() as u64; b.bytes = 160 * 2 + PHRASE.len() as u64;
b.iter(move || { b.iter(move || {
client.get(url.clone()) rt.block_on(client.get(url.clone())
.and_then(|res| { .and_then(|res| {
res.into_body().for_each(|_chunk| { res.into_body().for_each(|_chunk| {
Ok(()) Ok(())
}) })
}) }))
.wait().expect("client wait"); .expect("client wait");
}); });
} }
@@ -46,9 +45,8 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt); let addr = spawn_hello(&mut rt);
let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); let connector = HttpConnector::new(1);
let client = hyper::Client::builder() let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector); .build::<_, Body>(connector);
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap(); let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
@@ -59,12 +57,11 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut req = Request::new(post.into()); let mut req = Request::new(post.into());
*req.method_mut() = Method::POST; *req.method_mut() = Method::POST;
*req.uri_mut() = url.clone(); *req.uri_mut() = url.clone();
client.request(req).and_then(|res| { rt.block_on(client.request(req).and_then(|res| {
res.into_body().for_each(|_chunk| { res.into_body().for_each(|_chunk| {
Ok(()) Ok(())
}) })
}).wait().expect("client wait"); })).expect("client wait");
}); });
} }

View File

@@ -805,20 +805,29 @@ mod tests {
#[cfg(feature = "runtime")] #[cfg(feature = "runtime")]
#[test] #[test]
fn test_pool_timer_removes_expired() { fn test_pool_timer_removes_expired() {
use std::sync::Arc; use std::time::Instant;
let runtime = ::tokio::runtime::Runtime::new().unwrap(); use tokio_timer::Delay;
let executor = runtime.executor(); let mut rt = ::tokio::runtime::current_thread::Runtime::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Executor(Arc::new(executor))); let pool = Pool::new(true, Some(Duration::from_millis(100)), &Exec::Default);
let key = (Arc::new("foo".to_string()), Ver::Http1); let key = (Arc::new("foo".to_string()), Ver::Http1);
pool.pooled(c(key.clone()), Uniq(41)); // Since pool.pooled() will be calling spawn on drop, need to be sure
pool.pooled(c(key.clone()), Uniq(5)); // those drops are called while `rt` is the current executor. To do so,
pool.pooled(c(key.clone()), Uniq(99)); // call those inside a future.
rt.block_on(::futures::future::lazy(|| {
pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
Ok::<_, ()>(())
})).unwrap();
assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3)); assert_eq!(pool.inner.connections.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(Duration::from_millis(400)); // allow for too-good resolution // Let the timer tick passed the expiration...
rt
.block_on(Delay::new(Instant::now() + Duration::from_millis(200)))
.expect("rt block_on 200ms");
assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none()); assert!(pool.inner.connections.lock().unwrap().idle.get(&key).is_none());
} }

View File

@@ -1,12 +1,9 @@
#![cfg(feature = "runtime")] #![cfg(feature = "runtime")]
extern crate pretty_env_logger; extern crate pretty_env_logger;
use std::thread;
use std::time::Duration;
use futures::Async; use futures::Async;
use futures::future::poll_fn; use futures::future::poll_fn;
use tokio::executor::thread_pool::{Builder as ThreadPoolBuilder}; use tokio::runtime::current_thread::Runtime;
use mock::MockConnector; use mock::MockConnector;
use super::*; use super::*;
@@ -15,14 +12,13 @@ use super::*;
fn retryable_request() { fn retryable_request() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let executor = ThreadPoolBuilder::new().pool_size(1).build(); let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new(); let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local"); let sock1 = connector.mock("http://mock.local");
let sock2 = connector.mock("http://mock.local"); let sock2 = connector.mock("http://mock.local");
let client = Client::builder() let client = Client::builder()
.executor(executor.sender().clone())
.build::<_, ::Body>(connector); .build::<_, ::Body>(connector);
client.pool.no_timer(); client.pool.no_timer();
@@ -39,7 +35,7 @@ fn retryable_request() {
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(())) Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e));
res1.join(srv1).wait().expect("res1"); rt.block_on(res1.join(srv1)).expect("res1");
} }
drop(sock1); drop(sock1);
@@ -57,20 +53,19 @@ fn retryable_request() {
Ok(Async::Ready(())) Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e));
res2.join(srv2).wait().expect("res2"); rt.block_on(res2.join(srv2)).expect("res2");
} }
#[test] #[test]
fn conn_reset_after_write() { fn conn_reset_after_write() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let executor = ThreadPoolBuilder::new().pool_size(1).build(); let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new(); let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local"); let sock1 = connector.mock("http://mock.local");
let client = Client::builder() let client = Client::builder()
.executor(executor.sender().clone())
.build::<_, ::Body>(connector); .build::<_, ::Body>(connector);
client.pool.no_timer(); client.pool.no_timer();
@@ -88,12 +83,9 @@ fn conn_reset_after_write() {
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(())) Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e));
res1.join(srv1).wait().expect("res1"); rt.block_on(res1.join(srv1)).expect("res1");
} }
// sleep to allow some time for the connection to return to the pool
thread::sleep(Duration::from_millis(10));
let req = Request::builder() let req = Request::builder()
.uri("http://mock.local/a") .uri("http://mock.local/a")
.body(Default::default()) .body(Default::default())
@@ -111,9 +103,10 @@ fn conn_reset_after_write() {
sock1.take(); sock1.take();
Ok(Async::Ready(())) Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e));
let err = res2.join(srv2).wait().expect_err("res2"); let err = rt.block_on(res2.join(srv2)).expect_err("res2");
match err.kind() { match err.kind() {
&::error::Kind::Incomplete => (), &::error::Kind::Incomplete => (),
other => panic!("expected Incomplete, found {:?}", other) other => panic!("expected Incomplete, found {:?}", other)
} }
} }

View File

@@ -18,7 +18,7 @@ use hyper::{Body, Client, Method, Request, StatusCode};
use futures::{Future, Stream}; use futures::{Future, Stream};
use futures::sync::oneshot; use futures::sync::oneshot;
use tokio::reactor::Handle; use tokio::reactor::Handle;
use tokio::runtime::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio::net::{ConnectFuture, TcpStream}; use tokio::net::{ConnectFuture, TcpStream};
fn s(buf: &[u8]) -> &str { fn s(buf: &[u8]) -> &str {
@@ -126,12 +126,12 @@ macro_rules! test {
#[test] #[test]
fn $name() { fn $name() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().expect("runtime new"); let mut rt = Runtime::new().expect("runtime new");
let res = test! { let res = test! {
INNER; INNER;
name: $name, name: $name,
runtime: &runtime, runtime: &mut rt,
server: server:
expected: $server_expected, expected: $server_expected,
reply: $server_reply, reply: $server_reply,
@@ -151,17 +151,14 @@ macro_rules! test {
assert_eq!(res.headers()[$response_header_name], $response_header_val); assert_eq!(res.headers()[$response_header_name], $response_header_val);
)* )*
let body = res let body = rt.block_on(res
.into_body() .into_body()
.concat2() .concat2())
.wait()
.expect("body concat wait"); .expect("body concat wait");
let expected_res_body = Option::<&[u8]>::from($response_body) let expected_res_body = Option::<&[u8]>::from($response_body)
.unwrap_or_default(); .unwrap_or_default();
assert_eq!(body.as_ref(), expected_res_body); assert_eq!(body.as_ref(), expected_res_body);
runtime.shutdown_on_idle().wait().expect("rt shutdown");
} }
); );
( (
@@ -181,12 +178,12 @@ macro_rules! test {
#[test] #[test]
fn $name() { fn $name() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().expect("runtime new"); let mut rt = Runtime::new().expect("runtime new");
let err: ::hyper::Error = test! { let err: ::hyper::Error = test! {
INNER; INNER;
name: $name, name: $name,
runtime: &runtime, runtime: &mut rt,
server: server:
expected: $server_expected, expected: $server_expected,
reply: $server_reply, reply: $server_reply,
@@ -206,8 +203,6 @@ macro_rules! test {
if !closure(&err) { if !closure(&err) {
panic!("expected error, unexpected variant: {:?}", err); panic!("expected error, unexpected variant: {:?}", err);
} }
runtime.shutdown_on_idle().wait().expect("rt shutdown");
} }
); );
@@ -229,13 +224,12 @@ macro_rules! test {
) => ({ ) => ({
let server = TcpListener::bind("127.0.0.1:0").expect("bind"); let server = TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = server.local_addr().expect("local_addr"); let addr = server.local_addr().expect("local_addr");
let runtime = $runtime; let mut rt = $runtime;
let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone()); let connector = ::hyper::client::HttpConnector::new_with_handle(1, Handle::default());
let client = Client::builder() let client = Client::builder()
.set_host($set_host) .set_host($set_host)
.http1_title_case_headers($title_case_headers) .http1_title_case_headers($title_case_headers)
.executor(runtime.executor())
.build(connector); .build(connector);
let body = if let Some(body) = $request_body { let body = if let Some(body) = $request_body {
@@ -280,7 +274,7 @@ macro_rules! test {
let rx = rx.expect("thread panicked"); let rx = rx.expect("thread panicked");
res.join(rx).map(|r| r.0).wait() rt.block_on(res.join(rx).map(|r| r.0))
}); });
} }
@@ -690,7 +684,7 @@ mod dispatch_impl {
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use futures_timer::Delay; use futures_timer::Delay;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::runtime::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use hyper::client::connect::{Connect, Connected, Destination, HttpConnector}; use hyper::client::connect::{Connect, Connected, Destination, HttpConnector};
@@ -706,11 +700,10 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (closes_tx, closes) = mpsc::channel(10); let (closes_tx, closes) = mpsc::channel(10);
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, runtime.reactor().clone()), closes_tx));
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -736,9 +729,9 @@ mod dispatch_impl {
.expect("timeout") .expect("timeout")
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
closes.into_future().wait().unwrap().0.expect("closes"); rt.block_on(closes.into_future()).unwrap().0.expect("closes");
} }
#[test] #[test]
@@ -748,8 +741,7 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let (closes_tx, closes) = mpsc::channel(10); let (closes_tx, closes) = mpsc::channel(10);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -768,8 +760,7 @@ mod dispatch_impl {
let res = { let res = {
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
let req = Request::builder() let req = Request::builder()
.uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
@@ -785,9 +776,9 @@ mod dispatch_impl {
}; };
// client is dropped // client is dropped
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
closes.into_future().wait().unwrap().0.expect("closes"); rt.block_on(closes.into_future()).unwrap().0.expect("closes");
} }
@@ -797,8 +788,7 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let (closes_tx, mut closes) = mpsc::channel(10); let (closes_tx, mut closes) = mpsc::channel(10);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -821,8 +811,7 @@ mod dispatch_impl {
}); });
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
let req = Request::builder() let req = Request::builder()
.uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
@@ -833,14 +822,14 @@ mod dispatch_impl {
res.into_body().concat2() res.into_body().concat2()
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
// not closed yet, just idle // not closed yet, just idle
{ {
futures::future::poll_fn(|| { rt.block_on(futures::future::poll_fn(|| {
assert!(closes.poll()?.is_not_ready()); assert!(closes.poll()?.is_not_ready());
Ok::<_, ()>(().into()) Ok::<_, ()>(().into())
}).wait().unwrap(); })).unwrap();
} }
drop(client); drop(client);
@@ -851,7 +840,7 @@ mod dispatch_impl {
opt.expect("closes"); opt.expect("closes");
}) })
.map_err(|_| panic!("closes dropped")); .map_err(|_| panic!("closes dropped"));
let _ = t.select(close).wait(); let _ = rt.block_on(t.select(close));
} }
@@ -861,8 +850,7 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let (closes_tx, closes) = mpsc::channel(10); let (closes_tx, closes) = mpsc::channel(10);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -885,8 +873,7 @@ mod dispatch_impl {
let res = { let res = {
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
let req = Request::builder() let req = Request::builder()
.uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
@@ -895,7 +882,7 @@ mod dispatch_impl {
client.request(req) client.request(req)
}; };
res.select2(rx1).wait().unwrap(); rt.block_on(res.select2(rx1)).unwrap();
// res now dropped // res now dropped
let t = Delay::new(Duration::from_millis(100)) let t = Delay::new(Duration::from_millis(100))
.map(|_| panic!("time out")); .map(|_| panic!("time out"));
@@ -904,7 +891,7 @@ mod dispatch_impl {
opt.expect("closes"); opt.expect("closes");
}) })
.map_err(|_| panic!("closes dropped")); .map_err(|_| panic!("closes dropped"));
let _ = t.select(close).wait(); let _ = rt.block_on(t.select(close));
} }
#[test] #[test]
@@ -913,8 +900,7 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let (closes_tx, closes) = mpsc::channel(10); let (closes_tx, closes) = mpsc::channel(10);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -936,8 +922,7 @@ mod dispatch_impl {
let res = { let res = {
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
let req = Request::builder() let req = Request::builder()
.uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
@@ -948,7 +933,7 @@ mod dispatch_impl {
}; };
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
let t = Delay::new(Duration::from_millis(100)) let t = Delay::new(Duration::from_millis(100))
.map(|_| panic!("time out")); .map(|_| panic!("time out"));
@@ -957,7 +942,7 @@ mod dispatch_impl {
opt.expect("closes"); opt.expect("closes");
}) })
.map_err(|_| panic!("closes dropped")); .map_err(|_| panic!("closes dropped"));
let _ = t.select(close).wait(); let _ = rt.block_on(t.select(close));
} }
#[test] #[test]
@@ -967,8 +952,7 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let (closes_tx, closes) = mpsc::channel(10); let (closes_tx, closes) = mpsc::channel(10);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -987,8 +971,7 @@ mod dispatch_impl {
let client = Client::builder() let client = Client::builder()
.keep_alive(false) .keep_alive(false)
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
let req = Request::builder() let req = Request::builder()
.uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
@@ -999,7 +982,7 @@ mod dispatch_impl {
res.into_body().concat2() res.into_body().concat2()
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
let t = Delay::new(Duration::from_millis(100)) let t = Delay::new(Duration::from_millis(100))
.map(|_| panic!("time out")); .map(|_| panic!("time out"));
@@ -1008,7 +991,7 @@ mod dispatch_impl {
opt.expect("closes"); opt.expect("closes");
}) })
.map_err(|_| panic!("closes dropped")); .map_err(|_| panic!("closes dropped"));
let _ = t.select(close).wait(); let _ = rt.block_on(t.select(close));
} }
#[test] #[test]
@@ -1018,8 +1001,7 @@ mod dispatch_impl {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let (closes_tx, closes) = mpsc::channel(10); let (closes_tx, closes) = mpsc::channel(10);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1035,8 +1017,7 @@ mod dispatch_impl {
}); });
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor()) .build(DebugConnector::with_http_and_closes(HttpConnector::new(1), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
let req = Request::builder() let req = Request::builder()
.uri(&*format!("http://{}/a", addr)) .uri(&*format!("http://{}/a", addr))
@@ -1047,7 +1028,7 @@ mod dispatch_impl {
res.into_body().concat2() res.into_body().concat2()
}); });
let rx = rx1.expect("thread panicked"); let rx = rx1.expect("thread panicked");
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
let t = Delay::new(Duration::from_millis(100)) let t = Delay::new(Duration::from_millis(100))
.map(|_| panic!("time out")); .map(|_| panic!("time out"));
@@ -1056,7 +1037,7 @@ mod dispatch_impl {
opt.expect("closes"); opt.expect("closes");
}) })
.map_err(|_| panic!("closes dropped")); .map_err(|_| panic!("closes dropped"));
let _ = t.select(close).wait(); let _ = rt.block_on(t.select(close));
} }
#[test] #[test]
@@ -1065,13 +1046,11 @@ mod dispatch_impl {
// idle connections that the Checkout would have found // idle connections that the Checkout would have found
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().unwrap(); let _rt = Runtime::new().unwrap();
let handle = runtime.reactor(); let connector = DebugConnector::new();
let connector = DebugConnector::new(&handle);
let connects = connector.connects.clone(); let connects = connector.connects.clone();
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor())
.build(connector); .build(connector);
assert_eq!(connects.load(Ordering::Relaxed), 0); assert_eq!(connects.load(Ordering::Relaxed), 0);
@@ -1090,12 +1069,11 @@ mod dispatch_impl {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let connector = DebugConnector::new(runtime.reactor()); let connector = DebugConnector::new();
let connects = connector.connects.clone(); let connects = connector.connects.clone();
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor())
.build(connector); .build(connector);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1127,7 +1105,7 @@ mod dispatch_impl {
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
let res = client.request(req); let res = client.request(req);
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
assert_eq!(connects.load(Ordering::SeqCst), 1); assert_eq!(connects.load(Ordering::SeqCst), 1);
@@ -1141,12 +1119,10 @@ mod dispatch_impl {
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
let res = client.request(req); let res = client.request(req);
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect"); assert_eq!(connects.load(Ordering::SeqCst), 1, "second request should still only have 1 connect");
drop(client); drop(client);
runtime.shutdown_on_idle().wait().expect("rt shutdown");
} }
#[test] #[test]
@@ -1154,14 +1130,12 @@ mod dispatch_impl {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let connector = DebugConnector::new(&handle); let connector = DebugConnector::new();
let connects = connector.connects.clone(); let connects = connector.connects.clone();
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor())
.build(connector); .build(connector);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1196,7 +1170,7 @@ mod dispatch_impl {
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
let res = client.request(req); let res = client.request(req);
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
assert_eq!(connects.load(Ordering::Relaxed), 1); assert_eq!(connects.load(Ordering::Relaxed), 1);
@@ -1206,7 +1180,7 @@ mod dispatch_impl {
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
let res = client.request(req); let res = client.request(req);
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
assert_eq!(connects.load(Ordering::Relaxed), 2); assert_eq!(connects.load(Ordering::Relaxed), 2);
} }
@@ -1216,13 +1190,11 @@ mod dispatch_impl {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor(); let connector = DebugConnector::new()
let connector = DebugConnector::new(&handle)
.proxy(); .proxy();
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor())
.build(connector); .build(connector);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1247,7 +1219,7 @@ mod dispatch_impl {
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
let res = client.request(req); let res = client.request(req);
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
} }
#[test] #[test]
@@ -1256,13 +1228,11 @@ mod dispatch_impl {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let handle = runtime.reactor();
let connector = DebugConnector::new(&handle); let connector = DebugConnector::new();
let client = Client::builder() let client = Client::builder()
.executor(runtime.executor())
.build(connector); .build(connector);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1294,21 +1264,20 @@ mod dispatch_impl {
.unwrap(); .unwrap();
let res = client.request(req); let res = client.request(req);
let res = res.join(rx).map(|r| r.0).wait().unwrap(); let res = rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
assert_eq!(res.status(), 101); assert_eq!(res.status(), 101);
let upgraded = res let upgraded = rt.block_on(res
.into_body() .into_body()
.on_upgrade() .on_upgrade())
.wait()
.expect("on_upgrade"); .expect("on_upgrade");
let parts = upgraded.downcast::<DebugStream>().unwrap(); let parts = upgraded.downcast::<DebugStream>().unwrap();
assert_eq!(s(&parts.read_buf), "foobar=ready"); assert_eq!(s(&parts.read_buf), "foobar=ready");
let io = parts.io; let io = parts.io;
let io = write_all(io, b"foo=bar").wait().unwrap().0; let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
let vec = read_to_end(io, vec![]).wait().unwrap().1; let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }
@@ -1321,8 +1290,8 @@ mod dispatch_impl {
} }
impl DebugConnector { impl DebugConnector {
fn new(handle: &Handle) -> DebugConnector { fn new() -> DebugConnector {
let http = HttpConnector::new_with_handle(1, handle.clone()); let http = HttpConnector::new(1);
let (tx, _) = mpsc::channel(10); let (tx, _) = mpsc::channel(10);
DebugConnector::with_http_and_closes(http, tx) DebugConnector::with_http_and_closes(http, tx)
} }
@@ -1404,7 +1373,7 @@ mod conn {
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures_timer::Delay; use futures_timer::Delay;
use tokio::runtime::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@@ -1417,7 +1386,7 @@ mod conn {
fn get() { fn get() {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let mut runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1438,11 +1407,11 @@ mod conn {
let _ = tx1.send(()); let _ = tx1.send(());
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap();
runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));
let req = Request::builder() let req = Request::builder()
.uri("/a") .uri("/a")
@@ -1456,7 +1425,7 @@ mod conn {
let timeout = Delay::new(Duration::from_millis(200)); let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout")); let rx = rx.and_then(move |_| timeout.expect("timeout"));
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
} }
#[test] #[test]
@@ -1465,7 +1434,7 @@ mod conn {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let mut runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1483,11 +1452,11 @@ mod conn {
let _ = tx1.send(()); let _ = tx1.send(());
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap();
runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));
let req = Request::builder() let req = Request::builder()
.uri("/") .uri("/")
@@ -1513,7 +1482,7 @@ mod conn {
let timeout = Delay::new(Duration::from_millis(200)); let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout")); let rx = rx.and_then(move |_| timeout.expect("timeout"));
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
} }
#[test] #[test]
@@ -1521,7 +1490,7 @@ mod conn {
let _ = ::pretty_env_logger::try_init(); let _ = ::pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let mut runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let server = thread::spawn(move || { let server = thread::spawn(move || {
@@ -1538,11 +1507,11 @@ mod conn {
assert_eq!(sock.read(&mut buf).expect("read 2"), 0); assert_eq!(sock.read(&mut buf).expect("read 2"), 0);
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap();
runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));
let (mut sender, body) = Body::channel(); let (mut sender, body) = Body::channel();
let sender = thread::spawn(move || { let sender = thread::spawn(move || {
@@ -1557,7 +1526,7 @@ mod conn {
.body(body) .body(body)
.unwrap(); .unwrap();
let res = client.send_request(req); let res = client.send_request(req);
res.wait().unwrap_err(); rt.block_on(res).unwrap_err();
server.join().expect("server thread panicked"); server.join().expect("server thread panicked");
sender.join().expect("sender thread panicked"); sender.join().expect("sender thread panicked");
@@ -1567,7 +1536,7 @@ mod conn {
fn uri_absolute_form() { fn uri_absolute_form() {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let mut runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1587,11 +1556,11 @@ mod conn {
let _ = tx1.send(()); let _ = tx1.send(());
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap();
runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));
let req = Request::builder() let req = Request::builder()
.uri("http://hyper.local/a") .uri("http://hyper.local/a")
@@ -1606,14 +1575,14 @@ mod conn {
let timeout = Delay::new(Duration::from_millis(200)); let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout")); let rx = rx.and_then(move |_| timeout.expect("timeout"));
res.join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res.join(rx).map(|r| r.0)).unwrap();
} }
#[test] #[test]
fn pipeline() { fn pipeline() {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let mut runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1628,11 +1597,11 @@ mod conn {
let _ = tx1.send(()); let _ = tx1.send(());
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let (mut client, conn) = conn::handshake(tcp).wait().unwrap(); let (mut client, conn) = rt.block_on(conn::handshake(tcp)).unwrap();
runtime.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e))); rt.spawn(conn.map(|_| ()).map_err(|e| panic!("conn error: {}", e)));
let req = Request::builder() let req = Request::builder()
.uri("/a") .uri("/a")
@@ -1659,7 +1628,7 @@ mod conn {
let timeout = Delay::new(Duration::from_millis(200)); let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout")); let rx = rx.and_then(move |_| timeout.expect("timeout"));
res1.join(res2).join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(res1.join(res2).join(rx).map(|r| r.0)).unwrap();
} }
#[test] #[test]
@@ -1669,7 +1638,7 @@ mod conn {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let _runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1692,14 +1661,14 @@ mod conn {
sock.write_all(b"bar=foo").expect("write 2"); sock.write_all(b"bar=foo").expect("write 2");
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let io = DebugStream { let io = DebugStream {
tcp: tcp, tcp: tcp,
shutdown_called: false, shutdown_called: false,
}; };
let (mut client, mut conn) = conn::handshake(io).wait().unwrap(); let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap();
{ {
let until_upgrade = poll_fn(|| { let until_upgrade = poll_fn(|| {
@@ -1720,13 +1689,13 @@ mod conn {
let timeout = Delay::new(Duration::from_millis(200)); let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout")); let rx = rx.and_then(move |_| timeout.expect("timeout"));
until_upgrade.join(res).join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(until_upgrade.join(res).join(rx).map(|r| r.0)).unwrap();
// should not be ready now // should not be ready now
poll_fn(|| { rt.block_on(poll_fn(|| {
assert!(client.poll_ready().unwrap().is_not_ready()); assert!(client.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(Async::Ready(())) Ok::<_, ()>(Async::Ready(()))
}).wait().unwrap(); })).unwrap();
} }
let parts = conn.into_parts(); let parts = conn.into_parts();
@@ -1737,8 +1706,8 @@ mod conn {
assert!(!io.shutdown_called, "upgrade shouldn't shutdown AsyncWrite"); assert!(!io.shutdown_called, "upgrade shouldn't shutdown AsyncWrite");
assert!(client.poll_ready().is_err()); assert!(client.poll_ready().is_err());
let io = write_all(io, b"foo=bar").wait().unwrap().0; let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
let vec = read_to_end(io, vec![]).wait().unwrap().1; let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }
@@ -1749,7 +1718,7 @@ mod conn {
let server = TcpListener::bind("127.0.0.1:0").unwrap(); let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap(); let addr = server.local_addr().unwrap();
let _runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1771,14 +1740,14 @@ mod conn {
sock.write_all(b"bar=foo").expect("write 2"); sock.write_all(b"bar=foo").expect("write 2");
}); });
let tcp = tcp_connect(&addr).wait().unwrap(); let tcp = rt.block_on(tcp_connect(&addr)).unwrap();
let io = DebugStream { let io = DebugStream {
tcp: tcp, tcp: tcp,
shutdown_called: false, shutdown_called: false,
}; };
let (mut client, mut conn) = conn::handshake(io).wait().unwrap(); let (mut client, mut conn) = rt.block_on(conn::handshake(io)).unwrap();
{ {
let until_tunneled = poll_fn(|| { let until_tunneled = poll_fn(|| {
@@ -1803,13 +1772,13 @@ mod conn {
let timeout = Delay::new(Duration::from_millis(200)); let timeout = Delay::new(Duration::from_millis(200));
let rx = rx.and_then(move |_| timeout.expect("timeout")); let rx = rx.and_then(move |_| timeout.expect("timeout"));
until_tunneled.join(res).join(rx).map(|r| r.0).wait().unwrap(); rt.block_on(until_tunneled.join(res).join(rx).map(|r| r.0)).unwrap();
// should not be ready now // should not be ready now
poll_fn(|| { rt.block_on(poll_fn(|| {
assert!(client.poll_ready().unwrap().is_not_ready()); assert!(client.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(Async::Ready(())) Ok::<_, ()>(Async::Ready(()))
}).wait().unwrap(); })).unwrap();
} }
let parts = conn.into_parts(); let parts = conn.into_parts();
@@ -1820,8 +1789,8 @@ mod conn {
assert!(!io.shutdown_called, "tunnel shouldn't shutdown AsyncWrite"); assert!(!io.shutdown_called, "tunnel shouldn't shutdown AsyncWrite");
assert!(client.poll_ready().is_err()); assert!(client.poll_ready().is_err());
let io = write_all(io, b"foo=bar").wait().unwrap().0; let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
let vec = read_to_end(io, vec![]).wait().unwrap().1; let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }

View File

@@ -25,7 +25,7 @@ use futures::sync::oneshot;
use futures_timer::Delay; use futures_timer::Delay;
use http::header::{HeaderName, HeaderValue}; use http::header::{HeaderName, HeaderValue};
use tokio::net::{TcpListener, TcpStream as TkTcpStream}; use tokio::net::{TcpListener, TcpStream as TkTcpStream};
use tokio::runtime::Runtime; use tokio::runtime::current_thread::Runtime;
use tokio::reactor::Handle; use tokio::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@@ -35,9 +35,9 @@ use hyper::client::Client;
use hyper::server::conn::Http; use hyper::server::conn::Http;
use hyper::service::{service_fn, service_fn_ok, Service}; use hyper::service::{service_fn, service_fn_ok, Service};
fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result<TcpListener> { fn tcp_bind(addr: &SocketAddr) -> ::tokio::io::Result<TcpListener> {
let std_listener = StdTcpListener::bind(addr).unwrap(); let std_listener = StdTcpListener::bind(addr).unwrap();
TcpListener::from_std(std_listener, handle) TcpListener::from_std(std_listener, &Handle::default())
} }
#[test] #[test]
@@ -45,7 +45,9 @@ fn try_h2() {
let server = serve(); let server = serve();
let addr_str = format!("http://{}", server.addr()); let addr_str = format!("http://{}", server.addr());
hyper::rt::run(hyper::rt::lazy(move || { let mut rt = Runtime::new().expect("runtime new");
rt.block_on(hyper::rt::lazy(move || {
let client: Client<_, hyper::Body> = Client::builder().http2_only(true).build_http(); let client: Client<_, hyper::Body> = Client::builder().http2_only(true).build_http();
let uri = addr_str.parse::<hyper::Uri>().expect("server addr should parse"); let uri = addr_str.parse::<hyper::Uri>().expect("server addr should parse");
@@ -53,7 +55,7 @@ fn try_h2() {
.and_then(|_res| { Ok(()) }) .and_then(|_res| { Ok(()) })
.map(|_| { () }) .map(|_| { () })
.map_err(|_e| { () }) .map_err(|_e| { () })
})); })).unwrap();
assert_eq!(server.body(), b""); assert_eq!(server.body(), b"");
} }
@@ -96,8 +98,8 @@ fn get_with_body() {
#[test] #[test]
fn get_implicitly_empty() { fn get_implicitly_empty() {
// See https://github.com/hyperium/hyper/issues/1373 // See https://github.com/hyperium/hyper/issues/1373
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
@@ -124,7 +126,7 @@ fn get_implicitly_empty() {
})) }))
}); });
fut.wait().unwrap(); rt.block_on(fut).unwrap();
} }
mod response_body_lengths { mod response_body_lengths {
@@ -945,8 +947,8 @@ fn http_10_request_receives_http_10_response() {
#[test] #[test]
fn disable_keep_alive_mid_request() { fn disable_keep_alive_mid_request() {
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -983,15 +985,15 @@ fn disable_keep_alive_mid_request() {
}) })
}); });
fut.wait().unwrap(); rt.block_on(fut).unwrap();
child.join().unwrap(); child.join().unwrap();
} }
#[test] #[test]
fn disable_keep_alive_post_request() { fn disable_keep_alive_post_request() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -1048,21 +1050,21 @@ fn disable_keep_alive_post_request() {
}); });
assert!(!dropped.load()); assert!(!dropped.load());
fut.wait().unwrap(); rt.block_on(fut).unwrap();
// we must poll the Core one more time in order for Windows to drop // we must poll the Core one more time in order for Windows to drop
// the read-blocked socket. // the read-blocked socket.
// //
// See https://github.com/carllerche/mio/issues/776 // See https://github.com/carllerche/mio/issues/776
let timeout = Delay::new(Duration::from_millis(10)); let timeout = Delay::new(Duration::from_millis(10));
timeout.wait().unwrap(); rt.block_on(timeout).unwrap();
assert!(dropped.load()); assert!(dropped.load());
child.join().unwrap(); child.join().unwrap();
} }
#[test] #[test]
fn empty_parse_eof_does_not_return_error() { fn empty_parse_eof_does_not_return_error() {
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
@@ -1077,13 +1079,13 @@ fn empty_parse_eof_does_not_return_error() {
Http::new().serve_connection(socket, HelloWorld) Http::new().serve_connection(socket, HelloWorld)
}); });
fut.wait().unwrap(); rt.block_on(fut).unwrap();
} }
#[test] #[test]
fn nonempty_parse_eof_returns_error() { fn nonempty_parse_eof_returns_error() {
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
@@ -1099,13 +1101,13 @@ fn nonempty_parse_eof_returns_error() {
Http::new().serve_connection(socket, HelloWorld) Http::new().serve_connection(socket, HelloWorld)
}); });
fut.wait().unwrap_err(); rt.block_on(fut).unwrap_err();
} }
#[test] #[test]
fn returning_1xx_response_is_error() { fn returning_1xx_response_is_error() {
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
@@ -1132,15 +1134,15 @@ fn returning_1xx_response_is_error() {
})) }))
}); });
fut.wait().unwrap_err(); rt.block_on(fut).unwrap_err();
} }
#[test] #[test]
fn upgrades() { fn upgrades() {
use tokio_io::io::{read_to_end, write_all}; use tokio_io::io::{read_to_end, write_all};
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@@ -1188,17 +1190,17 @@ fn upgrades() {
}) })
}); });
let conn = fut.wait().unwrap(); let conn = rt.block_on(fut).unwrap();
// wait so that we don't write until other side saw 101 response // wait so that we don't write until other side saw 101 response
rx.wait().unwrap(); rt.block_on(rx).unwrap();
let parts = conn.into_parts(); let parts = conn.into_parts();
let io = parts.io; let io = parts.io;
assert_eq!(parts.read_buf, "eagerly optimistic"); assert_eq!(parts.read_buf, "eagerly optimistic");
let io = write_all(io, b"foo=bar").wait().unwrap().0; let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
let vec = read_to_end(io, vec![]).wait().unwrap().1; let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }
@@ -1206,8 +1208,8 @@ fn upgrades() {
fn http_connect() { fn http_connect() {
use tokio_io::io::{read_to_end, write_all}; use tokio_io::io::{read_to_end, write_all};
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@@ -1252,17 +1254,17 @@ fn http_connect() {
}) })
}); });
let conn = fut.wait().unwrap(); let conn = rt.block_on(fut).unwrap();
// wait so that we don't write until other side saw 101 response // wait so that we don't write until other side saw 101 response
rx.wait().unwrap(); rt.block_on(rx).unwrap();
let parts = conn.into_parts(); let parts = conn.into_parts();
let io = parts.io; let io = parts.io;
assert_eq!(parts.read_buf, "eagerly optimistic"); assert_eq!(parts.read_buf, "eagerly optimistic");
let io = write_all(io, b"foo=bar").wait().unwrap().0; let io = rt.block_on(write_all(io, b"foo=bar")).unwrap().0;
let vec = read_to_end(io, vec![]).wait().unwrap().1; let vec = rt.block_on(read_to_end(io, vec![])).unwrap().1;
assert_eq!(vec, b"bar=foo"); assert_eq!(vec, b"bar=foo");
} }
@@ -1271,7 +1273,7 @@ fn upgrades_new() {
use tokio_io::io::{read_to_end, write_all}; use tokio_io::io::{read_to_end, write_all};
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &rt.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (read_101_tx, read_101_rx) = oneshot::channel(); let (read_101_tx, read_101_rx) = oneshot::channel();
@@ -1340,7 +1342,7 @@ fn http_connect_new() {
use tokio_io::io::{read_to_end, write_all}; use tokio_io::io::{read_to_end, write_all};
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &rt.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (read_200_tx, read_200_rx) = oneshot::channel(); let (read_200_tx, read_200_rx) = oneshot::channel();
@@ -1404,8 +1406,8 @@ fn http_connect_new() {
#[test] #[test]
fn parse_errors_send_4xx_response() { fn parse_errors_send_4xx_response() {
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
@@ -1427,13 +1429,13 @@ fn parse_errors_send_4xx_response() {
.serve_connection(socket, HelloWorld) .serve_connection(socket, HelloWorld)
}); });
fut.wait().unwrap_err(); rt.block_on(fut).unwrap_err();
} }
#[test] #[test]
fn illegal_request_length_returns_400_response() { fn illegal_request_length_returns_400_response() {
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
thread::spawn(move || { thread::spawn(move || {
@@ -1455,7 +1457,7 @@ fn illegal_request_length_returns_400_response() {
.serve_connection(socket, HelloWorld) .serve_connection(socket, HelloWorld)
}); });
fut.wait().unwrap_err(); rt.block_on(fut).unwrap_err();
} }
#[test] #[test]
@@ -1473,8 +1475,8 @@ fn max_buf_size_no_panic() {
#[test] #[test]
fn max_buf_size() { fn max_buf_size() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
const MAX: usize = 16_000; const MAX: usize = 16_000;
@@ -1500,14 +1502,14 @@ fn max_buf_size() {
.serve_connection(socket, HelloWorld) .serve_connection(socket, HelloWorld)
}); });
fut.wait().unwrap_err(); rt.block_on(fut).unwrap_err();
} }
#[test] #[test]
fn streaming_body() { fn streaming_body() {
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let runtime = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap(), &runtime.reactor()).unwrap(); let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
@@ -1549,7 +1551,7 @@ fn streaming_body() {
})) }))
}); });
fut.join(rx).wait().unwrap(); rt.block_on(fut.join(rx)).unwrap();
} }
// ------------------------------------------------- // -------------------------------------------------

View File

@@ -6,7 +6,7 @@ pub use std::net::SocketAddr;
pub use self::futures::{future, Future, Stream}; pub use self::futures::{future, Future, Stream};
pub use self::futures::sync::oneshot; pub use self::futures::sync::oneshot;
pub use self::hyper::{HeaderMap, StatusCode}; pub use self::hyper::{HeaderMap, StatusCode};
pub use self::tokio::runtime::Runtime; pub use self::tokio::runtime::current_thread::Runtime;
macro_rules! t { macro_rules! t {
( (
@@ -194,8 +194,7 @@ pub fn __run_test(cfg: __TestConfig) {
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
let _ = pretty_env_logger::try_init(); let _ = pretty_env_logger::try_init();
let rt = Runtime::new().expect("new rt"); let mut rt = Runtime::new().expect("new rt");
let handle = rt.reactor().clone();
assert_eq!(cfg.client_version, cfg.server_version); assert_eq!(cfg.client_version, cfg.server_version);
@@ -205,11 +204,10 @@ pub fn __run_test(cfg: __TestConfig) {
Version::HTTP_11 Version::HTTP_11
}; };
let connector = HttpConnector::new_with_handle(1, handle.clone()); let connector = HttpConnector::new(1);
let client = Client::builder() let client = Client::builder()
.keep_alive_timeout(Duration::from_secs(10)) .keep_alive_timeout(Duration::from_secs(10))
.http2_only(cfg.client_version == 2) .http2_only(cfg.client_version == 2)
.executor(rt.executor())
.build::<_, Body>(connector); .build::<_, Body>(connector);
let serve_handles = Arc::new(Mutex::new( let serve_handles = Arc::new(Mutex::new(
@@ -250,16 +248,13 @@ pub fn __run_test(cfg: __TestConfig) {
let serve = hyper::server::conn::Http::new() let serve = hyper::server::conn::Http::new()
.http2_only(cfg.server_version == 2) .http2_only(cfg.server_version == 2)
.executor(rt.executor()) .serve_addr(
.serve_addr_handle(
&SocketAddr::from(([127, 0, 0, 1], 0)), &SocketAddr::from(([127, 0, 0, 1], 0)),
&handle,
new_service, new_service,
) )
.expect("serve_addr_handle"); .expect("serve_addr");
let addr = serve.incoming_ref().local_addr(); let addr = serve.incoming_ref().local_addr();
let exe = rt.executor();
let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (success_tx, success_rx) = oneshot::channel(); let (success_tx, success_rx) = oneshot::channel();
let expected_connections = cfg.connections; let expected_connections = cfg.connections;
@@ -269,7 +264,7 @@ pub fn __run_test(cfg: __TestConfig) {
.map_err(|never| -> hyper::Error { match never {} }) .map_err(|never| -> hyper::Error { match never {} })
.flatten() .flatten()
.map_err(|e| panic!("server connection error: {}", e)); .map_err(|e| panic!("server connection error: {}", e));
exe.spawn(fut); ::tokio::spawn(fut);
Ok::<_, hyper::Error>(cnt + 1) Ok::<_, hyper::Error>(cnt + 1)
}) })
.map(move |cnt| { .map(move |cnt| {
@@ -282,7 +277,7 @@ pub fn __run_test(cfg: __TestConfig) {
}) })
.map_err(|_| panic!("shutdown not ok")); .map_err(|_| panic!("shutdown not ok"));
rt.executor().spawn(server); rt.spawn(server);
let make_request = Arc::new(move |client: &Client<HttpConnector>, creq: __CReq, cres: __CRes| { let make_request = Arc::new(move |client: &Client<HttpConnector>, creq: __CReq, cres: __CRes| {
@@ -343,11 +338,9 @@ pub fn __run_test(cfg: __TestConfig) {
let client_futures = client_futures.map(move |_| { let client_futures = client_futures.map(move |_| {
let _ = shutdown_tx.send(()); let _ = shutdown_tx.send(());
}); });
rt.executor().spawn(client_futures); rt.spawn(client_futures);
rt.shutdown_on_idle().wait().expect("rt"); rt.block_on(success_rx
success_rx .map_err(|_| "something panicked"))
.map_err(|_| "something panicked")
.wait()
.expect("shutdown succeeded"); .expect("shutdown succeeded");
} }