diff --git a/src/client/conn.rs b/src/client/conn.rs index 6dee97fa..c01b24e4 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -128,6 +128,10 @@ impl SendRequest self.dispatch.poll_ready(cx) } + pub(super) fn is_ready(&self) -> bool { + self.dispatch.is_ready() + } + pub(super) fn is_closed(&self) -> bool { self.dispatch.is_closed() } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 77402afe..98c59152 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -45,6 +45,10 @@ impl Sender { } } + pub fn is_ready(&self) -> bool { + self.giver.is_wanting() + } + pub fn is_closed(&self) -> bool { self.giver.is_canceled() } diff --git a/src/client/mod.rs b/src/client/mod.rs index 4d81d11d..326db000 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use std::time::Duration; use futures::{Async, Future, FutureExt, Never, Poll}; +use futures::channel::oneshot; use futures::future; use futures::task; use http::{Method, Request, Response, Uri, Version}; @@ -235,7 +236,7 @@ where C: Connect + Sync + 'static, ClientError::Normal(err) } }) - .and_then(move |res| { + .and_then(move |mut res| { future::lazy(move |cx| { // when pooled is dropped, it will try to insert back into the // pool. To delay that, spawn a future that completes once the @@ -245,14 +246,24 @@ where C: Connect + Sync + 'static, // for a new request to start. // // It won't be ready if there is a body to stream. - if let Ok(Async::Pending) = pooled.tx.poll_ready(cx) { + if pooled.tx.is_ready() { + drop(pooled); + } else if !res.body().is_empty() { + let (delayed_tx, delayed_rx) = oneshot::channel(); + res.body_mut().delayed_eof(delayed_rx); // If the executor doesn't have room, oh well. Things will likely // be blowing up soon, but this specific task isn't required. - execute(future::poll_fn(move |cx| { - pooled.tx.poll_ready(cx).or(Ok(Async::Ready(()))) - }), cx).ok(); + let fut = future::poll_fn(move |cx| { + pooled.tx.poll_ready(cx) + }) + .then(move |_| { + // At this point, `pooled` is dropped, and had a chance + // to insert into the pool (if conn was idle) + drop(delayed_tx); + Ok(()) + }); + execute(fut, cx).ok(); } - Ok(res) }) }); diff --git a/src/proto/body.rs b/src/proto/body.rs index 236e6f2e..8adf3c37 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -3,7 +3,7 @@ use std::borrow::Cow; use std::fmt; use bytes::Bytes; -use futures::{Async, Future, Poll, Stream, StreamExt}; +use futures::{Async, Future, Never, Poll, Stream, StreamExt}; use futures::task; use futures::channel::{mpsc, oneshot}; use http::HeaderMap; @@ -125,6 +125,15 @@ impl Stream for EntityStream { #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, + /// Allow the client to pass a future to delay the `Body` from returning + /// EOF. This allows the `Client` to try to put the idle connection + /// back into the pool before the body is "finished". + /// + /// The reason for this is so that creating a new request after finishing + /// streaming the body of a response could sometimes result in creating + /// a brand new connection, since the pool didn't know about the idle + /// connection yet. + delayed_eof: Option, } enum Kind { @@ -137,6 +146,17 @@ enum Kind { Empty, } +type DelayEofUntil = oneshot::Receiver; + +enum DelayEof { + /// Initial state, stream hasn't seen EOF yet. + NotEof(DelayEofUntil), + /// Transitions to this state once we've seen `poll` try to + /// return EOF (`None`). This future is then polled, and + /// when it completes, the Body finally returns EOF (`None`). + Eof(DelayEofUntil), +} + /// A sender half used with `Body::channel()`. #[derive(Debug)] pub struct Sender { @@ -253,6 +273,63 @@ impl Body { fn new(kind: Kind) -> Body { Body { kind: kind, + delayed_eof: None, + } + } + + pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { + self.delayed_eof = Some(DelayEof::NotEof(fut)); + } + + fn poll_eof(&mut self, cx: &mut task::Context) -> Poll, ::Error> { + match self.delayed_eof.take() { + Some(DelayEof::NotEof(mut delay)) => { + match self.poll_inner(cx) { + ok @ Ok(Async::Ready(Some(..))) | + ok @ Ok(Async::Pending) => { + self.delayed_eof = Some(DelayEof::NotEof(delay)); + ok + }, + Ok(Async::Ready(None)) => match delay.poll(cx) { + Ok(Async::Ready(never)) => match never {}, + Ok(Async::Pending) => { + self.delayed_eof = Some(DelayEof::Eof(delay)); + Ok(Async::Pending) + }, + Err(_done) => { + Ok(Async::Ready(None)) + }, + }, + Err(e) => Err(e), + } + }, + Some(DelayEof::Eof(mut delay)) => { + match delay.poll(cx) { + Ok(Async::Ready(never)) => match never {}, + Ok(Async::Pending) => { + self.delayed_eof = Some(DelayEof::Eof(delay)); + Ok(Async::Pending) + }, + Err(_done) => { + Ok(Async::Ready(None)) + }, + } + }, + None => self.poll_inner(cx), + } + } + + fn poll_inner(&mut self, cx: &mut task::Context) -> Poll, ::Error> { + match self.kind { + Kind::Chan { ref mut rx, .. } => match rx.poll_next(cx).expect("mpsc cannot error") { + Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), + Async::Ready(Some(Err(err))) => Err(err), + Async::Ready(None) => Ok(Async::Ready(None)), + Async::Pending => Ok(Async::Pending), + }, + Kind::Wrapped(ref mut s) => s.poll_next(cx), + Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), + Kind::Empty => Ok(Async::Ready(None)), } } } @@ -269,17 +346,7 @@ impl Entity for Body { type Error = ::Error; fn poll_data(&mut self, cx: &mut task::Context) -> Poll, Self::Error> { - match self.kind { - Kind::Chan { ref mut rx, .. } => match rx.poll_next(cx).expect("mpsc cannot error") { - Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), - Async::Ready(Some(Err(err))) => Err(err), - Async::Ready(None) => Ok(Async::Ready(None)), - Async::Pending => Ok(Async::Pending), - }, - Kind::Wrapped(ref mut s) => s.poll_next(cx), - Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), - Kind::Empty => Ok(Async::Ready(None)), - } + self.poll_eof(cx) } fn is_end_stream(&self) -> bool { @@ -300,6 +367,7 @@ impl Entity for Body { Kind::Empty => Some(0) } } + } impl fmt::Debug for Body {