From c88011da4ed5b5ca9107c4a2339a7ab054c5f27f Mon Sep 17 00:00:00 2001
From: Sean McArthur <sean@seanmonstar.com>
Date: Tue, 14 Sep 2021 16:18:28 -0700
Subject: [PATCH] fix(client): don't reuse a connection while still flushing

A client connection that read a full response while the request body was
still flushing would see incorrect behavior, since the pool would let it
be checked out again for a new request. In debug builds, it would then
panic, but in release builds it would intermix the 2nd request bytes with
the body of the previous request.

In practice, this only ever happens if a server replies with a full
response before reading the full request, while also choosing to not
close that connection. Most servers either wait for the full request, or
close the connection after the new response is written, so as to stop
reading.
---
 src/proto/h1/conn.rs     |  7 ++++++-
 src/proto/h1/dispatch.rs | 29 ++++++++++++++++++++++++++++-
 src/proto/h1/io.rs       | 16 +++++++++++++++-
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs
index c84689e0..887dee48 100644
--- a/src/proto/h1/conn.rs
+++ b/src/proto/h1/conn.rs
@@ -71,6 +71,11 @@ where
         self.io.set_flush_pipeline(enabled);
     }
 
+    #[cfg(test)]
+    pub(crate) fn set_write_strategy_queue(&mut self) {
+        self.io.set_write_strategy_queue();
+    }
+
     pub(crate) fn set_max_buf_size(&mut self, max: usize) {
         self.io.set_max_buf_size(max);
     }
@@ -461,7 +466,7 @@ where
             }
         }
         match self.state.writing {
-            Writing::Init => true,
+            Writing::Init => self.io.can_headers_buf(),
             _ => false,
         }
     }
diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs
index d2c1428a..677131bf 100644
--- a/src/proto/h1/dispatch.rs
+++ b/src/proto/h1/dispatch.rs
@@ -665,7 +665,6 @@ mod tests {
 
         // Block at 0 for now, but we will release this response before
         // the request is ready to write later...
-        //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
         let (mut tx, rx) = crate::client::dispatch::channel();
         let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
         let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
@@ -692,6 +691,34 @@ mod tests {
         });
     }
 
+    #[tokio::test]
+    async fn client_flushing_is_not_ready_for_next_request() {
+        let _ = pretty_env_logger::try_init();
+
+        let (io, _handle) = tokio_test::io::Builder::new()
+            .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
+            .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
+            .wait(std::time::Duration::from_secs(2))
+            .build_with_handle();
+
+        let (mut tx, rx) = crate::client::dispatch::channel();
+        let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
+        conn.set_write_strategy_queue();
+
+        let dispatcher = Dispatcher::new(Client::new(rx), conn);
+        let _dispatcher = tokio::spawn(async move { dispatcher.await });
+
+        let req = crate::Request::builder()
+            .method("POST")
+            .body(crate::Body::from("reee"))
+            .unwrap();
+
+        let res = tx.try_send(req).unwrap().await.expect("response");
+        drop(res);
+
+        assert!(!tx.is_ready());
+    }
+
     #[tokio::test]
     async fn body_empty_chunks_ignored() {
         let _ = pretty_env_logger::try_init();
diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs
index db4eece6..a7523001 100644
--- a/src/proto/h1/io.rs
+++ b/src/proto/h1/io.rs
@@ -98,13 +98,18 @@ where
     }
 
     #[cfg(feature = "server")]
-    pub(crate) fn set_write_strategy_flatten(&mut self) {
+    fn set_write_strategy_flatten(&mut self) {
         // this should always be called only at construction time,
         // so this assert is here to catch myself
         debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
         self.write_buf.set_strategy(WriteStrategy::Flatten);
     }
 
+    #[cfg(test)]
+    pub(crate) fn set_write_strategy_queue(&mut self) {
+        self.write_buf.set_strategy(WriteStrategy::Queue);
+    }
+
     pub(crate) fn read_buf(&self) -> &[u8] {
         self.read_buf.as_ref()
     }
@@ -121,6 +126,15 @@ where
         self.read_buf.capacity() - self.read_buf.len()
     }
 
+    /// Return whether we can append to the headers buffer.
+    ///
+    /// Reasons we can't:
+    /// - The write buf is in queue mode, and some of the past body is still
+    ///   needing to be flushed.
+    pub(crate) fn can_headers_buf(&self) -> bool {
+        !self.write_buf.queue.has_remaining()
+    }
+
     pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
         let buf = self.write_buf.headers_mut();
         &mut buf.bytes
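As context for the commit message: the problematic peer is a server that writes a
complete response before it has drained the request body, and then leaves the
connection open. The following standalone sketch of such a server (not part of the
patch; it uses only the standard library, and the bind address and function name are
illustrative) shows the behavior being described:

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

fn handle_early_reply(mut stream: TcpStream) -> std::io::Result<()> {
    // Read only the request head (enough to see the headers), then answer
    // immediately without draining the request body.
    let mut head = [0u8; 1024];
    let _ = stream.read(&mut head)?;

    // A full response with `content-length: 0` and no `connection: close`:
    // the client now sees a complete response while its request body may
    // still be flushing, which is the state the patch guards against.
    stream.write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")?;
    stream.flush()?;

    // Keep the connection open instead of closing it, so a client pool
    // would otherwise consider the connection reusable.
    std::thread::sleep(std::time::Duration::from_secs(2));
    Ok(())
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:3000")?;
    for stream in listener.incoming() {
        handle_early_reply(stream?)?;
    }
    Ok(())
}

Most real servers avoid this pattern by either reading the full request first or
setting `connection: close` on the early response, which is why the bug was rarely hit
in practice.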