From 91b970086200f76b22e9fea829cd6c6829e35301 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 13 Mar 2018 18:46:52 -0700 Subject: [PATCH] refactor(client): only spawn pooled in executor if not ready when response recieved --- src/client/mod.rs | 31 ++++--- src/client/tests.rs | 9 +- tests/client.rs | 195 +++++++++++++++++++++++++------------------- 3 files changed, 130 insertions(+), 105 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 0ad70b06..eb8c10ea 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -243,20 +243,27 @@ where C: Connect, } else { ClientError::Normal(err) } + }) + .map(move |res| { + // when pooled is dropped, it will try to insert back into the + // pool. To delay that, spawn a future that completes once the + // sender is ready again. + // + // This *should* only be once the related `Connection` has polled + // for a new request to start. + // + // It won't be ready if there is a body to stream. + if let Ok(Async::NotReady) = pooled.tx.poll_ready() { + // If the executor doesn't have room, oh well. Things will likely + // be blowing up soon, but this specific task isn't required. + let _ = executor.execute(future::poll_fn(move || { + pooled.tx.poll_ready().map_err(|_| ()) + })); + } + + res }); - // when pooled is dropped, it will try to insert back into the - // pool. To delay that, spawn a future that completes once the - // sender is ready again. - // - // This *should* only be once the related `Connection` has polled - // for a new request to start. - // - // If the executor doesn't have room, oh well. Things will likely - // be blowing up soon, but this specific task isn't required. - let _ = executor.execute(future::poll_fn(move || { - pooled.tx.poll_ready().map_err(|_| ()) - })); fut }); diff --git a/src/client/tests.rs b/src/client/tests.rs index daf442f2..7d2157dd 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -1,10 +1,8 @@ extern crate pretty_env_logger; -use std::time::Duration; - use futures::Async; use futures::future::poll_fn; -use tokio::reactor::{Core, Timeout}; +use tokio::reactor::Core; use mock::MockConnector; use super::*; @@ -70,11 +68,6 @@ fn conn_reset_after_write() { Ok(Async::Ready(())) }); core.run(res1.join(srv1)).expect("res1"); - - // run a tiny timeout just to spin the core, so that the pool - // can tell the socket is ready again - let timeout = Timeout::new(Duration::from_millis(50), &core.handle()).unwrap(); - core.run(timeout).unwrap(); } let res2 = client.get("http://mock.local/a".parse().unwrap()); diff --git a/tests/client.rs b/tests/client.rs index abfeebe3..72783b3d 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1,4 +1,5 @@ #![deny(warnings)] +extern crate bytes; extern crate hyper; extern crate futures; extern crate tokio_core; @@ -10,7 +11,7 @@ use std::net::TcpListener; use std::thread; use std::time::Duration; -use hyper::client::{Client, Request, HttpConnector}; +use hyper::client::{Client, Request}; use hyper::{Method, StatusCode}; use futures::{Future, Stream}; @@ -18,10 +19,6 @@ use futures::sync::oneshot; use tokio_core::reactor::{Core, Handle}; -fn client(handle: &Handle) -> Client { - Client::new(handle) -} - fn s(buf: &[u8]) -> &str { ::std::str::from_utf8(buf).unwrap() } @@ -647,86 +644,6 @@ test! { body: None, } -#[test] -fn client_keep_alive() { - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let client = client(&core.handle()); - - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); - let _ = tx1.send(()); - - let n2 = sock.read(&mut buf).expect("read 2"); - assert_ne!(n2, 0); - let second_get = "GET /b HTTP/1.1\r\n"; - assert_eq!(s(&buf[..second_get.len()]), second_get); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); - let _ = tx2.send(()); - }); - - - - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let res = client.get(format!("http://{}/a", addr).parse().unwrap()); - core.run(res.join(rx).map(|r| r.0)).unwrap(); - - let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let res = client.get(format!("http://{}/b", addr).parse().unwrap()); - core.run(res.join(rx).map(|r| r.0)).unwrap(); -} - -#[test] -fn client_keep_alive_extra_body() { - let _ = pretty_env_logger::try_init(); - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let mut core = Core::new().unwrap(); - let client = client(&core.handle()); - - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello").expect("write 1"); - // the body "hello", while ignored because its a HEAD request, should mean the connection - // cannot be put back in the pool - let _ = tx1.send(()); - - let mut sock2 = server.accept().unwrap().0; - let n2 = sock2.read(&mut buf).expect("read 2"); - assert_ne!(n2, 0); - let second_get = "GET /b HTTP/1.1\r\n"; - assert_eq!(s(&buf[..second_get.len()]), second_get); - sock2.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); - let _ = tx2.send(()); - }); - - - - let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let req = Request::new(Method::Head, format!("http://{}/a", addr).parse().unwrap()); - let res = client.request(req); - core.run(res.join(rx).map(|r| r.0)).unwrap(); - - let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); - let res = client.get(format!("http://{}/b", addr).parse().unwrap()); - core.run(res.join(rx).map(|r| r.0)).unwrap(); -} mod dispatch_impl { use super::*; @@ -1214,6 +1131,110 @@ mod dispatch_impl { assert_eq!(connects.load(Ordering::Relaxed), 0); } + #[test] + fn client_keep_alive_0() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let connector = DebugConnector::new(&handle); + let connects = connector.connects.clone(); + + let client = Client::configure() + .connector(connector) + .build(&handle); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1"); + let _ = tx1.send(()); + + let n2 = sock.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); + let _ = tx2.send(()); + }); + + + assert_eq!(connects.load(Ordering::Relaxed), 0); + + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let res = client.get(format!("http://{}/a", addr).parse().unwrap()); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 1); + + let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let res = client.get(format!("http://{}/b", addr).parse().unwrap()); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 1, "second request should still only have 1 connect"); + } + + #[test] + fn client_keep_alive_extra_body() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + let connector = DebugConnector::new(&handle); + let connects = connector.connects.clone(); + + let client = Client::configure() + .connector(connector) + .build(&handle); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello").expect("write 1"); + // the body "hello", while ignored because its a HEAD request, should mean the connection + // cannot be put back in the pool + let _ = tx1.send(()); + + let mut sock2 = server.accept().unwrap().0; + let n2 = sock2.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock2.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2"); + let _ = tx2.send(()); + }); + + + assert_eq!(connects.load(Ordering::Relaxed), 0); + + let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let req = Request::new(Method::Head, format!("http://{}/a", addr).parse().unwrap()); + let res = client.request(req); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 1); + + let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); + let res = client.get(format!("http://{}/b", addr).parse().unwrap()); + core.run(res.join(rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 2); + } + struct DebugConnector { http: HttpConnector, @@ -1274,6 +1295,10 @@ mod dispatch_impl { fn shutdown(&mut self) -> futures::Poll<(), io::Error> { AsyncWrite::shutdown(&mut self.0) } + + fn write_buf(&mut self, buf: &mut B) -> futures::Poll { + self.0.write_buf(buf) + } } impl Read for DebugStream {