diff --git a/src/frame/head.rs b/src/frame/head.rs index a0d5dc7..9b76282 100644 --- a/src/frame/head.rs +++ b/src/frame/head.rs @@ -90,7 +90,7 @@ pub fn parse_stream_id(buf: &[u8]) -> StreamId { impl Kind { pub fn new(byte: u8) -> Kind { - return match byte { + match byte { 0 => Kind::Data, 1 => Kind::Headers, 2 => Kind::Priority, diff --git a/src/frame/mod.rs b/src/frame/mod.rs index b0d7f62..3b3e5bc 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -27,6 +27,7 @@ mod data; mod go_away; mod head; mod headers; +mod ping; mod reset; mod settings; mod util; @@ -35,6 +36,7 @@ pub use self::data::Data; pub use self::go_away::GoAway; pub use self::head::{Head, Kind, StreamId}; pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; +pub use self::ping::Ping; pub use self::reset::Reset; pub use self::settings::{Settings, SettingSet}; @@ -52,6 +54,7 @@ pub enum Frame { Headers(Headers), PushPromise(PushPromise), Settings(Settings), + Ping(Ping) } /// Errors that can occur during parsing an HTTP/2 frame. @@ -66,6 +69,9 @@ pub enum Error { /// An unsupported value was set for the frame kind. BadKind, + /// A length value other than 8 was set on a PING message. + BadFrameSize, + /// The padding length was larger than the frame-header-specified /// length of the payload. TooMuchPadding, @@ -92,7 +98,7 @@ pub enum Error { /// An invalid stream identifier was provided. /// - /// This is returned if a settings frame is received with a stream + /// This is returned if a SETTINGS or PING frame is received with a stream /// identifier other than zero. InvalidStreamId, diff --git a/src/frame/ping.rs b/src/frame/ping.rs new file mode 100644 index 0000000..09b8558 --- /dev/null +++ b/src/frame/ping.rs @@ -0,0 +1,77 @@ +use bytes::{Buf, BufMut, IntoBuf}; +use frame::{Frame, Head, Kind, Error}; + +const ACK_FLAG: u8 = 0x1; + +pub type Payload = [u8; 8]; + +#[derive(Debug)] +pub struct Ping { + ack: bool, + payload: Payload, +} + +impl Ping { + pub fn ping(payload: Payload) -> Ping { + Ping { ack: false, payload } + } + + pub fn pong(payload: Payload) -> Ping { + Ping { ack: true, payload } + } + + pub fn is_ack(&self) -> bool { + self.ack + } + + pub fn into_payload(self) -> Payload { + self.payload + } + + /// Builds a `Ping` frame from a raw frame. + pub fn load(head: Head, bytes: &[u8]) -> Result { + debug_assert_eq!(head.kind(), ::frame::Kind::Ping); + + // PING frames are not associated with any individual stream. If a PING + // frame is received with a stream identifier field value other than + // 0x0, the recipient MUST respond with a connection error + // (Section 5.4.1) of type PROTOCOL_ERROR. + if head.stream_id() != 0 { + return Err(Error::InvalidStreamId); + } + + // In addition to the frame header, PING frames MUST contain 8 octets of opaque + // data in the payload. + if bytes.len() != 8 { + return Err(Error::BadFrameSize); + } + let mut payload = [0; 8]; + bytes.into_buf().copy_to_slice(&mut payload); + + // The PING frame defines the following flags: + // + // ACK (0x1): When set, bit 0 indicates that this PING frame is a PING + // response. An endpoint MUST set this flag in PING responses. An + // endpoint MUST NOT respond to PING frames containing this flag. + let ack = head.flag() & ACK_FLAG != 0; + + Ok(Ping { ack, payload }) + } + + pub fn encode(&self, dst: &mut B) { + let sz = self.payload.len(); + trace!("encoding PING; ack={} len={}", self.ack, sz); + + let flags = if self.ack { ACK_FLAG } else { 0 }; + let head = Head::new(Kind::Ping, flags, 0); + + head.encode(sz, dst); + dst.put_slice(&self.payload); + } +} + +impl From for Frame { + fn from(src: Ping) -> Frame { + Frame::Ping(src) + } +} diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index ecad3c9..4c5b2c0 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -287,9 +287,9 @@ impl Decoder { buf.advance(len); return ret; - } else { - Ok(take(buf, len)) } + + Ok(take(buf, len)) } } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 8fd4d0e..ee476d4 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -88,7 +88,9 @@ impl FramedRead { // TODO: implement return Ok(None); } - Kind::Ping => unimplemented!(), + Kind::Ping => { + try!(frame::Ping::load(head, &bytes[frame::HEADER_LEN..])).into() + } Kind::GoAway => { let frame = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); debug!("decoded; frame={:?}", frame); diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index ae7e40f..07bd934 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -116,6 +116,10 @@ impl Sink for FramedWrite { v.encode(self.buf.get_mut()); trace!("encoded settings; rem={:?}", self.buf.remaining()); } + Frame::Ping(v) => { + v.encode(self.buf.get_mut()); + trace!("encoded ping; rem={:?}", self.buf.remaining()); + } } Ok(AsyncSink::Ready) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 0443c50..b09d802 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -46,7 +46,7 @@ pub fn from_io(io: T, settings: frame::SettingSet) // Map to `Frame` types let framed = FramedRead::new(framed_read); - // Add ping/pong handler + // Add ping/pong responder. let ping_pong = PingPong::new(framed); // Add settings handler diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index ea4b4fe..d23f9ca 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,12 +1,13 @@ use ConnectionError; -use frame::Frame; +use frame::{Frame, Ping}; +use futures::*; use proto::ReadySink; -use futures::*; - +/// Acknowledges ping requests from the remote. #[derive(Debug)] pub struct PingPong { inner: T, + pong: Option, } impl PingPong @@ -15,11 +16,28 @@ impl PingPong { pub fn new(inner: T) -> PingPong { PingPong { - inner: inner, + inner, + pong: None, } } + + fn try_send_pong(&mut self) -> Poll<(), ConnectionError> { + if let Some(pong) = self.pong.take() { + if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? { + // If the pong can't be sent, save it. + self.pong = Some(pong); + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } } +/// > Receivers of a PING frame that does not include an ACK flag MUST send +/// > a PING frame with the ACK flag set in response, with an identical +/// > payload. PING responses SHOULD be given higher priority than any +/// > other frame. impl Stream for PingPong where T: Stream, T: Sink, @@ -27,8 +45,34 @@ impl Stream for PingPong type Item = Frame; type Error = ConnectionError; + /// Reads the next frame from the underlying socket, eliding ping requests. + /// + /// If a PING is received without the ACK flag, the frame is sent to the remote with + /// its ACK flag set. fn poll(&mut self) -> Poll, ConnectionError> { - self.inner.poll() + loop { + // Don't read any frames until `inner` accepts any pending pong. + try_ready!(self.try_send_pong()); + + match self.inner.poll()? { + Async::Ready(Some(Frame::Ping(ping))) => { + if ping.is_ack() { + // If we received an ACK, pass it on (nothing to be done here). + return Ok(Async::Ready(Some(ping.into()))); + } + + // Save a pong to be sent when there is nothing more to be returned + // from the stream or when frames are sent to the sink. + let pong = Ping::pong(ping.into_payload()); + self.pong = Some(pong.into()); + } + + // Everything other than ping gets passed through. + f => { + return Ok(f); + } + } + } } } @@ -40,10 +84,18 @@ impl Sink for PingPong type SinkError = ConnectionError; fn start_send(&mut self, item: Frame) -> StartSend { + // Pings _SHOULD_ have priority over other messages, so attempt to send pending + // ping frames before attempting to send `item`. + if self.try_send_pong()?.is_not_ready() { + return Ok(AsyncSink::NotReady(item)); + } + self.inner.start_send(item) } + /// Polls the underlying sink and tries to flush pending pong frames. fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.try_send_pong()); self.inner.poll_complete() } } @@ -54,6 +106,188 @@ impl ReadySink for PingPong T: ReadySink, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.try_send_pong()); self.inner.poll_ready() } } + +#[cfg(test)] +mod test { + use super::*; + use std::cell::RefCell; + use std::collections::VecDeque; + use std::rc::Rc; + + #[test] + fn responds_to_ping_with_pong() { + let trans = Transport::default(); + let mut ping_pong = PingPong::new(trans.clone()); + + { + let mut trans = trans.0.borrow_mut(); + let ping = Ping::ping(*b"buoyant_"); + trans.from_socket.push_back(ping.into()); + } + + match ping_pong.poll() { + Ok(Async::NotReady) => {} // cool + rsp => panic!("unexpected poll result: {:?}", rsp), + } + + { + let mut trans = trans.0.borrow_mut(); + assert_eq!(trans.to_socket.len(), 1); + match trans.to_socket.pop_front().unwrap() { + Frame::Ping(pong) => { + assert!(pong.is_ack()); + assert_eq!(&pong.into_payload(), b"buoyant_"); + } + f => panic!("unexpected frame: {:?}", f), + } + } + } + + #[test] + fn responds_to_ping_even_when_blocked() { + let trans = Transport::default(); + let mut ping_pong = PingPong::new(trans.clone()); + + // Configure the transport so that writes can't proceed. + { + let mut trans = trans.0.borrow_mut(); + trans.start_send_blocked = true; + } + + // The transport receives a ping but can't send it immediately. + { + let mut trans = trans.0.borrow_mut(); + let ping = Ping::ping(*b"buoyant?"); + trans.from_socket.push_back(ping.into()); + } + assert!(ping_pong.poll().unwrap().is_not_ready()); + + // The transport receives another ping but can't send it immediately. + { + let mut trans = trans.0.borrow_mut(); + let ping = Ping::ping(*b"buoyant!"); + trans.from_socket.push_back(ping.into()); + } + assert!(ping_pong.poll().unwrap().is_not_ready()); + + // At this point, ping_pong is holding two pongs that it cannot send. + { + let mut trans = trans.0.borrow_mut(); + assert!(trans.to_socket.is_empty()); + + trans.start_send_blocked = false; + } + + // Now that start_send_blocked is disabled, the next poll will successfully send + // the pongs on the transport. + assert!(ping_pong.poll().unwrap().is_not_ready()); + { + let mut trans = trans.0.borrow_mut(); + assert_eq!(trans.to_socket.len(), 2); + match trans.to_socket.pop_front().unwrap() { + Frame::Ping(pong) => { + assert!(pong.is_ack()); + assert_eq!(&pong.into_payload(), b"buoyant?"); + } + f => panic!("unexpected frame: {:?}", f), + } + match trans.to_socket.pop_front().unwrap() { + Frame::Ping(pong) => { + assert!(pong.is_ack()); + assert_eq!(&pong.into_payload(), b"buoyant!"); + } + f => panic!("unexpected frame: {:?}", f), + } + } + } + + #[test] + fn pong_passes_through() { + let trans = Transport::default(); + let mut ping_pong = PingPong::new(trans.clone()); + + { + let mut trans = trans.0.borrow_mut(); + let pong = Ping::pong(*b"buoyant!"); + trans.from_socket.push_back(pong.into()); + } + + match ping_pong.poll().unwrap() { + Async::Ready(Some(Frame::Ping(pong))) => { + assert!(pong.is_ack()); + assert_eq!(&pong.into_payload(), b"buoyant!"); + } + f => panic!("unexpected frame: {:?}", f), + } + + { + let trans = trans.0.borrow(); + assert_eq!(trans.to_socket.len(), 0); + } + } + + /// A stubbed transport for tests.a + /// + /// We probably want/have something generic for this? + #[derive(Clone, Default)] + struct Transport(Rc>); + + #[derive(Default)] + struct Inner { + from_socket: VecDeque, + to_socket: VecDeque, + read_blocked: bool, + start_send_blocked: bool, + closing: bool, + } + + impl Stream for Transport { + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + let mut trans = self.0.borrow_mut(); + if trans.read_blocked || (!trans.closing && trans.from_socket.is_empty()) { + Ok(Async::NotReady) + } else { + Ok(trans.from_socket.pop_front().into()) + } + } + } + + impl Sink for Transport { + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Frame) -> StartSend { + let mut trans = self.0.borrow_mut(); + if trans.closing || trans.start_send_blocked { + Ok(AsyncSink::NotReady(item)) + } else { + trans.to_socket.push_back(item); + Ok(AsyncSink::Ready) + } + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + let trans = self.0.borrow(); + if !trans.to_socket.is_empty() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + } + + fn close(&mut self) -> Poll<(), ConnectionError> { + { + let mut trans = self.0.borrow_mut(); + trans.closing = true; + } + self.poll_complete() + } + } +}