feat(client): implement rfc 6555 (happy eyeballs)

Update client connector to attempt a parallel connection using
alternative address family, if connection using preferred address family
takes too long.

Closes: #1316
This commit is contained in:
Hrvoje Ban
2018-07-08 10:11:38 +02:00
committed by Sean McArthur
parent 5b5e309095
commit 02a9c29e2e
3 changed files with 337 additions and 17 deletions

View File

@@ -1,5 +1,5 @@
language: rust
sudo: false
sudo: true # Required for functional IPv6 (forces VM instead of Docker).
dist: trusty
matrix:
fast_finish: true
@@ -18,6 +18,13 @@ matrix:
cache:
apt: true
before_script:
# Add an IPv6 config - see the corresponding Travis issue
# https://github.com/travis-ci/travis-ci/issues/8361
- if [ "${TRAVIS_OS_NAME}" == "linux" ]; then
sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6';
fi
script:
- ./.travis/readme.py
- cargo build $FEATURES

View File

@@ -386,7 +386,7 @@ mod http {
use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use futures::{Async, Poll};
use futures::future::{Executor, ExecuteError};
@@ -396,6 +396,7 @@ mod http {
use net2::TcpBuilder;
use tokio_reactor::Handle;
use tokio_tcp::{TcpStream, ConnectFuture};
use tokio_timer::Delay;
use super::super::dns;
@@ -444,6 +445,7 @@ mod http {
keep_alive_timeout: Option<Duration>,
nodelay: bool,
local_address: Option<IpAddr>,
happy_eyeballs_timeout: Option<Duration>,
}
impl HttpConnector {
@@ -481,6 +483,7 @@ mod http {
keep_alive_timeout: None,
nodelay: false,
local_address: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)),
}
}
@@ -519,6 +522,23 @@ mod http {
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
self.local_address = addr;
}
/// Set timeout for [RFC 6555 (Happy Eyeballs)][RFC 6555] algorithm.
///
/// If hostname resolves to both IPv4 and IPv6 addresses and connection
/// cannot be established using preferred address family before timeout
/// elapses, then connector will in parallel attempt connection using other
/// address family.
///
/// If `None`, parallel connection attempts are disabled.
///
/// Default is 300 milliseconds.
///
/// [RFC 6555]: https://tools.ietf.org/html/rfc6555
#[inline]
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
self.happy_eyeballs_timeout = dur;
}
}
impl fmt::Debug for HttpConnector {
@@ -564,6 +584,7 @@ mod http {
handle: self.handle.clone(),
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
}
}
}
@@ -575,6 +596,7 @@ mod http {
handle: handle.clone(),
keep_alive_timeout: None,
nodelay: false,
happy_eyeballs_timeout: None,
}
}
@@ -607,6 +629,7 @@ mod http {
handle: Option<Handle>,
keep_alive_timeout: Option<Duration>,
nodelay: bool,
happy_eyeballs_timeout: Option<Duration>,
}
enum State {
@@ -628,11 +651,8 @@ mod http {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None
})
state = State::Connecting(ConnectingTcp::new(
local_addr, addrs, self.happy_eyeballs_timeout));
} else {
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
@@ -643,11 +663,8 @@ mod http {
match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
local_addr: local_addr,
current: None,
})
state = State::Connecting(ConnectingTcp::new(
local_addr, addrs, self.happy_eyeballs_timeout));
}
};
},
@@ -676,14 +693,71 @@ mod http {
}
struct ConnectingTcp {
addrs: dns::IpAddrs,
local_addr: Option<IpAddr>,
current: Option<ConnectFuture>,
preferred: ConnectingTcpRemote,
fallback: Option<ConnectingTcpFallback>,
}
impl ConnectingTcp {
fn new(
local_addr: Option<IpAddr>,
remote_addrs: dns::IpAddrs,
fallback_timeout: Option<Duration>,
) -> ConnectingTcp {
if let Some(fallback_timeout) = fallback_timeout {
let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference();
if fallback_addrs.is_empty() {
return ConnectingTcp {
local_addr,
preferred: ConnectingTcpRemote::new(preferred_addrs),
fallback: None,
};
}
ConnectingTcp {
local_addr,
preferred: ConnectingTcpRemote::new(preferred_addrs),
fallback: Some(ConnectingTcpFallback {
delay: Delay::new(Instant::now() + fallback_timeout),
remote: ConnectingTcpRemote::new(fallback_addrs),
}),
}
} else {
ConnectingTcp {
local_addr,
preferred: ConnectingTcpRemote::new(remote_addrs),
fallback: None,
}
}
}
}
struct ConnectingTcpFallback {
delay: Delay,
remote: ConnectingTcpRemote,
}
struct ConnectingTcpRemote {
addrs: dns::IpAddrs,
current: Option<ConnectFuture>,
}
impl ConnectingTcpRemote {
fn new(addrs: dns::IpAddrs) -> Self {
Self {
addrs,
current: None,
}
}
}
impl ConnectingTcpRemote {
// not a Future, since passing a &Handle to poll
fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> {
fn poll(
&mut self,
local_addr: &Option<IpAddr>,
handle: &Option<Handle>,
) -> Poll<TcpStream, io::Error> {
let mut err = None;
loop {
if let Some(ref mut current) = self.current {
@@ -694,14 +768,14 @@ mod http {
err = Some(e);
if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
*current = connect(&addr, &self.local_addr, handle)?;
*current = connect(&addr, local_addr, handle)?;
continue;
}
}
}
} else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr);
self.current = Some(connect(&addr, &self.local_addr, handle)?);
self.current = Some(connect(&addr, local_addr, handle)?);
continue;
}
@@ -710,6 +784,54 @@ mod http {
}
}
impl ConnectingTcp {
// not a Future, since passing a &Handle to poll
fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> {
match self.fallback.take() {
None => self.preferred.poll(&self.local_addr, handle),
Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle) {
Ok(Async::Ready(stream)) => {
// Preferred successful - drop fallback.
Ok(Async::Ready(stream))
}
Ok(Async::NotReady) => match fallback.delay.poll() {
Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle) {
Ok(Async::Ready(stream)) => {
// Fallback successful - drop current preferred,
// but keep fallback as new preferred.
self.preferred = fallback.remote;
Ok(Async::Ready(stream))
}
Ok(Async::NotReady) => {
// Neither preferred nor fallback are ready.
self.fallback = Some(fallback);
Ok(Async::NotReady)
}
Err(_) => {
// Fallback failed - resume with preferred only.
Ok(Async::NotReady)
}
},
Ok(Async::NotReady) => {
// Too early to attempt fallback.
self.fallback = Some(fallback);
Ok(Async::NotReady)
}
Err(_) => {
// Fallback delay failed - resume with preferred only.
Ok(Async::NotReady)
}
}
Err(_) => {
// Preferred failed - use fallback as new preferred.
self.preferred = fallback.remote;
self.preferred.poll(&self.local_addr, handle)
}
}
}
}
}
// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
@@ -783,6 +905,154 @@ mod http {
assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
fn client_happy_eyeballs() {
extern crate pretty_env_logger;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, TcpListener};
use std::time::{Duration, Instant};
use futures::{Async, Poll};
use tokio::runtime::current_thread::Runtime;
use tokio_reactor::Handle;
use super::dns;
use super::ConnectingTcp;
let _ = pretty_env_logger::try_init();
let server4 = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server4.local_addr().unwrap();
let _server6 = TcpListener::bind(&format!("[::1]:{}", addr.port())).unwrap();
let mut rt = Runtime::new().unwrap();
let local_timeout = Duration::default();
let unreachable_v4_timeout = measure_connect(unreachable_ipv4_addr()).1;
let unreachable_v6_timeout = measure_connect(unreachable_ipv6_addr()).1;
let fallback_timeout = ::std::cmp::max(unreachable_v4_timeout, unreachable_v6_timeout)
+ Duration::from_millis(250);
let scenarios = &[
// Fast primary, without fallback.
(&[local_ipv4_addr()][..],
4, local_timeout, false),
(&[local_ipv6_addr()][..],
6, local_timeout, false),
// Fast primary, with (unused) fallback.
(&[local_ipv4_addr(), local_ipv6_addr()][..],
4, local_timeout, false),
(&[local_ipv6_addr(), local_ipv4_addr()][..],
6, local_timeout, false),
// Unreachable + fast primary, without fallback.
(&[unreachable_ipv4_addr(), local_ipv4_addr()][..],
4, unreachable_v4_timeout, false),
(&[unreachable_ipv6_addr(), local_ipv6_addr()][..],
6, unreachable_v6_timeout, false),
// Unreachable + fast primary, with (unused) fallback.
(&[unreachable_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
4, unreachable_v4_timeout, false),
(&[unreachable_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
6, unreachable_v6_timeout, true),
// Slow primary, with (used) fallback.
(&[slow_ipv4_addr(), local_ipv4_addr(), local_ipv6_addr()][..],
6, fallback_timeout, false),
(&[slow_ipv6_addr(), local_ipv6_addr(), local_ipv4_addr()][..],
4, fallback_timeout, true),
// Slow primary, with (used) unreachable + fast fallback.
(&[slow_ipv4_addr(), unreachable_ipv6_addr(), local_ipv6_addr()][..],
6, fallback_timeout + unreachable_v6_timeout, false),
(&[slow_ipv6_addr(), unreachable_ipv4_addr(), local_ipv4_addr()][..],
4, fallback_timeout + unreachable_v4_timeout, true),
];
// Scenarios for IPv6 -> IPv4 fallback require that host can access IPv6 network.
// Otherwise, connection to "slow" IPv6 address will error-out immediatelly.
let ipv6_accessible = measure_connect(slow_ipv6_addr()).0;
for &(hosts, family, timeout, needs_ipv6_access) in scenarios {
if needs_ipv6_access && !ipv6_accessible {
continue;
}
let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect();
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout));
let fut = ConnectingTcpFuture(connecting_tcp);
let start = Instant::now();
let res = rt.block_on(fut).unwrap();
let duration = start.elapsed();
// Allow actual duration to be +/- 150ms off.
let min_duration = if timeout >= Duration::from_millis(150) {
timeout - Duration::from_millis(150)
} else {
Duration::default()
};
let max_duration = timeout + Duration::from_millis(150);
assert_eq!(res, family);
assert!(duration >= min_duration);
assert!(duration <= max_duration);
}
struct ConnectingTcpFuture(ConnectingTcp);
impl Future for ConnectingTcpFuture {
type Item = u8;
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.0.poll(&Some(Handle::default())) {
Ok(Async::Ready(stream)) => Ok(Async::Ready(
if stream.peer_addr().unwrap().is_ipv4() { 4 } else { 6 }
)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
}
}
fn local_ipv4_addr() -> IpAddr {
Ipv4Addr::new(127, 0, 0, 1).into()
}
fn local_ipv6_addr() -> IpAddr {
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1).into()
}
fn unreachable_ipv4_addr() -> IpAddr {
Ipv4Addr::new(127, 0, 0, 2).into()
}
fn unreachable_ipv6_addr() -> IpAddr {
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2).into()
}
fn slow_ipv4_addr() -> IpAddr {
// RFC 6890 reserved IPv4 address.
Ipv4Addr::new(198, 18, 0, 25).into()
}
fn slow_ipv6_addr() -> IpAddr {
// RFC 6890 reserved IPv6 address.
Ipv6Addr::new(2001, 2, 0, 0, 0, 0, 0, 254).into()
}
fn measure_connect(addr: IpAddr) -> (bool, Duration) {
let start = Instant::now();
let result = ::std::net::TcpStream::connect_timeout(
&(addr, 80).into(), Duration::from_secs(1));
let reachable = result.is_ok() || result.unwrap_err().kind() == io::ErrorKind::TimedOut;
let duration = start.elapsed();
(reachable, duration)
}
}
}
}

View File

@@ -35,6 +35,10 @@ pub struct IpAddrs {
}
impl IpAddrs {
pub fn new(addrs: Vec<SocketAddr>) -> Self {
IpAddrs { iter: addrs.into_iter() }
}
pub fn try_parse(host: &str, port: u16) -> Option<IpAddrs> {
if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
@@ -46,6 +50,23 @@ impl IpAddrs {
}
None
}
pub fn split_by_preference(self) -> (IpAddrs, IpAddrs) {
let preferring_v6 = self.iter
.as_slice()
.first()
.map(SocketAddr::is_ipv6)
.unwrap_or(false);
let (preferred, fallback) = self.iter
.partition::<Vec<_>, _>(|addr| addr.is_ipv6() == preferring_v6);
(IpAddrs::new(preferred), IpAddrs::new(fallback))
}
pub fn is_empty(&self) -> bool {
self.iter.as_slice().is_empty()
}
}
impl Iterator for IpAddrs {
@@ -55,3 +76,25 @@ impl Iterator for IpAddrs {
self.iter.next()
}
}
#[cfg(test)]
mod tests {
use std::net::{Ipv4Addr, Ipv6Addr};
use super::*;
#[test]
fn test_ip_addrs_split_by_preference() {
let v4_addr = (Ipv4Addr::new(127, 0, 0, 1), 80).into();
let v6_addr = (Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), 80).into();
let (mut preferred, mut fallback) =
IpAddrs { iter: vec![v4_addr, v6_addr].into_iter() }.split_by_preference();
assert!(preferred.next().unwrap().is_ipv4());
assert!(fallback.next().unwrap().is_ipv6());
let (mut preferred, mut fallback) =
IpAddrs { iter: vec![v6_addr, v4_addr].into_iter() }.split_by_preference();
assert!(preferred.next().unwrap().is_ipv6());
assert!(fallback.next().unwrap().is_ipv4());
}
}