#![cfg(feature = "runtime")] extern crate pretty_env_logger; use futures::{Async, Future, Stream}; use futures::future::poll_fn; use futures::sync::oneshot; use tokio::runtime::current_thread::Runtime; use mock::MockConnector; use super::*; #[test] fn retryable_request() { let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().expect("new rt"); let mut connector = MockConnector::new(); let sock1 = connector.mock("http://mock.local"); let sock2 = connector.mock("http://mock.local"); let client = Client::builder() .build::<_, ::Body>(connector); client.pool.no_timer(); { let req = Request::builder() .uri("http://mock.local/a") .body(Default::default()) .unwrap(); let res1 = client.request(req); let srv1 = poll_fn(|| { try_ready!(sock1.read(&mut [0u8; 512])); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); rt.block_on(res1.join(srv1)).expect("res1"); } drop(sock1); let req = Request::builder() .uri("http://mock.local/b") .body(Default::default()) .unwrap(); let res2 = client.request(req) .map(|res| { assert_eq!(res.status().as_u16(), 222); }); let srv2 = poll_fn(|| { try_ready!(sock2.read(&mut [0u8; 512])); try_ready!(sock2.write(b"HTTP/1.1 222 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); rt.block_on(res2.join(srv2)).expect("res2"); } #[test] fn conn_reset_after_write() { let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().expect("new rt"); let mut connector = MockConnector::new(); let sock1 = connector.mock("http://mock.local"); let client = Client::builder() .build::<_, ::Body>(connector); client.pool.no_timer(); { let req = Request::builder() .uri("http://mock.local/a") .body(Default::default()) .unwrap(); let res1 = client.request(req); let srv1 = poll_fn(|| { try_ready!(sock1.read(&mut [0u8; 512])); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); rt.block_on(res1.join(srv1)).expect("res1"); } let req = Request::builder() .uri("http://mock.local/a") .body(Default::default()) .unwrap(); let res2 = client.request(req); let mut sock1 = Some(sock1); let srv2 = poll_fn(|| { // We purposefully keep the socket open until the client // has written the second request, and THEN disconnect. // // Not because we expect servers to be jerks, but to trigger // state where we write on an assumedly good connetion, and // only reset the close AFTER we wrote bytes. try_ready!(sock1.as_mut().unwrap().read(&mut [0u8; 512])); sock1.take(); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); let err = rt.block_on(res2.join(srv2)).expect_err("res2"); match err.kind() { &::error::Kind::Incomplete => (), other => panic!("expected Incomplete, found {:?}", other) } } #[test] fn checkout_win_allows_connect_future_to_be_pooled() { let _ = pretty_env_logger::try_init(); let mut rt = Runtime::new().expect("new rt"); let mut connector = MockConnector::new(); let (tx, rx) = oneshot::channel::<()>(); let sock1 = connector.mock("http://mock.local"); let sock2 = connector.mock_fut("http://mock.local", rx); let client = Client::builder() .build::<_, ::Body>(connector); client.pool.no_timer(); let uri = "http://mock.local/a".parse::<::Uri>().expect("uri parse"); // First request just sets us up to have a connection able to be put // back in the pool. *However*, it doesn't insert immediately. The // body has 1 pending byte, and we will only drain in request 2, once // the connect future has been started. let mut body = { let res1 = client.get(uri.clone()) .map(|res| res.into_body().concat2()); let srv1 = poll_fn(|| { try_ready!(sock1.read(&mut [0u8; 512])); // Chunked is used so as to force 2 body reads. try_ready!(sock1.write(b"\ HTTP/1.1 200 OK\r\n\ transfer-encoding: chunked\r\n\ \r\n\ 1\r\nx\r\n\ 0\r\n\r\n\ ")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv1 poll_fn error: {}", e)); rt.block_on(res1.join(srv1)).expect("res1").0 }; // The second request triggers the only mocked connect future, but then // the drained body allows the first socket to go back to the pool, // "winning" the checkout race. { let res2 = client.get(uri.clone()); let drain = poll_fn(move || { body.poll() }); let srv2 = poll_fn(|| { try_ready!(sock1.read(&mut [0u8; 512])); try_ready!(sock1.write(b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\nx")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv2 poll_fn error: {}", e)); rt.block_on(res2.join(drain).join(srv2)).expect("res2"); } // "Release" the mocked connect future, and let the runtime spin once so // it's all setup... { let mut tx = Some(tx); let client = &client; let key = client.pool.h1_key("http://mock.local"); let mut tick_cnt = 0; let fut = poll_fn(move || { tx.take(); if client.pool.idle_count(&key) == 0 { tick_cnt += 1; assert!(tick_cnt < 10, "ticked too many times waiting for idle"); trace!("no idle yet; tick count: {}", tick_cnt); ::futures::task::current().notify(); Ok(Async::NotReady) } else { Ok::<_, ()>(Async::Ready(())) } }); rt.block_on(fut).unwrap(); } // Third request just tests out that the "loser" connection was pooled. If // it isn't, this will panic since the MockConnector doesn't have any more // mocks to give out. { let res3 = client.get(uri); let srv3 = poll_fn(|| { try_ready!(sock2.read(&mut [0u8; 512])); try_ready!(sock2.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")); Ok(Async::Ready(())) }).map_err(|e: ::std::io::Error| panic!("srv3 poll_fn error: {}", e)); rt.block_on(res3.join(srv3)).expect("res3"); } }