Fix bug in prioritization (#63)
The stream buffered data counter was never decremented.
This commit is contained in:
		| @@ -54,6 +54,7 @@ pub use self::window_update::WindowUpdate; | ||||
| // Re-export some constants | ||||
| pub use self::settings::{ | ||||
|     DEFAULT_SETTINGS_HEADER_TABLE_SIZE, | ||||
|     DEFAULT_INITIAL_WINDOW_SIZE, | ||||
|     DEFAULT_MAX_FRAME_SIZE, | ||||
|     MAX_INITIAL_WINDOW_SIZE, | ||||
|     MAX_MAX_FRAME_SIZE, | ||||
|   | ||||
| @@ -12,6 +12,7 @@ pub struct Ping { | ||||
| } | ||||
|  | ||||
| impl Ping { | ||||
|     #[cfg(feature = "unstable")] | ||||
|     pub fn new() -> Ping { | ||||
|         Ping { | ||||
|             ack: false, | ||||
|   | ||||
| @@ -36,6 +36,9 @@ const ALL: u8 = ACK; | ||||
| /// The default value of SETTINGS_HEADER_TABLE_SIZE | ||||
| pub const DEFAULT_SETTINGS_HEADER_TABLE_SIZE: usize = 4_096; | ||||
|  | ||||
| /// The default value of SETTINGS_INITIAL_WINDOW_SIZE | ||||
| pub const DEFAULT_INITIAL_WINDOW_SIZE: u32 = 65_535; | ||||
|  | ||||
| /// The default value of MAX_FRAME_SIZE | ||||
| pub const DEFAULT_MAX_FRAME_SIZE: FrameSize = 16_384; | ||||
|  | ||||
|   | ||||
| @@ -2,6 +2,7 @@ use {client, frame, server, proto}; | ||||
| use frame::Reason; | ||||
| use codec::{SendError, RecvError}; | ||||
|  | ||||
| use frame::DEFAULT_INITIAL_WINDOW_SIZE; | ||||
| use proto::*; | ||||
|  | ||||
| use http::Request; | ||||
|   | ||||
| @@ -30,6 +30,4 @@ pub type PingPayload = [u8; 8]; | ||||
| pub type WindowSize = u32; | ||||
|  | ||||
| // Constants | ||||
| // TODO: Move these into `frame` | ||||
| pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; | ||||
| pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; | ||||
|   | ||||
| @@ -281,6 +281,8 @@ impl<B, P> Prioritize<B, P> | ||||
|         // If data is buffered, then schedule the stream for execution | ||||
|         if stream.buffered_send_data > 0 { | ||||
|             debug_assert!(stream.send_flow.available() > 0); | ||||
|             debug_assert!(!stream.pending_send.is_empty()); | ||||
|  | ||||
|             self.pending_send.push(stream); | ||||
|         } | ||||
|     } | ||||
| @@ -382,6 +384,7 @@ impl<B, P> Prioritize<B, P> | ||||
|  | ||||
|         // If needed, schedule the sender | ||||
|         if stream.send_flow.available() > 0 { | ||||
|             debug_assert!(!stream.pending_send.is_empty()); | ||||
|             self.pending_send.push(stream); | ||||
|         } | ||||
|     } | ||||
| @@ -404,6 +407,7 @@ impl<B, P> Prioritize<B, P> | ||||
|             match self.pending_send.pop(store) { | ||||
|                 Some(mut stream) => { | ||||
|                     trace!("pop_frame; stream={:?}", stream.id); | ||||
|                     debug_assert!(!stream.pending_send.is_empty()); | ||||
|  | ||||
|                     let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { | ||||
|                         Frame::Data(mut frame) => { | ||||
| @@ -456,6 +460,10 @@ impl<B, P> Prioritize<B, P> | ||||
|                             trace!(" -- updating stream flow --"); | ||||
|                             stream.send_flow.send_data(len as WindowSize); | ||||
|  | ||||
|                             // Decrement the stream's buffered data counter | ||||
|                             debug_assert!(stream.buffered_send_data >= len as u32); | ||||
|                             stream.buffered_send_data -= len as u32; | ||||
|  | ||||
|                             // Assign the capacity back to the connection that | ||||
|                             // was just consumed from the stream in the previous | ||||
|                             // line. | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| #[macro_use] | ||||
| extern crate h2_test_support; | ||||
| use h2_test_support::prelude::*; | ||||
|  | ||||
| @@ -168,3 +169,153 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { | ||||
|  | ||||
|     h2.wait().unwrap(); | ||||
| } | ||||
|  | ||||
| #[macro_use] | ||||
| extern crate futures; | ||||
|  | ||||
| use futures::{Poll, Async}; | ||||
| use std::fmt; | ||||
|  | ||||
| // TODO: These types should be extracted out | ||||
| struct WaitForCapacity { | ||||
|     stream: Option<client::Stream<Bytes>>, | ||||
|     target: usize, | ||||
| } | ||||
|  | ||||
| struct Drive<T> { | ||||
|     conn: Option<Client<mock::Mock, Bytes>>, | ||||
|     fut: T, | ||||
| } | ||||
|  | ||||
| impl<T> Future for Drive<T> | ||||
|     where T: Future, | ||||
|           T::Error: fmt::Debug, | ||||
| { | ||||
|     type Item = (Client<mock::Mock, Bytes>, T::Item); | ||||
|     type Error = (); | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
|         match self.fut.poll() { | ||||
|             Ok(Async::Ready(v)) => return Ok((self.conn.take().unwrap(), v).into()), | ||||
|             Ok(_) => {} | ||||
|             Err(e) => panic!("unexpected error; {:?}", e), | ||||
|         } | ||||
|  | ||||
|         match self.conn.as_mut().unwrap().poll() { | ||||
|             Ok(Async::Ready(_)) => panic!(), | ||||
|             Ok(Async::NotReady) => {} | ||||
|             Err(e) => panic!("unexpected error; {:?}", e), | ||||
|         } | ||||
|  | ||||
|         Ok(Async::NotReady) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl WaitForCapacity { | ||||
|     fn stream(&mut self) -> &mut client::Stream<Bytes> { | ||||
|         self.stream.as_mut().unwrap() | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Future for WaitForCapacity { | ||||
|     type Item = client::Stream<Bytes>; | ||||
|     type Error = (); | ||||
|  | ||||
|     fn poll(&mut self) -> Poll<Self::Item, ()> { | ||||
|         let _ = try_ready!(self.stream().poll_capacity().map_err(|_| panic!())); | ||||
|  | ||||
|         let act = self.stream().capacity(); | ||||
|  | ||||
|         if act >= self.target { | ||||
|             return Ok(self.stream.take().unwrap().into()) | ||||
|         } | ||||
|  | ||||
|         Ok(Async::NotReady) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[test] | ||||
| fn send_data_receive_window_update() { | ||||
|     let _ = ::env_logger::init(); | ||||
|     let (m, mock) = mock::new(); | ||||
|  | ||||
|     let h2 = Client::handshake(m).unwrap() | ||||
|         .and_then(|mut h2| { | ||||
|             let request = Request::builder() | ||||
|                 .method(Method::POST) | ||||
|                 .uri("https://http2.akamai.com/") | ||||
|                 .body(()).unwrap(); | ||||
|  | ||||
|             // Send request | ||||
|             let mut stream = h2.request(request, false).unwrap(); | ||||
|  | ||||
|             // Send data frame | ||||
|             stream.send_data("hello".into(), false).unwrap(); | ||||
|  | ||||
|             stream.reserve_capacity(frame::DEFAULT_INITIAL_WINDOW_SIZE as usize); | ||||
|  | ||||
|             // Wait for capacity | ||||
|             let fut = WaitForCapacity { | ||||
|                 stream: Some(stream), | ||||
|                 target: frame::DEFAULT_INITIAL_WINDOW_SIZE as usize, | ||||
|             }; | ||||
|  | ||||
|             Drive { | ||||
|                 conn: Some(h2), | ||||
|                 fut: fut, | ||||
|             } | ||||
|         }) | ||||
|         .and_then(|(h2, mut stream)| { | ||||
|             let payload = vec![0; frame::DEFAULT_INITIAL_WINDOW_SIZE as usize]; | ||||
|             stream.send_data(payload.into(), true).unwrap(); | ||||
|  | ||||
|             h2.unwrap() | ||||
|         }); | ||||
|  | ||||
|     let mock = mock.assert_client_handshake().unwrap() | ||||
|         .and_then(|(_, mock)| { | ||||
|             mock.into_future().unwrap() | ||||
|         }) | ||||
|         .and_then(|(frame, mock)| { | ||||
|             let _ = assert_headers!(frame.unwrap()); | ||||
|             mock.into_future().unwrap() | ||||
|         }) | ||||
|         .and_then(|(frame, mut mock)| { | ||||
|             let data = assert_data!(frame.unwrap()); | ||||
|  | ||||
|             // Update the windows | ||||
|             let len = data.payload().len(); | ||||
|             let f = frame::WindowUpdate::new(StreamId::zero(), len as u32); | ||||
|             mock.send(f.into()).unwrap(); | ||||
|  | ||||
|             let f = frame::WindowUpdate::new(data.stream_id(), len as u32); | ||||
|             mock.send(f.into()).unwrap(); | ||||
|  | ||||
|             mock.into_future().unwrap() | ||||
|         }) | ||||
|         // TODO: Dedup the following lines | ||||
|         .and_then(|(frame, mock)| { | ||||
|             let data = assert_data!(frame.unwrap()); | ||||
|             assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize); | ||||
|             mock.into_future().unwrap() | ||||
|         }) | ||||
|         .and_then(|(frame, mock)| { | ||||
|             let data = assert_data!(frame.unwrap()); | ||||
|             assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize); | ||||
|             mock.into_future().unwrap() | ||||
|         }) | ||||
|         .and_then(|(frame, mock)| { | ||||
|             let data = assert_data!(frame.unwrap()); | ||||
|             assert_eq!(data.payload().len(), frame::DEFAULT_MAX_FRAME_SIZE as usize); | ||||
|             mock.into_future().unwrap() | ||||
|         }) | ||||
|         .and_then(|(frame, _)| { | ||||
|             let data = assert_data!(frame.unwrap()); | ||||
|             assert_eq!(data.payload().len(), (frame::DEFAULT_MAX_FRAME_SIZE-1) as usize); | ||||
|             Ok(()) | ||||
|         }) | ||||
|         ; | ||||
|  | ||||
|     let _ = h2.join(mock) | ||||
|         .wait().unwrap(); | ||||
| } | ||||
|   | ||||
| @@ -6,6 +6,26 @@ macro_rules! assert_closed { | ||||
|     }} | ||||
| } | ||||
|  | ||||
| #[macro_export] | ||||
| macro_rules! assert_headers { | ||||
|     ($frame:expr) => {{ | ||||
|         match $frame { | ||||
|             ::h2::frame::Frame::Headers(v) => v, | ||||
|             f => panic!("expected HEADERS; actual={:?}", f), | ||||
|         } | ||||
|     }} | ||||
| } | ||||
|  | ||||
| #[macro_export] | ||||
| macro_rules! assert_data { | ||||
|     ($frame:expr) => {{ | ||||
|         match $frame { | ||||
|             ::h2::frame::Frame::Data(v) => v, | ||||
|             f => panic!("expected DATA; actual={:?}", f), | ||||
|         } | ||||
|     }} | ||||
| } | ||||
|  | ||||
| #[macro_export] | ||||
| macro_rules! assert_ping { | ||||
|     ($frame:expr) => {{ | ||||
|   | ||||
| @@ -3,6 +3,7 @@ | ||||
| pub use super::h2; | ||||
|  | ||||
| pub use self::h2::*; | ||||
| pub use self::h2::frame::StreamId; | ||||
| pub use self::h2::client::{self, Client}; | ||||
| pub use self::h2::server::{self, Server}; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user