feat(http2): add HTTP/2 support for Client and Server

This commit is contained in:
Sean McArthur
2018-04-13 13:20:47 -07:00
parent fe1578acf6
commit c119097fd0
25 changed files with 2014 additions and 363 deletions

View File

@@ -16,6 +16,7 @@ use futures::future::{self, Either};
use tokio_io::{AsyncRead, AsyncWrite};
use body::Payload;
use common::Exec;
use proto;
use super::dispatch;
use {Body, Request, Response, StatusCode};
@@ -25,7 +26,7 @@ use {Body, Request, Response, StatusCode};
/// This is a shortcut for `Builder::new().handshake(io)`.
pub fn handshake<T>(io: T) -> Handshake<T, ::Body>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
{
Builder::new()
.handshake(io)
@@ -33,10 +34,10 @@ where
/// The sender side of an established connection.
pub struct SendRequest<B> {
dispatch: dispatch::Sender<proto::dispatch::ClientMsg<B>, Response<Body>>,
dispatch: dispatch::Sender<Request<B>, Response<Body>>,
}
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
@@ -44,15 +45,17 @@ pub struct SendRequest<B> {
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Data,
proto::ClientUpgradeTransaction,
inner: Either<
proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
proto::ClientUpgradeTransaction,
>,
proto::h2::Client<T, B>,
>,
}
@@ -62,7 +65,9 @@ where
/// After setting options, the builder is used to create a `Handshake` future.
#[derive(Clone, Debug)]
pub struct Builder {
exec: Exec,
h1_writev: bool,
http2: bool,
}
/// A future setting up HTTP over an IO object.
@@ -103,7 +108,18 @@ pub struct Parts<T> {
_inner: (),
}
// internal client api
// ========== internal client api
/// A `Future` for when `SendRequest::poll_ready()` is ready.
pub(super) struct WhenReady<B> {
tx: Option<SendRequest<B>>,
}
// A `SendRequest` that can be cloned to send HTTP2 requests.
// private for now, probably not a great idea of a type...
pub(super) struct Http2SendRequest<B> {
dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
}
#[must_use = "futures do nothing unless polled"]
pub(super) struct HandshakeNoUpgrades<T, B> {
@@ -127,6 +143,12 @@ impl<B> SendRequest<B>
self.dispatch.poll_ready()
}
pub(super) fn when_ready(self) -> WhenReady<B> {
WhenReady {
tx: Some(self),
}
}
pub(super) fn is_ready(&self) -> bool {
self.dispatch.is_ready()
}
@@ -134,6 +156,12 @@ impl<B> SendRequest<B>
pub(super) fn is_closed(&self) -> bool {
self.dispatch.is_closed()
}
pub(super) fn into_http2(self) -> Http2SendRequest<B> {
Http2SendRequest {
dispatch: self.dispatch.unbound(),
}
}
}
impl<B> SendRequest<B>
@@ -257,16 +285,81 @@ impl<B> fmt::Debug for SendRequest<B> {
}
}
// ===== impl Http2SendRequest
impl<B> Http2SendRequest<B> {
pub(super) fn is_ready(&self) -> bool {
self.dispatch.is_ready()
}
pub(super) fn is_closed(&self) -> bool {
self.dispatch.is_closed()
}
}
impl<B> Http2SendRequest<B>
where
B: Payload + 'static,
{
//TODO: replace with `impl Future` when stable
pub(super) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)> + Send>
where
B: Send,
{
let inner = match self.dispatch.try_send(req) {
Ok(rx) => {
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
Either::B(future::err((err, Some(req))))
}
};
Box::new(inner)
}
}
impl<B> fmt::Debug for Http2SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Http2SendRequest")
.finish()
}
}
impl<B> Clone for Http2SendRequest<B> {
fn clone(&self) -> Self {
Http2SendRequest {
dispatch: self.dispatch.clone(),
}
}
}
// ===== impl Connection
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
/// Return the inner IO object, and additional information.
///
/// Only works for HTTP/1 connections. HTTP/2 connections will panic.
pub fn into_parts(self) -> Parts<T> {
let (io, read_buf) = self.inner.into_inner();
let (io, read_buf) = match self.inner {
Either::A(h1) => h1.into_inner(),
Either::B(_h2) => {
panic!("http2 cannot into_inner");
}
};
Parts {
io: io,
read_buf: read_buf,
@@ -282,13 +375,20 @@ where
/// but it is not desired to actally shutdown the IO object. Instead you
/// would take it back using `into_parts`.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
self.inner.poll_without_shutdown()
match self.inner {
Either::A(ref mut h1) => {
h1.poll_without_shutdown()
},
Either::B(ref mut h2) => {
h2.poll()
}
}
}
}
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
type Item = ();
@@ -301,7 +401,7 @@ where
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
B: Payload + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -317,20 +417,37 @@ impl Builder {
#[inline]
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
h1_writev: true,
http2: false,
}
}
/*
pub(super) fn exec(&mut self, exec: Exec) -> &mut Builder {
self.exec = exec;
self
}
*/
pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder {
self.h1_writev = enabled;
self
}
/// Sets whether HTTP2 is required.
///
/// Default is false.
pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
self.http2 = enabled;
self
}
/// Constructs a connection with the configured options and IO.
#[inline]
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
Handshake {
@@ -344,7 +461,7 @@ impl Builder {
pub(super) fn handshake_no_upgrades<T, B>(&self, io: T) -> HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
HandshakeNoUpgrades {
@@ -361,7 +478,7 @@ impl Builder {
impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
type Item = (SendRequest<B>, Connection<T, B>);
@@ -386,15 +503,17 @@ impl<T, B> fmt::Debug for Handshake<T, B> {
impl<T, B> Future for HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Data,
proto::ClientTransaction,
type Item = (SendRequest<B>, Either<
proto::h1::Dispatcher<
proto::h1::dispatch::Client<B>,
B,
T,
proto::ClientTransaction,
>,
proto::h2::Client<T, B>,
>);
type Error = ::Error;
@@ -405,35 +524,45 @@ where
impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite,
B: Payload + 'static,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload,
R: proto::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
>,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Data,
R,
type Item = (SendRequest<B>, Either<
proto::h1::Dispatcher<
proto::h1::dispatch::Client<B>,
B,
T,
R,
>,
proto::h2::Client<T, B>,
>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
if !self.builder.h1_writev {
conn.set_write_strategy_flatten();
}
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
let either = if !self.builder.http2 {
let mut conn = proto::Conn::new(io);
if !self.builder.h1_writev {
conn.set_write_strategy_flatten();
}
let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn);
Either::A(dispatch)
} else {
let h2 = proto::h2::Client::new(io, rx, self.builder.exec.clone());
Either::B(h2)
};
Ok(Async::Ready((
SendRequest {
dispatch: tx,
},
dispatch,
either,
)))
}
}
@@ -457,6 +586,24 @@ impl fmt::Debug for ResponseFuture {
}
}
// ===== impl WhenReady
impl<B> Future for WhenReady<B> {
type Item = SendRequest<B>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut tx = self.tx.take().expect("polled after complete");
match tx.poll_ready()? {
Async::Ready(()) => Ok(Async::Ready(tx)),
Async::NotReady => {
self.tx = Some(tx);
Ok(Async::NotReady)
}
}
}
}
// assert trait markers
trait AssertSend: Send {}
@@ -469,7 +616,7 @@ impl<B: Send> AssertSendSync for SendRequest<B> {}
#[doc(hidden)]
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B::Data: Send + 'static,
{}
@@ -477,7 +624,7 @@ where
#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static,
B::Data: Send + Sync + 'static,
{}

View File

@@ -1,17 +1,19 @@
use std::sync::Arc;
use futures::{Async, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use want;
use common::Never;
//pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type RetryPromise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub type Promise<T> = oneshot::Receiver<Result<T, ::Error>>;
pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::channel(0);
let (tx, rx) = mpsc::unbounded();
let (giver, taker) = want::new();
let tx = Sender {
buffered_once: false,
giver: giver,
inner: tx,
};
@@ -22,28 +24,38 @@ pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
(tx, rx)
}
/// A bounded sender of requests and callbacks for when responses are ready.
///
/// While the inner sender is unbounded, the Giver is used to determine
/// if the Receiver is ready for another request.
pub struct Sender<T, U> {
// The Giver helps watch that the the Receiver side has been polled
// when the queue is empty. This helps us know when a request and
// response have been fully processed, and a connection is ready
// for more.
/// One message is always allowed, even if the Receiver hasn't asked
/// for it yet. This boolean keeps track of whether we've sent one
/// without notice.
buffered_once: bool,
/// The Giver helps watch that the the Receiver side has been polled
/// when the queue is empty. This helps us know when a request and
/// response have been fully processed, and a connection is ready
/// for more.
giver: want::Giver,
//inner: mpsc::Sender<(T, Callback<T, U>)>,
inner: mpsc::Sender<Envelope<T, U>>,
/// Actually bounded by the Giver, plus `buffered_once`.
inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
/// An unbounded version.
///
/// Cannot poll the Giver, but can still use it to determine if the Receiver
/// has been dropped. However, this version can be cloned.
pub struct UnboundedSender<T, U> {
// Only used for `is_closed`, since mpsc::UnboundedSender cannot be checked.
giver: Arc<want::Giver>,
inner: mpsc::UnboundedSender<Envelope<T, U>>,
}
impl<T, U> Sender<T, U> {
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.inner.poll_ready() {
Ok(Async::Ready(())) => {
// there's room in the queue, but does the Connection
// want a message yet?
self.giver.poll_want()
.map_err(|_| ::Error::new_closed())
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(::Error::new_closed()),
}
self.giver.poll_want()
.map_err(|_| ::Error::new_closed())
}
pub fn is_ready(&self) -> bool {
@@ -54,24 +66,75 @@ impl<T, U> Sender<T, U> {
self.giver.is_canceled()
}
fn can_send(&mut self) -> bool {
if self.giver.give() || !self.buffered_once {
// If the receiver is ready *now*, then of course we can send.
//
// If the receiver isn't ready yet, but we don't have anything
// in the channel yet, then allow one message.
self.buffered_once = true;
true
} else {
false
}
}
pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
if !self.can_send() {
return Err(val);
}
let (tx, rx) = oneshot::channel();
self.inner.try_send(Envelope(Some((val, Callback::Retry(tx)))))
self.inner.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
pub fn send(&mut self, val: T) -> Result<Promise<U>, T> {
if !self.can_send() {
return Err(val);
}
let (tx, rx) = oneshot::channel();
self.inner.try_send(Envelope(Some((val, Callback::NoRetry(tx)))))
self.inner.unbounded_send(Envelope(Some((val, Callback::NoRetry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
pub fn unbound(self) -> UnboundedSender<T, U> {
UnboundedSender {
giver: Arc::new(self.giver),
inner: self.inner,
}
}
}
impl<T, U> UnboundedSender<T, U> {
pub fn is_ready(&self) -> bool {
self.giver.is_wanting()
}
pub fn is_closed(&self) -> bool {
self.giver.is_canceled()
}
pub fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send(Envelope(Some((val, Callback::Retry(tx)))))
.map(move |_| rx)
.map_err(|e| e.into_inner().0.take().expect("envelope not dropped").0)
}
}
impl<T, U> Clone for UnboundedSender<T, U> {
fn clone(&self) -> Self {
UnboundedSender {
giver: self.giver.clone(),
inner: self.inner.clone(),
}
}
}
pub struct Receiver<T, U> {
//inner: mpsc::Receiver<(T, Callback<T, U>)>,
inner: mpsc::Receiver<Envelope<T, U>>,
inner: mpsc::UnboundedReceiver<Envelope<T, U>>,
taker: want::Taker,
}
@@ -166,19 +229,21 @@ mod tests {
#[cfg(feature = "nightly")]
extern crate test;
use futures::{future, Future};
use futures::{future, Future, Stream};
#[cfg(feature = "nightly")]
use futures::{Stream};
#[derive(Debug)]
struct Custom(i32);
#[test]
fn drop_receiver_sends_cancel_errors() {
let _ = pretty_env_logger::try_init();
future::lazy(|| {
#[derive(Debug)]
struct Custom(i32);
let (mut tx, rx) = super::channel::<Custom, ()>();
let (mut tx, mut rx) = super::channel::<Custom, ()>();
// must poll once for try_send to succeed
assert!(rx.poll().expect("rx empty").is_not_ready());
let promise = tx.try_send(Custom(43)).unwrap();
drop(rx);
@@ -198,6 +263,40 @@ mod tests {
}).wait().unwrap();
}
#[test]
fn sender_checks_for_want_on_send() {
future::lazy(|| {
let (mut tx, mut rx) = super::channel::<Custom, ()>();
// one is allowed to buffer, second is rejected
let _ = tx.try_send(Custom(1)).expect("1 buffered");
tx.try_send(Custom(2)).expect_err("2 not ready");
assert!(rx.poll().expect("rx 1").is_ready());
// Even though 1 has been popped, only 1 could be buffered for the
// lifetime of the channel.
tx.try_send(Custom(2)).expect_err("2 still not ready");
assert!(rx.poll().expect("rx empty").is_not_ready());
let _ = tx.try_send(Custom(2)).expect("2 ready");
Ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
fn unbounded_sender_doesnt_bound_on_want() {
let (tx, rx) = super::channel::<Custom, ()>();
let mut tx = tx.unbound();
let _ = tx.try_send(Custom(1)).unwrap();
let _ = tx.try_send(Custom(2)).unwrap();
let _ = tx.try_send(Custom(3)).unwrap();
drop(rx);
let _ = tx.try_send(Custom(4)).unwrap_err();
}
#[cfg(feature = "nightly")]
#[bench]
fn giver_queue_throughput(b: &mut test::Bencher) {

View File

@@ -6,16 +6,16 @@ use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future, Poll};
use futures::future::{self, Executor};
use futures::future::{self, Either, Executor};
use futures::sync::oneshot;
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::uri::Scheme;
use tokio_executor::spawn;
pub use tokio_service::Service;
use body::{Body, Payload};
use self::pool::Pool;
use common::Exec;
use self::pool::{Pool, Poolable, Reservation};
pub use self::connect::{Connect, HttpConnector};
@@ -37,6 +37,7 @@ pub struct Client<C, B = Body> {
pool: Pool<PoolClient<B>>,
retry_canceled_requests: bool,
set_host: bool,
ver: Ver,
}
impl Client<HttpConnector, Body> {
@@ -143,7 +144,7 @@ where C: Connect + Sync + 'static,
}
};
if self.set_host {
if self.set_host && self.ver == Ver::Http1 {
if let Entry::Vacant(entry) = req.headers_mut().entry(HOST).expect("HOST is always valid header name") {
let hostname = uri.host().expect("authority implies host");
let host = if let Some(port) = uri.port() {
@@ -171,50 +172,78 @@ where C: Connect + Sync + 'static,
//TODO: replace with `impl Future` when stable
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
let url = req.uri().clone();
let checkout = self.pool.checkout(domain);
let ver = self.ver;
let pool_key = (Arc::new(domain.to_string()), self.ver);
let checkout = self.pool.checkout(pool_key.clone());
let connect = {
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Arc::new(domain.to_string());
let h1_writev = self.h1_writev;
let connector = self.connector.clone();
let dst = Destination {
uri: url,
};
future::lazy(move || {
connector.connect(dst)
.map_err(::Error::new_connect)
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
.and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)));
Ok(pool.pooled(pool_key, PoolClient {
is_proxied: connected.is_proxied,
tx: tx,
}))
})
})
if let Some(connecting) = pool.connecting(&pool_key) {
Either::A(connector.connect(dst)
.map_err(::Error::new_connect)
.and_then(move |(io, connected)| {
conn::Builder::new()
.h1_writev(h1_writev)
.http2_only(pool_key.1 == Ver::Http2)
.handshake_no_upgrades(io)
.and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| {
debug!("client connection error: {}", e)
}));
// Wait for 'conn' to ready up before we
// declare this tx as usable
tx.when_ready()
})
.map(move |tx| {
pool.pooled(connecting, PoolClient {
is_proxied: connected.is_proxied,
tx: match ver {
Ver::Http1 => PoolTx::Http1(tx),
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
},
})
})
}))
} else {
let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress"));
Either::B(future::err(canceled))
}
})
};
let race = checkout.select(connect)
.map(|(pooled, _work)| pooled)
.map_err(|(e, _checkout)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
ClientError::Normal(e)
.or_else(|(e, other)| {
// Either checkout or connect could get canceled:
//
// 1. Connect is canceled if this is HTTP/2 and there is
// an outstanding HTTP/2 connecting task.
// 2. Checkout is canceled if the pool cannot deliver an
// idle connection reliably.
//
// In both cases, we should just wait for the other future.
if e.is_canceled() {
//trace!("checkout/connect race canceled: {}", e);
Either::A(other.map_err(ClientError::Normal))
} else {
Either::B(future::err(ClientError::Normal(e)))
}
});
let executor = self.executor.clone();
let resp = race.and_then(move |mut pooled| {
let conn_reused = pooled.is_reused();
set_relative_uri(req.uri_mut(), pooled.is_proxied);
let fut = pooled.tx.send_request_retryable(req)
if ver == Ver::Http1 {
set_relative_uri(req.uri_mut(), pooled.is_proxied);
}
let fut = pooled.send_request_retryable(req)
.map_err(move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
@@ -235,14 +264,14 @@ where C: Connect + Sync + 'static,
// for a new request to start.
//
// It won't be ready if there is a body to stream.
if pooled.tx.is_ready() {
if pooled.is_ready() {
drop(pooled);
} else if !res.body().is_empty() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
executor.execute(
future::poll_fn(move || {
pooled.tx.poll_ready()
pooled.poll_ready()
})
.then(move |_| {
// At this point, `pooled` is dropped, and had a chance
@@ -291,6 +320,7 @@ impl<C, B> Clone for Client<C, B> {
pool: self.pool.clone(),
retry_canceled_requests: self.retry_canceled_requests,
set_host: self.set_host,
ver: self.ver,
}
}
}
@@ -366,15 +396,74 @@ where
struct PoolClient<B> {
is_proxied: bool,
tx: conn::SendRequest<B>,
tx: PoolTx<B>,
}
impl<B> self::pool::Closed for PoolClient<B>
enum PoolTx<B> {
Http1(conn::SendRequest<B>),
Http2(conn::Http2SendRequest<B>),
}
impl<B> PoolClient<B> {
fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.tx {
PoolTx::Http1(ref mut tx) => tx.poll_ready(),
PoolTx::Http2(_) => Ok(Async::Ready(())),
}
}
fn is_ready(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
PoolTx::Http2(ref tx) => tx.is_ready(),
}
}
}
impl<B: Payload + 'static> PoolClient<B> {
//TODO: replace with `impl Future` when stable
fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)> + Send>
where
B: Send,
{
match self.tx {
PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
PoolTx::Http2(ref mut tx) => tx.send_request_retryable(req),
}
}
}
impl<B> Poolable for PoolClient<B>
where
B: 'static,
{
fn is_closed(&self) -> bool {
self.tx.is_closed()
match self.tx {
PoolTx::Http1(ref tx) => tx.is_closed(),
PoolTx::Http2(ref tx) => tx.is_closed(),
}
}
fn reserve(self) -> Reservation<Self> {
match self.tx {
PoolTx::Http1(tx) => {
Reservation::Unique(PoolClient {
is_proxied: self.is_proxied,
tx: PoolTx::Http1(tx),
})
},
PoolTx::Http2(tx) => {
let b = PoolClient {
is_proxied: self.is_proxied,
tx: PoolTx::Http2(tx.clone()),
};
let a = PoolClient {
is_proxied: self.is_proxied,
tx: PoolTx::Http2(tx),
};
Reservation::Shared(a, b)
}
}
}
}
@@ -387,17 +476,24 @@ enum ClientError<B> {
}
}
/// A marker to identify what version a pooled connection is.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Ver {
Http1,
Http2,
}
fn set_relative_uri(uri: &mut Uri, is_proxied: bool) {
if is_proxied && uri.scheme_part() != Some(&Scheme::HTTPS) {
return;
}
let path = match uri.path_and_query() {
Some(path) => {
Some(path) if path.as_str() != "/" => {
let mut parts = ::http::uri::Parts::default();
parts.path_and_query = Some(path.clone());
Uri::from_parts(parts).expect("path is valid uri")
},
None => {
_none_or_just_slash => {
"/".parse().expect("/ is valid path")
}
};
@@ -416,6 +512,7 @@ pub struct Builder {
max_idle: usize,
retry_canceled_requests: bool,
set_host: bool,
ver: Ver,
}
impl Default for Builder {
@@ -428,6 +525,7 @@ impl Default for Builder {
max_idle: 5,
retry_canceled_requests: true,
set_host: true,
ver: Ver::Http1,
}
}
}
@@ -467,6 +565,20 @@ impl Builder {
self
}
/// Set whether the connection **must** use HTTP/2.
///
/// Note that setting this to true prevents HTTP/1 from being allowed.
///
/// Default is false.
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.ver = if val {
Ver::Http2
} else {
Ver::Http1
};
self
}
/// Set whether to retry requests that get disrupted before ever starting
/// to write.
///
@@ -534,6 +646,7 @@ impl Builder {
pool: Pool::new(self.keep_alive, self.keep_alive_timeout),
retry_canceled_requests: self.retry_canceled_requests,
set_host: self.set_host,
ver: self.ver,
}
}
}
@@ -546,33 +659,20 @@ impl fmt::Debug for Builder {
.field("http1_writev", &self.h1_writev)
.field("max_idle", &self.max_idle)
.field("set_host", &self.set_host)
.field("version", &self.ver)
.finish()
}
}
// ===== impl Exec =====
#[cfg(test)]
mod unit_tests {
use super::*;
#[derive(Clone)]
enum Exec {
Default,
Executor(Arc<Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync>),
}
#[test]
fn set_relative_uri_with_implicit_path() {
let mut uri = "http://hyper.rs".parse().unwrap();
set_relative_uri(&mut uri, false);
impl Exec {
fn execute<F>(&self, fut: F)
where
F: Future<Item=(), Error=()> + Send + 'static,
{
match *self {
Exec::Default => spawn(fut),
Exec::Executor(ref e) => {
let _ = e.execute(Box::new(fut))
.map_err(|err| {
panic!("executor error: {:?}", err.kind());
});
},
}
assert_eq!(uri.to_string(), "/");
}
}

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
@@ -8,10 +8,10 @@ use futures::{Future, Async, Poll, Stream};
use futures::sync::oneshot;
use futures_timer::Interval;
use common::Never;
use super::Exec;
use common::{Exec, Never};
use super::Ver;
pub struct Pool<T> {
pub(super) struct Pool<T> {
inner: Arc<Mutex<PoolInner<T>>>,
}
@@ -20,15 +20,42 @@ pub struct Pool<T> {
// This is a trait to allow the `client::pool::tests` to work for `i32`.
//
// See https://github.com/hyperium/hyper/issues/1429
pub trait Closed {
pub(super) trait Poolable: Sized {
fn is_closed(&self) -> bool;
/// Reserve this connection.
///
/// Allows for HTTP/2 to return a shared reservation.
fn reserve(self) -> Reservation<Self>;
}
/// When checking out a pooled connection, it might be that the connection
/// only supports a single reservation, or it might be usable for many.
///
/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
/// used for multiple requests.
pub(super) enum Reservation<T> {
/// This connection could be used multiple times, the first one will be
/// reinserted into the `idle` pool, and the second will be given to
/// the `Checkout`.
#[allow(unused)]
Shared(T, T),
/// This connection requires unique access. It will be returned after
/// use is complete.
Unique(T),
}
/// Simple type alias in case the key type needs to be adjusted.
type Key = (Arc<String>, Ver);
struct PoolInner<T> {
// A flag that a connection is being estabilished, and the connection
// should be shared. This prevents making multiple HTTP/2 connections
// to the same host.
connecting: HashSet<Key>,
enabled: bool,
// These are internal Conns sitting in the event loop in the KeepAlive
// state, waiting to receive a new Request to send on the socket.
idle: HashMap<Arc<String>, Vec<Idle<T>>>,
idle: HashMap<Key, Vec<Idle<T>>>,
// These are outstanding Checkouts that are waiting for a socket to be
// able to send a Request one. This is used when "racing" for a new
// connection.
@@ -38,7 +65,7 @@ struct PoolInner<T> {
// this list is checked for any parked Checkouts, and tries to notify
// them that the Conn could be used instead of waiting for a brand new
// connection.
parked: HashMap<Arc<String>, VecDeque<oneshot::Sender<T>>>,
parked: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
timeout: Option<Duration>,
// A oneshot channel is used to allow the interval to be notified when
// the Pool completely drops. That way, the interval can cancel immediately.
@@ -49,6 +76,7 @@ impl<T> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool {
inner: Arc::new(Mutex::new(PoolInner {
connecting: HashSet::new(),
enabled: enabled,
idle: HashMap::new(),
idle_interval_ref: None,
@@ -59,40 +87,69 @@ impl<T> Pool<T> {
}
}
impl<T: Closed> Pool<T> {
pub fn checkout(&self, key: &str) -> Checkout<T> {
impl<T: Poolable> Pool<T> {
/// Returns a `Checkout` which is a future that resolves if an idle
/// connection becomes available.
pub fn checkout(&self, key: Key) -> Checkout<T> {
Checkout {
key: Arc::new(key.to_owned()),
key,
pool: self.clone(),
parked: None,
}
}
fn take(&self, key: &Arc<String>) -> Option<Pooled<T>> {
/// Ensure that there is only ever 1 connecting task for HTTP/2
/// connections. This does nothing for HTTP/1.
pub(super) fn connecting(&self, key: &Key) -> Option<Connecting<T>> {
if key.1 == Ver::Http2 {
let mut inner = self.inner.lock().unwrap();
if inner.connecting.insert(key.clone()) {
let connecting = Connecting {
key: key.clone(),
pool: Arc::downgrade(&self.inner),
};
Some(connecting)
} else {
trace!("HTTP/2 connecting already in progress for {:?}", key.0);
None
}
} else {
Some(Connecting {
key: key.clone(),
// in HTTP/1's case, there is never a lock, so we don't
// need to do anything in Drop.
pool: Weak::new(),
})
}
}
fn take(&self, key: &Key) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let mut should_remove = false;
let entry = inner.idle.get_mut(key).and_then(|list| {
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
while let Some(entry) = list.pop() {
if !expiration.expires(entry.idle_at) {
if !entry.value.is_closed() {
should_remove = list.is_empty();
return Some(entry);
}
let maybe_entry = inner.idle.get_mut(key)
.and_then(|list| {
trace!("take? {:?}: expiration = {:?}", key, expiration.0);
// A block to end the mutable borrow on list,
// so the map below can check is_empty()
{
let popper = IdlePopper {
key,
list,
};
popper.pop(&expiration)
}
trace!("removing unacceptable pooled {:?}", key);
// every other case the Idle should just be dropped
// 1. Idle but expired
// 2. Busy (something else somehow took it?)
// 3. Disabled don't reuse of course
}
should_remove = true;
None
});
.map(|e| (e, list.is_empty()))
});
if should_remove {
let (entry, empty) = if let Some((e, empty)) = maybe_entry {
(Some(e), empty)
} else {
// No entry found means nuke the list for sure.
(None, true)
};
if empty {
//TODO: This could be done with the HashMap::entry API instead.
inner.idle.remove(key);
}
entry
@@ -101,17 +158,35 @@ impl<T: Closed> Pool<T> {
entry.map(|e| self.reuse(key, e.value))
}
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
let value = match value.reserve() {
Reservation::Shared(to_insert, to_return) => {
debug_assert_eq!(
connecting.key.1,
Ver::Http2,
"shared reservation without Http2"
);
let mut inner = self.inner.lock().unwrap();
inner.put(connecting.key.clone(), to_insert);
// Do this here instead of Drop for Connecting because we
// already have a lock, no need to lock the mutex twice.
inner.connected(&connecting.key);
// prevent the Drop of Connecting from repeating inner.connected()
connecting.pool = Weak::new();
pub fn pooled(&self, key: Arc<String>, value: T) -> Pooled<T> {
to_return
},
Reservation::Unique(value) => value,
};
Pooled {
is_reused: false,
key: key,
key: connecting.key.clone(),
pool: Arc::downgrade(&self.inner),
value: Some(value)
}
}
fn reuse(&self, key: &Arc<String>, value: T) -> Pooled<T> {
fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
debug!("reuse idle connection for {:?}", key);
Pooled {
is_reused: true,
@@ -121,8 +196,8 @@ impl<T: Closed> Pool<T> {
}
}
fn park(&mut self, key: Arc<String>, tx: oneshot::Sender<T>) {
trace!("park; waiting for idle connection: {:?}", key);
fn park(&mut self, key: Key, tx: oneshot::Sender<T>) {
trace!("checkout waiting for idle connection: {:?}", key);
self.inner.lock().unwrap()
.parked.entry(key)
.or_insert(VecDeque::new())
@@ -130,19 +205,83 @@ impl<T: Closed> Pool<T> {
}
}
impl<T: Closed> PoolInner<T> {
fn put(&mut self, key: Arc<String>, value: T) {
/// Pop off this list, looking for a usable connection that hasn't expired.
struct IdlePopper<'a, T: 'a> {
key: &'a Key,
list: &'a mut Vec<Idle<T>>,
}
impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
while let Some(entry) = self.list.pop() {
// If the connection has been closed, or is older than our idle
// timeout, simply drop it and keep looking...
//
// TODO: Actually, since the `idle` list is pushed to the end always,
// that would imply that if *this* entry is expired, then anything
// "earlier" in the list would *have* to be expired also... Right?
//
// In that case, we could just break out of the loop and drop the
// whole list...
if entry.value.is_closed() || expiration.expires(entry.idle_at) {
trace!("remove unacceptable pooled connection for {:?}", self.key);
continue;
}
let value = match entry.value.reserve() {
Reservation::Shared(to_reinsert, to_checkout) => {
self.list.push(Idle {
idle_at: Instant::now(),
value: to_reinsert,
});
to_checkout
},
Reservation::Unique(unique) => {
unique
}
};
return Some(Idle {
idle_at: entry.idle_at,
value,
});
}
None
}
}
impl<T: Poolable> PoolInner<T> {
fn put(&mut self, key: Key, value: T) {
if !self.enabled {
return;
}
if key.1 == Ver::Http2 && self.idle.contains_key(&key) {
trace!("Pool::put; existing idle HTTP/2 connection for {:?}", key);
return;
}
trace!("Pool::put {:?}", key);
let mut remove_parked = false;
let mut value = Some(value);
if let Some(parked) = self.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if !tx.is_canceled() {
match tx.send(value.take().unwrap()) {
Ok(()) => break,
let reserved = value.take().expect("value already sent");
let reserved = match reserved.reserve() {
Reservation::Shared(to_keep, to_send) => {
value = Some(to_keep);
to_send
},
Reservation::Unique(uniq) => uniq,
};
match tx.send(reserved) {
Ok(()) => {
if value.is_none() {
break;
} else {
continue;
}
},
Err(e) => {
value = Some(e);
}
@@ -170,6 +309,20 @@ impl<T: Closed> PoolInner<T> {
None => trace!("Pool::put found parked {:?}", key),
}
}
/// A `Connecting` task is complete. Not necessarily successfully,
/// but the lock is going away, so clean up.
fn connected(&mut self, key: &Key) {
let existed = self.connecting.remove(key);
debug_assert!(
existed,
"Connecting dropped, key not in pool.connecting"
);
// cancel any waiters. if there are any, it's because
// this Connecting task didn't complete successfully.
// those waiters would never receive a connection.
self.parked.remove(key);
}
}
impl<T> PoolInner<T> {
@@ -177,7 +330,7 @@ impl<T> PoolInner<T> {
/// and possibly inserted into the pool that it is waiting for an idle
/// connection. If a user ever dropped that future, we need to clean out
/// those parked senders.
fn clean_parked(&mut self, key: &Arc<String>) {
fn clean_parked(&mut self, key: &Key) {
let mut remove_parked = false;
if let Some(parked) = self.parked.get_mut(key) {
parked.retain(|tx| {
@@ -191,7 +344,7 @@ impl<T> PoolInner<T> {
}
}
impl<T: Closed> PoolInner<T> {
impl<T: Poolable> PoolInner<T> {
fn clear_expired(&mut self) {
let dur = if let Some(dur) = self.timeout {
dur
@@ -218,7 +371,7 @@ impl<T: Closed> PoolInner<T> {
}
impl<T: Closed + Send + 'static> Pool<T> {
impl<T: Poolable + Send + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, exec: &Exec) {
let (dur, rx) = {
let mut inner = self.inner.lock().unwrap();
@@ -257,14 +410,16 @@ impl<T> Clone for Pool<T> {
}
}
pub struct Pooled<T: Closed> {
/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
// Note: The bounds `T: Poolable` is needed for the Drop impl.
pub(super) struct Pooled<T: Poolable> {
value: Option<T>,
is_reused: bool,
key: Arc<String>,
key: Key,
pool: Weak<Mutex<PoolInner<T>>>,
}
impl<T: Closed> Pooled<T> {
impl<T: Poolable> Pooled<T> {
pub fn is_reused(&self) -> bool {
self.is_reused
}
@@ -278,22 +433,28 @@ impl<T: Closed> Pooled<T> {
}
}
impl<T: Closed> Deref for Pooled<T> {
impl<T: Poolable> Deref for Pooled<T> {
type Target = T;
fn deref(&self) -> &T {
self.as_ref()
}
}
impl<T: Closed> DerefMut for Pooled<T> {
impl<T: Poolable> DerefMut for Pooled<T> {
fn deref_mut(&mut self) -> &mut T {
self.as_mut()
}
}
impl<T: Closed> Drop for Pooled<T> {
impl<T: Poolable> Drop for Pooled<T> {
fn drop(&mut self) {
if let Some(value) = self.value.take() {
if value.is_closed() {
// If we *already* know the connection is done here,
// it shouldn't be re-inserted back into the pool.
return;
}
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
inner.put(self.key.clone(), value);
@@ -305,7 +466,7 @@ impl<T: Closed> Drop for Pooled<T> {
}
}
impl<T: Closed> fmt::Debug for Pooled<T> {
impl<T: Poolable> fmt::Debug for Pooled<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Pooled")
.field("key", &self.key)
@@ -318,33 +479,30 @@ struct Idle<T> {
value: T,
}
pub struct Checkout<T> {
key: Arc<String>,
pub(super) struct Checkout<T> {
key: Key,
pool: Pool<T>,
parked: Option<oneshot::Receiver<T>>,
}
struct NotParked;
impl<T: Closed> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false;
impl<T: Poolable> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Option<Pooled<T>>, ::Error> {
static CANCELED: &str = "pool checkout failed";
if let Some(ref mut rx) = self.parked {
match rx.poll() {
Ok(Async::Ready(value)) => {
if !value.is_closed() {
return Ok(Async::Ready(self.pool.reuse(&self.key, value)));
Ok(Async::Ready(Some(self.pool.reuse(&self.key, value))))
} else {
Err(::Error::new_canceled(Some(CANCELED)))
}
drop_parked = true;
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_canceled) => drop_parked = true,
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_canceled) => Err(::Error::new_canceled(Some(CANCELED))),
}
} else {
Ok(Async::Ready(None))
}
if drop_parked {
self.parked.take();
}
Err(NotParked)
}
fn park(&mut self) {
@@ -357,14 +515,13 @@ impl<T: Closed> Checkout<T> {
}
}
impl<T: Closed> Future for Checkout<T> {
impl<T: Poolable> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.poll_parked() {
Ok(async) => return Ok(async),
Err(_not_parked) => (),
if let Some(pooled) = try_ready!(self.poll_parked()) {
return Ok(Async::Ready(pooled));
}
let entry = self.pool.take(&self.key);
@@ -387,6 +544,27 @@ impl<T> Drop for Checkout<T> {
}
}
pub(super) struct Connecting<T: Poolable> {
key: Key,
pool: Weak<Mutex<PoolInner<T>>>,
}
impl<T: Poolable> Drop for Connecting<T> {
fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() {
// No need to panic on drop, that could abort!
if let Ok(mut inner) = pool.lock() {
debug_assert_eq!(
self.key.1,
Ver::Http2,
"Connecting constructed without Http2"
);
inner.connected(&self.key);
}
}
}
}
struct Expiration(Option<Duration>);
impl Expiration {
@@ -411,7 +589,7 @@ struct IdleInterval<T> {
pool_drop_notifier: oneshot::Receiver<Never>,
}
impl<T: Closed + 'static> Future for IdleInterval<T> {
impl<T: Poolable + 'static> Future for IdleInterval<T> {
type Item = ();
type Error = ();
@@ -441,28 +619,58 @@ impl<T: Closed + 'static> Future for IdleInterval<T> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use super::{Closed, Pool, Exec};
use super::{Connecting, Key, Poolable, Pool, Reservation, Exec, Ver};
impl Closed for i32 {
/// Test unique reservations.
#[derive(Debug, PartialEq, Eq)]
struct Uniq<T>(T);
impl<T> Poolable for Uniq<T> {
fn is_closed(&self) -> bool {
false
}
fn reserve(self) -> Reservation<Self> {
Reservation::Unique(self)
}
}
/*
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
struct Share<T>(T);
impl<T> Poolable for Share<T> {
fn is_closed(&self) -> bool {
false
}
fn reserve(self) -> Reservation<Self> {
Reservation::Shared(self.clone(), self)
}
}
*/
fn c<T: Poolable>(key: Key) -> Connecting<T> {
Connecting {
key,
pool: Weak::new(),
}
}
#[test]
fn test_pool_checkout_smoke() {
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let key = (Arc::new("foo".to_string()), Ver::Http1);
let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled);
match pool.checkout(&key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
match pool.checkout(key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, Uniq(41)),
_ => panic!("not ready"),
}
}
@@ -471,11 +679,11 @@ mod tests {
fn test_pool_checkout_returns_none_if_expired() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let key = (Arc::new("foo".to_string()), Ver::Http1);
let pooled = pool.pooled(c(key.clone()), Uniq(41));
drop(pooled);
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
assert!(pool.checkout(&key).poll().unwrap().is_not_ready());
assert!(pool.checkout(key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
@@ -484,17 +692,17 @@ mod tests {
fn test_pool_checkout_removes_expired() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let key = (Arc::new("foo".to_string()), Ver::Http1);
pool.pooled(key.clone(), 41);
pool.pooled(key.clone(), 5);
pool.pooled(key.clone(), 99);
pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
// checkout.poll() should clean out the expired
pool.checkout(&key).poll().unwrap();
pool.checkout(key.clone()).poll().unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
Ok::<(), ()>(())
@@ -509,11 +717,11 @@ mod tests {
let executor = runtime.executor();
pool.spawn_expired_interval(&Exec::Executor(Arc::new(executor)));
let key = Arc::new("foo".to_string());
let key = (Arc::new("foo".to_string()), Ver::Http1);
pool.pooled(key.clone(), 41);
pool.pooled(key.clone(), 5);
pool.pooled(key.clone(), 99);
pool.pooled(c(key.clone()), Uniq(41));
pool.pooled(c(key.clone()), Uniq(5));
pool.pooled(c(key.clone()), Uniq(99));
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
@@ -527,10 +735,10 @@ mod tests {
#[test]
fn test_pool_checkout_task_unparked() {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let key = (Arc::new("foo".to_string()), Ver::Http1);
let pooled = pool.pooled(c(key.clone()), Uniq(41));
let checkout = pool.checkout(&key).join(future::lazy(move || {
let checkout = pool.checkout(key).join(future::lazy(move || {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
@@ -539,17 +747,17 @@ mod tests {
drop(pooled);
Ok(())
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), 41);
assert_eq!(*checkout.wait().unwrap(), Uniq(41));
}
#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
future::lazy(|| {
let pool = Pool::<i32>::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("localhost:12345".to_string());
let pool = Pool::<Uniq<i32>>::new(true, Some(Duration::from_secs(10)));
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
let mut checkout1 = pool.checkout(&key);
let mut checkout2 = pool.checkout(&key);
let mut checkout1 = pool.checkout(key.clone());
let mut checkout2 = pool.checkout(key.clone());
// first poll needed to get into Pool's parked
checkout1.poll().unwrap();
@@ -567,4 +775,32 @@ mod tests {
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
}
#[derive(Debug)]
struct CanClose {
val: i32,
closed: bool,
}
impl Poolable for CanClose {
fn is_closed(&self) -> bool {
self.closed
}
fn reserve(self) -> Reservation<Self> {
Reservation::Unique(self)
}
}
#[test]
fn pooled_drop_if_closed_doesnt_reinsert() {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = (Arc::new("localhost:12345".to_string()), Ver::Http1);
pool.pooled(c(key.clone()), CanClose {
val: 57,
closed: true,
});
assert!(!pool.inner.lock().unwrap().idle.contains_key(&key));
}
}