From 6fd9674759e12dabf7637798dab966728853b5d8 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 31 Aug 2017 12:40:02 -0400 Subject: [PATCH] Validate received content-length header (#43) If a content-length header is provided, the value should match the sum of all data frame lengths. If there is a mismatch, then the stream is reset. --- src/frame/headers.rs | 4 +++ src/proto/streams/recv.rs | 67 +++++++++++++++++++++++++++++++++--- src/proto/streams/stream.rs | 48 ++++++++++++++++++++++++++ src/proto/streams/streams.rs | 21 +++++++++-- 4 files changed, 134 insertions(+), 6 deletions(-) diff --git a/src/frame/headers.rs b/src/frame/headers.rs index 6211a83..e913c54 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -266,6 +266,10 @@ impl Headers { (self.pseudo, self.fields) } + pub fn fields(&self) -> &HeaderMap { + &self.fields + } + pub fn into_fields(self) -> HeaderMap { self.fields } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 2777822..9d75fbc 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -149,6 +149,22 @@ impl Recv self.inc_num_streams(); } + if !stream.content_length.is_head() { + use http::header; + use super::stream::ContentLength; + + if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { + let content_length = match parse_u64(content_length.as_bytes()) { + Ok(v) => v, + Err(_) => { + unimplemented!(); + } + }; + + stream.content_length = ContentLength::Remaining(content_length); + } + } + let message = P::convert_poll_message(frame)?; // Push the frame onto the stream's recv buffer @@ -173,6 +189,13 @@ impl Recv // Transition the state stream.state.recv_close()?; + if stream.ensure_content_length_zero().is_err() { + return Err(ProtoError::Stream { + id: stream.id, + reason: ProtocolError, + }); + } + let trailers = frame.into_fields(); // Push the frame onto the stream's recv buffer @@ -223,7 +246,7 @@ impl Recv pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) - -> Result<(), ConnectionError> + -> Result<(), ProtoError> { let sz = frame.payload().len(); @@ -236,7 +259,7 @@ impl Recv if !stream.state.is_recv_streaming() { // Receiving a DATA frame when not expecting one is a protocol // error. - return Err(ProtocolError.into()); + return Err(ProtoError::Connection(ProtocolError)); } trace!("recv_data; size={}; connection={}; stream={}", @@ -245,7 +268,7 @@ impl Recv // Ensure that there is enough capacity on the connection before acting // on the stream. if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz { - return Err(FlowControlError.into()); + return Err(ProtoError::Connection(FlowControlError)); } // Update connection level flow control @@ -257,9 +280,23 @@ impl Recv // Track the data as in-flight stream.in_flight_recv_data += sz; + if stream.dec_content_length(frame.payload().len()).is_err() { + return Err(ProtoError::Stream { + id: stream.id, + reason: ProtocolError, + }); + } + if frame.is_end_stream() { + if stream.ensure_content_length_zero().is_err() { + return Err(ProtoError::Stream { + id: stream.id, + reason: ProtocolError, + }); + } + if stream.state.recv_close().is_err() { - return Err(ProtocolError.into()); + return Err(ProtoError::Connection(ProtocolError)); } } @@ -619,3 +656,25 @@ impl Event { } } } + +// ===== util ===== + +fn parse_u64(src: &[u8]) -> Result { + if src.len() > 19 { + // At danger for overflow... + return Err(()); + } + + let mut ret = 0; + + for &d in src { + if d < b'0' || d > b'9' { + return Err(()); + } + + ret *= 10; + ret += (d - b'0') as u64; + } + + Ok(ret) +} diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index ab3859c..57e0d8c 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -71,6 +71,18 @@ pub(super) struct Stream /// The stream's pending push promises pub pending_push_promises: store::Queue, + + /// Validate content-length headers + pub content_length: ContentLength, + +} + +/// State related to validating a stream's content-length +#[derive(Debug)] +pub enum ContentLength { + Omitted, + Head, + Remaining(u64), } #[derive(Debug)] @@ -130,6 +142,7 @@ impl Stream pending_recv: buffer::Deque::new(), recv_task: None, pending_push_promises: store::Queue::new(), + content_length: ContentLength::Omitted, } } @@ -144,6 +157,30 @@ impl Stream } } + /// Returns `Err` when the decrement cannot be completed due to overflow. + pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { + match self.content_length { + ContentLength::Remaining(ref mut rem) => { + match rem.checked_sub(len as u64) { + Some(val) => *rem = val, + None => return Err(()), + } + } + ContentLength::Head => return Err(()), + _ => {} + } + + Ok(()) + } + + pub fn ensure_content_length_zero(&self) -> Result<(), ()> { + match self.content_length { + ContentLength::Remaining(0) => Ok(()), + ContentLength::Remaining(_) => Err(()), + _ => Ok(()), + } + } + pub fn notify_send(&mut self) { if let Some(task) = self.send_task.take() { task.notify(); @@ -244,3 +281,14 @@ impl store::Next for NextWindowUpdate { stream.is_pending_window_update = val; } } + +// ===== impl ContentLength ===== + +impl ContentLength { + pub fn is_head(&self) -> bool { + match *self { + ContentLength::Head => true, + _ => false, + } + } +} diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 388c3a1..f810913 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -103,6 +103,7 @@ impl Streams actions.recv.recv_trailers(frame, stream) }; + // TODO: extract this match res { Ok(()) => Ok(()), Err(ProtoError::Connection(reason)) => Err(reason.into()), @@ -130,7 +131,16 @@ impl Streams }; me.actions.transition(stream, |actions, stream| { - actions.recv.recv_data(frame, stream) + match actions.recv.recv_data(frame, stream) { + Ok(()) => Ok(()), + Err(ProtoError::Connection(reason)) => Err(reason.into()), + Err(ProtoError::Stream { reason, .. }) => { + // Reset the stream. + actions.send.send_reset(reason, stream, &mut actions.task); + Ok(()) + } + Err(ProtoError::Io(_)) => unreachable!(), + } }) } @@ -286,6 +296,9 @@ impl Streams pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) -> Result, ConnectionError> { + use http::method; + use super::stream::ContentLength; + // TODO: There is a hazard with assigning a stream ID before the // prioritize layer. If prioritization reorders new streams, this // implicitly closes the earlier stream IDs. @@ -298,11 +311,15 @@ impl Streams // Initialize a new stream. This fails if the connection is at capacity. let stream_id = me.actions.send.open()?; - let stream = Stream::new( + let mut stream = Stream::new( stream_id, me.actions.send.init_window_sz(), me.actions.recv.init_window_sz()); + if *request.method() == method::HEAD { + stream.content_length = ContentLength::Head; + } + // Convert the message let headers = client::Peer::convert_send_message( stream_id, request, end_of_stream);