Merge pull request #1500 from kw217/1498-add-local-bind

feat(client): support local bind for HttpConnector
This commit is contained in:
Sean McArthur
2018-05-01 11:58:10 -07:00
committed by GitHub

View File

@@ -125,10 +125,11 @@ impl Connected {
mod http { mod http {
use super::*; use super::*;
use std::borrow::Cow;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::mem; use std::mem;
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -146,14 +147,17 @@ mod http {
use self::http_connector::HttpConnectorBlockingTask; use self::http_connector::HttpConnectorBlockingTask;
fn connect(addr: &SocketAddr, handle: &Option<Handle>) -> io::Result<ConnectFuture> { fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
if let Some(ref handle) = *handle {
let builder = match addr { let builder = match addr {
&SocketAddr::V4(_) => TcpBuilder::new_v4()?, &SocketAddr::V4(_) => TcpBuilder::new_v4()?,
&SocketAddr::V6(_) => TcpBuilder::new_v6()?, &SocketAddr::V6(_) => TcpBuilder::new_v6()?,
}; };
if cfg!(windows) { if let Some(ref local_addr) = *local_addr {
// Caller has requested this socket be bound before calling connect
builder.bind(SocketAddr::new(local_addr.clone(), 0))?;
}
else if cfg!(windows) {
// Windows requires a socket be bound before calling connect // Windows requires a socket be bound before calling connect
let any: SocketAddr = match addr { let any: SocketAddr = match addr {
&SocketAddr::V4(_) => { &SocketAddr::V4(_) => {
@@ -166,10 +170,12 @@ mod http {
builder.bind(any)?; builder.bind(any)?;
} }
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, handle)) let handle = match *handle {
} else { Some(ref handle) => Cow::Borrowed(handle),
Ok(TcpStream::connect(addr)) None => Cow::Owned(Handle::current()),
} };
Ok(TcpStream::connect_std(builder.to_tcp_stream()?, addr, &handle))
} }
/// A connector for the `http` scheme. /// A connector for the `http` scheme.
@@ -182,6 +188,7 @@ mod http {
handle: Option<Handle>, handle: Option<Handle>,
keep_alive_timeout: Option<Duration>, keep_alive_timeout: Option<Duration>,
nodelay: bool, nodelay: bool,
local_address: Option<IpAddr>,
} }
impl HttpConnector { impl HttpConnector {
@@ -218,6 +225,7 @@ mod http {
handle, handle,
keep_alive_timeout: None, keep_alive_timeout: None,
nodelay: false, nodelay: false,
local_address: None,
} }
} }
@@ -246,6 +254,16 @@ mod http {
pub fn set_nodelay(&mut self, nodelay: bool) { pub fn set_nodelay(&mut self, nodelay: bool) {
self.nodelay = nodelay; self.nodelay = nodelay;
} }
/// Set that all sockets are bound to the configured address before connection.
///
/// If `None`, the sockets will not be bound.
///
/// Default is `None`.
#[inline]
pub fn set_local_address(&mut self, addr: Option<IpAddr>) {
self.local_address = addr;
}
} }
impl fmt::Debug for HttpConnector { impl fmt::Debug for HttpConnector {
@@ -287,7 +305,7 @@ mod http {
}; };
HttpConnecting { HttpConnecting {
state: State::Lazy(self.executor.clone(), host.into(), port), state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address),
handle: self.handle.clone(), handle: self.handle.clone(),
keep_alive_timeout: self.keep_alive_timeout, keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay, nodelay: self.nodelay,
@@ -337,8 +355,8 @@ mod http {
} }
enum State { enum State {
Lazy(HttpConnectExecutor, String, u16), Lazy(HttpConnectExecutor, String, u16, Option<IpAddr>),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>), Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>, Option<IpAddr>),
Connecting(ConnectingTcp), Connecting(ConnectingTcp),
Error(Option<io::Error>), Error(Option<io::Error>),
} }
@@ -351,26 +369,28 @@ mod http {
loop { loop {
let state; let state;
match self.state { match self.state {
State::Lazy(ref executor, ref mut host, port) => { State::Lazy(ref executor, ref mut host, port, local_addr) => {
// If the host is already an IP addr (v4 or v6), // If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away. // skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
state = State::Connecting(ConnectingTcp { state = State::Connecting(ConnectingTcp {
addrs: addrs, addrs: addrs,
local_addr: local_addr,
current: None current: None
}) })
} else { } else {
let host = mem::replace(host, String::new()); let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port); let work = dns::Work::new(host, port);
state = State::Resolving(oneshot::spawn(work, executor)); state = State::Resolving(oneshot::spawn(work, executor), local_addr);
} }
}, },
State::Resolving(ref mut future) => { State::Resolving(ref mut future, local_addr) => {
match try!(future.poll()) { match try!(future.poll()) {
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => { Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp { state = State::Connecting(ConnectingTcp {
addrs: addrs, addrs: addrs,
local_addr: local_addr,
current: None, current: None,
}) })
} }
@@ -402,6 +422,7 @@ mod http {
struct ConnectingTcp { struct ConnectingTcp {
addrs: dns::IpAddrs, addrs: dns::IpAddrs,
local_addr: Option<IpAddr>,
current: Option<ConnectFuture>, current: Option<ConnectFuture>,
} }
@@ -418,14 +439,14 @@ mod http {
err = Some(e); err = Some(e);
if let Some(addr) = self.addrs.next() { if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr); debug!("connecting to {}", addr);
*current = connect(&addr, handle)?; *current = connect(&addr, &self.local_addr, handle)?;
continue; continue;
} }
} }
} }
} else if let Some(addr) = self.addrs.next() { } else if let Some(addr) = self.addrs.next() {
debug!("connecting to {}", addr); debug!("connecting to {}", addr);
self.current = Some(connect(&addr, handle)?); self.current = Some(connect(&addr, &self.local_addr, handle)?);
continue; continue;
} }