aggregate WINDOW_UPDATE frames until change is over 50% available
This commit is contained in:
		
				
					committed by
					
						 Carl Lerche
						Carl Lerche
					
				
			
			
				
	
			
			
			
						parent
						
							b040ca1478
						
					
				
				
					commit
					452e49dc3e
				
			| @@ -1,6 +1,27 @@ | ||||
| use frame::Reason; | ||||
| use frame::Reason::*; | ||||
| use proto::*; | ||||
| use proto::{MAX_WINDOW_SIZE, WindowSize}; | ||||
|  | ||||
| // We don't want to send WINDOW_UPDATE frames for tiny changes, but instead | ||||
| // aggregate them when the changes are significant. Many implementations do | ||||
| // this by keeping a "ratio" of the update version the allowed window size. | ||||
| // | ||||
| // While some may wish to represent this ratio as percentage, using a f32, | ||||
| // we skip having to deal with float math and stick to integers. To do so, | ||||
| // the "ratio" is represented by 2 i32s, split into the numerator and | ||||
| // denominator. For example, a 50% ratio is simply represented as 1/2. | ||||
| // | ||||
| // An example applying this ratio: If a stream has an allowed window size of | ||||
| // 100 bytes, WINDOW_UPDATE frames are scheduled when the unclaimed change | ||||
| // becomes greater than 1/2, or 50 bytes. | ||||
| const UNCLAIMED_NUMERATOR: i32 = 1; | ||||
| const UNCLAIMED_DENOMINATOR: i32 = 2; | ||||
|  | ||||
| #[test] | ||||
| fn sanity_unclaimed_ratio() { | ||||
|     assert!(UNCLAIMED_NUMERATOR < UNCLAIMED_DENOMINATOR); | ||||
|     assert!(UNCLAIMED_NUMERATOR >= 0); | ||||
|     assert!(UNCLAIMED_DENOMINATOR > 0); | ||||
| } | ||||
|  | ||||
| #[derive(Copy, Clone, Debug)] | ||||
| pub struct FlowControl { | ||||
| @@ -51,17 +72,29 @@ impl FlowControl { | ||||
|         self.available += capacity; | ||||
|     } | ||||
|  | ||||
|     /// Returns the number of bytes available but not assigned to the window. | ||||
|     /// If a WINDOW_UPDATE frame should be sent, returns a positive number | ||||
|     /// representing the increment to be used. | ||||
|     /// | ||||
|     /// If there is no available bytes to be reclaimed, or the number of | ||||
|     /// available bytes does not reach the threshold, this returns `None`. | ||||
|     /// | ||||
|     /// This represents pending outbound WINDOW_UPDATE frames. | ||||
|     pub fn unclaimed_capacity(&self) -> WindowSize { | ||||
|     pub fn unclaimed_capacity(&self) -> Option<WindowSize> { | ||||
|         let available = self.available as i32; | ||||
|  | ||||
|         if self.window_size >= available { | ||||
|             return 0; | ||||
|             return None; | ||||
|         } | ||||
|  | ||||
|         (available - self.window_size) as WindowSize | ||||
|         let unclaimed = available - self.window_size; | ||||
|         let threshold = self.window_size / UNCLAIMED_DENOMINATOR | ||||
|             * UNCLAIMED_NUMERATOR; | ||||
|  | ||||
|         if unclaimed < threshold { | ||||
|             None | ||||
|         } else { | ||||
|             Some(unclaimed as WindowSize) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Increase the window size. | ||||
| @@ -71,11 +104,11 @@ impl FlowControl { | ||||
|         let (val, overflow) = self.window_size.overflowing_add(sz as i32); | ||||
|  | ||||
|         if overflow { | ||||
|             return Err(FlowControlError); | ||||
|             return Err(Reason::FlowControlError); | ||||
|         } | ||||
|  | ||||
|         if val > MAX_WINDOW_SIZE as i32 { | ||||
|             return Err(FlowControlError); | ||||
|             return Err(Reason::FlowControlError); | ||||
|         } | ||||
|  | ||||
|         trace!("inc_window; sz={}; old={}; new={}", sz, self.window_size, val); | ||||
|   | ||||
| @@ -214,6 +214,7 @@ impl<B, P> Recv<B, P> | ||||
|                             task: &mut Option<Task>) | ||||
|         -> Result<(), UserError> | ||||
|     { | ||||
|         trace!("release_capacity; size={}", capacity); | ||||
|         if capacity > stream.in_flight_recv_data { | ||||
|             // TODO: Handle error | ||||
|             unimplemented!(); | ||||
| @@ -226,11 +227,13 @@ impl<B, P> Recv<B, P> | ||||
|         self.flow.assign_capacity(capacity); | ||||
|         stream.recv_flow.assign_capacity(capacity); | ||||
|  | ||||
|         // Queue the stream for sending the WINDOW_UPDATE frame. | ||||
|         self.pending_window_updates.push(stream); | ||||
|         if stream.recv_flow.unclaimed_capacity().is_some() { | ||||
|             // Queue the stream for sending the WINDOW_UPDATE frame. | ||||
|             self.pending_window_updates.push(stream); | ||||
|  | ||||
|         if let Some(task) = task.take() { | ||||
|             task.notify(); | ||||
|             if let Some(task) = task.take() { | ||||
|                 task.notify(); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
| @@ -493,9 +496,7 @@ impl<B, P> Recv<B, P> | ||||
|         -> Poll<(), io::Error> | ||||
|         where T: AsyncWrite, | ||||
|     { | ||||
|         let incr = self.flow.unclaimed_capacity(); | ||||
|  | ||||
|         if incr > 0 { | ||||
|         if let Some(incr) = self.flow.unclaimed_capacity() { | ||||
|             let frame = frame::WindowUpdate::new(StreamId::zero(), incr); | ||||
|  | ||||
|             // Ensure the codec has capacity | ||||
| @@ -536,9 +537,7 @@ impl<B, P> Recv<B, P> | ||||
|             } | ||||
|  | ||||
|             // TODO: de-dup | ||||
|             let incr = stream.recv_flow.unclaimed_capacity(); | ||||
|  | ||||
|             if incr > 0 { | ||||
|             if let Some(incr) = stream.recv_flow.unclaimed_capacity() { | ||||
|                 // Create the WINDOW_UPDATE frame | ||||
|                 let frame = frame::WindowUpdate::new(stream.id, incr); | ||||
|  | ||||
|   | ||||
| @@ -478,7 +478,7 @@ impl<B, P> StreamRef<B, P> | ||||
|         me.actions.recv.poll_trailers(&mut stream) | ||||
|     } | ||||
|  | ||||
|     /// Releases recv capacity back to the peer. This will result in sending | ||||
|     /// Releases recv capacity back to the peer. This may result in sending | ||||
|     /// WINDOW_UPDATE frames on both the stream and connection. | ||||
|     pub fn release_capacity(&mut self, capacity: WindowSize) | ||||
|         -> Result<(), UserError> | ||||
|   | ||||
		Reference in New Issue
	
	Block a user