feat(client): replace default dispatcher

This commit is contained in:
Sean McArthur
2017-12-28 17:18:42 -08:00
parent 6fde13f759
commit 0892cb2777
3 changed files with 55 additions and 198 deletions

View File

@@ -32,9 +32,7 @@ fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap(); let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle(); let handle = core.handle();
let client = Client::configure() let client = Client::new(&handle);
.no_proto()
.build(&handle);
let work = client.get(url).and_then(|res| { let work = client.get(url).and_then(|res| {
println!("Response: {}", res.status()); println!("Response: {}", res.status());

View File

@@ -7,24 +7,17 @@ use std::marker::PhantomData;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use futures::{future, Poll, Async, Future, Stream}; use futures::{future, Poll, Future, Stream};
use futures::unsync::oneshot;
#[cfg(feature = "compat")] #[cfg(feature = "compat")]
use http; use http;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle; use tokio::reactor::Handle;
use tokio_proto::BindClient;
use tokio_proto::streaming::Message;
use tokio_proto::streaming::pipeline::ClientProto;
use tokio_proto::util::client_proxy::ClientProxy;
pub use tokio_service::Service; pub use tokio_service::Service;
use header::{Headers, Host}; use header::{Headers, Host};
use proto::{self, RequestHead, TokioBody}; use proto;
use proto::response;
use proto::request; use proto::request;
use method::Method; use method::Method;
use self::pool::{Pool, Pooled}; use self::pool::Pool;
use uri::{self, Uri}; use uri::{self, Uri};
use version::HttpVersion; use version::HttpVersion;
@@ -45,7 +38,7 @@ pub mod compat;
pub struct Client<C, B = proto::Body> { pub struct Client<C, B = proto::Body> {
connector: C, connector: C,
handle: Handle, handle: Handle,
pool: Dispatch<B>, pool: Pool<HyperClient<B>>,
} }
impl Client<HttpConnector, proto::Body> { impl Client<HttpConnector, proto::Body> {
@@ -93,11 +86,7 @@ impl<C, B> Client<C, B> {
Client { Client {
connector: config.connector, connector: config.connector,
handle: handle.clone(), handle: handle.clone(),
pool: if config.no_proto { pool: Pool::new(config.keep_alive, config.keep_alive_timeout)
Dispatch::Hyper(Pool::new(config.keep_alive, config.keep_alive_timeout))
} else {
Dispatch::Proto(Pool::new(config.keep_alive, config.keep_alive_timeout))
}
} }
} }
} }
@@ -191,105 +180,54 @@ where C: Connect,
headers.extend(head.headers.iter()); headers.extend(head.headers.iter());
head.headers = headers; head.headers = headers;
match self.pool { use futures::Sink;
Dispatch::Proto(ref pool) => { use futures::sync::{mpsc, oneshot};
trace!("proto_dispatch");
let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
drop(tx.send(pooled.clone()));
pooled
})
};
let race = checkout.select(connect) let checkout = self.pool.checkout(domain.as_ref());
.map(|(client, _work)| client) let connect = {
.map_err(|(e, _work)| { let handle = self.handle.clone();
// the Pool Checkout cannot error, so the only error let pool = self.pool.clone();
// is from the Connector let pool_key = Rc::new(domain.to_string());
// XXX: should wait on the Checkout? Problem is self.connector.connect(url)
// that if the connector is failing, it may be that we .map(move |io| {
// never had a pooled stream at all let (tx, rx) = mpsc::channel(0);
e.into() let tx = HyperClient {
}); tx: RefCell::new(tx),
let resp = race.and_then(move |client| { should_close: true,
let msg = match body {
Some(body) => {
Message::WithBody(head, body.into())
},
None => Message::WithoutBody(head),
}; };
client.call(msg) let pooled = pool.pooled(pool_key, tx);
}); let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
FutureResponse(Box::new(resp.map(|msg| { let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
match msg { handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
Message::WithoutBody(head) => response::from_wire(head, None), pooled
Message::WithBody(head, body) => response::from_wire(head, Some(body.into())), })
} };
})))
},
Dispatch::Hyper(ref pool) => {
trace!("no_proto dispatch");
use futures::Sink;
use futures::sync::{mpsc, oneshot};
let checkout = pool.checkout(domain.as_ref()); let race = checkout.select(connect)
let connect = { .map(|(client, _work)| client)
let handle = self.handle.clone(); .map_err(|(e, _work)| {
let pool = pool.clone(); // the Pool Checkout cannot error, so the only error
let pool_key = Rc::new(domain.to_string()); // is from the Connector
self.connector.connect(url) // XXX: should wait on the Checkout? Problem is
.map(move |io| { // that if the connector is failing, it may be that we
let (tx, rx) = mpsc::channel(0); // never had a pooled stream at all
let tx = HyperClient { e.into()
tx: RefCell::new(tx), });
should_close: true,
};
let pooled = pool.pooled(pool_key, tx);
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
pooled
})
};
let race = checkout.select(connect) let resp = race.and_then(move |mut client| {
.map(|(client, _work)| client) let (callback, rx) = oneshot::channel();
.map_err(|(e, _work)| { client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap();
// the Pool Checkout cannot error, so the only error client.should_close = false;
// is from the Connector rx.then(|res| {
// XXX: should wait on the Checkout? Problem is match res {
// that if the connector is failing, it may be that we Ok(Ok(res)) => Ok(res),
// never had a pooled stream at all Ok(Err(err)) => Err(err),
e.into() Err(_) => panic!("dispatch dropped without returning error"),
}); }
})
});
let resp = race.and_then(move |mut client| { FutureResponse(Box::new(resp))
let (callback, rx) = oneshot::channel();
client.tx.borrow_mut().start_send(proto::dispatch::ClientMsg::Request(head, body, callback)).unwrap();
client.should_close = false;
rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
})
});
FutureResponse(Box::new(resp))
}
}
} }
} }
@@ -299,10 +237,7 @@ impl<C: Clone, B> Clone for Client<C, B> {
Client { Client {
connector: self.connector.clone(), connector: self.connector.clone(),
handle: self.handle.clone(), handle: self.handle.clone(),
pool: match self.pool { pool: self.pool.clone(),
Dispatch::Proto(ref pool) => Dispatch::Proto(pool.clone()),
Dispatch::Hyper(ref pool) => Dispatch::Hyper(pool.clone()),
}
} }
} }
} }
@@ -313,8 +248,6 @@ impl<C, B> fmt::Debug for Client<C, B> {
} }
} }
type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
struct HyperClient<B> { struct HyperClient<B> {
tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>, tx: RefCell<::futures::sync::mpsc::Sender<proto::dispatch::ClientMsg<B>>>,
should_close: bool, should_close: bool,
@@ -338,60 +271,6 @@ impl<B> Drop for HyperClient<B> {
} }
} }
enum Dispatch<B> {
Proto(Pool<ProtoClient<B>>),
Hyper(Pool<HyperClient<B>>),
}
struct HttpClient<B> {
client_rx: RefCell<Option<oneshot::Receiver<Pooled<ProtoClient<B>>>>>,
}
impl<T, B> ClientProto<T> for HttpClient<B>
where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Request = proto::RequestHead;
type RequestBody = B::Item;
type Response = proto::ResponseHead;
type ResponseBody = proto::Chunk;
type Error = ::Error;
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type BindTransport = BindingClient<T, B>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
BindingClient {
rx: self.client_rx.borrow_mut().take().expect("client_rx was lost"),
io: Some(io),
}
}
}
struct BindingClient<T, B> {
rx: oneshot::Receiver<Pooled<ProtoClient<B>>>,
io: Option<T>,
}
impl<T, B> Future for BindingClient<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
Ok(Async::Ready(client)) => Ok(Async::Ready(
proto::Conn::new(self.io.take().expect("binding client io lost"), client)
)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_canceled) => unreachable!(),
}
}
}
/// Configuration for a Client /// Configuration for a Client
pub struct Config<C, B> { pub struct Config<C, B> {
_body_type: PhantomData<B>, _body_type: PhantomData<B>,
@@ -490,10 +369,9 @@ impl<C, B> Config<C, B> {
} }
*/ */
/// Disable tokio-proto internal usage. #[doc(hidden)]
#[inline] #[deprecated(since="0.11.11", note="no_proto is always enabled")]
pub fn no_proto(mut self) -> Config<C, B> { pub fn no_proto(self) -> Config<C, B> {
self.no_proto = true;
self self
} }
} }

View File

@@ -19,24 +19,13 @@ use futures::sync::oneshot;
use tokio_core::reactor::{Core, Handle}; use tokio_core::reactor::{Core, Handle};
fn client(handle: &Handle) -> Client<HttpConnector> { fn client(handle: &Handle) -> Client<HttpConnector> {
let mut config = Client::configure(); Client::new(handle)
if env("HYPER_NO_PROTO", "1") {
config = config.no_proto();
}
config.build(handle)
} }
fn s(buf: &[u8]) -> &str { fn s(buf: &[u8]) -> &str {
::std::str::from_utf8(buf).unwrap() ::std::str::from_utf8(buf).unwrap()
} }
fn env(name: &str, val: &str) -> bool {
match ::std::env::var(name) {
Ok(var) => var == val,
Err(_) => false,
}
}
macro_rules! test { macro_rules! test {
( (
name: $name:ident, name: $name:ident,
@@ -463,8 +452,7 @@ test! {
body: None, body: None,
proxy: false, proxy: false,
error: |err| match err { error: |err| match err {
&hyper::Error::Version if env("HYPER_NO_PROTO", "1") => true, &hyper::Error::Version => true,
&hyper::Error::Io(_) if !env("HYPER_NO_PROTO", "1") => true,
_ => false, _ => false,
}, },
@@ -606,7 +594,6 @@ mod dispatch_impl {
let closes = Arc::new(AtomicUsize::new(0)); let closes = Arc::new(AtomicUsize::new(0));
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &core.handle()), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &core.handle()), closes.clone()))
.no_proto()
.build(&handle); .build(&handle);
let (tx1, rx1) = oneshot::channel(); let (tx1, rx1) = oneshot::channel();
@@ -666,7 +653,6 @@ mod dispatch_impl {
let res = { let res = {
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle); .build(&handle);
client.get(uri).and_then(move |res| { client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok); assert_eq!(res.status(), hyper::StatusCode::Ok);
@@ -717,7 +703,6 @@ mod dispatch_impl {
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle); .build(&handle);
let res = client.get(uri).and_then(move |res| { let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok); assert_eq!(res.status(), hyper::StatusCode::Ok);
@@ -767,7 +752,6 @@ mod dispatch_impl {
let res = { let res = {
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle); .build(&handle);
client.get(uri) client.get(uri)
}; };
@@ -812,7 +796,6 @@ mod dispatch_impl {
let res = { let res = {
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle); .build(&handle);
// notably, havent read body yet // notably, havent read body yet
client.get(uri) client.get(uri)
@@ -852,7 +835,6 @@ mod dispatch_impl {
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.keep_alive(false) .keep_alive(false)
.build(&handle); .build(&handle);
let res = client.get(uri).and_then(move |res| { let res = client.get(uri).and_then(move |res| {
@@ -892,7 +874,6 @@ mod dispatch_impl {
let client = Client::configure() let client = Client::configure()
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
.no_proto()
.build(&handle); .build(&handle);
let res = client.get(uri).and_then(move |res| { let res = client.get(uri).and_then(move |res| {
assert_eq!(res.status(), hyper::StatusCode::Ok); assert_eq!(res.status(), hyper::StatusCode::Ok);