diff --git a/src/frame/data.rs b/src/frame/data.rs index 7c06e41..3dc18e3 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -44,6 +44,14 @@ impl Data { self.flags.is_end_stream() } + pub fn set_end_stream(&mut self) { + self.flags.set_end_stream(); + } + + pub fn unset_end_stream(&mut self) { + self.flags.unset_end_stream(); + } + pub fn head(&self) -> Head { Head::new(Kind::Data, self.flags.into(), self.stream_id) } @@ -127,6 +135,10 @@ impl DataFlag { self.0 |= END_STREAM } + pub fn unset_end_stream(&mut self) { + self.0 &= !END_STREAM + } + pub fn is_padded(&self) -> bool { self.0 & PADDED == PADDED } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index dd4dafb..cac05dc 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,5 +1,9 @@ use super::*; +use bytes::buf::Take; + +use std::cmp; + #[derive(Debug)] pub(super) struct Prioritize { /// Streams that have pending frames @@ -25,7 +29,9 @@ pub(super) struct Prioritize { #[derive(Debug)] pub(crate) struct Prioritized { // The buffer - inner: B, + inner: Take, + + end_of_stream: bool, // The stream that this is associated with stream: store::Key, @@ -120,13 +126,21 @@ impl Prioritize // Reclaim any frame that has previously been written self.reclaim_frame(store, dst); + // The max frame length + let max_frame_len = dst.max_send_frame_size(); + trace!("poll_complete"); loop { - match self.pop_frame(store) { + match self.pop_frame(store, max_frame_len) { Some(frame) => { - trace!("writing frame={:?}", frame); + // Figure out the byte size this frame applies to flow + // control + let len = cmp::min(frame.flow_len(), max_frame_len); + // Subtract the data size - self.buffered_data -= frame.flow_len(); + self.buffered_data -= len; + + trace!("writing frame={:?}", frame); let res = dst.start_send(frame)?; @@ -157,7 +171,9 @@ impl Prioritize } } - fn pop_frame(&mut self, store: &mut Store) -> Option>> { + fn pop_frame(&mut self, store: &mut Store, max_len: usize) + -> Option>> + { loop { match self.pop_sender(store) { Some(mut stream) => { @@ -182,13 +198,24 @@ impl Prioritize push_sender(&mut self.pending_send, &mut stream); } - // Add prioritization logic - let frame = frame.map(|buf| { - Prioritized { - inner: buf, - stream: stream.key(), + let frame = match frame { + Frame::Data(mut frame) => { + let eos = frame.is_end_stream(); + + if frame.payload().remaining() > max_len { + frame.unset_end_stream(); + } + + Frame::Data(frame.map(|buf| { + Prioritized { + inner: buf.take(max_len), + end_of_stream: eos, + stream: stream.key(), + } + })) } - }); + frame => frame.map(|_| unreachable!()), + }; return Some(frame); } @@ -231,11 +258,18 @@ impl Prioritize if frame.payload().has_remaining() { let mut stream = store.resolve(frame.payload().stream); - let frame = frame.map(|prioritized| { + let mut eos = false; + + let mut frame = frame.map(|prioritized| { // TODO: Ensure fully written - prioritized.inner + eos = prioritized.end_of_stream; + prioritized.inner.into_inner() }); + if eos { + frame.set_end_stream(); + } + self.push_back_frame(frame.into(), &mut stream); return true; diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 7b3bdc3..3597bf4 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -219,7 +219,7 @@ impl State { self.inner = Closed(match *err { ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), ConnectionError::Io(..) => Some(Cause::Io), - _ => panic!("cannot terminate stream with user error"), + ref e => panic!("cannot terminate stream with user error; err={:?}", e), }); } } diff --git a/tests/prioritization.rs b/tests/prioritization.rs index a131218..c57d344 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -1,13 +1,12 @@ pub mod support; use support::*; -use h2::Frame; - #[test] -#[ignore] fn single_stream_send_large_body() { let _ = ::env_logger::init(); + let payload = [0; 1024]; + let mock = mock_io::Builder::new() .handshake() .write(&[ @@ -17,54 +16,78 @@ fn single_stream_send_large_body() { ]) .write(&[ // DATA - 0, 0, 5, 0, 1, 0, 0, 0, 1, 104, 101, 108, 108, 111, + 0, 4, 0, 0, 1, 0, 0, 0, 1, ]) + .write(&payload[..]) .write(frames::SETTINGS_ACK) // Read response - .read(&[ - // HEADERS - 0, 0, 1, 1, 4, 0, 0, 0, 1, 136, - // DATA - 0, 0, 5, 0, 1, 0, 0, 0, 1, 119, 111, 114, 108, 100 - ]) + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) .build(); - /* - let h2 = client::handshake(mock) + let mut h2 = Client::handshake(mock) .wait().unwrap(); - // Send the request - let mut request = request::Head::default(); - request.method = method::POST; - request.uri = "https://http2.akamai.com/".parse().unwrap(); - let h2 = h2.send_request(1.into(), request, false).wait().unwrap(); + let request = Request::builder() + .method(method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let mut stream = h2.request(request, false).unwrap(); // Send the data - let b = [0; 300]; - let h2 = h2.send_data(1.into(), (&b[..]).into(), true).wait().unwrap(); + stream.send_data(payload[..].into(), true).unwrap(); - // Get the response headers - let (resp, h2) = h2.into_future().wait().unwrap(); + // Get the response + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::NO_CONTENT); - match resp.unwrap() { - Frame::Headers { headers, .. } => { - assert_eq!(headers.status, status::OK); - } - _ => panic!("unexpected frame"), - } - - // Get the response body - let (data, h2) = h2.into_future().wait().unwrap(); - - match data.unwrap() { - Frame::Data { id, data, end_of_stream, .. } => { - assert_eq!(id, 1.into()); - assert_eq!(data, &b"world"[..]); - assert!(end_of_stream); - } - _ => panic!("unexpected frame"), - } - - assert!(Stream::wait(h2).next().is_none());; - */ + h2.wait().unwrap(); +} + +#[test] +fn single_stream_send_extra_large_body_multi_frames() { + let _ = ::env_logger::init(); + + let payload = vec![0; 32_768]; + + let mock = mock_io::Builder::new() + .handshake() + .write(&[ + // POST / + 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, + 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, + ]) + .write(&[ + // DATA + 0, 64, 0, 0, 0, 0, 0, 0, 1, + ]) + .write(&payload[0..16_384]) + .write(frames::SETTINGS_ACK) + .write(&[ + // DATA + 0, 64, 0, 0, 1, 0, 0, 0, 1, + ]) + .write(&payload[16_384..]) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) + .build(); + + let mut h2 = Client::handshake(mock) + .wait().unwrap(); + + let request = Request::builder() + .method(method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let mut stream = h2.request(request, false).unwrap(); + + // Send the data + stream.send_data(payload.into(), true).unwrap(); + + // Get the response + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::NO_CONTENT); + + h2.wait().unwrap(); }