Work on prioritization
This commit is contained in:
		| @@ -44,6 +44,14 @@ impl<T> Data<T> { | |||||||
|         self.flags.is_end_stream() |         self.flags.is_end_stream() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn set_end_stream(&mut self) { | ||||||
|  |         self.flags.set_end_stream(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn unset_end_stream(&mut self) { | ||||||
|  |         self.flags.unset_end_stream(); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn head(&self) -> Head { |     pub fn head(&self) -> Head { | ||||||
|         Head::new(Kind::Data, self.flags.into(), self.stream_id) |         Head::new(Kind::Data, self.flags.into(), self.stream_id) | ||||||
|     } |     } | ||||||
| @@ -127,6 +135,10 @@ impl DataFlag { | |||||||
|         self.0 |= END_STREAM |         self.0 |= END_STREAM | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn unset_end_stream(&mut self) { | ||||||
|  |         self.0 &= !END_STREAM | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn is_padded(&self) -> bool { |     pub fn is_padded(&self) -> bool { | ||||||
|         self.0 & PADDED == PADDED |         self.0 & PADDED == PADDED | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -1,5 +1,9 @@ | |||||||
| use super::*; | use super::*; | ||||||
|  |  | ||||||
|  | use bytes::buf::Take; | ||||||
|  |  | ||||||
|  | use std::cmp; | ||||||
|  |  | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub(super) struct Prioritize<B> { | pub(super) struct Prioritize<B> { | ||||||
|     /// Streams that have pending frames |     /// Streams that have pending frames | ||||||
| @@ -25,7 +29,9 @@ pub(super) struct Prioritize<B> { | |||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub(crate) struct Prioritized<B> { | pub(crate) struct Prioritized<B> { | ||||||
|     // The buffer |     // The buffer | ||||||
|     inner: B, |     inner: Take<B>, | ||||||
|  |  | ||||||
|  |     end_of_stream: bool, | ||||||
|  |  | ||||||
|     // The stream that this is associated with |     // The stream that this is associated with | ||||||
|     stream: store::Key, |     stream: store::Key, | ||||||
| @@ -120,13 +126,21 @@ impl<B> Prioritize<B> | |||||||
|         // Reclaim any frame that has previously been written |         // Reclaim any frame that has previously been written | ||||||
|         self.reclaim_frame(store, dst); |         self.reclaim_frame(store, dst); | ||||||
|  |  | ||||||
|  |         // The max frame length | ||||||
|  |         let max_frame_len = dst.max_send_frame_size(); | ||||||
|  |  | ||||||
|         trace!("poll_complete"); |         trace!("poll_complete"); | ||||||
|         loop { |         loop { | ||||||
|             match self.pop_frame(store) { |             match self.pop_frame(store, max_frame_len) { | ||||||
|                 Some(frame) => { |                 Some(frame) => { | ||||||
|                     trace!("writing frame={:?}", frame); |                     // Figure out the byte size this frame applies to flow | ||||||
|  |                     // control | ||||||
|  |                     let len = cmp::min(frame.flow_len(), max_frame_len); | ||||||
|  |  | ||||||
|                     // Subtract the data size |                     // Subtract the data size | ||||||
|                     self.buffered_data -= frame.flow_len(); |                     self.buffered_data -= len; | ||||||
|  |  | ||||||
|  |                     trace!("writing frame={:?}", frame); | ||||||
|  |  | ||||||
|                     let res = dst.start_send(frame)?; |                     let res = dst.start_send(frame)?; | ||||||
|  |  | ||||||
| @@ -157,7 +171,9 @@ impl<B> Prioritize<B> | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn pop_frame(&mut self, store: &mut Store<B>) -> Option<Frame<Prioritized<B>>> { |     fn pop_frame(&mut self, store: &mut Store<B>, max_len: usize) | ||||||
|  |         -> Option<Frame<Prioritized<B>>> | ||||||
|  |     { | ||||||
|         loop { |         loop { | ||||||
|             match self.pop_sender(store) { |             match self.pop_sender(store) { | ||||||
|                 Some(mut stream) => { |                 Some(mut stream) => { | ||||||
| @@ -182,13 +198,24 @@ impl<B> Prioritize<B> | |||||||
|                         push_sender(&mut self.pending_send, &mut stream); |                         push_sender(&mut self.pending_send, &mut stream); | ||||||
|                     } |                     } | ||||||
|  |  | ||||||
|                     // Add prioritization logic |                     let frame = match frame { | ||||||
|                     let frame = frame.map(|buf| { |                         Frame::Data(mut frame) => { | ||||||
|                         Prioritized { |                             let eos = frame.is_end_stream(); | ||||||
|                             inner: buf, |  | ||||||
|                             stream: stream.key(), |                             if frame.payload().remaining() > max_len { | ||||||
|  |                                 frame.unset_end_stream(); | ||||||
|  |                             } | ||||||
|  |  | ||||||
|  |                             Frame::Data(frame.map(|buf| { | ||||||
|  |                                 Prioritized { | ||||||
|  |                                     inner: buf.take(max_len), | ||||||
|  |                                     end_of_stream: eos, | ||||||
|  |                                     stream: stream.key(), | ||||||
|  |                                 } | ||||||
|  |                             })) | ||||||
|                         } |                         } | ||||||
|                     }); |                         frame => frame.map(|_| unreachable!()), | ||||||
|  |                     }; | ||||||
|  |  | ||||||
|                     return Some(frame); |                     return Some(frame); | ||||||
|                 } |                 } | ||||||
| @@ -231,11 +258,18 @@ impl<B> Prioritize<B> | |||||||
|             if frame.payload().has_remaining() { |             if frame.payload().has_remaining() { | ||||||
|                 let mut stream = store.resolve(frame.payload().stream); |                 let mut stream = store.resolve(frame.payload().stream); | ||||||
|  |  | ||||||
|                 let frame = frame.map(|prioritized| { |                 let mut eos = false; | ||||||
|  |  | ||||||
|  |                 let mut frame = frame.map(|prioritized| { | ||||||
|                     // TODO: Ensure fully written |                     // TODO: Ensure fully written | ||||||
|                     prioritized.inner |                     eos = prioritized.end_of_stream; | ||||||
|  |                     prioritized.inner.into_inner() | ||||||
|                 }); |                 }); | ||||||
|  |  | ||||||
|  |                 if eos { | ||||||
|  |                     frame.set_end_stream(); | ||||||
|  |                 } | ||||||
|  |  | ||||||
|                 self.push_back_frame(frame.into(), &mut stream); |                 self.push_back_frame(frame.into(), &mut stream); | ||||||
|  |  | ||||||
|                 return true; |                 return true; | ||||||
|   | |||||||
| @@ -219,7 +219,7 @@ impl State { | |||||||
|                 self.inner = Closed(match *err { |                 self.inner = Closed(match *err { | ||||||
|                     ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), |                     ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), | ||||||
|                     ConnectionError::Io(..) => Some(Cause::Io), |                     ConnectionError::Io(..) => Some(Cause::Io), | ||||||
|                     _ => panic!("cannot terminate stream with user error"), |                     ref e => panic!("cannot terminate stream with user error; err={:?}", e), | ||||||
|                 }); |                 }); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|   | |||||||
| @@ -1,13 +1,12 @@ | |||||||
| pub mod support; | pub mod support; | ||||||
| use support::*; | use support::*; | ||||||
|  |  | ||||||
| use h2::Frame; |  | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| #[ignore] |  | ||||||
| fn single_stream_send_large_body() { | fn single_stream_send_large_body() { | ||||||
|     let _ = ::env_logger::init(); |     let _ = ::env_logger::init(); | ||||||
|  |  | ||||||
|  |     let payload = [0; 1024]; | ||||||
|  |  | ||||||
|     let mock = mock_io::Builder::new() |     let mock = mock_io::Builder::new() | ||||||
|         .handshake() |         .handshake() | ||||||
|         .write(&[ |         .write(&[ | ||||||
| @@ -17,54 +16,78 @@ fn single_stream_send_large_body() { | |||||||
|         ]) |         ]) | ||||||
|         .write(&[ |         .write(&[ | ||||||
|             // DATA |             // DATA | ||||||
|             0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111, |             0, 4, 0, 0, 1, 0, 0, 0, 1, | ||||||
|         ]) |         ]) | ||||||
|  |         .write(&payload[..]) | ||||||
|         .write(frames::SETTINGS_ACK) |         .write(frames::SETTINGS_ACK) | ||||||
|         // Read response |         // Read response | ||||||
|         .read(&[ |         .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) | ||||||
|             // HEADERS |  | ||||||
|             0, 0, 1, 1, 4, 0, 0, 0, 1, 136, |  | ||||||
|             // DATA |  | ||||||
|             0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100 |  | ||||||
|         ]) |  | ||||||
|         .build(); |         .build(); | ||||||
|  |  | ||||||
|     /* |     let mut h2 = Client::handshake(mock) | ||||||
|     let h2 = client::handshake(mock) |  | ||||||
|         .wait().unwrap(); |         .wait().unwrap(); | ||||||
|  |  | ||||||
|     // Send the request |     let request = Request::builder() | ||||||
|     let mut request = request::Head::default(); |         .method(method::POST) | ||||||
|     request.method = method::POST; |         .uri("https://http2.akamai.com/") | ||||||
|     request.uri = "https://http2.akamai.com/".parse().unwrap(); |         .body(()).unwrap(); | ||||||
|     let h2 = h2.send_request(1.into(), request, false).wait().unwrap(); |  | ||||||
|  |     let mut stream = h2.request(request, false).unwrap(); | ||||||
|  |  | ||||||
|     // Send the data |     // Send the data | ||||||
|     let b = [0; 300]; |     stream.send_data(payload[..].into(), true).unwrap(); | ||||||
|     let h2 = h2.send_data(1.into(), (&b[..]).into(), true).wait().unwrap(); |  | ||||||
|  |  | ||||||
|     // Get the response headers |     // Get the response | ||||||
|     let (resp, h2) = h2.into_future().wait().unwrap(); |     let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); | ||||||
|  |     assert_eq!(resp.status(), status::NO_CONTENT); | ||||||
|  |  | ||||||
|     match resp.unwrap() { |     h2.wait().unwrap(); | ||||||
|         Frame::Headers { headers, .. } => { | } | ||||||
|             assert_eq!(headers.status, status::OK); |  | ||||||
|         } | #[test] | ||||||
|         _ => panic!("unexpected frame"), | fn single_stream_send_extra_large_body_multi_frames() { | ||||||
|     } |     let _ = ::env_logger::init(); | ||||||
|  |  | ||||||
|     // Get the response body |     let payload = vec![0; 32_768]; | ||||||
|     let (data, h2) = h2.into_future().wait().unwrap(); |  | ||||||
|  |     let mock = mock_io::Builder::new() | ||||||
|     match data.unwrap() { |         .handshake() | ||||||
|         Frame::Data { id, data, end_of_stream, .. } => { |         .write(&[ | ||||||
|             assert_eq!(id, 1.into()); |             // POST / | ||||||
|             assert_eq!(data, &b"world"[..]); |             0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, | ||||||
|             assert!(end_of_stream); |             172, 75, 143, 168, 233, 25, 151, 33, 233, 132, | ||||||
|         } |         ]) | ||||||
|         _ => panic!("unexpected frame"), |         .write(&[ | ||||||
|     } |             // DATA | ||||||
|  |             0, 64, 0, 0, 0, 0, 0, 0, 1, | ||||||
|     assert!(Stream::wait(h2).next().is_none());; |         ]) | ||||||
|     */ |         .write(&payload[0..16_384]) | ||||||
|  |         .write(frames::SETTINGS_ACK) | ||||||
|  |         .write(&[ | ||||||
|  |             // DATA | ||||||
|  |             0, 64, 0, 0, 1, 0, 0, 0, 1, | ||||||
|  |         ]) | ||||||
|  |         .write(&payload[16_384..]) | ||||||
|  |         // Read response | ||||||
|  |         .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) | ||||||
|  |         .build(); | ||||||
|  |  | ||||||
|  |     let mut h2 = Client::handshake(mock) | ||||||
|  |         .wait().unwrap(); | ||||||
|  |  | ||||||
|  |     let request = Request::builder() | ||||||
|  |         .method(method::POST) | ||||||
|  |         .uri("https://http2.akamai.com/") | ||||||
|  |         .body(()).unwrap(); | ||||||
|  |  | ||||||
|  |     let mut stream = h2.request(request, false).unwrap(); | ||||||
|  |  | ||||||
|  |     // Send the data | ||||||
|  |     stream.send_data(payload.into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |     // Get the response | ||||||
|  |     let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); | ||||||
|  |     assert_eq!(resp.status(), status::NO_CONTENT); | ||||||
|  |  | ||||||
|  |     h2.wait().unwrap(); | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user