feat(client): add ability to include SO_REUSEADDR option on sockets

Closes #1599
This commit is contained in:
Eli Snow
2018-07-10 16:32:42 -06:00
committed by Sean McArthur
parent 02a9c29e2e
commit 13862d11ad

View File

@@ -403,12 +403,16 @@ mod http {
use self::http_connector::HttpConnectorBlockingTask; use self::http_connector::HttpConnectorBlockingTask;
fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> { fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>, reuse_address: bool) -> io::Result<ConnectFuture> {
let builder = match addr { let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?, &SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?, &SocketAddr::V6(_) => TcpBuilder::new_v6()?,
}; };
if reuse_address {
builder.reuse_address(reuse_address)?;
}
if let Some(ref local_addr) = *local_addr { if let Some(ref local_addr) = *local_addr {
// Caller has requested this socket be bound before calling connect // Caller has requested this socket be bound before calling connect
builder.bind(SocketAddr::new(local_addr.clone(), 0))?; builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
@@ -446,6 +450,7 @@ mod http {
nodelay: bool, nodelay: bool,
local_address: Option<IpAddr>, local_address: Option<IpAddr>,
happy_eyeballs_timeout: Option<Duration>, happy_eyeballs_timeout: Option<Duration>,
reuse_address: bool,
} }
impl HttpConnector { impl HttpConnector {
@@ -484,6 +489,7 @@ mod http {
nodelay: false, nodelay: false,
local_address: None, local_address: None,
happy_eyeballs_timeout: Some(Duration::from_millis(300)), happy_eyeballs_timeout: Some(Duration::from_millis(300)),
reuse_address: false,
} }
} }
@@ -539,6 +545,15 @@ mod http {
pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) { pub fn set_happy_eyeballs_timeout(&mut self, dur: Option<Duration>) {
self.happy_eyeballs_timeout = dur; self.happy_eyeballs_timeout = dur;
} }
/// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`.
///
/// Default is `false`.
#[inline]
pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self {
self.reuse_address = reuse_address;
self
}
} }
impl fmt::Debug for HttpConnector { impl fmt::Debug for HttpConnector {
@@ -585,6 +600,7 @@ mod http {
keep_alive_timeout: self.keep_alive_timeout, keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay, nodelay: self.nodelay,
happy_eyeballs_timeout: self.happy_eyeballs_timeout, happy_eyeballs_timeout: self.happy_eyeballs_timeout,
reuse_address: self.reuse_address,
} }
} }
} }
@@ -597,6 +613,7 @@ mod http {
keep_alive_timeout: None, keep_alive_timeout: None,
nodelay: false, nodelay: false,
happy_eyeballs_timeout: None, happy_eyeballs_timeout: None,
reuse_address: false,
} }
} }
@@ -630,6 +647,7 @@ mod http {
keep_alive_timeout: Option<Duration>, keep_alive_timeout: Option<Duration>,
nodelay: bool, nodelay: bool,
happy_eyeballs_timeout: Option<Duration>, happy_eyeballs_timeout: Option<Duration>,
reuse_address: bool,
} }
enum State { enum State {
@@ -652,7 +670,7 @@ mod http {
// skip resolving the dns and start connecting right away. // skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
state = State::Connecting(ConnectingTcp::new( state = State::Connecting(ConnectingTcp::new(
local_addr, addrs, self.happy_eyeballs_timeout)); local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address));
} else { } else {
let host = mem::replace(host, String::new()); let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port); let work = dns::Work::new(host, port);
@@ -664,7 +682,7 @@ mod http {
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => { Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp::new( state = State::Connecting(ConnectingTcp::new(
local_addr, addrs, self.happy_eyeballs_timeout)); local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address));
} }
}; };
}, },
@@ -696,6 +714,7 @@ mod http {
local_addr: Option<IpAddr>, local_addr: Option<IpAddr>,
preferred: ConnectingTcpRemote, preferred: ConnectingTcpRemote,
fallback: Option<ConnectingTcpFallback>, fallback: Option<ConnectingTcpFallback>,
reuse_address: bool,
} }
impl ConnectingTcp { impl ConnectingTcp {
@@ -703,6 +722,7 @@ mod http {
local_addr: Option<IpAddr>, local_addr: Option<IpAddr>,
remote_addrs: dns::IpAddrs, remote_addrs: dns::IpAddrs,
fallback_timeout: Option<Duration>, fallback_timeout: Option<Duration>,
reuse_address: bool,
) -> ConnectingTcp { ) -> ConnectingTcp {
if let Some(fallback_timeout) = fallback_timeout { if let Some(fallback_timeout) = fallback_timeout {
let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference(); let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference();
@@ -711,6 +731,7 @@ mod http {
local_addr, local_addr,
preferred: ConnectingTcpRemote::new(preferred_addrs), preferred: ConnectingTcpRemote::new(preferred_addrs),
fallback: None, fallback: None,
reuse_address,
}; };
} }
@@ -721,12 +742,14 @@ mod http {
delay: Delay::new(Instant::now() + fallback_timeout), delay: Delay::new(Instant::now() + fallback_timeout),
remote: ConnectingTcpRemote::new(fallback_addrs), remote: ConnectingTcpRemote::new(fallback_addrs),
}), }),
reuse_address,
} }
} else { } else {
ConnectingTcp { ConnectingTcp {
local_addr, local_addr,
preferred: ConnectingTcpRemote::new(remote_addrs), preferred: ConnectingTcpRemote::new(remote_addrs),
fallback: None, fallback: None,
reuse_address,
} }
} }
} }
@@ -757,6 +780,7 @@ mod http {
&mut self, &mut self,
local_addr: &Option<IpAddr>, local_addr: &Option<IpAddr>,
handle: &Option<Handle>, handle: &Option<Handle>,
reuse_address: bool,
) -> Poll<TcpStream, io::Error> { ) -> Poll<TcpStream, io::Error> {
let mut err = None; let mut err = None;
loop { loop {
@@ -768,14 +792,14 @@ mod http {
err = Some(e); err = Some(e);
if let Some(addr) = self.addrs.next() { if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr); debug!("connecting to {}", addr);
*current = connect(&addr, local_addr, handle)?; *current = connect(&addr, local_addr, handle, reuse_address)?;
continue; continue;
} }
} }
} }
} else if let Some(addr) = self.addrs.next() { } else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr); debug!("connecting to {}", addr);
self.current = Some(connect(&addr, local_addr, handle)?); self.current = Some(connect(&addr, local_addr, handle, reuse_address)?);
continue; continue;
} }
@@ -788,14 +812,14 @@ mod http {
// not a Future, since passing a &Handle to poll // not a Future, since passing a &Handle to poll
fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> { fn poll(&mut self, handle: &Option<Handle>) -> Poll<TcpStream, io::Error> {
match self.fallback.take() { match self.fallback.take() {
None => self.preferred.poll(&self.local_addr, handle), None => self.preferred.poll(&self.local_addr, handle, self.reuse_address),
Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle) { Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle, self.reuse_address) {
Ok(Async::Ready(stream)) => { Ok(Async::Ready(stream)) => {
// Preferred successful - drop fallback. // Preferred successful - drop fallback.
Ok(Async::Ready(stream)) Ok(Async::Ready(stream))
} }
Ok(Async::NotReady) => match fallback.delay.poll() { Ok(Async::NotReady) => match fallback.delay.poll() {
Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle) { Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle, self.reuse_address) {
Ok(Async::Ready(stream)) => { Ok(Async::Ready(stream)) => {
// Fallback successful - drop current preferred, // Fallback successful - drop current preferred,
// but keep fallback as new preferred. // but keep fallback as new preferred.
@@ -825,7 +849,7 @@ mod http {
Err(_) => { Err(_) => {
// Preferred failed - use fallback as new preferred. // Preferred failed - use fallback as new preferred.
self.preferred = fallback.remote; self.preferred = fallback.remote;
self.preferred.poll(&self.local_addr, handle) self.preferred.poll(&self.local_addr, handle, self.reuse_address)
} }
} }
} }
@@ -980,7 +1004,7 @@ mod http {
} }
let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect(); let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect();
let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout)); let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false);
let fut = ConnectingTcpFuture(connecting_tcp); let fut = ConnectingTcpFuture(connecting_tcp);
let start = Instant::now(); let start = Instant::now();