Refactor proto::streams::store indices

- Removes incrementing counter, instead just using the StreamId as a
  slab ABA guard.
- Adjusts Ptr::deref to use Store::index, as before it was skipping to
  check the ABA guard.
- Rename fields and types to clarify it actually is an ABA guard.
- Improve panic message in case a dangling Ptr is accessed.
This commit is contained in:
Sean McArthur
2019-05-31 15:43:11 -07:00
parent b8f1f0ccf1
commit cf5e53f0c3

View File

@@ -11,9 +11,8 @@ use std::ops;
/// Storage for streams
#[derive(Debug)]
pub(super) struct Store {
slab: slab::Slab<(StoreId, Stream)>,
ids: IndexMap<StreamId, (usize, StoreId)>,
counter: StoreId,
slab: slab::Slab<Stream>,
ids: IndexMap<StreamId, SlabIndex>,
}
/// "Pointer" to an entry in the store
@@ -25,11 +24,14 @@ pub(super) struct Ptr<'a> {
/// References an entry in the store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Key {
index: usize,
store_id: StoreId,
index: SlabIndex,
/// Keep the stream ID in the key as an ABA guard, since slab indices
/// could be re-used with a new stream.
stream_id: StreamId,
}
type StoreId = usize;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlabIndex(usize);
#[derive(Debug)]
pub(super) struct Queue<N> {
@@ -62,13 +64,12 @@ pub(super) enum Entry<'a> {
}
pub(super) struct OccupiedEntry<'a> {
ids: indexmap::map::OccupiedEntry<'a, StreamId, (usize, StoreId)>,
ids: indexmap::map::OccupiedEntry<'a, StreamId, SlabIndex>,
}
pub(super) struct VacantEntry<'a> {
ids: indexmap::map::VacantEntry<'a, StreamId, (usize, StoreId)>,
slab: &'a mut slab::Slab<(StoreId, Stream)>,
counter: &'a mut usize,
ids: indexmap::map::VacantEntry<'a, StreamId, SlabIndex>,
slab: &'a mut slab::Slab<Stream>,
}
pub(super) trait Resolve {
@@ -82,35 +83,32 @@ impl Store {
Store {
slab: slab::Slab::new(),
ids: IndexMap::new(),
counter: 0,
}
}
pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
let key = match self.ids.get(id) {
let index = match self.ids.get(id) {
Some(key) => *key,
None => return None,
};
Some(Ptr {
key: Key {
index: key.0,
store_id: key.1,
index,
stream_id: *id,
},
store: self,
})
}
pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
let store_id = self.counter;
self.counter = self.counter.wrapping_add(1);
let key = self.slab.insert((store_id, val));
assert!(self.ids.insert(id, (key, store_id)).is_none());
let index = SlabIndex(self.slab.insert(val));
assert!(self.ids.insert(id, index).is_none());
Ptr {
key: Key {
index: key,
store_id,
index,
stream_id: id,
},
store: self,
}
@@ -126,7 +124,6 @@ impl Store {
Vacant(e) => Entry::Vacant(VacantEntry {
ids: e,
slab: &mut self.slab,
counter: &mut self.counter,
}),
}
}
@@ -140,12 +137,15 @@ impl Store {
while i < len {
// Get the key by index, this makes the borrow checker happy
let key = *self.ids.get_index(i).unwrap().1;
let (stream_id, index) = {
let entry = self.ids.get_index(i).unwrap();
(*entry.0, *entry.1)
};
f(Ptr {
key: Key {
index: key.0,
store_id: key.1,
index,
stream_id,
},
store: self,
})?;
@@ -178,17 +178,23 @@ impl ops::Index<Key> for Store {
type Output = Stream;
fn index(&self, key: Key) -> &Self::Output {
let slot = self.slab.index(key.index);
assert_eq!(slot.0, key.store_id);
&slot.1
self.slab
.get(key.index.0)
.filter(|s| s.id == key.stream_id)
.unwrap_or_else(|| {
panic!("dangling store key for stream_id={:?}", key.stream_id);
})
}
}
impl ops::IndexMut<Key> for Store {
fn index_mut(&mut self, key: Key) -> &mut Self::Output {
let slot = self.slab.index_mut(key.index);
assert_eq!(slot.0, key.store_id);
&mut slot.1
self.slab
.get_mut(key.index.0)
.filter(|s| s.id == key.stream_id)
.unwrap_or_else(|| {
panic!("dangling store key for stream_id={:?}", key.stream_id);
})
}
}
@@ -329,10 +335,12 @@ impl<'a> Ptr<'a> {
/// Remove the stream from the store
pub fn remove(self) -> StreamId {
// The stream must have been unlinked before this point
debug_assert!(!self.store.ids.contains_key(&self.id));
debug_assert!(!self.store.ids.contains_key(&self.key.stream_id));
// Remove the stream state
self.store.slab.remove(self.key.index).1.id
let stream = self.store.slab.remove(self.key.index.0);
assert_eq!(stream.id, self.key.stream_id);
stream.id
}
/// Remove the StreamId -> stream state association.
@@ -340,7 +348,7 @@ impl<'a> Ptr<'a> {
/// This will effectively remove the stream as far as the H2 protocol is
/// concerned.
pub fn unlink(&mut self) {
let id = self.id;
let id = self.key.stream_id;
self.store.ids.remove(&id);
}
}
@@ -358,13 +366,13 @@ impl<'a> ops::Deref for Ptr<'a> {
type Target = Stream;
fn deref(&self) -> &Stream {
&self.store.slab[self.key.index].1
&self.store[self.key]
}
}
impl<'a> ops::DerefMut for Ptr<'a> {
fn deref_mut(&mut self) -> &mut Stream {
&mut self.store.slab[self.key.index].1
&mut self.store[self.key]
}
}
@@ -378,10 +386,11 @@ impl<'a> fmt::Debug for Ptr<'a> {
impl<'a> OccupiedEntry<'a> {
pub fn key(&self) -> Key {
let tup = self.ids.get();
let stream_id = *self.ids.key();
let index = *self.ids.get();
Key {
index: tup.0,
store_id: tup.1,
index,
stream_id,
}
}
}
@@ -391,16 +400,15 @@ impl<'a> OccupiedEntry<'a> {
impl<'a> VacantEntry<'a> {
pub fn insert(self, value: Stream) -> Key {
// Insert the value in the slab
let store_id = *self.counter;
*self.counter = store_id.wrapping_add(1);
let index = self.slab.insert((store_id, value));
let stream_id = value.id;
let index = SlabIndex(self.slab.insert(value));
// Insert the handle in the ID map
self.ids.insert((index, store_id));
self.ids.insert(index);
Key {
index,
store_id,
stream_id,
}
}
}