diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 1b7f2a2..2584dc2 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -11,9 +11,8 @@ use std::ops; /// Storage for streams #[derive(Debug)] pub(super) struct Store { - slab: slab::Slab<(StoreId, Stream)>, - ids: IndexMap, - counter: StoreId, + slab: slab::Slab, + ids: IndexMap, } /// "Pointer" to an entry in the store @@ -25,11 +24,14 @@ pub(super) struct Ptr<'a> { /// References an entry in the store. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct Key { - index: usize, - store_id: StoreId, + index: SlabIndex, + /// Keep the stream ID in the key as an ABA guard, since slab indices + /// could be re-used with a new stream. + stream_id: StreamId, } -type StoreId = usize; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +struct SlabIndex(usize); #[derive(Debug)] pub(super) struct Queue { @@ -62,13 +64,12 @@ pub(super) enum Entry<'a> { } pub(super) struct OccupiedEntry<'a> { - ids: indexmap::map::OccupiedEntry<'a, StreamId, (usize, StoreId)>, + ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>, } pub(super) struct VacantEntry<'a> { - ids: indexmap::map::VacantEntry<'a, StreamId, (usize, StoreId)>, - slab: &'a mut slab::Slab<(StoreId, Stream)>, - counter: &'a mut usize, + ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>, + slab: &'a mut slab::Slab, } pub(super) trait Resolve { @@ -82,35 +83,32 @@ impl Store { Store { slab: slab::Slab::new(), ids: IndexMap::new(), - counter: 0, } } pub fn find_mut(&mut self, id: &StreamId) -> Option { - let key = match self.ids.get(id) { + let index = match self.ids.get(id) { Some(key) => *key, None => return None, }; Some(Ptr { key: Key { - index: key.0, - store_id: key.1, + index, + stream_id: *id, }, store: self, }) } pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { - let store_id = self.counter; - self.counter = self.counter.wrapping_add(1); - let key = self.slab.insert((store_id, val)); - assert!(self.ids.insert(id, (key, store_id)).is_none()); + let index = SlabIndex(self.slab.insert(val)); + assert!(self.ids.insert(id, index).is_none()); Ptr { key: Key { - index: key, - store_id, + index, + stream_id: id, }, store: self, } @@ -126,7 +124,6 @@ impl Store { Vacant(e) => Entry::Vacant(VacantEntry { ids: e, slab: &mut self.slab, - counter: &mut self.counter, }), } } @@ -140,12 +137,15 @@ impl Store { while i < len { // Get the key by index, this makes the borrow checker happy - let key = *self.ids.get_index(i).unwrap().1; + let (stream_id, index) = { + let entry = self.ids.get_index(i).unwrap(); + (*entry.0, *entry.1) + }; f(Ptr { key: Key { - index: key.0, - store_id: key.1, + index, + stream_id, }, store: self, })?; @@ -178,17 +178,23 @@ impl ops::Index for Store { type Output = Stream; fn index(&self, key: Key) -> &Self::Output { - let slot = self.slab.index(key.index); - assert_eq!(slot.0, key.store_id); - &slot.1 + self.slab + .get(key.index.0) + .filter(|s| s.id == key.stream_id) + .unwrap_or_else(|| { + panic!("dangling store key for stream_id={:?}", key.stream_id); + }) } } impl ops::IndexMut for Store { fn index_mut(&mut self, key: Key) -> &mut Self::Output { - let slot = self.slab.index_mut(key.index); - assert_eq!(slot.0, key.store_id); - &mut slot.1 + self.slab + .get_mut(key.index.0) + .filter(|s| s.id == key.stream_id) + .unwrap_or_else(|| { + panic!("dangling store key for stream_id={:?}", key.stream_id); + }) } } @@ -329,10 +335,12 @@ impl<'a> Ptr<'a> { /// Remove the stream from the store pub fn remove(self) -> StreamId { // The stream must have been unlinked before this point - debug_assert!(!self.store.ids.contains_key(&self.id)); + debug_assert!(!self.store.ids.contains_key(&self.key.stream_id)); // Remove the stream state - self.store.slab.remove(self.key.index).1.id + let stream = self.store.slab.remove(self.key.index.0); + assert_eq!(stream.id, self.key.stream_id); + stream.id } /// Remove the StreamId -> stream state association. @@ -340,7 +348,7 @@ impl<'a> Ptr<'a> { /// This will effectively remove the stream as far as the H2 protocol is /// concerned. pub fn unlink(&mut self) { - let id = self.id; + let id = self.key.stream_id; self.store.ids.remove(&id); } } @@ -358,13 +366,13 @@ impl<'a> ops::Deref for Ptr<'a> { type Target = Stream; fn deref(&self) -> &Stream { - &self.store.slab[self.key.index].1 + &self.store[self.key] } } impl<'a> ops::DerefMut for Ptr<'a> { fn deref_mut(&mut self) -> &mut Stream { - &mut self.store.slab[self.key.index].1 + &mut self.store[self.key] } } @@ -378,10 +386,11 @@ impl<'a> fmt::Debug for Ptr<'a> { impl<'a> OccupiedEntry<'a> { pub fn key(&self) -> Key { - let tup = self.ids.get(); + let stream_id = *self.ids.key(); + let index = *self.ids.get(); Key { - index: tup.0, - store_id: tup.1, + index, + stream_id, } } } @@ -391,16 +400,15 @@ impl<'a> OccupiedEntry<'a> { impl<'a> VacantEntry<'a> { pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab - let store_id = *self.counter; - *self.counter = store_id.wrapping_add(1); - let index = self.slab.insert((store_id, value)); + let stream_id = value.id; + let index = SlabIndex(self.slab.insert(value)); // Insert the handle in the ID map - self.ids.insert((index, store_id)); + self.ids.insert(index); Key { index, - store_id, + stream_id, } } }