Streams receiving peer reset clear pending send (#238)
Because streams that were being peer reset were not clearing pending send frames / buffered_send_data, they were not being counted towards the concurrency limit.
This commit is contained in:
		
				
					committed by
					
						 Sean McArthur
						Sean McArthur
					
				
			
			
				
	
			
			
			
						parent
						
							267789da92
						
					
				
				
					commit
					f8baeb7211
				
			| @@ -555,6 +555,9 @@ impl Prioritize { | |||||||
|         while let Some(frame) = stream.pending_send.pop_front(buffer) { |         while let Some(frame) = stream.pending_send.pop_front(buffer) { | ||||||
|             trace!("dropping; frame={:?}", frame); |             trace!("dropping; frame={:?}", frame); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         stream.buffered_send_data = 0; | ||||||
|  |         stream.requested_send_capacity = 0; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn pop_frame<B>( |     fn pop_frame<B>( | ||||||
| @@ -574,6 +577,14 @@ impl Prioritize { | |||||||
|                 Some(mut stream) => { |                 Some(mut stream) => { | ||||||
|                     trace!("pop_frame; stream={:?}", stream.id); |                     trace!("pop_frame; stream={:?}", stream.id); | ||||||
|  |  | ||||||
|  |                     // If the stream receives a RESET from the peer, it may have | ||||||
|  |                     // had data buffered to be sent, but all the frames are cleared | ||||||
|  |                     // in clear_queue(). Instead of doing O(N) traversal through queue | ||||||
|  |                     // to remove, lets just ignore peer_reset streams here. | ||||||
|  |                     if stream.state.is_peer_reset() { | ||||||
|  |                         continue; | ||||||
|  |                     } | ||||||
|  |  | ||||||
|                     // It's possible that this stream, besides having data to send, |                     // It's possible that this stream, besides having data to send, | ||||||
|                     // is also queued to send a reset, and thus is already in the queue |                     // is also queued to send a reset, and thus is already in the queue | ||||||
|                     // to wait for "some time" after a reset. |                     // to wait for "some time" after a reset. | ||||||
|   | |||||||
| @@ -588,16 +588,12 @@ impl Recv { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Handle remote sending an explicit RST_STREAM. |     /// Handle remote sending an explicit RST_STREAM. | ||||||
|     pub fn recv_reset( |     pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { | ||||||
|         &mut self, |  | ||||||
|         frame: frame::Reset, |  | ||||||
|         stream: &mut Stream, |  | ||||||
|     ) -> Result<(), RecvError> { |  | ||||||
|         // Notify the stream |         // Notify the stream | ||||||
|         stream.state.recv_reset(frame.reason()); |         stream.state.recv_reset(frame.reason()); | ||||||
|  |  | ||||||
|         stream.notify_send(); |         stream.notify_send(); | ||||||
|         stream.notify_recv(); |         stream.notify_recv(); | ||||||
|         Ok(()) |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Handle a received error |     /// Handle a received error | ||||||
|   | |||||||
| @@ -283,6 +283,15 @@ impl Send { | |||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn recv_reset<B>( | ||||||
|  |         &mut self, | ||||||
|  |         buffer: &mut Buffer<Frame<B>>, | ||||||
|  |         stream: &mut store::Ptr | ||||||
|  |     ) { | ||||||
|  |         // Clear all pending outbound frames | ||||||
|  |         self.prioritize.clear_queue(buffer, stream); | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn recv_err<B>( |     pub fn recv_err<B>( | ||||||
|         &mut self, |         &mut self, | ||||||
|         buffer: &mut Buffer<Frame<B>>, |         buffer: &mut Buffer<Frame<B>>, | ||||||
|   | |||||||
| @@ -301,6 +301,13 @@ impl State { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn is_peer_reset(&self) -> bool { | ||||||
|  |         match self.inner { | ||||||
|  |             Closed(Cause::Proto(_)) => true, | ||||||
|  |             _ => false, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     /// Returns true if the stream is already reset. |     /// Returns true if the stream is already reset. | ||||||
|     pub fn is_reset(&self) -> bool { |     pub fn is_reset(&self) -> bool { | ||||||
|         match self.inner { |         match self.inner { | ||||||
|   | |||||||
| @@ -255,10 +255,14 @@ where | |||||||
|             }, |             }, | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|  |         let mut send_buffer = self.send_buffer.inner.lock().unwrap(); | ||||||
|  |         let send_buffer = &mut *send_buffer; | ||||||
|  |  | ||||||
|         let actions = &mut me.actions; |         let actions = &mut me.actions; | ||||||
|  |  | ||||||
|         me.counts.transition(stream, |_, stream| { |         me.counts.transition(stream, |_, stream| { | ||||||
|             actions.recv.recv_reset(frame, stream)?; |             actions.recv.recv_reset(frame, stream); | ||||||
|  |             actions.send.recv_reset(send_buffer, stream); | ||||||
|             assert!(stream.state.is_closed()); |             assert!(stream.state.is_closed()); | ||||||
|             Ok(()) |             Ok(()) | ||||||
|         }) |         }) | ||||||
|   | |||||||
| @@ -708,6 +708,69 @@ fn recv_too_big_headers() { | |||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn pending_send_request_gets_reset_by_peer_properly() { | ||||||
|  |     let _ = ::env_logger::try_init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let payload = [0; (frame::DEFAULT_INITIAL_WINDOW_SIZE * 2) as usize]; | ||||||
|  |     let max_frame_size = frame::DEFAULT_MAX_FRAME_SIZE as usize; | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake() | ||||||
|  |         .unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(1) | ||||||
|  |                 .request("GET", "https://http2.akamai.com/"), | ||||||
|  |         ) | ||||||
|  |         // Note that we can only send up to ~4 frames of data by default | ||||||
|  |         .recv_frame(frames::data(1, &payload[0..max_frame_size])) | ||||||
|  |         .recv_frame(frames::data(1, &payload[max_frame_size..(max_frame_size*2)])) | ||||||
|  |         .recv_frame(frames::data(1, &payload[(max_frame_size*2)..(max_frame_size*3)])) | ||||||
|  |         .recv_frame(frames::data(1, &payload[(max_frame_size*3)..(max_frame_size*4-1)])) | ||||||
|  |  | ||||||
|  |         .idle_ms(100) | ||||||
|  |  | ||||||
|  |         .send_frame(frames::reset(1).refused()) | ||||||
|  |         // Because all active requests are finished, connection should shutdown | ||||||
|  |         // and send a GO_AWAY frame. If the reset stream is bugged (and doesn't | ||||||
|  |         // count towards concurrency limit), then connection will not send | ||||||
|  |         // a GO_AWAY and this test will fail. | ||||||
|  |         .recv_frame(frames::go_away(0)) | ||||||
|  |  | ||||||
|  |         .close(); | ||||||
|  |  | ||||||
|  |     let client = client::Builder::new() | ||||||
|  |         .handshake::<_, Bytes>(io) | ||||||
|  |         .expect("handshake") | ||||||
|  |         .and_then(|(mut client, conn)| { | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .uri("https://http2.akamai.com/") | ||||||
|  |                 .body(()) | ||||||
|  |                 .unwrap(); | ||||||
|  |  | ||||||
|  |             let (response, mut stream) = client | ||||||
|  |                 .send_request(request, false) | ||||||
|  |                 .expect("send_request"); | ||||||
|  |  | ||||||
|  |             let response = response.expect_err("response") | ||||||
|  |                 .map(|err| { | ||||||
|  |                     assert_eq!( | ||||||
|  |                         err.reason(), | ||||||
|  |                         Some(Reason::REFUSED_STREAM) | ||||||
|  |                     ); | ||||||
|  |                 }); | ||||||
|  |  | ||||||
|  |             // Send the data | ||||||
|  |             stream.send_data(payload[..].into(), true).unwrap(); | ||||||
|  |  | ||||||
|  |             conn.drive(response) | ||||||
|  |                 .and_then(|(conn, _)| conn.expect("client")) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |     client.join(srv).wait().expect("wait"); | ||||||
|  | } | ||||||
|  |  | ||||||
| #[test] | #[test] | ||||||
| fn request_without_path() { | fn request_without_path() { | ||||||
|     let _ = ::env_logger::try_init(); |     let _ = ::env_logger::try_init(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user