diff --git a/tests/stream_states.rs b/tests/stream_states.rs index d7ed610..c328a2a 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -818,6 +818,9 @@ fn rst_while_closing() { let _ = ::env_logger::try_init(); let (io, srv) = mock::new(); + // Rendevous when we've queued a trailers frame + let (tx, rx) = ::futures::sync::oneshot::channel(); + let srv = srv.assert_client_handshake() .unwrap() .recv_settings() @@ -830,7 +833,7 @@ fn rst_while_closing() { // Idling for a moment here is necessary to ensure that the client // enqueues its TRAILERS frame *before* we send the RST_STREAM frame // which causes the panic. - .idle_ms(1) + .wait_for(rx) // Send the RST_STREAM frame which causes the client to panic. .send_frame(frames::reset(1).cancel()) .ping_pong([1; 8]) @@ -847,24 +850,29 @@ fn rst_while_closing() { .unwrap(); // The request should be left streaming. - let (resp, mut stream) = client.send_request(request, false) + let (resp, stream) = client.send_request(request, false) .expect("send_request"); let req = resp // on receipt of an EOS response from the server, transition // the stream Open => Half Closed (remote). - .expect("response") - .and_then(move |resp| { - assert_eq!(resp.status(), StatusCode::OK); - // Enqueue trailers frame. - let _ = stream.send_trailers(HeaderMap::new()); - Ok(()) - }) - .map_err(|()| -> Error { - unreachable!() - }); - + .expect("response"); conn.drive(req) - .and_then(|(conn, _)| conn.expect("client")) + .map(move |(conn, resp)| { + assert_eq!(resp.status(), StatusCode::OK); + (conn, stream) + }) + }) + .and_then(|(conn, mut stream)| { + // Enqueue trailers frame. + let _ = stream.send_trailers(HeaderMap::new()); + // Signal the server mock to send RST_FRAME + let _ = tx.send(()); + + conn + // yield once to allow the server mock to be polled + // before the conn flushes its buffer + .yield_once() + .expect("client") }); diff --git a/tests/support/future_ext.rs b/tests/support/future_ext.rs index 2419ead..afa216f 100644 --- a/tests/support/future_ext.rs +++ b/tests/support/future_ext.rs @@ -67,6 +67,26 @@ pub trait FutureExt: Future { future: other, } } + + /// Wrap this future in one that will yield NotReady once before continuing. + /// + /// This allows the executor to poll other futures before trying this one + /// again. + fn yield_once(self) -> Box> + where + Self: Future + Sized + 'static, + { + let mut ready = false; + Box::new(::futures::future::poll_fn(move || { + if ready { + Ok::<_, ()>(().into()) + } else { + ready = true; + ::futures::task::current().notify(); + Ok(::futures::Async::NotReady) + } + }).then(|_| self)) + } } impl FutureExt for T {} diff --git a/tests/support/mock.rs b/tests/support/mock.rs index d12dd4f..b510e89 100644 --- a/tests/support/mock.rs +++ b/tests/support/mock.rs @@ -477,6 +477,16 @@ pub trait HandleFutureExt { })) } + fn wait_for(self, other: F) -> Box> + where + F: Future + 'static, + Self: Future + Sized + 'static + { + Box::new(self.then(move |result| { + other.then(move |_| result) + })) + } + fn close(self) -> Box> where Self: Future + Sized + 'static,