feat(client): change DNS Resolver to resolve to SocketAddrs (#2346)

The DNS resolver part of `HttpConnector` used to require resolving to
`IpAddr`s, and this changes it so that they resolve to `SocketAddr`s.
The main benefit here is allowing for resolvers to set the IPv6 zone ID
when resolving, but it also just more closely matches
`std::net::ToSocketAddrs`.

Closes #1937

BREAKING CHANGE: Custom resolvers used with `HttpConnector` must change
  to resolving to an iterator of `SocketAddr`s instead of `IpAddr`s.
This commit is contained in:
Sean McArthur
2020-12-03 14:21:19 -08:00
committed by GitHub
parent 3cb6b4e840
commit b4e24332a0
2 changed files with 42 additions and 39 deletions

View File

@@ -9,21 +9,21 @@
//! # Resolvers are `Service`s //! # Resolvers are `Service`s
//! //!
//! A resolver is just a //! A resolver is just a
//! `Service<Name, Response = impl Iterator<Item = IpAddr>>`. //! `Service<Name, Response = impl Iterator<Item = SocketAddr>>`.
//! //!
//! A simple resolver that ignores the name and always returns a specific //! A simple resolver that ignores the name and always returns a specific
//! address: //! address:
//! //!
//! ```rust,ignore //! ```rust,ignore
//! use std::{convert::Infallible, iter, net::IpAddr}; //! use std::{convert::Infallible, iter, net::SocketAddr};
//! //!
//! let resolver = tower::service_fn(|_name| async { //! let resolver = tower::service_fn(|_name| async {
//! Ok::<_, Infallible>(iter::once(IpAddr::from([127, 0, 0, 1]))) //! Ok::<_, Infallible>(iter::once(SocketAddr::from(([127, 0, 0, 1], 8080))))
//! }); //! });
//! ``` //! ```
use std::error::Error; use std::error::Error;
use std::future::Future; use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs};
use std::pin::Pin; use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::task::{self, Poll}; use std::task::{self, Poll};
@@ -48,12 +48,12 @@ pub struct GaiResolver {
/// An iterator of IP addresses returned from `getaddrinfo`. /// An iterator of IP addresses returned from `getaddrinfo`.
pub struct GaiAddrs { pub struct GaiAddrs {
inner: IpAddrs, inner: SocketAddrs,
} }
/// A future to resolve a name returned by `GaiResolver`. /// A future to resolve a name returned by `GaiResolver`.
pub struct GaiFuture { pub struct GaiFuture {
inner: JoinHandle<Result<IpAddrs, io::Error>>, inner: JoinHandle<Result<SocketAddrs, io::Error>>,
} }
impl Name { impl Name {
@@ -121,7 +121,7 @@ impl Service<Name> for GaiResolver {
debug!("resolving host={:?}", name.host); debug!("resolving host={:?}", name.host);
(&*name.host, 0) (&*name.host, 0)
.to_socket_addrs() .to_socket_addrs()
.map(|i| IpAddrs { iter: i }) .map(|i| SocketAddrs { iter: i })
}); });
GaiFuture { inner: blocking } GaiFuture { inner: blocking }
@@ -159,10 +159,10 @@ impl fmt::Debug for GaiFuture {
} }
impl Iterator for GaiAddrs { impl Iterator for GaiAddrs {
type Item = IpAddr; type Item = SocketAddr;
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|sa| sa.ip()) self.inner.next()
} }
} }
@@ -172,28 +172,28 @@ impl fmt::Debug for GaiAddrs {
} }
} }
pub(super) struct IpAddrs { pub(super) struct SocketAddrs {
iter: vec::IntoIter<SocketAddr>, iter: vec::IntoIter<SocketAddr>,
} }
impl IpAddrs { impl SocketAddrs {
pub(super) fn new(addrs: Vec<SocketAddr>) -> Self { pub(super) fn new(addrs: Vec<SocketAddr>) -> Self {
IpAddrs { SocketAddrs {
iter: addrs.into_iter(), iter: addrs.into_iter(),
} }
} }
pub(super) fn try_parse(host: &str, port: u16) -> Option<IpAddrs> { pub(super) fn try_parse(host: &str, port: u16) -> Option<SocketAddrs> {
if let Ok(addr) = host.parse::<Ipv4Addr>() { if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port); let addr = SocketAddrV4::new(addr, port);
return Some(IpAddrs { return Some(SocketAddrs {
iter: vec![SocketAddr::V4(addr)].into_iter(), iter: vec![SocketAddr::V4(addr)].into_iter(),
}); });
} }
let host = host.trim_start_matches('[').trim_end_matches(']'); let host = host.trim_start_matches('[').trim_end_matches(']');
if let Ok(addr) = host.parse::<Ipv6Addr>() { if let Ok(addr) = host.parse::<Ipv6Addr>() {
let addr = SocketAddrV6::new(addr, port, 0, 0); let addr = SocketAddrV6::new(addr, port, 0, 0);
return Some(IpAddrs { return Some(SocketAddrs {
iter: vec![SocketAddr::V6(addr)].into_iter(), iter: vec![SocketAddr::V6(addr)].into_iter(),
}); });
} }
@@ -201,18 +201,18 @@ impl IpAddrs {
} }
#[inline] #[inline]
fn filter(self, predicate: impl FnMut(&SocketAddr) -> bool) -> IpAddrs { fn filter(self, predicate: impl FnMut(&SocketAddr) -> bool) -> SocketAddrs {
IpAddrs::new(self.iter.filter(predicate).collect()) SocketAddrs::new(self.iter.filter(predicate).collect())
} }
pub(super) fn split_by_preference( pub(super) fn split_by_preference(
self, self,
local_addr_ipv4: Option<Ipv4Addr>, local_addr_ipv4: Option<Ipv4Addr>,
local_addr_ipv6: Option<Ipv6Addr>, local_addr_ipv6: Option<Ipv6Addr>,
) -> (IpAddrs, IpAddrs) { ) -> (SocketAddrs, SocketAddrs) {
match (local_addr_ipv4, local_addr_ipv6) { match (local_addr_ipv4, local_addr_ipv6) {
(Some(_), None) => (self.filter(SocketAddr::is_ipv4), IpAddrs::new(vec![])), (Some(_), None) => (self.filter(SocketAddr::is_ipv4), SocketAddrs::new(vec![])),
(None, Some(_)) => (self.filter(SocketAddr::is_ipv6), IpAddrs::new(vec![])), (None, Some(_)) => (self.filter(SocketAddr::is_ipv6), SocketAddrs::new(vec![])),
_ => { _ => {
let preferring_v6 = self let preferring_v6 = self
.iter .iter
@@ -225,7 +225,7 @@ impl IpAddrs {
.iter .iter
.partition::<Vec<_>, _>(|addr| addr.is_ipv6() == preferring_v6); .partition::<Vec<_>, _>(|addr| addr.is_ipv6() == preferring_v6);
(IpAddrs::new(preferred), IpAddrs::new(fallback)) (SocketAddrs::new(preferred), SocketAddrs::new(fallback))
} }
} }
} }
@@ -239,7 +239,7 @@ impl IpAddrs {
} }
} }
impl Iterator for IpAddrs { impl Iterator for SocketAddrs {
type Item = SocketAddr; type Item = SocketAddr;
#[inline] #[inline]
fn next(&mut self) -> Option<SocketAddr> { fn next(&mut self) -> Option<SocketAddr> {
@@ -312,13 +312,13 @@ impl Future for TokioThreadpoolGaiFuture {
*/ */
mod sealed { mod sealed {
use super::{IpAddr, Name}; use super::{SocketAddr, Name};
use crate::common::{task, Future, Poll}; use crate::common::{task, Future, Poll};
use tower_service::Service; use tower_service::Service;
// "Trait alias" for `Service<Name, Response = Addrs>` // "Trait alias" for `Service<Name, Response = Addrs>`
pub trait Resolve { pub trait Resolve {
type Addrs: Iterator<Item = IpAddr>; type Addrs: Iterator<Item = SocketAddr>;
type Error: Into<Box<dyn std::error::Error + Send + Sync>>; type Error: Into<Box<dyn std::error::Error + Send + Sync>>;
type Future: Future<Output = Result<Self::Addrs, Self::Error>>; type Future: Future<Output = Result<Self::Addrs, Self::Error>>;
@@ -329,7 +329,7 @@ mod sealed {
impl<S> Resolve for S impl<S> Resolve for S
where where
S: Service<Name>, S: Service<Name>,
S::Response: Iterator<Item = IpAddr>, S::Response: Iterator<Item = SocketAddr>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>, S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{ {
type Addrs = S::Response; type Addrs = S::Response;
@@ -366,42 +366,42 @@ mod tests {
let v4_addr = (ip_v4, 80).into(); let v4_addr = (ip_v4, 80).into();
let v6_addr = (ip_v6, 80).into(); let v6_addr = (ip_v6, 80).into();
let (mut preferred, mut fallback) = IpAddrs { let (mut preferred, mut fallback) = SocketAddrs {
iter: vec![v4_addr, v6_addr].into_iter(), iter: vec![v4_addr, v6_addr].into_iter(),
} }
.split_by_preference(None, None); .split_by_preference(None, None);
assert!(preferred.next().unwrap().is_ipv4()); assert!(preferred.next().unwrap().is_ipv4());
assert!(fallback.next().unwrap().is_ipv6()); assert!(fallback.next().unwrap().is_ipv6());
let (mut preferred, mut fallback) = IpAddrs { let (mut preferred, mut fallback) = SocketAddrs {
iter: vec![v6_addr, v4_addr].into_iter(), iter: vec![v6_addr, v4_addr].into_iter(),
} }
.split_by_preference(None, None); .split_by_preference(None, None);
assert!(preferred.next().unwrap().is_ipv6()); assert!(preferred.next().unwrap().is_ipv6());
assert!(fallback.next().unwrap().is_ipv4()); assert!(fallback.next().unwrap().is_ipv4());
let (mut preferred, mut fallback) = IpAddrs { let (mut preferred, mut fallback) = SocketAddrs {
iter: vec![v4_addr, v6_addr].into_iter(), iter: vec![v4_addr, v6_addr].into_iter(),
} }
.split_by_preference(Some(ip_v4), Some(ip_v6)); .split_by_preference(Some(ip_v4), Some(ip_v6));
assert!(preferred.next().unwrap().is_ipv4()); assert!(preferred.next().unwrap().is_ipv4());
assert!(fallback.next().unwrap().is_ipv6()); assert!(fallback.next().unwrap().is_ipv6());
let (mut preferred, mut fallback) = IpAddrs { let (mut preferred, mut fallback) = SocketAddrs {
iter: vec![v6_addr, v4_addr].into_iter(), iter: vec![v6_addr, v4_addr].into_iter(),
} }
.split_by_preference(Some(ip_v4), Some(ip_v6)); .split_by_preference(Some(ip_v4), Some(ip_v6));
assert!(preferred.next().unwrap().is_ipv6()); assert!(preferred.next().unwrap().is_ipv6());
assert!(fallback.next().unwrap().is_ipv4()); assert!(fallback.next().unwrap().is_ipv4());
let (mut preferred, fallback) = IpAddrs { let (mut preferred, fallback) = SocketAddrs {
iter: vec![v4_addr, v6_addr].into_iter(), iter: vec![v4_addr, v6_addr].into_iter(),
} }
.split_by_preference(Some(ip_v4), None); .split_by_preference(Some(ip_v4), None);
assert!(preferred.next().unwrap().is_ipv4()); assert!(preferred.next().unwrap().is_ipv4());
assert!(fallback.is_empty()); assert!(fallback.is_empty());
let (mut preferred, fallback) = IpAddrs { let (mut preferred, fallback) = SocketAddrs {
iter: vec![v4_addr, v6_addr].into_iter(), iter: vec![v4_addr, v6_addr].into_iter(),
} }
.split_by_preference(None, Some(ip_v6)); .split_by_preference(None, Some(ip_v6));
@@ -422,7 +422,7 @@ mod tests {
let dst = ::http::Uri::from_static("http://[::1]:8080/"); let dst = ::http::Uri::from_static("http://[::1]:8080/");
let mut addrs = let mut addrs =
IpAddrs::try_parse(dst.host().expect("host"), dst.port_u16().expect("port")) SocketAddrs::try_parse(dst.host().expect("host"), dst.port_u16().expect("port"))
.expect("try_parse"); .expect("try_parse");
let expected = "[::1]:8080".parse::<SocketAddr>().expect("expected"); let expected = "[::1]:8080".parse::<SocketAddr>().expect("expected");

View File

@@ -321,14 +321,17 @@ where
// If the host is already an IP addr (v4 or v6), // If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away. // skip resolving the dns and start connecting right away.
let addrs = if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { let addrs = if let Some(addrs) = dns::SocketAddrs::try_parse(host, port) {
addrs addrs
} else { } else {
let addrs = resolve(&mut self.resolver, dns::Name::new(host.into())) let addrs = resolve(&mut self.resolver, dns::Name::new(host.into()))
.await .await
.map_err(ConnectError::dns)?; .map_err(ConnectError::dns)?;
let addrs = addrs.map(|addr| SocketAddr::new(addr, port)).collect(); let addrs = addrs.map(|mut addr| {
dns::IpAddrs::new(addrs) addr.set_port(port);
addr
}).collect();
dns::SocketAddrs::new(addrs)
}; };
let c = ConnectingTcp::new(addrs, config); let c = ConnectingTcp::new(addrs, config);
@@ -457,7 +460,7 @@ struct ConnectingTcp<'a> {
} }
impl<'a> ConnectingTcp<'a> { impl<'a> ConnectingTcp<'a> {
fn new(remote_addrs: dns::IpAddrs, config: &'a Config) -> Self { fn new(remote_addrs: dns::SocketAddrs, config: &'a Config) -> Self {
if let Some(fallback_timeout) = config.happy_eyeballs_timeout { if let Some(fallback_timeout) = config.happy_eyeballs_timeout {
let (preferred_addrs, fallback_addrs) = remote_addrs let (preferred_addrs, fallback_addrs) = remote_addrs
.split_by_preference(config.local_address_ipv4, config.local_address_ipv6); .split_by_preference(config.local_address_ipv4, config.local_address_ipv6);
@@ -493,12 +496,12 @@ struct ConnectingTcpFallback {
} }
struct ConnectingTcpRemote { struct ConnectingTcpRemote {
addrs: dns::IpAddrs, addrs: dns::SocketAddrs,
connect_timeout: Option<Duration>, connect_timeout: Option<Duration>,
} }
impl ConnectingTcpRemote { impl ConnectingTcpRemote {
fn new(addrs: dns::IpAddrs, connect_timeout: Option<Duration>) -> Self { fn new(addrs: dns::SocketAddrs, connect_timeout: Option<Duration>) -> Self {
let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32)); let connect_timeout = connect_timeout.map(|t| t / (addrs.len() as u32));
Self { Self {
@@ -920,7 +923,7 @@ mod tests {
send_buffer_size: None, send_buffer_size: None,
recv_buffer_size: None, recv_buffer_size: None,
}; };
let connecting_tcp = ConnectingTcp::new(dns::IpAddrs::new(addrs), &cfg); let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg);
let start = Instant::now(); let start = Instant::now();
Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?)) Ok::<_, ConnectError>((start, ConnectingTcp::connect(connecting_tcp).await?))
}) })