Merge pull request #1387 from sfackler/keep-alive-leak
fix(client): don't leak connections with no keep-alive
This commit is contained in:
		| @@ -200,10 +200,13 @@ impl<T: Clone> KeepAlive for Pooled<T> { | |||||||
|             }; |             }; | ||||||
|             if pool.is_enabled() { |             if pool.is_enabled() { | ||||||
|                 pool.put(self.key.clone(), self.entry.clone()); |                 pool.put(self.key.clone(), self.entry.clone()); | ||||||
|  |             } else { | ||||||
|  |                 trace!("keepalive disabled, dropping pooled ({:?})", self.key); | ||||||
|  |                 self.disable(); | ||||||
|             } |             } | ||||||
|         } else { |         } else { | ||||||
|             trace!("pool dropped, dropping pooled ({:?})", self.key); |             trace!("pool dropped, dropping pooled ({:?})", self.key); | ||||||
|             self.entry.status.set(TimedKA::Disabled); |             self.disable(); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -654,6 +654,46 @@ mod dispatch_impl { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     #[test] | ||||||
|  |     fn no_keep_alive_closes_connection() { | ||||||
|  |         // https://github.com/hyperium/hyper/issues/1383 | ||||||
|  |         let _ = pretty_env_logger::init(); | ||||||
|  |  | ||||||
|  |         let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||||||
|  |         let addr = server.local_addr().unwrap(); | ||||||
|  |         let mut core = Core::new().unwrap(); | ||||||
|  |         let handle = core.handle(); | ||||||
|  |         let closes = Arc::new(AtomicUsize::new(0)); | ||||||
|  |  | ||||||
|  |         let (tx1, rx1) = oneshot::channel(); | ||||||
|  |  | ||||||
|  |         thread::spawn(move || { | ||||||
|  |             let mut sock = server.accept().unwrap().0; | ||||||
|  |             sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); | ||||||
|  |             sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); | ||||||
|  |             let mut buf = [0; 4096]; | ||||||
|  |             sock.read(&mut buf).expect("read 1"); | ||||||
|  |             sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap(); | ||||||
|  |             let _ = tx1.send(()); | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         let uri = format!("http://{}/a", addr).parse().unwrap(); | ||||||
|  |  | ||||||
|  |         let client = Client::configure() | ||||||
|  |             .connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone())) | ||||||
|  |             .no_proto() | ||||||
|  |             .keep_alive(false) | ||||||
|  |             .build(&handle); | ||||||
|  |         let res = client.get(uri).and_then(move |res| { | ||||||
|  |             assert_eq!(res.status(), hyper::StatusCode::Ok); | ||||||
|  |             res.body().concat2() | ||||||
|  |         }); | ||||||
|  |         let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); | ||||||
|  |         core.run(res.join(rx).map(|r| r.0)).unwrap(); | ||||||
|  |  | ||||||
|  |         assert_eq!(closes.load(Ordering::Relaxed), 1); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |  | ||||||
|     struct DebugConnector(HttpConnector, Arc<AtomicUsize>); |     struct DebugConnector(HttpConnector, Arc<AtomicUsize>); | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user