Implement the extended CONNECT protocol from RFC 8441 (#565)

This commit is contained in:
Anthony Ramine
2021-11-24 10:05:10 +01:00
committed by GitHub
parent dbaa3a4285
commit 87969c1f29
22 changed files with 694 additions and 120 deletions

View File

@@ -110,6 +110,10 @@ where
initial_max_send_streams: config.initial_max_send_streams,
local_next_stream_id: config.next_stream_id,
local_push_enabled: config.settings.is_push_enabled().unwrap_or(true),
extended_connect_protocol_enabled: config
.settings
.is_extended_connect_protocol_enabled()
.unwrap_or(false),
local_reset_duration: config.reset_stream_duration,
local_reset_max: config.reset_stream_max,
remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
@@ -147,6 +151,13 @@ where
self.inner.settings.send_settings(settings)
}
/// Send a new SETTINGS frame with extended CONNECT protocol enabled.
pub(crate) fn set_enable_connect_protocol(&mut self) -> Result<(), UserError> {
let mut settings = frame::Settings::default();
settings.set_enable_connect_protocol(Some(1));
self.inner.settings.send_settings(settings)
}
/// Returns the maximum number of concurrent streams that may be initiated
/// by this peer.
pub(crate) fn max_send_streams(&self) -> usize {

View File

@@ -117,6 +117,8 @@ impl Settings {
tracing::trace!("ACK sent; applying settings");
streams.apply_remote_settings(settings)?;
if let Some(val) = settings.header_table_size() {
dst.set_send_header_table_size(val as usize);
}
@@ -124,8 +126,6 @@ impl Settings {
if let Some(val) = settings.max_frame_size() {
dst.set_max_send_frame_size(val as usize);
}
streams.apply_remote_settings(settings)?;
}
self.remote = None;

View File

@@ -47,6 +47,9 @@ pub struct Config {
/// If the local peer is willing to receive push promises
pub local_push_enabled: bool,
/// If extended connect protocol is enabled.
pub extended_connect_protocol_enabled: bool,
/// How long a locally reset stream should ignore frames
pub local_reset_duration: Duration,

View File

@@ -56,6 +56,9 @@ pub(super) struct Recv {
/// If push promises are allowed to be received.
is_push_enabled: bool,
/// If extended connect protocol is enabled.
is_extended_connect_protocol_enabled: bool,
}
#[derive(Debug)]
@@ -103,6 +106,7 @@ impl Recv {
buffer: Buffer::new(),
refused: None,
is_push_enabled: config.local_push_enabled,
is_extended_connect_protocol_enabled: config.extended_connect_protocol_enabled,
}
}
@@ -216,6 +220,14 @@ impl Recv {
let stream_id = frame.stream_id();
let (pseudo, fields) = frame.into_parts();
if pseudo.protocol.is_some() {
if counts.peer().is_server() && !self.is_extended_connect_protocol_enabled {
proto_err!(stream: "cannot use :protocol if extended connect protocol is disabled; stream={:?}", stream.id);
return Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR).into());
}
}
if !pseudo.is_informational() {
let message = counts
.peer()
@@ -449,60 +461,58 @@ impl Recv {
settings: &frame::Settings,
store: &mut Store,
) -> Result<(), proto::Error> {
let target = if let Some(val) = settings.initial_window_size() {
val
} else {
return Ok(());
};
let old_sz = self.init_window_sz;
self.init_window_sz = target;
tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
// Per RFC 7540 §6.9.2:
//
// In addition to changing the flow-control window for streams that are
// not yet active, a SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state). When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
// the size of all stream flow-control windows that it maintains by the
// difference between the new value and the old value.
//
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
// space in a flow-control window to become negative. A sender MUST
// track the negative flow-control window and MUST NOT send new
// flow-controlled frames until it receives WINDOW_UPDATE frames that
// cause the flow-control window to become positive.
if target < old_sz {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);
store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
Ok(())
})
} else if target > old_sz {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
tracing::trace!("incrementing all windows; inc={}", inc);
store.for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
Ok(())
})
} else {
// size is the same... so do nothing
Ok(())
if let Some(val) = settings.is_extended_connect_protocol_enabled() {
self.is_extended_connect_protocol_enabled = val;
}
if let Some(target) = settings.initial_window_size() {
let old_sz = self.init_window_sz;
self.init_window_sz = target;
tracing::trace!("update_initial_window_size; new={}; old={}", target, old_sz,);
// Per RFC 7540 §6.9.2:
//
// In addition to changing the flow-control window for streams that are
// not yet active, a SETTINGS frame can alter the initial flow-control
// window size for streams with active flow-control windows (that is,
// streams in the "open" or "half-closed (remote)" state). When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
// the size of all stream flow-control windows that it maintains by the
// difference between the new value and the old value.
//
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
// space in a flow-control window to become negative. A sender MUST
// track the negative flow-control window and MUST NOT send new
// flow-controlled frames until it receives WINDOW_UPDATE frames that
// cause the flow-control window to become positive.
if target < old_sz {
// We must decrease the (local) window on every open stream.
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);
store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
})
} else if target > old_sz {
// We must increase the (local) window on every open stream.
let inc = target - old_sz;
tracing::trace!("incrementing all windows; inc={}", inc);
store.try_for_each(|mut stream| {
// XXX: Shouldn't the peer have already noticed our
// overflow and sent us a GOAWAY?
stream
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
Ok::<_, proto::Error>(())
})?;
}
}
Ok(())
}
pub fn is_end_stream(&self, stream: &store::Ptr) -> bool {

View File

@@ -35,6 +35,9 @@ pub(super) struct Send {
prioritize: Prioritize,
is_push_enabled: bool,
/// If extended connect protocol is enabled.
is_extended_connect_protocol_enabled: bool,
}
/// A value to detect which public API has called `poll_reset`.
@@ -53,6 +56,7 @@ impl Send {
next_stream_id: Ok(config.local_next_stream_id),
prioritize: Prioritize::new(config),
is_push_enabled: true,
is_extended_connect_protocol_enabled: false,
}
}
@@ -429,6 +433,10 @@ impl Send {
counts: &mut Counts,
task: &mut Option<Waker>,
) -> Result<(), Error> {
if let Some(val) = settings.is_extended_connect_protocol_enabled() {
self.is_extended_connect_protocol_enabled = val;
}
// Applies an update to the remote endpoint's initial window size.
//
// Per RFC 7540 §6.9.2:
@@ -490,16 +498,14 @@ impl Send {
// TODO: Should this notify the producer when the capacity
// of a stream is reduced? Maybe it should if the capacity
// is reduced to zero, allowing the producer to stop work.
Ok::<_, Error>(())
})?;
});
self.prioritize
.assign_connection_capacity(total_reclaimed, store, counts);
} else if val > old_val {
let inc = val - old_val;
store.for_each(|mut stream| {
store.try_for_each(|mut stream| {
self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
.map_err(Error::library_go_away)
})?;
@@ -554,4 +560,8 @@ impl Send {
}
}
}
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
self.is_extended_connect_protocol_enabled
}
}

View File

@@ -4,6 +4,7 @@ use slab;
use indexmap::{self, IndexMap};
use std::convert::Infallible;
use std::fmt;
use std::marker::PhantomData;
use std::ops;
@@ -128,7 +129,20 @@ impl Store {
}
}
pub fn for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
pub(crate) fn for_each<F>(&mut self, mut f: F)
where
F: FnMut(Ptr),
{
match self.try_for_each(|ptr| {
f(ptr);
Ok::<_, Infallible>(())
}) {
Ok(()) => (),
Err(infallible) => match infallible {},
}
}
pub fn try_for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
where
F: FnMut(Ptr) -> Result<(), E>,
{

View File

@@ -2,6 +2,7 @@ use super::recv::RecvHeaderBlockError;
use super::store::{self, Entry, Resolve, Store};
use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
use crate::codec::{Codec, SendError, UserError};
use crate::ext::Protocol;
use crate::frame::{self, Frame, Reason};
use crate::proto::{peer, Error, Initiator, Open, Peer, WindowSize};
use crate::{client, proto, server};
@@ -214,6 +215,8 @@ where
use super::stream::ContentLength;
use http::Method;
let protocol = request.extensions_mut().remove::<Protocol>();
// Clear before taking lock, incase extensions contain a StreamRef.
request.extensions_mut().clear();
@@ -261,7 +264,8 @@ where
}
// Convert the message
let headers = client::Peer::convert_send_message(stream_id, request, end_of_stream)?;
let headers =
client::Peer::convert_send_message(stream_id, request, protocol, end_of_stream)?;
let mut stream = me.store.insert(stream.id, stream);
@@ -294,6 +298,15 @@ where
send_buffer: self.send_buffer.clone(),
})
}
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
self.inner
.lock()
.unwrap()
.actions
.send
.is_extended_connect_protocol_enabled()
}
}
impl<B> DynStreams<'_, B> {
@@ -643,15 +656,12 @@ impl Inner {
let last_processed_id = actions.recv.last_processed_id();
self.store
.for_each(|stream| {
counts.transition(stream, |counts, stream| {
actions.recv.handle_error(&err, &mut *stream);
actions.send.handle_error(send_buffer, stream, counts);
Ok::<_, ()>(())
})
self.store.for_each(|stream| {
counts.transition(stream, |counts, stream| {
actions.recv.handle_error(&err, &mut *stream);
actions.send.handle_error(send_buffer, stream, counts);
})
.unwrap();
});
actions.conn_error = Some(err);
@@ -674,19 +684,14 @@ impl Inner {
let err = Error::remote_go_away(frame.debug_data().clone(), frame.reason());
self.store
.for_each(|stream| {
if stream.id > last_stream_id {
counts.transition(stream, |counts, stream| {
actions.recv.handle_error(&err, &mut *stream);
actions.send.handle_error(send_buffer, stream, counts);
Ok::<_, ()>(())
})
} else {
Ok::<_, ()>(())
}
})
.unwrap();
self.store.for_each(|stream| {
if stream.id > last_stream_id {
counts.transition(stream, |counts, stream| {
actions.recv.handle_error(&err, &mut *stream);
actions.send.handle_error(send_buffer, stream, counts);
})
}
});
actions.conn_error = Some(err);
@@ -807,18 +812,15 @@ impl Inner {
tracing::trace!("Streams::recv_eof");
self.store
.for_each(|stream| {
counts.transition(stream, |counts, stream| {
actions.recv.recv_eof(stream);
self.store.for_each(|stream| {
counts.transition(stream, |counts, stream| {
actions.recv.recv_eof(stream);
// This handles resetting send state associated with the
// stream
actions.send.handle_error(send_buffer, stream, counts);
Ok::<_, ()>(())
})
// This handles resetting send state associated with the
// stream
actions.send.handle_error(send_buffer, stream, counts);
})
.expect("recv_eof");
});
actions.clear_queues(clear_pending_accept, &mut self.store, counts);
Ok(())