Validate received content-length header (#43)
If a content-length header is provided, the value should match the sum of all data frame lengths. If there is a mismatch, then the stream is reset.
This commit is contained in:
		| @@ -266,6 +266,10 @@ impl Headers { | |||||||
|         (self.pseudo, self.fields) |         (self.pseudo, self.fields) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn fields(&self) -> &HeaderMap { | ||||||
|  |         &self.fields | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn into_fields(self) -> HeaderMap { |     pub fn into_fields(self) -> HeaderMap { | ||||||
|         self.fields |         self.fields | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -149,6 +149,22 @@ impl<B, P> Recv<B, P> | |||||||
|             self.inc_num_streams(); |             self.inc_num_streams(); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         if !stream.content_length.is_head() { | ||||||
|  |             use http::header; | ||||||
|  |             use super::stream::ContentLength; | ||||||
|  |  | ||||||
|  |             if let Some(content_length) = frame.fields().get(header::CONTENT_LENGTH) { | ||||||
|  |                 let content_length = match parse_u64(content_length.as_bytes()) { | ||||||
|  |                     Ok(v) => v, | ||||||
|  |                     Err(_) => { | ||||||
|  |                         unimplemented!(); | ||||||
|  |                     } | ||||||
|  |                 }; | ||||||
|  |  | ||||||
|  |                 stream.content_length = ContentLength::Remaining(content_length); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|         let message = P::convert_poll_message(frame)?; |         let message = P::convert_poll_message(frame)?; | ||||||
|  |  | ||||||
|         // Push the frame onto the stream's recv buffer |         // Push the frame onto the stream's recv buffer | ||||||
| @@ -173,6 +189,13 @@ impl<B, P> Recv<B, P> | |||||||
|         // Transition the state |         // Transition the state | ||||||
|         stream.state.recv_close()?; |         stream.state.recv_close()?; | ||||||
|  |  | ||||||
|  |         if stream.ensure_content_length_zero().is_err() { | ||||||
|  |             return Err(ProtoError::Stream { | ||||||
|  |                 id: stream.id, | ||||||
|  |                 reason: ProtocolError, | ||||||
|  |             }); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         let trailers = frame.into_fields(); |         let trailers = frame.into_fields(); | ||||||
|  |  | ||||||
|         // Push the frame onto the stream's recv buffer |         // Push the frame onto the stream's recv buffer | ||||||
| @@ -223,7 +246,7 @@ impl<B, P> Recv<B, P> | |||||||
|     pub fn recv_data(&mut self, |     pub fn recv_data(&mut self, | ||||||
|                      frame: frame::Data, |                      frame: frame::Data, | ||||||
|                      stream: &mut store::Ptr<B, P>) |                      stream: &mut store::Ptr<B, P>) | ||||||
|         -> Result<(), ConnectionError> |         -> Result<(), ProtoError> | ||||||
|     { |     { | ||||||
|         let sz = frame.payload().len(); |         let sz = frame.payload().len(); | ||||||
|  |  | ||||||
| @@ -236,7 +259,7 @@ impl<B, P> Recv<B, P> | |||||||
|         if !stream.state.is_recv_streaming() { |         if !stream.state.is_recv_streaming() { | ||||||
|             // Receiving a DATA frame when not expecting one is a protocol |             // Receiving a DATA frame when not expecting one is a protocol | ||||||
|             // error. |             // error. | ||||||
|             return Err(ProtocolError.into()); |             return Err(ProtoError::Connection(ProtocolError)); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         trace!("recv_data; size={}; connection={}; stream={}", |         trace!("recv_data; size={}; connection={}; stream={}", | ||||||
| @@ -245,7 +268,7 @@ impl<B, P> Recv<B, P> | |||||||
|         // Ensure that there is enough capacity on the connection before acting |         // Ensure that there is enough capacity on the connection before acting | ||||||
|         // on the stream. |         // on the stream. | ||||||
|         if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz { |         if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz { | ||||||
|             return Err(FlowControlError.into()); |             return Err(ProtoError::Connection(FlowControlError)); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Update connection level flow control |         // Update connection level flow control | ||||||
| @@ -257,9 +280,23 @@ impl<B, P> Recv<B, P> | |||||||
|         // Track the data as in-flight |         // Track the data as in-flight | ||||||
|         stream.in_flight_recv_data += sz; |         stream.in_flight_recv_data += sz; | ||||||
|  |  | ||||||
|  |         if stream.dec_content_length(frame.payload().len()).is_err() { | ||||||
|  |             return Err(ProtoError::Stream { | ||||||
|  |                 id: stream.id, | ||||||
|  |                 reason: ProtocolError, | ||||||
|  |             }); | ||||||
|  |         } | ||||||
|  |  | ||||||
|         if frame.is_end_stream() { |         if frame.is_end_stream() { | ||||||
|  |             if stream.ensure_content_length_zero().is_err() { | ||||||
|  |                 return Err(ProtoError::Stream { | ||||||
|  |                     id: stream.id, | ||||||
|  |                     reason: ProtocolError, | ||||||
|  |                 }); | ||||||
|  |             } | ||||||
|  |  | ||||||
|             if stream.state.recv_close().is_err() { |             if stream.state.recv_close().is_err() { | ||||||
|                 return Err(ProtocolError.into()); |                 return Err(ProtoError::Connection(ProtocolError)); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
| @@ -619,3 +656,25 @@ impl<T> Event<T> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ===== util ===== | ||||||
|  |  | ||||||
|  | fn parse_u64(src: &[u8]) -> Result<u64, ()> { | ||||||
|  |     if src.len() > 19 { | ||||||
|  |         // At danger for overflow... | ||||||
|  |         return Err(()); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     let mut ret = 0; | ||||||
|  |  | ||||||
|  |     for &d in src { | ||||||
|  |         if d < b'0' || d > b'9' { | ||||||
|  |             return Err(()); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         ret *= 10; | ||||||
|  |         ret += (d - b'0') as u64; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     Ok(ret) | ||||||
|  | } | ||||||
|   | |||||||
| @@ -71,6 +71,18 @@ pub(super) struct Stream<B, P> | |||||||
|  |  | ||||||
|     /// The stream's pending push promises |     /// The stream's pending push promises | ||||||
|     pub pending_push_promises: store::Queue<B, NextAccept, P>, |     pub pending_push_promises: store::Queue<B, NextAccept, P>, | ||||||
|  |  | ||||||
|  |     /// Validate content-length headers | ||||||
|  |     pub content_length: ContentLength, | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
|  | /// State related to validating a stream's content-length | ||||||
|  | #[derive(Debug)] | ||||||
|  | pub enum ContentLength { | ||||||
|  |     Omitted, | ||||||
|  |     Head, | ||||||
|  |     Remaining(u64), | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| @@ -130,6 +142,7 @@ impl<B, P> Stream<B, P> | |||||||
|             pending_recv: buffer::Deque::new(), |             pending_recv: buffer::Deque::new(), | ||||||
|             recv_task: None, |             recv_task: None, | ||||||
|             pending_push_promises: store::Queue::new(), |             pending_push_promises: store::Queue::new(), | ||||||
|  |             content_length: ContentLength::Omitted, | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -144,6 +157,30 @@ impl<B, P> Stream<B, P> | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     /// Returns `Err` when the decrement cannot be completed due to overflow. | ||||||
|  |     pub fn dec_content_length(&mut self, len: usize) -> Result<(), ()> { | ||||||
|  |         match self.content_length { | ||||||
|  |             ContentLength::Remaining(ref mut rem) => { | ||||||
|  |                 match rem.checked_sub(len as u64) { | ||||||
|  |                     Some(val) => *rem = val, | ||||||
|  |                     None => return Err(()), | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             ContentLength::Head => return Err(()), | ||||||
|  |             _ => {} | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn ensure_content_length_zero(&self) -> Result<(), ()> { | ||||||
|  |         match self.content_length { | ||||||
|  |             ContentLength::Remaining(0) => Ok(()), | ||||||
|  |             ContentLength::Remaining(_) => Err(()), | ||||||
|  |             _ => Ok(()), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn notify_send(&mut self) { |     pub fn notify_send(&mut self) { | ||||||
|         if let Some(task) = self.send_task.take() { |         if let Some(task) = self.send_task.take() { | ||||||
|             task.notify(); |             task.notify(); | ||||||
| @@ -244,3 +281,14 @@ impl store::Next for NextWindowUpdate { | |||||||
|         stream.is_pending_window_update = val; |         stream.is_pending_window_update = val; | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // ===== impl ContentLength ===== | ||||||
|  |  | ||||||
|  | impl ContentLength { | ||||||
|  |     pub fn is_head(&self) -> bool { | ||||||
|  |         match *self { | ||||||
|  |             ContentLength::Head => true, | ||||||
|  |             _ => false, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|   | |||||||
| @@ -103,6 +103,7 @@ impl<B, P> Streams<B, P> | |||||||
|                 actions.recv.recv_trailers(frame, stream) |                 actions.recv.recv_trailers(frame, stream) | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|  |             // TODO: extract this | ||||||
|             match res { |             match res { | ||||||
|                 Ok(()) => Ok(()), |                 Ok(()) => Ok(()), | ||||||
|                 Err(ProtoError::Connection(reason)) => Err(reason.into()), |                 Err(ProtoError::Connection(reason)) => Err(reason.into()), | ||||||
| @@ -130,7 +131,16 @@ impl<B, P> Streams<B, P> | |||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         me.actions.transition(stream, |actions, stream| { |         me.actions.transition(stream, |actions, stream| { | ||||||
|             actions.recv.recv_data(frame, stream) |             match actions.recv.recv_data(frame, stream) { | ||||||
|  |                 Ok(()) => Ok(()), | ||||||
|  |                 Err(ProtoError::Connection(reason)) => Err(reason.into()), | ||||||
|  |                 Err(ProtoError::Stream { reason, .. }) => { | ||||||
|  |                     // Reset the stream. | ||||||
|  |                     actions.send.send_reset(reason, stream, &mut actions.task); | ||||||
|  |                     Ok(()) | ||||||
|  |                 } | ||||||
|  |                 Err(ProtoError::Io(_)) => unreachable!(), | ||||||
|  |             } | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -286,6 +296,9 @@ impl<B, P> Streams<B, P> | |||||||
|     pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) |     pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) | ||||||
|         -> Result<StreamRef<B, P>, ConnectionError> |         -> Result<StreamRef<B, P>, ConnectionError> | ||||||
|     { |     { | ||||||
|  |         use http::method; | ||||||
|  |         use super::stream::ContentLength; | ||||||
|  |  | ||||||
|         // TODO: There is a hazard with assigning a stream ID before the |         // TODO: There is a hazard with assigning a stream ID before the | ||||||
|         // prioritize layer. If prioritization reorders new streams, this |         // prioritize layer. If prioritization reorders new streams, this | ||||||
|         // implicitly closes the earlier stream IDs. |         // implicitly closes the earlier stream IDs. | ||||||
| @@ -298,11 +311,15 @@ impl<B, P> Streams<B, P> | |||||||
|             // Initialize a new stream. This fails if the connection is at capacity. |             // Initialize a new stream. This fails if the connection is at capacity. | ||||||
|             let stream_id = me.actions.send.open()?; |             let stream_id = me.actions.send.open()?; | ||||||
|  |  | ||||||
|             let stream = Stream::new( |             let mut stream = Stream::new( | ||||||
|                 stream_id, |                 stream_id, | ||||||
|                 me.actions.send.init_window_sz(), |                 me.actions.send.init_window_sz(), | ||||||
|                 me.actions.recv.init_window_sz()); |                 me.actions.recv.init_window_sz()); | ||||||
|  |  | ||||||
|  |             if *request.method() == method::HEAD { | ||||||
|  |                 stream.content_length = ContentLength::Head; | ||||||
|  |             } | ||||||
|  |  | ||||||
|             // Convert the message |             // Convert the message | ||||||
|             let headers = client::Peer::convert_send_message( |             let headers = client::Peer::convert_send_message( | ||||||
|                 stream_id, request, end_of_stream); |                 stream_id, request, end_of_stream); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user