Add more stream state tests (#271)
This commit is contained in:
@@ -474,6 +474,36 @@ pub trait HandleFutureExt {
|
||||
}
|
||||
}
|
||||
|
||||
fn send_bytes(self, data: &[u8]) -> Box<Future<Item = Handle, Error = Self::Error>>
|
||||
where
|
||||
Self: Future<Item = Handle> + Sized + 'static,
|
||||
Self::Error: fmt::Debug,
|
||||
{
|
||||
use bytes::Buf;
|
||||
use futures::future::poll_fn;
|
||||
use std::io::Cursor;
|
||||
|
||||
let buf: Vec<_> = data.into();
|
||||
let mut buf = Cursor::new(buf);
|
||||
|
||||
Box::new(self.and_then(move |handle| {
|
||||
let mut handle = Some(handle);
|
||||
|
||||
poll_fn(move || {
|
||||
while buf.has_remaining() {
|
||||
let res = handle.as_mut().unwrap()
|
||||
.codec.get_mut()
|
||||
.write_buf(&mut buf)
|
||||
.map_err(|e| panic!("write err={:?}", e));
|
||||
|
||||
try_ready!(res);
|
||||
}
|
||||
|
||||
Ok(handle.take().unwrap().into())
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
fn ping_pong(self, payload: [u8; 8]) -> RecvFrame<<SendFrameFut<Self> as IntoRecvFrame>::Future>
|
||||
where
|
||||
Self: Future<Item=Handle> + Sized + 'static,
|
||||
|
||||
@@ -945,3 +945,141 @@ fn rst_with_buffered_data() {
|
||||
|
||||
client.join(srv).wait().expect("wait");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn err_with_buffered_data() {
|
||||
// Data is buffered in `FramedWrite` and the stream is reset locally before
|
||||
// the data is fully flushed. Given that resetting a stream requires
|
||||
// clearing all associated state for that stream, this test ensures that the
|
||||
// buffered up frame is correctly handled.
|
||||
let _ = ::env_logger::try_init();
|
||||
|
||||
// This allows the settings + headers frame through
|
||||
let (io, srv) = mock::new_with_write_capacity(73);
|
||||
|
||||
// Synchronize the client / server on response
|
||||
let (tx, rx) = ::futures::sync::oneshot::channel();
|
||||
|
||||
let srv = srv.assert_client_handshake()
|
||||
.unwrap()
|
||||
.recv_settings()
|
||||
.recv_frame(
|
||||
frames::headers(1)
|
||||
.request("POST", "https://example.com/")
|
||||
)
|
||||
.buffer_bytes(128)
|
||||
.send_frame(frames::headers(1).response(204).eos())
|
||||
// Send invalid data
|
||||
.send_bytes(b"\x00\x00\x00\x00\x00\x00\x00\x00\x00")
|
||||
.wait_for(rx)
|
||||
.unbounded_bytes()
|
||||
.recv_frame(
|
||||
frames::data(1, vec![0; 16_384]))
|
||||
.close()
|
||||
;
|
||||
|
||||
// A large body
|
||||
let body = vec![0; 2 * frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];
|
||||
|
||||
let client = client::handshake(io)
|
||||
.then(|res| {
|
||||
let (mut client, conn) = res.unwrap();
|
||||
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("https://example.com/")
|
||||
.body(())
|
||||
.unwrap();
|
||||
|
||||
// Send the request
|
||||
let (resp, mut stream) = client.send_request(request, false)
|
||||
.expect("send_request");
|
||||
|
||||
// Send the data
|
||||
stream.send_data(body.into(), true).unwrap();
|
||||
|
||||
conn.join(resp)
|
||||
})
|
||||
.then(move |res| {
|
||||
assert!(res.is_err());
|
||||
tx.send(()).unwrap();
|
||||
Ok(())
|
||||
});
|
||||
|
||||
|
||||
client.join(srv).wait().expect("wait");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn send_err_with_buffered_data() {
|
||||
// Data is buffered in `FramedWrite` and the stream is reset locally before
|
||||
// the data is fully flushed. Given that resetting a stream requires
|
||||
// clearing all associated state for that stream, this test ensures that the
|
||||
// buffered up frame is correctly handled.
|
||||
let _ = ::env_logger::try_init();
|
||||
|
||||
// This allows the settings + headers frame through
|
||||
let (io, srv) = mock::new_with_write_capacity(73);
|
||||
|
||||
// Synchronize the client / server on response
|
||||
let (tx, rx) = ::futures::sync::oneshot::channel();
|
||||
|
||||
let srv = srv.assert_client_handshake()
|
||||
.unwrap()
|
||||
.recv_settings()
|
||||
.recv_frame(
|
||||
frames::headers(1)
|
||||
.request("POST", "https://example.com/")
|
||||
)
|
||||
.buffer_bytes(128)
|
||||
.send_frame(frames::headers(1).response(204).eos())
|
||||
.wait_for(rx)
|
||||
.unbounded_bytes()
|
||||
.recv_frame(
|
||||
frames::data(1, vec![0; 16_384]))
|
||||
.recv_frame(frames::reset(1).cancel())
|
||||
.close()
|
||||
;
|
||||
|
||||
// A large body
|
||||
let body = vec![0; 2 * frame::DEFAULT_INITIAL_WINDOW_SIZE as usize];
|
||||
|
||||
let client = client::handshake(io)
|
||||
.expect("handshake")
|
||||
.and_then(|(mut client, mut conn)| {
|
||||
let request = Request::builder()
|
||||
.method(Method::POST)
|
||||
.uri("https://example.com/")
|
||||
.body(())
|
||||
.unwrap();
|
||||
|
||||
// Send the request
|
||||
let (resp, mut stream) = client.send_request(request, false)
|
||||
.expect("send_request");
|
||||
|
||||
// Send the data
|
||||
stream.send_data(body.into(), true).unwrap();
|
||||
|
||||
// Hack to drive the connection, trying to flush data
|
||||
::futures::future::lazy(|| {
|
||||
conn.poll().unwrap();
|
||||
Ok::<_, ()>(())
|
||||
}).wait().unwrap();
|
||||
|
||||
// Send a reset
|
||||
stream.send_reset(Reason::CANCEL);
|
||||
|
||||
conn.drive({
|
||||
resp.then(|_res| {
|
||||
Ok::<_, ()>(())
|
||||
})
|
||||
})
|
||||
})
|
||||
.and_then(move |(conn, _)| {
|
||||
tx.send(()).unwrap();
|
||||
conn.unwrap()
|
||||
});
|
||||
|
||||
|
||||
client.join(srv).wait().expect("wait");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user