diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 220a150..2fba730 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -20,10 +20,14 @@ pub struct SettingSet { } impl SettingSet { - pub fn initial_window_size(&self) -> u32 { - self.initial_window_size.unwrap_or(65_535) + pub fn enable_push(&self) -> Option { + self.enable_push.map(|n| n != 0) } - + + pub fn initial_window_size(&self) -> Option { + self.initial_window_size + } + pub fn max_concurrent_streams(&self) -> Option { self.max_concurrent_streams } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 587e27a..28232cb 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -53,6 +53,18 @@ impl Connection self.inner.update_local_settings(local) } + pub fn remote_initial_window_size(&self) -> u32 { + self.inner.remote_initial_window_size() + } + + pub fn remote_max_concurrent_streams(&self) -> Option { + self.inner.remote_max_concurrent_streams() + } + + pub fn remote_push_enabled(&self) -> Option { + self.inner.remote_push_enabled() + } + pub fn start_ping(&mut self, body: PingPayload) -> StartSend { self.inner.start_ping(body) } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index a9e2dae..9128e57 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -317,28 +317,30 @@ impl ApplySettings for FlowControl fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set)?; - let old_window_size = self.local_initial; - let new_window_size = set.initial_window_size(); - if new_window_size == old_window_size { - return Ok(()); - } + if let Some(new_window_size) = set.initial_window_size() { + let old_window_size = self.local_initial; + if new_window_size == old_window_size { + return Ok(()); + } - self.inner.update_inital_recv_window_size(old_window_size, new_window_size); - self.local_initial = new_window_size; + self.inner.update_inital_recv_window_size(old_window_size, new_window_size); + self.local_initial = new_window_size; + } Ok(()) } fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.apply_remote_settings(set)?; - let old_window_size = self.remote_initial; - let new_window_size = set.initial_window_size(); - if new_window_size == old_window_size { - return Ok(()); - } + if let Some(new_window_size) = set.initial_window_size() { + let old_window_size = self.remote_initial; + if new_window_size == old_window_size { + return Ok(()); + } - self.inner.update_inital_send_window_size(old_window_size, new_window_size); - self.remote_initial = new_window_size; + self.inner.update_inital_send_window_size(old_window_size, new_window_size); + self.remote_initial = new_window_size; + } Ok(()) } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 0c18332..53e2ef4 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -30,7 +30,7 @@ use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use self::ping_pong::{ControlPing, PingPayload, PingPong}; use self::ready::ReadySink; -use self::settings::{ApplySettings, /*ControlSettings,*/ Settings}; +use self::settings::{ApplySettings, ControlSettings, Settings}; use self::stream_recv_close::StreamRecvClose; use self::stream_recv_open::StreamRecvOpen; use self::stream_send_close::StreamSendClose; @@ -165,7 +165,7 @@ impl WindowUpdate { /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. -pub fn from_io(io: T, settings: frame::SettingSet) +pub fn from_io(io: T, local_settings: frame::SettingSet) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -177,10 +177,9 @@ pub fn from_io(io: T, settings: frame::SettingSet) // weird, but oh well... // // We first create a Settings directly around a framed writer - let transport = Settings::new( - framed_write, settings); + let transport = Settings::new(framed_write, local_settings.clone()); - from_server_handshaker(transport) + from_server_handshaker(transport, local_settings) } /// Create a transport prepared to handle the server handshake. @@ -198,16 +197,18 @@ pub fn server_handshaker(io: T, settings: frame::SettingSet) } /// Create a full H2 transport from the server handshaker -pub fn from_server_handshaker(settings: Settings>) +pub fn from_server_handshaker(settings: Settings>, + local_settings: frame::SettingSet) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { - let initial_recv_window_size = settings.local_settings().initial_window_size(); - let initial_send_window_size = settings.remote_settings().initial_window_size(); - let local_max_concurrency = settings.local_settings().max_concurrent_streams(); - let remote_max_concurrency = settings.remote_settings().max_concurrent_streams(); + let initial_recv_window_size = local_settings.initial_window_size().unwrap_or(65_535); + let local_max_concurrency = local_settings.max_concurrent_streams(); + + let initial_send_window_size = settings.remote_initial_window_size(); + let remote_max_concurrency = settings.remote_max_concurrent_streams(); // Replace Settings' writer with a full transport. let transport = settings.swap_inner(|io| { diff --git a/src/proto/settings.rs b/src/proto/settings.rs index b31aaf2..66cc98f 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -11,8 +11,10 @@ use std::io; /// above---Connection). pub trait ControlSettings { fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>; - fn local_settings(&self) -> &SettingSet; - fn remote_settings(&self) -> &SettingSet; + + fn remote_push_enabled(&self) -> Option; + fn remote_max_concurrent_streams(&self) -> Option; + fn remote_initial_window_size(&self) -> WindowSize; } /// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to @@ -27,17 +29,15 @@ pub struct Settings { // Upstream transport inner: T, - // Our settings - local: SettingSet, - - // Peer settings - remote: SettingSet, + remote_push_enabled: Option, + remote_max_concurrent_streams: Option, + remote_initial_window_size: WindowSize, // Number of acks remaining to send to the peer remaining_acks: usize, - // True when the local settings must be flushed to the remote - local_valid_id_dirty: bool, + // Holds a new set of local values to be applied. + pending_local: Option, // True when we have received a settings frame from the remote. received_remote: bool, @@ -49,46 +49,34 @@ impl Settings pub fn new(inner: T, local: SettingSet) -> Settings { Settings { inner: inner, - local: local, - remote: SettingSet::default(), + pending_local: Some(local), + remote_push_enabled: None, + remote_max_concurrent_streams: None, + remote_initial_window_size: 65_535, remaining_acks: 0, - local_valid_id_dirty: true, received_remote: false, } } - // TODO remove this - pub fn local_settings(&self) -> &SettingSet { - &self.local - } - - // TODO replace this with settings a client needs to know about (concurrency, initial - // window size, etc). - pub fn remote_settings(&self) -> &SettingSet { - &self.local - } - /// Swap the inner transport while maintaining the current state. pub fn swap_inner T2>(self, f: F) -> Settings { let inner = f(self.inner); Settings { inner: inner, - local: self.local, - remote: self.remote, + remote_push_enabled: self.remote_push_enabled, + remote_max_concurrent_streams: self.remote_max_concurrent_streams, + remote_initial_window_size: self.remote_initial_window_size, remaining_acks: self.remaining_acks, - local_valid_id_dirty: self.local_valid_id_dirty, + pending_local: self.pending_local, received_remote: self.received_remote, } } fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { - trace!("try_send_pending; dirty={} acks={}", self.local_valid_id_dirty, self.remaining_acks); - if self.local_valid_id_dirty { - let frame = frame::Settings::new(self.local.clone()); - try_ready!(self.try_send(frame)); - - self.local_valid_id_dirty = false; + trace!("try_send_pending; dirty={} acks={}", self.pending_local.is_some(), self.remaining_acks); + if let Some(local) = self.pending_local.take() { + try_ready!(self.try_send_local(local)); } while self.remaining_acks > 0 { @@ -101,9 +89,19 @@ impl Settings Ok(Async::Ready(())) } - fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> { + fn try_send_local(&mut self, local: SettingSet) -> Poll<(), ConnectionError> { + let frame = frame::Settings::new(local.clone()).into(); + if self.try_send(frame)?.is_not_ready() { + self.pending_local = Some(local); + Ok(Async::NotReady) + } else { + Ok(Async::Ready(())) + } + } + + fn try_send(&mut self, frame: frame::Settings) -> Poll<(), ConnectionError> { trace!("try_send"); - if self.inner.start_send(item.into())?.is_ready() { + if self.inner.start_send(frame.into())?.is_ready() { Ok(Async::Ready(())) } else { Ok(Async::NotReady) @@ -111,19 +109,24 @@ impl Settings } } -impl ControlSettings for Settings{ - fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { - self.local = local; - self.local_valid_id_dirty = true; +impl ControlSettings for Settings + where T: Sink, SinkError = ConnectionError>, +{ + fn update_local_settings(&mut self, local: SettingSet) -> Result<(), ConnectionError> { + self.try_send_local(local)?; Ok(()) } - fn local_settings(&self) -> &SettingSet { - &self.local + fn remote_initial_window_size(&self) -> u32 { + self.remote_initial_window_size } - fn remote_settings(&self) -> &SettingSet { - &self.remote + fn remote_max_concurrent_streams(&self) -> Option { + self.remote_max_concurrent_streams + } + + fn remote_push_enabled(&self) -> Option { + self.remote_push_enabled } } @@ -147,7 +150,16 @@ impl Stream for Settings // acknowledgements. let settings = v.into_set(); self.inner.apply_remote_settings(&settings)?; - self.remote = settings; + + if let Some(sz) = settings.initial_window_size() { + self.remote_initial_window_size = sz; + } + if let Some(max) = settings.max_concurrent_streams() { + self.remote_max_concurrent_streams = Some(max); + } + if let Some(ok) = settings.enable_push() { + self.remote_push_enabled = Some(ok); + } self.remaining_acks += 1; let _ = try!(self.try_send_pending()); diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs index 15c7282..74f70d7 100644 --- a/src/proto/stream_recv_open.rs +++ b/src/proto/stream_recv_open.rs @@ -66,7 +66,9 @@ impl ApplySettings for StreamRecvOpen { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.max_concurrency = set.max_concurrent_streams(); - self.initial_window_size = set.initial_window_size(); + if let Some(sz) = set.initial_window_size() { + self.initial_window_size = sz; + } self.inner.apply_local_settings(set) } diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs index b6701ac..13be1a0 100644 --- a/src/proto/stream_send_open.rs +++ b/src/proto/stream_send_open.rs @@ -38,7 +38,9 @@ impl ApplySettings for StreamSendOpen { fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { self.max_concurrency = set.max_concurrent_streams(); - self.initial_window_size = set.initial_window_size(); + if let Some(sz) = set.initial_window_size() { + self.initial_window_size = sz; + } self.inner.apply_remote_settings(set) } } diff --git a/src/server.rs b/src/server.rs index fac683a..b36c419 100644 --- a/src/server.rs +++ b/src/server.rs @@ -46,12 +46,13 @@ pub fn handshake2(io: T) -> Handshake where T: AsyncRead + AsyncWrite + 'static, B: 'static, // TODO: Why is this required but not in client? { - let transport = proto::server_handshaker(io, Default::default()); + let local_settings = frame::SettingSet::default(); + let transport = proto::server_handshaker(io, local_settings.clone()); // Flush pending settings frame and then wait for the client preface let handshake = Flush::new(transport) .and_then(ReadPreface::new) - .map(proto::from_server_handshaker) + .map(move |t| proto::from_server_handshaker(t, local_settings)) ; Handshake { inner: Box::new(handshake) }