refactor(client): change retryable request future from boxed to 'impl Future'
This commit is contained in:
@@ -247,25 +247,45 @@ where C: Connect + Sync + 'static,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
let client = self.clone();
|
|
||||||
let uri = req.uri().clone();
|
|
||||||
let pool_key = (Arc::new(domain.to_string()), self.ver);
|
let pool_key = (Arc::new(domain.to_string()), self.ver);
|
||||||
let fut = RetryableSendRequest {
|
ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key)))
|
||||||
client,
|
|
||||||
future: self.send_request(req, pool_key.clone()),
|
|
||||||
pool_key,
|
|
||||||
uri,
|
|
||||||
};
|
|
||||||
ResponseFuture::new(Box::new(fut))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: replace with `impl Future` when stable
|
fn retryably_send_request(&self, req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=::Error> {
|
||||||
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
|
let client = self.clone();
|
||||||
|
let uri = req.uri().clone();
|
||||||
|
|
||||||
|
let mut send_fut = client.send_request(req, pool_key.clone());
|
||||||
|
future::poll_fn(move || loop {
|
||||||
|
match send_fut.poll() {
|
||||||
|
Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)),
|
||||||
|
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||||
|
Err(ClientError::Normal(err)) => return Err(err),
|
||||||
|
Err(ClientError::Canceled {
|
||||||
|
connection_reused,
|
||||||
|
mut req,
|
||||||
|
reason,
|
||||||
|
}) => {
|
||||||
|
if !client.retry_canceled_requests || !connection_reused {
|
||||||
|
// if client disabled, don't retry
|
||||||
|
// a fresh connection means we definitely can't retry
|
||||||
|
return Err(reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!("unstarted request canceled, trying again (reason={:?})", reason);
|
||||||
|
*req.uri_mut() = uri.clone();
|
||||||
|
send_fut = client.send_request(req, pool_key.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> impl Future<Item=Response<Body>, Error=ClientError<B>> {
|
||||||
let race = self.pool_checkout_or_connect(req.uri().clone(), pool_key);
|
let race = self.pool_checkout_or_connect(req.uri().clone(), pool_key);
|
||||||
|
|
||||||
let ver = self.ver;
|
let ver = self.ver;
|
||||||
let executor = self.executor.clone();
|
let executor = self.executor.clone();
|
||||||
Box::new(race.and_then(move |mut pooled| {
|
race.and_then(move |mut pooled| {
|
||||||
if ver == Ver::Http1 {
|
if ver == Ver::Http1 {
|
||||||
// CONNECT always sends origin-form, so check it first...
|
// CONNECT always sends origin-form, so check it first...
|
||||||
if req.method() == &Method::CONNECT {
|
if req.method() == &Method::CONNECT {
|
||||||
@@ -342,7 +362,7 @@ where C: Connect + Sync + 'static,
|
|||||||
}
|
}
|
||||||
Ok(res)
|
Ok(res)
|
||||||
}))
|
}))
|
||||||
}))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pool_checkout_or_connect(&self, uri: Uri, pool_key: PoolKey)
|
fn pool_checkout_or_connect(&self, uri: Uri, pool_key: PoolKey)
|
||||||
@@ -529,55 +549,6 @@ impl Future for ResponseFuture {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RetryableSendRequest<C, B> {
|
|
||||||
client: Client<C, B>,
|
|
||||||
future: Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send>,
|
|
||||||
pool_key: PoolKey,
|
|
||||||
uri: Uri,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, B> fmt::Debug for RetryableSendRequest<C, B> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
f.pad("Future<Response>")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, B> Future for RetryableSendRequest<C, B>
|
|
||||||
where
|
|
||||||
C: Connect + 'static,
|
|
||||||
C::Future: 'static,
|
|
||||||
B: Payload + Send + 'static,
|
|
||||||
B::Data: Send,
|
|
||||||
{
|
|
||||||
type Item = Response<Body>;
|
|
||||||
type Error = ::Error;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
loop {
|
|
||||||
match self.future.poll() {
|
|
||||||
Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)),
|
|
||||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
|
||||||
Err(ClientError::Normal(err)) => return Err(err),
|
|
||||||
Err(ClientError::Canceled {
|
|
||||||
connection_reused,
|
|
||||||
mut req,
|
|
||||||
reason,
|
|
||||||
}) => {
|
|
||||||
if !self.client.retry_canceled_requests || !connection_reused {
|
|
||||||
// if client disabled, don't retry
|
|
||||||
// a fresh connection means we definitely can't retry
|
|
||||||
return Err(reason);
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("unstarted request canceled, trying again (reason={:?})", reason);
|
|
||||||
*req.uri_mut() = self.uri.clone();
|
|
||||||
self.future = self.client.send_request(req, self.pool_key.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
struct PoolClient<B> {
|
struct PoolClient<B> {
|
||||||
@@ -614,14 +585,14 @@ impl<B> PoolClient<B> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<B: Payload + 'static> PoolClient<B> {
|
impl<B: Payload + 'static> PoolClient<B> {
|
||||||
fn send_request_retryable(&mut self, req: Request<B>) -> impl_trait!(ty: Future<Item = Response<Body>, Error = (::Error, Option<Request<B>>)> + Send)
|
fn send_request_retryable(&mut self, req: Request<B>) -> impl Future<Item = Response<Body>, Error = (::Error, Option<Request<B>>)>
|
||||||
where
|
where
|
||||||
B: Send,
|
B: Send,
|
||||||
{
|
{
|
||||||
impl_trait!(e: match self.tx {
|
match self.tx {
|
||||||
PoolTx::Http1(ref mut tx) => Either::A(tx.send_request_retryable(req)),
|
PoolTx::Http1(ref mut tx) => Either::A(tx.send_request_retryable(req)),
|
||||||
PoolTx::Http2(ref mut tx) => Either::B(tx.send_request_retryable(req)),
|
PoolTx::Http2(ref mut tx) => Either::B(tx.send_request_retryable(req)),
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user