Add user PING support (#346)

- Add `share::PingPong`, which can send `Ping`s, and poll for the `Pong`
  from the peer.
This commit is contained in:
Sean McArthur
2019-02-18 15:59:11 -08:00
committed by GitHub
parent 8a0b7ff64f
commit e3a73f726e
12 changed files with 458 additions and 31 deletions

View File

@@ -156,7 +156,7 @@
//! [`Builder`]: struct.Builder.html
//! [`Error`]: ../struct.Error.html
use {SendStream, RecvStream, ReleaseCapacity};
use {SendStream, RecvStream, ReleaseCapacity, PingPong};
use codec::{Codec, RecvError, SendError, UserError};
use frame::{Headers, Pseudo, Reason, Settings, StreamId};
use proto;
@@ -319,31 +319,13 @@ pub struct PushPromise {
response: PushedResponseFuture,
}
#[derive(Debug)]
/// A stream of pushed responses and corresponding promised requests
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct PushPromises {
inner: proto::OpaqueStreamRef,
}
impl Stream for PushPromises {
type Item = PushPromise;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.inner.poll_pushed()) {
Some((request, response)) => {
let response = PushedResponseFuture {
inner: ResponseFuture {
inner: response, push_promise_consumed: false
}
};
Ok(Async::Ready(Some(PushPromise{request, response})))
}
None => Ok(Async::Ready(None)),
}
}
}
/// Builds client connections with custom configuration values.
///
/// Methods can be chained in order to set the configuration values.
@@ -1282,6 +1264,17 @@ where
assert!(size <= proto::MAX_WINDOW_SIZE);
self.inner.set_target_window_size(size);
}
/// Takes a `PingPong` instance from the connection.
///
/// # Note
///
/// This may only be called once. Calling multiple times will return `None`.
pub fn ping_pong(&mut self) -> Option<PingPong> {
self.inner
.take_user_pings()
.map(PingPong::new)
}
}
impl<T, B> Future for Connection<T, B>
@@ -1416,6 +1409,27 @@ impl ResponseFuture {
}
}
// ===== impl PushPromises =====
impl Stream for PushPromises {
type Item = PushPromise;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.inner.poll_pushed()) {
Some((request, response)) => {
let response = PushedResponseFuture {
inner: ResponseFuture {
inner: response, push_promise_consumed: false
}
};
Ok(Async::Ready(Some(PushPromise{request, response})))
}
None => Ok(Async::Ready(None)),
}
}
}
// ===== impl PushPromise =====
impl PushPromise {

View File

@@ -54,6 +54,9 @@ pub enum UserError {
/// Calls `SendResponse::poll_reset` after having called `send_response`.
PollResetAfterSendResponse,
/// Calls `PingPong::send_ping` before receiving a pong.
SendPingWhilePending,
}
// ===== impl RecvError =====
@@ -134,6 +137,7 @@ impl error::Error for UserError {
MalformedHeaders => "malformed headers",
MissingUriSchemeAndAuthority => "request URI missing scheme and authority",
PollResetAfterSendResponse => "poll_reset after send_response is illegal",
SendPingWhilePending => "send_ping before received previous pong",
}
}
}

View File

@@ -14,6 +14,7 @@ pub struct Ping {
// This was just 8 randomly generated bytes. We use something besides just
// zeroes to distinguish this specific PING from any other.
const SHUTDOWN_PAYLOAD: Payload = [0x0b, 0x7b, 0xa2, 0xf0, 0x8b, 0x9b, 0xfe, 0x54];
const USER_PAYLOAD: Payload = [0x3b, 0x7c, 0xdb, 0x7a, 0x0b, 0x87, 0x16, 0xb4];
impl Ping {
@@ -23,6 +24,12 @@ impl Ping {
#[cfg(not(feature = "unstable"))]
pub(crate) const SHUTDOWN: Payload = SHUTDOWN_PAYLOAD;
#[cfg(feature = "unstable")]
pub const USER: Payload = USER_PAYLOAD;
#[cfg(not(feature = "unstable"))]
pub(crate) const USER: Payload = USER_PAYLOAD;
pub fn new(payload: Payload) -> Ping {
Ping {
ack: false,

View File

@@ -129,7 +129,7 @@ pub mod server;
mod share;
pub use error::{Error, Reason};
pub use share::{SendStream, StreamId, RecvStream, ReleaseCapacity};
pub use share::{SendStream, StreamId, RecvStream, ReleaseCapacity, PingPong, Ping, Pong};
#[cfg(feature = "unstable")]
pub use codec::{Codec, RecvError, SendError, UserError};

View File

@@ -178,6 +178,10 @@ where
}
}
pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
self.ping_pong.take_user_pings()
}
/// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), proto::Error> {
use codec::RecvError::*;

View File

@@ -9,6 +9,7 @@ mod streams;
pub(crate) use self::connection::{Config, Connection};
pub(crate) use self::error::Error;
pub(crate) use self::peer::{Peer, Dyn as DynPeer};
pub(crate) use self::ping_pong::UserPings;
pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams};
pub(crate) use self::streams::{PollReset, Prioritized, Open};

View File

@@ -1,17 +1,36 @@
use codec::Codec;
use frame::Ping;
use proto::PingPayload;
use proto::{self, PingPayload};
use bytes::Buf;
use futures::{Async, Poll};
use futures::task::AtomicTask;
use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio_io::AsyncWrite;
/// Acknowledges ping requests from the remote.
#[derive(Debug)]
pub struct PingPong {
pub(crate) struct PingPong {
pending_ping: Option<PendingPing>,
pending_pong: Option<PingPayload>,
user_pings: Option<UserPingsRx>,
}
#[derive(Debug)]
pub(crate) struct UserPings(Arc<UserPingsInner>);
#[derive(Debug)]
struct UserPingsRx(Arc<UserPingsInner>);
#[derive(Debug)]
struct UserPingsInner {
state: AtomicUsize,
/// Task to wake up the main `Connection`.
ping_task: AtomicTask,
/// Task to wake up `share::PingPong::poll_pong`.
pong_task: AtomicTask,
}
#[derive(Debug)]
@@ -28,15 +47,44 @@ pub(crate) enum ReceivedPing {
Shutdown,
}
/// No user ping pending.
const USER_STATE_EMPTY: usize = 0;
/// User has called `send_ping`, but PING hasn't been written yet.
const USER_STATE_PENDING_PING: usize = 1;
/// User PING has been written, waiting for PONG.
const USER_STATE_PENDING_PONG: usize = 2;
/// We've received user PONG, waiting for user to `poll_pong`.
const USER_STATE_RECEIVED_PONG: usize = 3;
/// The connection is closed.
const USER_STATE_CLOSED: usize = 4;
// ===== impl PingPong =====
impl PingPong {
pub fn new() -> Self {
pub(crate) fn new() -> Self {
PingPong {
pending_ping: None,
pending_pong: None,
user_pings: None,
}
}
pub fn ping_shutdown(&mut self) {
/// Can only be called once. If called a second time, returns `None`.
pub(crate) fn take_user_pings(&mut self) -> Option<UserPings> {
if self.user_pings.is_some() {
return None;
}
let user_pings = Arc::new(UserPingsInner {
state: AtomicUsize::new(USER_STATE_EMPTY),
ping_task: AtomicTask::new(),
pong_task: AtomicTask::new(),
});
self.user_pings = Some(UserPingsRx(user_pings.clone()));
Some(UserPings(user_pings))
}
pub(crate) fn ping_shutdown(&mut self) {
assert!(self.pending_ping.is_none());
self.pending_ping = Some(PendingPing {
@@ -54,7 +102,12 @@ impl PingPong {
if ping.is_ack() {
if let Some(pending) = self.pending_ping.take() {
if &pending.payload == ping.payload() {
trace!("recv PING ack");
assert_eq!(
&pending.payload,
&Ping::SHUTDOWN,
"pending_ping should be for shutdown",
);
trace!("recv PING SHUTDOWN ack");
return ReceivedPing::Shutdown;
}
@@ -62,6 +115,13 @@ impl PingPong {
self.pending_ping = Some(pending);
}
if let Some(ref users) = self.user_pings {
if ping.payload() == &Ping::USER && users.receive_pong() {
trace!("recv PING USER ack");
return ReceivedPing::Unknown;
}
}
// else we were acked a ping we didn't send?
// The spec doesn't require us to do anything about this,
// so for resiliency, just ignore it for now.
@@ -75,7 +135,7 @@ impl PingPong {
}
/// Send any pending pongs.
pub fn send_pending_pong<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), io::Error>
pub(crate) fn send_pending_pong<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
@@ -94,7 +154,7 @@ impl PingPong {
}
/// Send any pending pings.
pub fn send_pending_ping<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), io::Error>
pub(crate) fn send_pending_ping<T, B>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), io::Error>
where
T: AsyncWrite,
B: Buf,
@@ -109,6 +169,18 @@ impl PingPong {
.expect("invalid ping frame");
ping.sent = true;
}
} else if let Some(ref users) = self.user_pings {
if users.0.state.load(Ordering::Acquire) == USER_STATE_PENDING_PING {
if !dst.poll_ready()?.is_ready() {
return Ok(Async::NotReady);
}
dst.buffer(Ping::new(Ping::USER).into())
.expect("invalid ping frame");
users.0.state.store(USER_STATE_PENDING_PONG, Ordering::Release);
} else {
users.0.ping_task.register();
}
}
Ok(Async::Ready(()))
@@ -116,10 +188,83 @@ impl PingPong {
}
impl ReceivedPing {
pub fn is_shutdown(&self) -> bool {
pub(crate) fn is_shutdown(&self) -> bool {
match *self {
ReceivedPing::Shutdown => true,
_ => false,
}
}
}
// ===== impl UserPings =====
impl UserPings {
pub(crate) fn send_ping(&self) -> Result<(), Option<proto::Error>> {
let prev = self.0.state.compare_and_swap(
USER_STATE_EMPTY, // current
USER_STATE_PENDING_PING, // new
Ordering::AcqRel,
);
match prev {
USER_STATE_EMPTY => {
self.0.ping_task.notify();
Ok(())
},
USER_STATE_CLOSED => {
Err(Some(broken_pipe().into()))
}
_ => {
// Was already pending, user error!
Err(None)
}
}
}
pub(crate) fn poll_pong(&self) -> Poll<(), proto::Error> {
// Must register before checking state, in case state were to change
// before we could register, and then the ping would just be lost.
self.0.pong_task.register();
let prev = self.0.state.compare_and_swap(
USER_STATE_RECEIVED_PONG, // current
USER_STATE_EMPTY, // new
Ordering::AcqRel,
);
match prev {
USER_STATE_RECEIVED_PONG => Ok(Async::Ready(())),
USER_STATE_CLOSED => Err(broken_pipe().into()),
_ => Ok(Async::NotReady),
}
}
}
// ===== impl UserPingsRx =====
impl UserPingsRx {
fn receive_pong(&self) -> bool {
let prev = self.0.state.compare_and_swap(
USER_STATE_PENDING_PONG, // current
USER_STATE_RECEIVED_PONG, // new
Ordering::AcqRel,
);
if prev == USER_STATE_PENDING_PONG {
self.0.pong_task.notify();
true
} else {
false
}
}
}
impl Drop for UserPingsRx {
fn drop(&mut self) {
self.0.state.store(USER_STATE_CLOSED, Ordering::Release);
self.0.pong_task.notify();
}
}
fn broken_pipe() -> io::Error {
io::ErrorKind::BrokenPipe.into()
}

View File

@@ -129,7 +129,7 @@
//! [`SendStream`]: ../struct.SendStream.html
//! [`TcpListener`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpListener.html
use {SendStream, RecvStream, ReleaseCapacity};
use {SendStream, RecvStream, ReleaseCapacity, PingPong};
use codec::{Codec, RecvError};
use frame::{self, Pseudo, Reason, Settings, StreamId};
use proto::{self, Config, Prioritized};
@@ -459,6 +459,17 @@ where
pub fn graceful_shutdown(&mut self) {
self.connection.go_away_gracefully();
}
/// Takes a `PingPong` instance from the connection.
///
/// # Note
///
/// This may only be called once. Calling multiple times will return `None`.
pub fn ping_pong(&mut self) -> Option<PingPong> {
self.connection
.take_user_pings()
.map(PingPong::new)
}
}
impl<T, B> futures::Stream for Connection<T, B>

View File

@@ -193,6 +193,27 @@ pub struct ReleaseCapacity {
inner: proto::OpaqueStreamRef,
}
/// A handle to send and receive PING frames with the peer.
// NOT Clone on purpose
pub struct PingPong {
inner: proto::UserPings,
}
/// Sent via [`PingPong`][] to send a PING frame to a peer.
///
/// [`PingPong`]: struct.PingPong.html
pub struct Ping {
_p: (),
}
/// Received via [`PingPong`][] when a peer acknowledges a [`Ping`][].
///
/// [`PingPong`]: struct.PingPong.html
/// [`Ping`]: struct.Ping.html
pub struct Pong {
_p: (),
}
// ===== impl SendStream =====
impl<B: IntoBuf> SendStream<B> {
@@ -477,3 +498,112 @@ impl Clone for ReleaseCapacity {
ReleaseCapacity { inner }
}
}
// ===== impl PingPong =====
impl PingPong {
pub(crate) fn new(inner: proto::UserPings) -> Self {
PingPong {
inner,
}
}
/// Send a `PING` frame to the peer.
///
/// Only one ping can be pending at a time, so trying to send while
/// a pong has not be received means this will return a user error.
///
/// # Example
///
/// ```
/// # fn doc(mut ping_pong: h2::PingPong) {
/// // let mut ping_pong = ...
/// ping_pong
/// .send_ping(h2::Ping::opaque())
/// .unwrap();
/// # }
/// ```
pub fn send_ping(&mut self, ping: Ping) -> Result<(), ::Error> {
// Passing a `Ping` here is just to be forwards-compatible with
// eventually allowing choosing a ping payload. For now, we can
// just drop it.
drop(ping);
self.inner
.send_ping()
.map_err(|err| match err {
Some(err) => err.into(),
None => UserError::SendPingWhilePending.into()
})
}
/// Polls for the acknowledgement of a previously [sent][] `PING` frame.
///
/// # Example
///
/// ```
/// # extern crate futures;
/// # extern crate h2;
/// # use futures::Future;
/// # fn doc(mut ping_pong: h2::PingPong) {
/// // let mut ping_pong = ...
///
/// // First, send a PING.
/// ping_pong
/// .send_ping(h2::Ping::opaque())
/// .unwrap();
///
/// // And then wait for the PONG.
/// futures::future::poll_fn(move || {
/// ping_pong.poll_pong()
/// }).wait().unwrap();
/// # }
/// # fn main() {}
/// ```
///
/// [sent]: struct.PingPong.html#method.send_ping
pub fn poll_pong(&mut self) -> Poll<Pong, ::Error> {
try_ready!(self.inner.poll_pong());
Ok(Async::Ready(Pong {
_p: (),
}))
}
}
impl fmt::Debug for PingPong {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("PingPong")
.finish()
}
}
// ===== impl Ping =====
impl Ping {
/// Creates a new opaque `Ping` to be sent via a [`PingPong`][].
///
/// The payload is "opaque", such that it shouldn't be depended on.
///
/// [`PingPong`]: struct.PingPong.html
pub fn opaque() -> Ping {
Ping {
_p: (),
}
}
}
impl fmt::Debug for Ping {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Ping")
.finish()
}
}
// ===== impl Pong =====
impl fmt::Debug for Pong {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Pong")
.finish()
}
}

View File

@@ -38,6 +38,7 @@ pub use self::bytes::{Buf, BufMut, Bytes, BytesMut, IntoBuf};
pub use tokio_io::{AsyncRead, AsyncWrite};
pub use std::thread;
pub use std::time::Duration;
// ===== Everything under here shouldn't be used =====

View File

@@ -8,6 +8,19 @@ pub fn byte_str(s: &str) -> String<Bytes> {
String::try_from(Bytes::from(s)).unwrap()
}
pub fn yield_once() -> impl Future<Item=(), Error=()> {
let mut yielded = false;
futures::future::poll_fn(move || {
if yielded {
Ok(Async::Ready(()))
} else {
yielded = true;
futures::task::current().notify();
Ok(Async::NotReady)
}
})
}
pub fn wait_for_capacity(stream: h2::SendStream<Bytes>, target: usize) -> WaitForCapacity {
WaitForCapacity {
stream: Some(stream),

View File

@@ -108,3 +108,100 @@ fn pong_has_highest_priority() {
srv.join(client).wait().expect("wait");
}
#[test]
fn user_ping_pong() {
let _ = ::env_logger::try_init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.expect("srv handshake")
.recv_settings()
.recv_frame(frames::ping(frame::Ping::USER))
.send_frame(frames::ping(frame::Ping::USER).pong())
.recv_frame(frames::go_away(0))
.recv_eof();
let client = client::handshake(io)
.expect("client handshake")
.and_then(|(client, conn)| {
// yield once so we can ack server settings
conn
.drive(util::yield_once())
.map(move |(conn, ())| (client, conn))
})
.and_then(|(client, mut conn)| {
// `ping_pong()` method conflict with mock future ext trait.
let mut ping_pong = client::Connection::ping_pong(&mut conn)
.expect("taking ping_pong");
ping_pong
.send_ping(Ping::opaque())
.expect("send ping");
// multiple pings results in a user error...
assert_eq!(
ping_pong.send_ping(Ping::opaque()).expect_err("ping 2").to_string(),
"user error: send_ping before received previous pong",
"send_ping while ping pending is a user error",
);
conn
.drive(futures::future::poll_fn(move || {
ping_pong.poll_pong()
}))
.and_then(move |(conn, _pong)| {
drop(client);
conn.expect("client")
})
});
client.join(srv).wait().expect("wait");
}
#[test]
fn user_notifies_when_connection_closes() {
let _ = ::env_logger::try_init();
let (io, srv) = mock::new();
let srv = srv.assert_client_handshake()
.expect("srv handshake")
.recv_settings();
let client = client::handshake(io)
.expect("client handshake")
.and_then(|(client, conn)| {
// yield once so we can ack server settings
conn
.drive(util::yield_once())
.map(move |(conn, ())| (client, conn))
})
.map(|(_client, conn)| conn);
let (mut client, srv) = client.join(srv).wait().expect("wait");
// `ping_pong()` method conflict with mock future ext trait.
let mut ping_pong = client::Connection::ping_pong(&mut client)
.expect("taking ping_pong");
// Spawn a thread so we can park a task waiting on `poll_pong`, and then
// drop the client and be sure the parked task is notified...
let t = thread::spawn(move || {
poll_fn(|| { ping_pong.poll_pong() })
.wait()
.expect_err("poll_pong should error");
ping_pong
});
// Sleep to let the ping thread park its task...
thread::sleep(Duration::from_millis(50));
drop(client);
drop(srv);
let mut ping_pong = t.join().expect("ping pong thread join");
// Now that the connection is closed, also test `send_ping` errors...
assert_eq!(
ping_pong.send_ping(Ping::opaque()).expect_err("send_ping").to_string(),
"broken pipe",
);
}