feat(client): change Connect trait into an alias for Service

The `Connect` trait is now essentially an alias for
`Service<Destination>`, with a blanket implementation as such, and is
sealed.

Closes #1902

BREAKING CHANGE: Any manual implementations of `Connect` must instead
  implement `tower::Service<Destination>`.
This commit is contained in:
Sean McArthur
2019-10-21 17:26:21 -07:00
parent 4f2743991c
commit d67e49f149
6 changed files with 256 additions and 113 deletions

View File

@@ -3,6 +3,7 @@ use std::error::Error as StdError;
use std::io;
use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use http::uri::{Scheme, Uri};
@@ -13,7 +14,7 @@ use tokio_net::tcp::TcpStream;
use tokio_timer::{Delay, Timeout};
use crate::common::{Future, Pin, Poll, task};
use super::{Connect, Connected, Destination};
use super::{Connected, Destination};
use super::dns::{self, GaiResolver, Resolve};
#[cfg(feature = "runtime")] use super::dns::TokioThreadpoolGaiResolver;
@@ -30,17 +31,8 @@ type ConnectFuture = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>
/// transport information such as the remote socket address used.
#[derive(Clone)]
pub struct HttpConnector<R = GaiResolver> {
enforce_http: bool,
handle: Option<Handle>,
connect_timeout: Option<Duration>,
happy_eyeballs_timeout: Option<Duration>,
keep_alive_timeout: Option<Duration>,
local_address: Option<IpAddr>,
nodelay: bool,
config: Arc<Config>,
resolver: R,
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
}
/// Extra information about the transport when an HttpConnector is used.
@@ -76,6 +68,22 @@ pub struct HttpInfo {
remote_addr: SocketAddr,
}
#[derive(Clone)]
struct Config {
connect_timeout: Option<Duration>,
enforce_http: bool,
handle: Option<Handle>,
happy_eyeballs_timeout: Option<Duration>,
keep_alive_timeout: Option<Duration>,
local_address: Option<IpAddr>,
nodelay: bool,
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
}
// ===== impl HttpConnector =====
impl HttpConnector {
/// Construct a new HttpConnector.
pub fn new() -> HttpConnector {
@@ -100,17 +108,19 @@ impl<R> HttpConnector<R> {
/// Takes a `Resolve` to handle DNS lookups.
pub fn new_with_resolver(resolver: R) -> HttpConnector<R> {
HttpConnector {
enforce_http: true,
handle: None,
connect_timeout: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
keep_alive_timeout: None,
local_address: None,
nodelay: false,
config: Arc::new(Config {
connect_timeout: None,
enforce_http: true,
handle: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
keep_alive_timeout: None,
local_address: None,
nodelay: false,
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
}),
resolver,
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
}
}
@@ -119,7 +129,7 @@ impl<R> HttpConnector<R> {
/// Enabled by default.
#[inline]
pub fn enforce_http(&mut self, is_enforced: bool) {
self.enforce_http = is_enforced;
self.config_mut().enforce_http = is_enforced;
}
/// Set a handle to a `Reactor` to register connections to.
@@ -127,7 +137,7 @@ impl<R> HttpConnector<R> {
/// If `None`, the implicit default reactor will be used.
#[inline]
pub fn set_reactor(&mut self, handle: Option<Handle>) {
self.handle = handle;
self.config_mut().handle = handle;
}
/// Set that all sockets have `SO_KEEPALIVE` set with the supplied duration.
@@ -137,7 +147,7 @@ impl<R> HttpConnector<R> {
/// Default is `None`.
#[inline]
pub fn set_keepalive(&mut self, dur: Option<Duration>) {
self.keep_alive_timeout = dur;
self.config_mut().keep_alive_timeout = dur;
}
/// Set that all sockets have `SO_NODELAY` set to the supplied value `nodelay`.
@@ -145,19 +155,19 @@ impl<R> HttpConnector<R> {
/// Default is `false`.
#[inline]
pub fn set_nodelay(&mut self, nodelay: bool) {
self.nodelay = nodelay;
self.config_mut().nodelay = nodelay;
}
/// Sets the value of the SO_SNDBUF option on the socket.
#[inline]
pub fn set_send_buffer_size(&mut self, size: Option<usize>) {
self.send_buffer_size = size;
self.config_mut().send_buffer_size = size;
}
/// Sets the value of the SO_RCVBUF option on the socket.
#[inline]
pub fn set_recv_buffer_size(&mut self, size: Option<usize>) {
self.recv_buffer_size = size;
self.config_mut().recv_buffer_size = size;
}
/// Set that all sockets are bound to the configured address before connection.
@@ -167,7 +177,7 @@ impl<R> HttpConnector<R> {
/// Default is `None`.
#[inline]
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
self.local_address = addr;
self.config_mut().local_address = addr;
}
/// Set the connect timeout.
@@ -178,7 +188,7 @@ impl<R> HttpConnector<R> {
/// Default is `None`.
#[inline]
pub fn set_connect_timeout(&mut self, dur: Option<Duration>) {
self.connect_timeout = dur;
self.config_mut().connect_timeout = dur;
}
/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
@@ -195,7 +205,7 @@ impl<R> HttpConnector<R> {
/// [RFC 6555]: https://tools.ietf.org/html/rfc6555
#[inline]
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
self.happy_eyeballs_timeout = dur;
self.config_mut().happy_eyeballs_timeout = dur;
}
/// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
@@ -203,9 +213,18 @@ impl<R> HttpConnector<R> {
/// Default is `false`.
#[inline]
pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
self.reuse_address = reuse_address;
self.config_mut().reuse_address = reuse_address;
self
}
// private
fn config_mut(&mut self) -> &mut Config {
// If the are HttpConnector clones, this will clone the inner
// config. So mutating the config won't ever affect previous
// clones.
Arc::make_mut(&mut self.config)
}
}
// R: Debug required for now to allow adding it to debug output later...
@@ -216,16 +235,24 @@ impl<R: fmt::Debug> fmt::Debug for HttpConnector<R> {
}
}
impl<R> Connect for HttpConnector<R>
impl<R> tower_service::Service<Destination> for HttpConnector<R>
where
R: Resolve + Clone + Send + Sync,
R::Future: Send,
{
type Transport = TcpStream;
type Response = (TcpStream, Connected);
type Error = io::Error;
type Future = HttpConnecting<R>;
fn connect(&self, dst: Destination) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// For now, always ready.
// TODO: When `Resolve` becomes an alias for `Service`, check
// the resolver's readiness.
drop(cx);
Poll::Ready(Ok(()))
}
fn call(&mut self, dst: Destination) -> Self::Future {
trace!(
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
@@ -233,17 +260,17 @@ where
dst.port(),
);
if self.enforce_http {
if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url(InvalidUrl::NotHttp, &self.handle);
return invalid_url(InvalidUrl::NotHttp, &self.config.handle);
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
return invalid_url(InvalidUrl::MissingScheme, &self.config.handle);
}
let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
None => return invalid_url(InvalidUrl::MissingAuthority, &self.config.handle),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
@@ -251,16 +278,16 @@ where
};
HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
connect_timeout: self.connect_timeout,
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
handle: self.config.handle.clone(),
connect_timeout: self.config.connect_timeout,
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
keep_alive_timeout: self.config.keep_alive_timeout,
nodelay: self.config.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
reuse_address: self.config.reuse_address,
send_buffer_size: self.config.send_buffer_size,
recv_buffer_size: self.config.recv_buffer_size,
}
}
}
@@ -289,17 +316,17 @@ where
dst.port(),
);
if self.enforce_http {
if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
return invalid_url::<R>(InvalidUrl::NotHttp, &self.config.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.config.handle).map_ok(|(s, _)| s).boxed();
}
let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.config.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
@@ -307,16 +334,16 @@ where
};
let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
connect_timeout: self.connect_timeout,
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
handle: self.config.handle.clone(),
connect_timeout: self.config.connect_timeout,
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
keep_alive_timeout: self.config.keep_alive_timeout,
nodelay: self.config.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
reuse_address: self.config.reuse_address,
send_buffer_size: self.config.send_buffer_size,
recv_buffer_size: self.config.recv_buffer_size,
};
fut.map_ok(|(s, _)| s).boxed()
@@ -671,7 +698,15 @@ mod tests {
use tokio::runtime::current_thread::Runtime;
use tokio_net::driver::Handle;
use super::{Connect, Destination, HttpConnector};
use super::{Connected, Destination, HttpConnector};
use super::super::sealed::Connect;
async fn connect<C>(connector: C, dst: Destination) -> Result<(C::Transport, Connected), C::Error>
where
C: Connect,
{
connector.connect(super::super::sealed::Internal, dst).await
}
#[test]
fn test_errors_missing_authority() {
@@ -684,7 +719,7 @@ mod tests {
rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
connect(connector, dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
@@ -701,7 +736,7 @@ mod tests {
rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
connect(connector, dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
})
@@ -718,7 +753,7 @@ mod tests {
rt.block_on(async {
assert_eq!(
connector.connect(dst).await.unwrap_err().kind(),
connect(connector, dst).await.unwrap_err().kind(),
io::ErrorKind::InvalidInput,
);
});

View File

@@ -1,52 +1,23 @@
//! The `Connect` trait, and supporting types.
//! Connectors used by the `Client`.
//!
//! This module contains:
//!
//! - A default [`HttpConnector`](HttpConnector) that does DNS resolution and
//! establishes connections over TCP.
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
//! - Types to build custom connectors.
use std::convert::TryFrom;
use std::error::Error as StdError;
use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use ::http::{uri, Response, Uri};
use tokio_io::{AsyncRead, AsyncWrite};
use crate::common::{Future, Unpin};
#[cfg(feature = "tcp")] pub mod dns;
#[cfg(feature = "tcp")] mod http;
#[cfg(feature = "tcp")] pub use self::http::{HttpConnector, HttpInfo};
/// Connect to a destination, returning an IO transport.
///
/// A connector receives a [`Destination`](Destination) describing how a
/// connection should be estabilished, and returns a `Future` of the
/// ready connection.
pub trait Connect: Send + Sync {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite + Unpin + Send + 'static;
/// An error occured when trying to connect.
type Error: Into<Box<dyn StdError + Send + Sync>>;
/// A Future that will resolve to the connected Transport.
type Future: Future<Output=Result<(Self::Transport, Connected), Self::Error>> + Unpin + Send;
/// Connect to a destination.
fn connect(&self, dst: Destination) -> Self::Future;
}
impl<T: Connect + ?Sized> Connect for Box<T> {
type Transport = <T as Connect>::Transport;
type Error = <T as Connect>::Error;
type Future = <T as Connect>::Future;
fn connect(&self, dst: Destination) -> Self::Future {
<T as Connect>::connect(self, dst)
}
}
/// A set of properties to describe where and how to try to connect.
///
/// This type is passed an argument for the [`Connect`](Connect) trait.
/// This type is passed an argument to connectors.
#[derive(Clone, Debug)]
pub struct Destination {
pub(super) uri: Uri,
@@ -398,6 +369,66 @@ where
}
}
pub(super) mod sealed {
use std::error::Error as StdError;
use tokio_io::{AsyncRead, AsyncWrite};
use crate::common::{Future, Unpin};
use super::{Connected, Destination};
/// Connect to a destination, returning an IO transport.
///
/// A connector receives a [`Destination`](Destination) describing how a
/// connection should be estabilished, and returns a `Future` of the
/// ready connection.
///
/// # Trait Alias
///
/// This is really just an *alias* for the `tower::Service` trait, with
/// additional bounds set for convenience *inside* hyper. You don't actually
/// implement this trait, but `tower::Service<Destination>` instead.
// The `Sized` bound is to prevent creating `dyn Connect`, since they cannot
// fit the `Connect` bounds because of the blanket impl for `Service`.
pub trait Connect: Sealed + Sized {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite;
/// An error occured when trying to connect.
type Error: Into<Box<dyn StdError + Send + Sync>>;
/// A Future that will resolve to the connected Transport.
type Future: Future<Output=Result<(Self::Transport, Connected), Self::Error>>;
#[doc(hidden)]
fn connect(self, internal_only: Internal, dst: Destination) -> Self::Future;
}
impl<S, T> Connect for S
where
S: tower_service::Service<Destination, Response=(T, Connected)> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Transport = T;
type Error = S::Error;
type Future = crate::service::Oneshot<S, Destination>;
fn connect(self, _: Internal, dst: Destination) -> Self::Future {
crate::service::oneshot(self, dst)
}
}
impl<S, T> Sealed for S
where
S: tower_service::Service<Destination, Response=(T, Connected)> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{}
pub trait Sealed {}
#[allow(missing_debug_implementations)]
pub struct Internal;
}
#[cfg(test)]
mod tests {
use super::{Connected, Destination, TryFrom};