From b4383b6a8cbabf6b725c4a4698bcabfc0d830979 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 4 May 2018 14:11:40 -0700 Subject: [PATCH] Add more stream state tests (#271) --- tests/h2-support/src/mock.rs | 30 ++++++ tests/h2-tests/tests/stream_states.rs | 138 ++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index 1f0e98b..d1ae3d0 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -474,6 +474,36 @@ pub trait HandleFutureExt { } } + fn send_bytes(self, data: &[u8]) -> Box> + where + Self: Future + Sized + 'static, + Self::Error: fmt::Debug, + { + use bytes::Buf; + use futures::future::poll_fn; + use std::io::Cursor; + + let buf: Vec<_> = data.into(); + let mut buf = Cursor::new(buf); + + Box::new(self.and_then(move |handle| { + let mut handle = Some(handle); + + poll_fn(move || { + while buf.has_remaining() { + let res = handle.as_mut().unwrap() + .codec.get_mut() + .write_buf(&mut buf) + .map_err(|e| panic!("write err={:?}", e)); + + try_ready!(res); + } + + Ok(handle.take().unwrap().into()) + }) + })) + } + fn ping_pong(self, payload: [u8; 8]) -> RecvFrame< as IntoRecvFrame>::Future> where Self: Future + Sized + 'static, diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 4cce496..5e1bfc8 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -945,3 +945,141 @@ fn rst_with_buffered_data() { client.join(srv).wait().expect("wait"); } + +#[test] +fn err_with_buffered_data() { + // Data is buffered in `FramedWrite` and the stream is reset locally before + // the data is fully flushed. Given that resetting a stream requires + // clearing all associated state for that stream, this test ensures that the + // buffered up frame is correctly handled. + let _ = ::env_logger::try_init(); + + // This allows the settings + headers frame through + let (io, srv) = mock::new_with_write_capacity(73); + + // Synchronize the client / server on response + let (tx, rx) = ::futures::sync::oneshot::channel(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + ) + .buffer_bytes(128) + .send_frame(frames::headers(1).response(204).eos()) + // Send invalid data + .send_bytes(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00") + .wait_for(rx) + .unbounded_bytes() + .recv_frame( + frames::data(1, vec![0; 16_384])) + .close() + ; + + // A large body + let body = vec![0; 2 * frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; + + let client = client::handshake(io) + .then(|res| { + let (mut client, conn) = res.unwrap(); + + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // Send the request + let (resp, mut stream) = client.send_request(request, false) + .expect("send_request"); + + // Send the data + stream.send_data(body.into(), true).unwrap(); + + conn.join(resp) + }) + .then(move |res| { + assert!(res.is_err()); + tx.send(()).unwrap(); + Ok(()) + }); + + + client.join(srv).wait().expect("wait"); +} + +#[test] +fn send_err_with_buffered_data() { + // Data is buffered in `FramedWrite` and the stream is reset locally before + // the data is fully flushed. Given that resetting a stream requires + // clearing all associated state for that stream, this test ensures that the + // buffered up frame is correctly handled. + let _ = ::env_logger::try_init(); + + // This allows the settings + headers frame through + let (io, srv) = mock::new_with_write_capacity(73); + + // Synchronize the client / server on response + let (tx, rx) = ::futures::sync::oneshot::channel(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("POST", "https://example.com/") + ) + .buffer_bytes(128) + .send_frame(frames::headers(1).response(204).eos()) + .wait_for(rx) + .unbounded_bytes() + .recv_frame( + frames::data(1, vec![0; 16_384])) + .recv_frame(frames::reset(1).cancel()) + .close() + ; + + // A large body + let body = vec![0; 2 * frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; + + let client = client::handshake(io) + .expect("handshake") + .and_then(|(mut client, mut conn)| { + let request = Request::builder() + .method(Method::POST) + .uri("https://example.com/") + .body(()) + .unwrap(); + + // Send the request + let (resp, mut stream) = client.send_request(request, false) + .expect("send_request"); + + // Send the data + stream.send_data(body.into(), true).unwrap(); + + // Hack to drive the connection, trying to flush data + ::futures::future::lazy(|| { + conn.poll().unwrap(); + Ok::<_, ()>(()) + }).wait().unwrap(); + + // Send a reset + stream.send_reset(Reason::CANCEL); + + conn.drive({ + resp.then(|_res| { + Ok::<_, ()>(()) + }) + }) + }) + .and_then(move |(conn, _)| { + tx.send(()).unwrap(); + conn.unwrap() + }); + + + client.join(srv).wait().expect("wait"); +}