Fix some flow control bugs
When a stream is closed, connection level capacity should be released back to the connection and then assigned to other streams waiting for capacity.
This commit is contained in:
		| @@ -121,6 +121,7 @@ impl<B, P> Prioritize<B, P> | ||||
|  | ||||
|         if frame.is_end_stream() { | ||||
|             stream.state.send_close(); | ||||
|             self.reserve_capacity(0, stream); | ||||
|         } | ||||
|  | ||||
|         trace!("send_data (2); available={}; buffered={}", | ||||
| @@ -150,8 +151,21 @@ impl<B, P> Prioritize<B, P> | ||||
|         if capacity == stream.requested_send_capacity { | ||||
|             // Nothing to do | ||||
|         } else if capacity < stream.requested_send_capacity { | ||||
|             // TODO: release capacity | ||||
|             unimplemented!(); | ||||
|             // Update the target requested capacity | ||||
|             stream.requested_send_capacity = capacity; | ||||
|  | ||||
|             // Currently available capacity assigned to the stream | ||||
|             let available = stream.send_flow.available(); | ||||
|  | ||||
|             // If the stream has more assigned capacity than requested, reclaim | ||||
|             // some for the connection | ||||
|             if available > capacity { | ||||
|                 let diff = available - capacity; | ||||
|  | ||||
|                 stream.send_flow.claim_capacity(diff); | ||||
|  | ||||
|                 self.assign_connection_capacity(diff, stream); | ||||
|             } | ||||
|         } else { | ||||
|             // Update the target requested capacity | ||||
|             stream.requested_send_capacity = capacity; | ||||
| @@ -239,7 +253,8 @@ impl<B, P> Prioritize<B, P> | ||||
|         // If the stream has requested capacity, then it must be in the | ||||
|         // streaming state (more data could be sent) or there is buffered data | ||||
|         // waiting to be sent. | ||||
|         debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0); | ||||
|         debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0, | ||||
|                       "state={:?}", stream.state); | ||||
|  | ||||
|         // The amount of currently available capacity on the connection | ||||
|         let conn_available = self.flow.available(); | ||||
| @@ -287,7 +302,6 @@ impl<B, P> Prioritize<B, P> | ||||
|         } | ||||
|     } | ||||
|  | ||||
|  | ||||
|     pub fn poll_complete<T>(&mut self, | ||||
|                             store: &mut Store<B, P>, | ||||
|                             dst: &mut Codec<T, Prioritized<B>>) | ||||
|   | ||||
| @@ -86,8 +86,11 @@ where B: Buf, | ||||
|         -> Result<(), UserError> | ||||
|     { | ||||
|         trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); | ||||
|  | ||||
|         let end_stream = frame.is_end_stream(); | ||||
|  | ||||
|         // Update the state | ||||
|         stream.state.send_open(frame.is_end_stream())?; | ||||
|         stream.state.send_open(end_stream)?; | ||||
|  | ||||
|         // Queue the frame for sending | ||||
|         self.prioritize.queue_frame(frame.into(), stream, task); | ||||
| @@ -156,6 +159,9 @@ where B: Buf, | ||||
|         trace!("send_trailers -- queuing; frame={:?}", frame); | ||||
|         self.prioritize.queue_frame(frame.into(), stream, task); | ||||
|  | ||||
|         // Release any excess capacity | ||||
|         self.prioritize.reserve_capacity(0, stream); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user