feat(client): redesign the Connect trait

The original `Connect` trait had some limitations:

- There was no way to provide more details to the connector about how to
  connect, other than the `Uri`.
- There was no way for the connector to return any extra information
  about the connected transport.
- The `Error` was forced to be an `std::io::Error`.
- The transport and future had `'static` requirements.

As hyper gains HTTP/2 support, some of these things needed to be
changed. We want to allow the user to configure whether they hope to
us ALPN to start an HTTP/2 connection, and the connector needs to be
able to return back to hyper if it did so.

The new `Connect` trait is meant to solve this.

- The `connect` method now receives a `Destination` type, instead of a
  `Uri`. This allows us to include additional data about how to connect.
- The `Future` returned from `connect` now must be a tuple of the
  transport, and a `Connected` metadata value. The `Connected` includes
  possibly extra data about what happened when connecting.

BREAKING CHANGE: Custom connectors should now implement `Connect`
  directly, instead of `Service`.

  Calls to `connect` no longer take `Uri`s, but `Destination`. There
  are `scheme`, `host`, and `port` methods to query relevant
  information.

  The returned future must be a tuple of the transport and `Connected`.
  If no relevant extra information is needed, simply return
  `Connected::new()`.

Closes #1428
This commit is contained in:
Sean McArthur
2018-03-14 14:12:36 -07:00
parent fbc449e49c
commit 8c52c2dfd3
5 changed files with 277 additions and 110 deletions

View File

@@ -1,3 +1,10 @@
//! The `Connect` trait, and supporting types.
//!
//! This module contains:
//!
//! - A default [`HttpConnector`](HttpConnector) that does DNS resolution and
//! establishes connections over TCP.
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
use std::error::Error as StdError;
use std::fmt;
use std::io;
@@ -14,38 +21,121 @@ use http::uri::Scheme;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, TcpStreamNew};
use tokio_service::Service;
use super::dns;
use self::http_connector::HttpConnectorBlockingTask;
/// A connector creates an Io to a remote address..
/// Connect to a destination, returning an IO transport.
///
/// This trait is not implemented directly, and only exists to make
/// the intent clearer. A connector should implement `Service` with
/// `Request=Uri` and `Response: Io` instead.
pub trait Connect: Service<Request=Uri, Error=io::Error> + 'static {
/// The connected Io Stream.
type Output: AsyncRead + AsyncWrite + 'static;
/// A Future that will resolve to the connected Stream.
type Future: Future<Item=Self::Output, Error=io::Error> + 'static;
/// Connect to a remote address.
fn connect(&self, Uri) -> <Self as Connect>::Future;
/// A connector receives a [`Destination`](Destination) describing how a
/// connection should be estabilished, and returns a `Future` of the
/// ready connection.
pub trait Connect {
/// The connected IO Stream.
type Transport: AsyncRead + AsyncWrite + 'static;
/// An error occured when trying to connect.
type Error;
/// A Future that will resolve to the connected Transport.
type Future: Future<Item=(Self::Transport, Connected), Error=Self::Error>;
/// Connect to a destination.
fn connect(&self, dst: Destination) -> Self::Future;
}
impl<T> Connect for T
where T: Service<Request=Uri, Error=io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error=io::Error>,
{
type Output = T::Response;
type Future = T::Future;
/// A set of properties to describe where and how to try to connect.
#[derive(Debug)]
pub struct Destination {
//pub(super) alpn: Alpn,
pub(super) uri: Uri,
}
fn connect(&self, url: Uri) -> <Self as Connect>::Future {
self.call(url)
/// Extra information about the connected transport.
///
/// This can be used to inform recipients about things like if ALPN
/// was used, or if connected to an HTTP proxy.
#[derive(Debug)]
pub struct Connected {
//alpn: Alpn,
pub(super) is_proxied: bool,
}
/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
#[derive(Debug)]
pub(super) enum Alpn {
Http1,
//H2,
//Http1OrH2
}
*/
impl Destination {
/// Get the protocol scheme.
#[inline]
pub fn scheme(&self) -> &str {
self.uri
.scheme_part()
.expect("destination uri has scheme")
.as_str()
}
/// Get the hostname.
#[inline]
pub fn host(&self) -> &str {
self.uri
.host()
.expect("destination uri has host")
}
/// Get the port, if specified.
#[inline]
pub fn port(&self) -> Option<u16> {
self.uri.port()
}
/*
/// Returns whether this connection must negotiate HTTP/2 via ALPN.
pub fn must_h2(&self) -> bool {
match self.alpn {
Alpn::Http1 => false,
Alpn::H2 => true,
}
}
*/
}
impl Connected {
/// Create new `Connected` type with empty metadata.
pub fn new() -> Connected {
Connected {
//alpn: Alpn::Http1,
is_proxied: false,
}
}
/// Set whether the connected transport is to an HTTP proxy.
///
/// This setting will affect if HTTP/1 requests written on the transport
/// will have the request-target in absolute-form or origin-form (such as
/// `GET http://hyper.rs/guide HTTP/1.1` or `GET /guide HTTP/1.1`).
///
/// Default is `false`.
pub fn proxy(mut self, is_proxied: bool) -> Connected {
self.is_proxied = is_proxied;
self
}
/*
/// Set that the connected transport negotiated HTTP/2 as it's
/// next protocol.
pub fn h2(mut self) -> Connected {
self.alpn = Alpn::H2;
self
}
*/
}
/// A connector for the `http` scheme.
///
/// Performs DNS resolution in a thread pool, and then connects over TCP.
#[derive(Clone)]
pub struct HttpConnector {
executor: HttpConnectExecutor,
@@ -109,30 +199,29 @@ impl fmt::Debug for HttpConnector {
}
}
impl Service for HttpConnector {
type Request = Uri;
type Response = TcpStream;
impl Connect for HttpConnector {
type Transport = TcpStream;
type Error = io::Error;
type Future = HttpConnecting;
fn call(&self, uri: Uri) -> Self::Future {
trace!("Http::connect({:?})", uri);
fn connect(&self, dst: Destination) -> Self::Future {
trace!("Http::connect({:?})", dst.uri);
if self.enforce_http {
if uri.scheme_part() != Some(&Scheme::HTTP) {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url(InvalidUrl::NotHttp, &self.handle);
}
} else if uri.scheme_part().is_none() {
} else if dst.uri.scheme_part().is_none() {
return invalid_url(InvalidUrl::MissingScheme, &self.handle);
}
let host = match uri.host() {
let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url(InvalidUrl::MissingAuthority, &self.handle),
};
let port = match uri.port() {
let port = match dst.uri.port() {
Some(port) => port,
None => if uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};
HttpConnecting {
@@ -191,7 +280,7 @@ enum State {
}
impl Future for HttpConnecting {
type Item = TcpStream;
type Item = (TcpStream, Connected);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -230,7 +319,7 @@ impl Future for HttpConnecting {
sock.set_keepalive(Some(dur))?;
}
return Ok(Async::Ready(sock));
return Ok(Async::Ready((sock, Connected::new())));
},
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
}
@@ -279,23 +368,27 @@ impl ConnectingTcp {
}
}
/// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
work: oneshot::Execute<dns::Work>
}
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}
}
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}
@@ -311,35 +404,45 @@ impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
#[cfg(test)]
mod tests {
#![allow(deprecated)]
use std::io;
use tokio::reactor::Core;
use super::{Connect, HttpConnector};
use super::{Connect, Destination, HttpConnector};
#[test]
fn test_errors_missing_authority() {
let mut core = Core::new().unwrap();
let url = "/foo/bar?baz".parse().unwrap();
let uri = "/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());
assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
fn test_errors_enforce_http() {
let mut core = Core::new().unwrap();
let url = "https://example.domain/foo/bar?baz".parse().unwrap();
let uri = "https://example.domain/foo/bar?baz".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());
assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
fn test_errors_missing_scheme() {
let mut core = Core::new().unwrap();
let url = "example.domain".parse().unwrap();
let uri = "example.domain".parse().unwrap();
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1, &core.handle());
assert_eq!(core.run(connector.connect(url)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
assert_eq!(core.run(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}

View File

@@ -11,6 +11,7 @@ use futures::{Async, Future, Poll};
use futures::future::{self, Executor};
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::uri::Scheme;
use tokio::reactor::Handle;
pub use tokio_service::Service;
@@ -18,12 +19,13 @@ use proto::body::{Body, Entity};
use proto;
use self::pool::Pool;
pub use self::connect::{HttpConnector, Connect};
pub use self::connect::{Connect, HttpConnector};
use self::background::{bg, Background};
use self::connect::Destination;
pub mod conn;
mod connect;
pub mod connect;
//TODO(easy): move cancel and dispatch into common instead
pub(crate) mod dispatch;
mod dns;
@@ -101,7 +103,9 @@ impl<C, B> Client<C, B> {
}
impl<C, B> Client<C, B>
where C: Connect,
where C: Connect<Error=io::Error> + 'static,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error> + 'static,
{
@@ -180,13 +184,11 @@ where C: Connect,
let client = self.clone();
//TODO: let is_proxy = req.is_proxy();
let uri = req.uri().clone();
let fut = RetryableSendRequest {
client: client,
future: self.send_request(req, &domain),
domain: domain,
//is_proxy: is_proxy,
uri: uri,
};
FutureResponse(Box::new(fut))
@@ -195,19 +197,6 @@ where C: Connect,
//TODO: replace with `impl Future` when stable
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>>> {
let url = req.uri().clone();
let path = match url.path_and_query() {
Some(path) => {
let mut parts = ::http::uri::Parts::default();
parts.path_and_query = Some(path.clone());
Uri::from_parts(parts).expect("path is valid uri")
},
None => {
"/".parse().expect("/ is valid path")
}
};
*req.uri_mut() = path;
let checkout = self.pool.checkout(domain);
let connect = {
let executor = self.executor.clone();
@@ -215,18 +204,23 @@ where C: Connect,
let pool_key = Arc::new(domain.to_string());
let h1_writev = self.h1_writev;
let connector = self.connector.clone();
let dst = Destination {
uri: url,
};
future::lazy(move || {
connector.connect(url)
connector.connect(dst)
.from_err()
.and_then(move |io| {
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
}).and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pool.pooled(pool_key, PoolClient {
tx: tx,
}))
.and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
}))
})
})
})
};
@@ -245,13 +239,14 @@ where C: Connect,
let executor = self.executor.clone();
let resp = race.and_then(move |mut pooled| {
let conn_reused = pooled.is_reused();
set_relative_uri(req.uri_mut(), pooled.is_proxied);
let fut = pooled.tx.send_request_retryable(req)
.map_err(move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
connection_reused: conn_reused,
reason: err,
req: req,
req,
}
} else {
ClientError::Normal(err)
@@ -292,7 +287,8 @@ where C: Connect,
}
impl<C, B> Service for Client<C, B>
where C: Connect,
where C: Connect<Error=io::Error> + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + 'static,
{
type Request = Request<B>;
@@ -348,13 +344,13 @@ struct RetryableSendRequest<C, B> {
client: Client<C, B>,
domain: String,
future: Box<Future<Item=Response<Body>, Error=ClientError<B>>>,
//is_proxy: bool,
uri: Uri,
}
impl<C, B> Future for RetryableSendRequest<C, B>
where
C: Connect,
C: Connect<Error=io::Error> + 'static,
C::Future: 'static,
B: Entity<Error=::Error> + 'static,
{
type Item = Response<Body>;
@@ -387,6 +383,7 @@ where
}
struct PoolClient<B> {
is_proxied: bool,
tx: conn::SendRequest<B>,
}
@@ -399,7 +396,7 @@ where
}
}
pub(crate) enum ClientError<B> {
enum ClientError<B> {
Normal(::Error),
Canceled {
connection_reused: bool,
@@ -408,6 +405,23 @@ pub(crate) enum ClientError<B> {
}
}
fn set_relative_uri(uri: &mut Uri, is_proxied: bool) {
if is_proxied && uri.scheme_part() != Some(&Scheme::HTTPS) {
return;
}
let path = match uri.path_and_query() {
Some(path) => {
let mut parts = ::http::uri::Parts::default();
parts.path_and_query = Some(path.clone());
Uri::from_parts(parts).expect("path is valid uri")
},
None => {
"/".parse().expect("/ is valid path")
}
};
*uri = path;
}
/// Configuration for a Client
pub struct Config<C, B> {
_body_type: PhantomData<B>,
@@ -545,7 +559,9 @@ impl<C, B> Config<C, B> {
}
impl<C, B> Config<C, B>
where C: Connect,
where C: Connect<Error=io::Error>,
C::Transport: 'static,
C::Future: 'static,
B: Entity<Error=::Error>,
{
/// Construct the Client with this configuration.

View File

@@ -14,8 +14,8 @@ fn retryable_request() {
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local/a");
let sock2 = connector.mock("http://mock.local/b");
let sock1 = connector.mock("http://mock.local");
let sock2 = connector.mock("http://mock.local");
let client = Client::configure()
.connector(connector)
@@ -62,7 +62,7 @@ fn conn_reset_after_write() {
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local/a");
let sock1 = connector.mock("http://mock.local");
let client = Client::configure()
.connector(connector)

View File

@@ -8,9 +8,8 @@ use bytes::Buf;
use futures::{Async, Poll};
use futures::task::{self, Task};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service;
use ::Uri;
use ::client::connect::{Connect, Connected, Destination};
#[derive(Debug)]
pub struct MockCursor {
@@ -410,19 +409,23 @@ impl MockConnector {
}
}
impl Service for MockConnector {
type Request = Uri;
type Response = Duplex;
impl Connect for MockConnector {
type Transport = Duplex;
type Error = io::Error;
type Future = ::futures::future::FutureResult<Self::Response, Self::Error>;
type Future = ::futures::future::FutureResult<(Self::Transport, Connected), Self::Error>;
fn call(&self, uri: Uri) -> Self::Future {
fn connect(&self, dst: Destination) -> Self::Future {
use futures::future;
trace!("mock connect: {}", uri);
trace!("mock connect: {:?}", dst);
let key = format!("{}://{}{}", dst.scheme(), dst.host(), if let Some(port) = dst.port() {
format!(":{}", port)
} else {
"".to_owned()
});
let mut mocks = self.mocks.borrow_mut();
let mocks = mocks.get_mut(&uri.to_string())
.expect(&format!("unknown mocks uri: {}", uri));
assert!(!mocks.is_empty(), "no additional mocks for {}", uri);
future::ok(mocks.remove(0))
let mocks = mocks.get_mut(&key)
.expect(&format!("unknown mocks uri: {}", key));
assert!(!mocks.is_empty(), "no additional mocks for {}", key);
future::ok((mocks.remove(0), Connected::new()))
}
}

View File

@@ -638,9 +638,8 @@ mod dispatch_impl {
use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use hyper::client::HttpConnector;
use hyper::server::Service;
use hyper::{Client, Uri};
use hyper::client::connect::{Connect, Connected, Destination, HttpConnector};
use hyper::Client;
use hyper;
@@ -1264,11 +1263,51 @@ mod dispatch_impl {
assert_eq!(connects.load(Ordering::Relaxed), 2);
}
#[test]
fn connect_proxy_sends_absolute_uri() {
let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut core = Core::new().unwrap();
let handle = core.handle();
let connector = DebugConnector::new(&handle)
.proxy();
let client = Client::configure()
.connector(connector)
.build(&handle);
let (tx1, rx1) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
//drop(server);
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut buf = [0; 4096];
let n = sock.read(&mut buf).expect("read 1");
let expected = format!("GET http://{addr}/foo/bar HTTP/1.1\r\nhost: {addr}\r\n\r\n", addr=addr);
assert_eq!(s(&buf[..n]), expected);
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1");
let _ = tx1.send(());
});
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
let req = Request::builder()
.uri(&*format!("http://{}/foo/bar", addr))
.body(Body::empty())
.unwrap();
let res = client.request(req);
core.run(res.join(rx).map(|r| r.0)).unwrap();
}
struct DebugConnector {
http: HttpConnector,
closes: mpsc::Sender<()>,
connects: Arc<AtomicUsize>,
is_proxy: bool,
}
impl DebugConnector {
@@ -1283,21 +1322,27 @@ mod dispatch_impl {
http: http,
closes: closes,
connects: Arc::new(AtomicUsize::new(0)),
is_proxy: false,
}
}
fn proxy(mut self) -> Self {
self.is_proxy = true;
self
}
}
impl Service for DebugConnector {
type Request = Uri;
type Response = DebugStream;
impl Connect for DebugConnector {
type Transport = DebugStream;
type Error = io::Error;
type Future = Box<Future<Item = DebugStream, Error = io::Error>>;
type Future = Box<Future<Item = (DebugStream, Connected), Error = io::Error>>;
fn call(&self, uri: Uri) -> Self::Future {
fn connect(&self, dst: Destination) -> Self::Future {
self.connects.fetch_add(1, Ordering::SeqCst);
let closes = self.closes.clone();
Box::new(self.http.call(uri).map(move |s| {
DebugStream(s, closes)
let is_proxy = self.is_proxy;
Box::new(self.http.connect(dst).map(move |(s, c)| {
(DebugStream(s, closes), c.proxy(is_proxy))
}))
}
}