feat(client): provide tower::Service support for clients (#1915)

This commit is contained in:
Lucio Franco
2019-08-30 15:54:22 -04:00
committed by Sean McArthur
parent eef407d60e
commit eee2a72879
6 changed files with 184 additions and 9 deletions

View File

@@ -27,7 +27,7 @@ futures-core-preview = { version = "=0.3.0-alpha.18" }
futures-channel-preview = { version = "=0.3.0-alpha.18" }
futures-util-preview = { version = "=0.3.0-alpha.18" }
http = "0.1.15"
http-body = { git = "https://github.com/hyperium/http-body" }
http-body = "0.2.0-alpha.1"
httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2" }
iovec = "0.1"
@@ -38,6 +38,7 @@ pin-utils = "=0.1.0-alpha.4"
time = "0.1"
tokio = { version = "=0.2.0-alpha.4", optional = true, default-features = false, features = ["rt-full"] }
tower-service = "=0.3.0-alpha.1"
tower-make = { version = "0.1.0-alpha.2", features = ['io'] }
tokio-executor = { version = "=0.2.0-alpha.4", features = ["blocking"] }
tokio-io = "=0.2.0-alpha.4"
tokio-sync = "=0.2.0-alpha.4"

26
examples/tower_client.rs Normal file
View File

@@ -0,0 +1,26 @@
use hyper::client::service::{Connect, Service, MakeService};
use hyper::client::conn::Builder;
use hyper::client::connect::HttpConnector;
use hyper::{Body, Request};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
pretty_env_logger::init();
let mut mk_svc = Connect::new(HttpConnector::new(), Builder::new());
let uri = "http://127.0.0.1:8080".parse::<http::Uri>()?;
let mut svc = mk_svc.make_service(uri.clone()).await?;
let body = Body::empty();
let req = Request::get(uri).body(body)?;
let res = svc.call(req).await?;
println!("RESPONSE={:?}", res);
Ok(())
}

View File

@@ -14,6 +14,7 @@ use std::sync::Arc;
use bytes::Bytes;
use futures_util::future::{self, Either, FutureExt as _};
use tokio_io::{AsyncRead, AsyncWrite};
use tower_service::Service;
use crate::body::Payload;
use crate::common::{Exec, Future, Pin, Poll, task};
@@ -242,18 +243,21 @@ where
}
}
/* TODO(0.12.0): when we change from tokio-service to tower.
impl<T, B> Service for SendRequest<T, B> {
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
impl<B> Service<Request<B>> for SendRequest<B>
where
B: Payload + 'static, {
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;
fn call(&self, req: Self::Request) -> Self::Future {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_ready(cx)
}
fn call(&mut self, req: Request<B>) -> Self::Future {
self.send_request(req)
}
}
*/
impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

View File

@@ -5,7 +5,8 @@ use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use http::uri::Scheme;
use http::uri::{Scheme, Uri};
use futures_util::{TryFutureExt, FutureExt};
use net2::TcpBuilder;
use tokio_net::driver::Handle;
use tokio_net::tcp::TcpStream;
@@ -248,6 +249,63 @@ where
}
}
impl<R> tower_service::Service<Uri> for HttpConnector<R>
where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = TcpStream;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, dst: Uri) -> Self::Future {
// TODO: return error here
let dst = Destination::try_from_uri(dst).unwrap();
trace!(
"Http::connect; scheme={}, host={}, port={:?}",
dst.scheme(),
dst.host(),
dst.port(),
);
if self.enforce_http {
if dst.uri.scheme_part() != Some(&Scheme::HTTP) {
return invalid_url::<R>(InvalidUrl::NotHttp, &self.handle).map_ok(|(s, _)| s).boxed();
}
} else if dst.uri.scheme_part().is_none() {
return invalid_url::<R>(InvalidUrl::MissingScheme, &self.handle).map_ok(|(s, _)| s).boxed();
}
let host = match dst.uri.host() {
Some(s) => s,
None => return invalid_url::<R>(InvalidUrl::MissingAuthority, &self.handle).map_ok(|(s, _)| s).boxed(),
};
let port = match dst.uri.port_part() {
Some(port) => port.as_u16(),
None => if dst.uri.scheme_part() == Some(&Scheme::HTTPS) { 443 } else { 80 },
};
let fut = HttpConnecting {
state: State::Lazy(self.resolver.clone(), host.into(), self.local_address),
handle: self.handle.clone(),
happy_eyeballs_timeout: self.happy_eyeballs_timeout,
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
port,
reuse_address: self.reuse_address,
send_buffer_size: self.send_buffer_size,
recv_buffer_size: self.recv_buffer_size,
};
fut.map_ok(|(s, _)| s).boxed()
}
}
impl HttpInfo {
/// Get the remote address of the transport used.
pub fn remote_addr(&self) -> SocketAddr {

View File

@@ -81,6 +81,7 @@ pub mod conn;
pub mod connect;
pub(crate) mod dispatch;
mod pool;
pub mod service;
#[cfg(test)]
mod tests;

85
src/client/service.rs Normal file
View File

@@ -0,0 +1,85 @@
//! Utilities used to interact with the Tower ecosystem.
//!
//! This module provides exports of `Service`, `MakeService` and `Connect` which
//! all provide hook-ins into the Tower ecosystem.
use super::conn::{SendRequest, Builder};
use std::marker::PhantomData;
use crate::{common::{Poll, task, Pin}, body::Payload};
use std::future::Future;
use std::error::Error as StdError;
use tower_make::MakeConnection;
pub use tower_service::Service;
pub use tower_make::MakeService;
/// Creates a connection via `SendRequest`.
///
/// This accepts a `hyper::client::conn::Builder` and provides
/// a `MakeService` implementation to create connections from some
/// target `T`.
#[derive(Debug)]
pub struct Connect<C, B, T> {
inner: C,
builder: Builder,
_pd: PhantomData<fn(T, B)>
}
impl<C, B, T> Connect<C, B, T> {
/// Create a new `Connect` with some inner connector `C` and a connection
/// builder.
pub fn new(inner: C, builder: Builder) -> Self {
Self {
inner,
builder,
_pd: PhantomData
}
}
}
impl<C, B, T> Service<T> for Connect<C, B, T>
where
C: MakeConnection<T>,
C::Connection: Unpin + Send + 'static,
C::Future: Send + 'static,
C::Error: Into<Box<dyn StdError + Send + Sync>> + Send,
B: Payload + Unpin + 'static,
B::Data: Unpin,
{
type Response = SendRequest<B>;
type Error = crate::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(|e| crate::Error::new(crate::error::Kind::Connect).with(e.into()))
}
fn call(&mut self, req: T) -> Self::Future {
let builder = self.builder.clone();
let io = self.inner.make_connection(req);
let fut = async move {
match io.await {
Ok(io) => {
match builder.handshake(io).await {
Ok((sr, conn)) => {
builder.exec.execute(async move {
if let Err(e) = conn.await {
debug!("connection error: {:?}", e);
}
})?;
Ok(sr)
},
Err(e) => Err(e)
}
},
Err(e) => {
let err = crate::Error::new(crate::error::Kind::Connect).with(e.into());
Err(err)
}
}
};
Box::pin(fut)
}
}