Enforce monotonic stream IDs for push promises (#275)

Previously, monotonic stream IDs (spec 5.1.1) for push promises were not
enforced. This was due to push promises going through an entirely
separate code path than normally initiated streams.

This patch unifies the code path for initializing streams via both
HEADERS and PUSH_PROMISE. This is done by first calling `recv.open` in
both cases.

Closes #272
This commit is contained in:
Carl Lerche
2018-05-14 10:20:57 -07:00
committed by GitHub
parent 173f9a67e7
commit bb454e017c
7 changed files with 227 additions and 108 deletions

View File

@@ -55,15 +55,6 @@ pub enum UserError {
// ===== impl RecvError ===== // ===== impl RecvError =====
impl RecvError {
pub(crate) fn is_stream_error(&self) -> bool {
match *self {
RecvError::Stream { .. } => true,
_ => false,
}
}
}
impl From<io::Error> for RecvError { impl From<io::Error> for RecvError {
fn from(src: io::Error) -> Self { fn from(src: io::Error) -> Self {
RecvError::Io(src) RecvError::Io(src)

View File

@@ -10,7 +10,7 @@ pub(crate) use self::connection::{Config, Connection};
pub(crate) use self::error::Error; pub(crate) use self::error::Error;
pub(crate) use self::peer::{Peer, Dyn as DynPeer}; pub(crate) use self::peer::{Peer, Dyn as DynPeer};
pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams}; pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams};
pub(crate) use self::streams::Prioritized; pub(crate) use self::streams::{Prioritized, Open};
use codec::Codec; use codec::Codec;

View File

@@ -1,13 +1,14 @@
use codec::RecvError; use codec::RecvError;
use error::Reason; use error::Reason;
use frame::{Headers, StreamId}; use frame::{Headers, StreamId};
use proto::Open;
use http::{Request, Response}; use http::{Request, Response};
use std::fmt; use std::fmt;
/// Either a Client or a Server /// Either a Client or a Server
pub trait Peer { pub(crate) trait Peer {
/// Message type polled from the transport /// Message type polled from the transport
type Poll: fmt::Debug; type Poll: fmt::Debug;
@@ -27,7 +28,7 @@ pub trait Peer {
/// ///
/// This is used internally to avoid incurring a generic on all internal types. /// This is used internally to avoid incurring a generic on all internal types.
#[derive(Debug, Copy, Clone, Eq, PartialEq)] #[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Dyn { pub(crate) enum Dyn {
Client, Client,
Server, Server,
} }
@@ -61,20 +62,23 @@ impl Dyn {
} }
/// Returns true if the remote peer can initiate a stream with the given ID. /// Returns true if the remote peer can initiate a stream with the given ID.
pub fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> { pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), RecvError> {
if !self.is_server() { if self.is_server() {
trace!("Cannot open stream {:?} - not server, PROTOCOL_ERROR", id); // Ensure that the ID is a valid client initiated ID
// Remote is a server and cannot open streams. PushPromise is if mode.is_push_promise() || !id.is_client_initiated() {
// registered by reserving, so does not go through this path. trace!("Cannot open stream {:?} - not client initiated, PROTOCOL_ERROR", id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
} }
Ok(())
} else {
// Ensure that the ID is a valid server initiated ID // Ensure that the ID is a valid server initiated ID
if !id.is_client_initiated() { if !mode.is_push_promise() || !id.is_server_initiated() {
trace!("Cannot open stream {:?} - not client initiated, PROTOCOL_ERROR", id); trace!("Cannot open stream {:?} - not server initiated, PROTOCOL_ERROR", id);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
} }
Ok(()) Ok(())
} }
} }
}

View File

@@ -10,6 +10,7 @@ mod stream;
mod streams; mod streams;
pub(crate) use self::prioritize::Prioritized; pub(crate) use self::prioritize::Prioritized;
pub(crate) use self::recv::Open;
pub(crate) use self::store::Key; pub(crate) use self::store::Key;
pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams};

View File

@@ -1,5 +1,4 @@
use super::*; use super::*;
use super::store::Resolve;
use {frame, proto}; use {frame, proto};
use codec::{RecvError, UserError}; use codec::{RecvError, UserError};
use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
@@ -70,6 +69,12 @@ pub(super) enum RecvHeaderBlockError<T> {
State(RecvError), State(RecvError),
} }
#[derive(Debug)]
pub(crate) enum Open {
PushPromise,
Headers,
}
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
struct Indices { struct Indices {
head: store::Key, head: store::Key,
@@ -121,11 +126,12 @@ impl Recv {
pub fn open( pub fn open(
&mut self, &mut self,
id: StreamId, id: StreamId,
mode: Open,
counts: &mut Counts, counts: &mut Counts,
) -> Result<Option<StreamId>, RecvError> { ) -> Result<Option<StreamId>, RecvError> {
assert!(self.refused.is_none()); assert!(self.refused.is_none());
counts.peer().ensure_can_open(id)?; counts.peer().ensure_can_open(id, mode)?;
let next_id = self.next_stream_id()?; let next_id = self.next_stream_id()?;
if id < next_id { if id < next_id {
@@ -530,42 +536,27 @@ impl Recv {
pub fn recv_push_promise( pub fn recv_push_promise(
&mut self, &mut self,
frame: frame::PushPromise, frame: frame::PushPromise,
send: &Send, stream: &mut store::Ptr,
stream: store::Key,
store: &mut Store,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
// First, make sure that the values are legit
self.ensure_can_reserve(frame.promised_id())?;
// Make sure that the stream state is valid
store[stream].state.ensure_recv_open()?;
// TODO: Streams in the reserved states do not count towards the concurrency // TODO: Streams in the reserved states do not count towards the concurrency
// limit. However, it seems like there should be a cap otherwise this // limit. However, it seems like there should be a cap otherwise this
// could grow in memory indefinitely. // could grow in memory indefinitely.
/* stream.state.reserve_remote()?;
if !self.inc_num_streams() {
self.refused = Some(frame.promised_id());
return Ok(());
}
*/
// TODO: All earlier stream IDs should be implicitly closed.
// Now, create a new entry for the stream
let mut new_stream = Stream::new(
frame.promised_id(),
send.init_window_sz(),
self.init_window_sz,
);
new_stream.state.reserve_remote()?;
// Store the stream
let new_stream = store.insert(frame.promised_id(), new_stream).key();
if frame.is_over_size() { if frame.is_over_size() {
// A frame is over size if the decoded header block was bigger than
// SETTINGS_MAX_HEADER_LIST_SIZE.
//
// > A server that receives a larger header block than it is willing
// > to handle can send an HTTP 431 (Request Header Fields Too
// > Large) status code [RFC6585]. A client can discard responses
// > that it cannot process.
//
// So, if peer is a server, we'll send a 431. In either case,
// an error is recorded, which will send a REFUSED_STREAM,
// since we don't want any of the data frames either.
trace!("recv_push_promise; frame for {:?} is over size", frame.promised_id()); trace!("recv_push_promise; frame for {:?} is over size", frame.promised_id());
return Err(RecvError::Stream { return Err(RecvError::Stream {
id: frame.promised_id(), id: frame.promised_id(),
@@ -573,14 +564,6 @@ impl Recv {
}); });
} }
let mut ppp = store[stream].pending_push_promises.take();
ppp.push(&mut store.resolve(new_stream));
let stream = &mut store[stream];
stream.pending_push_promises = ppp;
stream.notify_recv();
Ok(()) Ok(())
} }
@@ -618,7 +601,6 @@ impl Recv {
pub fn go_away(&mut self, last_processed_id: StreamId) { pub fn go_away(&mut self, last_processed_id: StreamId) {
assert!(self.max_stream_id >= last_processed_id); assert!(self.max_stream_id >= last_processed_id);
self.max_stream_id = last_processed_id; self.max_stream_id = last_processed_id;
} }
@@ -644,17 +626,9 @@ impl Recv {
} }
/// Returns true if the remote peer can reserve a stream with the given ID. /// Returns true if the remote peer can reserve a stream with the given ID.
fn ensure_can_reserve(&self, promised_id: StreamId) pub fn ensure_can_reserve(&self)
-> Result<(), RecvError> -> Result<(), RecvError>
{ {
if !promised_id.is_server_initiated() {
trace!(
"recv_push_promise; error promised id is invalid {:?}",
promised_id
);
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
if !self.is_push_enabled { if !self.is_push_enabled {
trace!("recv_push_promise; error push is disabled"); trace!("recv_push_promise; error push is disabled");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
@@ -934,6 +908,19 @@ impl Event {
} }
} }
// ===== impl Open =====
impl Open {
pub fn is_push_promise(&self) -> bool {
use self::Open::*;
match *self {
PushPromise => true,
_ => false,
}
}
}
// ===== impl RecvHeaderBlockError ===== // ===== impl RecvHeaderBlockError =====
impl<T> From<RecvError> for RecvHeaderBlockError<T> { impl<T> From<RecvError> for RecvHeaderBlockError<T> {

View File

@@ -1,7 +1,7 @@
use {client, proto, server}; use {client, proto, server};
use codec::{Codec, RecvError, SendError, UserError}; use codec::{Codec, RecvError, SendError, UserError};
use frame::{self, Frame, Reason}; use frame::{self, Frame, Reason};
use proto::{peer, Peer, WindowSize}; use proto::{peer, Peer, Open, WindowSize};
use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
use super::recv::RecvHeaderBlockError; use super::recv::RecvHeaderBlockError;
use super::store::{self, Entry, Resolve, Store}; use super::store::{self, Entry, Resolve, Store};
@@ -127,6 +127,8 @@ where
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
// The GOAWAY process has begun. All streams with a greater ID than
// specified as part of GOAWAY should be ignored.
if id > me.actions.recv.max_stream_id() { if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id()); trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id());
return Ok(()); return Ok(());
@@ -134,7 +136,7 @@ where
let key = match me.store.find_entry(id) { let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(), Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? { Entry::Vacant(e) => match me.actions.recv.open(id, Open::Headers, &mut me.counts)? {
Some(stream_id) => { Some(stream_id) => {
let stream = Stream::new( let stream = Stream::new(
stream_id, stream_id,
@@ -182,7 +184,9 @@ where
Reason::REFUSED_STREAM, Reason::REFUSED_STREAM,
counts, counts,
&mut actions.task); &mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts); actions.recv.enqueue_reset_expiration(stream, counts);
Ok(()) Ok(())
} else { } else {
Err(RecvError::Stream { Err(RecvError::Stream {
@@ -215,6 +219,8 @@ where
let stream = match me.store.find_mut(&id) { let stream = match me.store.find_mut(&id) {
Some(stream) => stream, Some(stream) => stream,
None => { None => {
// The GOAWAY process has begun. All streams with a greater ID
// than specified as part of GOAWAY should be ignored.
if id > me.actions.recv.max_stream_id() { if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id()); trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id());
return Ok(()); return Ok(());
@@ -254,6 +260,8 @@ where
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
} }
// The GOAWAY process has begun. All streams with a greater ID than
// specified as part of GOAWAY should be ignored.
if id > me.actions.recv.max_stream_id() { if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id()); trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id());
return Ok(()); return Ok(());
@@ -402,44 +410,66 @@ where
let id = frame.stream_id(); let id = frame.stream_id();
let promised_id = frame.promised_id(); let promised_id = frame.promised_id();
let res = { // First, ensure that the initiating stream is still in a valid state.
let stream = match me.store.find_mut(&id) { let parent_key = match me.store.find_mut(&id) {
Some(stream) => stream.key(), Some(stream) => {
// The GOAWAY process has begun. All streams with a greater ID
// than specified as part of GOAWAY should be ignored.
if id > me.actions.recv.max_stream_id() {
trace!("id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", id, me.actions.recv.max_stream_id());
return Ok(());
}
// The stream must be receive open
stream.state.ensure_recv_open()?;
stream.key()
}
None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)),
}; };
if me.counts.peer().is_server() { // Ensure that we can reserve streams
// The remote is a client and cannot reserve me.actions.recv.ensure_can_reserve()?;
trace!("recv_push_promise; error remote is client");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); // Next, open the stream.
//
// If `None` is returned, then the stream is being refused. There is no
// further work to be done.
if me.actions.recv.open(promised_id, Open::PushPromise, &mut me.counts)?.is_none() {
return Ok(());
} }
me.actions.recv.recv_push_promise(frame, // Create a scope
&me.actions.send, let child_key = {
stream, // Create state for the stream
&mut me.store) let stream = me.store.insert(promised_id, {
}; Stream::new(
promised_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz())
});
if let Err(err) = res { let actions = &mut me.actions;
if let Some(ref mut new_stream) = me.store.find_mut(&promised_id) {
me.counts.transition(stream, |counts, stream| {
let res = actions.recv.recv_push_promise(frame, stream);
let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let mut send_buffer = self.send_buffer.inner.lock().unwrap();
me.actions.reset_on_recv_stream_err( actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, res)
&mut *send_buffer, .map(|_| stream.key())
new_stream, })?
&mut me.counts, };
Err(err))
} else { // Push the stream... this requires a bit of indirection to make
// If there was a stream error, the stream should have been stored // the borrow checker happy.
// so we can track sending a reset. let mut ppp = me.store[parent_key].pending_push_promises.take();
// ppp.push(&mut me.store.resolve(child_key));
// Otherwise, this MUST be an connection error.
assert!(!err.is_stream_error()); let parent = &mut me.store[parent_key];
Err(err)
} parent.pending_push_promises = ppp;
} else { parent.notify_recv();
res
} Ok(())
} }
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
@@ -633,7 +663,7 @@ where
let key = match me.store.find_entry(id) { let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(), Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts) { Entry::Vacant(e) => match me.actions.recv.open(id, Open::Headers, &mut me.counts) {
Ok(Some(stream_id)) => { Ok(Some(stream_id)) => {
let stream = Stream::new(stream_id, 0, 0); let stream = Stream::new(stream_id, 0, 0);

View File

@@ -199,3 +199,109 @@ fn recv_push_promise_with_unsafe_method_is_stream_error() {
fn recv_push_promise_with_wrong_authority_is_stream_error() { fn recv_push_promise_with_wrong_authority_is_stream_error() {
// if server is foo.com, :authority = bar.com is stream error // if server is foo.com, :authority = bar.com is stream error
} }
#[test]
fn recv_push_promise_skipped_stream_id() {
// tests that by default, received push promises work
// TODO: once API exists, read the pushed response
let _ = ::env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.recv_frame(frames::go_away(0).protocol_error())
.close()
;
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.unwrap()
.0
.then(|res| {
assert!(res.is_err());
Ok::<_, ()>(())
});
// client should see a protocol error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
Ok::<(), ()>(())
});
conn.unwrap().join(req)
});
h2.join(mock).wait().unwrap();
}
#[test]
fn recv_push_promise_dup_stream_id() {
// tests that by default, received push promises work
// TODO: once API exists, read the pushed response
let _ = ::env_logger::try_init();
let (io, srv) = mock::new();
let mock = srv.assert_client_handshake()
.unwrap()
.recv_settings()
.recv_frame(
frames::headers(1)
.request("GET", "https://http2.akamai.com/")
.eos(),
)
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"))
.recv_frame(frames::go_away(0).protocol_error())
.close()
;
let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| {
let request = Request::builder()
.method(Method::GET)
.uri("https://http2.akamai.com/")
.body(())
.unwrap();
let req = client
.send_request(request, true)
.unwrap()
.0
.then(|res| {
assert!(res.is_err());
Ok::<_, ()>(())
});
// client should see a protocol error
let conn = h2.then(|res| {
let err = res.unwrap_err();
assert_eq!(
err.to_string(),
"protocol error: unspecific protocol error detected"
);
Ok::<(), ()>(())
});
conn.unwrap().join(req)
});
h2.join(mock).wait().unwrap();
}