fix(client): handle race condition when sending while connection is closing
This commit is contained in:
		| @@ -156,37 +156,13 @@ impl<T, U> Stream for Receiver<T, U> { | ||||
|     } | ||||
| } | ||||
|  | ||||
| /* | ||||
| TODO: with futures 0.2, bring this Drop back and toss Envelope | ||||
|  | ||||
| The problem is, there is a bug in futures 0.1 mpsc channel, where | ||||
| even though you may call `rx.close()`, `rx.poll()` may still think | ||||
| there are messages and so should park the current task. In futures | ||||
| 0.2, we can use `try_next`, and not even risk such a bug. | ||||
|  | ||||
| For now, use an `Envelope` that has this drop guard logic instead. | ||||
|  | ||||
| impl<T, U> Drop for Receiver<T, U> { | ||||
|     fn drop(&mut self) { | ||||
|         // Notify the giver about the closure first, before dropping | ||||
|         // the mpsc::Receiver. | ||||
|         self.taker.cancel(); | ||||
|         self.inner.close(); | ||||
|  | ||||
|         // This poll() is safe to call in `Drop`, because we've | ||||
|         // called, `close`, which promises that no new messages | ||||
|         // will arrive, and thus, once we reach the end, we won't | ||||
|         // see a `NotReady` (and try to park), but a Ready(None). | ||||
|         // | ||||
|         // All other variants: | ||||
|         // - Ready(None): the end. we want to stop looping | ||||
|         // - NotReady: unreachable | ||||
|         // - Err: unreachable | ||||
|         while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() { | ||||
|             let _ = cb.send(Err((::Error::new_canceled(None::<::Error>), Some(val)))); | ||||
|         } | ||||
|     } | ||||
|  | ||||
| } | ||||
| */ | ||||
|  | ||||
| struct Envelope<T, U>(Option<(T, Callback<T, U>)>); | ||||
|  | ||||
|   | ||||
| @@ -250,50 +250,75 @@ where C: Connect + Sync + 'static, | ||||
|             if ver == Ver::Http1 { | ||||
|                 set_relative_uri(req.uri_mut(), pooled.is_proxied); | ||||
|             } | ||||
|             let fut = pooled.send_request_retryable(req) | ||||
|                 .map_err(move |(err, orig_req)| { | ||||
|                     if let Some(req) = orig_req { | ||||
|                         ClientError::Canceled { | ||||
|                             connection_reused: conn_reused, | ||||
|                             reason: err, | ||||
|                             req, | ||||
|                         } | ||||
|                     } else { | ||||
|                         ClientError::Normal(err) | ||||
|                     } | ||||
|                 }) | ||||
|                 .and_then(move |mut res| { | ||||
|                     // If pooled is HTTP/2, we can toss this reference immediately. | ||||
|                     // | ||||
|                     // when pooled is dropped, it will try to insert back into the | ||||
|                     // pool. To delay that, spawn a future that completes once the | ||||
|                     // sender is ready again. | ||||
|                     // | ||||
|                     // This *should* only be once the related `Connection` has polled | ||||
|                     // for a new request to start. | ||||
|                     // | ||||
|                     // It won't be ready if there is a body to stream. | ||||
|                     if ver == Ver::Http2 || pooled.is_ready() { | ||||
|                         drop(pooled); | ||||
|                     } else if !res.body().is_empty() { | ||||
|                         let (delayed_tx, delayed_rx) = oneshot::channel(); | ||||
|                         res.body_mut().delayed_eof(delayed_rx); | ||||
|                         executor.execute( | ||||
|                             future::poll_fn(move || { | ||||
|                                 pooled.poll_ready() | ||||
|                             }) | ||||
|                             .then(move |_| { | ||||
|                                 // At this point, `pooled` is dropped, and had a chance | ||||
|                                 // to insert into the pool (if conn was idle) | ||||
|                                 drop(delayed_tx); | ||||
|                                 Ok(()) | ||||
|                             }) | ||||
|                         ); | ||||
|                     } | ||||
|                     Ok(res) | ||||
|                 }); | ||||
|             let fut = pooled.send_request_retryable(req); | ||||
|  | ||||
|             fut | ||||
|             // As of futures@0.1.21, there is a race condition in the mpsc | ||||
|             // channel, such that sending when the receiver is closing can | ||||
|             // result in the message being stuck inside the queue. It won't | ||||
|             // ever notify until the Sender side is dropped. | ||||
|             // | ||||
|             // To counteract this, we must check if our senders 'want' channel | ||||
|             // has been closed after having tried to send. If so, error out... | ||||
|             if pooled.is_closed() { | ||||
|                 drop(pooled); | ||||
|                 let fut = fut | ||||
|                     .map_err(move |(err, orig_req)| { | ||||
|                         if let Some(req) = orig_req { | ||||
|                             ClientError::Canceled { | ||||
|                                 connection_reused: conn_reused, | ||||
|                                 reason: err, | ||||
|                                 req, | ||||
|                             } | ||||
|                         } else { | ||||
|                             ClientError::Normal(err) | ||||
|                         } | ||||
|                     }); | ||||
|                 Either::A(fut) | ||||
|             } else { | ||||
|                 let fut = fut | ||||
|                     .map_err(move |(err, orig_req)| { | ||||
|                         if let Some(req) = orig_req { | ||||
|                             ClientError::Canceled { | ||||
|                                 connection_reused: conn_reused, | ||||
|                                 reason: err, | ||||
|                                 req, | ||||
|                             } | ||||
|                         } else { | ||||
|                             ClientError::Normal(err) | ||||
|                         } | ||||
|                     }) | ||||
|                     .and_then(move |mut res| { | ||||
|                         // If pooled is HTTP/2, we can toss this reference immediately. | ||||
|                         // | ||||
|                         // when pooled is dropped, it will try to insert back into the | ||||
|                         // pool. To delay that, spawn a future that completes once the | ||||
|                         // sender is ready again. | ||||
|                         // | ||||
|                         // This *should* only be once the related `Connection` has polled | ||||
|                         // for a new request to start. | ||||
|                         // | ||||
|                         // It won't be ready if there is a body to stream. | ||||
|                         if ver == Ver::Http2 || pooled.is_ready() { | ||||
|                             drop(pooled); | ||||
|                         } else if !res.body().is_empty() { | ||||
|                             let (delayed_tx, delayed_rx) = oneshot::channel(); | ||||
|                             res.body_mut().delayed_eof(delayed_rx); | ||||
|                             executor.execute( | ||||
|                                 future::poll_fn(move || { | ||||
|                                     pooled.poll_ready() | ||||
|                                 }) | ||||
|                                 .then(move |_| { | ||||
|                                     // At this point, `pooled` is dropped, and had a chance | ||||
|                                     // to insert into the pool (if conn was idle) | ||||
|                                     drop(delayed_tx); | ||||
|                                     Ok(()) | ||||
|                                 }) | ||||
|                             ); | ||||
|                         } | ||||
|                         Ok(res) | ||||
|                     }); | ||||
|                 Either::B(fut) | ||||
|             } | ||||
|         }); | ||||
|  | ||||
|         Box::new(resp) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user