diff --git a/src/frame/ping.rs b/src/frame/ping.rs index 83ae18a..af4742a 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -13,10 +13,10 @@ pub struct Ping { impl Ping { #[cfg(feature = "unstable")] - pub fn new() -> Ping { + pub fn new(payload: Payload) -> Ping { Ping { ack: false, - payload: Payload::default(), + payload, } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index ea32135..d7bce19 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -30,7 +30,7 @@ where codec: Codec>, /// Ping/pong handler - ping_pong: PingPong>, + ping_pong: PingPong, /// Connection settings settings: Settings, diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index c49f047..21ce188 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,27 +1,22 @@ +use codec::Codec; use frame::Ping; -use proto::*; +use proto::PingPayload; +use bytes::Buf; +use futures::{Async, Poll}; use std::io; +use tokio_io::AsyncWrite; /// Acknowledges ping requests from the remote. #[derive(Debug)] -pub struct PingPong { - // TODO: this doesn't need to save the entire frame - sending_pong: Option>, - received_pong: Option, - // TODO: factor this out - blocked_ping: Option, +pub struct PingPong { + sending_pong: Option, } -impl PingPong -where - B: Buf, -{ +impl PingPong { pub fn new() -> Self { PingPong { sending_pong: None, - received_pong: None, - blocked_ping: None, } } @@ -31,24 +26,17 @@ where // calling `recv_ping`. assert!(self.sending_pong.is_none()); - if ping.is_ack() { - // Save acknowledgements to be returned from take_pong(). - self.received_pong = Some(ping.into_payload()); - - if let Some(task) = self.blocked_ping.take() { - task.notify(); - } - } else { + if !ping.is_ack() { // Save the ping's payload to be sent as an acknowledgement. - let pong = Ping::pong(ping.into_payload()); - self.sending_pong = Some(pong.into()); + self.sending_pong = Some(ping.into_payload()); } } /// Send any pending pongs. - pub fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), io::Error> + pub fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), io::Error> where T: AsyncWrite, + B: Buf, { if let Some(pong) = self.sending_pong.take() { if !dst.poll_ready()?.is_ready() { @@ -56,7 +44,7 @@ where return Ok(Async::NotReady); } - dst.buffer(pong).ok().expect("invalid pong frame"); + dst.buffer(Ping::pong(pong).into()).ok().expect("invalid pong frame"); } Ok(Async::Ready(())) diff --git a/tests/ping_pong.rs b/tests/ping_pong.rs index cf4869f..94df301 100644 --- a/tests/ping_pong.rs +++ b/tests/ping_pong.rs @@ -15,7 +15,7 @@ fn recv_single_ping() { let mock = mock.assert_client_handshake() .unwrap() .and_then(|(_, mut mock)| { - let frame = frame::Ping::new(); + let frame = frame::Ping::new(Default::default()); mock.send(frame.into()).unwrap(); mock.into_future().unwrap() @@ -34,3 +34,73 @@ fn recv_single_ping() { let _ = h2.join(mock).wait().unwrap(); } + +#[test] +fn recv_multiple_pings() { + let _ = ::env_logger::init(); + let (io, client) = mock::new(); + + let client = client.assert_server_handshake() + .expect("client handshake") + .recv_settings() + .send_frame(frames::ping([1; 8])) + .send_frame(frames::ping([2; 8])) + .recv_frame(frames::ping([1; 8]).pong()) + .recv_frame(frames::ping([2; 8]).pong()) + .close(); + + let srv = Server::handshake(io) + .expect("handshake") + .and_then(|srv| { + // future of first request, which never comes + srv.into_future().unwrap() + }); + + srv.join(client).wait().expect("wait"); +} + +#[test] +fn pong_has_highest_priority() { + let _ = ::env_logger::init(); + let (io, client) = mock::new(); + + let data = Bytes::from(vec![0; 16_384]); + + let client = client.assert_server_handshake() + .expect("client handshake") + .recv_settings() + .send_frame( + frames::headers(1) + .request("POST", "https://http2.akamai.com/") + ) + .send_frame(frames::data(1, data.clone()).eos()) + .send_frame(frames::ping([1; 8])) + .recv_frame(frames::ping([1; 8]).pong()) + .recv_frame(frames::headers(1).response(200).eos()) + .close(); + + let srv = Server::handshake(io) + .expect("handshake") + .and_then(|srv| { + // future of first request + srv.into_future().unwrap() + }).and_then(move |(reqstream, srv)| { + let (req, mut stream) = reqstream.expect("request"); + assert_eq!(req.method(), "POST"); + let body = req.into_parts().1; + + body.concat2() + .expect("body") + .and_then(move |body| { + assert_eq!(body.len(), data.len()); + let res = Response::builder() + .status(200) + .body(()) + .unwrap(); + stream.send_response(res, true).expect("response"); + srv.into_future().unwrap() + }) + }); + + srv.join(client).wait().expect("wait"); +} diff --git a/tests/support/frames.rs b/tests/support/frames.rs index ec9c9f1..5a0aa9f 100644 --- a/tests/support/frames.rs +++ b/tests/support/frames.rs @@ -68,6 +68,10 @@ pub fn settings() -> Mock { Mock(frame::Settings::default()) } +pub fn ping(payload: [u8; 8]) -> Mock { + Mock(frame::Ping::new(payload)) +} + // === Generic helpers of all frame types pub struct Mock(T); @@ -263,6 +267,21 @@ impl From> for SendFrame { } } +// ==== Ping helpers + +impl Mock { + pub fn pong(self) -> Self { + let payload = self.0.into_payload(); + Mock(frame::Ping::pong(payload)) + } +} + +impl From> for SendFrame { + fn from(src: Mock) -> Self { + Frame::Ping(src.0) + } +} + // ==== "trait alias" for types that are HttpTryFrom and have Debug Errors ==== pub trait HttpTryInto {