refactor(client): clarify some code in send_request
This commit is contained in:
@@ -249,24 +249,23 @@ where C: Connect + Sync + 'static,
|
|||||||
|
|
||||||
let client = self.clone();
|
let client = self.clone();
|
||||||
let uri = req.uri().clone();
|
let uri = req.uri().clone();
|
||||||
|
let pool_key = (Arc::new(domain.to_string()), self.ver);
|
||||||
let fut = RetryableSendRequest {
|
let fut = RetryableSendRequest {
|
||||||
client: client,
|
client,
|
||||||
future: self.send_request(req, &domain),
|
future: self.send_request(req, pool_key.clone()),
|
||||||
domain: domain,
|
pool_key,
|
||||||
uri: uri,
|
uri,
|
||||||
};
|
};
|
||||||
ResponseFuture::new(Box::new(fut))
|
ResponseFuture::new(Box::new(fut))
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: replace with `impl Future` when stable
|
//TODO: replace with `impl Future` when stable
|
||||||
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
|
fn send_request(&self, mut req: Request<B>, pool_key: PoolKey) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
|
||||||
let pool_key = (Arc::new(domain.to_string()), self.ver);
|
|
||||||
let race = self.pool_checkout_or_connect(req.uri().clone(), pool_key);
|
let race = self.pool_checkout_or_connect(req.uri().clone(), pool_key);
|
||||||
|
|
||||||
let ver = self.ver;
|
let ver = self.ver;
|
||||||
let executor = self.executor.clone();
|
let executor = self.executor.clone();
|
||||||
let resp = race.and_then(move |mut pooled| {
|
Box::new(race.and_then(move |mut pooled| {
|
||||||
let conn_reused = pooled.is_reused();
|
|
||||||
if ver == Ver::Http1 {
|
if ver == Ver::Http1 {
|
||||||
// CONNECT always sends origin-form, so check it first...
|
// CONNECT always sends origin-form, so check it first...
|
||||||
if req.method() == &Method::CONNECT {
|
if req.method() == &Method::CONNECT {
|
||||||
@@ -283,7 +282,8 @@ where C: Connect + Sync + 'static,
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
let fut = pooled.send_request_retryable(req);
|
let fut = pooled.send_request_retryable(req)
|
||||||
|
.map_err(ClientError::map_with_reused(pooled.is_reused()));
|
||||||
|
|
||||||
// As of futures@0.1.21, there is a race condition in the mpsc
|
// As of futures@0.1.21, there is a race condition in the mpsc
|
||||||
// channel, such that sending when the receiver is closing can
|
// channel, such that sending when the receiver is closing can
|
||||||
@@ -293,83 +293,56 @@ where C: Connect + Sync + 'static,
|
|||||||
// To counteract this, we must check if our senders 'want' channel
|
// To counteract this, we must check if our senders 'want' channel
|
||||||
// has been closed after having tried to send. If so, error out...
|
// has been closed after having tried to send. If so, error out...
|
||||||
if pooled.is_closed() {
|
if pooled.is_closed() {
|
||||||
drop(pooled);
|
return Either::A(fut);
|
||||||
let fut = fut
|
|
||||||
.map_err(move |(err, orig_req)| {
|
|
||||||
if let Some(req) = orig_req {
|
|
||||||
ClientError::Canceled {
|
|
||||||
connection_reused: conn_reused,
|
|
||||||
reason: err,
|
|
||||||
req,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ClientError::Normal(err)
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Either::A(fut)
|
|
||||||
} else {
|
|
||||||
let fut = fut
|
|
||||||
.map_err(move |(err, orig_req)| {
|
|
||||||
if let Some(req) = orig_req {
|
|
||||||
ClientError::Canceled {
|
|
||||||
connection_reused: conn_reused,
|
|
||||||
reason: err,
|
|
||||||
req,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ClientError::Normal(err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.and_then(move |mut res| {
|
|
||||||
// If pooled is HTTP/2, we can toss this reference immediately.
|
|
||||||
//
|
|
||||||
// when pooled is dropped, it will try to insert back into the
|
|
||||||
// pool. To delay that, spawn a future that completes once the
|
|
||||||
// sender is ready again.
|
|
||||||
//
|
|
||||||
// This *should* only be once the related `Connection` has polled
|
|
||||||
// for a new request to start.
|
|
||||||
//
|
|
||||||
// It won't be ready if there is a body to stream.
|
|
||||||
if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
|
|
||||||
drop(pooled);
|
|
||||||
} else if !res.body().is_end_stream() {
|
|
||||||
let (delayed_tx, delayed_rx) = oneshot::channel();
|
|
||||||
res.body_mut().delayed_eof(delayed_rx);
|
|
||||||
let on_idle = future::poll_fn(move || {
|
|
||||||
pooled.poll_ready()
|
|
||||||
})
|
|
||||||
.then(move |_| {
|
|
||||||
// At this point, `pooled` is dropped, and had a chance
|
|
||||||
// to insert into the pool (if conn was idle)
|
|
||||||
drop(delayed_tx);
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Err(err) = executor.execute(on_idle) {
|
|
||||||
// This task isn't critical, so just log and ignore.
|
|
||||||
warn!("error spawning task to insert idle connection: {}", err);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// There's no body to delay, but the connection isn't
|
|
||||||
// ready yet. Only re-insert when it's ready
|
|
||||||
let on_idle = future::poll_fn(move || {
|
|
||||||
pooled.poll_ready()
|
|
||||||
})
|
|
||||||
.then(|_| Ok(()));
|
|
||||||
|
|
||||||
if let Err(err) = executor.execute(on_idle) {
|
|
||||||
// This task isn't critical, so just log and ignore.
|
|
||||||
warn!("error spawning task to insert idle connection: {}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(res)
|
|
||||||
});
|
|
||||||
Either::B(fut)
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
|
||||||
Box::new(resp)
|
Either::B(fut
|
||||||
|
.and_then(move |mut res| {
|
||||||
|
// If pooled is HTTP/2, we can toss this reference immediately.
|
||||||
|
//
|
||||||
|
// when pooled is dropped, it will try to insert back into the
|
||||||
|
// pool. To delay that, spawn a future that completes once the
|
||||||
|
// sender is ready again.
|
||||||
|
//
|
||||||
|
// This *should* only be once the related `Connection` has polled
|
||||||
|
// for a new request to start.
|
||||||
|
//
|
||||||
|
// It won't be ready if there is a body to stream.
|
||||||
|
if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
|
||||||
|
drop(pooled);
|
||||||
|
} else if !res.body().is_end_stream() {
|
||||||
|
let (delayed_tx, delayed_rx) = oneshot::channel();
|
||||||
|
res.body_mut().delayed_eof(delayed_rx);
|
||||||
|
let on_idle = future::poll_fn(move || {
|
||||||
|
pooled.poll_ready()
|
||||||
|
})
|
||||||
|
.then(move |_| {
|
||||||
|
// At this point, `pooled` is dropped, and had a chance
|
||||||
|
// to insert into the pool (if conn was idle)
|
||||||
|
drop(delayed_tx);
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Err(err) = executor.execute(on_idle) {
|
||||||
|
// This task isn't critical, so just log and ignore.
|
||||||
|
warn!("error spawning task to insert idle connection: {}", err);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// There's no body to delay, but the connection isn't
|
||||||
|
// ready yet. Only re-insert when it's ready
|
||||||
|
let on_idle = future::poll_fn(move || {
|
||||||
|
pooled.poll_ready()
|
||||||
|
})
|
||||||
|
.then(|_| Ok(()));
|
||||||
|
|
||||||
|
if let Err(err) = executor.execute(on_idle) {
|
||||||
|
// This task isn't critical, so just log and ignore.
|
||||||
|
warn!("error spawning task to insert idle connection: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(res)
|
||||||
|
}))
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn pool_checkout_or_connect(&self, uri: Uri, pool_key: PoolKey)
|
fn pool_checkout_or_connect(&self, uri: Uri, pool_key: PoolKey)
|
||||||
@@ -558,8 +531,8 @@ impl Future for ResponseFuture {
|
|||||||
|
|
||||||
struct RetryableSendRequest<C, B> {
|
struct RetryableSendRequest<C, B> {
|
||||||
client: Client<C, B>,
|
client: Client<C, B>,
|
||||||
domain: String,
|
|
||||||
future: Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send>,
|
future: Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send>,
|
||||||
|
pool_key: PoolKey,
|
||||||
uri: Uri,
|
uri: Uri,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -598,7 +571,7 @@ where
|
|||||||
|
|
||||||
trace!("unstarted request canceled, trying again (reason={:?})", reason);
|
trace!("unstarted request canceled, trying again (reason={:?})", reason);
|
||||||
*req.uri_mut() = self.uri.clone();
|
*req.uri_mut() = self.uri.clone();
|
||||||
self.future = self.client.send_request(req, &self.domain);
|
self.future = self.client.send_request(req, self.pool_key.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -697,6 +670,24 @@ enum ClientError<B> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<B> ClientError<B> {
|
||||||
|
fn map_with_reused(conn_reused: bool)
|
||||||
|
-> impl Fn((::Error, Option<Request<B>>)) -> Self
|
||||||
|
{
|
||||||
|
move |(err, orig_req)| {
|
||||||
|
if let Some(req) = orig_req {
|
||||||
|
ClientError::Canceled {
|
||||||
|
connection_reused: conn_reused,
|
||||||
|
reason: err,
|
||||||
|
req,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ClientError::Normal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// A marker to identify what version a pooled connection is.
|
/// A marker to identify what version a pooled connection is.
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
|
||||||
enum Ver {
|
enum Ver {
|
||||||
|
|||||||
Reference in New Issue
Block a user