fix(client): try to reuse connections when pool checkout wins

If a checkout wins, meaning an idle connection became available before
a connect future completed, instead of just dropping the connect future,
it spawns it into the background executor to allow being placed into
the pool on completion.
This commit is contained in:
Sean McArthur
2018-06-28 12:43:56 -07:00
committed by GitHub
parent 1f95f58837
commit f2d464ac79
6 changed files with 314 additions and 37 deletions

View File

@@ -91,6 +91,7 @@ use http::uri::Scheme;
use body::{Body, Payload};
use common::Exec;
use common::lazy as hyper_lazy;
use self::connect::{Connect, Destination};
use self::pool::{Pool, Poolable, Reservation};
@@ -274,7 +275,7 @@ where C: Connect + Sync + 'static,
let dst = Destination {
uri: url,
};
future::lazy(move || {
hyper_lazy(move || {
if let Some(connecting) = pool.connecting(&pool_key) {
Either::A(connector.connect(dst)
.map_err(::Error::new_connect)
@@ -318,9 +319,43 @@ where C: Connect + Sync + 'static,
})
};
let race = checkout.select(connect)
.map(|(pooled, _work)| pooled)
.or_else(|(e, other)| {
let executor = self.executor.clone();
// The order of the `select` is depended on below...
let race = checkout.select2(connect)
.map(move |either| match either {
// Checkout won, connect future may have been started or not.
//
// If it has, let it finish and insert back into the pool,
// so as to not waste the socket...
Either::A((checked_out, connecting)) => {
// This depends on the `select` above having the correct
// order, such that if the checkout future were ready
// immediately, the connect future will never have been
// started.
//
// If it *wasn't* ready yet, then the connect future will
// have been started...
if connecting.started() {
let bg = connecting
.map(|_pooled| {
// dropping here should just place it in
// the Pool for us...
})
.map_err(|err| {
trace!("background connect error: {}", err);
});
// An execute error here isn't important, we're just trying
// to prevent a waste of a socket...
let _ = executor.execute(bg);
}
checked_out
},
// Connect won, checkout can just be dropped.
Either::B((connected, _checkout)) => {
connected
},
})
.or_else(|either| match either {
// Either checkout or connect could get canceled:
//
// 1. Connect is canceled if this is HTTP/2 and there is
@@ -329,10 +364,19 @@ where C: Connect + Sync + 'static,
// idle connection reliably.
//
// In both cases, we should just wait for the other future.
if e.is_canceled() {
Either::A(other.map_err(ClientError::Normal))
} else {
Either::B(future::err(ClientError::Normal(e)))
Either::A((err, connecting)) => {
if err.is_canceled() {
Either::A(Either::A(connecting.map_err(ClientError::Normal)))
} else {
Either::B(future::err(ClientError::Normal(err)))
}
},
Either::B((err, checkout)) => {
if err.is_canceled() {
Either::A(Either::B(checkout.map_err(ClientError::Normal)))
} else {
Either::B(future::err(ClientError::Normal(err)))
}
}
});

View File

@@ -151,6 +151,26 @@ impl<T: Poolable> Pool<T> {
}
}
#[cfg(feature = "runtime")]
#[cfg(test)]
pub(super) fn h1_key(&self, s: &str) -> Key {
(Arc::new(s.to_string()), Ver::Http1)
}
#[cfg(feature = "runtime")]
#[cfg(test)]
pub(super) fn idle_count(&self, key: &Key) -> usize {
self
.inner
.connections
.lock()
.unwrap()
.idle
.get(key)
.map(|list| list.len())
.unwrap_or(0)
}
fn take(&self, key: &Key) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.connections.lock().unwrap();

View File

@@ -1,8 +1,9 @@
#![cfg(feature = "runtime")]
extern crate pretty_env_logger;
use futures::Async;
use futures::{Async, Future, Stream};
use futures::future::poll_fn;
use futures::sync::oneshot;
use tokio::runtime::current_thread::Runtime;
use mock::MockConnector;
@@ -73,8 +74,6 @@ fn conn_reset_after_write() {
{
let req = Request::builder()
.uri("http://mock.local/a")
//TODO: remove this header when auto lengths are fixed
.header("content-length", "0")
.body(Default::default())
.unwrap();
let res1 = client.request(req);
@@ -110,3 +109,95 @@ fn conn_reset_after_write() {
}
}
#[test]
fn checkout_win_allows_connect_future_to_be_pooled() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().expect("new rt");
let mut connector = MockConnector::new();
let (tx, rx) = oneshot::channel::<()>();
let sock1 = connector.mock("http://mock.local");
let sock2 = connector.mock_fut("http://mock.local", rx);
let client = Client::builder()
.build::<_, ::Body>(connector);
client.pool.no_timer();
let uri = "http://mock.local/a".parse::<::Uri>().expect("uri parse");
// First request just sets us up to have a connection able to be put
// back in the pool. *However*, it doesn't insert immediately. The
// body has 1 pending byte, and we will only drain in request 2, once
// the connect future has been started.
let mut body = {
let res1 = client.get(uri.clone())
.map(|res| res.into_body().concat2());
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 1\r\n\r\nx"));
Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e));
rt.block_on(res1.join(srv1)).expect("res1").0
};
// The second request triggers the only mocked connect future, but then
// the drained body allows the first socket to go back to the pool,
// "winning" the checkout race.
{
let res2 = client.get(uri.clone());
let drain = poll_fn(move || {
body.poll()
});
let srv2 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx"));
Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e));
rt.block_on(res2.join(drain).join(srv2)).expect("res2");
}
// "Release" the mocked connect future, and let the runtime spin once so
// it's all setup...
{
let mut tx = Some(tx);
let client = &client;
let key = client.pool.h1_key("http://mock.local");
let mut tick_cnt = 0;
let fut = poll_fn(move || {
tx.take();
if client.pool.idle_count(&key) == 0 {
tick_cnt += 1;
assert!(tick_cnt < 10, "ticked too many times waiting for idle");
trace!("no idle yet; tick count: {}", tick_cnt);
::futures::task::current().notify();
Ok(Async::NotReady)
} else {
Ok::<_, ()>(Async::Ready(()))
}
});
rt.block_on(fut).unwrap();
}
// Third request just tests out that the "loser" connection was pooled. If
// it isn't, this will panic since the MockConnector doesn't have any more
// mocks to give out.
{
let res3 = client.get(uri);
let srv3 = poll_fn(|| {
try_ready!(sock2.read(&mut [0u8; 512]));
try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
}).map_err(|e: ::std::io::Error| panic!("srv3 poll_fn error: {}", e));
rt.block_on(res3.join(srv3)).expect("res3");
}
}

63
src/common/lazy.rs Normal file
View File

@@ -0,0 +1,63 @@
use std::mem;
use futures::{Future, IntoFuture, Poll};
pub(crate) fn lazy<F, R>(func: F) -> Lazy<F, R>
where
F: FnOnce() -> R,
R: IntoFuture,
{
Lazy {
inner: Inner::Init(func),
}
}
pub struct Lazy<F, R: IntoFuture> {
inner: Inner<F, R::Future>
}
enum Inner<F, R> {
Init(F),
Fut(R),
Empty,
}
impl<F, R> Lazy<F, R>
where
F: FnOnce() -> R,
R: IntoFuture,
{
pub fn started(&self) -> bool {
match self.inner {
Inner::Init(_) => false,
Inner::Fut(_) |
Inner::Empty => true,
}
}
}
impl<F, R> Future for Lazy<F, R>
where
F: FnOnce() -> R,
R: IntoFuture,
{
type Item = R::Item;
type Error = R::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
Inner::Fut(ref mut f) => return f.poll(),
_ => (),
}
match mem::replace(&mut self.inner, Inner::Empty) {
Inner::Init(func) => {
let mut fut = func().into_future();
let ret = fut.poll();
self.inner = Inner::Fut(fut);
ret
},
_ => unreachable!("lazy state wrong"),
}
}
}

View File

@@ -1,8 +1,10 @@
mod buf;
mod exec;
pub(crate) mod io;
mod lazy;
mod never;
pub(crate) use self::buf::StaticBuf;
pub(crate) use self::exec::Exec;
pub(crate) use self::lazy::lazy;
pub use self::never::Never;

View File

@@ -7,6 +7,8 @@ use std::sync::{Arc, Mutex};
use bytes::Buf;
use futures::{Async, Poll};
#[cfg(feature = "runtime")]
use futures::Future;
use futures::task::{self, Task};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -50,6 +52,7 @@ impl<S: AsRef<[u8]>> PartialEq<S> for MockCursor {
impl Write for MockCursor {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
trace!("MockCursor::write; len={}", data.len());
self.vec.extend(data);
Ok(data.len())
}
@@ -62,7 +65,13 @@ impl Write for MockCursor {
impl Read for MockCursor {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.vec[self.pos..]).read(buf).map(|n| {
trace!("MockCursor::read; len={}", n);
self.pos += n;
if self.pos == self.vec.len() {
trace!("MockCursor::read to end, clearing");
self.pos = 0;
self.vec.clear();
}
n
})
}
@@ -165,7 +174,11 @@ impl AsyncIo<MockCursor> {
#[cfg(feature = "runtime")]
fn close(&mut self) {
self.block_in(1);
assert_eq!(self.inner.vec.len(), self.inner.pos);
assert_eq!(
self.inner.vec.len(),
self.inner.pos,
"AsyncIo::close(), but cursor not consumed",
);
self.inner.vec.truncate(0);
self.inner.pos = 0;
}
@@ -309,6 +322,31 @@ struct DuplexInner {
write: AsyncIo<MockCursor>,
}
#[cfg(feature = "runtime")]
impl Duplex {
pub(crate) fn channel() -> (Duplex, DuplexHandle) {
let mut inner = DuplexInner {
handle_read_task: None,
read: AsyncIo::new_buf(Vec::new(), 0),
write: AsyncIo::new_buf(Vec::new(), ::std::usize::MAX),
};
inner.read.park_tasks(true);
inner.write.park_tasks(true);
let inner = Arc::new(Mutex::new(inner));
let duplex = Duplex {
inner: inner.clone(),
};
let handle = DuplexHandle {
inner: inner,
};
(duplex, handle)
}
}
#[cfg(feature = "runtime")]
impl Read for Duplex {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
@@ -372,8 +410,8 @@ impl DuplexHandle {
pub fn write(&self, bytes: &[u8]) -> Poll<usize, io::Error> {
let mut inner = self.inner.lock().unwrap();
assert!(inner.read.inner.vec.is_empty());
assert_eq!(inner.read.inner.pos, 0);
assert_eq!(inner.read.inner.vec.len(), 0, "write but read isn't empty");
inner
.read
.inner
@@ -388,15 +426,20 @@ impl DuplexHandle {
impl Drop for DuplexHandle {
fn drop(&mut self) {
trace!("mock duplex handle drop");
let mut inner = self.inner.lock().unwrap();
inner.read.close();
inner.write.close();
if !::std::thread::panicking() {
let mut inner = self.inner.lock().unwrap();
inner.read.close();
inner.write.close();
}
}
}
#[cfg(feature = "runtime")]
type BoxedConnectFut = Box<Future<Item=(Duplex, Connected), Error=io::Error> + Send>;
#[cfg(feature = "runtime")]
pub struct MockConnector {
mocks: Mutex<HashMap<String, Vec<Duplex>>>,
mocks: Mutex<HashMap<String, Vec<BoxedConnectFut>>>,
}
#[cfg(feature = "runtime")]
@@ -408,28 +451,25 @@ impl MockConnector {
}
pub fn mock(&mut self, key: &str) -> DuplexHandle {
use futures::future;
self.mock_fut(key, future::ok::<_, ()>(()))
}
pub fn mock_fut<F>(&mut self, key: &str, fut: F) -> DuplexHandle
where
F: Future + Send + 'static,
{
let key = key.to_owned();
let mut inner = DuplexInner {
handle_read_task: None,
read: AsyncIo::new_buf(Vec::new(), 0),
write: AsyncIo::new_buf(Vec::new(), ::std::usize::MAX),
};
inner.read.park_tasks(true);
inner.write.park_tasks(true);
let inner = Arc::new(Mutex::new(inner));
let duplex = Duplex {
inner: inner.clone(),
};
let handle = DuplexHandle {
inner: inner,
};
let (duplex, handle) = Duplex::channel();
let fut = Box::new(fut.then(move |_| {
trace!("MockConnector mocked fut ready");
Ok((duplex, Connected::new()))
}));
self.mocks.lock().unwrap().entry(key)
.or_insert(Vec::new())
.push(duplex);
.push(fut);
handle
}
@@ -439,10 +479,9 @@ impl MockConnector {
impl Connect for MockConnector {
type Transport = Duplex;
type Error = io::Error;
type Future = ::futures::future::FutureResult<(Self::Transport, Connected), Self::Error>;
type Future = BoxedConnectFut;
fn connect(&self, dst: Destination) -> Self::Future {
use futures::future;
trace!("mock connect: {:?}", dst);
let key = format!("{}://{}{}", dst.scheme(), dst.host(), if let Some(port) = dst.port() {
format!(":{}", port)
@@ -453,6 +492,24 @@ impl Connect for MockConnector {
let mocks = mocks.get_mut(&key)
.expect(&format!("unknown mocks uri: {}", key));
assert!(!mocks.is_empty(), "no additional mocks for {}", key);
future::ok((mocks.remove(0), Connected::new()))
mocks.remove(0)
}
}
#[cfg(feature = "runtime")]
impl Drop for MockConnector {
fn drop(&mut self) {
if !::std::thread::panicking() {
let mocks = self.mocks.lock().unwrap();
for (key, mocks) in mocks.iter() {
assert_eq!(
mocks.len(),
0,
"not all mocked connects for {:?} were used",
key,
);
}
}
}
}