diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index cf900cd..5955820 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -410,6 +410,8 @@ impl Recv { task, ); stream.in_flight_recv_data = 0; + + self.clear_recv_buffer(stream); } /// Set the "target" connection window size. @@ -703,6 +705,12 @@ impl Recv { stream.notify_recv(); } + pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { + while let Some(_) = stream.pending_recv.pop_front(&mut self.buffer) { + // drop it + } + } + /// Get the max ID of streams we can receive. /// /// This gets lowered if we send a GOAWAY frame. diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index b0453bc..0c58ecb 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1044,6 +1044,17 @@ impl OpaqueStreamRef { .release_capacity(capacity, &mut stream, &mut me.actions.task) } + pub(crate) fn clear_recv_buffer(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions + .recv + .clear_recv_buffer(&mut stream); + } + pub fn stream_id(&self) -> StreamId { self.inner.lock() .unwrap() diff --git a/src/share.rs b/src/share.rs index 95b38f7..ce4a0b1 100644 --- a/src/share.rs +++ b/src/share.rs @@ -448,6 +448,17 @@ impl fmt::Debug for RecvStream { } } +impl Drop for RecvStream { + fn drop(&mut self) { + // Eagerly clear any received DATA frames now, since its no longer + // possible to retrieve them. However, this will be called + // again once *all* stream refs have been dropped, since + // this won't send a RST_STREAM frame, in case the user wishes to + // still *send* DATA. + self.inner.inner.clear_recv_buffer(); + } +} + // ===== impl ReleaseCapacity ===== impl ReleaseCapacity {