From 32d4c2d5a9f1c96f7a1e8d2f475ba084a76cbd6a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 11 Aug 2017 12:00:22 -0700 Subject: [PATCH] Many more changes --- Cargo.toml | 3 ++ src/frame/mod.rs | 2 +- src/frame/settings.rs | 59 +++++++++++++------------- src/proto/codec.rs | 69 +++++++++++++++++++++++++++++++ src/proto/connection.rs | 26 ++---------- src/proto/framed_read.rs | 8 ++++ src/proto/framed_write.rs | 8 ++++ src/proto/mod.rs | 8 ++-- src/proto/settings.rs | 32 +++++++------- src/proto/streams/flow_control.rs | 15 +++++++ src/proto/streams/recv.rs | 11 +++-- src/proto/streams/send.rs | 57 +++++++++++++++++++++++++ src/proto/streams/store.rs | 29 +++++++------ src/proto/streams/streams.rs | 15 +++++-- 14 files changed, 246 insertions(+), 96 deletions(-) create mode 100644 src/proto/codec.rs diff --git a/Cargo.toml b/Cargo.toml index f21d939..de03638 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,6 @@ openssl = { version = "0.9.14", "features" = ["v102"] } tokio-openssl = "0.1.3" env_logger = "0.4.3" io-dump = { git = "https://github.com/carllerche/io-dump" } + +[replace] +"tokio-io:0.1.2" = { git = "https://github.com/tokio-rs/tokio-io" } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index c43aeec..9b2ab7e 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -46,7 +46,7 @@ pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; pub use self::ping::Ping; pub use self::priority::{Priority, StreamDependency}; pub use self::reset::Reset; -pub use self::settings::{Settings, SettingSet}; +pub use self::settings::Settings; pub use self::stream_id::StreamId; pub use self::window_update::WindowUpdate; diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 0f4e03d..d42084c 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -6,11 +6,6 @@ use bytes::{BytesMut, BufMut, BigEndian}; pub struct Settings { flags: SettingsFlags, // Fields - values: SettingSet, -} - -#[derive(Debug, Clone, Default, Eq, PartialEq)] -pub struct SettingSet { header_table_size: Option, enable_push: Option, max_concurrent_streams: Option, @@ -19,20 +14,6 @@ pub struct SettingSet { max_header_list_size: Option, } -impl SettingSet { - pub fn enable_push(&self) -> Option { - self.enable_push.map(|n| n != 0) - } - - pub fn initial_window_size(&self) -> Option { - self.initial_window_size - } - - pub fn max_concurrent_streams(&self) -> Option { - self.max_concurrent_streams - } -} - /// An enum that lists all valid settings that can be sent in a SETTINGS /// frame. /// @@ -73,6 +54,22 @@ impl Settings { self.flags.is_ack() } + pub fn enable_push(&self) -> Option { + self.enable_push.map(|n| n != 0) + } + + pub fn initial_window_size(&self) -> Option { + self.initial_window_size + } + + pub fn max_concurrent_streams(&self) -> Option { + self.max_concurrent_streams + } + + pub fn max_frame_size(&self) -> Option { + self.max_frame_size + } + pub fn load(head: Head, payload: &[u8]) -> Result { use self::Setting::*; @@ -107,12 +104,12 @@ impl Settings { for raw in payload.chunks(6) { match Setting::load(raw) { Some(HeaderTableSize(val)) => { - settings.values.header_table_size = Some(val); + settings.header_table_size = Some(val); } Some(EnablePush(val)) => { match val { 0 | 1 => { - settings.values.enable_push = Some(val); + settings.enable_push = Some(val); } _ => { return Err(Error::InvalidSettingValue); @@ -120,24 +117,24 @@ impl Settings { } } Some(MaxConcurrentStreams(val)) => { - settings.values.max_concurrent_streams = Some(val); + settings.max_concurrent_streams = Some(val); } Some(InitialWindowSize(val)) => { if val as usize > MAX_INITIAL_WINDOW_SIZE { return Err(Error::InvalidSettingValue); } else { - settings.values.initial_window_size = Some(val); + settings.initial_window_size = Some(val); } } Some(MaxFrameSize(val)) => { if val < DEFAULT_MAX_FRAME_SIZE || val as usize > MAX_MAX_FRAME_SIZE { return Err(Error::InvalidSettingValue); } else { - settings.values.max_frame_size = Some(val); + settings.max_frame_size = Some(val); } } Some(MaxHeaderListSize(val)) => { - settings.values.max_header_list_size = Some(val); + settings.max_header_list_size = Some(val); } None => {} } @@ -171,27 +168,27 @@ impl Settings { fn for_each(&self, mut f: F) { use self::Setting::*; - if let Some(v) = self.values.header_table_size { + if let Some(v) = self.header_table_size { f(HeaderTableSize(v)); } - if let Some(v) = self.values.enable_push { + if let Some(v) = self.enable_push { f(EnablePush(v)); } - if let Some(v) = self.values.max_concurrent_streams { + if let Some(v) = self.max_concurrent_streams { f(MaxConcurrentStreams(v)); } - if let Some(v) = self.values.initial_window_size { + if let Some(v) = self.initial_window_size { f(InitialWindowSize(v)); } - if let Some(v) = self.values.max_frame_size { + if let Some(v) = self.max_frame_size { f(MaxFrameSize(v)); } - if let Some(v) = self.values.max_header_list_size { + if let Some(v) = self.max_header_list_size { f(MaxHeaderListSize(v)); } } diff --git a/src/proto/codec.rs b/src/proto/codec.rs new file mode 100644 index 0000000..460dec4 --- /dev/null +++ b/src/proto/codec.rs @@ -0,0 +1,69 @@ +use super::*; +use futures::*; + +#[derive(Debug)] +pub struct Codec { + inner: FramedRead>, +} + +impl Codec { + pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { + self.framed_read().apply_remote_settings(frame); + self.framed_write().apply_remote_settings(frame); + } + + fn framed_read(&mut self) -> &mut FramedRead> { + &mut self.inner + } + + fn framed_write(&mut self) -> &mut FramedWrite { + self.inner.get_mut() + } +} + +impl Codec + where T: AsyncRead + AsyncWrite, + B: Buf, +{ + pub fn from_framed(inner: FramedRead>) -> Self { + Codec { inner } + } +} + +impl Codec + where T: AsyncWrite, + B: Buf, +{ + pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + self.inner.poll_ready() + } + +} + +impl futures::Stream for Codec + where T: AsyncRead, +{ + type Item = Frame; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, ConnectionError> { + use futures::Stream; + self.inner.poll() + } +} + +impl Sink for Codec + where T: AsyncWrite, + B: Buf, +{ + type SinkItem = Frame; + type SinkError = ConnectionError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.inner.poll_complete() + } +} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 36cd65e..217ded2 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -47,22 +47,6 @@ impl Connection } } - pub fn update_local_settings(&mut self, _local: frame::SettingSet) -> Result<(), ConnectionError> { - unimplemented!(); - } - - pub fn remote_initial_window_size(&self) -> u32 { - unimplemented!(); - } - - pub fn remote_max_concurrent_streams(&self) -> Option { - unimplemented!(); - } - - pub fn remote_push_enabled(&self) -> Option { - unimplemented!(); - } - /// Returns `Ready` when the connection is ready to receive a frame. pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { try_ready!(self.poll_send_ready()); @@ -125,8 +109,6 @@ impl Connection Some(Settings(frame)) => { trace!("recv SETTINGS; frame={:?}", frame); self.settings.recv_settings(frame); - - // TODO: ACK must be sent THEN settings applied. } Some(GoAway(frame)) => { // TODO: handle the last_stream_id. Also, should this be @@ -160,12 +142,10 @@ impl Connection /// Returns `Ready` when the `Connection` is ready to receive a frame from /// the socket. fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> { - // Pong, settings ack, and stream refusals are high priority frames to - // send. If the write buffer is full, we stop reading any further frames - // until these high priority writes can be committed to the buffer. - + // The order of these calls don't really matter too much as only one + // should have pending work. try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); - try_ready!(self.settings.send_pending_ack(&mut self.codec)); + try_ready!(self.settings.send_pending_ack(&mut self.codec, &mut self.streams)); try_ready!(self.streams.send_pending_refusal(&mut self.codec)); Ok(().into()) diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 9dd89c5..d6774d6 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -48,6 +48,10 @@ impl FramedRead { } } + pub fn apply_remote_settings(&mut self, _settings: &frame::Settings) { + // TODO: Is this needed? + } + fn decode_frame(&mut self, mut bytes: BytesMut) -> Result, ConnectionError> { trace!("decoding frame from {}B", bytes.len()); @@ -142,6 +146,10 @@ impl FramedRead { Ok(Some(frame)) } + + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut() + } } impl futures::Stream for FramedRead diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 9f2fa6f..c95577c 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -90,6 +90,14 @@ impl FramedWrite } } +impl FramedWrite { + pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { + if let Some(val) = settings.max_frame_size() { + self.max_frame_size = val; + } + } +} + impl Sink for FramedWrite where T: AsyncWrite, B: Buf, diff --git a/src/proto/mod.rs b/src/proto/mod.rs index e7f1d02..9be1465 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,3 +1,4 @@ +mod codec; mod connection; mod framed_read; mod framed_write; @@ -8,6 +9,7 @@ mod streams; pub use self::connection::Connection; pub use self::streams::{Streams, StreamRef, Chunk}; +use self::codec::Codec; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use self::ping_pong::PingPong; @@ -52,10 +54,6 @@ pub struct WindowUpdate { increment: WindowSize, } -type Codec = - FramedRead< - FramedWrite>; - // Constants pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX; @@ -90,7 +88,7 @@ pub fn from_framed_write(framed_write: FramedWrite) .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize) .new_read(framed_write); - let codec = FramedRead::new(framed); + let codec = Codec::from_framed(FramedRead::new(framed)); Connection::new(codec) } diff --git a/src/proto/settings.rs b/src/proto/settings.rs index f138672..5952b31 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -3,13 +3,16 @@ use proto::*; #[derive(Debug)] pub struct Settings { - pending_ack: bool, + /// Received SETTINGS frame pending processing. The ACK must be written to + /// the socket first then the settings applied **before** receiving any + /// further frames. + pending: Option, } impl Settings { pub fn new() -> Self { Settings { - pending_ack: false, + pending: None, } } @@ -18,30 +21,31 @@ impl Settings { debug!("received remote settings ack"); // TODO: handle acks } else { - assert!(!self.pending_ack); - self.pending_ack = true; + assert!(self.pending.is_none()); + self.pending = Some(frame); } } - pub fn send_pending_ack(&mut self, dst: &mut Codec) + pub fn send_pending_ack(&mut self, + dst: &mut Codec, + streams: &mut Streams) -> Poll<(), ConnectionError> where T: AsyncWrite, B: Buf, { - if self.pending_ack { + if let Some(ref settings) = self.pending { let frame = frame::Settings::ack(); - match dst.start_send(frame.into())? { - AsyncSink::Ready => { - self.pending_ack = false; - return Ok(().into()); - } - AsyncSink::NotReady(_) => { - return Ok(Async::NotReady); - } + if let AsyncSink::NotReady(_) = dst.start_send(frame.into())? { + return Ok(Async::NotReady); } + + dst.apply_remote_settings(settings); + streams.apply_remote_settings(settings); } + self.pending = None; + Ok(().into()) } } diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index d54e203..b2689da 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -48,6 +48,21 @@ impl FlowControl { } } + /// Reduce future capacity of the window. + /// + /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. + pub fn shrink_window(&mut self, dec: WindowSize) { + /* + if decr < self.next_window_update { + self.next_window_update -= decr + } else { + self.underflow += decr - self.next_window_update; + self.next_window_update = 0; + } + */ + } + + /// Claims the provided amount from the window, if there is enough space. /// /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 70cc3bd..3af49c0 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -221,14 +221,15 @@ impl Recv where B: Buf { pub fn recv_push_promise(&mut self, frame: frame::PushPromise, - stream: &mut store::Ptr) + stream: store::Key, + store: &mut Store) -> Result<(), ConnectionError> { // First, make sure that the values are legit self.ensure_can_reserve::

(frame.promised_id())?; // Make sure that the stream state is valid - stream.state.ensure_recv_open()?; + store[stream].state.ensure_recv_open()?; // TODO: Streams in the reserved states do not count towards the concurrency // limit. However, it seems like there should be a cap otherwise this @@ -247,16 +248,18 @@ impl Recv where B: Buf { let mut new_stream = Stream::new(frame.promised_id()); new_stream.state.reserve_remote(); - let mut ppp = stream.pending_push_promises.take(); + let mut ppp = store[stream].pending_push_promises.take(); { // Store the stream - let mut new_stream = stream.store() + let mut new_stream = store .insert(frame.promised_id(), new_stream); ppp.push::(&mut new_stream); } + let stream = &mut store[stream]; + stream.pending_push_promises = ppp; stream.notify_recv(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 53d3f77..565c871 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -273,6 +273,63 @@ impl Send where B: Buf { 0 } + pub fn apply_remote_settings(&mut self, + settings: &frame::Settings, + store: &mut Store) { + if let Some(val) = settings.max_concurrent_streams() { + self.max_streams = Some(val as usize); + } + + // Applies an update to the remote endpoint's initial window size. + // + // Per RFC 7540 ยง6.9.2: + // + // In addition to changing the flow-control window for streams that are + // not yet active, a SETTINGS frame can alter the initial flow-control + // window size for streams with active flow-control windows (that is, + // streams in the "open" or "half-closed (remote)" state). When the + // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust + // the size of all stream flow-control windows that it maintains by the + // difference between the new value and the old value. + // + // A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available + // space in a flow-control window to become negative. A sender MUST + // track the negative flow-control window and MUST NOT send new + // flow-controlled frames until it receives WINDOW_UPDATE frames that + // cause the flow-control window to become positive. + if let Some(val) = settings.initial_window_size() { + let old_val = self.init_window_sz; + self.init_window_sz = val; + + if val < old_val { + let dec = old_val - val; + + store.for_each(|mut stream| { + let stream = &mut *stream; + + if let Some(flow) = stream.state.send_flow_control() { + flow.shrink_window(val); + + // Update the unadvertised number as well + if stream.unadvertised_send_window < dec { + stream.unadvertised_send_window = 0; + } else { + stream.unadvertised_send_window -= dec; + } + } + }); + } else if val > old_val { + let inc = val - old_val; + + store.for_each(|mut stream| { + if let Some(flow) = stream.state.send_flow_control() { + unimplemented!(); + } + }); + } + } + } + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> { if id >= self.next_stream_id { return Err(ProtocolError.into()); diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 6b5937e..809f344 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -16,7 +16,7 @@ pub(super) struct Store { /// "Pointer" to an entry in the store pub(super) struct Ptr<'a, B: 'a> { key: Key, - store: &'a mut Store, + slab: &'a mut slab::Slab>, } /// References an entry in the store. @@ -72,7 +72,7 @@ impl Store { pub fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, - store: self, + slab: &mut self.slab, } } @@ -80,7 +80,7 @@ impl Store { if let Some(&key) = self.ids.get(id) { Some(Ptr { key: Key(key), - store: self, + slab: &mut self.slab, }) } else { None @@ -93,7 +93,7 @@ impl Store { Ptr { key: Key(key), - store: self, + slab: &mut self.slab, } } @@ -117,10 +117,13 @@ impl Store { } pub fn for_each(&mut self, mut f: F) - where F: FnMut(&mut Stream) + where F: FnMut(Ptr) { - for &id in self.ids.values() { - f(&mut self.slab[id]) + for &key in self.ids.values() { + f(Ptr { + key: Key(key), + slab: &mut self.slab, + }); } } } @@ -265,19 +268,15 @@ impl<'a, B: 'a> Ptr<'a, B> { self.key } - pub fn store(&mut self) -> &mut Store { - &mut self.store - } - pub fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, - store: self.store, + slab: &mut *self.slab, } } pub fn into_mut(self) -> &'a mut Stream { - &mut self.store.slab[self.key.0] + &mut self.slab[self.key.0] } } @@ -285,13 +284,13 @@ impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { type Target = Stream; fn deref(&self) -> &Stream { - &self.store.slab[self.key.0] + &self.slab[self.key.0] } } impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { fn deref_mut(&mut self) -> &mut Stream { - &mut self.store.slab[self.key.0] + &mut self.slab[self.key.0] } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f928547..86b016f 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -154,7 +154,9 @@ impl Streams let me = &mut *me; let actions = &mut me.actions; - me.store.for_each(|stream| actions.recv.recv_err(err, stream)); + me.store.for_each(|mut stream| { + actions.recv.recv_err(err, &mut *stream) + }); } pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) @@ -189,11 +191,11 @@ impl Streams let id = frame.stream_id(); let mut stream = match me.store.find_mut(&id) { - Some(stream) => stream, + Some(stream) => stream.key(), None => return Err(ProtocolError.into()), }; - me.actions.recv.recv_push_promise::

(frame, &mut stream) + me.actions.recv.recv_push_promise::

(frame, stream, &mut me.store) } pub fn next_incoming(&mut self) -> Option> { @@ -247,6 +249,13 @@ impl Streams me.actions.send.poll_complete(&mut me.store, dst) } + + pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.send.apply_remote_settings(frame, &mut me.store); + } } impl Streams