fix(client): ensure idle connection is pooled before response body finishes
This commit is contained in:
		| @@ -128,6 +128,10 @@ impl<B> SendRequest<B> | |||||||
|         self.dispatch.poll_ready(cx) |         self.dispatch.poll_ready(cx) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub(super) fn is_ready(&self) -> bool { | ||||||
|  |         self.dispatch.is_ready() | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub(super) fn is_closed(&self) -> bool { |     pub(super) fn is_closed(&self) -> bool { | ||||||
|         self.dispatch.is_closed() |         self.dispatch.is_closed() | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -45,6 +45,10 @@ impl<T, U> Sender<T, U> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn is_ready(&self) -> bool { | ||||||
|  |         self.giver.is_wanting() | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn is_closed(&self) -> bool { |     pub fn is_closed(&self) -> bool { | ||||||
|         self.giver.is_canceled() |         self.giver.is_canceled() | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ use std::sync::Arc; | |||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
|  |  | ||||||
| use futures::{Async, Future, FutureExt, Never, Poll}; | use futures::{Async, Future, FutureExt, Never, Poll}; | ||||||
|  | use futures::channel::oneshot; | ||||||
| use futures::future; | use futures::future; | ||||||
| use futures::task; | use futures::task; | ||||||
| use http::{Method, Request, Response, Uri, Version}; | use http::{Method, Request, Response, Uri, Version}; | ||||||
| @@ -235,7 +236,7 @@ where C: Connect<Error=io::Error> + Sync + 'static, | |||||||
|                         ClientError::Normal(err) |                         ClientError::Normal(err) | ||||||
|                     } |                     } | ||||||
|                 }) |                 }) | ||||||
|                 .and_then(move |res| { |                 .and_then(move |mut res| { | ||||||
|                     future::lazy(move |cx| { |                     future::lazy(move |cx| { | ||||||
|                         // when pooled is dropped, it will try to insert back into the |                         // when pooled is dropped, it will try to insert back into the | ||||||
|                         // pool. To delay that, spawn a future that completes once the |                         // pool. To delay that, spawn a future that completes once the | ||||||
| @@ -245,14 +246,24 @@ where C: Connect<Error=io::Error> + Sync + 'static, | |||||||
|                         // for a new request to start. |                         // for a new request to start. | ||||||
|                         // |                         // | ||||||
|                         // It won't be ready if there is a body to stream. |                         // It won't be ready if there is a body to stream. | ||||||
|                         if let Ok(Async::Pending) = pooled.tx.poll_ready(cx) { |                         if pooled.tx.is_ready() { | ||||||
|  |                             drop(pooled); | ||||||
|  |                         } else if !res.body().is_empty() { | ||||||
|  |                             let (delayed_tx, delayed_rx) = oneshot::channel(); | ||||||
|  |                             res.body_mut().delayed_eof(delayed_rx); | ||||||
|                             // If the executor doesn't have room, oh well. Things will likely |                             // If the executor doesn't have room, oh well. Things will likely | ||||||
|                             // be blowing up soon, but this specific task isn't required. |                             // be blowing up soon, but this specific task isn't required. | ||||||
|                             execute(future::poll_fn(move |cx| { |                             let fut = future::poll_fn(move |cx| { | ||||||
|                                 pooled.tx.poll_ready(cx).or(Ok(Async::Ready(()))) |                                 pooled.tx.poll_ready(cx) | ||||||
|                             }), cx).ok(); |                             }) | ||||||
|  |                                 .then(move |_| { | ||||||
|  |                                     // At this point, `pooled` is dropped, and had a chance | ||||||
|  |                                     // to insert into the pool (if conn was idle) | ||||||
|  |                                     drop(delayed_tx); | ||||||
|  |                                     Ok(()) | ||||||
|  |                                 }); | ||||||
|  |                             execute(fut, cx).ok(); | ||||||
|                         } |                         } | ||||||
|  |  | ||||||
|                         Ok(res) |                         Ok(res) | ||||||
|                     }) |                     }) | ||||||
|                 }); |                 }); | ||||||
|   | |||||||
| @@ -3,7 +3,7 @@ use std::borrow::Cow; | |||||||
| use std::fmt; | use std::fmt; | ||||||
|  |  | ||||||
| use bytes::Bytes; | use bytes::Bytes; | ||||||
| use futures::{Async, Future, Poll, Stream, StreamExt}; | use futures::{Async, Future, Never, Poll, Stream, StreamExt}; | ||||||
| use futures::task; | use futures::task; | ||||||
| use futures::channel::{mpsc, oneshot}; | use futures::channel::{mpsc, oneshot}; | ||||||
| use http::HeaderMap; | use http::HeaderMap; | ||||||
| @@ -125,6 +125,15 @@ impl<E: Entity> Stream for EntityStream<E> { | |||||||
| #[must_use = "streams do nothing unless polled"] | #[must_use = "streams do nothing unless polled"] | ||||||
| pub struct Body { | pub struct Body { | ||||||
|     kind: Kind, |     kind: Kind, | ||||||
|  |     /// Allow the client to pass a future to delay the `Body` from returning | ||||||
|  |     /// EOF. This allows the `Client` to try to put the idle connection | ||||||
|  |     /// back into the pool before the body is "finished". | ||||||
|  |     /// | ||||||
|  |     /// The reason for this is so that creating a new request after finishing | ||||||
|  |     /// streaming the body of a response could sometimes result in creating | ||||||
|  |     /// a brand new connection, since the pool didn't know about the idle | ||||||
|  |     /// connection yet. | ||||||
|  |     delayed_eof: Option<DelayEof>, | ||||||
| } | } | ||||||
|  |  | ||||||
| enum Kind { | enum Kind { | ||||||
| @@ -137,6 +146,17 @@ enum Kind { | |||||||
|     Empty, |     Empty, | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type DelayEofUntil = oneshot::Receiver<Never>; | ||||||
|  |  | ||||||
|  | enum DelayEof { | ||||||
|  |     /// Initial state, stream hasn't seen EOF yet. | ||||||
|  |     NotEof(DelayEofUntil), | ||||||
|  |     /// Transitions to this state once we've seen `poll` try to | ||||||
|  |     /// return EOF (`None`). This future is then polled, and | ||||||
|  |     /// when it completes, the Body finally returns EOF (`None`). | ||||||
|  |     Eof(DelayEofUntil), | ||||||
|  | } | ||||||
|  |  | ||||||
| /// A sender half used with `Body::channel()`. | /// A sender half used with `Body::channel()`. | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub struct Sender { | pub struct Sender { | ||||||
| @@ -253,6 +273,63 @@ impl Body { | |||||||
|     fn new(kind: Kind) -> Body { |     fn new(kind: Kind) -> Body { | ||||||
|         Body { |         Body { | ||||||
|             kind: kind, |             kind: kind, | ||||||
|  |             delayed_eof: None, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { | ||||||
|  |         self.delayed_eof = Some(DelayEof::NotEof(fut)); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn poll_eof(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, ::Error> { | ||||||
|  |         match self.delayed_eof.take() { | ||||||
|  |             Some(DelayEof::NotEof(mut delay)) => { | ||||||
|  |                 match self.poll_inner(cx) { | ||||||
|  |                     ok @ Ok(Async::Ready(Some(..))) | | ||||||
|  |                     ok @ Ok(Async::Pending) => { | ||||||
|  |                         self.delayed_eof = Some(DelayEof::NotEof(delay)); | ||||||
|  |                         ok | ||||||
|  |                     }, | ||||||
|  |                     Ok(Async::Ready(None)) => match delay.poll(cx) { | ||||||
|  |                         Ok(Async::Ready(never)) => match never {}, | ||||||
|  |                         Ok(Async::Pending) => { | ||||||
|  |                             self.delayed_eof = Some(DelayEof::Eof(delay)); | ||||||
|  |                             Ok(Async::Pending) | ||||||
|  |                         }, | ||||||
|  |                         Err(_done) => { | ||||||
|  |                             Ok(Async::Ready(None)) | ||||||
|  |                         }, | ||||||
|  |                     }, | ||||||
|  |                     Err(e) => Err(e), | ||||||
|  |                 } | ||||||
|  |             }, | ||||||
|  |             Some(DelayEof::Eof(mut delay)) => { | ||||||
|  |                 match delay.poll(cx) { | ||||||
|  |                     Ok(Async::Ready(never)) => match never {}, | ||||||
|  |                     Ok(Async::Pending) => { | ||||||
|  |                         self.delayed_eof = Some(DelayEof::Eof(delay)); | ||||||
|  |                         Ok(Async::Pending) | ||||||
|  |                     }, | ||||||
|  |                     Err(_done) => { | ||||||
|  |                         Ok(Async::Ready(None)) | ||||||
|  |                     }, | ||||||
|  |                 } | ||||||
|  |             }, | ||||||
|  |             None => self.poll_inner(cx), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn poll_inner(&mut self, cx: &mut task::Context) -> Poll<Option<Chunk>, ::Error> { | ||||||
|  |         match self.kind { | ||||||
|  |             Kind::Chan { ref mut rx, .. } => match rx.poll_next(cx).expect("mpsc cannot error") { | ||||||
|  |                 Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), | ||||||
|  |                 Async::Ready(Some(Err(err))) => Err(err), | ||||||
|  |                 Async::Ready(None) => Ok(Async::Ready(None)), | ||||||
|  |                 Async::Pending => Ok(Async::Pending), | ||||||
|  |             }, | ||||||
|  |             Kind::Wrapped(ref mut s) => s.poll_next(cx), | ||||||
|  |             Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), | ||||||
|  |             Kind::Empty => Ok(Async::Ready(None)), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| @@ -269,17 +346,7 @@ impl Entity for Body { | |||||||
|     type Error = ::Error; |     type Error = ::Error; | ||||||
|  |  | ||||||
|     fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error> { |     fn poll_data(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Data>, Self::Error> { | ||||||
|         match self.kind { |         self.poll_eof(cx) | ||||||
|             Kind::Chan { ref mut rx, .. } => match rx.poll_next(cx).expect("mpsc cannot error") { |  | ||||||
|                 Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), |  | ||||||
|                 Async::Ready(Some(Err(err))) => Err(err), |  | ||||||
|                 Async::Ready(None) => Ok(Async::Ready(None)), |  | ||||||
|                 Async::Pending => Ok(Async::Pending), |  | ||||||
|             }, |  | ||||||
|             Kind::Wrapped(ref mut s) => s.poll_next(cx), |  | ||||||
|             Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), |  | ||||||
|             Kind::Empty => Ok(Async::Ready(None)), |  | ||||||
|         } |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn is_end_stream(&self) -> bool { |     fn is_end_stream(&self) -> bool { | ||||||
| @@ -300,6 +367,7 @@ impl Entity for Body { | |||||||
|             Kind::Empty => Some(0) |             Kind::Empty => Some(0) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |      | ||||||
| } | } | ||||||
|  |  | ||||||
| impl fmt::Debug for Body { | impl fmt::Debug for Body { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user