Send RST_STREAM of STREAM_CLOSED instead of GOAWAY if stream may have been forgotten
This commit is contained in:
		| @@ -516,28 +516,19 @@ impl Recv { | ||||
|             stream.recv_flow.window_size() | ||||
|         ); | ||||
|  | ||||
|         // Ensure that there is enough capacity on the connection before acting | ||||
|         // on the stream. | ||||
|         self.consume_connection_window(sz)?; | ||||
|  | ||||
|         if is_ignoring_frame { | ||||
|             trace!( | ||||
|                 "recv_data; frame ignored on locally reset {:?} for some time", | ||||
|                 stream.id, | ||||
|             ); | ||||
|             // we just checked for enough connection window capacity, and | ||||
|             // consumed it. Since we are ignoring this frame "for some time", | ||||
|             // we aren't returning the frame to the user. That means they | ||||
|             // have no way to release the capacity back to the connection. So | ||||
|             // we have to release it automatically. | ||||
|             // | ||||
|             // This call doesn't send a WINDOW_UPDATE immediately, just marks | ||||
|             // the capacity as available to be reclaimed. When the available | ||||
|             // capacity meets a threshold, a WINDOW_UPDATE is then sent. | ||||
|             self.release_connection_capacity(sz, &mut None); | ||||
|             return Ok(()); | ||||
|             return self.ignore_data(sz); | ||||
|         } | ||||
|  | ||||
|         // Ensure that there is enough capacity on the connection before acting | ||||
|         // on the stream. | ||||
|         self.consume_connection_window(sz)?; | ||||
|  | ||||
|         if stream.recv_flow.window_size() < sz { | ||||
|             // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE | ||||
|             // > A receiver MAY respond with a stream error (Section 5.4.2) or | ||||
| @@ -599,6 +590,22 @@ impl Recv { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn ignore_data(&mut self, sz: WindowSize) -> Result<(), RecvError> { | ||||
|         // Ensure that there is enough capacity on the connection... | ||||
|         self.consume_connection_window(sz)?; | ||||
|  | ||||
|         // Since we are ignoring this frame, | ||||
|         // we aren't returning the frame to the user. That means they | ||||
|         // have no way to release the capacity back to the connection. So | ||||
|         // we have to release it automatically. | ||||
|         // | ||||
|         // This call doesn't send a WINDOW_UPDATE immediately, just marks | ||||
|         // the capacity as available to be reclaimed. When the available | ||||
|         // capacity meets a threshold, a WINDOW_UPDATE is then sent. | ||||
|         self.release_connection_capacity(sz, &mut None); | ||||
|         return Ok(()); | ||||
|     } | ||||
|  | ||||
|     pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { | ||||
|         if self.flow.window_size() < sz { | ||||
|             debug!( | ||||
| @@ -753,7 +760,7 @@ impl Recv { | ||||
|         self.max_stream_id | ||||
|     } | ||||
|  | ||||
|     fn next_stream_id(&self) -> Result<StreamId, RecvError> { | ||||
|     pub fn next_stream_id(&self) -> Result<StreamId, RecvError> { | ||||
|         if let Ok(id) = self.next_stream_id { | ||||
|             Ok(id) | ||||
|         } else { | ||||
| @@ -761,6 +768,19 @@ impl Recv { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn may_have_created_stream(&self, id: StreamId) -> bool { | ||||
|         if let Ok(next_id) = self.next_stream_id { | ||||
|             // Peer::is_local_init should have been called beforehand | ||||
|             debug_assert_eq!( | ||||
|                 id.is_server_initiated(), | ||||
|                 next_id.is_server_initiated(), | ||||
|             ); | ||||
|             id < next_id | ||||
|         } else { | ||||
|             true | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Returns true if the remote peer can reserve a stream with the given ID. | ||||
|     pub fn ensure_can_reserve(&self) | ||||
|         -> Result<(), RecvError> | ||||
|   | ||||
| @@ -109,12 +109,6 @@ impl Send { | ||||
|     } | ||||
|  | ||||
|     /// Send an explicit RST_STREAM frame | ||||
|     /// | ||||
|     /// # Arguments | ||||
|     /// + `reason`: the error code for the RST_STREAM frame | ||||
|     /// + `clear_queue`: if true, all pending outbound frames will be cleared, | ||||
|     ///    if false, the RST_STREAM frame will be appended to the end of the | ||||
|     ///    send queue. | ||||
|     pub fn send_reset<B>( | ||||
|         &mut self, | ||||
|         reason: Reason, | ||||
| @@ -452,4 +446,17 @@ impl Send { | ||||
|     pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> { | ||||
|         self.next_stream_id.map_err(|_| UserError::OverflowedStreamId) | ||||
|     } | ||||
|  | ||||
|     pub fn may_have_created_stream(&self, id: StreamId) -> bool { | ||||
|         if let Ok(next_id) = self.next_stream_id { | ||||
|             // Peer::is_local_init should have been called beforehand | ||||
|             debug_assert_eq!( | ||||
|                 id.is_server_initiated(), | ||||
|                 next_id.is_server_initiated(), | ||||
|             ); | ||||
|             id < next_id | ||||
|         } else { | ||||
|             true | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -140,17 +140,39 @@ where | ||||
|  | ||||
|         let key = match me.store.find_entry(id) { | ||||
|             Entry::Occupied(e) => e.key(), | ||||
|             Entry::Vacant(e) => match me.actions.recv.open(id, Open::Headers, &mut me.counts)? { | ||||
|                 Some(stream_id) => { | ||||
|                     let stream = Stream::new( | ||||
|                         stream_id, | ||||
|                         me.actions.send.init_window_sz(), | ||||
|                         me.actions.recv.init_window_sz(), | ||||
|                     ); | ||||
|             Entry::Vacant(e) => { | ||||
|                 // Client: it's possible to send a request, and then send | ||||
|                 // a RST_STREAM while the response HEADERS were in transit. | ||||
|                 // | ||||
|                 // Server: we can't reset a stream before having received | ||||
|                 // the request headers, so don't allow. | ||||
|                 if !P::is_server() { | ||||
|                     // This may be response headers for a stream we've already | ||||
|                     // forgotten about... | ||||
|                     if me.actions.may_have_forgotten_stream::<P>(id) { | ||||
|                         debug!( | ||||
|                             "recv_headers for old stream={:?}, sending STREAM_CLOSED", | ||||
|                             id, | ||||
|                         ); | ||||
|                         return Err(RecvError::Stream { | ||||
|                             id, | ||||
|                             reason: Reason::STREAM_CLOSED, | ||||
|                         }); | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                     e.insert(stream) | ||||
|                 }, | ||||
|                 None => return Ok(()), | ||||
|                 match me.actions.recv.open(id, Open::Headers, &mut me.counts)? { | ||||
|                     Some(stream_id) => { | ||||
|                         let stream = Stream::new( | ||||
|                             stream_id, | ||||
|                             me.actions.send.init_window_sz(), | ||||
|                             me.actions.recv.init_window_sz(), | ||||
|                         ); | ||||
|  | ||||
|                         e.insert(stream) | ||||
|                     }, | ||||
|                     None => return Ok(()), | ||||
|                 } | ||||
|             }, | ||||
|         }; | ||||
|  | ||||
| @@ -236,6 +258,25 @@ where | ||||
|                     return Ok(()); | ||||
|                 } | ||||
|  | ||||
|                 if me.actions.may_have_forgotten_stream::<P>(id) { | ||||
|                     debug!( | ||||
|                         "recv_data for old stream={:?}, sending STREAM_CLOSED", | ||||
|                         id, | ||||
|                     ); | ||||
|  | ||||
|                     let sz = frame.payload().len(); | ||||
|                     // This should have been enforced at the codec::FramedRead layer, so | ||||
|                     // this is just a sanity check. | ||||
|                     assert!(sz <= super::MAX_WINDOW_SIZE as usize); | ||||
|                     let sz = sz as WindowSize; | ||||
|  | ||||
|                     me.actions.recv.ignore_data(sz)?; | ||||
|                     return Err(RecvError::Stream { | ||||
|                         id, | ||||
|                         reason: Reason::STREAM_CLOSED, | ||||
|                     }); | ||||
|                 } | ||||
|  | ||||
|                 proto_err!(conn: "recv_data: stream not found; id={:?}", id); | ||||
|                 return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); | ||||
|             }, | ||||
| @@ -674,13 +715,10 @@ where | ||||
|  | ||||
|         let key = match me.store.find_entry(id) { | ||||
|             Entry::Occupied(e) => e.key(), | ||||
|             Entry::Vacant(e) => match me.actions.recv.open(id, Open::Headers, &mut me.counts) { | ||||
|                 Ok(Some(stream_id)) => { | ||||
|                     let stream = Stream::new(stream_id, 0, 0); | ||||
|             Entry::Vacant(e) => { | ||||
|                 let stream = Stream::new(id, 0, 0); | ||||
|  | ||||
|                     e.insert(stream) | ||||
|                 }, | ||||
|                 _ => return, | ||||
|                 e.insert(stream) | ||||
|             }, | ||||
|         }; | ||||
|  | ||||
| @@ -1250,6 +1288,26 @@ impl Actions { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Check if we possibly could have processed and since forgotten this stream. | ||||
|     /// | ||||
|     /// If we send a RST_STREAM for a stream, we will eventually "forget" about | ||||
|     /// the stream to free up memory. It's possible that the remote peer had | ||||
|     /// frames in-flight, and by the time we receive them, our own state is | ||||
|     /// gone. We *could* tear everything down by sending a GOAWAY, but it | ||||
|     /// is more likely to be latency/memory constraints that caused this, | ||||
|     /// and not a bad actor. So be less catastrophic, the spec allows | ||||
|     /// us to send another RST_STREAM of STREAM_CLOSED. | ||||
|     fn may_have_forgotten_stream<P: Peer>(&self, id: StreamId) -> bool { | ||||
|         if id.is_zero() { | ||||
|             return false; | ||||
|         } | ||||
|         if P::is_local_init(id) { | ||||
|             self.send.may_have_created_stream(id) | ||||
|         } else { | ||||
|             self.recv.may_have_created_stream(id) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn clear_queues(&mut self, | ||||
|                     clear_pending_accept: bool, | ||||
|                     store: &mut Store, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user