refactor(client): skip some pool housekeeping on shared h2 handles
This commit is contained in:
		| @@ -251,7 +251,12 @@ impl Connect for HttpConnector { | |||||||
|     type Future = HttpConnecting; |     type Future = HttpConnecting; | ||||||
|  |  | ||||||
|     fn connect(&self, dst: Destination) -> Self::Future { |     fn connect(&self, dst: Destination) -> Self::Future { | ||||||
|         trace!("Http::connect({:?})", dst.uri); |         trace!( | ||||||
|  |             "Http::connect; scheme={}, host={}, port={:?}", | ||||||
|  |             dst.scheme(), | ||||||
|  |             dst.host(), | ||||||
|  |             dst.port(), | ||||||
|  |         ); | ||||||
|  |  | ||||||
|         if self.enforce_http { |         if self.enforce_http { | ||||||
|             if dst.uri.scheme_part() != Some(&Scheme::HTTP) { |             if dst.uri.scheme_part() != Some(&Scheme::HTTP) { | ||||||
|   | |||||||
| @@ -255,6 +255,8 @@ where C: Connect + Sync + 'static, | |||||||
|                     } |                     } | ||||||
|                 }) |                 }) | ||||||
|                 .and_then(move |mut res| { |                 .and_then(move |mut res| { | ||||||
|  |                     // If pooled is HTTP/2, we can toss this reference immediately. | ||||||
|  |                     // | ||||||
|                     // when pooled is dropped, it will try to insert back into the |                     // when pooled is dropped, it will try to insert back into the | ||||||
|                     // pool. To delay that, spawn a future that completes once the |                     // pool. To delay that, spawn a future that completes once the | ||||||
|                     // sender is ready again. |                     // sender is ready again. | ||||||
| @@ -263,7 +265,7 @@ where C: Connect + Sync + 'static, | |||||||
|                     // for a new request to start. |                     // for a new request to start. | ||||||
|                     // |                     // | ||||||
|                     // It won't be ready if there is a body to stream. |                     // It won't be ready if there is a body to stream. | ||||||
|                     if pooled.is_ready() { |                     if ver == Ver::Http2 || pooled.is_ready() { | ||||||
|                         drop(pooled); |                         drop(pooled); | ||||||
|                     } else if !res.body().is_empty() { |                     } else if !res.body().is_empty() { | ||||||
|                         let (delayed_tx, delayed_rx) = oneshot::channel(); |                         let (delayed_tx, delayed_rx) = oneshot::channel(); | ||||||
|   | |||||||
| @@ -159,7 +159,7 @@ impl<T: Poolable> Pool<T> { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> { |     pub(super) fn pooled(&self, mut connecting: Connecting<T>, value: T) -> Pooled<T> { | ||||||
|         let value = match value.reserve() { |         let (value, pool_ref) = match value.reserve() { | ||||||
|             Reservation::Shared(to_insert, to_return) => { |             Reservation::Shared(to_insert, to_return) => { | ||||||
|                 debug_assert_eq!( |                 debug_assert_eq!( | ||||||
|                     connecting.key.1, |                     connecting.key.1, | ||||||
| @@ -174,24 +174,45 @@ impl<T: Poolable> Pool<T> { | |||||||
|                 // prevent the Drop of Connecting from repeating inner.connected() |                 // prevent the Drop of Connecting from repeating inner.connected() | ||||||
|                 connecting.pool = Weak::new(); |                 connecting.pool = Weak::new(); | ||||||
|  |  | ||||||
|                 to_return |                 // Shared reservations don't need a reference to the pool, | ||||||
|  |                 // since the pool always keeps a copy. | ||||||
|  |                 (to_return, Weak::new()) | ||||||
|  |             }, | ||||||
|  |             Reservation::Unique(value) => { | ||||||
|  |                 // Unique reservations must take a reference to the pool | ||||||
|  |                 // since they hope to reinsert once the reservation is | ||||||
|  |                 // completed | ||||||
|  |                 (value, Arc::downgrade(&self.inner)) | ||||||
|             }, |             }, | ||||||
|             Reservation::Unique(value) => value, |  | ||||||
|         }; |         }; | ||||||
|         Pooled { |         Pooled { | ||||||
|             is_reused: false, |             is_reused: false, | ||||||
|             key: connecting.key.clone(), |             key: connecting.key.clone(), | ||||||
|             pool: Arc::downgrade(&self.inner), |             pool: pool_ref, | ||||||
|             value: Some(value) |             value: Some(value) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn reuse(&self, key: &Key, value: T) -> Pooled<T> { |     fn reuse(&self, key: &Key, value: T) -> Pooled<T> { | ||||||
|         debug!("reuse idle connection for {:?}", key); |         debug!("reuse idle connection for {:?}", key); | ||||||
|  |         // TODO: unhack this | ||||||
|  |         // In Pool::pooled(), which is used for inserting brand new connections, | ||||||
|  |         // there's some code that adjusts the pool reference taken depending | ||||||
|  |         // on if the Reservation can be shared or is unique. By the time | ||||||
|  |         // reuse() is called, the reservation has already been made, and | ||||||
|  |         // we just have the final value, without knowledge of if this is | ||||||
|  |         // unique or shared. So, the hack is to just assume Ver::Http2 means | ||||||
|  |         // shared... :( | ||||||
|  |         let pool_ref = if key.1 == Ver::Http2 { | ||||||
|  |             Weak::new() | ||||||
|  |         } else { | ||||||
|  |             Arc::downgrade(&self.inner) | ||||||
|  |         }; | ||||||
|  |  | ||||||
|         Pooled { |         Pooled { | ||||||
|             is_reused: true, |             is_reused: true, | ||||||
|             key: key.clone(), |             key: key.clone(), | ||||||
|             pool: Arc::downgrade(&self.inner), |             pool: pool_ref, | ||||||
|             value: Some(value), |             value: Some(value), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -216,15 +237,18 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { | |||||||
|         while let Some(entry) = self.list.pop() { |         while let Some(entry) = self.list.pop() { | ||||||
|             // If the connection has been closed, or is older than our idle |             // If the connection has been closed, or is older than our idle | ||||||
|             // timeout, simply drop it and keep looking... |             // timeout, simply drop it and keep looking... | ||||||
|             // |             if entry.value.is_closed() { | ||||||
|  |                 trace!("removing closed connection for {:?}", self.key); | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|             // TODO: Actually, since the `idle` list is pushed to the end always, |             // TODO: Actually, since the `idle` list is pushed to the end always, | ||||||
|             // that would imply that if *this* entry is expired, then anything |             // that would imply that if *this* entry is expired, then anything | ||||||
|             // "earlier" in the list would *have* to be expired also... Right? |             // "earlier" in the list would *have* to be expired also... Right? | ||||||
|             // |             // | ||||||
|             // In that case, we could just break out of the loop and drop the |             // In that case, we could just break out of the loop and drop the | ||||||
|             // whole list... |             // whole list... | ||||||
|             if entry.value.is_closed() || expiration.expires(entry.idle_at) { |             if expiration.expires(entry.idle_at) { | ||||||
|                 trace!("remove unacceptable pooled connection for {:?}", self.key); |                 trace!("removing expired connection for {:?}", self.key); | ||||||
|                 continue; |                 continue; | ||||||
|             } |             } | ||||||
|  |  | ||||||
| @@ -257,10 +281,10 @@ impl<T: Poolable> PoolInner<T> { | |||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
|         if key.1 == Ver::Http2 && self.idle.contains_key(&key) { |         if key.1 == Ver::Http2 && self.idle.contains_key(&key) { | ||||||
|             trace!("Pool::put; existing idle HTTP/2 connection for {:?}", key); |             trace!("put; existing idle HTTP/2 connection for {:?}", key); | ||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
|         trace!("Pool::put {:?}", key); |         trace!("put; add idle connection for {:?}", key); | ||||||
|         let mut remove_parked = false; |         let mut remove_parked = false; | ||||||
|         let mut value = Some(value); |         let mut value = Some(value); | ||||||
|         if let Some(parked) = self.parked.get_mut(&key) { |         if let Some(parked) = self.parked.get_mut(&key) { | ||||||
| @@ -288,7 +312,7 @@ impl<T: Poolable> PoolInner<T> { | |||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
|                 trace!("Pool::put removing canceled parked {:?}", key); |                 trace!("put; removing canceled waiter for {:?}", key); | ||||||
|             } |             } | ||||||
|             remove_parked = parked.is_empty(); |             remove_parked = parked.is_empty(); | ||||||
|         } |         } | ||||||
| @@ -306,7 +330,7 @@ impl<T: Poolable> PoolInner<T> { | |||||||
|                          idle_at: Instant::now(), |                          idle_at: Instant::now(), | ||||||
|                      }); |                      }); | ||||||
|             } |             } | ||||||
|             None => trace!("Pool::put found parked {:?}", key), |             None => trace!("put; found waiter for {:?}", key), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -345,23 +369,26 @@ impl<T> PoolInner<T> { | |||||||
| } | } | ||||||
|  |  | ||||||
| impl<T: Poolable> PoolInner<T> { | impl<T: Poolable> PoolInner<T> { | ||||||
|  |     /// This should *only* be called by the IdleInterval. | ||||||
|     fn clear_expired(&mut self) { |     fn clear_expired(&mut self) { | ||||||
|         let dur = if let Some(dur) = self.timeout { |         let dur = self.timeout.expect("interval assumes timeout"); | ||||||
|             dur |  | ||||||
|         } else { |  | ||||||
|             return |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         let now = Instant::now(); |         let now = Instant::now(); | ||||||
|         //self.last_idle_check_at = now; |         //self.last_idle_check_at = now; | ||||||
|  |  | ||||||
|         self.idle.retain(|_key, values| { |         self.idle.retain(|key, values| { | ||||||
|  |  | ||||||
|             values.retain(|entry| { |             values.retain(|entry| { | ||||||
|                 if entry.value.is_closed() { |                 if entry.value.is_closed() { | ||||||
|  |                     trace!("idle interval evicting closed for {:?}", key); | ||||||
|                     return false; |                     return false; | ||||||
|                 } |                 } | ||||||
|                 now - entry.idle_at < dur |                 if now - entry.idle_at > dur { | ||||||
|  |                     trace!("idle interval evicting expired for {:?}", key); | ||||||
|  |                     return false; | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 // Otherwise, keep this value... | ||||||
|  |                 true | ||||||
|             }); |             }); | ||||||
|  |  | ||||||
|             // returning false evicts this key/val |             // returning false evicts this key/val | ||||||
| @@ -459,9 +486,11 @@ impl<T: Poolable> Drop for Pooled<T> { | |||||||
|                 if let Ok(mut inner) = inner.lock() { |                 if let Ok(mut inner) = inner.lock() { | ||||||
|                     inner.put(self.key.clone(), value); |                     inner.put(self.key.clone(), value); | ||||||
|                 } |                 } | ||||||
|             } else { |             } else if self.key.1 == Ver::Http1 { | ||||||
|                 trace!("pool dropped, dropping pooled ({:?})", self.key); |                 trace!("pool dropped, dropping pooled ({:?})", self.key); | ||||||
|             } |             } | ||||||
|  |             // Ver::Http2 is already in the Pool (or dead), so we wouldn't | ||||||
|  |             // have an actual reference to the Pool. | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user