Get large body writes working
This commit is contained in:
		| @@ -196,7 +196,7 @@ impl<T, B> Sink for FramedWrite<T, B> | |||||||
|         while !self.is_empty() { |         while !self.is_empty() { | ||||||
|             match self.next { |             match self.next { | ||||||
|                 Some(Next::Data(ref mut frame)) => { |                 Some(Next::Data(ref mut frame)) => { | ||||||
|                     let mut buf = self.buf.by_ref().chain(frame.payload_mut()); |                     let mut buf = Buf::by_ref(&mut self.buf).chain(frame.payload_mut()); | ||||||
|                     try_ready!(self.inner.write_buf(&mut buf)); |                     try_ready!(self.inner.write_buf(&mut buf)); | ||||||
|                 } |                 } | ||||||
|                 _ => { |                 _ => { | ||||||
|   | |||||||
| @@ -2,7 +2,7 @@ use super::*; | |||||||
|  |  | ||||||
| use bytes::buf::Take; | use bytes::buf::Take; | ||||||
|  |  | ||||||
| use std::cmp; | use std::{fmt, cmp}; | ||||||
|  |  | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub(super) struct Prioritize<B> { | pub(super) struct Prioritize<B> { | ||||||
| @@ -26,7 +26,6 @@ pub(super) struct Prioritize<B> { | |||||||
|     conn_task: Option<task::Task>, |     conn_task: Option<task::Task>, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug)] |  | ||||||
| pub(crate) struct Prioritized<B> { | pub(crate) struct Prioritized<B> { | ||||||
|     // The buffer |     // The buffer | ||||||
|     inner: Take<B>, |     inner: Take<B>, | ||||||
| @@ -253,18 +252,23 @@ impl<B> Prioritize<B> | |||||||
|                         store: &mut Store<B>, |                         store: &mut Store<B>, | ||||||
|                         dst: &mut Codec<T, Prioritized<B>>) -> bool |                         dst: &mut Codec<T, Prioritized<B>>) -> bool | ||||||
|     { |     { | ||||||
|  |         trace!("try reclaim frame"); | ||||||
|  |  | ||||||
|         // First check if there are any data chunks to take back |         // First check if there are any data chunks to take back | ||||||
|         if let Some(frame) = dst.take_last_data_frame() { |         if let Some(frame) = dst.take_last_data_frame() { | ||||||
|  |             trace!("  -> reclaimed; frame={:?}", frame); | ||||||
|  |  | ||||||
|  |             let mut eos = false; | ||||||
|  |             let key = frame.payload().stream; | ||||||
|  |  | ||||||
|  |             let mut frame = frame.map(|prioritized| { | ||||||
|  |                 // TODO: Ensure fully written | ||||||
|  |                 eos = prioritized.end_of_stream; | ||||||
|  |                 prioritized.inner.into_inner() | ||||||
|  |             }); | ||||||
|  |  | ||||||
|             if frame.payload().has_remaining() { |             if frame.payload().has_remaining() { | ||||||
|                 let mut stream = store.resolve(frame.payload().stream); |                 let mut stream = store.resolve(key); | ||||||
|  |  | ||||||
|                 let mut eos = false; |  | ||||||
|  |  | ||||||
|                 let mut frame = frame.map(|prioritized| { |  | ||||||
|                     // TODO: Ensure fully written |  | ||||||
|                     eos = prioritized.end_of_stream; |  | ||||||
|                     prioritized.inner.into_inner() |  | ||||||
|                 }); |  | ||||||
|  |  | ||||||
|                 if eos { |                 if eos { | ||||||
|                     frame.set_end_stream(); |                     frame.set_end_stream(); | ||||||
| @@ -312,3 +316,13 @@ impl<B> Buf for Prioritized<B> | |||||||
|         self.inner.advance(cnt) |         self.inner.advance(cnt) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | impl<B: Buf> fmt::Debug for Prioritized<B> { | ||||||
|  |     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         fmt.debug_struct("Prioritized") | ||||||
|  |             .field("remaining", &self.inner.get_ref().remaining()) | ||||||
|  |             .field("end_of_stream", &self.end_of_stream) | ||||||
|  |             .field("stream", &self.stream) | ||||||
|  |             .finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|   | |||||||
| @@ -48,7 +48,7 @@ fn recv_invalid_server_stream_id() { | |||||||
|         .body(()).unwrap(); |         .body(()).unwrap(); | ||||||
|  |  | ||||||
|     info!("sending request"); |     info!("sending request"); | ||||||
|     let mut stream = h2.request(request, true).unwrap(); |     let stream = h2.request(request, true).unwrap(); | ||||||
|  |  | ||||||
|     // The connection errors |     // The connection errors | ||||||
|     assert_proto_err!(h2.wait().unwrap_err(), ProtocolError); |     assert_proto_err!(h2.wait().unwrap_err(), ProtocolError); | ||||||
|   | |||||||
| @@ -1,16 +1,14 @@ | |||||||
| pub mod support; | pub mod support; | ||||||
| use support::*; | use support::*; | ||||||
|  |  | ||||||
| use h2::Frame; |  | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| #[ignore] | #[ignore] | ||||||
| fn recv_single_ping() { | fn recv_single_ping() { | ||||||
|  |     /* | ||||||
|     let _ = ::env_logger::init(); |     let _ = ::env_logger::init(); | ||||||
|  |  | ||||||
|     let mock = mock_io::Builder::new() |     let mock = mock_io::Builder::new() | ||||||
|         .handshake() |         .handshake() | ||||||
|         /* |  | ||||||
|         .write(&[ |         .write(&[ | ||||||
|             // POST / |             // POST / | ||||||
|             0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, |             0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, | ||||||
| @@ -28,9 +26,9 @@ fn recv_single_ping() { | |||||||
|             // DATA |             // DATA | ||||||
|             0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100 |             0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100 | ||||||
|         ]) |         ]) | ||||||
|         */ |  | ||||||
|         .build(); |         .build(); | ||||||
|  |  | ||||||
|  |         */ | ||||||
|     /* |     /* | ||||||
|     let h2 = client::handshake(mock) |     let h2 = client::handshake(mock) | ||||||
|         .wait().unwrap(); |         .wait().unwrap(); | ||||||
|   | |||||||
| @@ -62,12 +62,12 @@ fn single_stream_send_extra_large_body_multi_frames() { | |||||||
|             0, 64, 0, 0, 0, 0, 0, 0, 1, |             0, 64, 0, 0, 0, 0, 0, 0, 1, | ||||||
|         ]) |         ]) | ||||||
|         .write(&payload[0..16_384]) |         .write(&payload[0..16_384]) | ||||||
|         .write(frames::SETTINGS_ACK) |  | ||||||
|         .write(&[ |         .write(&[ | ||||||
|             // DATA |             // DATA | ||||||
|             0, 64, 0, 0, 1, 0, 0, 0, 1, |             0, 64, 0, 0, 1, 0, 0, 0, 1, | ||||||
|         ]) |         ]) | ||||||
|         .write(&payload[16_384..]) |         .write(&payload[16_384..]) | ||||||
|  |         .write(frames::SETTINGS_ACK) | ||||||
|         // Read response |         // Read response | ||||||
|         .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) |         .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) | ||||||
|         .build(); |         .build(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user