diff --git a/src/client/connect.rs b/src/client/connect.rs index 99541717..8c659812 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -251,7 +251,12 @@ impl Connect for HttpConnector { type Future = HttpConnecting; fn connect(&self, dst: Destination) -> Self::Future { - trace!("Http::connect({:?})", dst.uri); + trace!( + "Http::connect; scheme={}, host={}, port={:?}", + dst.scheme(), + dst.host(), + dst.port(), + ); if self.enforce_http { if dst.uri.scheme_part() != Some(&Scheme::HTTP) { diff --git a/src/client/mod.rs b/src/client/mod.rs index c93da09f..f580874e 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -255,6 +255,8 @@ where C: Connect + Sync + 'static, } }) .and_then(move |mut res| { + // If pooled is HTTP/2, we can toss this reference immediately. + // // when pooled is dropped, it will try to insert back into the // pool. To delay that, spawn a future that completes once the // sender is ready again. @@ -263,7 +265,7 @@ where C: Connect + Sync + 'static, // for a new request to start. // // It won't be ready if there is a body to stream. - if pooled.is_ready() { + if ver == Ver::Http2 || pooled.is_ready() { drop(pooled); } else if !res.body().is_empty() { let (delayed_tx, delayed_rx) = oneshot::channel(); diff --git a/src/client/pool.rs b/src/client/pool.rs index a5730f82..0adda3c9 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -159,7 +159,7 @@ impl Pool { } pub(super) fn pooled(&self, mut connecting: Connecting, value: T) -> Pooled { - let value = match value.reserve() { + let (value, pool_ref) = match value.reserve() { Reservation::Shared(to_insert, to_return) => { debug_assert_eq!( connecting.key.1, @@ -174,24 +174,45 @@ impl Pool { // prevent the Drop of Connecting from repeating inner.connected() connecting.pool = Weak::new(); - to_return + // Shared reservations don't need a reference to the pool, + // since the pool always keeps a copy. + (to_return, Weak::new()) + }, + Reservation::Unique(value) => { + // Unique reservations must take a reference to the pool + // since they hope to reinsert once the reservation is + // completed + (value, Arc::downgrade(&self.inner)) }, - Reservation::Unique(value) => value, }; Pooled { is_reused: false, key: connecting.key.clone(), - pool: Arc::downgrade(&self.inner), + pool: pool_ref, value: Some(value) } } fn reuse(&self, key: &Key, value: T) -> Pooled { debug!("reuse idle connection for {:?}", key); + // TODO: unhack this + // In Pool::pooled(), which is used for inserting brand new connections, + // there's some code that adjusts the pool reference taken depending + // on if the Reservation can be shared or is unique. By the time + // reuse() is called, the reservation has already been made, and + // we just have the final value, without knowledge of if this is + // unique or shared. So, the hack is to just assume Ver::Http2 means + // shared... :( + let pool_ref = if key.1 == Ver::Http2 { + Weak::new() + } else { + Arc::downgrade(&self.inner) + }; + Pooled { is_reused: true, key: key.clone(), - pool: Arc::downgrade(&self.inner), + pool: pool_ref, value: Some(value), } } @@ -216,15 +237,18 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { while let Some(entry) = self.list.pop() { // If the connection has been closed, or is older than our idle // timeout, simply drop it and keep looking... - // + if entry.value.is_closed() { + trace!("removing closed connection for {:?}", self.key); + continue; + } // TODO: Actually, since the `idle` list is pushed to the end always, // that would imply that if *this* entry is expired, then anything // "earlier" in the list would *have* to be expired also... Right? // // In that case, we could just break out of the loop and drop the // whole list... - if entry.value.is_closed() || expiration.expires(entry.idle_at) { - trace!("remove unacceptable pooled connection for {:?}", self.key); + if expiration.expires(entry.idle_at) { + trace!("removing expired connection for {:?}", self.key); continue; } @@ -257,10 +281,10 @@ impl PoolInner { return; } if key.1 == Ver::Http2 && self.idle.contains_key(&key) { - trace!("Pool::put; existing idle HTTP/2 connection for {:?}", key); + trace!("put; existing idle HTTP/2 connection for {:?}", key); return; } - trace!("Pool::put {:?}", key); + trace!("put; add idle connection for {:?}", key); let mut remove_parked = false; let mut value = Some(value); if let Some(parked) = self.parked.get_mut(&key) { @@ -288,7 +312,7 @@ impl PoolInner { } } - trace!("Pool::put removing canceled parked {:?}", key); + trace!("put; removing canceled waiter for {:?}", key); } remove_parked = parked.is_empty(); } @@ -306,7 +330,7 @@ impl PoolInner { idle_at: Instant::now(), }); } - None => trace!("Pool::put found parked {:?}", key), + None => trace!("put; found waiter for {:?}", key), } } @@ -345,23 +369,26 @@ impl PoolInner { } impl PoolInner { + /// This should *only* be called by the IdleInterval. fn clear_expired(&mut self) { - let dur = if let Some(dur) = self.timeout { - dur - } else { - return - }; + let dur = self.timeout.expect("interval assumes timeout"); let now = Instant::now(); //self.last_idle_check_at = now; - self.idle.retain(|_key, values| { - + self.idle.retain(|key, values| { values.retain(|entry| { if entry.value.is_closed() { + trace!("idle interval evicting closed for {:?}", key); return false; } - now - entry.idle_at < dur + if now - entry.idle_at > dur { + trace!("idle interval evicting expired for {:?}", key); + return false; + } + + // Otherwise, keep this value... + true }); // returning false evicts this key/val @@ -459,9 +486,11 @@ impl Drop for Pooled { if let Ok(mut inner) = inner.lock() { inner.put(self.key.clone(), value); } - } else { + } else if self.key.1 == Ver::Http1 { trace!("pool dropped, dropping pooled ({:?})", self.key); } + // Ver::Http2 is already in the Pool (or dead), so we wouldn't + // have an actual reference to the Pool. } } }