narrow the surface area of the ControlSettings api to expose only a few remote settings

This commit is contained in:
Oliver Gould
2017-07-23 18:32:22 +00:00
parent 82ba0dde71
commit df5f31a63c
8 changed files with 110 additions and 74 deletions

View File

@@ -20,8 +20,12 @@ pub struct SettingSet {
}
impl SettingSet {
pub fn initial_window_size(&self) -> u32 {
self.initial_window_size.unwrap_or(65_535)
pub fn enable_push(&self) -> Option<bool> {
self.enable_push.map(|n| n != 0)
}
pub fn initial_window_size(&self) -> Option<u32> {
self.initial_window_size
}
pub fn max_concurrent_streams(&self) -> Option<u32> {

View File

@@ -53,6 +53,18 @@ impl<T, P, B> Connection<T, P, B>
self.inner.update_local_settings(local)
}
pub fn remote_initial_window_size(&self) -> u32 {
self.inner.remote_initial_window_size()
}
pub fn remote_max_concurrent_streams(&self) -> Option<u32> {
self.inner.remote_max_concurrent_streams()
}
pub fn remote_push_enabled(&self) -> Option<bool> {
self.inner.remote_push_enabled()
}
pub fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}

View File

@@ -317,28 +317,30 @@ impl<T> ApplySettings for FlowControl<T>
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)?;
if let Some(new_window_size) = set.initial_window_size() {
let old_window_size = self.local_initial;
let new_window_size = set.initial_window_size();
if new_window_size == old_window_size {
return Ok(());
}
self.inner.update_inital_recv_window_size(old_window_size, new_window_size);
self.local_initial = new_window_size;
}
Ok(())
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)?;
if let Some(new_window_size) = set.initial_window_size() {
let old_window_size = self.remote_initial;
let new_window_size = set.initial_window_size();
if new_window_size == old_window_size {
return Ok(());
}
self.inner.update_inital_send_window_size(old_window_size, new_window_size);
self.remote_initial = new_window_size;
}
Ok(())
}
}

View File

@@ -30,7 +30,7 @@ use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;
use self::ping_pong::{ControlPing, PingPayload, PingPong};
use self::ready::ReadySink;
use self::settings::{ApplySettings, /*ControlSettings,*/ Settings};
use self::settings::{ApplySettings, ControlSettings, Settings};
use self::stream_recv_close::StreamRecvClose;
use self::stream_recv_open::StreamRecvOpen;
use self::stream_send_close::StreamSendClose;
@@ -165,7 +165,7 @@ impl WindowUpdate {
/// Create a full H2 transport from an I/O handle.
///
/// This is called as the final step of the client handshake future.
pub fn from_io<T, P, B>(io: T, settings: frame::SettingSet)
pub fn from_io<T, P, B>(io: T, local_settings: frame::SettingSet)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
@@ -177,10 +177,9 @@ pub fn from_io<T, P, B>(io: T, settings: frame::SettingSet)
// weird, but oh well...
//
// We first create a Settings directly around a framed writer
let transport = Settings::new(
framed_write, settings);
let transport = Settings::new(framed_write, local_settings.clone());
from_server_handshaker(transport)
from_server_handshaker(transport, local_settings)
}
/// Create a transport prepared to handle the server handshake.
@@ -198,16 +197,18 @@ pub fn server_handshaker<T, B>(io: T, settings: frame::SettingSet)
}
/// Create a full H2 transport from the server handshaker
pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>>)
pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>>,
local_settings: frame::SettingSet)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
let initial_recv_window_size = settings.local_settings().initial_window_size();
let initial_send_window_size = settings.remote_settings().initial_window_size();
let local_max_concurrency = settings.local_settings().max_concurrent_streams();
let remote_max_concurrency = settings.remote_settings().max_concurrent_streams();
let initial_recv_window_size = local_settings.initial_window_size().unwrap_or(65_535);
let local_max_concurrency = local_settings.max_concurrent_streams();
let initial_send_window_size = settings.remote_initial_window_size();
let remote_max_concurrency = settings.remote_max_concurrent_streams();
// Replace Settings' writer with a full transport.
let transport = settings.swap_inner(|io| {

View File

@@ -11,8 +11,10 @@ use std::io;
/// above---Connection).
pub trait ControlSettings {
fn update_local_settings(&mut self, set: frame::SettingSet) -> Result<(), ConnectionError>;
fn local_settings(&self) -> &SettingSet;
fn remote_settings(&self) -> &SettingSet;
fn remote_push_enabled(&self) -> Option<bool>;
fn remote_max_concurrent_streams(&self) -> Option<u32>;
fn remote_initial_window_size(&self) -> WindowSize;
}
/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
@@ -27,17 +29,15 @@ pub struct Settings<T> {
// Upstream transport
inner: T,
// Our settings
local: SettingSet,
// Peer settings
remote: SettingSet,
remote_push_enabled: Option<bool>,
remote_max_concurrent_streams: Option<u32>,
remote_initial_window_size: WindowSize,
// Number of acks remaining to send to the peer
remaining_acks: usize,
// True when the local settings must be flushed to the remote
local_valid_id_dirty: bool,
// Holds a new set of local values to be applied.
pending_local: Option<SettingSet>,
// True when we have received a settings frame from the remote.
received_remote: bool,
@@ -49,46 +49,34 @@ impl<T, U> Settings<T>
pub fn new(inner: T, local: SettingSet) -> Settings<T> {
Settings {
inner: inner,
local: local,
remote: SettingSet::default(),
pending_local: Some(local),
remote_push_enabled: None,
remote_max_concurrent_streams: None,
remote_initial_window_size: 65_535,
remaining_acks: 0,
local_valid_id_dirty: true,
received_remote: false,
}
}
// TODO remove this
pub fn local_settings(&self) -> &SettingSet {
&self.local
}
// TODO replace this with settings a client needs to know about (concurrency, initial
// window size, etc).
pub fn remote_settings(&self) -> &SettingSet {
&self.local
}
/// Swap the inner transport while maintaining the current state.
pub fn swap_inner<T2, F: FnOnce(T) -> T2>(self, f: F) -> Settings<T2> {
let inner = f(self.inner);
Settings {
inner: inner,
local: self.local,
remote: self.remote,
remote_push_enabled: self.remote_push_enabled,
remote_max_concurrent_streams: self.remote_max_concurrent_streams,
remote_initial_window_size: self.remote_initial_window_size,
remaining_acks: self.remaining_acks,
local_valid_id_dirty: self.local_valid_id_dirty,
pending_local: self.pending_local,
received_remote: self.received_remote,
}
}
fn try_send_pending(&mut self) -> Poll<(), ConnectionError> {
trace!("try_send_pending; dirty={} acks={}", self.local_valid_id_dirty, self.remaining_acks);
if self.local_valid_id_dirty {
let frame = frame::Settings::new(self.local.clone());
try_ready!(self.try_send(frame));
self.local_valid_id_dirty = false;
trace!("try_send_pending; dirty={} acks={}", self.pending_local.is_some(), self.remaining_acks);
if let Some(local) = self.pending_local.take() {
try_ready!(self.try_send_local(local));
}
while self.remaining_acks > 0 {
@@ -101,9 +89,19 @@ impl<T, U> Settings<T>
Ok(Async::Ready(()))
}
fn try_send(&mut self, item: frame::Settings) -> Poll<(), ConnectionError> {
fn try_send_local(&mut self, local: SettingSet) -> Poll<(), ConnectionError> {
let frame = frame::Settings::new(local.clone()).into();
if self.try_send(frame)?.is_not_ready() {
self.pending_local = Some(local);
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}
fn try_send(&mut self, frame: frame::Settings) -> Poll<(), ConnectionError> {
trace!("try_send");
if self.inner.start_send(item.into())?.is_ready() {
if self.inner.start_send(frame.into())?.is_ready() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
@@ -111,19 +109,24 @@ impl<T, U> Settings<T>
}
}
impl<T> ControlSettings for Settings<T>{
fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> {
self.local = local;
self.local_valid_id_dirty = true;
impl<T, U> ControlSettings for Settings<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
fn update_local_settings(&mut self, local: SettingSet) -> Result<(), ConnectionError> {
self.try_send_local(local)?;
Ok(())
}
fn local_settings(&self) -> &SettingSet {
&self.local
fn remote_initial_window_size(&self) -> u32 {
self.remote_initial_window_size
}
fn remote_settings(&self) -> &SettingSet {
&self.remote
fn remote_max_concurrent_streams(&self) -> Option<u32> {
self.remote_max_concurrent_streams
}
fn remote_push_enabled(&self) -> Option<bool> {
self.remote_push_enabled
}
}
@@ -147,7 +150,16 @@ impl<T, U> Stream for Settings<T>
// acknowledgements.
let settings = v.into_set();
self.inner.apply_remote_settings(&settings)?;
self.remote = settings;
if let Some(sz) = settings.initial_window_size() {
self.remote_initial_window_size = sz;
}
if let Some(max) = settings.max_concurrent_streams() {
self.remote_max_concurrent_streams = Some(max);
}
if let Some(ok) = settings.enable_push() {
self.remote_push_enabled = Some(ok);
}
self.remaining_acks += 1;
let _ = try!(self.try_send_pending());

View File

@@ -66,7 +66,9 @@ impl<T> ApplySettings for StreamRecvOpen<T>
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
if let Some(sz) = set.initial_window_size() {
self.initial_window_size = sz;
}
self.inner.apply_local_settings(set)
}

View File

@@ -38,7 +38,9 @@ impl<T: ApplySettings> ApplySettings for StreamSendOpen<T> {
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
self.initial_window_size = set.initial_window_size();
if let Some(sz) = set.initial_window_size() {
self.initial_window_size = sz;
}
self.inner.apply_remote_settings(set)
}
}

View File

@@ -46,12 +46,13 @@ pub fn handshake2<T, B: IntoBuf>(io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: 'static, // TODO: Why is this required but not in client?
{
let transport = proto::server_handshaker(io, Default::default());
let local_settings = frame::SettingSet::default();
let transport = proto::server_handshaker(io, local_settings.clone());
// Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(transport)
.and_then(ReadPreface::new)
.map(proto::from_server_handshaker)
.map(move |t| proto::from_server_handshaker(t, local_settings))
;
Handshake { inner: Box::new(handshake) }