diff --git a/src/codec/error.rs b/src/codec/error.rs index e89d5a1..18703ab 100644 --- a/src/codec/error.rs +++ b/src/codec/error.rs @@ -55,15 +55,6 @@ pub enum UserError { // ===== impl RecvError ===== -impl RecvError { - pub(crate) fn is_stream_error(&self) -> bool { - match *self { - RecvError::Stream { .. } => true, - _ => false, - } - } -} - impl From for RecvError { fn from(src: io::Error) -> Self { RecvError::Io(src) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index f41afe7..30e69e8 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -10,7 +10,7 @@ pub(crate) use self::connection::{Config, Connection}; pub(crate) use self::error::Error; pub(crate) use self::peer::{Peer, Dyn as DynPeer}; pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams}; -pub(crate) use self::streams::Prioritized; +pub(crate) use self::streams::{Prioritized, Open}; use codec::Codec; diff --git a/src/proto/peer.rs b/src/proto/peer.rs index 8325b88..8362db6 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -1,13 +1,14 @@ use codec::RecvError; use error::Reason; use frame::{Headers, StreamId}; +use proto::Open; use http::{Request, Response}; use std::fmt; /// Either a Client or a Server -pub trait Peer { +pub(crate) trait Peer { /// Message type polled from the transport type Poll: fmt::Debug; @@ -27,7 +28,7 @@ pub trait Peer { /// /// This is used internally to avoid incurring a generic on all internal types. #[derive(Debug, Copy, Clone, Eq, PartialEq)] -pub enum Dyn { +pub(crate) enum Dyn { Client, Server, } @@ -61,20 +62,23 @@ impl Dyn { } /// Returns true if the remote peer can initiate a stream with the given ID. - pub fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> { - if !self.is_server() { - trace!("Cannot open stream {:?} - not server, PROTOCOL_ERROR", id); - // Remote is a server and cannot open streams. PushPromise is - // registered by reserving, so does not go through this path. - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } + pub fn ensure_can_open(&self, id: StreamId, mode: Open) -> Result<(), RecvError> { + if self.is_server() { + // Ensure that the ID is a valid client initiated ID + if mode.is_push_promise() || !id.is_client_initiated() { + trace!("Cannot open stream {:?} - not client initiated, PROTOCOL_ERROR", id); + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } - // Ensure that the ID is a valid server initiated ID - if !id.is_client_initiated() { - trace!("Cannot open stream {:?} - not client initiated, PROTOCOL_ERROR", id); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } + Ok(()) + } else { + // Ensure that the ID is a valid server initiated ID + if !mode.is_push_promise() || !id.is_server_initiated() { + trace!("Cannot open stream {:?} - not server initiated, PROTOCOL_ERROR", id); + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } - Ok(()) + Ok(()) + } } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 7a95e67..bbf19fa 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -10,6 +10,7 @@ mod stream; mod streams; pub(crate) use self::prioritize::Prioritized; +pub(crate) use self::recv::Open; pub(crate) use self::store::Key; pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index a4a3259..83e49a4 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,5 +1,4 @@ use super::*; -use super::store::Resolve; use {frame, proto}; use codec::{RecvError, UserError}; use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; @@ -70,6 +69,12 @@ pub(super) enum RecvHeaderBlockError { State(RecvError), } +#[derive(Debug)] +pub(crate) enum Open { + PushPromise, + Headers, +} + #[derive(Debug, Clone, Copy)] struct Indices { head: store::Key, @@ -121,11 +126,12 @@ impl Recv { pub fn open( &mut self, id: StreamId, + mode: Open, counts: &mut Counts, ) -> Result, RecvError> { assert!(self.refused.is_none()); - counts.peer().ensure_can_open(id)?; + counts.peer().ensure_can_open(id, mode)?; let next_id = self.next_stream_id()?; if id < next_id { @@ -530,42 +536,27 @@ impl Recv { pub fn recv_push_promise( &mut self, frame: frame::PushPromise, - send: &Send, - stream: store::Key, - store: &mut Store, + stream: &mut store::Ptr, ) -> Result<(), RecvError> { - // First, make sure that the values are legit - self.ensure_can_reserve(frame.promised_id())?; - - // Make sure that the stream state is valid - store[stream].state.ensure_recv_open()?; // TODO: Streams in the reserved states do not count towards the concurrency // limit. However, it seems like there should be a cap otherwise this // could grow in memory indefinitely. - /* - if !self.inc_num_streams() { - self.refused = Some(frame.promised_id()); - return Ok(()); - } - */ - - // TODO: All earlier stream IDs should be implicitly closed. - - // Now, create a new entry for the stream - let mut new_stream = Stream::new( - frame.promised_id(), - send.init_window_sz(), - self.init_window_sz, - ); - - new_stream.state.reserve_remote()?; - // Store the stream - let new_stream = store.insert(frame.promised_id(), new_stream).key(); - + stream.state.reserve_remote()?; if frame.is_over_size() { + // A frame is over size if the decoded header block was bigger than + // SETTINGS_MAX_HEADER_LIST_SIZE. + // + // > A server that receives a larger header block than it is willing + // > to handle can send an HTTP 431 (Request Header Fields Too + // > Large) status code [RFC6585]. A client can discard responses + // > that it cannot process. + // + // So, if peer is a server, we'll send a 431. In either case, + // an error is recorded, which will send a REFUSED_STREAM, + // since we don't want any of the data frames either. trace!("recv_push_promise; frame for {:?} is over size", frame.promised_id()); return Err(RecvError::Stream { id: frame.promised_id(), @@ -573,14 +564,6 @@ impl Recv { }); } - let mut ppp = store[stream].pending_push_promises.take(); - ppp.push(&mut store.resolve(new_stream)); - - let stream = &mut store[stream]; - - stream.pending_push_promises = ppp; - stream.notify_recv(); - Ok(()) } @@ -618,7 +601,6 @@ impl Recv { pub fn go_away(&mut self, last_processed_id: StreamId) { assert!(self.max_stream_id >= last_processed_id); - self.max_stream_id = last_processed_id; } @@ -644,17 +626,9 @@ impl Recv { } /// Returns true if the remote peer can reserve a stream with the given ID. - fn ensure_can_reserve(&self, promised_id: StreamId) + pub fn ensure_can_reserve(&self) -> Result<(), RecvError> { - if !promised_id.is_server_initiated() { - trace!( - "recv_push_promise; error promised id is invalid {:?}", - promised_id - ); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } - if !self.is_push_enabled { trace!("recv_push_promise; error push is disabled"); return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); @@ -934,6 +908,19 @@ impl Event { } } +// ===== impl Open ===== + +impl Open { + pub fn is_push_promise(&self) -> bool { + use self::Open::*; + + match *self { + PushPromise => true, + _ => false, + } + } +} + // ===== impl RecvHeaderBlockError ===== impl From for RecvHeaderBlockError { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 3954852..8b11643 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,7 +1,7 @@ use {client, proto, server}; use codec::{Codec, RecvError, SendError, UserError}; use frame::{self, Frame, Reason}; -use proto::{peer, Peer, WindowSize}; +use proto::{peer, Peer, Open, WindowSize}; use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; use super::recv::RecvHeaderBlockError; use super::store::{self, Entry, Resolve, Store}; @@ -127,6 +127,8 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; + // The GOAWAY process has begun. All streams with a greater ID than + // specified as part of GOAWAY should be ignored. if id > me.actions.recv.max_stream_id() { trace!("id ({:?}) > max_stream_id ({:?}), ignoring HEADERS", id, me.actions.recv.max_stream_id()); return Ok(()); @@ -134,7 +136,7 @@ where let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), - Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts)? { + Entry::Vacant(e) => match me.actions.recv.open(id, Open::Headers, &mut me.counts)? { Some(stream_id) => { let stream = Stream::new( stream_id, @@ -182,7 +184,9 @@ where Reason::REFUSED_STREAM, counts, &mut actions.task); + actions.recv.enqueue_reset_expiration(stream, counts); + Ok(()) } else { Err(RecvError::Stream { @@ -215,6 +219,8 @@ where let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => { + // The GOAWAY process has begun. All streams with a greater ID + // than specified as part of GOAWAY should be ignored. if id > me.actions.recv.max_stream_id() { trace!("id ({:?}) > max_stream_id ({:?}), ignoring DATA", id, me.actions.recv.max_stream_id()); return Ok(()); @@ -254,6 +260,8 @@ where return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); } + // The GOAWAY process has begun. All streams with a greater ID than + // specified as part of GOAWAY should be ignored. if id > me.actions.recv.max_stream_id() { trace!("id ({:?}) > max_stream_id ({:?}), ignoring RST_STREAM", id, me.actions.recv.max_stream_id()); return Ok(()); @@ -402,44 +410,66 @@ where let id = frame.stream_id(); let promised_id = frame.promised_id(); - let res = { - let stream = match me.store.find_mut(&id) { - Some(stream) => stream.key(), - None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), - }; + // First, ensure that the initiating stream is still in a valid state. + let parent_key = match me.store.find_mut(&id) { + Some(stream) => { + // The GOAWAY process has begun. All streams with a greater ID + // than specified as part of GOAWAY should be ignored. + if id > me.actions.recv.max_stream_id() { + trace!("id ({:?}) > max_stream_id ({:?}), ignoring PUSH_PROMISE", id, me.actions.recv.max_stream_id()); + return Ok(()); + } - if me.counts.peer().is_server() { - // The remote is a client and cannot reserve - trace!("recv_push_promise; error remote is client"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + // The stream must be receive open + stream.state.ensure_recv_open()?; + stream.key() } - - me.actions.recv.recv_push_promise(frame, - &me.actions.send, - stream, - &mut me.store) + None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), }; - if let Err(err) = res { - if let Some(ref mut new_stream) = me.store.find_mut(&promised_id) { + // Ensure that we can reserve streams + me.actions.recv.ensure_can_reserve()?; + + // Next, open the stream. + // + // If `None` is returned, then the stream is being refused. There is no + // further work to be done. + if me.actions.recv.open(promised_id, Open::PushPromise, &mut me.counts)?.is_none() { + return Ok(()); + } + + // Create a scope + let child_key = { + // Create state for the stream + let stream = me.store.insert(promised_id, { + Stream::new( + promised_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz()) + }); + + let actions = &mut me.actions; + + me.counts.transition(stream, |counts, stream| { + let res = actions.recv.recv_push_promise(frame, stream); let mut send_buffer = self.send_buffer.inner.lock().unwrap(); - me.actions.reset_on_recv_stream_err( - &mut *send_buffer, - new_stream, - &mut me.counts, - Err(err)) - } else { - // If there was a stream error, the stream should have been stored - // so we can track sending a reset. - // - // Otherwise, this MUST be an connection error. - assert!(!err.is_stream_error()); - Err(err) - } - } else { - res - } + actions.reset_on_recv_stream_err(&mut *send_buffer, stream, counts, res) + .map(|_| stream.key()) + })? + }; + + // Push the stream... this requires a bit of indirection to make + // the borrow checker happy. + let mut ppp = me.store[parent_key].pending_push_promises.take(); + ppp.push(&mut me.store.resolve(child_key)); + + let parent = &mut me.store[parent_key]; + + parent.pending_push_promises = ppp; + parent.notify_recv(); + + Ok(()) } pub fn next_incoming(&mut self) -> Option> { @@ -633,7 +663,7 @@ where let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), - Entry::Vacant(e) => match me.actions.recv.open(id, &mut me.counts) { + Entry::Vacant(e) => match me.actions.recv.open(id, Open::Headers, &mut me.counts) { Ok(Some(stream_id)) => { let stream = Stream::new(stream_id, 0, 0); diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index cc32d82..44cfefe 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -199,3 +199,109 @@ fn recv_push_promise_with_unsafe_method_is_stream_error() { fn recv_push_promise_with_wrong_authority_is_stream_error() { // if server is foo.com, :authority = bar.com is stream error } + +#[test] +fn recv_push_promise_skipped_stream_id() { + // tests that by default, received push promises work + // TODO: once API exists, read the pushed response + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + let mock = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .send_frame(frames::push_promise(1, 4).request("GET", "https://http2.akamai.com/style.css")) + .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) + .recv_frame(frames::go_away(0).protocol_error()) + .close() + ; + + let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = client + .send_request(request, true) + .unwrap() + .0 + .then(|res| { + assert!(res.is_err()); + Ok::<_, ()>(()) + }); + + // client should see a protocol error + let conn = h2.then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: unspecific protocol error detected" + ); + Ok::<(), ()>(()) + }); + + conn.unwrap().join(req) + }); + + h2.join(mock).wait().unwrap(); +} + +#[test] +fn recv_push_promise_dup_stream_id() { + // tests that by default, received push promises work + // TODO: once API exists, read the pushed response + let _ = ::env_logger::try_init(); + + let (io, srv) = mock::new(); + let mock = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://http2.akamai.com/") + .eos(), + ) + .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) + .send_frame(frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css")) + .recv_frame(frames::go_away(0).protocol_error()) + .close() + ; + + let h2 = client::handshake(io).unwrap().and_then(|(mut client, h2)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://http2.akamai.com/") + .body(()) + .unwrap(); + + let req = client + .send_request(request, true) + .unwrap() + .0 + .then(|res| { + assert!(res.is_err()); + Ok::<_, ()>(()) + }); + + // client should see a protocol error + let conn = h2.then(|res| { + let err = res.unwrap_err(); + assert_eq!( + err.to_string(), + "protocol error: unspecific protocol error detected" + ); + Ok::<(), ()>(()) + }); + + conn.unwrap().join(req) + }); + + h2.join(mock).wait().unwrap(); +}