fix(client): drop in-use connections when they finish if Client is dropped
This commit is contained in:
		| @@ -3,7 +3,7 @@ use std::collections::{HashMap, VecDeque}; | ||||
| use std::fmt; | ||||
| use std::io; | ||||
| use std::ops::{Deref, DerefMut, BitAndAssign}; | ||||
| use std::rc::Rc; | ||||
| use std::rc::{Rc, Weak}; | ||||
| use std::time::{Duration, Instant}; | ||||
|  | ||||
| use futures::{Future, Async, Poll}; | ||||
| @@ -103,7 +103,7 @@ impl<T: Clone> Pool<T> { | ||||
|                 status: Rc::new(Cell::new(TimedKA::Busy)), | ||||
|             }, | ||||
|             key: key, | ||||
|             pool: self.clone(), | ||||
|             pool: Rc::downgrade(&self.inner), | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -118,7 +118,7 @@ impl<T: Clone> Pool<T> { | ||||
|         Pooled { | ||||
|             entry: entry, | ||||
|             key: key, | ||||
|             pool: self.clone(), | ||||
|             pool: Rc::downgrade(&self.inner), | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -161,7 +161,7 @@ impl<T> Clone for Pool<T> { | ||||
| pub struct Pooled<T> { | ||||
|     entry: Entry<T>, | ||||
|     key: Rc<String>, | ||||
|     pool: Pool<T>, | ||||
|     pool: Weak<RefCell<PoolInner<T>>>, | ||||
| } | ||||
|  | ||||
| impl<T> Deref for Pooled<T> { | ||||
| @@ -194,8 +194,16 @@ impl<T: Clone> KeepAlive for Pooled<T> { | ||||
|             return; | ||||
|         } | ||||
|         self.entry.is_reused = true; | ||||
|         if self.pool.is_enabled() { | ||||
|             self.pool.put(self.key.clone(), self.entry.clone()); | ||||
|         if let Some(inner) = self.pool.upgrade() { | ||||
|             let mut pool = Pool { | ||||
|                 inner: inner, | ||||
|             }; | ||||
|             if pool.is_enabled() { | ||||
|                 pool.put(self.key.clone(), self.entry.clone()); | ||||
|             } | ||||
|         } else { | ||||
|             trace!("pool dropped, dropping pooled ({:?})", self.key); | ||||
|             self.entry.status.set(TimedKA::Disabled); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|   | ||||
| @@ -235,6 +235,9 @@ where I: AsyncRead + AsyncWrite, | ||||
|         // | ||||
|         // When writing finishes, we need to wake the task up in case there | ||||
|         // is more reading that can be done, to start a new message. | ||||
|  | ||||
|  | ||||
|  | ||||
|         let wants_read = match self.state.reading { | ||||
|             Reading::Body(..) | | ||||
|             Reading::KeepAlive => return, | ||||
| @@ -242,13 +245,19 @@ where I: AsyncRead + AsyncWrite, | ||||
|             Reading::Closed => false, | ||||
|         }; | ||||
|  | ||||
|         match self.state.writing { | ||||
|         let wants_write = match self.state.writing { | ||||
|             Writing::Continue(..) | | ||||
|             Writing::Body(..) | | ||||
|             Writing::Ending(..) => return, | ||||
|             Writing::Init | | ||||
|             Writing::KeepAlive | | ||||
|             Writing::Closed => (), | ||||
|             Writing::Init => true, | ||||
|             Writing::KeepAlive => false, | ||||
|             Writing::Closed => false, | ||||
|         }; | ||||
|  | ||||
|         // if the client is at Reading::Init and Writing::Init, | ||||
|         // it's not actually looking for a read, but a write. | ||||
|         if wants_write && !T::should_read_first() { | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         if !self.io.is_read_blocked() { | ||||
| @@ -704,9 +713,13 @@ impl<B, K: KeepAlive> State<B, K> { | ||||
|  | ||||
|     fn idle(&mut self) { | ||||
|         self.method = None; | ||||
|         self.reading = Reading::Init; | ||||
|         self.writing = Writing::Init; | ||||
|         self.keep_alive.idle(); | ||||
|         if self.is_idle() { | ||||
|             self.reading = Reading::Init; | ||||
|             self.writing = Writing::Init; | ||||
|         } else { | ||||
|             self.close(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn is_idle(&self) -> bool { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user