fix(http1): try to drain connection buffer if user drops Body
This commit is contained in:
		| @@ -655,6 +655,20 @@ where | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// If the read side can be cheaply drained, do so. Otherwise, close. | ||||||
|  |     pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut task::Context<'_>) { | ||||||
|  |         let _ = self.poll_read_body(cx); | ||||||
|  |  | ||||||
|  |         // If still in Reading::Body, just give up | ||||||
|  |         match self.state.reading { | ||||||
|  |             Reading::Init | Reading::KeepAlive => { | ||||||
|  |                 trace!("body drained"); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |             _ => self.close_read(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn close_read(&mut self) { |     pub fn close_read(&mut self) { | ||||||
|         self.state.close_read(); |         self.state.close_read(); | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -186,9 +186,9 @@ where | |||||||
|                         Poll::Ready(Err(_canceled)) => { |                         Poll::Ready(Err(_canceled)) => { | ||||||
|                             // user doesn't care about the body |                             // user doesn't care about the body | ||||||
|                             // so we should stop reading |                             // so we should stop reading | ||||||
|                             trace!("body receiver dropped before eof, closing"); |                             trace!("body receiver dropped before eof, draining or closing"); | ||||||
|                             self.conn.close_read(); |                             self.conn.poll_drain_or_close_read(cx); | ||||||
|                             return Poll::Ready(Ok(())); |                             continue; | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                     match self.conn.poll_read_body(cx) { |                     match self.conn.poll_read_body(cx) { | ||||||
|   | |||||||
| @@ -967,6 +967,7 @@ mod dispatch_impl { | |||||||
|     use tokio::net::TcpStream; |     use tokio::net::TcpStream; | ||||||
|     use tokio::runtime::Runtime; |     use tokio::runtime::Runtime; | ||||||
|  |  | ||||||
|  |     use hyper::body::HttpBody; | ||||||
|     use hyper::client::connect::{Connected, Connection, HttpConnector}; |     use hyper::client::connect::{Connected, Connection, HttpConnector}; | ||||||
|     use hyper::Client; |     use hyper::Client; | ||||||
|  |  | ||||||
| @@ -1574,6 +1575,95 @@ mod dispatch_impl { | |||||||
|         assert_eq!(connects.load(Ordering::Relaxed), 1); |         assert_eq!(connects.load(Ordering::Relaxed), 1); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     #[tokio::test] | ||||||
|  |     async fn client_keep_alive_eager_when_chunked() { | ||||||
|  |         // If a response body has been read to completion, with completion | ||||||
|  |         // determined by some other factor, like decompression, and thus | ||||||
|  |         // it is in't polled a final time to clear the final 0-len chunk, | ||||||
|  |         // try to eagerly clear it so the connection can still be used. | ||||||
|  |  | ||||||
|  |         let _ = pretty_env_logger::try_init(); | ||||||
|  |         let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||||||
|  |         let addr = server.local_addr().unwrap(); | ||||||
|  |         let connector = DebugConnector::new(); | ||||||
|  |         let connects = connector.connects.clone(); | ||||||
|  |  | ||||||
|  |         let client = Client::builder().build(connector); | ||||||
|  |  | ||||||
|  |         let (tx1, rx1) = oneshot::channel(); | ||||||
|  |         let (tx2, rx2) = oneshot::channel(); | ||||||
|  |         thread::spawn(move || { | ||||||
|  |             let mut sock = server.accept().unwrap().0; | ||||||
|  |             //drop(server); | ||||||
|  |             sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); | ||||||
|  |             sock.set_write_timeout(Some(Duration::from_secs(5))) | ||||||
|  |                 .unwrap(); | ||||||
|  |             let mut buf = [0; 4096]; | ||||||
|  |             sock.read(&mut buf).expect("read 1"); | ||||||
|  |             sock.write_all( | ||||||
|  |                 b"\ | ||||||
|  |                 HTTP/1.1 200 OK\r\n\ | ||||||
|  |                 transfer-encoding: chunked\r\n\ | ||||||
|  |                 \r\n\ | ||||||
|  |                 5\r\n\ | ||||||
|  |                 hello\r\n\ | ||||||
|  |                 0\r\n\r\n\ | ||||||
|  |             ", | ||||||
|  |             ) | ||||||
|  |             .expect("write 1"); | ||||||
|  |             let _ = tx1.send(()); | ||||||
|  |  | ||||||
|  |             let n2 = sock.read(&mut buf).expect("read 2"); | ||||||
|  |             assert_ne!(n2, 0, "bytes of second request"); | ||||||
|  |             let second_get = "GET /b HTTP/1.1\r\n"; | ||||||
|  |             assert_eq!(s(&buf[..second_get.len()]), second_get); | ||||||
|  |             sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") | ||||||
|  |                 .expect("write 2"); | ||||||
|  |             let _ = tx2.send(()); | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         assert_eq!(connects.load(Ordering::SeqCst), 0); | ||||||
|  |  | ||||||
|  |         let rx = rx1.expect("thread panicked"); | ||||||
|  |         let req = Request::builder() | ||||||
|  |             .uri(&*format!("http://{}/a", addr)) | ||||||
|  |             .body(Body::empty()) | ||||||
|  |             .unwrap(); | ||||||
|  |         let fut = client.request(req); | ||||||
|  |  | ||||||
|  |         let mut resp = future::join(fut, rx).map(|r| r.0).await.unwrap(); | ||||||
|  |         assert_eq!(connects.load(Ordering::SeqCst), 1); | ||||||
|  |         assert_eq!(resp.status(), 200); | ||||||
|  |         assert_eq!(resp.headers()["transfer-encoding"], "chunked"); | ||||||
|  |  | ||||||
|  |         // Read the "hello" chunk... | ||||||
|  |         let chunk = resp.body_mut().data().await.unwrap().unwrap(); | ||||||
|  |         assert_eq!(chunk, "hello"); | ||||||
|  |  | ||||||
|  |         // With our prior knowledge, we know that's the end of the body. | ||||||
|  |         // So just drop the body, without polling for the `0\r\n\r\n` end. | ||||||
|  |         drop(resp); | ||||||
|  |  | ||||||
|  |         // sleep real quick to let the threadpool put connection in ready | ||||||
|  |         // state and back into client pool | ||||||
|  |         tokio::time::delay_for(Duration::from_millis(50)).await; | ||||||
|  |  | ||||||
|  |         let rx = rx2.expect("thread panicked"); | ||||||
|  |         let req = Request::builder() | ||||||
|  |             .uri(&*format!("http://{}/b", addr)) | ||||||
|  |             .body(Body::empty()) | ||||||
|  |             .unwrap(); | ||||||
|  |         let fut = client.request(req); | ||||||
|  |         future::join(fut, rx).map(|r| r.0).await.unwrap(); | ||||||
|  |  | ||||||
|  |         assert_eq!( | ||||||
|  |             connects.load(Ordering::SeqCst), | ||||||
|  |             1, | ||||||
|  |             "second request should still only have 1 connect" | ||||||
|  |         ); | ||||||
|  |         drop(client); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     #[test] |     #[test] | ||||||
|     fn connect_proxy_sends_absolute_uri() { |     fn connect_proxy_sends_absolute_uri() { | ||||||
|         let _ = pretty_env_logger::try_init(); |         let _ = pretty_env_logger::try_init(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user