recv_reset resets closed streams with queued EOS frames (#247)
This commit is contained in:
		
				
					committed by
					
						 Carl Lerche
						Carl Lerche
					
				
			
			
				
	
			
			
			
						parent
						
							e61788a57f
						
					
				
				
					commit
					23090c9fed
				
			| @@ -575,7 +575,8 @@ impl Prioritize { | |||||||
|         loop { |         loop { | ||||||
|             match self.pending_send.pop(store) { |             match self.pending_send.pop(store) { | ||||||
|                 Some(mut stream) => { |                 Some(mut stream) => { | ||||||
|                     trace!("pop_frame; stream={:?}", stream.id); |                     trace!("pop_frame; stream={:?}; stream.state={:?}", | ||||||
|  |                         stream.id, stream.state); | ||||||
|  |  | ||||||
|                     // If the stream receives a RESET from the peer, it may have |                     // If the stream receives a RESET from the peer, it may have | ||||||
|                     // had data buffered to be sent, but all the frames are cleared |                     // had data buffered to be sent, but all the frames are cleared | ||||||
|   | |||||||
| @@ -590,7 +590,7 @@ impl Recv { | |||||||
|     /// Handle remote sending an explicit RST_STREAM. |     /// Handle remote sending an explicit RST_STREAM. | ||||||
|     pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { |     pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) { | ||||||
|         // Notify the stream |         // Notify the stream | ||||||
|         stream.state.recv_reset(frame.reason()); |         stream.state.recv_reset(frame.reason(), stream.is_pending_send); | ||||||
|  |  | ||||||
|         stream.notify_send(); |         stream.notify_send(); | ||||||
|         stream.notify_recv(); |         stream.notify_recv(); | ||||||
|   | |||||||
| @@ -214,8 +214,15 @@ impl State { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// The remote explicitly sent a RST_STREAM. |     /// The remote explicitly sent a RST_STREAM. | ||||||
|     pub fn recv_reset(&mut self, reason: Reason) { |     pub fn recv_reset(&mut self, reason: Reason, queued: bool) { | ||||||
|  |  | ||||||
|         match self.inner { |         match self.inner { | ||||||
|  |             Closed(Cause::EndStream) if queued => { | ||||||
|  |                 // If the stream has a queued EOS frame, transition to peer | ||||||
|  |                 // reset. | ||||||
|  |                 trace!("recv_reset: reason={:?}; queued=true", reason); | ||||||
|  |                 self.inner = Closed(Cause::Proto(reason)); | ||||||
|  |             }, | ||||||
|             Closed(..) => {}, |             Closed(..) => {}, | ||||||
|             _ => { |             _ => { | ||||||
|                 trace!("recv_reset; reason={:?}", reason); |                 trace!("recv_reset; reason={:?}", reason); | ||||||
|   | |||||||
| @@ -808,3 +808,65 @@ fn send_data_after_headers_eos() { | |||||||
| fn exceed_max_streams() { | fn exceed_max_streams() { | ||||||
| } | } | ||||||
| */ | */ | ||||||
|  |  | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn rst_while_closing() { | ||||||
|  |     // Test to reproduce panic in issue #246 --- receipt of a RST_STREAM frame | ||||||
|  |     // on a stream in the Half Closed (remote) state with a queued EOS causes | ||||||
|  |     // a panic. | ||||||
|  |     let _ = ::env_logger::try_init(); | ||||||
|  |     let (io, srv) = mock::new(); | ||||||
|  |  | ||||||
|  |     let srv = srv.assert_client_handshake() | ||||||
|  |         .unwrap() | ||||||
|  |         .recv_settings() | ||||||
|  |         .recv_frame( | ||||||
|  |             frames::headers(1) | ||||||
|  |                 .request("GET", "https://example.com/") | ||||||
|  |         ) | ||||||
|  |         .send_frame(frames::headers(1).response(200)) | ||||||
|  |         .send_frame(frames::headers(1).eos()) | ||||||
|  |         // Idling for a moment here is necessary to ensure that the client | ||||||
|  |         // enqueues its TRAILERS frame *before* we send the RST_STREAM frame | ||||||
|  |         // which causes the panic. | ||||||
|  |         .idle_ms(1) | ||||||
|  |         // Send the RST_STREAM frame which causes the client to panic. | ||||||
|  |         .send_frame(frames::reset(1).cancel()) | ||||||
|  |         .ping_pong([1; 8]) | ||||||
|  |         .close(); | ||||||
|  |         ; | ||||||
|  |  | ||||||
|  |     let client = client::handshake(io) | ||||||
|  |         .expect("handshake") | ||||||
|  |         .and_then(|(mut client, conn)| { | ||||||
|  |             let request = Request::builder() | ||||||
|  |                 .method(Method::GET) | ||||||
|  |                 .uri("https://example.com/") | ||||||
|  |                 .body(()) | ||||||
|  |                 .unwrap(); | ||||||
|  |  | ||||||
|  |             // The request should be left streaming. | ||||||
|  |             let (resp, mut stream) = client.send_request(request, false) | ||||||
|  |                 .expect("send_request"); | ||||||
|  |             let req = resp | ||||||
|  |                 // on receipt of an EOS response from the server, transition | ||||||
|  |                 // the stream Open => Half Closed (remote). | ||||||
|  |                 .expect("response") | ||||||
|  |                 .and_then(move |resp| { | ||||||
|  |                     assert_eq!(resp.status(), StatusCode::OK); | ||||||
|  |                     // Enqueue trailers frame. | ||||||
|  |                     let _ = stream.send_trailers(HeaderMap::new()); | ||||||
|  |                     Ok(()) | ||||||
|  |                 }) | ||||||
|  |                 .map_err(|()| -> Error { | ||||||
|  |                     unreachable!() | ||||||
|  |                 }); | ||||||
|  |  | ||||||
|  |             conn.drive(req) | ||||||
|  |                 .and_then(|(conn, _)| conn.expect("client")) | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |  | ||||||
|  |     client.join(srv).wait().expect("wait"); | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user