refactor(client): breakout checkout and connect race into separate function
This commit is contained in:
@@ -260,72 +260,10 @@ where C: Connect + Sync + 'static,
|
||||
|
||||
//TODO: replace with `impl Future` when stable
|
||||
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
|
||||
let ver = self.ver;
|
||||
let pool_key = (Arc::new(domain.to_string()), self.ver);
|
||||
let checkout = self.pool.checkout(pool_key.clone());
|
||||
let connect = self.connect_to(req.uri().clone(), pool_key);
|
||||
|
||||
let executor = self.executor.clone();
|
||||
// The order of the `select` is depended on below...
|
||||
let race = checkout.select2(connect)
|
||||
.map(move |either| match either {
|
||||
// Checkout won, connect future may have been started or not.
|
||||
//
|
||||
// If it has, let it finish and insert back into the pool,
|
||||
// so as to not waste the socket...
|
||||
Either::A((checked_out, connecting)) => {
|
||||
// This depends on the `select` above having the correct
|
||||
// order, such that if the checkout future were ready
|
||||
// immediately, the connect future will never have been
|
||||
// started.
|
||||
//
|
||||
// If it *wasn't* ready yet, then the connect future will
|
||||
// have been started...
|
||||
if connecting.started() {
|
||||
let bg = connecting
|
||||
.map(|_pooled| {
|
||||
// dropping here should just place it in
|
||||
// the Pool for us...
|
||||
})
|
||||
.map_err(|err| {
|
||||
trace!("background connect error: {}", err);
|
||||
});
|
||||
// An execute error here isn't important, we're just trying
|
||||
// to prevent a waste of a socket...
|
||||
let _ = executor.execute(bg);
|
||||
}
|
||||
checked_out
|
||||
},
|
||||
// Connect won, checkout can just be dropped.
|
||||
Either::B((connected, _checkout)) => {
|
||||
connected
|
||||
},
|
||||
})
|
||||
.or_else(|either| match either {
|
||||
// Either checkout or connect could get canceled:
|
||||
//
|
||||
// 1. Connect is canceled if this is HTTP/2 and there is
|
||||
// an outstanding HTTP/2 connecting task.
|
||||
// 2. Checkout is canceled if the pool cannot deliver an
|
||||
// idle connection reliably.
|
||||
//
|
||||
// In both cases, we should just wait for the other future.
|
||||
Either::A((err, connecting)) => {
|
||||
if err.is_canceled() {
|
||||
Either::A(Either::A(connecting.map_err(ClientError::Normal)))
|
||||
} else {
|
||||
Either::B(future::err(ClientError::Normal(err)))
|
||||
}
|
||||
},
|
||||
Either::B((err, checkout)) => {
|
||||
if err.is_canceled() {
|
||||
Either::A(Either::B(checkout.map_err(ClientError::Normal)))
|
||||
} else {
|
||||
Either::B(future::err(ClientError::Normal(err)))
|
||||
}
|
||||
}
|
||||
});
|
||||
let race = self.pool_checkout_or_connect(req.uri().clone(), pool_key);
|
||||
|
||||
let ver = self.ver;
|
||||
let executor = self.executor.clone();
|
||||
let resp = race.and_then(move |mut pooled| {
|
||||
let conn_reused = pooled.is_reused();
|
||||
@@ -434,6 +372,75 @@ where C: Connect + Sync + 'static,
|
||||
Box::new(resp)
|
||||
}
|
||||
|
||||
fn pool_checkout_or_connect(&self, uri: Uri, pool_key: PoolKey)
|
||||
-> impl Future<Item=Pooled<PoolClient<B>>, Error=ClientError<B>>
|
||||
{
|
||||
let checkout = self.pool.checkout(pool_key.clone());
|
||||
let connect = self.connect_to(uri, pool_key);
|
||||
|
||||
let executor = self.executor.clone();
|
||||
checkout
|
||||
// The order of the `select` is depended on below...
|
||||
.select2(connect)
|
||||
.map(move |either| match either {
|
||||
// Checkout won, connect future may have been started or not.
|
||||
//
|
||||
// If it has, let it finish and insert back into the pool,
|
||||
// so as to not waste the socket...
|
||||
Either::A((checked_out, connecting)) => {
|
||||
// This depends on the `select` above having the correct
|
||||
// order, such that if the checkout future were ready
|
||||
// immediately, the connect future will never have been
|
||||
// started.
|
||||
//
|
||||
// If it *wasn't* ready yet, then the connect future will
|
||||
// have been started...
|
||||
if connecting.started() {
|
||||
let bg = connecting
|
||||
.map(|_pooled| {
|
||||
// dropping here should just place it in
|
||||
// the Pool for us...
|
||||
})
|
||||
.map_err(|err| {
|
||||
trace!("background connect error: {}", err);
|
||||
});
|
||||
// An execute error here isn't important, we're just trying
|
||||
// to prevent a waste of a socket...
|
||||
let _ = executor.execute(bg);
|
||||
}
|
||||
checked_out
|
||||
},
|
||||
// Connect won, checkout can just be dropped.
|
||||
Either::B((connected, _checkout)) => {
|
||||
connected
|
||||
},
|
||||
})
|
||||
.or_else(|either| match either {
|
||||
// Either checkout or connect could get canceled:
|
||||
//
|
||||
// 1. Connect is canceled if this is HTTP/2 and there is
|
||||
// an outstanding HTTP/2 connecting task.
|
||||
// 2. Checkout is canceled if the pool cannot deliver an
|
||||
// idle connection reliably.
|
||||
//
|
||||
// In both cases, we should just wait for the other future.
|
||||
Either::A((err, connecting)) => {
|
||||
if err.is_canceled() {
|
||||
Either::A(Either::A(connecting.map_err(ClientError::Normal)))
|
||||
} else {
|
||||
Either::B(future::err(ClientError::Normal(err)))
|
||||
}
|
||||
},
|
||||
Either::B((err, checkout)) => {
|
||||
if err.is_canceled() {
|
||||
Either::A(Either::B(checkout.map_err(ClientError::Normal)))
|
||||
} else {
|
||||
Either::B(future::err(ClientError::Normal(err)))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn connect_to(&self, uri: Uri, pool_key: PoolKey)
|
||||
-> impl Lazy<Item=Pooled<PoolClient<B>>, Error=::Error>
|
||||
{
|
||||
@@ -679,6 +686,8 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: allow() required due to `impl Trait` leaking types to this lint
|
||||
#[allow(missing_debug_implementations)]
|
||||
enum ClientError<B> {
|
||||
Normal(::Error),
|
||||
Canceled {
|
||||
|
||||
Reference in New Issue
Block a user