diff --git a/src/proto/connection.rs b/src/proto/connection.rs new file mode 100644 index 0000000..3bd63b4 --- /dev/null +++ b/src/proto/connection.rs @@ -0,0 +1,86 @@ +use {frame, proto, ConnectionError}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; + +use futures::*; + +pub struct Connection { + inner: Inner, +} + +type Inner = + proto::Settings< + proto::PingPong< + proto::FramedWrite< + proto::FramedRead< + length_delimited::FramedRead>>>>; + +impl Connection { + pub fn new(io: T) -> Connection { + // Delimit the frames + let framed_read = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(6) + .num_skip(0) // Don't skip the header + .new_read(io); + + // Map to `Frame` types + let framed_read = proto::FramedRead::new(framed_read); + + // Frame encoder + let mut framed = proto::FramedWrite::new(framed_read); + + // Ok, so this is a **little** hacky, but it works for now. + // + // The ping/pong behavior SHOULD be given highest priority (6.7). + // However, the connection handshake requires the settings frame to be + // sent as the very first one. This needs special handling because + // otherwise there is a race condition where the peer could send its + // settings frame followed immediately by a Ping, in which case, we + // don't want to accidentally send the pong before finishing the + // connection hand shake. + // + // So, to ensure correct ordering, we write the settings frame here + // before fully constructing the connection struct. Technically, `Async` + // operations should not be performed in `new` because this might not + // happen on a task, however we have full control of the I/O and we know + // that the settings frame will get buffered and not actually perform an + // I/O op. + let initial_settings = frame::SettingSet::default(); + let frame = frame::Settings::new(initial_settings.clone()); + assert!(framed.start_send(frame.into()).unwrap().is_ready()); + + // Add ping/pong handler + let ping_pong = proto::PingPong::new(framed); + + // Add settings handler + let connection = proto::Settings::new(ping_pong, initial_settings); + + Connection { + inner: connection, + } + } +} + +impl Stream for Connection { + type Item = frame::Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for Connection { + type SinkItem = frame::Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: frame::Frame) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 617f8ad..b64c6c4 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,65 +1,11 @@ +mod connection; mod framed_read; mod framed_write; +mod ping_pong; mod settings; -use {frame, ConnectionError}; -use self::framed_read::FramedRead; -use self::framed_write::FramedWrite; - -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; - -use futures::*; - -pub struct Connection { - inner: Inner, -} - -type Inner = - FramedWrite< - FramedRead< - length_delimited::FramedRead>>; - -impl Connection { - pub fn new(io: T) -> Connection { - // Delimit the frames - let framed_read = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(6) - .num_skip(0) // Don't skip the header - .new_read(io); - - // Map to `Frame` types - let framed_read = FramedRead::new(framed_read); - - // Frame encoder - let framed = FramedWrite::new(framed_read); - - Connection { - inner: framed, - } - } -} - -impl Stream for Connection { - type Item = frame::Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - self.inner.poll() - } -} - -impl Sink for Connection { - type SinkItem = frame::Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: frame::Frame) -> StartSend { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_complete() - } -} +pub use self::connection::Connection; +pub use self::framed_read::FramedRead; +pub use self::framed_write::FramedWrite; +pub use self::ping_pong::PingPong; +pub use self::settings::Settings; diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs new file mode 100644 index 0000000..1f7c77c --- /dev/null +++ b/src/proto/ping_pong.rs @@ -0,0 +1,47 @@ +use ConnectionError; +use frame::Frame; + +use futures::*; + +pub struct PingPong { + inner: T, +} + +impl PingPong + where T: Stream, + T: Sink, +{ + pub fn new(inner: T) -> PingPong { + PingPong { + inner: inner, + } + } +} + +impl Stream for PingPong + where T: Stream, + T: Sink, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for PingPong + where T: Stream, + T: Sink, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Frame) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } +} diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 6644bde..2024331 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -15,19 +15,22 @@ pub struct Settings { is_dirty: bool, } +/* + * TODO: + * - Settings ack timeout for connection error + */ + impl Settings where T: Stream, T: Sink, { - pub fn new(inner: T) -> Settings { + pub fn new(inner: T, local: frame::SettingSet) -> Settings { Settings { inner: inner, - local: frame::SettingSet::default(), + local: local, remote: frame::SettingSet::default(), remaining_acks: 0, - // Always start in the dirty state as sending the settings frame is - // part of the connection handshake - is_dirty: true, + is_dirty: false, } } @@ -37,14 +40,20 @@ impl Settings fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { if self.is_dirty { - // Create the new frame let frame = frame::Settings::new(self.local.clone()).into(); try_ready!(self.try_send(frame)); self.is_dirty = false; } - unimplemented!(); + while self.remaining_acks > 0 { + let frame = frame::Settings::ack().into(); + try_ready!(self.try_send(frame)); + + self.remaining_acks -= 1; + } + + Ok(Async::Ready(())) } fn try_send(&mut self, item: frame::Frame) -> Poll<(), ConnectionError> { @@ -82,13 +91,8 @@ impl Sink for Settings // Settings frames take priority, so `item` cannot be sent if there are // any pending acks OR the local settings have been changed w/o sending // an associated frame. - if self.has_pending_sends() { - // Try to flush them - try!(self.poll_complete()); - - if self.has_pending_sends() { - return Ok(AsyncSink::NotReady(item)); - } + if !try!(self.try_send_pending()).is_ready() { + return Ok(AsyncSink::NotReady(item)); } self.inner.start_send(item) @@ -99,8 +103,8 @@ impl Sink for Settings } fn close(&mut self) -> Poll<(), ConnectionError> { - if self.has_pending_sends() { - try_ready!(self.poll_complete()); + if !try!(self.try_send_pending()).is_ready() { + return Ok(Async::NotReady); } self.inner.close()