add ability to synchronize in tests
- Adds `wait_for` that takes another future to signal the mock should continue. - Adds `yield_once` to allow one chain of futures to yield to the other.
This commit is contained in:
@@ -818,6 +818,9 @@ fn rst_while_closing() {
|
|||||||
let _ = ::env_logger::try_init();
|
let _ = ::env_logger::try_init();
|
||||||
let (io, srv) = mock::new();
|
let (io, srv) = mock::new();
|
||||||
|
|
||||||
|
// Rendevous when we've queued a trailers frame
|
||||||
|
let (tx, rx) = ::futures::sync::oneshot::channel();
|
||||||
|
|
||||||
let srv = srv.assert_client_handshake()
|
let srv = srv.assert_client_handshake()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.recv_settings()
|
.recv_settings()
|
||||||
@@ -830,7 +833,7 @@ fn rst_while_closing() {
|
|||||||
// Idling for a moment here is necessary to ensure that the client
|
// Idling for a moment here is necessary to ensure that the client
|
||||||
// enqueues its TRAILERS frame *before* we send the RST_STREAM frame
|
// enqueues its TRAILERS frame *before* we send the RST_STREAM frame
|
||||||
// which causes the panic.
|
// which causes the panic.
|
||||||
.idle_ms(1)
|
.wait_for(rx)
|
||||||
// Send the RST_STREAM frame which causes the client to panic.
|
// Send the RST_STREAM frame which causes the client to panic.
|
||||||
.send_frame(frames::reset(1).cancel())
|
.send_frame(frames::reset(1).cancel())
|
||||||
.ping_pong([1; 8])
|
.ping_pong([1; 8])
|
||||||
@@ -847,24 +850,29 @@ fn rst_while_closing() {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// The request should be left streaming.
|
// The request should be left streaming.
|
||||||
let (resp, mut stream) = client.send_request(request, false)
|
let (resp, stream) = client.send_request(request, false)
|
||||||
.expect("send_request");
|
.expect("send_request");
|
||||||
let req = resp
|
let req = resp
|
||||||
// on receipt of an EOS response from the server, transition
|
// on receipt of an EOS response from the server, transition
|
||||||
// the stream Open => Half Closed (remote).
|
// the stream Open => Half Closed (remote).
|
||||||
.expect("response")
|
.expect("response");
|
||||||
.and_then(move |resp| {
|
|
||||||
assert_eq!(resp.status(), StatusCode::OK);
|
|
||||||
// Enqueue trailers frame.
|
|
||||||
let _ = stream.send_trailers(HeaderMap::new());
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.map_err(|()| -> Error {
|
|
||||||
unreachable!()
|
|
||||||
});
|
|
||||||
|
|
||||||
conn.drive(req)
|
conn.drive(req)
|
||||||
.and_then(|(conn, _)| conn.expect("client"))
|
.map(move |(conn, resp)| {
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
(conn, stream)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.and_then(|(conn, mut stream)| {
|
||||||
|
// Enqueue trailers frame.
|
||||||
|
let _ = stream.send_trailers(HeaderMap::new());
|
||||||
|
// Signal the server mock to send RST_FRAME
|
||||||
|
let _ = tx.send(());
|
||||||
|
|
||||||
|
conn
|
||||||
|
// yield once to allow the server mock to be polled
|
||||||
|
// before the conn flushes its buffer
|
||||||
|
.yield_once()
|
||||||
|
.expect("client")
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -67,6 +67,26 @@ pub trait FutureExt: Future {
|
|||||||
future: other,
|
future: other,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrap this future in one that will yield NotReady once before continuing.
|
||||||
|
///
|
||||||
|
/// This allows the executor to poll other futures before trying this one
|
||||||
|
/// again.
|
||||||
|
fn yield_once(self) -> Box<Future<Item = Self::Item, Error = Self::Error>>
|
||||||
|
where
|
||||||
|
Self: Future + Sized + 'static,
|
||||||
|
{
|
||||||
|
let mut ready = false;
|
||||||
|
Box::new(::futures::future::poll_fn(move || {
|
||||||
|
if ready {
|
||||||
|
Ok::<_, ()>(().into())
|
||||||
|
} else {
|
||||||
|
ready = true;
|
||||||
|
::futures::task::current().notify();
|
||||||
|
Ok(::futures::Async::NotReady)
|
||||||
|
}
|
||||||
|
}).then(|_| self))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Future> FutureExt for T {}
|
impl<T: Future> FutureExt for T {}
|
||||||
|
|||||||
@@ -477,6 +477,16 @@ pub trait HandleFutureExt {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn wait_for<F>(self, other: F) -> Box<Future<Item = Self::Item, Error = Self::Error>>
|
||||||
|
where
|
||||||
|
F: Future + 'static,
|
||||||
|
Self: Future + Sized + 'static
|
||||||
|
{
|
||||||
|
Box::new(self.then(move |result| {
|
||||||
|
other.then(move |_| result)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
fn close(self) -> Box<Future<Item = (), Error = ()>>
|
fn close(self) -> Box<Future<Item = (), Error = ()>>
|
||||||
where
|
where
|
||||||
Self: Future<Error = ()> + Sized + 'static,
|
Self: Future<Error = ()> + Sized + 'static,
|
||||||
|
|||||||
Reference in New Issue
Block a user