548 lines
17 KiB
Rust
548 lines
17 KiB
Rust
use super::{
|
|
store, Buffer, Codec, Config, Counts, Frame, Prioritize, Prioritized, Store, Stream, StreamId,
|
|
StreamIdOverflow, WindowSize,
|
|
};
|
|
use crate::codec::UserError;
|
|
use crate::frame::{self, Reason};
|
|
use crate::proto::{Error, Initiator};
|
|
|
|
use bytes::Buf;
|
|
use http;
|
|
use std::task::{Context, Poll, Waker};
|
|
use tokio::io::AsyncWrite;
|
|
|
|
use std::io;
|
|
|
|
/// Manages state transitions related to outbound frames.
|
|
#[derive(Debug)]
|
|
pub(super) struct Send {
|
|
/// Stream identifier to use for next initialized stream.
|
|
next_stream_id: Result<StreamId, StreamIdOverflow>,
|
|
|
|
/// Any streams with a higher ID are ignored.
|
|
///
|
|
/// This starts as MAX, but is lowered when a GOAWAY is received.
|
|
///
|
|
/// > After sending a GOAWAY frame, the sender can discard frames for
|
|
/// > streams initiated by the receiver with identifiers higher than
|
|
/// > the identified last stream.
|
|
max_stream_id: StreamId,
|
|
|
|
/// Initial window size of locally initiated streams
|
|
init_window_sz: WindowSize,
|
|
|
|
/// Prioritization layer
|
|
prioritize: Prioritize,
|
|
|
|
is_push_enabled: bool,
|
|
}
|
|
|
|
/// Tag telling `poll_reset` which public API is calling it.
///
/// The value is handed to the stream state's `ensure_reason` check.
#[derive(Debug)]
pub(crate) enum PollReset {
    AwaitingHeaders,
    Streaming,
}
|
|
|
|
impl Send {
|
|
/// Create a new `Send`
|
|
pub fn new(config: &Config) -> Self {
|
|
Send {
|
|
init_window_sz: config.remote_init_window_sz,
|
|
max_stream_id: StreamId::MAX,
|
|
next_stream_id: Ok(config.local_next_stream_id),
|
|
prioritize: Prioritize::new(config),
|
|
is_push_enabled: true,
|
|
}
|
|
}
|
|
|
|
/// Returns the initial send window size
|
|
pub fn init_window_sz(&self) -> WindowSize {
|
|
self.init_window_sz
|
|
}
|
|
|
|
/// Hands out the next stream ID and advances the internal counter.
///
/// # Errors
///
/// Returns `UserError::OverflowedStreamId` when no further stream ID is
/// available.
pub fn open(&mut self) -> Result<StreamId, UserError> {
    self.ensure_next_stream_id().map(|id| {
        self.next_stream_id = id.next_id();
        id
    })
}
|
|
|
|
/// Reserves the next stream ID and advances the internal counter.
///
/// # Errors
///
/// Returns `UserError::OverflowedStreamId` when no further stream ID is
/// available.
pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
    let reserved = self.ensure_next_stream_id()?;
    self.next_stream_id = reserved.next_id();

    Ok(reserved)
}
|
|
|
|
fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
|
|
// 8.1.2.2. Connection-Specific Header Fields
|
|
if fields.contains_key(http::header::CONNECTION)
|
|
|| fields.contains_key(http::header::TRANSFER_ENCODING)
|
|
|| fields.contains_key(http::header::UPGRADE)
|
|
|| fields.contains_key("keep-alive")
|
|
|| fields.contains_key("proxy-connection")
|
|
{
|
|
tracing::debug!("illegal connection-specific headers found");
|
|
return Err(UserError::MalformedHeaders);
|
|
} else if let Some(te) = fields.get(http::header::TE) {
|
|
if te != "trailers" {
|
|
tracing::debug!("illegal connection-specific headers found");
|
|
return Err(UserError::MalformedHeaders);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub fn send_push_promise<B>(
|
|
&mut self,
|
|
frame: frame::PushPromise,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
task: &mut Option<Waker>,
|
|
) -> Result<(), UserError> {
|
|
if !self.is_push_enabled {
|
|
return Err(UserError::PeerDisabledServerPush);
|
|
}
|
|
|
|
tracing::trace!(
|
|
"send_push_promise; frame={:?}; init_window={:?}",
|
|
frame,
|
|
self.init_window_sz
|
|
);
|
|
|
|
Self::check_headers(frame.fields())?;
|
|
|
|
// Queue the frame for sending
|
|
self.prioritize
|
|
.queue_frame(frame.into(), buffer, stream, task);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn send_headers<B>(
|
|
&mut self,
|
|
frame: frame::Headers,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) -> Result<(), UserError> {
|
|
tracing::trace!(
|
|
"send_headers; frame={:?}; init_window={:?}",
|
|
frame,
|
|
self.init_window_sz
|
|
);
|
|
|
|
Self::check_headers(frame.fields())?;
|
|
|
|
let end_stream = frame.is_end_stream();
|
|
|
|
// Update the state
|
|
stream.state.send_open(end_stream)?;
|
|
|
|
if counts.peer().is_local_init(frame.stream_id()) {
|
|
// If we're waiting on a PushPromise anyway
|
|
// handle potentially queueing the stream at that point
|
|
if !stream.is_pending_push {
|
|
if counts.can_inc_num_send_streams() {
|
|
counts.inc_num_send_streams(stream);
|
|
} else {
|
|
self.prioritize.queue_open(stream);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Queue the frame for sending
|
|
self.prioritize
|
|
.queue_frame(frame.into(), buffer, stream, task);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Send an explicit RST_STREAM frame
|
|
pub fn send_reset<B>(
|
|
&mut self,
|
|
reason: Reason,
|
|
initiator: Initiator,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) {
|
|
let is_reset = stream.state.is_reset();
|
|
let is_closed = stream.state.is_closed();
|
|
let is_empty = stream.pending_send.is_empty();
|
|
let stream_id = stream.id;
|
|
|
|
tracing::trace!(
|
|
"send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
|
|
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
|
|
state={:?} \
|
|
",
|
|
reason,
|
|
initiator,
|
|
stream_id,
|
|
is_reset,
|
|
is_closed,
|
|
is_empty,
|
|
stream.state
|
|
);
|
|
|
|
if is_reset {
|
|
// Don't double reset
|
|
tracing::trace!(
|
|
" -> not sending RST_STREAM ({:?} is already reset)",
|
|
stream_id
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Transition the state to reset no matter what.
|
|
stream.state.set_reset(stream_id, reason, initiator);
|
|
|
|
// If closed AND the send queue is flushed, then the stream cannot be
|
|
// reset explicitly, either. Implicit resets can still be queued.
|
|
if is_closed && is_empty {
|
|
tracing::trace!(
|
|
" -> not sending explicit RST_STREAM ({:?} was closed \
|
|
and send queue was flushed)",
|
|
stream_id
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Clear all pending outbound frames.
|
|
// Note that we don't call `self.recv_err` because we want to enqueue
|
|
// the reset frame before transitioning the stream inside
|
|
// `reclaim_all_capacity`.
|
|
self.prioritize.clear_queue(buffer, stream);
|
|
|
|
let frame = frame::Reset::new(stream.id, reason);
|
|
|
|
tracing::trace!("send_reset -- queueing; frame={:?}", frame);
|
|
self.prioritize
|
|
.queue_frame(frame.into(), buffer, stream, task);
|
|
self.prioritize.reclaim_all_capacity(stream, counts);
|
|
}
|
|
|
|
pub fn schedule_implicit_reset(
|
|
&mut self,
|
|
stream: &mut store::Ptr,
|
|
reason: Reason,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) {
|
|
if stream.state.is_closed() {
|
|
// Stream is already closed, nothing more to do
|
|
return;
|
|
}
|
|
|
|
stream.state.set_scheduled_reset(reason);
|
|
|
|
self.prioritize.reclaim_reserved_capacity(stream, counts);
|
|
self.prioritize.schedule_send(stream, task);
|
|
}
|
|
|
|
pub fn send_data<B>(
|
|
&mut self,
|
|
frame: frame::Data<B>,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) -> Result<(), UserError>
|
|
where
|
|
B: Buf,
|
|
{
|
|
self.prioritize
|
|
.send_data(frame, buffer, stream, counts, task)
|
|
}
|
|
|
|
pub fn send_trailers<B>(
|
|
&mut self,
|
|
frame: frame::Headers,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) -> Result<(), UserError> {
|
|
// TODO: Should this logic be moved into state.rs?
|
|
if !stream.state.is_send_streaming() {
|
|
return Err(UserError::UnexpectedFrameType);
|
|
}
|
|
|
|
stream.state.send_close();
|
|
|
|
tracing::trace!("send_trailers -- queuing; frame={:?}", frame);
|
|
self.prioritize
|
|
.queue_frame(frame.into(), buffer, stream, task);
|
|
|
|
// Release any excess capacity
|
|
self.prioritize.reserve_capacity(0, stream, counts);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn poll_complete<T, B>(
|
|
&mut self,
|
|
cx: &mut Context,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
store: &mut Store,
|
|
counts: &mut Counts,
|
|
dst: &mut Codec<T, Prioritized<B>>,
|
|
) -> Poll<io::Result<()>>
|
|
where
|
|
T: AsyncWrite + Unpin,
|
|
B: Buf,
|
|
{
|
|
self.prioritize
|
|
.poll_complete(cx, buffer, store, counts, dst)
|
|
}
|
|
|
|
/// Request capacity to send data
|
|
pub fn reserve_capacity(
|
|
&mut self,
|
|
capacity: WindowSize,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
) {
|
|
self.prioritize.reserve_capacity(capacity, stream, counts)
|
|
}
|
|
|
|
pub fn poll_capacity(
|
|
&mut self,
|
|
cx: &Context,
|
|
stream: &mut store::Ptr,
|
|
) -> Poll<Option<Result<WindowSize, UserError>>> {
|
|
if !stream.state.is_send_streaming() {
|
|
return Poll::Ready(None);
|
|
}
|
|
|
|
if !stream.send_capacity_inc {
|
|
stream.wait_send(cx);
|
|
return Poll::Pending;
|
|
}
|
|
|
|
stream.send_capacity_inc = false;
|
|
|
|
Poll::Ready(Some(Ok(self.capacity(stream))))
|
|
}
|
|
|
|
/// Current available stream send capacity
|
|
pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
|
|
let available = stream.send_flow.available().as_size();
|
|
let buffered = stream.buffered_send_data;
|
|
|
|
if available as usize <= buffered {
|
|
0
|
|
} else {
|
|
available - buffered as WindowSize
|
|
}
|
|
}
|
|
|
|
pub fn poll_reset(
|
|
&self,
|
|
cx: &Context,
|
|
stream: &mut Stream,
|
|
mode: PollReset,
|
|
) -> Poll<Result<Reason, crate::Error>> {
|
|
match stream.state.ensure_reason(mode)? {
|
|
Some(reason) => Poll::Ready(Ok(reason)),
|
|
None => {
|
|
stream.wait_send(cx);
|
|
Poll::Pending
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn recv_connection_window_update(
|
|
&mut self,
|
|
frame: frame::WindowUpdate,
|
|
store: &mut Store,
|
|
counts: &mut Counts,
|
|
) -> Result<(), Reason> {
|
|
self.prioritize
|
|
.recv_connection_window_update(frame.size_increment(), store, counts)
|
|
}
|
|
|
|
pub fn recv_stream_window_update<B>(
|
|
&mut self,
|
|
sz: WindowSize,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) -> Result<(), Reason> {
|
|
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
|
|
tracing::debug!("recv_stream_window_update !!; err={:?}", e);
|
|
|
|
self.send_reset(
|
|
Reason::FLOW_CONTROL_ERROR,
|
|
Initiator::Library,
|
|
buffer,
|
|
stream,
|
|
counts,
|
|
task,
|
|
);
|
|
|
|
return Err(e);
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(super) fn recv_go_away(&mut self, last_stream_id: StreamId) -> Result<(), Error> {
|
|
if last_stream_id > self.max_stream_id {
|
|
// The remote endpoint sent a `GOAWAY` frame indicating a stream
|
|
// that we never sent, or that we have already terminated on account
|
|
// of previous `GOAWAY` frame. In either case, that is illegal.
|
|
// (When sending multiple `GOAWAY`s, "Endpoints MUST NOT increase
|
|
// the value they send in the last stream identifier, since the
|
|
// peers might already have retried unprocessed requests on another
|
|
// connection.")
|
|
proto_err!(conn:
|
|
"recv_go_away: last_stream_id ({:?}) > max_stream_id ({:?})",
|
|
last_stream_id, self.max_stream_id,
|
|
);
|
|
return Err(Error::library_go_away(Reason::PROTOCOL_ERROR));
|
|
}
|
|
|
|
self.max_stream_id = last_stream_id;
|
|
Ok(())
|
|
}
|
|
|
|
pub fn handle_error<B>(
|
|
&mut self,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
stream: &mut store::Ptr,
|
|
counts: &mut Counts,
|
|
) {
|
|
// Clear all pending outbound frames
|
|
self.prioritize.clear_queue(buffer, stream);
|
|
self.prioritize.reclaim_all_capacity(stream, counts);
|
|
}
|
|
|
|
pub fn apply_remote_settings<B>(
|
|
&mut self,
|
|
settings: &frame::Settings,
|
|
buffer: &mut Buffer<Frame<B>>,
|
|
store: &mut Store,
|
|
counts: &mut Counts,
|
|
task: &mut Option<Waker>,
|
|
) -> Result<(), Error> {
|
|
// Applies an update to the remote endpoint's initial window size.
|
|
//
|
|
// Per RFC 7540 §6.9.2:
|
|
//
|
|
// In addition to changing the flow-control window for streams that are
|
|
// not yet active, a SETTINGS frame can alter the initial flow-control
|
|
// window size for streams with active flow-control windows (that is,
|
|
// streams in the "open" or "half-closed (remote)" state). When the
|
|
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust
|
|
// the size of all stream flow-control windows that it maintains by the
|
|
// difference between the new value and the old value.
|
|
//
|
|
// A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available
|
|
// space in a flow-control window to become negative. A sender MUST
|
|
// track the negative flow-control window and MUST NOT send new
|
|
// flow-controlled frames until it receives WINDOW_UPDATE frames that
|
|
// cause the flow-control window to become positive.
|
|
if let Some(val) = settings.initial_window_size() {
|
|
let old_val = self.init_window_sz;
|
|
self.init_window_sz = val;
|
|
|
|
if val < old_val {
|
|
// We must decrease the (remote) window on every open stream.
|
|
let dec = old_val - val;
|
|
tracing::trace!("decrementing all windows; dec={}", dec);
|
|
|
|
let mut total_reclaimed = 0;
|
|
store.for_each(|mut stream| {
|
|
let stream = &mut *stream;
|
|
|
|
stream.send_flow.dec_send_window(dec);
|
|
|
|
// It's possible that decreasing the window causes
|
|
// `window_size` (the stream-specific window) to fall below
|
|
// `available` (the portion of the connection-level window
|
|
// that we have allocated to the stream).
|
|
// In this case, we should take that excess allocation away
|
|
// and reassign it to other streams.
|
|
let window_size = stream.send_flow.window_size();
|
|
let available = stream.send_flow.available().as_size();
|
|
let reclaimed = if available > window_size {
|
|
// Drop down to `window_size`.
|
|
let reclaim = available - window_size;
|
|
stream.send_flow.claim_capacity(reclaim);
|
|
total_reclaimed += reclaim;
|
|
reclaim
|
|
} else {
|
|
0
|
|
};
|
|
|
|
tracing::trace!(
|
|
"decremented stream window; id={:?}; decr={}; reclaimed={}; flow={:?}",
|
|
stream.id,
|
|
dec,
|
|
reclaimed,
|
|
stream.send_flow
|
|
);
|
|
|
|
// TODO: Should this notify the producer when the capacity
|
|
// of a stream is reduced? Maybe it should if the capacity
|
|
// is reduced to zero, allowing the producer to stop work.
|
|
|
|
Ok::<_, Error>(())
|
|
})?;
|
|
|
|
self.prioritize
|
|
.assign_connection_capacity(total_reclaimed, store, counts);
|
|
} else if val > old_val {
|
|
let inc = val - old_val;
|
|
|
|
store.for_each(|mut stream| {
|
|
self.recv_stream_window_update(inc, buffer, &mut stream, counts, task)
|
|
.map_err(Error::library_go_away)
|
|
})?;
|
|
}
|
|
}
|
|
|
|
if let Some(val) = settings.is_push_enabled() {
|
|
self.is_push_enabled = val
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Clears the prioritization layer's pending-capacity, pending-send and
/// pending-open queues, in that order.
pub fn clear_queues(&mut self, store: &mut Store, counts: &mut Counts) {
    let prioritize = &mut self.prioritize;

    prioritize.clear_pending_capacity(store, counts);
    prioritize.clear_pending_send(store, counts);
    prioritize.clear_pending_open(store, counts);
}
|
|
|
|
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
|
|
if let Ok(next) = self.next_stream_id {
|
|
if id >= next {
|
|
return Err(Reason::PROTOCOL_ERROR);
|
|
}
|
|
}
|
|
// if next_stream_id is overflowed, that's ok.
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
|
|
self.next_stream_id
|
|
.map_err(|_| UserError::OverflowedStreamId)
|
|
}
|
|
|
|
/// Reports whether `id` is below the next stream ID to be handed out.
///
/// Always returns `true` once the next stream ID has overflowed.
pub fn may_have_created_stream(&self, id: StreamId) -> bool {
    match self.next_stream_id {
        Ok(next_id) => {
            // Peer::is_local_init should have been called beforehand
            debug_assert_eq!(id.is_server_initiated(), next_id.is_server_initiated(),);
            id < next_id
        }
        Err(_) => true,
    }
}
|
|
}
|