feat(client): adds HttpInfo to responses when HttpConnector is used
- Adds `client::connect::Connected::extra()`, which allows connectors to specify arbitrary custom information about a connected transport. If a connector provides this extra value, it will be set in the `Response` extensions. Closes #1402
This commit is contained in:
@@ -10,7 +10,6 @@ static PHRASE: &'static [u8] = b"Hello World!";
|
|||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init();
|
||||||
|
|
||||||
let addr = ([127, 0, 0, 1], 3000).into();
|
let addr = ([127, 0, 0, 1], 3000).into();
|
||||||
|
|
||||||
// new_service is run for each connection, creating a 'service'
|
// new_service is run for each connection, creating a 'service'
|
||||||
|
|||||||
@@ -24,6 +24,11 @@ use self::sealed::HttpConnectorBlockingTask;
|
|||||||
/// A connector for the `http` scheme.
|
/// A connector for the `http` scheme.
|
||||||
///
|
///
|
||||||
/// Performs DNS resolution in a thread pool, and then connects over TCP.
|
/// Performs DNS resolution in a thread pool, and then connects over TCP.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
///
|
||||||
|
/// Sets the [`HttpInfo`](HttpInfo) value on responses, which includes
|
||||||
|
/// transport information such as the remote socket address used.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct HttpConnector {
|
pub struct HttpConnector {
|
||||||
executor: HttpConnectExecutor,
|
executor: HttpConnectExecutor,
|
||||||
@@ -36,6 +41,37 @@ pub struct HttpConnector {
|
|||||||
reuse_address: bool,
|
reuse_address: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extra information about the transport when an HttpConnector is used.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
///
|
||||||
|
/// ```rust
|
||||||
|
/// use hyper::client::{Client, connect::HttpInfo};
|
||||||
|
/// use hyper::rt::Future;
|
||||||
|
///
|
||||||
|
/// let client = Client::new();
|
||||||
|
///
|
||||||
|
/// let fut = client.get("http://example.local".parse().unwrap())
|
||||||
|
/// .inspect(|resp| {
|
||||||
|
/// resp
|
||||||
|
/// .extensions()
|
||||||
|
/// .get::<HttpInfo>()
|
||||||
|
/// .map(|info| {
|
||||||
|
/// println!("remote addr = {}", info.remote_addr());
|
||||||
|
/// });
|
||||||
|
/// });
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
///
|
||||||
|
/// If a different connector is used besides [`HttpConnector`](HttpConnector),
|
||||||
|
/// this value will not exist in the extensions. Consult that specific
|
||||||
|
/// connector to see what "extra" information it might provide to responses.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct HttpInfo {
|
||||||
|
remote_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
impl HttpConnector {
|
impl HttpConnector {
|
||||||
/// Construct a new HttpConnector.
|
/// Construct a new HttpConnector.
|
||||||
///
|
///
|
||||||
@@ -187,6 +223,13 @@ impl Connect for HttpConnector {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl HttpInfo {
|
||||||
|
/// Get the remote address of the transport used.
|
||||||
|
pub fn remote_addr(&self) -> SocketAddr {
|
||||||
|
self.remote_addr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn invalid_url(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting {
|
fn invalid_url(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting {
|
||||||
HttpConnecting {
|
HttpConnecting {
|
||||||
@@ -277,7 +320,13 @@ impl Future for HttpConnecting {
|
|||||||
|
|
||||||
sock.set_nodelay(self.nodelay)?;
|
sock.set_nodelay(self.nodelay)?;
|
||||||
|
|
||||||
return Ok(Async::Ready((sock, Connected::new())));
|
let extra = HttpInfo {
|
||||||
|
remote_addr: sock.peer_addr()?,
|
||||||
|
};
|
||||||
|
let connected = Connected::new()
|
||||||
|
.extra(extra);
|
||||||
|
|
||||||
|
return Ok(Async::Ready((sock, connected)));
|
||||||
},
|
},
|
||||||
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
|
State::Error(ref mut e) => return Err(e.take().expect("polled more than once")),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,16 +6,16 @@
|
|||||||
//! establishes connections over TCP.
|
//! establishes connections over TCP.
|
||||||
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
|
//! - The [`Connect`](Connect) trait and related types to build custom connectors.
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
use std::mem;
|
use std::{fmt, mem};
|
||||||
|
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use http::{uri, Uri};
|
use http::{uri, Response, Uri};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
#[cfg(feature = "runtime")] mod dns;
|
#[cfg(feature = "runtime")] mod dns;
|
||||||
#[cfg(feature = "runtime")] mod http;
|
#[cfg(feature = "runtime")] mod http;
|
||||||
#[cfg(feature = "runtime")] pub use self::http::HttpConnector;
|
#[cfg(feature = "runtime")] pub use self::http::{HttpConnector, HttpInfo};
|
||||||
|
|
||||||
/// Connect to a destination, returning an IO transport.
|
/// Connect to a destination, returning an IO transport.
|
||||||
///
|
///
|
||||||
@@ -48,8 +48,11 @@ pub struct Destination {
|
|||||||
pub struct Connected {
|
pub struct Connected {
|
||||||
//alpn: Alpn,
|
//alpn: Alpn,
|
||||||
pub(super) is_proxied: bool,
|
pub(super) is_proxied: bool,
|
||||||
|
pub(super) extra: Option<Extra>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(super) struct Extra(Box<ExtraInner>);
|
||||||
|
|
||||||
/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
|
/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) enum Alpn {
|
pub(super) enum Alpn {
|
||||||
@@ -245,6 +248,7 @@ impl Connected {
|
|||||||
Connected {
|
Connected {
|
||||||
//alpn: Alpn::Http1,
|
//alpn: Alpn::Http1,
|
||||||
is_proxied: false,
|
is_proxied: false,
|
||||||
|
extra: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -260,6 +264,12 @@ impl Connected {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set extra connection information to be set in the extensions of every `Response`.
|
||||||
|
pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
|
||||||
|
self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
/// Set that the connected transport negotiated HTTP/2 as it's
|
/// Set that the connected transport negotiated HTTP/2 as it's
|
||||||
/// next protocol.
|
/// next protocol.
|
||||||
@@ -268,6 +278,61 @@ impl Connected {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// Don't public expose that `Connected` is `Clone`, unsure if we want to
|
||||||
|
// keep that contract...
|
||||||
|
pub(super) fn clone(&self) -> Connected {
|
||||||
|
Connected {
|
||||||
|
is_proxied: self.is_proxied,
|
||||||
|
extra: self.extra.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ===== impl Extra =====
|
||||||
|
|
||||||
|
impl Extra {
|
||||||
|
pub(super) fn set(&self, res: &mut Response<::Body>) {
|
||||||
|
self.0.set(res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for Extra {
|
||||||
|
fn clone(&self) -> Extra {
|
||||||
|
Extra(self.0.clone_box())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Extra {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
f.debug_struct("Extra")
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This indirection allows the `Connected` to have a type-erased "extra" value,
|
||||||
|
// while that type still knows its inner extra type. This allows the correct
|
||||||
|
// TypeId to be used when inserting into `res.extensions_mut()`.
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct ExtraEnvelope<T>(T);
|
||||||
|
|
||||||
|
trait ExtraInner: Send + Sync {
|
||||||
|
fn clone_box(&self) -> Box<ExtraInner>;
|
||||||
|
fn set(&self, res: &mut Response<::Body>);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ExtraInner for ExtraEnvelope<T>
|
||||||
|
where
|
||||||
|
T: Clone + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
fn clone_box(&self) -> Box<ExtraInner> {
|
||||||
|
Box::new(self.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set(&self, res: &mut Response<::Body>) {
|
||||||
|
let extra = self.0.clone();
|
||||||
|
res.extensions_mut().insert(extra);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -91,7 +91,7 @@ use http::uri::Scheme;
|
|||||||
|
|
||||||
use body::{Body, Payload};
|
use body::{Body, Payload};
|
||||||
use common::{Exec, lazy as hyper_lazy, Lazy};
|
use common::{Exec, lazy as hyper_lazy, Lazy};
|
||||||
use self::connect::{Connect, Destination};
|
use self::connect::{Connect, Connected, Destination};
|
||||||
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
|
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
|
||||||
|
|
||||||
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
|
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
|
||||||
@@ -290,7 +290,7 @@ where C: Connect + Sync + 'static,
|
|||||||
// CONNECT always sends origin-form, so check it first...
|
// CONNECT always sends origin-form, so check it first...
|
||||||
if req.method() == &Method::CONNECT {
|
if req.method() == &Method::CONNECT {
|
||||||
authority_form(req.uri_mut());
|
authority_form(req.uri_mut());
|
||||||
} else if pooled.is_proxied {
|
} else if pooled.conn_info.is_proxied {
|
||||||
absolute_form(req.uri_mut());
|
absolute_form(req.uri_mut());
|
||||||
} else {
|
} else {
|
||||||
origin_form(req.uri_mut());
|
origin_form(req.uri_mut());
|
||||||
@@ -305,6 +305,15 @@ where C: Connect + Sync + 'static,
|
|||||||
let fut = pooled.send_request_retryable(req)
|
let fut = pooled.send_request_retryable(req)
|
||||||
.map_err(ClientError::map_with_reused(pooled.is_reused()));
|
.map_err(ClientError::map_with_reused(pooled.is_reused()));
|
||||||
|
|
||||||
|
// If the Connector included 'extra' info, add to Response...
|
||||||
|
let extra_info = pooled.conn_info.extra.clone();
|
||||||
|
let fut = fut.map(move |mut res| {
|
||||||
|
if let Some(extra) = extra_info {
|
||||||
|
extra.set(&mut res);
|
||||||
|
}
|
||||||
|
res
|
||||||
|
});
|
||||||
|
|
||||||
// As of futures@0.1.21, there is a race condition in the mpsc
|
// As of futures@0.1.21, there is a race condition in the mpsc
|
||||||
// channel, such that sending when the receiver is closing can
|
// channel, such that sending when the receiver is closing can
|
||||||
// result in the message being stuck inside the queue. It won't
|
// result in the message being stuck inside the queue. It won't
|
||||||
@@ -499,7 +508,7 @@ where C: Connect + Sync + 'static,
|
|||||||
})
|
})
|
||||||
.map(move |tx| {
|
.map(move |tx| {
|
||||||
pool.pooled(connecting, PoolClient {
|
pool.pooled(connecting, PoolClient {
|
||||||
is_proxied: connected.is_proxied,
|
conn_info: connected,
|
||||||
tx: match ver {
|
tx: match ver {
|
||||||
Ver::Http1 => PoolTx::Http1(tx),
|
Ver::Http1 => PoolTx::Http1(tx),
|
||||||
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
|
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
|
||||||
@@ -565,7 +574,7 @@ impl Future for ResponseFuture {
|
|||||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
struct PoolClient<B> {
|
struct PoolClient<B> {
|
||||||
is_proxied: bool,
|
conn_info: Connected,
|
||||||
tx: PoolTx<B>,
|
tx: PoolTx<B>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -624,17 +633,17 @@ where
|
|||||||
match self.tx {
|
match self.tx {
|
||||||
PoolTx::Http1(tx) => {
|
PoolTx::Http1(tx) => {
|
||||||
Reservation::Unique(PoolClient {
|
Reservation::Unique(PoolClient {
|
||||||
is_proxied: self.is_proxied,
|
conn_info: self.conn_info,
|
||||||
tx: PoolTx::Http1(tx),
|
tx: PoolTx::Http1(tx),
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
PoolTx::Http2(tx) => {
|
PoolTx::Http2(tx) => {
|
||||||
let b = PoolClient {
|
let b = PoolClient {
|
||||||
is_proxied: self.is_proxied,
|
conn_info: self.conn_info.clone(),
|
||||||
tx: PoolTx::Http2(tx.clone()),
|
tx: PoolTx::Http2(tx.clone()),
|
||||||
};
|
};
|
||||||
let a = PoolClient {
|
let a = PoolClient {
|
||||||
is_proxied: self.is_proxied,
|
conn_info: self.conn_info,
|
||||||
tx: PoolTx::Http2(tx),
|
tx: PoolTx::Http2(tx),
|
||||||
};
|
};
|
||||||
Reservation::Shared(a, b)
|
Reservation::Shared(a, b)
|
||||||
|
|||||||
@@ -274,7 +274,17 @@ macro_rules! test {
|
|||||||
|
|
||||||
let rx = rx.expect("thread panicked");
|
let rx = rx.expect("thread panicked");
|
||||||
|
|
||||||
rt.block_on(res.join(rx).map(|r| r.0))
|
rt.block_on(res.join(rx).map(|r| r.0)).map(move |mut resp| {
|
||||||
|
// Always check that HttpConnector has set the "extra" info...
|
||||||
|
let extra = resp
|
||||||
|
.extensions_mut()
|
||||||
|
.remove::<::hyper::client::connect::HttpInfo>()
|
||||||
|
.expect("HttpConnector should set HttpInfo");
|
||||||
|
|
||||||
|
assert_eq!(extra.remote_addr(), addr, "HttpInfo should have server addr");
|
||||||
|
|
||||||
|
resp
|
||||||
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user