Add client support for server push (#314)

This patch exposes push promises to the client API.

Closes #252
This commit is contained in:
Michael Beaumont
2018-10-16 21:51:08 +02:00
committed by Carl Lerche
parent 6d8554a23c
commit 6b23542a55
11 changed files with 397 additions and 76 deletions

View File

@@ -162,8 +162,8 @@ use frame::{Headers, Pseudo, Reason, Settings, StreamId};
use proto;
use bytes::{Bytes, IntoBuf};
use futures::{Async, Future, Poll};
use http::{uri, Request, Response, Method, Version};
use futures::{Async, Future, Poll, Stream};
use http::{uri, HeaderMap, Request, Response, Method, Version};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::WriteAll;
@@ -294,6 +294,54 @@ pub struct Connection<T, B: IntoBuf = Bytes> {
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
inner: proto::OpaqueStreamRef,
push_promise_consumed: bool,
}
/// A future of a pushed HTTP response.
///
/// We have to differentiate between pushed and non pushed because of the spec
/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
/// > that is in either the "open" or "half-closed (remote)" state.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct PushedResponseFuture {
inner: ResponseFuture,
}
/// A pushed response and corresponding request headers
#[derive(Debug)]
pub struct PushPromise {
/// The request headers
request: Request<()>,
/// The pushed response
response: PushedResponseFuture,
}
#[derive(Debug)]
/// A stream of pushed responses and corresponding promised requests
pub struct PushPromises {
inner: proto::OpaqueStreamRef,
}
impl Stream for PushPromises {
type Item = PushPromise;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match try_ready!(self.inner.poll_pushed()) {
Some((request, response)) => {
let response = PushedResponseFuture {
inner: ResponseFuture {
inner: response, push_promise_consumed: false
}
};
Ok(Async::Ready(Some(PushPromise{request, response})))
}
None => Ok(Async::Ready(None)),
}
}
}
/// Builds client connections with custom configuration values.
@@ -516,7 +564,7 @@ where
/// .body(())
/// .unwrap();
///
/// // Send the request to the server. Since we are not sending a
/// // Send the request to the server. If we are not sending a
/// // body or trailers, we can drop the `SendStream` instance.
/// let (response, mut send_stream) = send_request
/// .send_request(request, false).unwrap();
@@ -567,6 +615,7 @@ where
let response = ResponseFuture {
inner: stream.clone_to_opaque(),
push_promise_consumed: false,
};
let stream = SendStream::new(stream);
@@ -1352,6 +1401,61 @@ impl ResponseFuture {
pub fn stream_id(&self) -> ::StreamId {
::StreamId::from_internal(self.inner.stream_id())
}
/// Returns a stream of PushPromises
///
/// # Panics
///
/// If this method has been called before
/// or the stream was itself was pushed
pub fn push_promises(&mut self) -> PushPromises {
if self.push_promise_consumed {
panic!("Reference to push promises stream taken!");
}
self.push_promise_consumed = true;
PushPromises { inner: self.inner.clone() }
}
}
// ===== impl PushPromise =====
impl PushPromise {
/// Returns a reference to the push promise's request headers.
pub fn request(&self) -> &Request<()> {
&self.request
}
/// Returns a mutable reference to the push promise's request headers.
pub fn request_mut(&mut self) -> &mut Request<()> {
&mut self.request
}
/// Consumes `self`, returning the push promise's request headers and
/// response future.
pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
(self.request, self.response)
}
}
// ===== impl PushedResponseFuture =====
impl Future for PushedResponseFuture {
type Item = Response<RecvStream>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
impl PushedResponseFuture {
/// Returns the stream ID of the response stream.
///
/// # Panics
///
/// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> ::StreamId {
self.inner.stream_id()
}
}
// ===== impl Peer =====
@@ -1431,12 +1535,11 @@ impl proto::Peer for Peer {
false
}
fn convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError> {
fn convert_poll_message(
pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId
) -> Result<Self::Poll, RecvError> {
let mut b = Response::builder();
let stream_id = headers.stream_id();
let (pseudo, fields) = headers.into_parts();
b.version(Version::HTTP_2);
if let Some(status) = pseudo.status {

View File

@@ -380,6 +380,13 @@ impl PushPromise {
}
}
impl PushPromise {
/// Consume `self`, returning the parts of the frame
pub fn into_parts(self) -> (Pseudo, HeaderMap) {
(self.header_block.pseudo, self.header_block.fields)
}
}
#[cfg(feature = "unstable")]
impl PushPromise {
pub fn new(
@@ -400,10 +407,6 @@ impl PushPromise {
}
}
pub fn into_parts(self) -> (Pseudo, HeaderMap) {
(self.header_block.pseudo, self.header_block.fields)
}
pub fn fields(&self) -> &HeaderMap {
&self.header_block.fields
}

View File

@@ -1,9 +1,9 @@
use codec::RecvError;
use error::Reason;
use frame::{Headers, StreamId};
use frame::{Pseudo, StreamId};
use proto::Open;
use http::{Request, Response};
use http::{HeaderMap, Request, Response};
use std::fmt;
@@ -16,7 +16,9 @@ pub(crate) trait Peer {
fn is_server() -> bool;
fn convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError>;
fn convert_poll_message(
pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId
) -> Result<Self::Poll, RecvError>;
fn is_local_init(id: StreamId) -> bool {
assert!(!id.is_zero());
@@ -51,12 +53,14 @@ impl Dyn {
self.is_server() == id.is_server_initiated()
}
pub fn convert_poll_message(&self, headers: Headers) -> Result<PollMessage, RecvError> {
pub fn convert_poll_message(
&self, pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId
) -> Result<PollMessage, RecvError> {
if self.is_server() {
::server::Peer::convert_poll_message(headers)
::server::Peer::convert_poll_message(pseudo, fields, stream_id)
.map(PollMessage::Server)
} else {
::client::Peer::convert_poll_message(headers)
::client::Peer::convert_poll_message(pseudo, fields, stream_id)
.map(PollMessage::Client)
}
}

View File

@@ -28,7 +28,6 @@ use frame::{StreamId, StreamIdOverflow};
use proto::*;
use bytes::Bytes;
use http::{Request, Response};
use std::time::Duration;
#[derive(Debug)]

View File

@@ -3,7 +3,7 @@ use {frame, proto};
use codec::{RecvError, UserError};
use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use http::HeaderMap;
use http::{HeaderMap, Response, Request, Method};
use std::io;
use std::time::{Duration, Instant};
@@ -216,7 +216,9 @@ impl Recv {
};
}
let message = counts.peer().convert_poll_message(frame)?;
let stream_id = frame.stream_id();
let (pseudo, fields) = frame.into_parts();
let message = counts.peer().convert_poll_message(pseudo, fields, stream_id)?;
// Push the frame onto the stream's recv buffer
stream
@@ -247,6 +249,37 @@ impl Recv {
}
}
/// Called by the client to get pushed response
pub fn poll_pushed(
&mut self, stream: &mut store::Ptr
) -> Poll<Option<(Request<()>, store::Key)>, proto::Error> {
use super::peer::PollMessage::*;
let mut ppp = stream.pending_push_promises.take();
let pushed = ppp.pop(stream.store_mut()).map(
|mut pushed| match pushed.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(Server(headers))) =>
Async::Ready(Some((headers, pushed.key()))),
// When frames are pushed into the queue, it is verified that
// the first frame is a HEADERS frame.
_ => panic!("Headers not set on pushed stream")
}
);
stream.pending_push_promises = ppp;
if let Some(p) = pushed {
Ok(p)
} else {
let is_open = stream.state.ensure_recv_open()?;
if is_open {
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
} else {
Ok(Async::Ready(None))
}
}
}
/// Called by the client to get the response
pub fn poll_response(
&mut self,
@@ -538,13 +571,7 @@ impl Recv {
frame: frame::PushPromise,
stream: &mut store::Ptr,
) -> Result<(), RecvError> {
// TODO: Streams in the reserved states do not count towards the concurrency
// limit. However, it seems like there should be a cap otherwise this
// could grow in memory indefinitely.
stream.state.reserve_remote()?;
if frame.is_over_size() {
// A frame is over size if the decoded header block was bigger than
// SETTINGS_MAX_HEADER_LIST_SIZE.
@@ -564,9 +591,46 @@ impl Recv {
});
}
let promised_id = frame.promised_id();
use http::header;
let (pseudo, fields) = frame.into_parts();
let req = ::server::Peer::convert_poll_message(pseudo, fields, promised_id)?;
// The spec has some requirements for promised request headers
// [https://httpwg.org/specs/rfc7540.html#PushRequests]
// A promised request "that indicates the presence of a request body
// MUST reset the promised stream with a stream error"
if let Some(content_length) = req.headers().get(header::CONTENT_LENGTH) {
match parse_u64(content_length.as_bytes()) {
Ok(0) => {},
_ => {
return Err(RecvError::Stream {
id: promised_id,
reason: Reason::PROTOCOL_ERROR,
});
},
}
}
// "The server MUST include a method in the :method pseudo-header field
// that is safe and cacheable"
if !Self::safe_and_cacheable(req.method()) {
return Err(RecvError::Stream {
id: promised_id,
reason: Reason::PROTOCOL_ERROR,
});
}
use super::peer::PollMessage::*;
stream.pending_recv.push_back(&mut self.buffer, Event::Headers(Server(req)));
stream.notify_recv();
Ok(())
}
fn safe_and_cacheable(method: &Method) -> bool {
// Cacheable: https://httpwg.org/specs/rfc7231.html#cacheable.methods
// Safe: https://httpwg.org/specs/rfc7231.html#safe.methods
return method == Method::GET || method == Method::HEAD;
}
/// Ensures that `id` is not in the `Idle` state.
pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> {
if let Ok(next) = self.next_stream_id {

View File

@@ -153,10 +153,7 @@ impl State {
if eos {
Closed(Cause::EndStream)
} else {
Open {
local: AwaitingHeaders,
remote,
}
HalfClosedLocal(Streaming)
}
},
Open {

View File

@@ -245,7 +245,6 @@ where
if let Err(RecvError::Stream { .. }) = res {
actions.recv.release_connection_capacity(sz as WindowSize, &mut None);
}
actions.reset_on_recv_stream_err(send_buffer, stream, counts, res)
})
}
@@ -426,6 +425,10 @@ where
None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)),
};
// TODO: Streams in the reserved states do not count towards the concurrency
// limit. However, it seems like there should be a cap otherwise this
// could grow in memory indefinitely.
// Ensure that we can reserve streams
me.actions.recv.ensure_can_reserve()?;
@@ -437,8 +440,9 @@ where
return Ok(());
}
// Create a scope
let child_key = {
// Try to handle the frame and create a corresponding key for the pushed stream
// this requires a bit of indirection to make the borrow checker happy.
let child_key: Option<store::Key> = {
// Create state for the stream
let stream = me.store.insert(promised_id, {
Stream::new(
@@ -450,23 +454,29 @@ where
let actions = &mut me.actions;
me.counts.transition(stream, |counts, stream| {
let res = actions.recv.recv_push_promise(frame, stream);
let stream_valid =
actions.recv.recv_push_promise(frame, stream);
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, res)
.map(|_| stream.key())
match stream_valid {
Ok(()) =>
Ok(Some(stream.key())),
_ => {
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, stream_valid)
.map(|()| None)
}
}
})?
};
// If we're successful, push the headers and stream...
if let Some(child) = child_key {
let mut ppp = me.store[parent_key].pending_push_promises.take();
ppp.push(&mut me.store.resolve(child));
// Push the stream... this requires a bit of indirection to make
// the borrow checker happy.
let mut ppp = me.store[parent_key].pending_push_promises.take();
ppp.push(&mut me.store.resolve(child_key));
let parent = &mut me.store[parent_key];
parent.pending_push_promises = ppp;
parent.notify_recv();
let parent = &mut me.store.resolve(parent_key);
parent.pending_push_promises = ppp;
parent.notify_recv();
};
Ok(())
}
@@ -972,6 +982,26 @@ impl OpaqueStreamRef {
me.actions.recv.poll_response(&mut stream)
}
/// Called by a client to check for a pushed request.
pub fn poll_pushed(
&mut self
) -> Poll<Option<(Request<()>, OpaqueStreamRef)>, proto::Error> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let res = {
let mut stream = me.store.resolve(self.key);
try_ready!(me.actions.recv.poll_pushed(&mut stream))
};
Ok(Async::Ready(res.map(|(h, key)| {
me.store.resolve(key).ref_inc();
let opaque_ref =
OpaqueStreamRef {
inner: self.inner.clone(), key,
};
(h, opaque_ref)
})))
}
pub fn body_is_empty(&self) -> bool {
let mut me = self.inner.lock().unwrap();
@@ -1102,6 +1132,7 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
maybe_cancel(stream, actions, counts);
if stream.ref_count == 0 {
// We won't be able to reach our push promises anymore
let mut ppp = stream.pending_push_promises.take();
while let Some(promise) = ppp.pop(stream.store_mut()) {
counts.transition(promise, |counts, stream| {

View File

@@ -131,12 +131,12 @@
use {SendStream, RecvStream, ReleaseCapacity};
use codec::{Codec, RecvError};
use frame::{self, Reason, Settings, StreamId};
use frame::{self, Pseudo, Reason, Settings, StreamId};
use proto::{self, Config, Prioritized};
use bytes::{Buf, Bytes, IntoBuf};
use futures::{self, Async, Future, Poll};
use http::{Request, Response};
use http::{HeaderMap, Request, Response};
use std::{convert, fmt, io, mem};
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -1168,7 +1168,7 @@ impl Peer {
// Build the set pseudo header set. All requests will include `method`
// and `path`.
let pseudo = frame::Pseudo::response(status);
let pseudo = Pseudo::response(status);
// Create the HEADERS frame
let mut frame = frame::Headers::new(id, pseudo, headers);
@@ -1192,14 +1192,13 @@ impl proto::Peer for Peer {
proto::DynPeer::Server
}
fn convert_poll_message(headers: frame::Headers) -> Result<Self::Poll, RecvError> {
fn convert_poll_message(
pseudo: Pseudo, fields: HeaderMap, stream_id: StreamId
) -> Result<Self::Poll, RecvError> {
use http::{uri, Version};
let mut b = Request::builder();
let stream_id = headers.stream_id();
let (pseudo, fields) = headers.into_parts();
macro_rules! malformed {
($($arg:tt)*) => {{
debug!($($arg)*);