Implement ping_pong (#1)
* comments * wip * wip * Sketch out pingpong and keepalive stack modules PingPong responds to ping requests with acknowledgements. KeepAlive issues ping requests on idle connections. * remove keepalive for now * commentary * prettify ping_pong's poll * test ping pong and passthrough * add buffering test * Use a fixed-size slice for ping payloads * Improve pong dispatch pong messages should be buffered until Stream::poll returns Async::NotReady or Async::Ready(None) (i.e. such that it is not expected to be polled again). pong messages are now dispatched when no more data may be polled or the Sink half is activated. * revert name change * touchup * wip * Simplify Stream::poll Now PingPong only holds at most one pending pong and the stream will not produce additional frames unti the ping has been sent. Furthermore, we shouldn't have to call inner.poll_complete quite so frequently. * avoid Bytes::split_to * only use buf internally to Ping::load
This commit is contained in:
committed by
Carl Lerche
parent
a7b92d5ec2
commit
7f21954724
@@ -90,7 +90,7 @@ pub fn parse_stream_id(buf: &[u8]) -> StreamId {
|
||||
|
||||
impl Kind {
|
||||
pub fn new(byte: u8) -> Kind {
|
||||
return match byte {
|
||||
match byte {
|
||||
0 => Kind::Data,
|
||||
1 => Kind::Headers,
|
||||
2 => Kind::Priority,
|
||||
|
||||
@@ -27,6 +27,7 @@ mod data;
|
||||
mod go_away;
|
||||
mod head;
|
||||
mod headers;
|
||||
mod ping;
|
||||
mod reset;
|
||||
mod settings;
|
||||
mod util;
|
||||
@@ -35,6 +36,7 @@ pub use self::data::Data;
|
||||
pub use self::go_away::GoAway;
|
||||
pub use self::head::{Head, Kind, StreamId};
|
||||
pub use self::headers::{Headers, PushPromise, Continuation, Pseudo};
|
||||
pub use self::ping::Ping;
|
||||
pub use self::reset::Reset;
|
||||
pub use self::settings::{Settings, SettingSet};
|
||||
|
||||
@@ -52,6 +54,7 @@ pub enum Frame {
|
||||
Headers(Headers),
|
||||
PushPromise(PushPromise),
|
||||
Settings(Settings),
|
||||
Ping(Ping)
|
||||
}
|
||||
|
||||
/// Errors that can occur during parsing an HTTP/2 frame.
|
||||
@@ -66,6 +69,9 @@ pub enum Error {
|
||||
/// An unsupported value was set for the frame kind.
|
||||
BadKind,
|
||||
|
||||
/// A length value other than 8 was set on a PING message.
|
||||
BadFrameSize,
|
||||
|
||||
/// The padding length was larger than the frame-header-specified
|
||||
/// length of the payload.
|
||||
TooMuchPadding,
|
||||
@@ -92,7 +98,7 @@ pub enum Error {
|
||||
|
||||
/// An invalid stream identifier was provided.
|
||||
///
|
||||
/// This is returned if a settings frame is received with a stream
|
||||
/// This is returned if a SETTINGS or PING frame is received with a stream
|
||||
/// identifier other than zero.
|
||||
InvalidStreamId,
|
||||
|
||||
|
||||
77
src/frame/ping.rs
Normal file
77
src/frame/ping.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use bytes::{Buf, BufMut, IntoBuf};
|
||||
use frame::{Frame, Head, Kind, Error};
|
||||
|
||||
const ACK_FLAG: u8 = 0x1;
|
||||
|
||||
pub type Payload = [u8; 8];
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Ping {
|
||||
ack: bool,
|
||||
payload: Payload,
|
||||
}
|
||||
|
||||
impl Ping {
|
||||
pub fn ping(payload: Payload) -> Ping {
|
||||
Ping { ack: false, payload }
|
||||
}
|
||||
|
||||
pub fn pong(payload: Payload) -> Ping {
|
||||
Ping { ack: true, payload }
|
||||
}
|
||||
|
||||
pub fn is_ack(&self) -> bool {
|
||||
self.ack
|
||||
}
|
||||
|
||||
pub fn into_payload(self) -> Payload {
|
||||
self.payload
|
||||
}
|
||||
|
||||
/// Builds a `Ping` frame from a raw frame.
|
||||
pub fn load(head: Head, bytes: &[u8]) -> Result<Ping, Error> {
|
||||
debug_assert_eq!(head.kind(), ::frame::Kind::Ping);
|
||||
|
||||
// PING frames are not associated with any individual stream. If a PING
|
||||
// frame is received with a stream identifier field value other than
|
||||
// 0x0, the recipient MUST respond with a connection error
|
||||
// (Section 5.4.1) of type PROTOCOL_ERROR.
|
||||
if head.stream_id() != 0 {
|
||||
return Err(Error::InvalidStreamId);
|
||||
}
|
||||
|
||||
// In addition to the frame header, PING frames MUST contain 8 octets of opaque
|
||||
// data in the payload.
|
||||
if bytes.len() != 8 {
|
||||
return Err(Error::BadFrameSize);
|
||||
}
|
||||
let mut payload = [0; 8];
|
||||
bytes.into_buf().copy_to_slice(&mut payload);
|
||||
|
||||
// The PING frame defines the following flags:
|
||||
//
|
||||
// ACK (0x1): When set, bit 0 indicates that this PING frame is a PING
|
||||
// response. An endpoint MUST set this flag in PING responses. An
|
||||
// endpoint MUST NOT respond to PING frames containing this flag.
|
||||
let ack = head.flag() & ACK_FLAG != 0;
|
||||
|
||||
Ok(Ping { ack, payload })
|
||||
}
|
||||
|
||||
pub fn encode<B: BufMut>(&self, dst: &mut B) {
|
||||
let sz = self.payload.len();
|
||||
trace!("encoding PING; ack={} len={}", self.ack, sz);
|
||||
|
||||
let flags = if self.ack { ACK_FLAG } else { 0 };
|
||||
let head = Head::new(Kind::Ping, flags, 0);
|
||||
|
||||
head.encode(sz, dst);
|
||||
dst.put_slice(&self.payload);
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Ping> for Frame {
|
||||
fn from(src: Ping) -> Frame {
|
||||
Frame::Ping(src)
|
||||
}
|
||||
}
|
||||
@@ -287,9 +287,9 @@ impl Decoder {
|
||||
|
||||
buf.advance(len);
|
||||
return ret;
|
||||
} else {
|
||||
Ok(take(buf, len))
|
||||
}
|
||||
|
||||
Ok(take(buf, len))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -88,7 +88,9 @@ impl<T> FramedRead<T> {
|
||||
// TODO: implement
|
||||
return Ok(None);
|
||||
}
|
||||
Kind::Ping => unimplemented!(),
|
||||
Kind::Ping => {
|
||||
try!(frame::Ping::load(head, &bytes[frame::HEADER_LEN..])).into()
|
||||
}
|
||||
Kind::GoAway => {
|
||||
let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..]));
|
||||
debug!("decoded; frame={:?}", frame);
|
||||
|
||||
@@ -116,6 +116,10 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
|
||||
v.encode(self.buf.get_mut());
|
||||
trace!("encoded settings; rem={:?}", self.buf.remaining());
|
||||
}
|
||||
Frame::Ping(v) => {
|
||||
v.encode(self.buf.get_mut());
|
||||
trace!("encoded ping; rem={:?}", self.buf.remaining());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AsyncSink::Ready)
|
||||
|
||||
@@ -46,7 +46,7 @@ pub fn from_io<T, P>(io: T, settings: frame::SettingSet)
|
||||
// Map to `Frame` types
|
||||
let framed = FramedRead::new(framed_read);
|
||||
|
||||
// Add ping/pong handler
|
||||
// Add ping/pong responder.
|
||||
let ping_pong = PingPong::new(framed);
|
||||
|
||||
// Add settings handler
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use ConnectionError;
|
||||
use frame::Frame;
|
||||
use frame::{Frame, Ping};
|
||||
use futures::*;
|
||||
use proto::ReadySink;
|
||||
|
||||
use futures::*;
|
||||
|
||||
/// Acknowledges ping requests from the remote.
|
||||
#[derive(Debug)]
|
||||
pub struct PingPong<T> {
|
||||
inner: T,
|
||||
pong: Option<Frame>,
|
||||
}
|
||||
|
||||
impl<T> PingPong<T>
|
||||
@@ -15,11 +16,28 @@ impl<T> PingPong<T>
|
||||
{
|
||||
pub fn new(inner: T) -> PingPong<T> {
|
||||
PingPong {
|
||||
inner: inner,
|
||||
inner,
|
||||
pong: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_send_pong(&mut self) -> Poll<(), ConnectionError> {
|
||||
if let Some(pong) = self.pong.take() {
|
||||
if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? {
|
||||
// If the pong can't be sent, save it.
|
||||
self.pong = Some(pong);
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
/// > Receivers of a PING frame that does not include an ACK flag MUST send
|
||||
/// > a PING frame with the ACK flag set in response, with an identical
|
||||
/// > payload. PING responses SHOULD be given higher priority than any
|
||||
/// > other frame.
|
||||
impl<T> Stream for PingPong<T>
|
||||
where T: Stream<Item = Frame, Error = ConnectionError>,
|
||||
T: Sink<SinkItem = Frame, SinkError = ConnectionError>,
|
||||
@@ -27,8 +45,34 @@ impl<T> Stream for PingPong<T>
|
||||
type Item = Frame;
|
||||
type Error = ConnectionError;
|
||||
|
||||
/// Reads the next frame from the underlying socket, eliding ping requests.
|
||||
///
|
||||
/// If a PING is received without the ACK flag, the frame is sent to the remote with
|
||||
/// its ACK flag set.
|
||||
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
|
||||
self.inner.poll()
|
||||
loop {
|
||||
// Don't read any frames until `inner` accepts any pending pong.
|
||||
try_ready!(self.try_send_pong());
|
||||
|
||||
match self.inner.poll()? {
|
||||
Async::Ready(Some(Frame::Ping(ping))) => {
|
||||
if ping.is_ack() {
|
||||
// If we received an ACK, pass it on (nothing to be done here).
|
||||
return Ok(Async::Ready(Some(ping.into())));
|
||||
}
|
||||
|
||||
// Save a pong to be sent when there is nothing more to be returned
|
||||
// from the stream or when frames are sent to the sink.
|
||||
let pong = Ping::pong(ping.into_payload());
|
||||
self.pong = Some(pong.into());
|
||||
}
|
||||
|
||||
// Everything other than ping gets passed through.
|
||||
f => {
|
||||
return Ok(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,10 +84,18 @@ impl<T> Sink for PingPong<T>
|
||||
type SinkError = ConnectionError;
|
||||
|
||||
fn start_send(&mut self, item: Frame) -> StartSend<Frame, ConnectionError> {
|
||||
// Pings _SHOULD_ have priority over other messages, so attempt to send pending
|
||||
// ping frames before attempting to send `item`.
|
||||
if self.try_send_pong()?.is_not_ready() {
|
||||
return Ok(AsyncSink::NotReady(item));
|
||||
}
|
||||
|
||||
self.inner.start_send(item)
|
||||
}
|
||||
|
||||
/// Polls the underlying sink and tries to flush pending pong frames.
|
||||
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
|
||||
try_ready!(self.try_send_pong());
|
||||
self.inner.poll_complete()
|
||||
}
|
||||
}
|
||||
@@ -54,6 +106,188 @@ impl<T> ReadySink for PingPong<T>
|
||||
T: ReadySink,
|
||||
{
|
||||
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
|
||||
try_ready!(self.try_send_pong());
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::rc::Rc;
|
||||
|
||||
#[test]
|
||||
fn responds_to_ping_with_pong() {
|
||||
let trans = Transport::default();
|
||||
let mut ping_pong = PingPong::new(trans.clone());
|
||||
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
let ping = Ping::ping(*b"buoyant_");
|
||||
trans.from_socket.push_back(ping.into());
|
||||
}
|
||||
|
||||
match ping_pong.poll() {
|
||||
Ok(Async::NotReady) => {} // cool
|
||||
rsp => panic!("unexpected poll result: {:?}", rsp),
|
||||
}
|
||||
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
assert_eq!(trans.to_socket.len(), 1);
|
||||
match trans.to_socket.pop_front().unwrap() {
|
||||
Frame::Ping(pong) => {
|
||||
assert!(pong.is_ack());
|
||||
assert_eq!(&pong.into_payload(), b"buoyant_");
|
||||
}
|
||||
f => panic!("unexpected frame: {:?}", f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn responds_to_ping_even_when_blocked() {
|
||||
let trans = Transport::default();
|
||||
let mut ping_pong = PingPong::new(trans.clone());
|
||||
|
||||
// Configure the transport so that writes can't proceed.
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
trans.start_send_blocked = true;
|
||||
}
|
||||
|
||||
// The transport receives a ping but can't send it immediately.
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
let ping = Ping::ping(*b"buoyant?");
|
||||
trans.from_socket.push_back(ping.into());
|
||||
}
|
||||
assert!(ping_pong.poll().unwrap().is_not_ready());
|
||||
|
||||
// The transport receives another ping but can't send it immediately.
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
let ping = Ping::ping(*b"buoyant!");
|
||||
trans.from_socket.push_back(ping.into());
|
||||
}
|
||||
assert!(ping_pong.poll().unwrap().is_not_ready());
|
||||
|
||||
// At this point, ping_pong is holding two pongs that it cannot send.
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
assert!(trans.to_socket.is_empty());
|
||||
|
||||
trans.start_send_blocked = false;
|
||||
}
|
||||
|
||||
// Now that start_send_blocked is disabled, the next poll will successfully send
|
||||
// the pongs on the transport.
|
||||
assert!(ping_pong.poll().unwrap().is_not_ready());
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
assert_eq!(trans.to_socket.len(), 2);
|
||||
match trans.to_socket.pop_front().unwrap() {
|
||||
Frame::Ping(pong) => {
|
||||
assert!(pong.is_ack());
|
||||
assert_eq!(&pong.into_payload(), b"buoyant?");
|
||||
}
|
||||
f => panic!("unexpected frame: {:?}", f),
|
||||
}
|
||||
match trans.to_socket.pop_front().unwrap() {
|
||||
Frame::Ping(pong) => {
|
||||
assert!(pong.is_ack());
|
||||
assert_eq!(&pong.into_payload(), b"buoyant!");
|
||||
}
|
||||
f => panic!("unexpected frame: {:?}", f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pong_passes_through() {
|
||||
let trans = Transport::default();
|
||||
let mut ping_pong = PingPong::new(trans.clone());
|
||||
|
||||
{
|
||||
let mut trans = trans.0.borrow_mut();
|
||||
let pong = Ping::pong(*b"buoyant!");
|
||||
trans.from_socket.push_back(pong.into());
|
||||
}
|
||||
|
||||
match ping_pong.poll().unwrap() {
|
||||
Async::Ready(Some(Frame::Ping(pong))) => {
|
||||
assert!(pong.is_ack());
|
||||
assert_eq!(&pong.into_payload(), b"buoyant!");
|
||||
}
|
||||
f => panic!("unexpected frame: {:?}", f),
|
||||
}
|
||||
|
||||
{
|
||||
let trans = trans.0.borrow();
|
||||
assert_eq!(trans.to_socket.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
/// A stubbed transport for tests.a
|
||||
///
|
||||
/// We probably want/have something generic for this?
|
||||
#[derive(Clone, Default)]
|
||||
struct Transport(Rc<RefCell<Inner>>);
|
||||
|
||||
#[derive(Default)]
|
||||
struct Inner {
|
||||
from_socket: VecDeque<Frame>,
|
||||
to_socket: VecDeque<Frame>,
|
||||
read_blocked: bool,
|
||||
start_send_blocked: bool,
|
||||
closing: bool,
|
||||
}
|
||||
|
||||
impl Stream for Transport {
|
||||
type Item = Frame;
|
||||
type Error = ConnectionError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
|
||||
let mut trans = self.0.borrow_mut();
|
||||
if trans.read_blocked || (!trans.closing && trans.from_socket.is_empty()) {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(trans.from_socket.pop_front().into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Sink for Transport {
|
||||
type SinkItem = Frame;
|
||||
type SinkError = ConnectionError;
|
||||
|
||||
fn start_send(&mut self, item: Frame) -> StartSend<Frame, ConnectionError> {
|
||||
let mut trans = self.0.borrow_mut();
|
||||
if trans.closing || trans.start_send_blocked {
|
||||
Ok(AsyncSink::NotReady(item))
|
||||
} else {
|
||||
trans.to_socket.push_back(item);
|
||||
Ok(AsyncSink::Ready)
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
|
||||
let trans = self.0.borrow();
|
||||
if !trans.to_socket.is_empty() {
|
||||
Ok(Async::NotReady)
|
||||
} else {
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn close(&mut self) -> Poll<(), ConnectionError> {
|
||||
{
|
||||
let mut trans = self.0.borrow_mut();
|
||||
trans.closing = true;
|
||||
}
|
||||
self.poll_complete()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user