feat(rt): make tokio runtime optional
A Cargo feature `runtime` is added, which is enabled by default, that includes the following: - The `client::HttpConnector`, which uses `tokio::net::TcpStream`. - The `server::AddrStream`, which uses `tokio::net::TcpListener`. - The `hyper::rt` module, which includes useful utilities to work with the runtime without needing to import `futures` or `tokio` explicity. Disabling the feature removes many of these niceties, but allows people to use hyper in environments that have an alternative runtime, without needing to download an unused one.
This commit is contained in:
@@ -9,13 +9,12 @@ matrix:
|
|||||||
- rust: beta
|
- rust: beta
|
||||||
- rust: stable
|
- rust: stable
|
||||||
env: HYPER_DOCS=1
|
env: HYPER_DOCS=1
|
||||||
|
- rust: stable
|
||||||
|
env: FEATURES="--no-default-features"
|
||||||
- rust: 1.21.0
|
- rust: 1.21.0
|
||||||
|
|
||||||
cache:
|
cache:
|
||||||
apt: true
|
apt: true
|
||||||
directories:
|
|
||||||
- target/debug/deps
|
|
||||||
- target/debug/build
|
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- ./.travis/readme.py
|
- ./.travis/readme.py
|
||||||
|
|||||||
84
Cargo.toml
84
Cargo.toml
@@ -22,19 +22,21 @@ include = [
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "0.4.4"
|
bytes = "0.4.4"
|
||||||
futures = "0.1.17"
|
futures = "0.1.21"
|
||||||
futures-cpupool = "0.1.6"
|
futures-cpupool = { version = "0.1.6", optional = true }
|
||||||
futures-timer = "0.1.0"
|
futures-timer = "0.1.0"
|
||||||
http = "0.1.5"
|
http = "0.1.5"
|
||||||
httparse = "1.0"
|
httparse = "1.0"
|
||||||
h2 = "0.1.5"
|
h2 = "0.1.5"
|
||||||
iovec = "0.1"
|
iovec = "0.1"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
net2 = "0.2.32"
|
net2 = { version = "0.2.32", optional = true }
|
||||||
time = "0.1"
|
time = "0.1"
|
||||||
tokio = "0.1.5"
|
tokio = { version = "0.1.5", optional = true }
|
||||||
tokio-executor = "0.1.0"
|
tokio-executor = { version = "0.1.0", optional = true }
|
||||||
tokio-io = "0.1"
|
tokio-io = "0.1"
|
||||||
|
tokio-reactor = { version = "0.1", optional = true }
|
||||||
|
tokio-tcp = { version = "0.1", optional = true }
|
||||||
want = "0.0.3"
|
want = "0.0.3"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
@@ -44,4 +46,76 @@ spmc = "0.2"
|
|||||||
url = "1.0"
|
url = "1.0"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
|
default = ["runtime"]
|
||||||
nightly = []
|
nightly = []
|
||||||
|
runtime = [
|
||||||
|
"futures-cpupool",
|
||||||
|
"net2",
|
||||||
|
"tokio",
|
||||||
|
"tokio-executor",
|
||||||
|
"tokio-reactor",
|
||||||
|
"tokio-tcp",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "client"
|
||||||
|
path = "examples/client.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "hello"
|
||||||
|
path = "examples/hello.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "multi_server"
|
||||||
|
path = "examples/multi_server.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "params"
|
||||||
|
path = "examples/params.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "send_file"
|
||||||
|
path = "examples/send_file.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "server"
|
||||||
|
path = "examples/server.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "web_api"
|
||||||
|
path = "examples/web_api.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "end_to_end"
|
||||||
|
path = "benches/end_to_end.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[bench]]
|
||||||
|
name = "server"
|
||||||
|
path = "benches/server.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
name = "client"
|
||||||
|
path = "tests/client.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
name = "integration"
|
||||||
|
path = "tests/integration.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
[[test]]
|
||||||
|
name = "server"
|
||||||
|
path = "tests/server.rs"
|
||||||
|
required-features = ["runtime"]
|
||||||
|
|
||||||
|
|||||||
@@ -1,17 +1,12 @@
|
|||||||
//#![deny(warnings)]
|
#![deny(warnings)]
|
||||||
extern crate futures;
|
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate tokio;
|
|
||||||
|
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
|
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
|
|
||||||
use futures::{Future, Stream};
|
|
||||||
use futures::future::lazy;
|
|
||||||
|
|
||||||
use hyper::{Body, Client, Request};
|
use hyper::{Body, Client, Request};
|
||||||
|
use hyper::rt::{self, Future, Stream};
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
@@ -30,7 +25,7 @@ fn main() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::run(lazy(move || {
|
rt::run(rt::lazy(move || {
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
|
|
||||||
let mut req = Request::new(Body::empty());
|
let mut req = Request::new(Body::empty());
|
||||||
|
|||||||
@@ -1,13 +1,10 @@
|
|||||||
#![deny(warnings)]
|
#![deny(warnings)]
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate futures;
|
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
extern crate tokio;
|
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
|
|
||||||
use hyper::{Body, Response, Server};
|
use hyper::{Body, Response, Server};
|
||||||
use hyper::service::service_fn_ok;
|
use hyper::service::service_fn_ok;
|
||||||
|
use hyper::rt::{self, Future};
|
||||||
|
|
||||||
static PHRASE: &'static [u8] = b"Hello World!";
|
static PHRASE: &'static [u8] = b"Hello World!";
|
||||||
|
|
||||||
@@ -33,5 +30,5 @@ fn main() {
|
|||||||
|
|
||||||
println!("Listening on http://{}", addr);
|
println!("Listening on http://{}", addr);
|
||||||
|
|
||||||
tokio::run(server);
|
rt::run(server);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,14 +1,10 @@
|
|||||||
#![deny(warnings)]
|
#![deny(warnings)]
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate futures;
|
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
extern crate tokio;
|
|
||||||
|
|
||||||
use futures::{Future};
|
|
||||||
use futures::future::{lazy};
|
|
||||||
|
|
||||||
use hyper::{Body, Response, Server};
|
use hyper::{Body, Response, Server};
|
||||||
use hyper::service::service_fn_ok;
|
use hyper::service::service_fn_ok;
|
||||||
|
use hyper::rt::{self, Future};
|
||||||
|
|
||||||
static INDEX1: &'static [u8] = b"The 1st service!";
|
static INDEX1: &'static [u8] = b"The 1st service!";
|
||||||
static INDEX2: &'static [u8] = b"The 2nd service!";
|
static INDEX2: &'static [u8] = b"The 2nd service!";
|
||||||
@@ -19,7 +15,7 @@ fn main() {
|
|||||||
let addr1 = ([127, 0, 0, 1], 1337).into();
|
let addr1 = ([127, 0, 0, 1], 1337).into();
|
||||||
let addr2 = ([127, 0, 0, 1], 1338).into();
|
let addr2 = ([127, 0, 0, 1], 1338).into();
|
||||||
|
|
||||||
tokio::run(lazy(move || {
|
rt::run(rt::lazy(move || {
|
||||||
let srv1 = Server::bind(&addr1)
|
let srv1 = Server::bind(&addr1)
|
||||||
.serve(|| service_fn_ok(|_| Response::new(Body::from(INDEX1))))
|
.serve(|| service_fn_ok(|_| Response::new(Body::from(INDEX1))))
|
||||||
.map_err(|e| eprintln!("server 1 error: {}", e));
|
.map_err(|e| eprintln!("server 1 error: {}", e));
|
||||||
@@ -30,8 +26,8 @@ fn main() {
|
|||||||
|
|
||||||
println!("Listening on http://{} and http://{}", addr1, addr2);
|
println!("Listening on http://{} and http://{}", addr1, addr2);
|
||||||
|
|
||||||
tokio::spawn(srv1);
|
rt::spawn(srv1);
|
||||||
tokio::spawn(srv2);
|
rt::spawn(srv2);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}));
|
}));
|
||||||
|
|||||||
@@ -2,7 +2,6 @@
|
|||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
extern crate tokio;
|
|
||||||
extern crate url;
|
extern crate url;
|
||||||
|
|
||||||
use futures::{future, Future, Stream};
|
use futures::{future, Future, Stream};
|
||||||
@@ -93,5 +92,5 @@ fn main() {
|
|||||||
.serve(|| service_fn(param_example))
|
.serve(|| service_fn(param_example))
|
||||||
.map_err(|e| eprintln!("server error: {}", e));
|
.map_err(|e| eprintln!("server error: {}", e));
|
||||||
|
|
||||||
tokio::run(server);
|
hyper::rt::run(server);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@
|
|||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
extern crate tokio;
|
|
||||||
|
|
||||||
use futures::{future, Future};
|
use futures::{future, Future};
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
@@ -29,7 +28,7 @@ fn main() {
|
|||||||
|
|
||||||
println!("Listening on http://{}", addr);
|
println!("Listening on http://{}", addr);
|
||||||
|
|
||||||
tokio::run(server);
|
hyper::rt::run(server);
|
||||||
}
|
}
|
||||||
|
|
||||||
type ResponseFuture = Box<Future<Item=Response<Body>, Error=io::Error> + Send>;
|
type ResponseFuture = Box<Future<Item=Response<Body>, Error=io::Error> + Send>;
|
||||||
|
|||||||
@@ -1,13 +1,10 @@
|
|||||||
#![deny(warnings)]
|
#![deny(warnings)]
|
||||||
extern crate futures;
|
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
extern crate tokio;
|
|
||||||
|
|
||||||
use futures::Future;
|
|
||||||
|
|
||||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||||
use hyper::service::service_fn_ok;
|
use hyper::service::service_fn_ok;
|
||||||
|
use hyper::rt::Future;
|
||||||
|
|
||||||
static INDEX: &'static [u8] = b"Try POST /echo";
|
static INDEX: &'static [u8] = b"Try POST /echo";
|
||||||
|
|
||||||
@@ -40,5 +37,5 @@ fn main() {
|
|||||||
|
|
||||||
println!("Listening on http://{}", addr);
|
println!("Listening on http://{}", addr);
|
||||||
|
|
||||||
tokio::run(server);
|
hyper::rt::run(server);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@
|
|||||||
extern crate futures;
|
extern crate futures;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
extern crate tokio;
|
|
||||||
|
|
||||||
use futures::{future, Future, Stream};
|
use futures::{future, Future, Stream};
|
||||||
|
|
||||||
@@ -68,7 +67,7 @@ fn main() {
|
|||||||
|
|
||||||
let addr = "127.0.0.1:1337".parse().unwrap();
|
let addr = "127.0.0.1:1337".parse().unwrap();
|
||||||
|
|
||||||
tokio::run(future::lazy(move || {
|
hyper::rt::run(future::lazy(move || {
|
||||||
// Share a `Client` with all `Service`s
|
// Share a `Client` with all `Service`s
|
||||||
let client = Client::new();
|
let client = Client::new();
|
||||||
|
|
||||||
|
|||||||
@@ -6,26 +6,12 @@
|
|||||||
//! establishes connections over TCP.
|
//! establishes connections over TCP.
|
||||||
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
|
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
use std::fmt;
|
|
||||||
use std::io;
|
|
||||||
use std::mem;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use futures::{Future, Poll, Async};
|
use futures::Future;
|
||||||
use futures::future::{Executor, ExecuteError};
|
|
||||||
use futures::sync::oneshot;
|
|
||||||
use futures_cpupool::{Builder as CpuPoolBuilder};
|
|
||||||
use http::Uri;
|
use http::Uri;
|
||||||
use http::uri::Scheme;
|
|
||||||
use net2::TcpBuilder;
|
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use tokio::reactor::Handle;
|
|
||||||
use tokio::net::{TcpStream, ConnectFuture};
|
|
||||||
|
|
||||||
use super::dns;
|
#[cfg(feature = "runtime")] pub use self::http::HttpConnector;
|
||||||
use self::http_connector::HttpConnectorBlockingTask;
|
|
||||||
|
|
||||||
/// Connect to a destination, returning an IO transport.
|
/// Connect to a destination, returning an IO transport.
|
||||||
///
|
///
|
||||||
@@ -135,367 +121,393 @@ impl Connected {
|
|||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connect(addr: &SocketAddr, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
|
#[cfg(feature = "runtime")]
|
||||||
if let Some(ref handle) = *handle {
|
mod http {
|
||||||
let builder = match addr {
|
use super::*;
|
||||||
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
|
|
||||||
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
|
|
||||||
};
|
|
||||||
|
|
||||||
if cfg!(windows) {
|
use std::fmt;
|
||||||
// Windows requires a socket be bound before calling connect
|
use std::io;
|
||||||
let any: SocketAddr = match addr {
|
use std::mem;
|
||||||
&SocketAddr::V4(_) => {
|
use std::net::SocketAddr;
|
||||||
([0, 0, 0, 0], 0).into()
|
use std::sync::Arc;
|
||||||
},
|
use std::time::Duration;
|
||||||
&SocketAddr::V6(_) => {
|
|
||||||
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
|
use futures::{Async, Poll};
|
||||||
}
|
use futures::future::{Executor, ExecuteError};
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
use futures_cpupool::{Builder as CpuPoolBuilder};
|
||||||
|
use http::uri::Scheme;
|
||||||
|
use net2::TcpBuilder;
|
||||||
|
use tokio_reactor::Handle;
|
||||||
|
use tokio_tcp::{TcpStream, ConnectFuture};
|
||||||
|
|
||||||
|
use super::super::dns;
|
||||||
|
|
||||||
|
use self::http_connector::HttpConnectorBlockingTask;
|
||||||
|
|
||||||
|
|
||||||
|
fn connect(addr: &SocketAddr, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
|
||||||
|
if let Some(ref handle) = *handle {
|
||||||
|
let builder = match addr {
|
||||||
|
&SocketAddr::V4(_) => TcpBuilder::new_v4()?,
|
||||||
|
&SocketAddr::V6(_) => TcpBuilder::new_v6()?,
|
||||||
};
|
};
|
||||||
builder.bind(any)?;
|
|
||||||
|
if cfg!(windows) {
|
||||||
|
// Windows requires a socket be bound before calling connect
|
||||||
|
let any: SocketAddr = match addr {
|
||||||
|
&SocketAddr::V4(_) => {
|
||||||
|
([0, 0, 0, 0], 0).into()
|
||||||
|
},
|
||||||
|
&SocketAddr::V6(_) => {
|
||||||
|
([0, 0, 0, 0, 0, 0, 0, 0], 0).into()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
builder.bind(any)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle))
|
||||||
|
} else {
|
||||||
|
Ok(TcpStream::connect(addr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A connector for the `http` scheme.
|
||||||
|
///
|
||||||
|
/// Performs DNS resolution in a thread pool, and then connects over TCP.
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct HttpConnector {
|
||||||
|
executor: HttpConnectExecutor,
|
||||||
|
enforce_http: bool,
|
||||||
|
handle: Option<Handle>,
|
||||||
|
keep_alive_timeout: Option<Duration>,
|
||||||
|
nodelay: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HttpConnector {
|
||||||
|
/// Construct a new HttpConnector.
|
||||||
|
///
|
||||||
|
/// Takes number of DNS worker threads.
|
||||||
|
#[inline]
|
||||||
|
pub fn new(threads: usize) -> HttpConnector {
|
||||||
|
HttpConnector::new_with_handle_opt(threads, None)
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle))
|
/// Construct a new HttpConnector with a specific Tokio handle.
|
||||||
} else {
|
pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector {
|
||||||
Ok(TcpStream::connect(addr))
|
HttpConnector::new_with_handle_opt(threads, Some(handle))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_with_handle_opt(threads: usize, handle: Option<Handle>) -> HttpConnector {
|
||||||
|
let pool = CpuPoolBuilder::new()
|
||||||
|
.name_prefix("hyper-dns")
|
||||||
|
.pool_size(threads)
|
||||||
|
.create();
|
||||||
|
HttpConnector::new_with_executor(pool, handle)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Construct a new HttpConnector.
|
||||||
|
///
|
||||||
|
/// Takes an executor to run blocking tasks on.
|
||||||
|
pub fn new_with_executor<E: 'static>(executor: E, handle: Option<Handle>) -> HttpConnector
|
||||||
|
where E: Executor<HttpConnectorBlockingTask> + Send + Sync
|
||||||
|
{
|
||||||
|
HttpConnector {
|
||||||
|
executor: HttpConnectExecutor(Arc::new(executor)),
|
||||||
|
enforce_http: true,
|
||||||
|
handle,
|
||||||
|
keep_alive_timeout: None,
|
||||||
|
nodelay: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Option to enforce all `Uri`s have the `http` scheme.
|
||||||
|
///
|
||||||
|
/// Enabled by default.
|
||||||
|
#[inline]
|
||||||
|
pub fn enforce_http(&mut self, is_enforced: bool) {
|
||||||
|
self.enforce_http = is_enforced;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
|
||||||
|
///
|
||||||
|
/// If `None`, the option will not be set.
|
||||||
|
///
|
||||||
|
/// Default is `None`.
|
||||||
|
#[inline]
|
||||||
|
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
|
||||||
|
self.keep_alive_timeout = dur;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
|
||||||
|
///
|
||||||
|
/// Default is `false`.
|
||||||
|
#[inline]
|
||||||
|
pub fn set_nodelay(&mut self, nodelay: bool) {
|
||||||
|
self.nodelay = nodelay;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// A connector for the `http` scheme.
|
impl fmt::Debug for HttpConnector {
|
||||||
///
|
#[inline]
|
||||||
/// Performs DNS resolution in a thread pool, and then connects over TCP.
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
#[derive(Clone)]
|
f.debug_struct("HttpConnector")
|
||||||
pub struct HttpConnector {
|
.finish()
|
||||||
executor: HttpConnectExecutor,
|
}
|
||||||
enforce_http: bool,
|
}
|
||||||
handle: Option<Handle>,
|
|
||||||
keep_alive_timeout: Option<Duration>,
|
impl Connect for HttpConnector {
|
||||||
nodelay: bool,
|
type Transport = TcpStream;
|
||||||
}
|
type Error = io::Error;
|
||||||
|
type Future = HttpConnecting;
|
||||||
|
|
||||||
|
fn connect(&self, dst: Destination) -> Self::Future {
|
||||||
|
trace!(
|
||||||
|
"Http::connect; scheme={}, host={}, port={:?}",
|
||||||
|
dst.scheme(),
|
||||||
|
dst.host(),
|
||||||
|
dst.port(),
|
||||||
|
);
|
||||||
|
|
||||||
|
if self.enforce_http {
|
||||||
|
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
|
||||||
|
return invalid_url(InvalidUrl::NotHttp, &self.handle);
|
||||||
|
}
|
||||||
|
} else if dst.uri.scheme_part().is_none() {
|
||||||
|
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
let host = match dst.uri.host() {
|
||||||
|
Some(s) => s,
|
||||||
|
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
|
||||||
|
};
|
||||||
|
let port = match dst.uri.port() {
|
||||||
|
Some(port) => port,
|
||||||
|
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
|
||||||
|
};
|
||||||
|
|
||||||
|
HttpConnecting {
|
||||||
|
state: State::Lazy(self.executor.clone(), host.into(), port),
|
||||||
|
handle: self.handle.clone(),
|
||||||
|
keep_alive_timeout: self.keep_alive_timeout,
|
||||||
|
nodelay: self.nodelay,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl HttpConnector {
|
|
||||||
/// Construct a new HttpConnector.
|
|
||||||
///
|
|
||||||
/// Takes number of DNS worker threads.
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn new(threads: usize) -> HttpConnector {
|
fn invalid_url(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting {
|
||||||
HttpConnector::new_with_handle_opt(threads, None)
|
HttpConnecting {
|
||||||
}
|
state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))),
|
||||||
|
handle: handle.clone(),
|
||||||
/// Construct a new HttpConnector with a specific Tokio handle.
|
|
||||||
pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector {
|
|
||||||
HttpConnector::new_with_handle_opt(threads, Some(handle))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_with_handle_opt(threads: usize, handle: Option<Handle>) -> HttpConnector {
|
|
||||||
let pool = CpuPoolBuilder::new()
|
|
||||||
.name_prefix("hyper-dns")
|
|
||||||
.pool_size(threads)
|
|
||||||
.create();
|
|
||||||
HttpConnector::new_with_executor(pool, handle)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Construct a new HttpConnector.
|
|
||||||
///
|
|
||||||
/// Takes an executor to run blocking tasks on.
|
|
||||||
pub fn new_with_executor<E: 'static>(executor: E, handle: Option<Handle>) -> HttpConnector
|
|
||||||
where E: Executor<HttpConnectorBlockingTask> + Send + Sync
|
|
||||||
{
|
|
||||||
HttpConnector {
|
|
||||||
executor: HttpConnectExecutor(Arc::new(executor)),
|
|
||||||
enforce_http: true,
|
|
||||||
handle,
|
|
||||||
keep_alive_timeout: None,
|
keep_alive_timeout: None,
|
||||||
nodelay: false,
|
nodelay: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Option to enforce all `Uri`s have the `http` scheme.
|
#[derive(Debug, Clone, Copy)]
|
||||||
///
|
enum InvalidUrl {
|
||||||
/// Enabled by default.
|
MissingScheme,
|
||||||
#[inline]
|
NotHttp,
|
||||||
pub fn enforce_http(&mut self, is_enforced: bool) {
|
MissingAuthority,
|
||||||
self.enforce_http = is_enforced;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
|
impl fmt::Display for InvalidUrl {
|
||||||
///
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
/// If `None`, the option will not be set.
|
f.write_str(self.description())
|
||||||
///
|
}
|
||||||
/// Default is `None`.
|
|
||||||
#[inline]
|
|
||||||
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
|
|
||||||
self.keep_alive_timeout = dur;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
|
impl StdError for InvalidUrl {
|
||||||
///
|
fn description(&self) -> &str {
|
||||||
/// Default is `false`.
|
match *self {
|
||||||
#[inline]
|
InvalidUrl::MissingScheme => "invalid URL, missing scheme",
|
||||||
pub fn set_nodelay(&mut self, nodelay: bool) {
|
InvalidUrl::NotHttp => "invalid URL, scheme must be http",
|
||||||
self.nodelay = nodelay;
|
InvalidUrl::MissingAuthority => "invalid URL, missing domain",
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for HttpConnector {
|
|
||||||
#[inline]
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.debug_struct("HttpConnector")
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connect for HttpConnector {
|
|
||||||
type Transport = TcpStream;
|
|
||||||
type Error = io::Error;
|
|
||||||
type Future = HttpConnecting;
|
|
||||||
|
|
||||||
fn connect(&self, dst: Destination) -> Self::Future {
|
|
||||||
trace!(
|
|
||||||
"Http::connect; scheme={}, host={}, port={:?}",
|
|
||||||
dst.scheme(),
|
|
||||||
dst.host(),
|
|
||||||
dst.port(),
|
|
||||||
);
|
|
||||||
|
|
||||||
if self.enforce_http {
|
|
||||||
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
|
|
||||||
return invalid_url(InvalidUrl::NotHttp, &self.handle);
|
|
||||||
}
|
}
|
||||||
} else if dst.uri.scheme_part().is_none() {
|
|
||||||
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
let host = match dst.uri.host() {
|
|
||||||
Some(s) => s,
|
|
||||||
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
|
|
||||||
};
|
|
||||||
let port = match dst.uri.port() {
|
|
||||||
Some(port) => port,
|
|
||||||
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
|
|
||||||
};
|
|
||||||
|
|
||||||
HttpConnecting {
|
|
||||||
state: State::Lazy(self.executor.clone(), host.into(), port),
|
|
||||||
handle: self.handle.clone(),
|
|
||||||
keep_alive_timeout: self.keep_alive_timeout,
|
|
||||||
nodelay: self.nodelay,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
/// A Future representing work to connect to a URL.
|
||||||
|
#[must_use = "futures do nothing unless polled"]
|
||||||
#[inline]
|
pub struct HttpConnecting {
|
||||||
fn invalid_url(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting {
|
state: State,
|
||||||
HttpConnecting {
|
handle: Option<Handle>,
|
||||||
state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))),
|
keep_alive_timeout: Option<Duration>,
|
||||||
handle: handle.clone(),
|
nodelay: bool,
|
||||||
keep_alive_timeout: None,
|
|
||||||
nodelay: false,
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy)]
|
enum State {
|
||||||
enum InvalidUrl {
|
Lazy(HttpConnectExecutor, String, u16),
|
||||||
MissingScheme,
|
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
|
||||||
NotHttp,
|
Connecting(ConnectingTcp),
|
||||||
MissingAuthority,
|
Error(Option<io::Error>),
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for InvalidUrl {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.write_str(self.description())
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl StdError for InvalidUrl {
|
impl Future for HttpConnecting {
|
||||||
fn description(&self) -> &str {
|
type Item = (TcpStream, Connected);
|
||||||
match *self {
|
type Error = io::Error;
|
||||||
InvalidUrl::MissingScheme => "invalid URL, missing scheme",
|
|
||||||
InvalidUrl::NotHttp => "invalid URL, scheme must be http",
|
|
||||||
InvalidUrl::MissingAuthority => "invalid URL, missing domain",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A Future representing work to connect to a URL.
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
#[must_use = "futures do nothing unless polled"]
|
loop {
|
||||||
pub struct HttpConnecting {
|
let state;
|
||||||
state: State,
|
match self.state {
|
||||||
handle: Option<Handle>,
|
State::Lazy(ref executor, ref mut host, port) => {
|
||||||
keep_alive_timeout: Option<Duration>,
|
// If the host is already an IP addr (v4 or v6),
|
||||||
nodelay: bool,
|
// skip resolving the dns and start connecting right away.
|
||||||
}
|
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
|
||||||
|
|
||||||
enum State {
|
|
||||||
Lazy(HttpConnectExecutor, String, u16),
|
|
||||||
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>),
|
|
||||||
Connecting(ConnectingTcp),
|
|
||||||
Error(Option<io::Error>),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Future for HttpConnecting {
|
|
||||||
type Item = (TcpStream, Connected);
|
|
||||||
type Error = io::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
loop {
|
|
||||||
let state;
|
|
||||||
match self.state {
|
|
||||||
State::Lazy(ref executor, ref mut host, port) => {
|
|
||||||
// If the host is already an IP addr (v4 or v6),
|
|
||||||
// skip resolving the dns and start connecting right away.
|
|
||||||
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
|
|
||||||
state = State::Connecting(ConnectingTcp {
|
|
||||||
addrs: addrs,
|
|
||||||
current: None
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
let host = mem::replace(host, String::new());
|
|
||||||
let work = dns::Work::new(host, port);
|
|
||||||
state = State::Resolving(oneshot::spawn(work, executor));
|
|
||||||
}
|
|
||||||
},
|
|
||||||
State::Resolving(ref mut future) => {
|
|
||||||
match try!(future.poll()) {
|
|
||||||
Async::NotReady => return Ok(Async::NotReady),
|
|
||||||
Async::Ready(addrs) => {
|
|
||||||
state = State::Connecting(ConnectingTcp {
|
state = State::Connecting(ConnectingTcp {
|
||||||
addrs: addrs,
|
addrs: addrs,
|
||||||
current: None,
|
current: None
|
||||||
})
|
})
|
||||||
|
} else {
|
||||||
|
let host = mem::replace(host, String::new());
|
||||||
|
let work = dns::Work::new(host, port);
|
||||||
|
state = State::Resolving(oneshot::spawn(work, executor));
|
||||||
}
|
}
|
||||||
};
|
},
|
||||||
},
|
State::Resolving(ref mut future) => {
|
||||||
State::Connecting(ref mut c) => {
|
match try!(future.poll()) {
|
||||||
let sock = try_ready!(c.poll(&self.handle));
|
Async::NotReady => return Ok(Async::NotReady),
|
||||||
|
Async::Ready(addrs) => {
|
||||||
|
state = State::Connecting(ConnectingTcp {
|
||||||
|
addrs: addrs,
|
||||||
|
current: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
};
|
||||||
|
},
|
||||||
|
State::Connecting(ref mut c) => {
|
||||||
|
let sock = try_ready!(c.poll(&self.handle));
|
||||||
|
|
||||||
if let Some(dur) = self.keep_alive_timeout {
|
if let Some(dur) = self.keep_alive_timeout {
|
||||||
sock.set_keepalive(Some(dur))?;
|
sock.set_keepalive(Some(dur))?;
|
||||||
}
|
|
||||||
|
|
||||||
sock.set_nodelay(self.nodelay)?;
|
|
||||||
|
|
||||||
return Ok(Async::Ready((sock, Connected::new())));
|
|
||||||
},
|
|
||||||
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
|
|
||||||
}
|
|
||||||
self.state = state;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for HttpConnecting {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.pad("HttpConnecting")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ConnectingTcp {
|
|
||||||
addrs: dns::IpAddrs,
|
|
||||||
current: Option<ConnectFuture>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ConnectingTcp {
|
|
||||||
// not a Future, since passing a &Handle to poll
|
|
||||||
fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> {
|
|
||||||
let mut err = None;
|
|
||||||
loop {
|
|
||||||
if let Some(ref mut current) = self.current {
|
|
||||||
match current.poll() {
|
|
||||||
Ok(ok) => return Ok(ok),
|
|
||||||
Err(e) => {
|
|
||||||
trace!("connect error {:?}", e);
|
|
||||||
err = Some(e);
|
|
||||||
if let Some(addr) = self.addrs.next() {
|
|
||||||
debug!("connecting to {}", addr);
|
|
||||||
*current = connect(&addr, handle)?;
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
sock.set_nodelay(self.nodelay)?;
|
||||||
|
|
||||||
|
return Ok(Async::Ready((sock, Connected::new())));
|
||||||
|
},
|
||||||
|
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
|
||||||
}
|
}
|
||||||
} else if let Some(addr) = self.addrs.next() {
|
self.state = state;
|
||||||
debug!("connecting to {}", addr);
|
|
||||||
self.current = Some(connect(&addr, handle)?);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Err(err.take().expect("missing connect error"));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Make this Future unnameable outside of this crate.
|
impl fmt::Debug for HttpConnecting {
|
||||||
mod http_connector {
|
|
||||||
use super::*;
|
|
||||||
// Blocking task to be executed on a thread pool.
|
|
||||||
pub struct HttpConnectorBlockingTask {
|
|
||||||
pub(super) work: oneshot::Execute<dns::Work>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for HttpConnectorBlockingTask {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
f.pad("HttpConnectorBlockingTask")
|
f.pad("HttpConnecting")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Future for HttpConnectorBlockingTask {
|
struct ConnectingTcp {
|
||||||
type Item = ();
|
addrs: dns::IpAddrs,
|
||||||
type Error = ();
|
current: Option<ConnectFuture>,
|
||||||
|
}
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<(), ()> {
|
impl ConnectingTcp {
|
||||||
self.work.poll()
|
// not a Future, since passing a &Handle to poll
|
||||||
|
fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> {
|
||||||
|
let mut err = None;
|
||||||
|
loop {
|
||||||
|
if let Some(ref mut current) = self.current {
|
||||||
|
match current.poll() {
|
||||||
|
Ok(ok) => return Ok(ok),
|
||||||
|
Err(e) => {
|
||||||
|
trace!("connect error {:?}", e);
|
||||||
|
err = Some(e);
|
||||||
|
if let Some(addr) = self.addrs.next() {
|
||||||
|
debug!("connecting to {}", addr);
|
||||||
|
*current = connect(&addr, handle)?;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(addr) = self.addrs.next() {
|
||||||
|
debug!("connecting to {}", addr);
|
||||||
|
self.current = Some(connect(&addr, handle)?);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(err.take().expect("missing connect error"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make this Future unnameable outside of this crate.
|
||||||
|
mod http_connector {
|
||||||
|
use super::*;
|
||||||
|
// Blocking task to be executed on a thread pool.
|
||||||
|
pub struct HttpConnectorBlockingTask {
|
||||||
|
pub(super) work: oneshot::Execute<dns::Work>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for HttpConnectorBlockingTask {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
f.pad("HttpConnectorBlockingTask")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for HttpConnectorBlockingTask {
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<(), ()> {
|
||||||
|
self.work.poll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>);
|
||||||
|
|
||||||
|
impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
|
||||||
|
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
|
||||||
|
self.0.execute(HttpConnectorBlockingTask { work: future })
|
||||||
|
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
#![allow(deprecated)]
|
||||||
|
use std::io;
|
||||||
|
use futures::Future;
|
||||||
|
use super::{Connect, Destination, HttpConnector};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_errors_missing_authority() {
|
||||||
|
let uri = "/foo/bar?baz".parse().unwrap();
|
||||||
|
let dst = Destination {
|
||||||
|
uri,
|
||||||
|
};
|
||||||
|
let connector = HttpConnector::new(1);
|
||||||
|
|
||||||
|
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_errors_enforce_http() {
|
||||||
|
let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
|
||||||
|
let dst = Destination {
|
||||||
|
uri,
|
||||||
|
};
|
||||||
|
let connector = HttpConnector::new(1);
|
||||||
|
|
||||||
|
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_errors_missing_scheme() {
|
||||||
|
let uri = "example.domain".parse().unwrap();
|
||||||
|
let dst = Destination {
|
||||||
|
uri,
|
||||||
|
};
|
||||||
|
let connector = HttpConnector::new(1);
|
||||||
|
|
||||||
|
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>);
|
|
||||||
|
|
||||||
impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
|
|
||||||
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
|
|
||||||
self.0.execute(HttpConnectorBlockingTask { work: future })
|
|
||||||
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
#![allow(deprecated)]
|
|
||||||
use std::io;
|
|
||||||
use futures::Future;
|
|
||||||
use super::{Connect, Destination, HttpConnector};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_errors_missing_authority() {
|
|
||||||
let uri = "/foo/bar?baz".parse().unwrap();
|
|
||||||
let dst = Destination {
|
|
||||||
uri,
|
|
||||||
};
|
|
||||||
let connector = HttpConnector::new(1);
|
|
||||||
|
|
||||||
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_errors_enforce_http() {
|
|
||||||
let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
|
|
||||||
let dst = Destination {
|
|
||||||
uri,
|
|
||||||
};
|
|
||||||
let connector = HttpConnector::new(1);
|
|
||||||
|
|
||||||
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_errors_missing_scheme() {
|
|
||||||
let uri = "example.domain".parse().unwrap();
|
|
||||||
let dst = Destination {
|
|
||||||
uri,
|
|
||||||
};
|
|
||||||
let connector = HttpConnector::new(1);
|
|
||||||
|
|
||||||
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -16,14 +16,15 @@ use body::{Body, Payload};
|
|||||||
use common::Exec;
|
use common::Exec;
|
||||||
use self::pool::{Pool, Poolable, Reservation};
|
use self::pool::{Pool, Poolable, Reservation};
|
||||||
|
|
||||||
pub use self::connect::{Connect, HttpConnector};
|
pub use self::connect::Connect;
|
||||||
|
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
|
||||||
|
|
||||||
use self::connect::Destination;
|
use self::connect::Destination;
|
||||||
|
|
||||||
pub mod conn;
|
pub mod conn;
|
||||||
pub mod connect;
|
pub mod connect;
|
||||||
pub(crate) mod dispatch;
|
pub(crate) mod dispatch;
|
||||||
mod dns;
|
#[cfg(feature = "runtime")] mod dns;
|
||||||
mod pool;
|
mod pool;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
@@ -39,6 +40,7 @@ pub struct Client<C, B = Body> {
|
|||||||
ver: Ver,
|
ver: Ver,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Client<HttpConnector, Body> {
|
impl Client<HttpConnector, Body> {
|
||||||
/// Create a new Client with the default config.
|
/// Create a new Client with the default config.
|
||||||
#[inline]
|
#[inline]
|
||||||
@@ -47,18 +49,22 @@ impl Client<HttpConnector, Body> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Default for Client<HttpConnector, Body> {
|
impl Default for Client<HttpConnector, Body> {
|
||||||
fn default() -> Client<HttpConnector, Body> {
|
fn default() -> Client<HttpConnector, Body> {
|
||||||
Client::new()
|
Client::new()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client<HttpConnector, Body> {
|
impl Client<(), Body> {
|
||||||
/// Configure a Client.
|
/// Configure a Client.
|
||||||
///
|
///
|
||||||
/// # Example
|
/// # Example
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
/// # extern crate hyper;
|
||||||
|
/// # #[cfg(feature = "runtime")]
|
||||||
|
/// fn run () {
|
||||||
/// use hyper::Client;
|
/// use hyper::Client;
|
||||||
///
|
///
|
||||||
/// let client = Client::builder()
|
/// let client = Client::builder()
|
||||||
@@ -66,6 +72,8 @@ impl Client<HttpConnector, Body> {
|
|||||||
/// .build_http();
|
/// .build_http();
|
||||||
/// # let infer: Client<_, hyper::Body> = client;
|
/// # let infer: Client<_, hyper::Body> = client;
|
||||||
/// # drop(infer);
|
/// # drop(infer);
|
||||||
|
/// # }
|
||||||
|
/// # fn main() {}
|
||||||
/// ```
|
/// ```
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn builder() -> Builder {
|
pub fn builder() -> Builder {
|
||||||
@@ -603,6 +611,7 @@ impl Builder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Builder a client with this configuration and the default `HttpConnector`.
|
/// Builder a client with this configuration and the default `HttpConnector`.
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub fn build_http<B>(&self) -> Client<HttpConnector, B>
|
pub fn build_http<B>(&self) -> Client<HttpConnector, B>
|
||||||
where
|
where
|
||||||
B: Payload + Send,
|
B: Payload + Send,
|
||||||
|
|||||||
@@ -652,7 +652,7 @@ mod tests {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use futures::{Async, Future};
|
use futures::{Async, Future};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use super::{Connecting, Key, Poolable, Pool, Reservation, Exec, Ver};
|
use super::{Connecting, Key, Poolable, Pool, Reservation, Ver};
|
||||||
|
|
||||||
/// Test unique reservations.
|
/// Test unique reservations.
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
@@ -738,9 +738,11 @@ mod tests {
|
|||||||
}).wait().unwrap();
|
}).wait().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
#[test]
|
#[test]
|
||||||
fn test_pool_timer_removes_expired() {
|
fn test_pool_timer_removes_expired() {
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use common::Exec;
|
||||||
let runtime = ::tokio::runtime::Runtime::new().unwrap();
|
let runtime = ::tokio::runtime::Runtime::new().unwrap();
|
||||||
let pool = Pool::new(true, Some(Duration::from_millis(100)));
|
let pool = Pool::new(true, Some(Duration::from_millis(100)));
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "runtime")]
|
||||||
extern crate pretty_env_logger;
|
extern crate pretty_env_logger;
|
||||||
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ use std::fmt;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::future::{Executor, Future};
|
use futures::future::{Executor, Future};
|
||||||
use tokio_executor::spawn;
|
|
||||||
|
|
||||||
/// Either the user provides an executor for background tasks, or we use
|
/// Either the user provides an executor for background tasks, or we use
|
||||||
/// `tokio::spawn`.
|
/// `tokio::spawn`.
|
||||||
@@ -19,7 +18,17 @@ impl Exec {
|
|||||||
F: Future<Item=(), Error=()> + Send + 'static,
|
F: Future<Item=(), Error=()> + Send + 'static,
|
||||||
{
|
{
|
||||||
match *self {
|
match *self {
|
||||||
Exec::Default => spawn(fut),
|
Exec::Default => {
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
|
{
|
||||||
|
::tokio_executor::spawn(fut)
|
||||||
|
}
|
||||||
|
#[cfg(not(feature = "runtime"))]
|
||||||
|
{
|
||||||
|
// If no runtime, we need an executor!
|
||||||
|
panic!("executor must be set")
|
||||||
|
}
|
||||||
|
},
|
||||||
Exec::Executor(ref e) => {
|
Exec::Executor(ref e) => {
|
||||||
let _ = e.execute(Box::new(fut))
|
let _ = e.execute(Box::new(fut))
|
||||||
.map_err(|err| {
|
.map_err(|err| {
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ pub(crate) enum Kind {
|
|||||||
/// Error occurred while connecting.
|
/// Error occurred while connecting.
|
||||||
Connect,
|
Connect,
|
||||||
/// Error creating a TcpListener.
|
/// Error creating a TcpListener.
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
Listen,
|
Listen,
|
||||||
/// Error accepting on an Incoming stream.
|
/// Error accepting on an Incoming stream.
|
||||||
Accept,
|
Accept,
|
||||||
@@ -171,6 +172,7 @@ impl Error {
|
|||||||
Error::new(Kind::Io, Some(cause.into()))
|
Error::new(Kind::Io, Some(cause.into()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub(crate) fn new_listen<E: Into<Cause>>(cause: E) -> Error {
|
pub(crate) fn new_listen<E: Into<Cause>>(cause: E) -> Error {
|
||||||
Error::new(Kind::Listen, Some(cause.into()))
|
Error::new(Kind::Listen, Some(cause.into()))
|
||||||
}
|
}
|
||||||
@@ -258,6 +260,7 @@ impl StdError for Error {
|
|||||||
Kind::Closed => "connection closed",
|
Kind::Closed => "connection closed",
|
||||||
Kind::Connect => "an error occurred trying to connect",
|
Kind::Connect => "an error occurred trying to connect",
|
||||||
Kind::Canceled => "an operation was canceled internally before starting",
|
Kind::Canceled => "an operation was canceled internally before starting",
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
Kind::Listen => "error creating server listener",
|
Kind::Listen => "error creating server listener",
|
||||||
Kind::Accept => "error accepting connection",
|
Kind::Accept => "error accepting connection",
|
||||||
Kind::NewService => "calling user's new_service failed",
|
Kind::NewService => "calling user's new_service failed",
|
||||||
|
|||||||
11
src/lib.rs
11
src/lib.rs
@@ -18,18 +18,20 @@
|
|||||||
|
|
||||||
extern crate bytes;
|
extern crate bytes;
|
||||||
#[macro_use] extern crate futures;
|
#[macro_use] extern crate futures;
|
||||||
extern crate futures_cpupool;
|
#[cfg(feature = "runtime")] extern crate futures_cpupool;
|
||||||
extern crate futures_timer;
|
extern crate futures_timer;
|
||||||
extern crate h2;
|
extern crate h2;
|
||||||
extern crate http;
|
extern crate http;
|
||||||
extern crate httparse;
|
extern crate httparse;
|
||||||
extern crate iovec;
|
extern crate iovec;
|
||||||
#[macro_use] extern crate log;
|
#[macro_use] extern crate log;
|
||||||
extern crate net2;
|
#[cfg(feature = "runtime")] extern crate net2;
|
||||||
extern crate time;
|
extern crate time;
|
||||||
extern crate tokio;
|
#[cfg(feature = "runtime")] extern crate tokio;
|
||||||
extern crate tokio_executor;
|
#[cfg(feature = "runtime")] extern crate tokio_executor;
|
||||||
#[macro_use] extern crate tokio_io;
|
#[macro_use] extern crate tokio_io;
|
||||||
|
#[cfg(feature = "runtime")] extern crate tokio_reactor;
|
||||||
|
#[cfg(feature = "runtime")] extern crate tokio_tcp;
|
||||||
extern crate want;
|
extern crate want;
|
||||||
|
|
||||||
#[cfg(all(test, feature = "nightly"))]
|
#[cfg(all(test, feature = "nightly"))]
|
||||||
@@ -62,3 +64,4 @@ mod headers;
|
|||||||
mod proto;
|
mod proto;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod service;
|
pub mod service;
|
||||||
|
#[cfg(feature = "runtime")] pub mod rt;
|
||||||
|
|||||||
18
src/mock.rs
18
src/mock.rs
@@ -1,6 +1,8 @@
|
|||||||
|
#[cfg(feature = "runtime")]
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use bytes::Buf;
|
use bytes::Buf;
|
||||||
@@ -8,6 +10,7 @@ use futures::{Async, Poll};
|
|||||||
use futures::task::{self, Task};
|
use futures::task::{self, Task};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
use ::client::connect::{Connect, Connected, Destination};
|
use ::client::connect::{Connect, Connected, Destination};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -112,6 +115,7 @@ impl<T> AsyncIo<T> {
|
|||||||
self.max_read_vecs = cnt;
|
self.max_read_vecs = cnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub fn park_tasks(&mut self, enabled: bool) {
|
pub fn park_tasks(&mut self, enabled: bool) {
|
||||||
self.park_tasks = enabled;
|
self.park_tasks = enabled;
|
||||||
}
|
}
|
||||||
@@ -151,6 +155,7 @@ impl AsyncIo<MockCursor> {
|
|||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
fn close(&mut self) {
|
fn close(&mut self) {
|
||||||
self.block_in(1);
|
self.block_in(1);
|
||||||
assert_eq!(self.inner.vec.len(), self.inner.pos);
|
assert_eq!(self.inner.vec.len(), self.inner.pos);
|
||||||
@@ -282,22 +287,26 @@ impl ::std::ops::Deref for AsyncIo<MockCursor> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub struct Duplex {
|
pub struct Duplex {
|
||||||
inner: Arc<Mutex<DuplexInner>>,
|
inner: Arc<Mutex<DuplexInner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
struct DuplexInner {
|
struct DuplexInner {
|
||||||
handle_read_task: Option<Task>,
|
handle_read_task: Option<Task>,
|
||||||
read: AsyncIo<MockCursor>,
|
read: AsyncIo<MockCursor>,
|
||||||
write: AsyncIo<MockCursor>,
|
write: AsyncIo<MockCursor>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Read for Duplex {
|
impl Read for Duplex {
|
||||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
self.inner.lock().unwrap().read.read(buf)
|
self.inner.lock().unwrap().read.read(buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Write for Duplex {
|
impl Write for Duplex {
|
||||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
@@ -313,10 +322,11 @@ impl Write for Duplex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl AsyncRead for Duplex {
|
impl AsyncRead for Duplex {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl AsyncWrite for Duplex {
|
impl AsyncWrite for Duplex {
|
||||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||||
Ok(().into())
|
Ok(().into())
|
||||||
@@ -331,10 +341,12 @@ impl AsyncWrite for Duplex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub struct DuplexHandle {
|
pub struct DuplexHandle {
|
||||||
inner: Arc<Mutex<DuplexInner>>,
|
inner: Arc<Mutex<DuplexInner>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl DuplexHandle {
|
impl DuplexHandle {
|
||||||
pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
|
pub fn read(&self, buf: &mut [u8]) -> Poll<usize, io::Error> {
|
||||||
let mut inner = self.inner.lock().unwrap();
|
let mut inner = self.inner.lock().unwrap();
|
||||||
@@ -362,6 +374,7 @@ impl DuplexHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Drop for DuplexHandle {
|
impl Drop for DuplexHandle {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
trace!("mock duplex handle drop");
|
trace!("mock duplex handle drop");
|
||||||
@@ -371,10 +384,12 @@ impl Drop for DuplexHandle {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub struct MockConnector {
|
pub struct MockConnector {
|
||||||
mocks: Mutex<HashMap<String, Vec<Duplex>>>,
|
mocks: Mutex<HashMap<String, Vec<Duplex>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl MockConnector {
|
impl MockConnector {
|
||||||
pub fn new() -> MockConnector {
|
pub fn new() -> MockConnector {
|
||||||
MockConnector {
|
MockConnector {
|
||||||
@@ -410,6 +425,7 @@ impl MockConnector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Connect for MockConnector {
|
impl Connect for MockConnector {
|
||||||
type Transport = Duplex;
|
type Transport = Duplex;
|
||||||
type Error = io::Error;
|
type Error = io::Error;
|
||||||
|
|||||||
11
src/rt.rs
Normal file
11
src/rt.rs
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
//! Default runtime
|
||||||
|
//!
|
||||||
|
//! By default, hyper includes the [tokio](https://tokio.rs) runtime. To ease
|
||||||
|
//! using it, several types are re-exported here.
|
||||||
|
//!
|
||||||
|
//! The inclusion of a default runtime can be disabled by turning off hyper's
|
||||||
|
//! `runtime` Cargo feature.
|
||||||
|
|
||||||
|
pub use futures::{Future, Stream};
|
||||||
|
pub use futures::future::{lazy, poll_fn};
|
||||||
|
pub use tokio::{run, spawn};
|
||||||
@@ -9,23 +9,22 @@
|
|||||||
//! higher-level [Server](super) API.
|
//! higher-level [Server](super) API.
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::SocketAddr;
|
#[cfg(feature = "runtime")] use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
#[cfg(feature = "runtime")] use std::time::Duration;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::{Async, Future, Poll, Stream};
|
use futures::{Async, Future, Poll, Stream};
|
||||||
use futures::future::{Either, Executor};
|
use futures::future::{Either, Executor};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
//TODO: change these tokio:: to sub-crates
|
#[cfg(feature = "runtime")] use tokio_reactor::Handle;
|
||||||
use tokio::reactor::Handle;
|
|
||||||
|
|
||||||
use common::Exec;
|
use common::Exec;
|
||||||
use proto;
|
use proto;
|
||||||
use body::{Body, Payload};
|
use body::{Body, Payload};
|
||||||
use service::{NewService, Service};
|
use service::{NewService, Service};
|
||||||
|
|
||||||
pub use super::tcp::AddrIncoming;
|
#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming;
|
||||||
|
|
||||||
/// A lower-level configuration of the HTTP protocol.
|
/// A lower-level configuration of the HTTP protocol.
|
||||||
///
|
///
|
||||||
@@ -190,22 +189,23 @@ impl Http {
|
|||||||
/// # Example
|
/// # Example
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
/// # extern crate futures;
|
|
||||||
/// # extern crate hyper;
|
/// # extern crate hyper;
|
||||||
/// # extern crate tokio;
|
|
||||||
/// # extern crate tokio_io;
|
/// # extern crate tokio_io;
|
||||||
/// # use futures::Future;
|
/// # #[cfg(feature = "runtime")]
|
||||||
|
/// # extern crate tokio;
|
||||||
/// # use hyper::{Body, Request, Response};
|
/// # use hyper::{Body, Request, Response};
|
||||||
/// # use hyper::service::Service;
|
/// # use hyper::service::Service;
|
||||||
/// # use hyper::server::conn::Http;
|
/// # use hyper::server::conn::Http;
|
||||||
/// # use tokio_io::{AsyncRead, AsyncWrite};
|
/// # use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
/// # use tokio::reactor::Handle;
|
/// # #[cfg(feature = "runtime")]
|
||||||
/// # fn run<I, S>(some_io: I, some_service: S)
|
/// # fn run<I, S>(some_io: I, some_service: S)
|
||||||
/// # where
|
/// # where
|
||||||
/// # I: AsyncRead + AsyncWrite + Send + 'static,
|
/// # I: AsyncRead + AsyncWrite + Send + 'static,
|
||||||
/// # S: Service<ReqBody=Body, ResBody=Body> + Send + 'static,
|
/// # S: Service<ReqBody=Body, ResBody=Body> + Send + 'static,
|
||||||
/// # S::Future: Send
|
/// # S::Future: Send
|
||||||
/// # {
|
/// # {
|
||||||
|
/// # use hyper::rt::Future;
|
||||||
|
/// # use tokio::reactor::Handle;
|
||||||
/// let http = Http::new();
|
/// let http = Http::new();
|
||||||
/// let conn = http.serve_connection(some_io, some_service);
|
/// let conn = http.serve_connection(some_io, some_service);
|
||||||
///
|
///
|
||||||
@@ -213,7 +213,7 @@ impl Http {
|
|||||||
/// eprintln!("server connection error: {}", e);
|
/// eprintln!("server connection error: {}", e);
|
||||||
/// });
|
/// });
|
||||||
///
|
///
|
||||||
/// tokio::spawn(fut);
|
/// hyper::rt::spawn(fut);
|
||||||
/// # }
|
/// # }
|
||||||
/// # fn main() {}
|
/// # fn main() {}
|
||||||
/// ```
|
/// ```
|
||||||
@@ -252,6 +252,7 @@ impl Http {
|
|||||||
/// to accept connections. Each connection will be processed with the
|
/// to accept connections. Each connection will be processed with the
|
||||||
/// `new_service` object provided, creating a new service per
|
/// `new_service` object provided, creating a new service per
|
||||||
/// connection.
|
/// connection.
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||||
where
|
where
|
||||||
S: NewService<ReqBody=Body, ResBody=Bd>,
|
S: NewService<ReqBody=Body, ResBody=Bd>,
|
||||||
@@ -271,6 +272,7 @@ impl Http {
|
|||||||
/// to accept connections. Each connection will be processed with the
|
/// to accept connections. Each connection will be processed with the
|
||||||
/// `new_service` object provided, creating a new service per
|
/// `new_service` object provided, creating a new service per
|
||||||
/// connection.
|
/// connection.
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
|
||||||
where
|
where
|
||||||
S: NewService<ReqBody=Body, ResBody=Bd>,
|
S: NewService<ReqBody=Body, ResBody=Bd>,
|
||||||
@@ -465,6 +467,7 @@ where
|
|||||||
|
|
||||||
// ===== impl SpawnAll =====
|
// ===== impl SpawnAll =====
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl<S> SpawnAll<AddrIncoming, S> {
|
impl<S> SpawnAll<AddrIncoming, S> {
|
||||||
pub(super) fn local_addr(&self) -> SocketAddr {
|
pub(super) fn local_addr(&self) -> SocketAddr {
|
||||||
self.serve.incoming.local_addr()
|
self.serve.incoming.local_addr()
|
||||||
|
|||||||
@@ -17,15 +17,14 @@
|
|||||||
//! ## Example
|
//! ## Example
|
||||||
//!
|
//!
|
||||||
//! ```no_run
|
//! ```no_run
|
||||||
//! extern crate futures;
|
|
||||||
//! extern crate hyper;
|
//! extern crate hyper;
|
||||||
//! extern crate tokio;
|
|
||||||
//!
|
//!
|
||||||
//! use futures::Future;
|
|
||||||
//! use hyper::{Body, Response, Server};
|
//! use hyper::{Body, Response, Server};
|
||||||
//! use hyper::service::service_fn_ok;
|
//! use hyper::service::service_fn_ok;
|
||||||
//!
|
//!
|
||||||
|
//! # #[cfg(feature = "runtime")]
|
||||||
//! fn main() {
|
//! fn main() {
|
||||||
|
//! # use hyper::rt::Future;
|
||||||
//! // Construct our SocketAddr to listen on...
|
//! // Construct our SocketAddr to listen on...
|
||||||
//! let addr = ([127, 0, 0, 1], 3000).into();
|
//! let addr = ([127, 0, 0, 1], 3000).into();
|
||||||
//!
|
//!
|
||||||
@@ -41,18 +40,20 @@
|
|||||||
//! .serve(new_service);
|
//! .serve(new_service);
|
||||||
//!
|
//!
|
||||||
//! // Finally, spawn `server` onto an Executor...
|
//! // Finally, spawn `server` onto an Executor...
|
||||||
//! tokio::run(server.map_err(|e| {
|
//! hyper::rt::run(server.map_err(|e| {
|
||||||
//! eprintln!("server error: {}", e);
|
//! eprintln!("server error: {}", e);
|
||||||
//! }));
|
//! }));
|
||||||
//! }
|
//! }
|
||||||
|
//! # #[cfg(not(feature = "runtime"))]
|
||||||
|
//! # fn main() {}
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
pub mod conn;
|
pub mod conn;
|
||||||
mod tcp;
|
#[cfg(feature = "runtime")] mod tcp;
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::SocketAddr;
|
#[cfg(feature = "runtime")] use std::net::SocketAddr;
|
||||||
use std::time::Duration;
|
#[cfg(feature = "runtime")] use std::time::Duration;
|
||||||
|
|
||||||
use futures::{Future, Stream, Poll};
|
use futures::{Future, Stream, Poll};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
@@ -62,8 +63,7 @@ use service::{NewService, Service};
|
|||||||
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
|
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
|
||||||
// error that `hyper::server::Http` is private...
|
// error that `hyper::server::Http` is private...
|
||||||
use self::conn::{Http as Http_, SpawnAll};
|
use self::conn::{Http as Http_, SpawnAll};
|
||||||
//use self::hyper_service::HyperService;
|
#[cfg(feature = "runtime")] use self::tcp::{AddrIncoming};
|
||||||
use self::tcp::{AddrIncoming};
|
|
||||||
|
|
||||||
/// A listening HTTP server.
|
/// A listening HTTP server.
|
||||||
///
|
///
|
||||||
@@ -94,6 +94,7 @@ impl<I> Server<I, ()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Server<AddrIncoming, ()> {
|
impl Server<AddrIncoming, ()> {
|
||||||
/// Binds to the provided address, and returns a [`Builder`](Builder).
|
/// Binds to the provided address, and returns a [`Builder`](Builder).
|
||||||
///
|
///
|
||||||
@@ -116,6 +117,7 @@ impl Server<AddrIncoming, ()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl<S> Server<AddrIncoming, S> {
|
impl<S> Server<AddrIncoming, S> {
|
||||||
/// Returns the local address that this server is bound to.
|
/// Returns the local address that this server is bound to.
|
||||||
pub fn local_addr(&self) -> SocketAddr {
|
pub fn local_addr(&self) -> SocketAddr {
|
||||||
@@ -176,7 +178,11 @@ impl<I> Builder<I> {
|
|||||||
///
|
///
|
||||||
/// # Example
|
/// # Example
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```
|
||||||
|
/// # extern crate hyper;
|
||||||
|
/// # fn main() {}
|
||||||
|
/// # #[cfg(feature = "runtime")]
|
||||||
|
/// # fn run() {
|
||||||
/// use hyper::{Body, Response, Server};
|
/// use hyper::{Body, Response, Server};
|
||||||
/// use hyper::service::service_fn_ok;
|
/// use hyper::service::service_fn_ok;
|
||||||
///
|
///
|
||||||
@@ -195,6 +201,7 @@ impl<I> Builder<I> {
|
|||||||
/// .serve(new_service);
|
/// .serve(new_service);
|
||||||
///
|
///
|
||||||
/// // Finally, spawn `server` onto an Executor...
|
/// // Finally, spawn `server` onto an Executor...
|
||||||
|
/// # }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn serve<S, B>(self, new_service: S) -> Server<I, S>
|
pub fn serve<S, B>(self, new_service: S) -> Server<I, S>
|
||||||
where
|
where
|
||||||
@@ -215,6 +222,7 @@ impl<I> Builder<I> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "runtime")]
|
||||||
impl Builder<AddrIncoming> {
|
impl Builder<AddrIncoming> {
|
||||||
/// Set whether TCP keepalive messages are enabled on accepted connections.
|
/// Set whether TCP keepalive messages are enabled on accepted connections.
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -5,9 +5,8 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use futures::{Async, Future, Poll, Stream};
|
use futures::{Async, Future, Poll, Stream};
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
//TODO: change to tokio_tcp::net::TcpListener
|
use tokio_tcp::TcpListener;
|
||||||
use tokio::net::TcpListener;
|
use tokio_reactor::Handle;
|
||||||
use tokio::reactor::Handle;
|
|
||||||
|
|
||||||
use self::addr_stream::AddrStream;
|
use self::addr_stream::AddrStream;
|
||||||
|
|
||||||
@@ -170,7 +169,7 @@ mod addr_stream {
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use bytes::{Buf, BufMut};
|
use bytes::{Buf, BufMut};
|
||||||
use futures::Poll;
|
use futures::Poll;
|
||||||
use tokio::net::TcpStream;
|
use tokio_tcp::TcpStream;
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user