feat(client): introduce lower-level Connection API

Closes #1449
Sean McArthur
2018-03-07 12:59:55 -08:00
parent 0786ea1f87
commit 1207c2b624
19 changed files with 1814 additions and 792 deletions


src/client/cancel.rs
@@ -1,154 +0,0 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use futures::{Async, Future, Poll};
use futures::task::{self, Task};
use common::Never;
use self::lock::Lock;
#[derive(Clone)]
pub struct Cancel {
inner: Arc<Inner>,
}
pub struct Canceled {
inner: Arc<Inner>,
}
struct Inner {
is_canceled: AtomicBool,
task: Lock<Option<Task>>,
}
impl Cancel {
pub fn new() -> (Cancel, Canceled) {
let inner = Arc::new(Inner {
is_canceled: AtomicBool::new(false),
task: Lock::new(None),
});
let inner2 = inner.clone();
(
Cancel {
inner: inner,
},
Canceled {
inner: inner2,
},
)
}
pub fn cancel(&self) {
if !self.inner.is_canceled.swap(true, Ordering::SeqCst) {
if let Some(mut locked) = self.inner.task.try_lock() {
if let Some(task) = locked.take() {
task.notify();
}
}
// if we couldn't take the lock, Canceled was trying to park.
// After parking, it will check is_canceled one last time,
// so we can just stop here.
}
}
pub fn is_canceled(&self) -> bool {
self.inner.is_canceled.load(Ordering::SeqCst)
}
}
impl Canceled {
pub fn cancel(&self) {
self.inner.is_canceled.store(true, Ordering::SeqCst);
}
}
impl Future for Canceled {
type Item = ();
type Error = Never;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.inner.is_canceled.load(Ordering::SeqCst) {
Ok(Async::Ready(()))
} else {
if let Some(mut locked) = self.inner.task.try_lock() {
if locked.is_none() {
// it's possible a Cancel just tried to cancel on another thread,
// and we just missed it. Once we have the lock, we should check
// one more time before parking this task and going away.
if self.inner.is_canceled.load(Ordering::SeqCst) {
return Ok(Async::Ready(()));
}
*locked = Some(task::current());
}
Ok(Async::NotReady)
} else {
// if we couldn't take the lock, then a Cancel has it.
// The *ONLY* reason is because it is in the process of canceling.
Ok(Async::Ready(()))
}
}
}
}
impl Drop for Canceled {
fn drop(&mut self) {
self.cancel();
}
}
// a sub module just to protect unsafety
mod lock {
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
pub struct Lock<T> {
is_locked: AtomicBool,
value: UnsafeCell<T>,
}
impl<T> Lock<T> {
pub fn new(val: T) -> Lock<T> {
Lock {
is_locked: AtomicBool::new(false),
value: UnsafeCell::new(val),
}
}
pub fn try_lock(&self) -> Option<Locked<T>> {
if !self.is_locked.swap(true, Ordering::SeqCst) {
Some(Locked { lock: self })
} else {
None
}
}
}
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
pub struct Locked<'a, T: 'a> {
lock: &'a Lock<T>,
}
impl<'a, T> Deref for Locked<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.value.get() }
}
}
impl<'a, T> DerefMut for Locked<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.value.get() }
}
}
impl<'a, T> Drop for Locked<'a, T> {
fn drop(&mut self) {
self.lock.is_locked.store(false, Ordering::SeqCst);
}
}
}

src/client/conn.rs Normal file

@@ -0,0 +1,487 @@
//! Lower-level client connection API.
//!
//! The types in this module provide a lower-level API based around a
//! single connection. Connecting to a host, pooling connections, and the like
//! are not handled at this level. This module provides the building blocks to
//! customize those things externally.
//!
//! If you don't need to manage connections yourself, consider using the
//! higher-level [Client](super) API.
use std::fmt;
use std::marker::PhantomData;
use bytes::Bytes;
use futures::{Async, Future, Poll, Stream};
use futures::future::{self, Either};
use tokio_io::{AsyncRead, AsyncWrite};
use proto;
use super::{dispatch, Request, Response};
/// Returns a `Handshake` future over some IO.
///
/// This is a shortcut for `Builder::new().handshake(io)`.
pub fn handshake<T>(io: T) -> Handshake<T, ::Body>
where
T: AsyncRead + AsyncWrite,
{
Builder::new()
.handshake(io)
}
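
For orientation, a hedged end-to-end sketch of driving this handshake by hand (illustrative only, not part of this diff; assumes tokio-core, a reachable address, and hyper-0.11-era types):

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use futures::Future;
use hyper::{Method, Request};
use hyper::client::conn;
use hyper::header::Host;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;

fn main() {
    let mut core = Core::new().expect("reactor");
    let handle = core.handle();
    // hypothetical address; any AsyncRead + AsyncWrite would do here
    let addr = "127.0.0.1:1337".parse().expect("address");

    let work = TcpStream::connect(&addr, &handle)
        .map_err(hyper::Error::from)
        // establish the HTTP state machine over the raw IO
        .and_then(|io| conn::handshake(io))
        .and_then(move |(mut tx, connection)| {
            // the Connection future must be spawned (or otherwise polled)
            // for the request and response to make any progress
            handle.spawn(connection.map_err(|e| println!("conn error: {}", e)));
            let mut req = Request::new(Method::Get, "/".parse().expect("uri"));
            // unlike the pooled Client, the Host header is set by hand
            req.headers_mut().set(Host::new("example.local", None));
            tx.send_request(req)
        });

    let res = core.run(work).expect("request");
    println!("status: {}", res.status());
}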
/// The sender side of an established connection.
pub struct SendRequest<B> {
dispatch: dispatch::Sender<proto::dispatch::ClientMsg<B>, ::Response>,
}
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Item,
proto::ClientUpgradeTransaction,
>,
}
/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a `Handshake` future.
#[derive(Clone, Debug)]
pub struct Builder {
h1_writev: bool,
}
/// A future setting up HTTP over an IO object.
///
/// If successful, yields a `(SendRequest, Connection)` pair.
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<T, B> {
inner: HandshakeInner<T, B, proto::ClientUpgradeTransaction>,
}
/// A future returned by `SendRequest::send_request`.
///
/// Yields a `Response` if successful.
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
// for now, a Box is used to hide away the internal `B`
// that can be returned if canceled
inner: Box<Future<Item=Response, Error=::Error>>,
}
/// Deconstructed parts of a `Connection`.
///
/// This allows taking apart a `Connection` at a later time, in order to
/// reclaim the IO object, and additional related pieces.
#[derive(Debug)]
pub struct Parts<T> {
/// The original IO object used in the handshake.
pub io: T,
/// A buffer of bytes that have been read but not processed as HTTP.
///
/// For instance, if the `Connection` is used for an HTTP upgrade request,
/// it is possible the server sent back the first bytes of the new protocol
/// along with the response upgrade.
///
/// You will want to check for any existing bytes if you plan to continue
/// communicating on the IO object.
pub read_buf: Bytes,
_inner: (),
}
// internal client api
#[must_use = "futures do nothing unless polled"]
pub(super) struct HandshakeNoUpgrades<T, B> {
inner: HandshakeInner<T, B, proto::ClientTransaction>,
}
struct HandshakeInner<T, B, R> {
builder: Builder,
io: Option<T>,
_marker: PhantomData<(B, R)>,
}
// ===== impl SendRequest
impl<B> SendRequest<B>
{
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
self.dispatch.poll_ready()
}
pub(super) fn is_closed(&self) -> bool {
self.dispatch.is_closed()
}
}
impl<B> SendRequest<B>
where
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
/// Sends a `Request` on the associated connection.
///
/// Returns a future that if successful, yields the `Response`.
///
/// # Note
///
/// There are some key differences in what automatic things the `Client`
/// does for you that will not be done here:
///
/// - `Client` requires absolute-form `Uri`s, since the scheme and
/// authority are needed to connect. They aren't required here.
/// - Since the `Client` requires absolute-form `Uri`s, it can add
/// the `Host` header based on it. You must add a `Host` header yourself
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
/// be serialized as-is, regardless of calling `Request::set_proxy`.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate hyper;
/// # use hyper::client::conn::SendRequest;
/// # use hyper::Body;
/// use futures::Future;
/// use hyper::{Method, Request};
/// use hyper::header::Host;
///
/// # fn doc(mut tx: SendRequest<Body>) {
/// // build a Request
/// let path = "/foo/bar".parse().expect("valid path");
/// let mut req = Request::new(Method::Get, path);
/// req.headers_mut().set(Host::new("hyper.rs", None));
///
/// // send it and get a future back
/// let fut = tx.send_request(req)
/// .map(|res| {
/// // got the Response
/// assert!(res.status().is_success());
/// });
/// # drop(fut);
/// # }
/// # fn main() {}
/// ```
pub fn send_request(&mut self, mut req: Request<B>) -> ResponseFuture {
// The Connection API does fewer things automatically than the Client
// API does. For instance, right here, we always assume set_proxy, so
// that if an absolute-form URI is provided, it is serialized as-is.
//
// Part of the reason for this is to prepare for the change to `http`
// types, where there is no more set_proxy.
//
// It's important that this method isn't called directly from the
// `Client`, so that `set_proxy` there is still respected.
req.set_proxy(true);
let inner = self.send_request_retryable(req).map_err(|e| {
let (err, _orig_req) = e;
err
});
ResponseFuture {
inner: Box::new(inner),
}
}
//TODO: replace with `impl Future` when stable
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response, Error=(::Error, Option<(::proto::RequestHead, Option<B>)>)>> {
let (head, body) = proto::request::split(req);
let inner = match self.dispatch.send((head, body)) {
Ok(rx) => {
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is a definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
},
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
Either::B(future::err((err, Some(req))))
}
};
Box::new(inner)
}
}
/* TODO(0.12.0): when we change from tokio-service to tower.
impl<T, B> Service for SendRequest<T, B> {
type Request = Request<B>;
type Response = Response;
type Error = ::Error;
type Future = ResponseFuture;
fn call(&self, req: Self::Request) -> Self::Future {
}
}
*/
impl<B> fmt::Debug for SendRequest<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("SendRequest")
.finish()
}
}
// ===== impl Connection
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
/// Return the inner IO object, and additional information.
pub fn into_parts(self) -> Parts<T> {
let (io, read_buf) = self.inner.into_inner();
Parts {
io: io,
read_buf: read_buf,
_inner: (),
}
}
/// Poll the connection for completion, but without calling `shutdown`
/// on the underlying IO.
///
/// This is useful to allow running a connection while doing an HTTP
/// upgrade. Once the upgrade is completed, the connection would be "done",
/// but it is not desired to actually shut down the IO object. Instead you
/// would take it back using `into_parts`.
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
self.inner.poll_without_shutdown()
}
}
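
A hedged sketch of how `poll_without_shutdown` and `into_parts` combine for an upgrade (the `WithoutShutdown` wrapper is hypothetical, not part of this diff):

extern crate futures;
extern crate hyper;
extern crate tokio_io;

use futures::{Async, Future, Poll};
use hyper::Body;
use hyper::client::conn::{Connection, Parts};
use tokio_io::{AsyncRead, AsyncWrite};

// hypothetical wrapper: drive all HTTP state to completion, skip the
// final shutdown, then reclaim the IO plus any buffered bytes
struct WithoutShutdown<T>(Option<Connection<T, Body>>)
where
    T: AsyncRead + AsyncWrite;

impl<T> Future for WithoutShutdown<T>
where
    T: AsyncRead + AsyncWrite,
{
    type Item = Parts<T>;
    type Error = hyper::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.as_mut().expect("polled after completion").poll_without_shutdown()? {
            Async::Ready(()) => {
                // connection is done; take back io and read_buf
                let parts = self.0.take().unwrap().into_parts();
                Ok(Async::Ready(parts))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

fn main() {}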
impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = ();
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
impl<T, B> fmt::Debug for Connection<T, B>
where
T: AsyncRead + AsyncWrite + fmt::Debug,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
.finish()
}
}
// ===== impl Builder
impl Builder {
/// Creates a new connection builder.
#[inline]
pub fn new() -> Builder {
Builder {
h1_writev: true,
}
}
pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder {
self.h1_writev = enabled;
self
}
/// Constructs a connection with the configured options and IO.
#[inline]
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
Handshake {
inner: HandshakeInner {
builder: self.clone(),
io: Some(io),
_marker: PhantomData,
}
}
}
pub(super) fn handshake_no_upgrades<T, B>(&self, io: T) -> HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
HandshakeNoUpgrades {
inner: HandshakeInner {
builder: self.clone(),
io: Some(io),
_marker: PhantomData,
}
}
}
}
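
Note that `Builder::handshake` leaves the body stream generic, while the module-level `handshake` pins `::Body`. A hedged sketch of picking the body type through the builder (`handshake_with_body` is an illustrative name):

extern crate hyper;
extern crate tokio_io;

use hyper::client::conn;
use tokio_io::{AsyncRead, AsyncWrite};

// illustrative helper: equivalent to conn::handshake(io), but spelled
// through the builder so the body type (and any future options) are
// chosen explicitly
fn handshake_with_body<T>(io: T) -> conn::Handshake<T, hyper::Body>
where
    T: AsyncRead + AsyncWrite,
{
    conn::Builder::new().handshake(io)
}

fn main() {}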
// ===== impl Handshake
impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = (SendRequest<B>, Connection<T, B>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
.map(|async| {
async.map(|(tx, dispatch)| {
(tx, Connection { inner: dispatch })
})
})
}
}
impl<T, B> fmt::Debug for Handshake<T, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Handshake")
.finish()
}
}
impl<T, B> Future for HandshakeNoUpgrades<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Item,
proto::ClientTransaction,
>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
impl<T, B, R> Future for HandshakeInner<T, B, R>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
R: proto::Http1Transaction<
Incoming=proto::RawStatus,
Outgoing=proto::RequestLine,
>,
{
type Item = (SendRequest<B>, proto::dispatch::Dispatcher<
proto::dispatch::Client<B>,
B,
T,
B::Item,
R,
>);
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
if !self.builder.h1_writev {
conn.set_write_strategy_flatten();
}
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
Ok(Async::Ready((
SendRequest {
dispatch: tx,
},
dispatch,
)))
}
}
// ===== impl ResponseFuture
impl Future for ResponseFuture {
type Item = Response;
type Error = ::Error;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
impl fmt::Debug for ResponseFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ResponseFuture")
.finish()
}
}
// assert trait markers
trait AssertSend: Send {}
trait AssertSendSync: Send + Sync {}
#[doc(hidden)]
impl<B: Send> AssertSendSync for SendRequest<B> {}
#[doc(hidden)]
impl<T: Send, B: Send> AssertSend for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]> + Send,
{}
#[doc(hidden)]
impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
where
T: AsyncRead + AsyncWrite,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]> + Send + Sync,
{}
#[doc(hidden)]
impl AssertSendSync for Builder {}
// TODO: This could be done by using a dispatch channel that doesn't
// return the `B` on Error, removing the possibility of it containing some !Send
// thing.
//#[doc(hidden)]
//impl AssertSend for ResponseFuture {}


src/client/dispatch.rs
@@ -1,60 +1,64 @@
use futures::{Async, Future, Poll, Stream};
use futures::{Async, Poll, Stream};
use futures::sync::{mpsc, oneshot};
use common::Never;
use super::cancel::{Cancel, Canceled};
use super::signal;
pub type Callback<T, U> = oneshot::Sender<Result<U, (::Error, Option<T>)>>;
pub type Promise<T, U> = oneshot::Receiver<Result<U, (::Error, Option<T>)>>;
pub fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded();
let (cancel, canceled) = Cancel::new();
let (tx, rx) = mpsc::channel(0);
let (giver, taker) = signal::new();
let tx = Sender {
cancel: cancel,
giver: giver,
inner: tx,
};
let rx = Receiver {
canceled: canceled,
inner: rx,
taker: taker,
};
(tx, rx)
}
pub struct Sender<T, U> {
cancel: Cancel,
inner: mpsc::UnboundedSender<(T, Callback<T, U>)>,
// The Giver helps watch that the Receiver side has been polled
// when the queue is empty. This helps us know when a request and
// response have been fully processed, and a connection is ready
// for more.
giver: signal::Giver,
inner: mpsc::Sender<(T, Callback<T, U>)>,
}
impl<T, U> Sender<T, U> {
pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.inner.poll_ready() {
Ok(Async::Ready(())) => {
// there's room in the queue, but does the Connection
// want a message yet?
self.giver.poll_want()
.map_err(|_| ::Error::Closed)
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(::Error::Closed),
}
}
pub fn is_closed(&self) -> bool {
self.cancel.is_canceled()
self.giver.is_canceled()
}
pub fn cancel(&self) {
self.cancel.cancel();
}
pub fn send(&self, val: T) -> Result<Promise<T, U>, T> {
pub fn send(&mut self, val: T) -> Result<Promise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner.unbounded_send((val, tx))
self.inner.try_send((val, tx))
.map(move |_| rx)
.map_err(|e| e.into_inner().0)
}
}
impl<T, U> Clone for Sender<T, U> {
fn clone(&self) -> Sender<T, U> {
Sender {
cancel: self.cancel.clone(),
inner: self.inner.clone(),
}
}
}
pub struct Receiver<T, U> {
canceled: Canceled,
inner: mpsc::UnboundedReceiver<(T, Callback<T, U>)>,
inner: mpsc::Receiver<(T, Callback<T, U>)>,
taker: signal::Taker,
}
impl<T, U> Stream for Receiver<T, U> {
@@ -62,16 +66,20 @@ impl<T, U> Stream for Receiver<T, U> {
type Error = Never;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Async::Ready(()) = self.canceled.poll()? {
return Ok(Async::Ready(None));
match self.inner.poll() {
Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
Ok(Async::NotReady) => {
self.taker.want();
Ok(Async::NotReady)
}
Err(()) => unreachable!("mpsc never errors"),
}
self.inner.poll().map_err(|()| unreachable!("mpsc never errors"))
}
}
impl<T, U> Drop for Receiver<T, U> {
fn drop(&mut self) {
self.canceled.cancel();
self.taker.cancel();
self.inner.close();
// This poll() is safe to call in `Drop`, because we've
@@ -84,8 +92,7 @@ impl<T, U> Drop for Receiver<T, U> {
// - NotReady: unreachable
// - Err: unreachable
while let Ok(Async::Ready(Some((val, cb)))) = self.inner.poll() {
// maybe in future, we pass the value along with the error?
let _ = cb.send(Err((::Error::new_canceled(None), Some(val))));
let _ = cb.send(Err((::Error::new_canceled(None::<::Error>), Some(val))));
}
}
@@ -109,7 +116,7 @@ mod tests {
future::lazy(|| {
#[derive(Debug)]
struct Custom(i32);
let (tx, rx) = super::channel::<Custom, ()>();
let (mut tx, rx) = super::channel::<Custom, ()>();
let promise = tx.send(Custom(43)).unwrap();
drop(rx);
@@ -128,8 +135,8 @@ mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn cancelable_queue_throughput(b: &mut test::Bencher) {
let (tx, mut rx) = super::channel::<i32, ()>();
fn giver_queue_throughput(b: &mut test::Bencher) {
let (mut tx, mut rx) = super::channel::<i32, ()>();
b.iter(move || {
::futures::future::lazy(|| {
@@ -149,7 +156,7 @@ mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn cancelable_queue_not_ready(b: &mut test::Bencher) {
fn giver_queue_not_ready(b: &mut test::Bencher) {
let (_tx, mut rx) = super::channel::<i32, ()>();
b.iter(move || {
@@ -163,11 +170,11 @@ mod tests {
#[cfg(feature = "nightly")]
#[bench]
fn cancelable_queue_cancel(b: &mut test::Bencher) {
let (tx, _rx) = super::channel::<i32, ()>();
fn giver_queue_cancel(b: &mut test::Bencher) {
let (_tx, rx) = super::channel::<i32, ()>();
b.iter(move || {
tx.cancel();
rx.taker.cancel();
})
}
}
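
The readiness contract introduced here can be spelled out with a hedged test-style sketch (hypothetical, in the style of the tests above): `poll_ready` stays not-ready until the `Receiver` is polled and signals want.

#[test]
fn sender_not_ready_until_receiver_polls() {
    use futures::{Future, Stream};
    ::futures::future::lazy(|| {
        let (mut tx, mut rx) = super::channel::<i32, ()>();
        // the bounded queue has room, but no `want` signal yet: park
        assert!(tx.poll_ready().unwrap().is_not_ready());
        // polling the empty Receiver signals want...
        assert!(rx.poll().unwrap().is_not_ready());
        // ...which readies the Sender
        assert!(tx.poll_ready().unwrap().is_ready());
        Ok::<(), ()>(())
    }).wait().unwrap();
}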


src/client/mod.rs
@@ -1,14 +1,14 @@
//! HTTP Client
use std::cell::Cell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future, Poll, Stream};
use futures::future::{self, Either, Executor};
use futures::future::{self, Executor};
#[cfg(feature = "compat")]
use http;
use tokio::reactor::Handle;
@@ -28,7 +28,7 @@ pub use self::connect::{HttpConnector, Connect};
use self::background::{bg, Background};
mod cancel;
pub mod conn;
mod connect;
//TODO(easy): move cancel and dispatch into common instead
pub(crate) mod dispatch;
@@ -36,6 +36,7 @@ mod dns;
mod pool;
#[cfg(feature = "compat")]
pub mod compat;
mod signal;
#[cfg(test)]
mod tests;
@@ -44,7 +45,7 @@ pub struct Client<C, B = proto::Body> {
connector: Rc<C>,
executor: Exec,
h1_writev: bool,
pool: Pool<HyperClient<B>>,
pool: Pool<PoolClient<B>>,
retry_canceled_requests: bool,
set_host: bool,
}
@@ -191,77 +192,73 @@ where C: Connect,
//TODO: replace with `impl Future` when stable
fn send_request(&self, req: Request<B>, domain: &Uri) -> Box<Future<Item=Response, Error=ClientError<B>>> {
//fn send_request(&self, req: Request<B>, domain: &Uri) -> Box<Future<Item=Response, Error=::Error>> {
let url = req.uri().clone();
let (head, body) = request::split(req);
let checkout = self.pool.checkout(domain.as_ref());
let connect = {
let executor = self.executor.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
let pool_key = Arc::new(domain.to_string());
let h1_writev = self.h1_writev;
let connector = self.connector.clone();
future::lazy(move || {
connector.connect(url)
.from_err()
.and_then(move |io| {
let (tx, rx) = dispatch::channel();
let tx = HyperClient {
conn::Builder::new()
.h1_writev(h1_writev)
.handshake_no_upgrades(io)
}).and_then(move |(tx, conn)| {
executor.execute(conn.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pool.pooled(pool_key, PoolClient {
tx: tx,
should_close: Cell::new(true),
};
let pooled = pool.pooled(pool_key, tx);
let mut conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
if !h1_writev {
conn.set_write_strategy_flatten();
}
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
Ok(pooled)
}))
})
})
};
let race = checkout.select(connect)
.map(|(client, _work)| client)
.map(|(pooled, _work)| pooled)
.map_err(|(e, _checkout)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
ClientError::Normal(e.into())
ClientError::Normal(e)
});
let resp = race.and_then(move |client| {
let conn_reused = client.is_reused();
match client.tx.send((head, body)) {
Ok(rx) => {
client.should_close.set(false);
Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err((err, orig_req))) => Err(match orig_req {
Some(req) => ClientError::Canceled {
connection_reused: conn_reused,
reason: err,
req: req,
},
None => ClientError::Normal(err),
}),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
let executor = self.executor.clone();
let resp = race.and_then(move |mut pooled| {
let conn_reused = pooled.is_reused();
let fut = pooled.tx.send_request_retryable(req)
.map_err(move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
connection_reused: conn_reused,
reason: err,
req: req,
}
}))
},
Err(req) => {
debug!("pooled connection was not ready");
let err = ClientError::Canceled {
connection_reused: conn_reused,
reason: ::Error::new_canceled(None),
req: req,
};
Either::B(future::err(err))
}
}
} else {
ClientError::Normal(err)
}
});
// when pooled is dropped, it will try to insert back into the
// pool. To delay that, spawn a future that completes once the
// sender is ready again.
//
// This *should* only be once the related `Connection` has polled
// for a new request to start.
//
// If the executor doesn't have room, oh well. Things will likely
// be blowing up soon, but this specific task isn't required.
let _ = executor.execute(future::poll_fn(move || {
pooled.tx.poll_ready().map_err(|_| ())
}));
fut
});
Box::new(resp)
@@ -373,35 +370,19 @@ where
}
}
struct HyperClient<B> {
should_close: Cell<bool>,
tx: dispatch::Sender<proto::dispatch::ClientMsg<B>, ::Response>,
struct PoolClient<B> {
tx: conn::SendRequest<B>,
}
impl<B> Clone for HyperClient<B> {
fn clone(&self) -> HyperClient<B> {
HyperClient {
tx: self.tx.clone(),
should_close: self.should_close.clone(),
}
}
}
impl<B> self::pool::Closed for HyperClient<B> {
impl<B> self::pool::Closed for PoolClient<B>
where
B: 'static,
{
fn is_closed(&self) -> bool {
self.tx.is_closed()
}
}
impl<B> Drop for HyperClient<B> {
fn drop(&mut self) {
if self.should_close.get() {
self.should_close.set(false);
self.tx.cancel();
}
}
}
pub(crate) enum ClientError<B> {
Normal(::Error),
Canceled {


src/client/pool.rs
@@ -1,19 +1,15 @@
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut, BitAndAssign};
use std::rc::{Rc, Weak};
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::{Future, Async, Poll, Stream};
use futures::sync::oneshot;
use tokio::reactor::{Handle, Interval};
use relay;
use proto::{KeepAlive, KA};
pub struct Pool<T> {
inner: Rc<RefCell<PoolInner<T>>>,
inner: Arc<Mutex<PoolInner<T>>>,
}
// Before using a pooled connection, make sure the sender is not dead.
@@ -29,7 +25,7 @@ struct PoolInner<T> {
enabled: bool,
// These are internal Conns sitting in the event loop in the KeepAlive
// state, waiting to receive a new Request to send on the socket.
idle: HashMap<Rc<String>, Vec<Entry<T>>>,
idle: HashMap<Arc<String>, Vec<Idle<T>>>,
// These are outstanding Checkouts that are waiting for a socket to be
// able to send a Request on. This is used when "racing" for a new
// connection.
@@ -39,7 +35,7 @@ struct PoolInner<T> {
// this list is checked for any parked Checkouts, and tries to notify
// them that the Conn could be used instead of waiting for a brand new
// connection.
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
parked: HashMap<Arc<String>, VecDeque<oneshot::Sender<T>>>,
timeout: Option<Duration>,
// Used to prevent multiple intervals from being spawned to clear
// expired connections.
@@ -49,10 +45,10 @@ struct PoolInner<T> {
expired_timer_spawned: bool,
}
impl<T: Clone + Closed> Pool<T> {
impl<T> Pool<T> {
pub fn new(enabled: bool, timeout: Option<Duration>) -> Pool<T> {
Pool {
inner: Rc::new(RefCell::new(PoolInner {
inner: Arc::new(Mutex::new(PoolInner {
enabled: enabled,
idle: HashMap::new(),
parked: HashMap::new(),
@@ -61,74 +57,33 @@ impl<T: Clone + Closed> Pool<T> {
})),
}
}
}
impl<T: Closed> Pool<T> {
pub fn checkout(&self, key: &str) -> Checkout<T> {
Checkout {
key: Rc::new(key.to_owned()),
key: Arc::new(key.to_owned()),
pool: self.clone(),
parked: None,
}
}
fn put(&mut self, key: Rc<String>, entry: Entry<T>) {
trace!("Pool::put {:?}", key);
let mut inner = self.inner.borrow_mut();
let mut remove_parked = false;
let mut entry = Some(entry);
if let Some(parked) = inner.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if tx.is_canceled() {
trace!("Pool::put removing canceled parked {:?}", key);
} else {
tx.complete(entry.take().unwrap());
break;
}
/*
match tx.send(entry.take().unwrap()) {
Ok(()) => break,
Err(e) => {
trace!("Pool::put removing canceled parked {:?}", key);
entry = Some(e);
}
}
*/
}
remove_parked = parked.is_empty();
}
if remove_parked {
inner.parked.remove(&key);
}
match entry {
Some(entry) => {
debug!("pooling idle connection for {:?}", key);
inner.idle.entry(key)
.or_insert(Vec::new())
.push(entry);
}
None => trace!("Pool::put found parked {:?}", key),
}
}
fn take(&self, key: &Rc<String>) -> Option<Pooled<T>> {
fn take(&self, key: &Arc<String>) -> Option<Pooled<T>> {
let entry = {
let mut inner = self.inner.borrow_mut();
let mut inner = self.inner.lock().unwrap();
let expiration = Expiration::new(inner.timeout);
let mut should_remove = false;
let entry = inner.idle.get_mut(key).and_then(|list| {
trace!("take; url = {:?}, expiration = {:?}", key, expiration.0);
while let Some(entry) = list.pop() {
match entry.status.get() {
TimedKA::Idle(idle_at) if !expiration.expires(idle_at) => {
if !entry.value.is_closed() {
should_remove = list.is_empty();
return Some(entry);
}
},
_ => {},
if !expiration.expires(entry.idle_at) {
if !entry.value.is_closed() {
should_remove = list.is_empty();
return Some(entry);
}
}
trace!("removing unacceptable pooled {:?}", key);
// every other case the Entry should just be dropped
// every other case the Idle should just be dropped
// 1. Idle but expired
// 2. Busy (something else somehow took it?)
// 3. Disabled don't reuse of course
@@ -143,72 +98,102 @@ impl<T: Clone + Closed> Pool<T> {
entry
};
entry.map(|e| self.reuse(key, e))
entry.map(|e| self.reuse(key, e.value))
}
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
pub fn pooled(&self, key: Arc<String>, value: T) -> Pooled<T> {
Pooled {
entry: Entry {
value: value,
is_reused: false,
status: Rc::new(Cell::new(TimedKA::Busy)),
},
is_reused: false,
key: key,
pool: Rc::downgrade(&self.inner),
pool: Arc::downgrade(&self.inner),
value: Some(value)
}
}
fn is_enabled(&self) -> bool {
self.inner.borrow().enabled
}
fn reuse(&self, key: &Rc<String>, mut entry: Entry<T>) -> Pooled<T> {
fn reuse(&self, key: &Arc<String>, value: T) -> Pooled<T> {
debug!("reuse idle connection for {:?}", key);
entry.is_reused = true;
entry.status.set(TimedKA::Busy);
Pooled {
entry: entry,
is_reused: true,
key: key.clone(),
pool: Rc::downgrade(&self.inner),
pool: Arc::downgrade(&self.inner),
value: Some(value),
}
}
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
fn park(&mut self, key: Arc<String>, tx: oneshot::Sender<T>) {
trace!("park; waiting for idle connection: {:?}", key);
self.inner.borrow_mut()
self.inner.lock().unwrap()
.parked.entry(key)
.or_insert(VecDeque::new())
.push_back(tx);
}
}
impl<T> Pool<T> {
impl<T: Closed> PoolInner<T> {
fn put(&mut self, key: Arc<String>, value: T) {
if !self.enabled {
return;
}
trace!("Pool::put {:?}", key);
let mut remove_parked = false;
let mut value = Some(value);
if let Some(parked) = self.parked.get_mut(&key) {
while let Some(tx) = parked.pop_front() {
if !tx.is_canceled() {
match tx.send(value.take().unwrap()) {
Ok(()) => break,
Err(e) => {
value = Some(e);
}
}
}
trace!("Pool::put removing canceled parked {:?}", key);
}
remove_parked = parked.is_empty();
}
if remove_parked {
self.parked.remove(&key);
}
match value {
Some(value) => {
debug!("pooling idle connection for {:?}", key);
self.idle.entry(key)
.or_insert(Vec::new())
.push(Idle {
value: value,
idle_at: Instant::now(),
});
}
None => trace!("Pool::put found parked {:?}", key),
}
}
}
impl<T> PoolInner<T> {
/// Any `FutureResponse`s that were created will have made a `Checkout`,
/// and possibly parked a sender in the pool while waiting for an idle
/// connection. If a user ever dropped that future, we need to clean out
/// those parked senders.
fn clean_parked(&mut self, key: &Rc<String>) {
let mut inner = self.inner.borrow_mut();
fn clean_parked(&mut self, key: &Arc<String>) {
let mut remove_parked = false;
if let Some(parked) = inner.parked.get_mut(key) {
if let Some(parked) = self.parked.get_mut(key) {
parked.retain(|tx| {
!tx.is_canceled()
});
remove_parked = parked.is_empty();
}
if remove_parked {
inner.parked.remove(key);
self.parked.remove(key);
}
}
}
impl<T: Closed> Pool<T> {
fn clear_expired(&self) {
let mut inner = self.inner.borrow_mut();
let dur = if let Some(dur) = inner.timeout {
impl<T: Closed> PoolInner<T> {
fn clear_expired(&mut self) {
let dur = if let Some(dur) = self.timeout {
dur
} else {
return
@@ -217,19 +202,13 @@ impl<T: Closed> Pool<T> {
let now = Instant::now();
//self.last_idle_check_at = now;
inner.idle.retain(|_key, values| {
self.idle.retain(|_key, values| {
values.retain(|val| {
if val.value.is_closed() {
values.retain(|entry| {
if entry.value.is_closed() {
return false;
}
match val.status.get() {
TimedKA::Idle(idle_at) if now - idle_at < dur => {
true
},
_ => false,
}
//now - val.idle_at < dur
now - entry.idle_at < dur
});
// returning false evicts this key/val
@@ -241,28 +220,30 @@ impl<T: Closed> Pool<T> {
impl<T: Closed + 'static> Pool<T> {
pub(super) fn spawn_expired_interval(&self, handle: &Handle) {
let mut inner = self.inner.borrow_mut();
let dur = {
let mut inner = self.inner.lock().unwrap();
if !inner.enabled {
return;
}
if !inner.enabled {
return;
}
if inner.expired_timer_spawned {
return;
}
inner.expired_timer_spawned = true;
if inner.expired_timer_spawned {
return;
}
inner.expired_timer_spawned = true;
let dur = if let Some(dur) = inner.timeout {
dur
} else {
return
if let Some(dur) = inner.timeout {
dur
} else {
return
}
};
let interval = Interval::new(dur, handle)
.expect("reactor is gone");
handle.spawn(IdleInterval {
interval: interval,
pool: Rc::downgrade(&self.inner),
pool: Arc::downgrade(&self.inner),
});
}
}
@@ -275,121 +256,83 @@ impl<T> Clone for Pool<T> {
}
}
#[derive(Clone)]
pub struct Pooled<T> {
entry: Entry<T>,
key: Rc<String>,
pool: Weak<RefCell<PoolInner<T>>>,
pub struct Pooled<T: Closed> {
value: Option<T>,
is_reused: bool,
key: Arc<String>,
pool: Weak<Mutex<PoolInner<T>>>,
}
impl<T> Pooled<T> {
impl<T: Closed> Pooled<T> {
pub fn is_reused(&self) -> bool {
self.entry.is_reused
self.is_reused
}
fn as_ref(&self) -> &T {
self.value.as_ref().expect("not dropped")
}
fn as_mut(&mut self) -> &mut T {
self.value.as_mut().expect("not dropped")
}
}
impl<T> Deref for Pooled<T> {
impl<T: Closed> Deref for Pooled<T> {
type Target = T;
fn deref(&self) -> &T {
&self.entry.value
self.as_ref()
}
}
impl<T> DerefMut for Pooled<T> {
impl<T: Closed> DerefMut for Pooled<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.entry.value
self.as_mut()
}
}
impl<T: Clone + Closed> KeepAlive for Pooled<T> {
fn busy(&mut self) {
self.entry.status.set(TimedKA::Busy);
}
fn disable(&mut self) {
self.entry.status.set(TimedKA::Disabled);
}
fn idle(&mut self) {
let previous = self.status();
self.entry.status.set(TimedKA::Idle(Instant::now()));
if let KA::Idle = previous {
trace!("Pooled::idle already idle");
return;
}
self.entry.is_reused = true;
if let Some(inner) = self.pool.upgrade() {
let mut pool = Pool {
inner: inner,
};
if pool.is_enabled() {
pool.put(self.key.clone(), self.entry.clone());
impl<T: Closed> Drop for Pooled<T> {
fn drop(&mut self) {
if let Some(value) = self.value.take() {
if let Some(inner) = self.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
inner.put(self.key.clone(), value);
}
} else {
trace!("keepalive disabled, dropping pooled ({:?})", self.key);
self.disable();
trace!("pool dropped, dropping pooled ({:?})", self.key);
}
} else {
trace!("pool dropped, dropping pooled ({:?})", self.key);
self.disable();
}
}
fn status(&self) -> KA {
match self.entry.status.get() {
TimedKA::Idle(_) => KA::Idle,
TimedKA::Busy => KA::Busy,
TimedKA::Disabled => KA::Disabled,
}
}
}
impl<T> fmt::Debug for Pooled<T> {
impl<T: Closed> fmt::Debug for Pooled<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Pooled")
.field("status", &self.entry.status.get())
.field("key", &self.key)
.finish()
}
}
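
With the `KeepAlive` bookkeeping gone, returning a connection to the pool is plain RAII. A hedged test-style sketch (hypothetical, mirroring the updated tests below, where `Closed` is implemented for `i32`):

#[test]
fn test_pooled_drop_returns_to_pool() {
    // hypothetical sketch: dropping a Pooled re-inserts its value as
    // an Idle entry; a later checkout yields it marked as reused
    let pool = Pool::new(true, Some(Duration::from_secs(5)));
    let key = Arc::new("foo".to_string());
    drop(pool.pooled(key.clone(), 41));
    match pool.checkout(&key).poll().unwrap() {
        Async::Ready(pooled) => {
            assert!(pooled.is_reused());
            assert_eq!(*pooled, 41);
        },
        _ => panic!("expected idle entry"),
    }
}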
impl<T: Clone + Closed> BitAndAssign<bool> for Pooled<T> {
fn bitand_assign(&mut self, enabled: bool) {
if !enabled {
self.disable();
}
}
}
#[derive(Clone)]
struct Entry<T> {
struct Idle<T> {
idle_at: Instant,
value: T,
is_reused: bool,
status: Rc<Cell<TimedKA>>,
}
#[derive(Clone, Copy, Debug)]
enum TimedKA {
Idle(Instant),
Busy,
Disabled,
}
pub struct Checkout<T> {
key: Rc<String>,
key: Arc<String>,
pool: Pool<T>,
parked: Option<relay::Receiver<Entry<T>>>,
parked: Option<oneshot::Receiver<T>>,
}
struct NotParked;
impl<T: Clone + Closed> Checkout<T> {
impl<T: Closed> Checkout<T> {
fn poll_parked(&mut self) -> Poll<Pooled<T>, NotParked> {
let mut drop_parked = false;
if let Some(ref mut rx) = self.parked {
match rx.poll() {
Ok(Async::Ready(entry)) => {
if !entry.value.is_closed() {
return Ok(Async::Ready(self.pool.reuse(&self.key, entry)));
Ok(Async::Ready(value)) => {
if !value.is_closed() {
return Ok(Async::Ready(self.pool.reuse(&self.key, value)));
}
drop_parked = true;
},
@@ -405,7 +348,7 @@ impl<T: Clone + Closed> Checkout<T> {
fn park(&mut self) {
if self.parked.is_none() {
let (tx, mut rx) = relay::channel();
let (tx, mut rx) = oneshot::channel();
let _ = rx.poll(); // park this task
self.pool.park(self.key.clone(), tx);
self.parked = Some(rx);
@@ -413,9 +356,9 @@ impl<T: Clone + Closed> Checkout<T> {
}
}
impl<T: Clone + Closed> Future for Checkout<T> {
impl<T: Closed> Future for Checkout<T> {
type Item = Pooled<T>;
type Error = io::Error;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.poll_parked() {
@@ -437,7 +380,9 @@ impl<T: Clone + Closed> Future for Checkout<T> {
impl<T> Drop for Checkout<T> {
fn drop(&mut self) {
self.parked.take();
self.pool.clean_parked(&self.key);
if let Ok(mut inner) = self.pool.inner.lock() {
inner.clean_parked(&self.key);
}
}
}
@@ -458,7 +403,7 @@ impl Expiration {
struct IdleInterval<T> {
interval: Interval,
pool: Weak<RefCell<PoolInner<T>>>,
pool: Weak<Mutex<PoolInner<T>>>,
}
impl<T: Closed + 'static> Future for IdleInterval<T> {
@@ -470,22 +415,22 @@ impl<T: Closed + 'static> Future for IdleInterval<T> {
try_ready!(self.interval.poll().map_err(|_| unreachable!("interval cannot error")));
if let Some(inner) = self.pool.upgrade() {
let pool = Pool { inner: inner };
pool.clear_expired();
} else {
return Ok(Async::Ready(()));
if let Ok(mut inner) = inner.lock() {
inner.clear_expired();
continue;
}
}
return Ok(Async::Ready(()));
}
}
}
#[cfg(test)]
mod tests {
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future};
use futures::future;
use proto::KeepAlive;
use super::{Closed, Pool};
impl Closed for i32 {
@@ -497,9 +442,10 @@ mod tests {
#[test]
fn test_pool_checkout_smoke() {
let pool = Pool::new(true, Some(Duration::from_secs(5)));
let key = Rc::new("foo".to_string());
let mut pooled = pool.pooled(key.clone(), 41);
pooled.idle();
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
match pool.checkout(&key).poll().unwrap() {
Async::Ready(pooled) => assert_eq!(*pooled, 41),
@@ -510,11 +456,11 @@ mod tests {
#[test]
fn test_pool_checkout_returns_none_if_expired() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_secs(1)));
let key = Rc::new("foo".to_string());
let mut pooled = pool.pooled(key.clone(), 41);
pooled.idle();
::std::thread::sleep(pool.inner.borrow().timeout.unwrap());
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
drop(pooled);
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
assert!(pool.checkout(&key).poll().unwrap().is_not_ready());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();
@@ -522,26 +468,23 @@ mod tests {
#[test]
fn test_pool_checkout_removes_expired() {
let pool = Pool::new(true, Some(Duration::from_secs(1)));
let key = Rc::new("foo".to_string());
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_millis(100)));
let key = Arc::new("foo".to_string());
let mut pooled1 = pool.pooled(key.clone(), 41);
pooled1.idle();
let mut pooled2 = pool.pooled(key.clone(), 5);
pooled2.idle();
let mut pooled3 = pool.pooled(key.clone(), 99);
pooled3.idle();
pool.pooled(key.clone(), 41);
pool.pooled(key.clone(), 5);
pool.pooled(key.clone(), 99);
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.lock().unwrap().timeout.unwrap());
assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3));
::std::thread::sleep(pool.inner.borrow().timeout.unwrap());
// checkout.poll() should clean out the expired
pool.checkout(&key).poll().unwrap();
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
pooled1.idle();
pooled2.idle(); // idle after sleep, not expired
pool.checkout(&key).poll().unwrap();
assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(1));
pool.checkout(&key).poll().unwrap();
assert!(pool.inner.borrow().idle.get(&key).is_none());
Ok::<(), ()>(())
}).wait().unwrap();
}
#[test]
@@ -549,16 +492,13 @@ mod tests {
let mut core = ::tokio::reactor::Core::new().unwrap();
let pool = Pool::new(true, Some(Duration::from_millis(100)));
pool.spawn_expired_interval(&core.handle());
let key = Rc::new("foo".to_string());
let key = Arc::new("foo".to_string());
let mut pooled1 = pool.pooled(key.clone(), 41);
pooled1.idle();
let mut pooled2 = pool.pooled(key.clone(), 5);
pooled2.idle();
let mut pooled3 = pool.pooled(key.clone(), 99);
pooled3.idle();
pool.pooled(key.clone(), 41);
pool.pooled(key.clone(), 5);
pool.pooled(key.clone(), 99);
assert_eq!(pool.inner.borrow().idle.get(&key).map(|entries| entries.len()), Some(3));
assert_eq!(pool.inner.lock().unwrap().idle.get(&key).map(|entries| entries.len()), Some(3));
let timeout = ::tokio::reactor::Timeout::new(
Duration::from_millis(400), // allow for too-good resolution
@@ -566,49 +506,48 @@ mod tests {
).unwrap();
core.run(timeout).unwrap();
assert!(pool.inner.borrow().idle.get(&key).is_none());
assert!(pool.inner.lock().unwrap().idle.get(&key).is_none());
}
#[test]
fn test_pool_checkout_task_unparked() {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = Rc::new("foo".to_string());
let pooled1 = pool.pooled(key.clone(), 41);
let key = Arc::new("foo".to_string());
let pooled = pool.pooled(key.clone(), 41);
let mut pooled = pooled1.clone();
let checkout = pool.checkout(&key).join(future::lazy(move || {
// the checkout future will park first,
// and then this lazy future will be polled, which will insert
// the pooled back into the pool
//
// this test makes sure that doing so will unpark the checkout
pooled.idle();
drop(pooled);
Ok(())
})).map(|(entry, _)| entry);
assert_eq!(*checkout.wait().unwrap(), *pooled1);
assert_eq!(*checkout.wait().unwrap(), 41);
}
#[test]
fn test_pool_checkout_drop_cleans_up_parked() {
future::lazy(|| {
let pool = Pool::new(true, Some(Duration::from_secs(10)));
let key = Rc::new("localhost:12345".to_string());
let _pooled1 = pool.pooled(key.clone(), 41);
let pool = Pool::<i32>::new(true, Some(Duration::from_secs(10)));
let key = Arc::new("localhost:12345".to_string());
let mut checkout1 = pool.checkout(&key);
let mut checkout2 = pool.checkout(&key);
// first poll needed to get into Pool's parked
checkout1.poll().unwrap();
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1);
checkout2.poll().unwrap();
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 2);
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 2);
// on drop, clean up Pool
drop(checkout1);
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
assert_eq!(pool.inner.lock().unwrap().parked.get(&key).unwrap().len(), 1);
drop(checkout2);
assert!(pool.inner.borrow().parked.get(&key).is_none());
assert!(pool.inner.lock().unwrap().parked.get(&key).is_none());
::futures::future::ok::<(), ()>(())
}).wait().unwrap();

src/client/signal.rs Normal file

@@ -0,0 +1,188 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Async, Poll};
use futures::task::{self, Task};
use self::lock::Lock;
pub fn new() -> (Giver, Taker) {
let inner = Arc::new(Inner {
state: AtomicUsize::new(STATE_IDLE),
task: Lock::new(None),
});
let inner2 = inner.clone();
(
Giver {
inner: inner,
},
Taker {
inner: inner2,
},
)
}
#[derive(Clone)]
pub struct Giver {
inner: Arc<Inner>,
}
pub struct Taker {
inner: Arc<Inner>,
}
const STATE_IDLE: usize = 0;
const STATE_WANT: usize = 1;
const STATE_GIVE: usize = 2;
const STATE_CLOSED: usize = 3;
struct Inner {
state: AtomicUsize,
task: Lock<Option<Task>>,
}
impl Giver {
pub fn poll_want(&mut self) -> Poll<(), ()> {
loop {
let state = self.inner.state.load(Ordering::SeqCst);
match state {
STATE_WANT => {
// only set to IDLE if it is still Want
self.inner.state.compare_and_swap(
STATE_WANT,
STATE_IDLE,
Ordering::SeqCst,
);
return Ok(Async::Ready(()))
},
STATE_GIVE => {
// we're already waiting, return
return Ok(Async::NotReady)
}
STATE_CLOSED => return Err(()),
// Taker doesn't want anything yet, so park.
_ => {
if let Some(mut locked) = self.inner.task.try_lock() {
// While we have the lock, try to set to GIVE.
let old = self.inner.state.compare_and_swap(
STATE_IDLE,
STATE_GIVE,
Ordering::SeqCst,
);
// If it's not still IDLE, something happened!
// Go around the loop again.
if old == STATE_IDLE {
*locked = Some(task::current());
return Ok(Async::NotReady)
}
} else {
// if we couldn't take the lock, then a Taker has it.
// The *ONLY* reason is because it is in the process of notifying us
// of its want.
//
// We need to loop again to see what state it was changed to.
}
},
}
}
}
pub fn is_canceled(&self) -> bool {
self.inner.state.load(Ordering::SeqCst) == STATE_CLOSED
}
}
impl Taker {
pub fn cancel(&self) {
self.signal(STATE_CLOSED)
}
pub fn want(&self) {
self.signal(STATE_WANT)
}
fn signal(&self, state: usize) {
let old_state = self.inner.state.swap(state, Ordering::SeqCst);
match old_state {
STATE_WANT | STATE_CLOSED | STATE_IDLE => (),
_ => {
loop {
if let Some(mut locked) = self.inner.task.try_lock() {
if let Some(task) = locked.take() {
task.notify();
}
return;
} else {
// if we couldn't take the lock, then a Giver has it.
// The *ONLY* reason is because it is in the process of parking.
//
// We need to loop and take the lock so we can notify this task.
}
}
},
}
}
}
impl Drop for Taker {
fn drop(&mut self) {
self.cancel();
}
}
// a sub module just to protect unsafety
mod lock {
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
pub struct Lock<T> {
is_locked: AtomicBool,
value: UnsafeCell<T>,
}
impl<T> Lock<T> {
pub fn new(val: T) -> Lock<T> {
Lock {
is_locked: AtomicBool::new(false),
value: UnsafeCell::new(val),
}
}
pub fn try_lock(&self) -> Option<Locked<T>> {
if !self.is_locked.swap(true, Ordering::SeqCst) {
Some(Locked { lock: self })
} else {
None
}
}
}
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
pub struct Locked<'a, T: 'a> {
lock: &'a Lock<T>,
}
impl<'a, T> Deref for Locked<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.lock.value.get() }
}
}
impl<'a, T> DerefMut for Locked<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.lock.value.get() }
}
}
impl<'a, T> Drop for Locked<'a, T> {
fn drop(&mut self) {
self.lock.is_locked.store(false, Ordering::SeqCst);
}
}
}
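
To make the state machine concrete, a hedged sketch of the `Giver`/`Taker` handshake (hypothetical test, not part of this diff; `future::lazy` supplies the task context that `poll_want` needs):

#[cfg(test)]
mod tests {
    use futures::Future;

    #[test]
    fn giver_taker_handshake() {
        ::futures::future::lazy(|| {
            let (mut giver, taker) = super::new();
            // IDLE: the Giver parks its task, state moves to GIVE
            assert!(giver.poll_want().unwrap().is_not_ready());
            // WANT: wakes the Giver, which then sees readiness
            taker.want();
            assert!(giver.poll_want().unwrap().is_ready());
            // CLOSED: dropping the Taker cancels the pair
            drop(taker);
            assert!(giver.poll_want().is_err());
            assert!(giver.is_canceled());
            Ok::<(), ()>(())
        }).wait().unwrap();
    }
}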


src/client/tests.rs
@@ -45,3 +45,47 @@ fn retryable_request() {
core.run(res2.join(srv2)).expect("res2");
}
#[test]
fn conn_reset_after_write() {
let _ = pretty_env_logger::try_init();
let mut core = Core::new().unwrap();
let mut connector = MockConnector::new();
let sock1 = connector.mock("http://mock.local/a");
let client = Client::configure()
.connector(connector)
.build(&core.handle());
{
let res1 = client.get("http://mock.local/a".parse().unwrap());
let srv1 = poll_fn(|| {
try_ready!(sock1.read(&mut [0u8; 512]));
try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"));
Ok(Async::Ready(()))
});
core.run(res1.join(srv1)).expect("res1");
}
let res2 = client.get("http://mock.local/a".parse().unwrap());
let mut sock1 = Some(sock1);
let srv2 = poll_fn(|| {
// We purposefully keep the socket open until the client
// has written the second request, and THEN disconnect.
//
// Not because we expect servers to be jerks, but to trigger
// state where we write on an assumedly good connection, and
// only see the reset/close AFTER we wrote bytes.
try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512]));
sock1.take();
Ok(Async::Ready(()))
});
let err = core.run(res2.join(srv2)).expect_err("res2");
match err {
::Error::Incomplete => (),
other => panic!("expected Incomplete, found {:?}", other)
}
}