Add server support for push (#327)

Closes #291, closes #185
This commit is contained in:
Michael Beaumont
2019-09-16 20:30:59 +02:00
committed by Sean McArthur
parent 0527f5b72a
commit fac165e451
10 changed files with 735 additions and 134 deletions

View File

@@ -111,7 +111,7 @@ impl Prioritize {
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Waker>) {
// If the stream is waiting to be opened, nothing more to do.
if !stream.is_pending_open {
if stream.is_send_ready() {
log::trace!("schedule_send; {:?}", stream.id);
// Queue the stream
self.pending_send.push(stream);
@@ -445,19 +445,9 @@ impl Prioritize {
self.pending_capacity.push(stream);
}
// If data is buffered and the stream is not pending open, then
// If data is buffered and the stream is send ready, then
// schedule the stream for execution
//
// Why do we not push into pending_send when the stream is in pending_open?
//
// We allow users to call send_request() which schedules a stream to be pending_open
// if there is no room according to the concurrency limit (max_send_streams), and we
// also allow data to be buffered for send with send_data() if there is no capacity for
// the stream to send the data, which attempts to place the stream in pending_send.
// If the stream is not open, we don't want the stream to be scheduled for
// execution (pending_send). Note that if the stream is in pending_open, it will be
// pushed to pending_send when there is room for an open stream.
if stream.buffered_send_data > 0 && !stream.is_pending_open {
if stream.buffered_send_data > 0 && stream.is_send_ready() {
// TODO: This assertion isn't *exactly* correct. There can still be
// buffered send data while the stream's pending send queue is
// empty. This can happen when a large data frame is in the process
@@ -766,6 +756,22 @@ impl Prioritize {
stream: stream.key(),
}))
}
Some(Frame::PushPromise(pp)) => {
let mut pushed =
stream.store_mut().find_mut(&pp.promised_id()).unwrap();
pushed.is_pending_push = false;
// Transition stream from pending_push to pending_open
// if possible
if !pushed.pending_send.is_empty() {
if counts.can_inc_num_send_streams() {
counts.inc_num_send_streams(&mut pushed);
self.pending_send.push(&mut pushed);
} else {
self.queue_open(&mut pushed);
}
}
Frame::PushPromise(pp)
}
Some(frame) => frame.map(|_| {
unreachable!(
"Frame::map closure will only be called \

View File

@@ -1,10 +1,10 @@
use super::*;
use crate::codec::{RecvError, UserError};
use crate::frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::frame::{PushPromiseHeaderError, Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use crate::{frame, proto};
use std::task::Context;
use http::{HeaderMap, Method, Request, Response};
use http::{HeaderMap, Request, Response};
use std::io;
use std::task::{Poll, Waker};
@@ -178,7 +178,7 @@ impl Recv {
use http::header;
if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) {
let content_length = match parse_u64(content_length.as_bytes()) {
let content_length = match frame::parse_u64(content_length.as_bytes()) {
Ok(v) => v,
Err(()) => {
proto_err!(stream: "could not parse content-length; stream={:?}", stream.id);
@@ -632,44 +632,31 @@ impl Recv {
}
let promised_id = frame.promised_id();
use http::header;
let (pseudo, fields) = frame.into_parts();
let req = crate::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
// The spec has some requirements for promised request headers
// [https://httpwg.org/specs/rfc7540.html#PushRequests]
// A promised request "that indicates the presence of a request body
// MUST reset the promised stream with a stream error"
if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) {
match parse_u64(content_length.as_bytes()) {
Ok(0) => {}
otherwise => {
proto_err!(stream:
"recv_push_promise; promised request has content-length {:?}; promised_id={:?}",
otherwise,
promised_id,
);
return Err(RecvError::Stream {
id: promised_id,
reason: Reason::PROTOCOL_ERROR,
});
}
if let Err(e) = frame::PushPromise::validate_request(&req) {
use PushPromiseHeaderError::*;
match e {
NotSafeAndCacheable => proto_err!(
stream:
"recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
req.method(),
promised_id,
),
InvalidContentLength(e) => proto_err!(
stream:
"recv_push_promise; promised request has invalid content-length {:?}; promised_id={:?}",
e,
promised_id,
),
}
}
// "The server MUST include a method in the :method pseudo-header field
// that is safe and cacheable"
if !Self::safe_and_cacheable(req.method()) {
proto_err!(
stream:
"recv_push_promise: method {} is not safe and cacheable; promised_id={:?}",
req.method(),
promised_id,
);
return Err(RecvError::Stream {
id: promised_id,
reason: Reason::PROTOCOL_ERROR,
});
}
use super::peer::PollMessage::*;
stream
.pending_recv
@@ -678,12 +665,6 @@ impl Recv {
Ok(())
}
fn safe_and_cacheable(method: &Method) -> bool {
// Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
// Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
method == Method::GET || method == Method::HEAD
}
/// Ensures that `id` is not in the `Idle` state.
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
if let Ok(next) = self.next_stream_id {
@@ -1057,25 +1038,3 @@ impl<T> From<RecvError> for RecvHeaderBlockError<T> {
RecvHeaderBlockError::State(err)
}
}
// ===== util =====
fn parse_u64(src: &[u8]) -> Result<u64, ()> {
if src.len() > 19 {
// At danger for overflow...
return Err(());
}
let mut ret = 0;
for &d in src {
if d < b'0' || d > b'9' {
return Err(());
}
ret *= 10;
ret += u64::from(d - b'0');
}
Ok(ret)
}

View File

@@ -53,6 +53,53 @@ impl Send {
Ok(stream_id)
}
pub fn reserve_local(&mut self) -> Result<StreamId, UserError> {
let stream_id = self.ensure_next_stream_id()?;
self.next_stream_id = stream_id.next_id();
Ok(stream_id)
}
fn check_headers(fields: &http::HeaderMap) -> Result<(), UserError> {
// 8.1.2.2. Connection-Specific Header Fields
if fields.contains_key(http::header::CONNECTION)
|| fields.contains_key(http::header::TRANSFER_ENCODING)
|| fields.contains_key(http::header::UPGRADE)
|| fields.contains_key("keep-alive")
|| fields.contains_key("proxy-connection")
{
log::debug!("illegal connection-specific headers found");
return Err(UserError::MalformedHeaders);
} else if let Some(te) = fields.get(http::header::TE) {
if te != "trailers" {
log::debug!("illegal connection-specific headers found");
return Err(UserError::MalformedHeaders);
}
}
Ok(())
}
pub fn send_push_promise<B>(
&mut self,
frame: frame::PushPromise,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Waker>,
) -> Result<(), UserError> {
log::trace!(
"send_push_promise; frame={:?}; init_window={:?}",
frame,
self.init_window_sz
);
Self::check_headers(frame.fields())?;
// Queue the frame for sending
self.prioritize
.queue_frame(frame.into(), buffer, stream, task);
Ok(())
}
pub fn send_headers<B>(
&mut self,
frame: frame::Headers,
@@ -67,21 +114,7 @@ impl Send {
self.init_window_sz
);
// 8.1.2.2. Connection-Specific Header Fields
if frame.fields().contains_key(http::header::CONNECTION)
|| frame.fields().contains_key(http::header::TRANSFER_ENCODING)
|| frame.fields().contains_key(http::header::UPGRADE)
|| frame.fields().contains_key("keep-alive")
|| frame.fields().contains_key("proxy-connection")
{
log::debug!("illegal connection-specific headers found");
return Err(UserError::MalformedHeaders);
} else if let Some(te) = frame.fields().get(http::header::TE) {
if te != "trailers" {
log::debug!("illegal connection-specific headers found");
return Err(UserError::MalformedHeaders);
}
}
Self::check_headers(frame.fields())?;
if frame.has_too_big_field() {
return Err(UserError::HeaderTooBig);
@@ -93,10 +126,14 @@ impl Send {
stream.state.send_open(end_stream)?;
if counts.peer().is_local_init(frame.stream_id()) {
if counts.can_inc_num_send_streams() {
counts.inc_num_send_streams(stream);
} else {
self.prioritize.queue_open(stream);
// If we're waiting on a PushPromise anyway
// handle potentially queueing the stream at that point
if !stream.is_pending_push {
if counts.can_inc_num_send_streams() {
counts.inc_num_send_streams(stream);
} else {
self.prioritize.queue_open(stream);
}
}
}

View File

@@ -57,7 +57,7 @@ pub struct State {
enum Inner {
Idle,
// TODO: these states shouldn't count against concurrency limits:
//ReservedLocal,
ReservedLocal,
ReservedRemote,
Open { local: Peer, remote: Peer },
HalfClosedLocal(Peer), // TODO: explicitly name this value
@@ -114,7 +114,7 @@ impl State {
Open { local, remote }
}
}
HalfClosedRemote(AwaitingHeaders) => {
HalfClosedRemote(AwaitingHeaders) | ReservedLocal => {
if eos {
Closed(Cause::EndStream)
} else {
@@ -200,6 +200,17 @@ impl State {
}
}
/// Transition from Idle -> ReservedLocal
pub fn reserve_local(&mut self) -> Result<(), UserError> {
match self.inner {
Idle => {
self.inner = ReservedLocal;
Ok(())
}
_ => Err(UserError::UnexpectedFrameType),
}
}
/// Indicates that the remote side will not send more data to the local.
pub fn recv_close(&mut self) -> Result<(), RecvError> {
match self.inner {
@@ -384,7 +395,7 @@ impl State {
pub fn is_recv_closed(&self) -> bool {
match self.inner {
Closed(..) | HalfClosedRemote(..) => true,
Closed(..) | HalfClosedRemote(..) | ReservedLocal => true,
_ => false,
}
}
@@ -410,7 +421,7 @@ impl State {
| Closed(Cause::LocallyReset(reason))
| Closed(Cause::Scheduled(reason)) => Err(proto::Error::Proto(reason)),
Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
Closed(Cause::EndStream) | HalfClosedRemote(..) => Ok(false),
Closed(Cause::EndStream) | HalfClosedRemote(..) | ReservedLocal => Ok(false),
_ => Ok(true),
}
}

View File

@@ -69,6 +69,9 @@ pub(super) struct Stream {
/// Set to true when the stream is pending to be opened
pub is_pending_open: bool,
/// Set to true when a push is pending for this stream
pub is_pending_push: bool,
// ===== Fields related to receiving =====
/// Next node in the accept linked list
pub next_pending_accept: Option<store::Key>,
@@ -165,6 +168,7 @@ impl Stream {
send_capacity_inc: false,
is_pending_open: false,
next_open: None,
is_pending_push: false,
// ===== Fields related to receiving =====
next_pending_accept: None,
@@ -200,6 +204,26 @@ impl Stream {
self.reset_at.is_some()
}
/// Returns true if frames for this stream are ready to be sent over the wire
pub fn is_send_ready(&self) -> bool {
// Why do we check pending_open?
//
// We allow users to call send_request() which schedules a stream to be pending_open
// if there is no room according to the concurrency limit (max_send_streams), and we
// also allow data to be buffered for send with send_data() if there is no capacity for
// the stream to send the data, which attempts to place the stream in pending_send.
// If the stream is not open, we don't want the stream to be scheduled for
// execution (pending_send). Note that if the stream is in pending_open, it will be
// pushed to pending_send when there is room for an open stream.
//
// In pending_push we track whether a PushPromise still needs to be sent
// from a different stream before we can start sending frames on this one.
// This is different from the "open" check because reserved streams don't count
// toward the concurrency limit.
// See https://httpwg.org/specs/rfc7540.html#rfc.section.5.1.2
!self.is_pending_open && !self.is_pending_push
}
/// Returns true if the stream is closed
pub fn is_closed(&self) -> bool {
// The state has fully transitioned to closed.

View File

@@ -973,6 +973,56 @@ impl<B> StreamRef<B> {
})
}
pub fn send_push_promise(&mut self, request: Request<()>) -> Result<StreamRef<B>, UserError> {
let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
let actions = &mut me.actions;
let promised_id = actions.send.reserve_local()?;
let child_key = {
let mut child_stream = me.store.insert(
promised_id,
Stream::new(
promised_id,
actions.send.init_window_sz(),
actions.recv.init_window_sz(),
),
);
child_stream.state.reserve_local()?;
child_stream.is_pending_push = true;
child_stream.key()
};
let pushed = {
let mut stream = me.store.resolve(self.opaque.key);
let frame = crate::server::Peer::convert_push_message(stream.id, promised_id, request)?;
actions
.send
.send_push_promise(frame, send_buffer, &mut stream, &mut actions.task)
};
if let Err(err) = pushed {
let mut child_stream = me.store.resolve(child_key);
child_stream.unlink();
child_stream.remove();
return Err(err.into());
}
let opaque =
OpaqueStreamRef::new(self.opaque.inner.clone(), &mut me.store.resolve(child_key));
Ok(StreamRef {
opaque,
send_buffer: self.send_buffer.clone(),
})
}
/// Called by the server after the stream is accepted. Given that clients
/// initialize streams by sending HEADERS, the request will always be
/// available.