diff --git a/src/client.rs b/src/client.rs index df8f135..26f977a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -156,7 +156,7 @@ //! [`Builder`]: struct.Builder.html //! [`Error`]: ../struct.Error.html -use {SendStream, RecvStream, ReleaseCapacity}; +use {SendStream, RecvStream, ReleaseCapacity, PingPong}; use codec::{Codec, RecvError, SendError, UserError}; use frame::{Headers, Pseudo, Reason, Settings, StreamId}; use proto; @@ -319,31 +319,13 @@ pub struct PushPromise { response: PushedResponseFuture, } -#[derive(Debug)] /// A stream of pushed responses and corresponding promised requests +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] pub struct PushPromises { inner: proto::OpaqueStreamRef, } -impl Stream for PushPromises { - type Item = PushPromise; - type Error = ::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - match try_ready!(self.inner.poll_pushed()) { - Some((request, response)) => { - let response = PushedResponseFuture { - inner: ResponseFuture { - inner: response, push_promise_consumed: false - } - }; - Ok(Async::Ready(Some(PushPromise{request, response}))) - } - None => Ok(Async::Ready(None)), - } - } -} - /// Builds client connections with custom configuration values. /// /// Methods can be chained in order to set the configuration values. @@ -1282,6 +1264,17 @@ where assert!(size <= proto::MAX_WINDOW_SIZE); self.inner.set_target_window_size(size); } + + /// Takes a `PingPong` instance from the connection. + /// + /// # Note + /// + /// This may only be called once. Calling multiple times will return `None`. + pub fn ping_pong(&mut self) -> Option { + self.inner + .take_user_pings() + .map(PingPong::new) + } } impl Future for Connection @@ -1416,6 +1409,27 @@ impl ResponseFuture { } } +// ===== impl PushPromises ===== + +impl Stream for PushPromises { + type Item = PushPromise; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.inner.poll_pushed()) { + Some((request, response)) => { + let response = PushedResponseFuture { + inner: ResponseFuture { + inner: response, push_promise_consumed: false + } + }; + Ok(Async::Ready(Some(PushPromise{request, response}))) + } + None => Ok(Async::Ready(None)), + } + } +} + // ===== impl PushPromise ===== impl PushPromise { diff --git a/src/codec/error.rs b/src/codec/error.rs index b979ae2..45b4251 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -54,6 +54,9 @@ pub enum UserError { /// Calls `SendResponse::poll_reset` after having called `send_response`. PollResetAfterSendResponse, + + /// Calls `PingPong::send_ping` before receiving a pong. + SendPingWhilePending, } // ===== impl RecvError ===== @@ -134,6 +137,7 @@ impl error::Error for UserError { MalformedHeaders => "malformed headers", MissingUriSchemeAndAuthority => "request URI missing scheme and authority", PollResetAfterSendResponse => "poll_reset after send_response is illegal", + SendPingWhilePending => "send_ping before received previous pong", } } } diff --git a/src/frame/ping.rs b/src/frame/ping.rs index 7f2c1ba..e183819 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -14,6 +14,7 @@ pub struct Ping { // This was just 8 randomly generated bytes. We use something besides just // zeroes to distinguish this specific PING from any other. const SHUTDOWN_PAYLOAD: Payload = [0x0b, 0x7b, 0xa2, 0xf0, 0x8b, 0x9b, 0xfe, 0x54]; +const USER_PAYLOAD: Payload = [0x3b, 0x7c, 0xdb, 0x7a, 0x0b, 0x87, 0x16, 0xb4]; impl Ping { @@ -23,6 +24,12 @@ impl Ping { #[cfg(not(feature = "unstable"))] pub(crate) const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD; + #[cfg(feature = "unstable")] + pub const USER: Payload = USER_PAYLOAD; + + #[cfg(not(feature = "unstable"))] + pub(crate) const USER: Payload = USER_PAYLOAD; + pub fn new(payload: Payload) -> Ping { Ping { ack: false, diff --git a/src/lib.rs b/src/lib.rs index b56c1be..1ed6de3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,7 +129,7 @@ pub mod server; mod share; pub use error::{Error, Reason}; -pub use share::{SendStream, StreamId, RecvStream, ReleaseCapacity}; +pub use share::{SendStream, StreamId, RecvStream, ReleaseCapacity, PingPong, Ping, Pong}; #[cfg(feature = "unstable")] pub use codec::{Codec, RecvError, SendError, UserError}; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index fb1e83e..87ac4a0 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -178,6 +178,10 @@ where } } + pub(crate) fn take_user_pings(&mut self) -> Option { + self.ping_pong.take_user_pings() + } + /// Advances the internal state of the connection. pub fn poll(&mut self) -> Poll<(), proto::Error> { use codec::RecvError::*; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index f9e815d..2ace6f7 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -9,6 +9,7 @@ mod streams; pub(crate) use self::connection::{Config, Connection}; pub(crate) use self::error::Error; pub(crate) use self::peer::{Peer, Dyn as DynPeer}; +pub(crate) use self::ping_pong::UserPings; pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; pub(crate) use self::streams::{PollReset, Prioritized, Open}; diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 8cce222..46f9d83 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,17 +1,36 @@ use codec::Codec; use frame::Ping; -use proto::PingPayload; +use proto::{self, PingPayload}; use bytes::Buf; use futures::{Async, Poll}; +use futures::task::AtomicTask; use std::io; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::AsyncWrite; /// Acknowledges ping requests from the remote. #[derive(Debug)] -pub struct PingPong { +pub(crate) struct PingPong { pending_ping: Option, pending_pong: Option, + user_pings: Option, +} + +#[derive(Debug)] +pub(crate) struct UserPings(Arc); + +#[derive(Debug)] +struct UserPingsRx(Arc); + +#[derive(Debug)] +struct UserPingsInner { + state: AtomicUsize, + /// Task to wake up the main `Connection`. + ping_task: AtomicTask, + /// Task to wake up `share::PingPong::poll_pong`. + pong_task: AtomicTask, } #[derive(Debug)] @@ -28,15 +47,44 @@ pub(crate) enum ReceivedPing { Shutdown, } +/// No user ping pending. +const USER_STATE_EMPTY: usize = 0; +/// User has called `send_ping`, but PING hasn't been written yet. +const USER_STATE_PENDING_PING: usize = 1; +/// User PING has been written, waiting for PONG. +const USER_STATE_PENDING_PONG: usize = 2; +/// We've received user PONG, waiting for user to `poll_pong`. +const USER_STATE_RECEIVED_PONG: usize = 3; +/// The connection is closed. +const USER_STATE_CLOSED: usize = 4; + +// ===== impl PingPong ===== + impl PingPong { - pub fn new() -> Self { + pub(crate) fn new() -> Self { PingPong { pending_ping: None, pending_pong: None, + user_pings: None, } } - pub fn ping_shutdown(&mut self) { + /// Can only be called once. If called a second time, returns `None`. + pub(crate) fn take_user_pings(&mut self) -> Option { + if self.user_pings.is_some() { + return None; + } + + let user_pings = Arc::new(UserPingsInner { + state: AtomicUsize::new(USER_STATE_EMPTY), + ping_task: AtomicTask::new(), + pong_task: AtomicTask::new(), + }); + self.user_pings = Some(UserPingsRx(user_pings.clone())); + Some(UserPings(user_pings)) + } + + pub(crate) fn ping_shutdown(&mut self) { assert!(self.pending_ping.is_none()); self.pending_ping = Some(PendingPing { @@ -54,7 +102,12 @@ impl PingPong { if ping.is_ack() { if let Some(pending) = self.pending_ping.take() { if &pending.payload == ping.payload() { - trace!("recv PING ack"); + assert_eq!( + &pending.payload, + &Ping::SHUTDOWN, + "pending_ping should be for shutdown", + ); + trace!("recv PING SHUTDOWN ack"); return ReceivedPing::Shutdown; } @@ -62,6 +115,13 @@ impl PingPong { self.pending_ping = Some(pending); } + if let Some(ref users) = self.user_pings { + if ping.payload() == &Ping::USER && users.receive_pong() { + trace!("recv PING USER ack"); + return ReceivedPing::Unknown; + } + } + // else we were acked a ping we didn't send? // The spec doesn't require us to do anything about this, // so for resiliency, just ignore it for now. @@ -75,7 +135,7 @@ impl PingPong { } /// Send any pending pongs. - pub fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), io::Error> + pub(crate) fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), io::Error> where T: AsyncWrite, B: Buf, @@ -94,7 +154,7 @@ impl PingPong { } /// Send any pending pings. - pub fn send_pending_ping(&mut self, dst: &mut Codec) -> Poll<(), io::Error> + pub(crate) fn send_pending_ping(&mut self, dst: &mut Codec) -> Poll<(), io::Error> where T: AsyncWrite, B: Buf, @@ -109,6 +169,18 @@ impl PingPong { .expect("invalid ping frame"); ping.sent = true; } + } else if let Some(ref users) = self.user_pings { + if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING { + if !dst.poll_ready()?.is_ready() { + return Ok(Async::NotReady); + } + + dst.buffer(Ping::new(Ping::USER).into()) + .expect("invalid ping frame"); + users.0.state.store(USER_STATE_PENDING_PONG, Ordering::Release); + } else { + users.0.ping_task.register(); + } } Ok(Async::Ready(())) @@ -116,10 +188,83 @@ impl PingPong { } impl ReceivedPing { - pub fn is_shutdown(&self) -> bool { + pub(crate) fn is_shutdown(&self) -> bool { match *self { ReceivedPing::Shutdown => true, _ => false, } } } + +// ===== impl UserPings ===== + +impl UserPings { + pub(crate) fn send_ping(&self) -> Result<(), Option> { + let prev = self.0.state.compare_and_swap( + USER_STATE_EMPTY, // current + USER_STATE_PENDING_PING, // new + Ordering::AcqRel, + ); + + match prev { + USER_STATE_EMPTY => { + self.0.ping_task.notify(); + Ok(()) + }, + USER_STATE_CLOSED => { + Err(Some(broken_pipe().into())) + } + _ => { + // Was already pending, user error! + Err(None) + } + } + } + + pub(crate) fn poll_pong(&self) -> Poll<(), proto::Error> { + // Must register before checking state, in case state were to change + // before we could register, and then the ping would just be lost. + self.0.pong_task.register(); + let prev = self.0.state.compare_and_swap( + USER_STATE_RECEIVED_PONG, // current + USER_STATE_EMPTY, // new + Ordering::AcqRel, + ); + + match prev { + USER_STATE_RECEIVED_PONG => Ok(Async::Ready(())), + USER_STATE_CLOSED => Err(broken_pipe().into()), + _ => Ok(Async::NotReady), + } + } +} + +// ===== impl UserPingsRx ===== + +impl UserPingsRx { + fn receive_pong(&self) -> bool { + let prev = self.0.state.compare_and_swap( + USER_STATE_PENDING_PONG, // current + USER_STATE_RECEIVED_PONG, // new + Ordering::AcqRel, + ); + + if prev == USER_STATE_PENDING_PONG { + self.0.pong_task.notify(); + true + } else { + false + } + } +} + +impl Drop for UserPingsRx { + fn drop(&mut self) { + self.0.state.store(USER_STATE_CLOSED, Ordering::Release); + self.0.pong_task.notify(); + } +} + +fn broken_pipe() -> io::Error { + io::ErrorKind::BrokenPipe.into() +} diff --git a/src/server.rs b/src/server.rs index 6072db5..32eb4d1 100644 --- a/src/server.rs +++ b/src/server.rs @@ -129,7 +129,7 @@ //! [`SendStream`]: ../struct.SendStream.html //! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html -use {SendStream, RecvStream, ReleaseCapacity}; +use {SendStream, RecvStream, ReleaseCapacity, PingPong}; use codec::{Codec, RecvError}; use frame::{self, Pseudo, Reason, Settings, StreamId}; use proto::{self, Config, Prioritized}; @@ -459,6 +459,17 @@ where pub fn graceful_shutdown(&mut self) { self.connection.go_away_gracefully(); } + + /// Takes a `PingPong` instance from the connection. + /// + /// # Note + /// + /// This may only be called once. Calling multiple times will return `None`. + pub fn ping_pong(&mut self) -> Option { + self.connection + .take_user_pings() + .map(PingPong::new) + } } impl futures::Stream for Connection diff --git a/src/share.rs b/src/share.rs index a42d5e1..95b38f7 100644 --- a/src/share.rs +++ b/src/share.rs @@ -193,6 +193,27 @@ pub struct ReleaseCapacity { inner: proto::OpaqueStreamRef, } +/// A handle to send and receive PING frames with the peer. +// NOT Clone on purpose +pub struct PingPong { + inner: proto::UserPings, +} + +/// Sent via [`PingPong`][] to send a PING frame to a peer. +/// +/// [`PingPong`]: struct.PingPong.html +pub struct Ping { + _p: (), +} + +/// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][]. +/// +/// [`PingPong`]: struct.PingPong.html +/// [`Ping`]: struct.Ping.html +pub struct Pong { + _p: (), +} + // ===== impl SendStream ===== impl SendStream { @@ -477,3 +498,112 @@ impl Clone for ReleaseCapacity { ReleaseCapacity { inner } } } + +// ===== impl PingPong ===== + +impl PingPong { + pub(crate) fn new(inner: proto::UserPings) -> Self { + PingPong { + inner, + } + } + + /// Send a `PING` frame to the peer. + /// + /// Only one ping can be pending at a time, so trying to send while + /// a pong has not be received means this will return a user error. + /// + /// # Example + /// + /// ``` + /// # fn doc(mut ping_pong: h2::PingPong) { + /// // let mut ping_pong = ... + /// ping_pong + /// .send_ping(h2::Ping::opaque()) + /// .unwrap(); + /// # } + /// ``` + pub fn send_ping(&mut self, ping: Ping) -> Result<(), ::Error> { + // Passing a `Ping` here is just to be forwards-compatible with + // eventually allowing choosing a ping payload. For now, we can + // just drop it. + drop(ping); + + self.inner + .send_ping() + .map_err(|err| match err { + Some(err) => err.into(), + None => UserError::SendPingWhilePending.into() + }) + } + + /// Polls for the acknowledgement of a previously [sent][] `PING` frame. + /// + /// # Example + /// + /// ``` + /// # extern crate futures; + /// # extern crate h2; + /// # use futures::Future; + /// # fn doc(mut ping_pong: h2::PingPong) { + /// // let mut ping_pong = ... + /// + /// // First, send a PING. + /// ping_pong + /// .send_ping(h2::Ping::opaque()) + /// .unwrap(); + /// + /// // And then wait for the PONG. + /// futures::future::poll_fn(move || { + /// ping_pong.poll_pong() + /// }).wait().unwrap(); + /// # } + /// # fn main() {} + /// ``` + /// + /// [sent]: struct.PingPong.html#method.send_ping + pub fn poll_pong(&mut self) -> Poll { + try_ready!(self.inner.poll_pong()); + Ok(Async::Ready(Pong { + _p: (), + })) + } +} + +impl fmt::Debug for PingPong { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("PingPong") + .finish() + } +} + +// ===== impl Ping ===== + +impl Ping { + /// Creates a new opaque `Ping` to be sent via a [`PingPong`][]. + /// + /// The payload is "opaque", such that it shouldn't be depended on. + /// + /// [`PingPong`]: struct.PingPong.html + pub fn opaque() -> Ping { + Ping { + _p: (), + } + } +} + +impl fmt::Debug for Ping { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Ping") + .finish() + } +} + +// ===== impl Pong ===== + +impl fmt::Debug for Pong { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Pong") + .finish() + } +} diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index 5204009..39967d3 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -38,6 +38,7 @@ pub use self::bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf}; pub use tokio_io::{AsyncRead, AsyncWrite}; +pub use std::thread; pub use std::time::Duration; // ===== Everything under here shouldn't be used ===== diff --git a/tests/h2-support/src/util.rs b/tests/h2-support/src/util.rs index 430dc91..afa52d9 100644 --- a/tests/h2-support/src/util.rs +++ b/tests/h2-support/src/util.rs @@ -8,6 +8,19 @@ pub fn byte_str(s: &str) -> String { String::try_from(Bytes::from(s)).unwrap() } +pub fn yield_once() -> impl Future { + let mut yielded = false; + futures::future::poll_fn(move || { + if yielded { + Ok(Async::Ready(())) + } else { + yielded = true; + futures::task::current().notify(); + Ok(Async::NotReady) + } + }) +} + pub fn wait_for_capacity(stream: h2::SendStream, target: usize) -> WaitForCapacity { WaitForCapacity { stream: Some(stream), diff --git a/tests/h2-tests/tests/ping_pong.rs b/tests/h2-tests/tests/ping_pong.rs index b726790..cdda1cd 100644 --- a/tests/h2-tests/tests/ping_pong.rs +++ b/tests/h2-tests/tests/ping_pong.rs @@ -108,3 +108,100 @@ fn pong_has_highest_priority() { srv.join(client).wait().expect("wait"); } + +#[test] +fn user_ping_pong() { + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .expect("srv handshake") + .recv_settings() + .recv_frame(frames::ping(frame::Ping::USER)) + .send_frame(frames::ping(frame::Ping::USER).pong()) + .recv_frame(frames::go_away(0)) + .recv_eof(); + + let client = client::handshake(io) + .expect("client handshake") + .and_then(|(client, conn)| { + // yield once so we can ack server settings + conn + .drive(util::yield_once()) + .map(move |(conn, ())| (client, conn)) + }) + .and_then(|(client, mut conn)| { + // `ping_pong()` method conflict with mock future ext trait. + let mut ping_pong = client::Connection::ping_pong(&mut conn) + .expect("taking ping_pong"); + ping_pong + .send_ping(Ping::opaque()) + .expect("send ping"); + + // multiple pings results in a user error... + assert_eq!( + ping_pong.send_ping(Ping::opaque()).expect_err("ping 2").to_string(), + "user error: send_ping before received previous pong", + "send_ping while ping pending is a user error", + ); + + conn + .drive(futures::future::poll_fn(move || { + ping_pong.poll_pong() + })) + .and_then(move |(conn, _pong)| { + drop(client); + conn.expect("client") + }) + }); + + client.join(srv).wait().expect("wait"); +} + +#[test] +fn user_notifies_when_connection_closes() { + let _ = ::env_logger::try_init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .expect("srv handshake") + .recv_settings(); + + let client = client::handshake(io) + .expect("client handshake") + .and_then(|(client, conn)| { + // yield once so we can ack server settings + conn + .drive(util::yield_once()) + .map(move |(conn, ())| (client, conn)) + }) + .map(|(_client, conn)| conn); + + let (mut client, srv) = client.join(srv).wait().expect("wait"); + + // `ping_pong()` method conflict with mock future ext trait. + let mut ping_pong = client::Connection::ping_pong(&mut client) + .expect("taking ping_pong"); + + // Spawn a thread so we can park a task waiting on `poll_pong`, and then + // drop the client and be sure the parked task is notified... + let t = thread::spawn(move || { + poll_fn(|| { ping_pong.poll_pong() }) + .wait() + .expect_err("poll_pong should error"); + ping_pong + }); + + // Sleep to let the ping thread park its task... + thread::sleep(Duration::from_millis(50)); + drop(client); + drop(srv); + + let mut ping_pong = t.join().expect("ping pong thread join"); + + // Now that the connection is closed, also test `send_ping` errors... + assert_eq!( + ping_pong.send_ping(Ping::opaque()).expect_err("send_ping").to_string(), + "broken pipe", + ); +}