refactor(client): de-duplicate HttpConnector::call code

This commit is contained in:
Sean McArthur
2019-10-22 17:50:13 -07:00
parent d67e49f149
commit 3ee47199d9

View File

@@ -262,15 +262,15 @@ where
if self.config.enforce_http { if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) { if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url(InvalidUrl::NotHttp, &self.config.handle); return invalid_url(InvalidUrl::NotHttp);
} }
} else if dst.uri.scheme_part().is_none() { } else if dst.uri.scheme_part().is_none() {
return invalid_url(InvalidUrl::MissingScheme, &self.config.handle); return invalid_url(InvalidUrl::MissingScheme);
} }
let host = match dst.uri.host() { let host = match dst.uri.host() {
Some(s) => s, Some(s) => s,
None => return invalid_url(InvalidUrl::MissingAuthority, &self.config.handle), None => return invalid_url(InvalidUrl::MissingAuthority),
}; };
let port = match dst.uri.port_part() { let port = match dst.uri.port_part() {
Some(port) => port.as_u16(), Some(port) => port.as_u16(),
@@ -297,56 +297,19 @@ where
R: Resolve + Clone + Send + Sync + 'static, R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send, R::Future: Send,
{ {
type Response = TcpStream; type Response = TcpStream;
type Error = io::Error; type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into() tower_service::Service::<Destination>::poll_ready(self, cx)
} }
fn call(&mut self, dst: Uri) -> Self::Future { fn call(&mut self, uri: Uri) -> Self::Future {
// TODO: return error here self
let dst = Destination::try_from_uri(dst).unwrap(); .call(Destination { uri })
.map_ok(|(s, _)| s)
trace!( .boxed()
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);
if self.config.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.config.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.config.handle).map_ok(|(s, _)| s).boxed();
}
let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.config.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};
let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.config.local_address),
handle: self.config.handle.clone(),
connect_timeout: self.config.connect_timeout,
happy_eyeballs_timeout: self.config.happy_eyeballs_timeout,
keep_alive_timeout: self.config.keep_alive_timeout,
nodelay: self.config.nodelay,
port,
reuse_address: self.config.reuse_address,
send_buffer_size: self.config.send_buffer_size,
recv_buffer_size: self.config.recv_buffer_size,
};
fut.map_ok(|(s, _)| s).boxed()
} }
} }
@@ -357,11 +320,10 @@ impl HttpInfo {
} }
} }
#[inline] fn invalid_url<R: Resolve>(err: InvalidUrl) -> HttpConnecting<R> {
fn invalid_url<R: Resolve>(err: InvalidUrl, handle: &Option<Handle>) -> HttpConnecting<R> {
HttpConnecting { HttpConnecting {
state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))), state: State::Error(Some(io::Error::new(io::ErrorKind::InvalidInput, err))),
handle: handle.clone(), handle: None,
keep_alive_timeout: None, keep_alive_timeout: None,
nodelay: false, nodelay: false,
port: 0, port: 0,