diff --git a/Cargo.toml b/Cargo.toml
index ed5e1052..a1527674 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,20 +22,17 @@ include = [
[dependencies]
bytes = "0.4.4"
-futures = "0.1.17"
-futures-cpupool = "0.1.6"
-futures-timer = "0.1.0"
+futures = "0.2.0-beta"
+futures-timer = { git = "https://github.com/alexcrichton/futures-timer.git" }
http = "0.1.5"
httparse = "1.0"
iovec = "0.1"
log = "0.4"
net2 = "0.2.32"
time = "0.1"
-tokio = "0.1.3"
-tokio-executor = "0.1.0"
-tokio-service = "0.1"
-tokio-io = "0.1"
-want = "0.0.2"
+tokio = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
+tokio-executor = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
+want = { git = "https://github.com/srijs/want.git", branch = "futures-0.2" }
[dev-dependencies]
num_cpus = "1.0"
@@ -45,3 +42,6 @@ url = "1.0"
[features]
nightly = []
+
+[replace]
+"futures:0.2.0-beta" = { git = "https://github.com/srijs/futures-rs.git", branch = "with-executor" }
diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs
index 8c9aeecf..fe803b7d 100644
--- a/benches/end_to_end.rs
+++ b/benches/end_to_end.rs
@@ -8,7 +8,8 @@ extern crate tokio;
use std::net::SocketAddr;
-use futures::{Future, Stream};
+use futures::{FutureExt, StreamExt};
+use futures::executor::block_on;
use tokio::runtime::Runtime;
use tokio::net::TcpListener;
@@ -22,19 +23,20 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
let addr = spawn_hello(&mut rt);
let client = hyper::Client::configure()
- .build_with_executor(&rt.handle(), rt.executor());
+ .build(&rt.handle());
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
b.bytes = 160 * 2 + PHRASE.len() as u64;
b.iter(move || {
- client.get(url.clone())
+ block_on(client.get(url.clone())
+ .with_executor(rt.executor())
.and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
- })
+ }).map(|_| ())
})
- .wait().expect("client wait");
+ ).expect("client wait");
});
}
@@ -44,7 +46,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let addr = spawn_hello(&mut rt);
let client = hyper::Client::configure()
- .build_with_executor(&rt.handle(), rt.executor());
+ .build(&rt.handle());
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
@@ -54,11 +56,14 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut req = Request::new(post.into());
*req.method_mut() = Method::POST;
*req.uri_mut() = url.clone();
- client.request(req).and_then(|res| {
- res.into_body().into_stream().for_each(|_chunk| {
- Ok(())
+ block_on(client.request(req)
+ .with_executor(rt.executor())
+ .and_then(|res| {
+ res.into_body().into_stream().for_each(|_chunk| {
+ Ok(())
+ }).map(|_| ())
})
- }).wait().expect("client wait");
+ ).expect("client wait");
});
}
@@ -76,21 +81,22 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
let service = const_service(service_fn(|req: Request
| {
req.into_body()
.into_stream()
- .concat2()
+ .concat()
.map(|_| {
Response::new(Body::from(PHRASE))
})
}));
let srv = listener.incoming()
- .into_future()
+ .next()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
.and_then(move |(accepted, _inc)| {
let socket = accepted.expect("accepted socket");
http.serve_connection(socket, service.new_service().expect("new_service"))
.map(|_| ())
.map_err(|_| ())
- });
- rt.spawn(srv);
+ })
+ .map_err(|_| panic!("server error"));
+ rt.spawn2(srv);
return addr
}
diff --git a/benches/server.rs b/benches/server.rs
index 158bdb1f..25c98da3 100644
--- a/benches/server.rs
+++ b/benches/server.rs
@@ -11,8 +11,8 @@ use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
-use futures::{future, stream, Future, Stream};
-use futures::sync::oneshot;
+use futures::{future, stream, FutureExt, StreamExt};
+use futures::channel::oneshot;
use hyper::{Body, Request, Response};
use hyper::server::Service;
@@ -31,7 +31,7 @@ macro_rules! bench_server {
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
- tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
+ tokio::runtime::run2(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
});
addr_rx.recv().unwrap()
diff --git a/examples/client.rs b/examples/client.rs
index e5d9a286..53507e81 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -8,7 +8,7 @@ extern crate pretty_env_logger;
use std::env;
use std::io::{self, Write};
-use futures::{Future, Stream};
+use futures::{FutureExt, StreamExt};
use futures::future::lazy;
use hyper::{Body, Client, Request};
@@ -30,7 +30,7 @@ fn main() {
return;
}
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let client = Client::default();
let mut req = Request::new(Body::empty());
@@ -43,10 +43,13 @@ fn main() {
res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
- }).map(|_| {
- println!("\n\nDone.");
- }).map_err(|err| {
- eprintln!("Error {}", err);
+ }).then(|result| {
+ if let Some(err) = result.err() {
+ eprintln!("Error {}", err);
+ } else {
+ println!("\n\nDone.");
+ }
+ Ok(())
})
}));
}
diff --git a/examples/hello.rs b/examples/hello.rs
index 80d33542..b4195d5d 100644
--- a/examples/hello.rs
+++ b/examples/hello.rs
@@ -4,7 +4,7 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::Future;
+use futures::FutureExt;
use futures::future::lazy;
use hyper::{Body, Response};
@@ -20,13 +20,13 @@ fn main() {
Ok(Response::new(Body::from(PHRASE)))
}));
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();
- println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
- server.run().map_err(|err| eprintln!("Server error {}", err))
+ println!("Listening on http://{}", server.local_addr().unwrap());
+ server.run().map_err(|err| panic!("Server error {}", err))
}));
}
diff --git a/examples/multi_server.rs b/examples/multi_server.rs
index 239d7abb..a42fc5bf 100644
--- a/examples/multi_server.rs
+++ b/examples/multi_server.rs
@@ -4,8 +4,9 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::{Future, Stream};
+use futures::{FutureExt, StreamExt};
use futures::future::{FutureResult, lazy};
+use futures::executor::spawn;
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
@@ -43,22 +44,20 @@ fn main() {
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
- tokio::spawn(srv1.for_each(move |conn| {
- tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
- Ok(())
- }).map_err(|_| ()));
+ spawn(srv1.map_err(|err| panic!("srv1 error: {:?}", err)).for_each(move |conn| {
+ spawn(conn.map(|_| ()).map_err(|err| panic!("srv1 error: {:?}", err)))
+ }).map(|_| ()));
- tokio::spawn(srv2.for_each(move |conn| {
- tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
- Ok(())
- }).map_err(|_| ()));
+ spawn(srv2.map_err(|err| panic!("srv2 error: {:?}", err)).for_each(move |conn| {
+ spawn(conn.map(|_| ()).map_err(|err| panic!("srv2 error: {:?}", err)))
+ }).map(|_| ()));
Ok(())
}));
diff --git a/examples/params.rs b/examples/params.rs
index d362840f..eb576cc1 100644
--- a/examples/params.rs
+++ b/examples/params.rs
@@ -5,7 +5,7 @@ extern crate pretty_env_logger;
extern crate tokio;
extern crate url;
-use futures::{Future, Stream};
+use futures::{Future, FutureExt, StreamExt};
use futures::future::lazy;
use hyper::{Body, Method, Request, Response, StatusCode};
@@ -32,7 +32,7 @@ impl Service for ParamExample {
Box::new(futures::future::ok(Response::new(INDEX.into())))
},
(&Method::POST, "/post") => {
- Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
+ Box::new(req.into_parts().1.into_stream().concat().map(|b| {
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
@@ -98,9 +98,11 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
- println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
- server.run().map_err(|err| eprintln!("Server error {}", err))
+ println!("Listening on http://{}", server.local_addr().unwrap());
+ server.run().recover(|err| {
+ eprintln!("Server error {}", err)
+ })
}));
}
diff --git a/examples/send_file.rs b/examples/send_file.rs
index 8e6fe917..ad0cdc12 100644
--- a/examples/send_file.rs
+++ b/examples/send_file.rs
@@ -4,9 +4,9 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::{Future/*, Sink*/};
+use futures::{Future, FutureExt};
use futures::future::lazy;
-use futures::sync::oneshot;
+use futures::channel::oneshot;
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
@@ -141,9 +141,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
- println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
- server.run().map_err(|err| eprintln!("Server error {}", err))
+ println!("Listening on http://{}", server.local_addr().unwrap());
+ server.run().map_err(|err| panic!("Server error {}", err))
}));
}
diff --git a/examples/server.rs b/examples/server.rs
index b5d21539..9486ab8d 100644
--- a/examples/server.rs
+++ b/examples/server.rs
@@ -4,7 +4,7 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::Future;
+use futures::FutureExt;
use futures::future::{FutureResult, lazy};
use hyper::{Body, Method, Request, Response, StatusCode};
@@ -43,9 +43,11 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
- println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
- server.run().map_err(|err| eprintln!("Server error {}", err))
+ println!("Listening on http://{}", server.local_addr().unwrap());
+ server.run().recover(|err| {
+ eprintln!("Server error {}", err)
+ })
}));
}
diff --git a/examples/web_api.rs b/examples/web_api.rs
index d3261e9a..d792eeaf 100644
--- a/examples/web_api.rs
+++ b/examples/web_api.rs
@@ -4,14 +4,15 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::{Future, Stream};
+use futures::{Future, FutureExt, StreamExt};
+use futures::executor::spawn;
use futures::future::lazy;
use tokio::reactor::Handle;
use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
-#[allow(unused)]
+#[allow(unused, deprecated)]
use std::ascii::AsciiExt;
static NOTFOUND: &[u8] = b"Not Found";
@@ -78,13 +79,15 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::run(lazy(move || {
+ tokio::runtime::run2(lazy(move |_| {
let handle = Handle::current();
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(handle.clone()))).unwrap();
- println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
+ println!("Listening on http://{}", serve.incoming_ref().local_addr());
- serve.map_err(|_| ()).for_each(move |conn| {
- tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)))
- })
+ serve.map_err(|err| panic!("server error {:?}", err)).for_each(move |conn| {
+ spawn(conn.recover(|err| {
+ println!("connection error: {:?}", err);
+ }))
+ }).map(|_| ())
}));
}
diff --git a/src/client/conn.rs b/src/client/conn.rs
index 9d43ec26..6dee97fa 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -11,9 +11,10 @@ use std::fmt;
use std::marker::PhantomData;
use bytes::Bytes;
-use futures::{Async, Future, Poll};
+use futures::{Async, Future, FutureExt, Poll};
use futures::future::{self, Either};
-use tokio_io::{AsyncRead, AsyncWrite};
+use futures::task;
+use futures::io::{AsyncRead, AsyncWrite};
use proto;
use proto::body::Entity;
@@ -123,8 +124,8 @@ impl SendRequest
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
- pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
- self.dispatch.poll_ready()
+ pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
+ self.dispatch.poll_ready(cx)
}
pub(super) fn is_closed(&self) -> bool {
@@ -162,7 +163,7 @@ where
/// # use http::header::HOST;
/// # use hyper::client::conn::SendRequest;
/// # use hyper::Body;
- /// use futures::Future;
+ /// use futures::FutureExt;
/// use hyper::Request;
///
/// # fn doc(mut tx: SendRequest) {
@@ -186,19 +187,19 @@ where
pub fn send_request(&mut self, req: Request) -> ResponseFuture {
let inner = match self.dispatch.send(req) {
Ok(rx) => {
- Either::A(rx.then(move |res| {
+ rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
- }))
+ }).left()
},
Err(_req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
- Either::B(future::err(err))
+ future::err(err).right()
}
};
@@ -214,7 +215,7 @@ where
{
let inner = match self.dispatch.try_send(req) {
Ok(rx) => {
- Either::A(rx.then(move |res| {
+ Either::Left(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
@@ -226,7 +227,7 @@ where
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
- Either::B(future::err((err, Some(req))))
+ Either::Right(future::err((err, Some(req))))
}
};
Box::new(inner)
@@ -277,8 +278,8 @@ where
/// upgrade. Once the upgrade is completed, the connection would be "done",
/// but it is not desired to actally shutdown the IO object. Instead you
/// would take it back using `into_parts`.
- pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
- self.inner.poll_without_shutdown()
+ pub fn poll_without_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
+ self.inner.poll_without_shutdown(cx)
}
}
@@ -290,8 +291,8 @@ where
type Item = ();
type Error = ::Error;
- fn poll(&mut self) -> Poll {
- self.inner.poll()
+ fn poll(&mut self, cx: &mut task::Context) -> Poll {
+ self.inner.poll(cx)
}
}
@@ -363,8 +364,8 @@ where
type Item = (SendRequest, Connection);
type Error = ::Error;
- fn poll(&mut self) -> Poll {
- self.inner.poll()
+ fn poll(&mut self, cx: &mut task::Context) -> Poll {
+ self.inner.poll(cx)
.map(|async| {
async.map(|(tx, dispatch)| {
(tx, Connection { inner: dispatch })
@@ -394,8 +395,8 @@ where
>);
type Error = ::Error;
- fn poll(&mut self) -> Poll {
- self.inner.poll()
+ fn poll(&mut self, cx: &mut task::Context) -> Poll {
+ self.inner.poll(cx)
}
}
@@ -417,7 +418,7 @@ where
>);
type Error = ::Error;
- fn poll(&mut self) -> Poll {
+ fn poll(&mut self, _cx: &mut task::Context) -> Poll {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
@@ -441,8 +442,8 @@ impl Future for ResponseFuture {
type Error = ::Error;
#[inline]
- fn poll(&mut self) -> Poll {
- self.inner.poll()
+ fn poll(&mut self, cx: &mut task::Context) -> Poll {
+ self.inner.poll(cx)
}
}
diff --git a/src/client/connect.rs b/src/client/connect.rs
index 33e636c0..ad5ab7b5 100644
--- a/src/client/connect.rs
+++ b/src/client/connect.rs
@@ -8,24 +8,21 @@
use std::error::Error as StdError;
use std::fmt;
use std::io;
-use std::mem;
use std::net::SocketAddr;
-use std::sync::Arc;
use std::time::Duration;
-use futures::{Future, Poll, Async};
-use futures::future::{Executor, ExecuteError};
-use futures::sync::oneshot;
-use futures_cpupool::{Builder as CpuPoolBuilder};
+use futures::{Future, Never, Poll, Async};
+use futures::executor::{Executor, SpawnError, ThreadPoolBuilder};
+use futures::task;
+use futures::io::{AsyncRead, AsyncWrite};
use http::Uri;
use http::uri::Scheme;
use net2::TcpBuilder;
-use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, ConnectFuture};
+use executor::CloneBoxedExecutor;
use super::dns;
-use self::http_connector::HttpConnectorBlockingTask;
/// Connect to a destination, returning an IO transport.
///
@@ -174,7 +171,7 @@ impl HttpConnector {
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize, handle: &Handle) -> HttpConnector {
- let pool = CpuPoolBuilder::new()
+ let pool = ThreadPoolBuilder::new()
.name_prefix("hyper-dns")
.pool_size(threads)
.create();
@@ -186,10 +183,10 @@ impl HttpConnector {
/// Takes an executor to run blocking tasks on.
#[inline]
pub fn new_with_executor(executor: E, handle: &Handle) -> HttpConnector
- where E: Executor + Send + Sync
+ where E: Executor + Clone + Send + Sync
{
HttpConnector {
- executor: HttpConnectExecutor(Arc::new(executor)),
+ executor: HttpConnectExecutor(Box::new(executor)),
enforce_http: true,
handle: handle.clone(),
keep_alive_timeout: None,
@@ -298,7 +295,7 @@ pub struct HttpConnecting {
enum State {
Lazy(HttpConnectExecutor, String, u16),
- Resolving(oneshot::SpawnHandle),
+ Resolving(dns::Resolving),
Connecting(ConnectingTcp),
Error(Option),
}
@@ -307,11 +304,11 @@ impl Future for HttpConnecting {
type Item = (TcpStream, Connected);
type Error = io::Error;
- fn poll(&mut self) -> Poll {
+ fn poll(&mut self, cx: &mut task::Context) -> Poll {
loop {
let state;
match self.state {
- State::Lazy(ref executor, ref mut host, port) => {
+ State::Lazy(ref mut executor, ref mut host, port) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
@@ -320,24 +317,19 @@ impl Future for HttpConnecting {
current: None
})
} else {
- let host = mem::replace(host, String::new());
- let work = dns::Work::new(host, port);
- state = State::Resolving(oneshot::spawn(work, executor));
+ let host = ::std::mem::replace(host, String::new());
+ state = State::Resolving(dns::Resolving::spawn(host, port, executor));
}
},
State::Resolving(ref mut future) => {
- match try!(future.poll()) {
- Async::NotReady => return Ok(Async::NotReady),
- Async::Ready(addrs) => {
- state = State::Connecting(ConnectingTcp {
- addrs: addrs,
- current: None,
- })
- }
- };
+ let addrs = try_ready!(future.poll(cx));
+ state = State::Connecting(ConnectingTcp {
+ addrs: addrs,
+ current: None,
+ });
},
State::Connecting(ref mut c) => {
- let sock = try_ready!(c.poll(&self.handle));
+ let sock = try_ready!(c.poll(cx, &self.handle));
if let Some(dur) = self.keep_alive_timeout {
sock.set_keepalive(Some(dur))?;
@@ -365,11 +357,11 @@ struct ConnectingTcp {
impl ConnectingTcp {
// not a Future, since passing a &Handle to poll
- fn poll(&mut self, handle: &Handle) -> Poll {
+ fn poll(&mut self, cx: &mut task::Context, handle: &Handle) -> Poll {
let mut err = None;
loop {
if let Some(ref mut current) = self.current {
- match current.poll() {
+ match current.poll(cx) {
Ok(ok) => return Ok(ok),
Err(e) => {
trace!("connect error {:?}", e);
@@ -392,37 +384,19 @@ impl ConnectingTcp {
}
}
-// Make this Future unnameable outside of this crate.
-mod http_connector {
- use super::*;
- // Blocking task to be executed on a thread pool.
- pub struct HttpConnectorBlockingTask {
- pub(super) work: oneshot::Execute
- }
-
- impl fmt::Debug for HttpConnectorBlockingTask {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.pad("HttpConnectorBlockingTask")
- }
- }
-
- impl Future for HttpConnectorBlockingTask {
- type Item = ();
- type Error = ();
-
- fn poll(&mut self) -> Poll<(), ()> {
- self.work.poll()
- }
- }
-}
-
#[derive(Clone)]
-struct HttpConnectExecutor(Arc + Send + Sync>);
+struct HttpConnectExecutor(Box);
-impl Executor> for HttpConnectExecutor {
- fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> {
- self.0.execute(HttpConnectorBlockingTask { work: future })
- .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
+impl Executor for HttpConnectExecutor {
+ fn spawn(
+ &mut self,
+ f: Box + 'static + Send>
+ ) -> Result<(), SpawnError> {
+ self.0.spawn(f)
+ }
+
+ fn status(&self) -> Result<(), SpawnError> {
+ self.0.status()
}
}
@@ -430,7 +404,7 @@ impl Executor> for HttpConnectExecutor {
mod tests {
#![allow(deprecated)]
use std::io;
- use futures::Future;
+ use futures::executor::block_on;
use tokio::runtime::Runtime;
use super::{Connect, Destination, HttpConnector};
@@ -443,7 +417,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
- assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
@@ -455,7 +429,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
- assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
@@ -468,6 +442,6 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
- assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 51eae1d4..77402afe 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -1,9 +1,8 @@
-use futures::{Async, Poll, Stream};
-use futures::sync::{mpsc, oneshot};
+use futures::{Async, Never, Poll, Stream};
+use futures::channel::{mpsc, oneshot};
+use futures::task;
use want;
-use common::Never;
-
//pub type Callback = oneshot::Sender)>>;
pub type RetryPromise = oneshot::Receiver)>>;
pub type Promise = oneshot::Receiver>;
@@ -33,15 +32,15 @@ pub struct Sender {
}
impl Sender {
- pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
- match self.inner.poll_ready() {
+ pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
+ match self.inner.poll_ready(cx) {
Ok(Async::Ready(())) => {
// there's room in the queue, but does the Connection
// want a message yet?
- self.giver.poll_want()
+ self.giver.poll_want(cx)
.map_err(|_| ::Error::Closed)
},
- Ok(Async::NotReady) => Ok(Async::NotReady),
+ Ok(Async::Pending) => Ok(Async::Pending),
Err(_) => Err(::Error::Closed),
}
}
@@ -75,16 +74,15 @@ impl Stream for Receiver {
type Item = (T, Callback);
type Error = Never;
- fn poll(&mut self) -> Poll