From 0e35254bd8a94137e31ed970c5b153ec34fa0f06 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 10 Mar 2017 15:14:55 -0800 Subject: [PATCH] WIP --- src/frame/mod.rs | 2 +- src/frame/settings.rs | 64 +++++++++++++++++-------- src/proto/mod.rs | 1 + src/proto/settings.rs | 108 ++++++++++++++++++++++++++++++++++++++++++ src/settings.rs | 0 5 files changed, 155 insertions(+), 20 deletions(-) create mode 100644 src/proto/settings.rs create mode 100644 src/settings.rs diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 3769269..0f89055 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -32,7 +32,7 @@ mod util; pub use self::data::Data; pub use self::head::{Head, Kind, StreamId}; -pub use self::settings::Settings; +pub use self::settings::{Settings, SettingSet}; pub use self::unknown::Unknown; const FRAME_HEADER_LEN: usize = 9; diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 73b7f94..dc5fe5b 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -1,12 +1,17 @@ -use frame::{Error, Head, Kind}; +use frame::{Frame, Error, Head, Kind}; use bytes::{Bytes, BytesMut, BufMut, BigEndian}; #[derive(Debug, Clone, Default, Eq, PartialEq)] pub struct Settings { flag: SettingsFlag, // Fields + values: SettingSet, +} + +#[derive(Debug, Clone, Default, Eq, PartialEq)] +pub struct SettingSet { header_table_size: Option, - enable_push: Option, + enable_push: Option, max_concurrent_streams: Option, initial_window_size: Option, max_frame_size: Option, @@ -35,6 +40,20 @@ const ALL: u8 = ACK; // ===== impl Settings ===== impl Settings { + pub fn ack() -> Settings { + Settings { + flag: SettingsFlag::ack(), + .. Settings::default() + } + } + + pub fn new(values: SettingSet) -> Settings { + Settings { + flag: SettingsFlag::empty(), + values: values, + } + } + pub fn load(head: Head, payload: Bytes) -> Result { use self::Setting::*; @@ -54,10 +73,7 @@ impl Settings { } // Return the ACK frame - return Ok(Settings { - flag: flag, - .. Settings::default() - }); + return Ok(Settings::ack()); } // Ensure the payload length is correct, each setting is 6 bytes long. @@ -71,22 +87,22 @@ impl Settings { for raw in payload.chunks(6) { match Setting::load(raw) { Some(HeaderTableSize(val)) => { - settings.header_table_size = Some(val); + settings.values.header_table_size = Some(val); } Some(EnablePush(val)) => { - settings.enable_push = Some(val == 1); + settings.values.enable_push = Some(val); } Some(MaxConcurrentStreams(val)) => { - settings.max_concurrent_streams = Some(val); + settings.values.max_concurrent_streams = Some(val); } Some(InitialWindowSize(val)) => { - settings.initial_window_size = Some(val); + settings.values.initial_window_size = Some(val); } Some(MaxFrameSize(val)) => { - settings.max_frame_size = Some(val); + settings.values.max_frame_size = Some(val); } Some(MaxHeaderListSize(val)) => { - settings.max_header_list_size = Some(val); + settings.values.max_header_list_size = Some(val); } None => {} } @@ -121,32 +137,38 @@ impl Settings { fn for_each(&self, mut f: F) { use self::Setting::*; - if let Some(v) = self.header_table_size { + if let Some(v) = self.values.header_table_size { f(HeaderTableSize(v)); } - if let Some(v) = self.enable_push { - f(EnablePush(if v { 1 } else { 0 })); + if let Some(v) = self.values.enable_push { + f(EnablePush(v)); } - if let Some(v) = self.max_concurrent_streams { + if let Some(v) = self.values.max_concurrent_streams { f(MaxConcurrentStreams(v)); } - if let Some(v) = self.initial_window_size { + if let Some(v) = self.values.initial_window_size { f(InitialWindowSize(v)); } - if let Some(v) = self.max_frame_size { + if let Some(v) = self.values.max_frame_size { f(MaxFrameSize(v)); } - if let Some(v) = self.max_header_list_size { + if let Some(v) = self.values.max_header_list_size { f(MaxHeaderListSize(v)); } } } +impl From for Frame { + fn from(src: Settings) -> Frame { + Frame::Settings(src) + } +} + // ===== impl Setting ===== impl Setting { @@ -204,6 +226,10 @@ impl Setting { // ===== impl SettingsFlag ===== impl SettingsFlag { + pub fn empty() -> SettingsFlag { + SettingsFlag(0) + } + pub fn load(bits: u8) -> SettingsFlag { SettingsFlag(bits & ALL) } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 67922eb..617f8ad 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,5 +1,6 @@ mod framed_read; mod framed_write; +mod settings; use {frame, ConnectionError}; use self::framed_read::FramedRead; diff --git a/src/proto/settings.rs b/src/proto/settings.rs new file mode 100644 index 0000000..6644bde --- /dev/null +++ b/src/proto/settings.rs @@ -0,0 +1,108 @@ +use ConnectionError; +use frame::{self, Frame}; + +use futures::*; + +pub struct Settings { + inner: T, + // Our settings + local: frame::SettingSet, + // Peer settings + remote: frame::SettingSet, + // Number of acks remaining to send to the peer + remaining_acks: usize, + // True when the local settings must be flushed to the remote + is_dirty: bool, +} + +impl Settings + where T: Stream, + T: Sink, +{ + pub fn new(inner: T) -> Settings { + Settings { + inner: inner, + local: frame::SettingSet::default(), + remote: frame::SettingSet::default(), + remaining_acks: 0, + // Always start in the dirty state as sending the settings frame is + // part of the connection handshake + is_dirty: true, + } + } + + fn has_pending_sends(&self) -> bool { + self.is_dirty || self.remaining_acks > 0 + } + + fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { + if self.is_dirty { + // Create the new frame + let frame = frame::Settings::new(self.local.clone()).into(); + try_ready!(self.try_send(frame)); + + self.is_dirty = false; + } + + unimplemented!(); + } + + fn try_send(&mut self, item: frame::Frame) -> Poll<(), ConnectionError> { + if let AsyncSink::NotReady(_) = try!(self.inner.start_send(item)) { + // Ensure that call to `poll_complete` guarantee is called to satisfied + try!(self.inner.poll_complete()); + + return Ok(Async::NotReady); + } + + Ok(Async::Ready(())) + } +} + +impl Stream for Settings + where T: Stream, + T: Sink, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + self.inner.poll() + } +} + +impl Sink for Settings + where T: Stream, + T: Sink, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Frame) -> StartSend { + // Settings frames take priority, so `item` cannot be sent if there are + // any pending acks OR the local settings have been changed w/o sending + // an associated frame. + if self.has_pending_sends() { + // Try to flush them + try!(self.poll_complete()); + + if self.has_pending_sends() { + return Ok(AsyncSink::NotReady(item)); + } + } + + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), ConnectionError> { + if self.has_pending_sends() { + try_ready!(self.poll_complete()); + } + + self.inner.close() + } +} diff --git a/src/settings.rs b/src/settings.rs new file mode 100644 index 0000000..e69de29