feat(client): Client will retry requests on fresh connections

If a request sees an error on a pooled connection before ever writing
any bytes, it will now retry with a new connection.

This can be configured with `Config::retry_canceled_requests(bool)`.
This commit is contained in:
Sean McArthur
2018-02-15 12:04:58 -08:00
parent 0ea3bcf8d5
commit ee61ea9adf
6 changed files with 234 additions and 124 deletions

View File

@@ -4,8 +4,8 @@ use futures::sync::{mpsc, oneshot};
use common::Never;
use super::cancel::{Cancel, Canceled};
pub type Callback<U> = oneshot::Sender<::Result<U>>;
pub type Promise<U> = oneshot::Receiver<::Result<U>>;
pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type Promise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded();
@@ -23,7 +23,7 @@ pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
pub struct Sender<T, U> {
cancel: Cancel,
inner: mpsc::UnboundedSender<(T, Callback<U>)>,
inner: mpsc::UnboundedSender<(T, Callback<T, U>)>,
}
impl<T, U> Sender<T, U> {
@@ -35,7 +35,7 @@ impl<T, U> Sender<T, U> {
self.cancel.cancel();
}
pub fn send(&self, val: T) -> Result<Promise<U>, T> {
pub fn send(&self, val: T) -> Result<Promise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send((val, tx))
.map(move |_| rx)
@@ -54,11 +54,11 @@ impl<T, U> Clone for Sender<T, U> {
pub struct Receiver<T, U> {
canceled: Canceled,
inner: mpsc::UnboundedReceiver<(T, Callback<U>)>,
inner: mpsc::UnboundedReceiver<(T, Callback<T, U>)>,
}
impl<T, U> Stream for Receiver<T, U> {
type Item = (T, Callback<U>);
type Item = (T, Callback<T, U>);
type Error = Never;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
@@ -83,9 +83,9 @@ impl<T, U> Drop for Receiver<T, U> {
// - Ready(None): the end. we want to stop looping
// - NotReady: unreachable
// - Err: unreachable
while let Ok(Async::Ready(Some((_val, cb)))) = self.inner.poll() {
while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() {
// maybe in future, we pass the value along with the error?
let _ = cb.send(Err(::Error::new_canceled(None)));
let _ = cb.send(Err((::Error::new_canceled(None), Some(val))));
}
}

View File

@@ -38,12 +38,12 @@ mod pool;
pub mod compat;
/// A Client to make outgoing HTTP requests.
// If the Connector is clone, then the Client can be clone easily.
pub struct Client<C, B = proto::Body> {
connector: C,
connector: Rc<C>,
executor: Exec,
h1_writev: bool,
pool: Pool<HyperClient<B>>,
retry_canceled_requests: bool,
}
impl Client<HttpConnector, proto::Body> {
@@ -95,10 +95,11 @@ impl<C, B> Client<C, B> {
#[inline]
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
Client {
connector: config.connector,
connector: Rc::new(config.connector),
executor: exec,
h1_writev: config.h1_writev,
pool: Pool::new(config.keep_alive, config.keep_alive_timeout)
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
retry_canceled_requests: config.retry_canceled_requests,
}
}
}
@@ -116,8 +117,46 @@ where C: Connect,
/// Send a constructed Request using this Client.
#[inline]
pub fn request(&self, req: Request<B>) -> FutureResponse {
self.call(req)
pub fn request(&self, mut req: Request<B>) -> FutureResponse {
match req.version() {
HttpVersion::Http10 |
HttpVersion::Http11 => (),
other => {
error!("Request has unsupported version \"{}\"", other);
return FutureResponse(Box::new(future::err(::Error::Version)));
}
}
let domain = match uri::scheme_and_authority(req.uri()) {
Some(uri) => uri,
None => {
return FutureResponse(Box::new(future::err(::Error::Io(
io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URI for Client Request"
)
))));
}
};
if !req.headers().has::<Host>() {
let host = Host::new(
domain.host().expect("authority implies host").to_owned(),
domain.port(),
);
req.headers_mut().set_pos(0, host);
}
let client = self.clone();
let is_proxy = req.is_proxy();
let uri = req.uri().clone();
let fut = RetryableSendRequest {
client: client,
future: self.send_request(req, &domain),
domain: domain,
is_proxy: is_proxy,
uri: uri,
};
FutureResponse(Box::new(fut))
}
/// Send an `http::Request` using this Client.
@@ -132,68 +171,11 @@ where C: Connect,
pub fn into_compat(self) -> compat::CompatClient<C, B> {
self::compat::client(self)
}
}
/// A `Future` that will resolve to an HTTP Response.
#[must_use = "futures do nothing unless polled"]
pub struct FutureResponse(Box<Future<Item=Response, Error=::Error> + 'static>);
impl fmt::Debug for FutureResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Future<Response>")
}
}
impl Future for FutureResponse {
type Item = Response;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
}
impl<C, B> Service for Client<C, B>
where C: Connect,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
type Future = FutureResponse;
fn call(&self, req: Self::Request) -> Self::Future {
match req.version() {
HttpVersion::Http10 |
HttpVersion::Http11 => (),
other => {
error!("Request has unsupported version \"{}\"", other);
return FutureResponse(Box::new(future::err(::Error::Version)));
}
}
//TODO: replace with `impl Future` when stable
fn send_request(&self, req: Request<B>, domain: &Uri) -> Box<Future<Item=Response, Error=ClientError<B>>> {
let url = req.uri().clone();
let domain = match uri::scheme_and_authority(&url) {
Some(uri) => uri,
None => {
return FutureResponse(Box::new(future::err(::Error::Io(
io::Error::new(
io::ErrorKind::InvalidInput,
"invalid URI for Client Request"
)
))));
}
};
let (mut head, body) = request::split(req);
if !head.headers.has::<Host>() {
let host = Host::new(
domain.host().expect("authority implies host").to_owned(),
domain.port(),
);
head.headers.set_pos(0, host);
}
let (head, body) = request::split(req);
let checkout = self.pool.checkout(domain.as_ref());
let connect = {
let executor = self.executor.clone();
@@ -220,53 +202,147 @@ where C: Connect,
let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
.map_err(|(e, _checkout)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
ClientError::Normal(e.into())
});
let resp = race.and_then(move |client| {
let conn_reused = client.is_reused();
match client.tx.send((head, body)) {
Ok(rx) => {
client.should_close.set(false);
Either::A(rx.then(|res| {
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Ok(Err((err, orig_req))) => Err(match orig_req {
Some(req) => ClientError::Canceled {
connection_reused: conn_reused,
reason: err,
req: req,
},
None => ClientError::Normal(err),
}),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
Err(_) => {
error!("pooled connection was not ready, this is a hyper bug");
Either::B(future::err(::Error::new_canceled(None)))
Err(req) => {
debug!("pooled connection was not ready");
let err = ClientError::Canceled {
connection_reused: conn_reused,
reason: ::Error::new_canceled(None),
req: req,
};
Either::B(future::err(err))
}
}
});
FutureResponse(Box::new(resp))
Box::new(resp)
}
}
impl<C: Clone, B> Clone for Client<C, B> {
impl<C, B> Service for Client<C, B>
where C: Connect,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
type Future = FutureResponse;
fn call(&self, req: Self::Request) -> Self::Future {
self.request(req)
}
}
impl<C, B> Clone for Client<C, B> {
fn clone(&self) -> Client<C, B> {
Client {
connector: self.connector.clone(),
executor: self.executor.clone(),
h1_writev: self.h1_writev,
pool: self.pool.clone(),
retry_canceled_requests: self.retry_canceled_requests,
}
}
}
impl<C, B> fmt::Debug for Client<C, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Client")
f.debug_struct("Client")
.finish()
}
}
/// A `Future` that will resolve to an HTTP Response.
#[must_use = "futures do nothing unless polled"]
pub struct FutureResponse(Box<Future<Item=Response, Error=::Error> + 'static>);
impl fmt::Debug for FutureResponse {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Future<Response>")
}
}
impl Future for FutureResponse {
type Item = Response;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
}
struct RetryableSendRequest<C, B> {
client: Client<C, B>,
domain: Uri,
future: Box<Future<Item=Response, Error=ClientError<B>>>,
is_proxy: bool,
uri: Uri,
}
impl<C, B> Future for RetryableSendRequest<C, B>
where
C: Connect,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = Response;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.future.poll() {
Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Canceled {
connection_reused,
req,
reason,
}) => {
if !self.client.retry_canceled_requests || !connection_reused {
// if client disabled, don't retry
// a fresh connection means we definitely can't retry
return Err(reason);
}
trace!("unstarted request canceled, trying again (reason={:?})", reason);
let mut req = request::join(req);
req.set_proxy(self.is_proxy);
req.set_uri(self.uri.clone());
self.future = self.client.send_request(req, &self.domain);
}
}
}
}
}
@@ -303,6 +379,15 @@ impl<B> Drop for HyperClient<B> {
}
}
pub(crate) enum ClientError<B> {
Normal(::Error),
Canceled {
connection_reused: bool,
req: (::proto::RequestHead, Option<B>),
reason: ::Error,
}
}
/// Configuration for a Client
pub struct Config<C, B> {
_body_type: PhantomData<B>,
@@ -313,6 +398,7 @@ pub struct Config<C, B> {
h1_writev: bool,
//TODO: make use of max_idle config
max_idle: usize,
retry_canceled_requests: bool,
}
/// Phantom type used to signal that `Config` should create a `HttpConnector`.
@@ -323,12 +409,12 @@ impl Default for Config<UseDefaultConnector, proto::Body> {
fn default() -> Config<UseDefaultConnector, proto::Body> {
Config {
_body_type: PhantomData::<proto::Body>,
//connect_timeout: Duration::from_secs(10),
connector: UseDefaultConnector(()),
keep_alive: true,
keep_alive_timeout: Some(Duration::from_secs(90)),
h1_writev: true,
max_idle: 5,
retry_canceled_requests: true,
}
}
}
@@ -347,12 +433,12 @@ impl<C, B> Config<C, B> {
pub fn body<BB>(self) -> Config<C, BB> {
Config {
_body_type: PhantomData::<BB>,
//connect_timeout: self.connect_timeout,
connector: self.connector,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
h1_writev: self.h1_writev,
max_idle: self.max_idle,
retry_canceled_requests: self.retry_canceled_requests,
}
}
@@ -361,12 +447,12 @@ impl<C, B> Config<C, B> {
pub fn connector<CC>(self, val: CC) -> Config<CC, B> {
Config {
_body_type: self._body_type,
//connect_timeout: self.connect_timeout,
connector: val,
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
h1_writev: self.h1_writev,
max_idle: self.max_idle,
retry_canceled_requests: self.retry_canceled_requests,
}
}
@@ -390,17 +476,6 @@ impl<C, B> Config<C, B> {
self
}
/*
/// Set the timeout for connecting to a URL.
///
/// Default is 10 seconds.
#[inline]
pub fn connect_timeout(mut self, val: Duration) -> Config<C, B> {
self.connect_timeout = val;
self
}
*/
/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
@@ -408,13 +483,30 @@ impl<C, B> Config<C, B> {
/// but may also improve performance when an IO transport doesn't
/// support vectored writes well, such as most TLS implementations.
///
/// Default is true.
/// Default is `true`.
#[inline]
pub fn http1_writev(mut self, val: bool) -> Config<C, B> {
self.h1_writev = val;
self
}
/// Set whether to retry requests that get disrupted before ever starting
/// to write.
///
/// This means a request that is queued, and gets given an idle, reused
/// connection, and then encounters an error immediately as the idle
/// connection was found to be unusable.
///
/// When this is set to `false`, the related `FutureResponse` would instead
/// resolve to an `Error::Cancel`.
///
/// Default is `true`.
#[inline]
pub fn retry_canceled_requests(mut self, val: bool) -> Config<C, B> {
self.retry_canceled_requests = val;
self
}
#[doc(hidden)]
#[deprecated(since="0.11.11", note="no_proto is always enabled")]
pub fn no_proto(self) -> Config<C, B> {

View File

@@ -211,6 +211,12 @@ pub struct Pooled<T> {
pool: Weak<RefCell<PoolInner<T>>>,
}
impl<T> Pooled<T> {
pub fn is_reused(&self) -> bool {
self.entry.is_reused
}
}
impl<T> Deref for Pooled<T> {
type Target = T;
fn deref(&self) -> &T {