diff --git a/src/client/mod.rs b/src/client/mod.rs index cb4eea11..566b0b35 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -249,24 +249,23 @@ where C: Connect + Sync + 'static, let client = self.clone(); let uri = req.uri().clone(); + let pool_key = (Arc::new(domain.to_string()), self.ver); let fut = RetryableSendRequest { - client: client, - future: self.send_request(req, &domain), - domain: domain, - uri: uri, + client, + future: self.send_request(req, pool_key.clone()), + pool_key, + uri, }; ResponseFuture::new(Box::new(fut)) } //TODO: replace with `impl Future` when stable - fn send_request(&self, mut req: Request, domain: &str) -> Box, Error=ClientError> + Send> { - let pool_key = (Arc::new(domain.to_string()), self.ver); + fn send_request(&self, mut req: Request, pool_key: PoolKey) -> Box, Error=ClientError> + Send> { let race = self.pool_checkout_or_connect(req.uri().clone(), pool_key); let ver = self.ver; let executor = self.executor.clone(); - let resp = race.and_then(move |mut pooled| { - let conn_reused = pooled.is_reused(); + Box::new(race.and_then(move |mut pooled| { if ver == Ver::Http1 { // CONNECT always sends origin-form, so check it first... if req.method() == &Method::CONNECT { @@ -283,7 +282,8 @@ where C: Connect + Sync + 'static, ); } - let fut = pooled.send_request_retryable(req); + let fut = pooled.send_request_retryable(req) + .map_err(ClientError::map_with_reused(pooled.is_reused())); // As of futures@0.1.21, there is a race condition in the mpsc // channel, such that sending when the receiver is closing can @@ -293,83 +293,56 @@ where C: Connect + Sync + 'static, // To counteract this, we must check if our senders 'want' channel // has been closed after having tried to send. If so, error out... if pooled.is_closed() { - drop(pooled); - let fut = fut - .map_err(move |(err, orig_req)| { - if let Some(req) = orig_req { - ClientError::Canceled { - connection_reused: conn_reused, - reason: err, - req, - } - } else { - ClientError::Normal(err) - } - }); - Either::A(fut) - } else { - let fut = fut - .map_err(move |(err, orig_req)| { - if let Some(req) = orig_req { - ClientError::Canceled { - connection_reused: conn_reused, - reason: err, - req, - } - } else { - ClientError::Normal(err) - } - }) - .and_then(move |mut res| { - // If pooled is HTTP/2, we can toss this reference immediately. - // - // when pooled is dropped, it will try to insert back into the - // pool. To delay that, spawn a future that completes once the - // sender is ready again. - // - // This *should* only be once the related `Connection` has polled - // for a new request to start. - // - // It won't be ready if there is a body to stream. - if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() { - drop(pooled); - } else if !res.body().is_end_stream() { - let (delayed_tx, delayed_rx) = oneshot::channel(); - res.body_mut().delayed_eof(delayed_rx); - let on_idle = future::poll_fn(move || { - pooled.poll_ready() - }) - .then(move |_| { - // At this point, `pooled` is dropped, and had a chance - // to insert into the pool (if conn was idle) - drop(delayed_tx); - Ok(()) - }); - - if let Err(err) = executor.execute(on_idle) { - // This task isn't critical, so just log and ignore. - warn!("error spawning task to insert idle connection: {}", err); - } - } else { - // There's no body to delay, but the connection isn't - // ready yet. Only re-insert when it's ready - let on_idle = future::poll_fn(move || { - pooled.poll_ready() - }) - .then(|_| Ok(())); - - if let Err(err) = executor.execute(on_idle) { - // This task isn't critical, so just log and ignore. - warn!("error spawning task to insert idle connection: {}", err); - } - } - Ok(res) - }); - Either::B(fut) + return Either::A(fut); } - }); - Box::new(resp) + Either::B(fut + .and_then(move |mut res| { + // If pooled is HTTP/2, we can toss this reference immediately. + // + // when pooled is dropped, it will try to insert back into the + // pool. To delay that, spawn a future that completes once the + // sender is ready again. + // + // This *should* only be once the related `Connection` has polled + // for a new request to start. + // + // It won't be ready if there is a body to stream. + if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() { + drop(pooled); + } else if !res.body().is_end_stream() { + let (delayed_tx, delayed_rx) = oneshot::channel(); + res.body_mut().delayed_eof(delayed_rx); + let on_idle = future::poll_fn(move || { + pooled.poll_ready() + }) + .then(move |_| { + // At this point, `pooled` is dropped, and had a chance + // to insert into the pool (if conn was idle) + drop(delayed_tx); + Ok(()) + }); + + if let Err(err) = executor.execute(on_idle) { + // This task isn't critical, so just log and ignore. + warn!("error spawning task to insert idle connection: {}", err); + } + } else { + // There's no body to delay, but the connection isn't + // ready yet. Only re-insert when it's ready + let on_idle = future::poll_fn(move || { + pooled.poll_ready() + }) + .then(|_| Ok(())); + + if let Err(err) = executor.execute(on_idle) { + // This task isn't critical, so just log and ignore. + warn!("error spawning task to insert idle connection: {}", err); + } + } + Ok(res) + })) + })) } fn pool_checkout_or_connect(&self, uri: Uri, pool_key: PoolKey) @@ -558,8 +531,8 @@ impl Future for ResponseFuture { struct RetryableSendRequest { client: Client, - domain: String, future: Box, Error=ClientError> + Send>, + pool_key: PoolKey, uri: Uri, } @@ -598,7 +571,7 @@ where trace!("unstarted request canceled, trying again (reason={:?})", reason); *req.uri_mut() = self.uri.clone(); - self.future = self.client.send_request(req, &self.domain); + self.future = self.client.send_request(req, self.pool_key.clone()); } } } @@ -697,6 +670,24 @@ enum ClientError { } } +impl ClientError { + fn map_with_reused(conn_reused: bool) + -> impl Fn((::Error, Option>)) -> Self + { + move |(err, orig_req)| { + if let Some(req) = orig_req { + ClientError::Canceled { + connection_reused: conn_reused, + reason: err, + req, + } + } else { + ClientError::Normal(err) + } + } + } +} + /// A marker to identify what version a pooled connection is. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] enum Ver {