Support receiving continuation frames
This commit is contained in:
		| @@ -123,40 +123,36 @@ impl Headers { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn load(head: Head, src: &mut Cursor<Bytes>, decoder: &mut hpack::Decoder) |     /// Loads the header frame but doesn't actually do HPACK decoding. | ||||||
|         -> Result<Self, Error> |     /// | ||||||
|  |     /// HPACK decoding is done in the `load_hpack` step. | ||||||
|  |     pub fn load(head: Head, mut src: BytesMut) | ||||||
|  |         -> Result<(Self, BytesMut), Error> | ||||||
|     { |     { | ||||||
|         let flags = HeadersFlag(head.flag()); |         let flags = HeadersFlag(head.flag()); | ||||||
|  |         let mut pad = 0; | ||||||
|  |  | ||||||
|         trace!("loading headers; flags={:?}", flags); |         trace!("loading headers; flags={:?}", flags); | ||||||
|  |  | ||||||
|         // Read the padding length |         // Read the padding length | ||||||
|         if flags.is_padded() { |         if flags.is_padded() { | ||||||
|             let pad = src.get_u8() as usize; |             // TODO: Ensure payload is sized correctly | ||||||
|             let len = src.get_ref().len(); |             pad = src[0] as usize; | ||||||
|  |  | ||||||
|             if pad >= len { |             // Drop the padding | ||||||
|                 trace!("too much padding"); |             let _ = src.split_to(1); | ||||||
|                 return Err(Error::TooMuchPadding); |  | ||||||
|             } |  | ||||||
|  |  | ||||||
|             // Truncate the last `pad` bytes. |  | ||||||
|             let len = src.get_ref().len() - pad; |  | ||||||
|             src.get_mut().truncate(len); |  | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Read the stream dependency |         // Read the stream dependency | ||||||
|         let stream_dep = if flags.is_priority() { |         let stream_dep = if flags.is_priority() { | ||||||
|             let mut buf = [0u8; 4]; |  | ||||||
|  |  | ||||||
|             // Read the next 4 bytes |  | ||||||
|             src.copy_to_slice(&mut buf); |  | ||||||
|  |  | ||||||
|             // Parse the stream ID and exclusive flag |             // Parse the stream ID and exclusive flag | ||||||
|             let (stream_id, is_exclusive) = StreamId::parse(&buf); |             let (stream_id, is_exclusive) = StreamId::parse(&src[..4]); | ||||||
|  |  | ||||||
|             // Read the weight |             // Read the weight | ||||||
|             let weight = src.get_u8(); |             let weight = src[4]; | ||||||
|  |  | ||||||
|  |             // Drop the next 5 bytes | ||||||
|  |             let _ = src.split_to(5); | ||||||
|  |  | ||||||
|             Some(StreamDependency { |             Some(StreamDependency { | ||||||
|                 stream_id, |                 stream_id, | ||||||
| @@ -167,31 +163,56 @@ impl Headers { | |||||||
|             None |             None | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         let mut pseudo = Pseudo::default(); |         if pad > 0 { | ||||||
|         let mut fields = HeaderMap::new(); |             if pad > src.len() { | ||||||
|  |                 return Err(Error::TooMuchPadding); | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             let len = src.len() - pad; | ||||||
|  |             src.truncate(len); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         let headers = Headers { | ||||||
|  |             stream_id: head.stream_id(), | ||||||
|  |             stream_dep: stream_dep, | ||||||
|  |             fields: HeaderMap::new(), | ||||||
|  |             pseudo: Pseudo::default(), | ||||||
|  |             flags: flags, | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         Ok((headers, src)) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn load_hpack(&mut self, | ||||||
|  |                       src: BytesMut, | ||||||
|  |                       decoder: &mut hpack::Decoder) | ||||||
|  |         -> Result<(), Error> | ||||||
|  |     { | ||||||
|         let mut err = false; |         let mut err = false; | ||||||
|  |  | ||||||
|         macro_rules! set_pseudo { |         macro_rules! set_pseudo { | ||||||
|             ($field:ident, $val:expr) => {{ |             ($field:ident, $val:expr) => {{ | ||||||
|                 if pseudo.$field.is_some() { |                 if self.pseudo.$field.is_some() { | ||||||
|                     err = true; |                     err = true; | ||||||
|                 } else { |                 } else { | ||||||
|                     pseudo.$field = Some($val); |                     self.pseudo.$field = Some($val); | ||||||
|                 } |                 } | ||||||
|             }} |             }} | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         let mut src = Cursor::new(src.freeze()); | ||||||
|  |  | ||||||
|         // At this point, we're going to assume that the hpack encoded headers |         // At this point, we're going to assume that the hpack encoded headers | ||||||
|         // contain the entire payload. Later, we need to check for stream |         // contain the entire payload. Later, we need to check for stream | ||||||
|         // priority. |         // priority. | ||||||
|         // |         // | ||||||
|         // TODO: Provide a way to abort decoding if an error is hit. |         // TODO: Provide a way to abort decoding if an error is hit. | ||||||
|         let res = decoder.decode(src, |header| { |         let res = decoder.decode(&mut src, |header| { | ||||||
|             use hpack::Header::*; |             use hpack::Header::*; | ||||||
|  |  | ||||||
|             match header { |             match header { | ||||||
|                 Field { name, value } => { |                 Field { name, value } => { | ||||||
|                     fields.append(name, value); |                     self.fields.append(name, value); | ||||||
|                 } |                 } | ||||||
|                 Authority(v) => set_pseudo!(authority, v), |                 Authority(v) => set_pseudo!(authority, v), | ||||||
|                 Method(v) => set_pseudo!(method, v), |                 Method(v) => set_pseudo!(method, v), | ||||||
| @@ -211,13 +232,7 @@ impl Headers { | |||||||
|             return Err(hpack::DecoderError::RepeatedPseudo.into()); |             return Err(hpack::DecoderError::RepeatedPseudo.into()); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         Ok(Headers { |         Ok(()) | ||||||
|             stream_id: head.stream_id(), |  | ||||||
|             stream_dep: stream_dep, |  | ||||||
|             fields: fields, |  | ||||||
|             pseudo: pseudo, |  | ||||||
|             flags: flags, |  | ||||||
|         }) |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns `true` if the frame represents trailers |     /// Returns `true` if the frame represents trailers | ||||||
|   | |||||||
| @@ -2,10 +2,11 @@ use {hpack, ConnectionError}; | |||||||
| use frame::{self, Frame, Kind}; | use frame::{self, Frame, Kind}; | ||||||
| use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; | use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; | ||||||
| use proto::*; | use proto::*; | ||||||
|  | use error::Reason::*; | ||||||
|  |  | ||||||
| use futures::*; | use futures::*; | ||||||
|  |  | ||||||
| use bytes::Bytes; | use bytes::{Bytes, BytesMut}; | ||||||
|  |  | ||||||
| use tokio_io::AsyncRead; | use tokio_io::AsyncRead; | ||||||
| use tokio_io::codec::length_delimited; | use tokio_io::codec::length_delimited; | ||||||
| @@ -24,7 +25,16 @@ pub struct FramedRead<T> { | |||||||
|  |  | ||||||
| /// Partially loaded headers frame | /// Partially loaded headers frame | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| enum Partial { | struct Partial { | ||||||
|  |     /// Empty frame | ||||||
|  |     frame: Continuable, | ||||||
|  |  | ||||||
|  |     /// Partial header payload | ||||||
|  |     buf: BytesMut, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[derive(Debug)] | ||||||
|  | enum Continuable { | ||||||
|     Headers(frame::Headers), |     Headers(frame::Headers), | ||||||
|     // PushPromise(frame::PushPromise), |     // PushPromise(frame::PushPromise), | ||||||
| } | } | ||||||
| @@ -38,14 +48,14 @@ impl<T> FramedRead<T> { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn decode_frame(&mut self, mut bytes: Bytes) -> Result<Option<Frame>, ConnectionError> { |     fn decode_frame(&mut self, mut bytes: BytesMut) -> Result<Option<Frame>, ConnectionError> { | ||||||
|         trace!("decoding frame from {}B", bytes.len()); |         trace!("decoding frame from {}B", bytes.len()); | ||||||
|  |  | ||||||
|         // Parse the head |         // Parse the head | ||||||
|         let head = frame::Head::parse(&bytes); |         let head = frame::Head::parse(&bytes); | ||||||
|  |  | ||||||
|         if self.partial.is_some() && head.kind() != Kind::Continuation { |         if self.partial.is_some() && head.kind() != Kind::Continuation { | ||||||
|             unimplemented!(); |             return Err(ProtocolError.into()); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         let kind = head.kind(); |         let kind = head.kind(); | ||||||
| @@ -64,22 +74,29 @@ impl<T> FramedRead<T> { | |||||||
|             } |             } | ||||||
|             Kind::Data => { |             Kind::Data => { | ||||||
|                 let _ = bytes.split_to(frame::HEADER_LEN); |                 let _ = bytes.split_to(frame::HEADER_LEN); | ||||||
|                 frame::Data::load(head, bytes)?.into() |                 frame::Data::load(head, bytes.freeze())?.into() | ||||||
|             } |             } | ||||||
|             Kind::Headers => { |             Kind::Headers => { | ||||||
|                 let mut buf = Cursor::new(bytes); |                 // Drop the frame header | ||||||
|                 buf.set_position(frame::HEADER_LEN as u64); |  | ||||||
|  |  | ||||||
|                 // TODO: Change to drain: carllerche/bytes#130 |                 // TODO: Change to drain: carllerche/bytes#130 | ||||||
|                 let frame = try!(frame::Headers::load(head, &mut buf, &mut self.hpack)); |                 let _ = bytes.split_to(frame::HEADER_LEN); | ||||||
|  |  | ||||||
|  |                 // Parse the header frame w/o parsing the payload | ||||||
|  |                 let (mut headers, payload) = frame::Headers::load(head, bytes)?; | ||||||
|  |  | ||||||
|  |                 if headers.is_end_headers() { | ||||||
|  |                     // Load the HPACK encoded headers & return the frame | ||||||
|  |                     headers.load_hpack(payload, &mut self.hpack)?; | ||||||
|  |                     headers.into() | ||||||
|  |                 } else { | ||||||
|  |                     // Defer loading the frame | ||||||
|  |                     self.partial = Some(Partial { | ||||||
|  |                         frame: Continuable::Headers(headers), | ||||||
|  |                         buf: payload, | ||||||
|  |                     }); | ||||||
|  |  | ||||||
|                 if !frame.is_end_headers() { |  | ||||||
|                     // Wait for continuation frames |  | ||||||
|                     self.partial = Some(Partial::Headers(frame)); |  | ||||||
|                     return Ok(None); |                     return Ok(None); | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
|                 frame.into() |  | ||||||
|             } |             } | ||||||
|             Kind::Reset => { |             Kind::Reset => { | ||||||
|                 frame::Reset::load(head, &bytes[frame::HEADER_LEN..])?.into() |                 frame::Reset::load(head, &bytes[frame::HEADER_LEN..])?.into() | ||||||
| @@ -95,7 +112,28 @@ impl<T> FramedRead<T> { | |||||||
|                 return Ok(None); |                 return Ok(None); | ||||||
|             } |             } | ||||||
|             Kind::Continuation => { |             Kind::Continuation => { | ||||||
|                 unimplemented!(); |                 // TODO: Un-hack this | ||||||
|  |                 let end_of_headers = (head.flag() & 0x4) == 0x4; | ||||||
|  |  | ||||||
|  |                 let mut partial = match self.partial.take() { | ||||||
|  |                     Some(partial) => partial, | ||||||
|  |                     None => return Err(ProtocolError.into()), | ||||||
|  |                 }; | ||||||
|  |  | ||||||
|  |                 // Extend the buf | ||||||
|  |                 partial.buf.extend_from_slice(&bytes[frame::HEADER_LEN..]); | ||||||
|  |  | ||||||
|  |                 if !end_of_headers { | ||||||
|  |                     self.partial = Some(partial); | ||||||
|  |                     return Ok(None); | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 match partial.frame { | ||||||
|  |                     Continuable::Headers(mut frame) => { | ||||||
|  |                         frame.load_hpack(partial.buf, &mut self.hpack)?; | ||||||
|  |                         frame.into() | ||||||
|  |                     } | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|             Kind::Unknown => { |             Kind::Unknown => { | ||||||
|                 unimplemented!() |                 unimplemented!() | ||||||
| @@ -116,7 +154,7 @@ impl<T> futures::Stream for FramedRead<T> | |||||||
|         loop { |         loop { | ||||||
|             trace!("poll"); |             trace!("poll"); | ||||||
|             let bytes = match try_ready!(self.inner.poll()) { |             let bytes = match try_ready!(self.inner.poll()) { | ||||||
|                 Some(bytes) => bytes.freeze(), |                 Some(bytes) => bytes, | ||||||
|                 None => return Ok(Async::Ready(None)), |                 None => return Ok(Async::Ready(None)), | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user