refactor(client): change last Weak::new to an Option<Weak>
This commit is contained in:
@@ -50,11 +50,6 @@ type Key = (Arc<String>, Ver);
|
|||||||
struct PoolInner<T> {
|
struct PoolInner<T> {
|
||||||
connections: Mutex<Connections<T>>,
|
connections: Mutex<Connections<T>>,
|
||||||
enabled: bool,
|
enabled: bool,
|
||||||
/// A single Weak pointer used every time a proper weak reference
|
|
||||||
/// is not needed. This prevents allocating space in the heap to hold
|
|
||||||
/// a PoolInner<T> *every single time*, and instead we just allocate
|
|
||||||
/// this one extra per pool.
|
|
||||||
weak: Weak<PoolInner<T>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Connections<T> {
|
struct Connections<T> {
|
||||||
@@ -84,6 +79,10 @@ struct Connections<T> {
|
|||||||
timeout: Option<Duration>,
|
timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is because `Weak::new()` *allocates* space for `T`, even if it
|
||||||
|
// doesn't need it!
|
||||||
|
struct WeakOpt<T>(Option<Weak<T>>);
|
||||||
|
|
||||||
impl<T> Pool<T> {
|
impl<T> Pool<T> {
|
||||||
pub fn new(enabled: bool, timeout: Option<Duration>, __exec: &Exec) -> Pool<T> {
|
pub fn new(enabled: bool, timeout: Option<Duration>, __exec: &Exec) -> Pool<T> {
|
||||||
Pool {
|
Pool {
|
||||||
@@ -99,7 +98,6 @@ impl<T> Pool<T> {
|
|||||||
timeout,
|
timeout,
|
||||||
}),
|
}),
|
||||||
enabled,
|
enabled,
|
||||||
weak: Weak::new(),
|
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -136,7 +134,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
if inner.connecting.insert(key.clone()) {
|
if inner.connecting.insert(key.clone()) {
|
||||||
let connecting = Connecting {
|
let connecting = Connecting {
|
||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
pool: Arc::downgrade(&self.inner),
|
pool: WeakOpt::downgrade(&self.inner),
|
||||||
};
|
};
|
||||||
Some(connecting)
|
Some(connecting)
|
||||||
} else {
|
} else {
|
||||||
@@ -148,7 +146,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
// in HTTP/1's case, there is never a lock, so we don't
|
// in HTTP/1's case, there is never a lock, so we don't
|
||||||
// need to do anything in Drop.
|
// need to do anything in Drop.
|
||||||
pool: self.inner.weak.clone(),
|
pool: WeakOpt::none(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -189,7 +187,7 @@ impl<T: Poolable> Pool<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
|
pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> {
|
||||||
let (value, pool_ref, has_pool) = if self.inner.enabled {
|
let (value, pool_ref) = if self.inner.enabled {
|
||||||
match value.reserve() {
|
match value.reserve() {
|
||||||
Reservation::Shared(to_insert, to_return) => {
|
Reservation::Shared(to_insert, to_return) => {
|
||||||
debug_assert_eq!(
|
debug_assert_eq!(
|
||||||
@@ -203,17 +201,17 @@ impl<T: Poolable> Pool<T> {
|
|||||||
// already have a lock, no need to lock the mutex twice.
|
// already have a lock, no need to lock the mutex twice.
|
||||||
inner.connected(&connecting.key);
|
inner.connected(&connecting.key);
|
||||||
// prevent the Drop of Connecting from repeating inner.connected()
|
// prevent the Drop of Connecting from repeating inner.connected()
|
||||||
connecting.pool = self.inner.weak.clone();
|
connecting.pool = WeakOpt::none();
|
||||||
|
|
||||||
// Shared reservations don't need a reference to the pool,
|
// Shared reservations don't need a reference to the pool,
|
||||||
// since the pool always keeps a copy.
|
// since the pool always keeps a copy.
|
||||||
(to_return, self.inner.weak.clone(), false)
|
(to_return, WeakOpt::none())
|
||||||
},
|
},
|
||||||
Reservation::Unique(value) => {
|
Reservation::Unique(value) => {
|
||||||
// Unique reservations must take a reference to the pool
|
// Unique reservations must take a reference to the pool
|
||||||
// since they hope to reinsert once the reservation is
|
// since they hope to reinsert once the reservation is
|
||||||
// completed
|
// completed
|
||||||
(value, Arc::downgrade(&self.inner), true)
|
(value, WeakOpt::downgrade(&self.inner))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -222,11 +220,10 @@ impl<T: Poolable> Pool<T> {
|
|||||||
// The Connecting should have had no pool ref
|
// The Connecting should have had no pool ref
|
||||||
debug_assert!(connecting.pool.upgrade().is_none());
|
debug_assert!(connecting.pool.upgrade().is_none());
|
||||||
|
|
||||||
(value, self.inner.weak.clone(), false)
|
(value, WeakOpt::none())
|
||||||
};
|
};
|
||||||
Pooled {
|
Pooled {
|
||||||
key: connecting.key.clone(),
|
key: connecting.key.clone(),
|
||||||
has_pool,
|
|
||||||
is_reused: false,
|
is_reused: false,
|
||||||
pool: pool_ref,
|
pool: pool_ref,
|
||||||
value: Some(value)
|
value: Some(value)
|
||||||
@@ -243,14 +240,13 @@ impl<T: Poolable> Pool<T> {
|
|||||||
// we just have the final value, without knowledge of if this is
|
// we just have the final value, without knowledge of if this is
|
||||||
// unique or shared. So, the hack is to just assume Ver::Http2 means
|
// unique or shared. So, the hack is to just assume Ver::Http2 means
|
||||||
// shared... :(
|
// shared... :(
|
||||||
let (pool_ref, has_pool) = if key.1 == Ver::Http2 {
|
let pool_ref = if key.1 == Ver::Http2 {
|
||||||
(self.inner.weak.clone(), false)
|
WeakOpt::none()
|
||||||
} else {
|
} else {
|
||||||
(Arc::downgrade(&self.inner), true)
|
WeakOpt::downgrade(&self.inner)
|
||||||
};
|
};
|
||||||
|
|
||||||
Pooled {
|
Pooled {
|
||||||
has_pool,
|
|
||||||
is_reused: true,
|
is_reused: true,
|
||||||
key: key.clone(),
|
key: key.clone(),
|
||||||
pool: pool_ref,
|
pool: pool_ref,
|
||||||
@@ -414,7 +410,7 @@ impl<T: Poolable> Connections<T> {
|
|||||||
let interval = Interval::new(start, dur);
|
let interval = Interval::new(start, dur);
|
||||||
self.exec.execute(IdleInterval {
|
self.exec.execute(IdleInterval {
|
||||||
interval: interval,
|
interval: interval,
|
||||||
pool: Arc::downgrade(pool_ref),
|
pool: WeakOpt::downgrade(pool_ref),
|
||||||
pool_drop_notifier: rx,
|
pool_drop_notifier: rx,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -481,10 +477,9 @@ impl<T> Clone for Pool<T> {
|
|||||||
// Note: The bounds `T: Poolable` is needed for the Drop impl.
|
// Note: The bounds `T: Poolable` is needed for the Drop impl.
|
||||||
pub(super) struct Pooled<T: Poolable> {
|
pub(super) struct Pooled<T: Poolable> {
|
||||||
value: Option<T>,
|
value: Option<T>,
|
||||||
has_pool: bool,
|
|
||||||
is_reused: bool,
|
is_reused: bool,
|
||||||
key: Key,
|
key: Key,
|
||||||
pool: Weak<PoolInner<T>>,
|
pool: WeakOpt<PoolInner<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Poolable> Pooled<T> {
|
impl<T: Poolable> Pooled<T> {
|
||||||
@@ -493,7 +488,7 @@ impl<T: Poolable> Pooled<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_pool_enabled(&self) -> bool {
|
pub fn is_pool_enabled(&self) -> bool {
|
||||||
self.has_pool
|
self.pool.0.is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn as_ref(&self) -> &T {
|
fn as_ref(&self) -> &T {
|
||||||
@@ -628,7 +623,7 @@ impl<T> Drop for Checkout<T> {
|
|||||||
|
|
||||||
pub(super) struct Connecting<T: Poolable> {
|
pub(super) struct Connecting<T: Poolable> {
|
||||||
key: Key,
|
key: Key,
|
||||||
pool: Weak<PoolInner<T>>,
|
pool: WeakOpt<PoolInner<T>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Poolable> Drop for Connecting<T> {
|
impl<T: Poolable> Drop for Connecting<T> {
|
||||||
@@ -665,7 +660,7 @@ impl Expiration {
|
|||||||
#[cfg(feature = "runtime")]
|
#[cfg(feature = "runtime")]
|
||||||
struct IdleInterval<T> {
|
struct IdleInterval<T> {
|
||||||
interval: Interval,
|
interval: Interval,
|
||||||
pool: Weak<PoolInner<T>>,
|
pool: WeakOpt<PoolInner<T>>,
|
||||||
// This allows the IdleInterval to be notified as soon as the entire
|
// This allows the IdleInterval to be notified as soon as the entire
|
||||||
// Pool is fully dropped, and shutdown. This channel is never sent on,
|
// Pool is fully dropped, and shutdown. This channel is never sent on,
|
||||||
// but Err(Canceled) will be received when the Pool is dropped.
|
// but Err(Canceled) will be received when the Pool is dropped.
|
||||||
@@ -707,14 +702,30 @@ impl<T: Poolable + 'static> Future for IdleInterval<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<T> WeakOpt<T> {
|
||||||
|
fn none() -> Self {
|
||||||
|
WeakOpt(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn downgrade(arc: &Arc<T>) -> Self {
|
||||||
|
WeakOpt(Some(Arc::downgrade(arc)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn upgrade(&self) -> Option<Arc<T>> {
|
||||||
|
self.0
|
||||||
|
.as_ref()
|
||||||
|
.and_then(Weak::upgrade)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use futures::{Async, Future};
|
use futures::{Async, Future};
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use common::Exec;
|
use common::Exec;
|
||||||
use super::{Connecting, Key, Poolable, Pool, Reservation, Ver};
|
use super::{Connecting, Key, Poolable, Pool, Reservation, Ver, WeakOpt};
|
||||||
|
|
||||||
/// Test unique reservations.
|
/// Test unique reservations.
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
@@ -733,7 +744,7 @@ mod tests {
|
|||||||
fn c<T: Poolable>(key: Key) -> Connecting<T> {
|
fn c<T: Poolable>(key: Key) -> Connecting<T> {
|
||||||
Connecting {
|
Connecting {
|
||||||
key,
|
key,
|
||||||
pool: Weak::new(),
|
pool: WeakOpt::none(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user