The Connection type is a `Future` that drives all of the IO of the client connection. The Client type is separate, and is used to send requests into the connection.
		
			
				
	
	
		
			424 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			424 lines
		
	
	
		
			9.2 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use super::*;
 | |
| 
 | |
| use slab;
 | |
| 
 | |
| use ordermap::{self, OrderMap};
 | |
| 
 | |
| use std::marker::PhantomData;
 | |
| use std::ops;
 | |
| 
 | |
| /// Storage for streams
 | |
| #[derive(Debug)]
 | |
| pub(super) struct Store<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     slab: slab::Slab<(StoreId, Stream<B, P>)>,
 | |
|     ids: OrderMap<StreamId, (usize, StoreId)>,
 | |
|     counter: StoreId,
 | |
| }
 | |
| 
 | |
| /// "Pointer" to an entry in the store
 | |
| pub(super) struct Ptr<'a, B: 'a, P>
 | |
| where
 | |
|     P: Peer + 'a,
 | |
| {
 | |
|     key: Key,
 | |
|     store: &'a mut Store<B, P>,
 | |
| }
 | |
| 
 | |
| /// References an entry in the store.
 | |
| #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 | |
| pub(crate) struct Key {
 | |
|     index: usize,
 | |
|     store_id: StoreId,
 | |
| }
 | |
| 
 | |
| type StoreId = usize;
 | |
| 
 | |
| #[derive(Debug)]
 | |
| pub(super) struct Queue<B, N, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     indices: Option<store::Indices>,
 | |
|     _p: PhantomData<(B, N, P)>,
 | |
| }
 | |
| 
 | |
| pub(super) trait Next {
 | |
|     fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<Key>;
 | |
| 
 | |
|     fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<Key>);
 | |
| 
 | |
|     fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<Key>;
 | |
| 
 | |
|     fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool;
 | |
| 
 | |
|     fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool);
 | |
| }
 | |
| 
 | |
| /// A linked list
 | |
| #[derive(Debug, Clone, Copy)]
 | |
| struct Indices {
 | |
|     pub head: Key,
 | |
|     pub tail: Key,
 | |
| }
 | |
| 
 | |
| pub(super) enum Entry<'a, B: 'a, P: Peer + 'a> {
 | |
|     Occupied(OccupiedEntry<'a>),
 | |
|     Vacant(VacantEntry<'a, B, P>),
 | |
| }
 | |
| 
 | |
| pub(super) struct OccupiedEntry<'a> {
 | |
|     ids: ordermap::OccupiedEntry<'a, StreamId, (usize, StoreId)>,
 | |
| }
 | |
| 
 | |
| pub(super) struct VacantEntry<'a, B: 'a, P>
 | |
| where
 | |
|     P: Peer + 'a,
 | |
| {
 | |
|     ids: ordermap::VacantEntry<'a, StreamId, (usize, StoreId)>,
 | |
|     slab: &'a mut slab::Slab<(StoreId, Stream<B, P>)>,
 | |
|     counter: &'a mut usize,
 | |
| }
 | |
| 
 | |
| pub(super) trait Resolve<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     fn resolve(&mut self, key: Key) -> Ptr<B, P>;
 | |
| }
 | |
| 
 | |
| // ===== impl Store =====
 | |
| 
 | |
| impl<B, P> Store<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     pub fn new() -> Self {
 | |
|         Store {
 | |
|             slab: slab::Slab::new(),
 | |
|             ids: OrderMap::new(),
 | |
|             counter: 0,
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     pub fn contains_id(&self, id: &StreamId) -> bool {
 | |
|         self.ids.contains_key(id)
 | |
|     }
 | |
| 
 | |
|     pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<B, P>> {
 | |
|         let key = match self.ids.get(id) {
 | |
|             Some(key) => *key,
 | |
|             None => return None,
 | |
|         };
 | |
| 
 | |
|         Some(Ptr {
 | |
|             key: Key {
 | |
|                 index: key.0,
 | |
|                 store_id: key.1,
 | |
|             },
 | |
|             store: self,
 | |
|         })
 | |
|     }
 | |
| 
 | |
|     pub fn insert(&mut self, id: StreamId, val: Stream<B, P>) -> Ptr<B, P> {
 | |
|         let store_id = self.counter;
 | |
|         self.counter = self.counter.wrapping_add(1);
 | |
|         let key = self.slab.insert((store_id, val));
 | |
|         assert!(self.ids.insert(id, (key, store_id)).is_none());
 | |
| 
 | |
|         Ptr {
 | |
|             key: Key {
 | |
|                 index: key,
 | |
|                 store_id,
 | |
|             },
 | |
|             store: self,
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     pub fn find_entry(&mut self, id: StreamId) -> Entry<B, P> {
 | |
|         use self::ordermap::Entry::*;
 | |
| 
 | |
|         match self.ids.entry(id) {
 | |
|             Occupied(e) => Entry::Occupied(OccupiedEntry {
 | |
|                 ids: e,
 | |
|             }),
 | |
|             Vacant(e) => Entry::Vacant(VacantEntry {
 | |
|                 ids: e,
 | |
|                 slab: &mut self.slab,
 | |
|                 counter: &mut self.counter,
 | |
|             }),
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     pub fn for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
 | |
|     where
 | |
|         F: FnMut(Ptr<B, P>) -> Result<(), E>,
 | |
|     {
 | |
|         let mut len = self.ids.len();
 | |
|         let mut i = 0;
 | |
| 
 | |
|         while i < len {
 | |
|             // Get the key by index, this makes the borrow checker happy
 | |
|             let key = *self.ids.get_index(i).unwrap().1;
 | |
| 
 | |
|             f(Ptr {
 | |
|                 key: Key {
 | |
|                     index: key.0,
 | |
|                     store_id: key.1,
 | |
|                 },
 | |
|                 store: self,
 | |
|             })?;
 | |
| 
 | |
|             // TODO: This logic probably could be better...
 | |
|             let new_len = self.ids.len();
 | |
| 
 | |
|             if new_len < len {
 | |
|                 debug_assert!(new_len == len - 1);
 | |
|                 len -= 1;
 | |
|             } else {
 | |
|                 i += 1;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         Ok(())
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<B, P> Resolve<B, P> for Store<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     fn resolve(&mut self, key: Key) -> Ptr<B, P> {
 | |
|         Ptr {
 | |
|             key: key,
 | |
|             store: self,
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<B, P> ops::Index<Key> for Store<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     type Output = Stream<B, P>;
 | |
| 
 | |
|     fn index(&self, key: Key) -> &Self::Output {
 | |
|         let slot = self.slab.index(key.index);
 | |
|         assert_eq!(slot.0, key.store_id);
 | |
|         &slot.1
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<B, P> ops::IndexMut<Key> for Store<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     fn index_mut(&mut self, key: Key) -> &mut Self::Output {
 | |
|         let slot = self.slab.index_mut(key.index);
 | |
|         assert_eq!(slot.0, key.store_id);
 | |
|         &mut slot.1
 | |
|     }
 | |
| }
 | |
| 
 | |
| #[cfg(feature = "unstable")]
 | |
| impl<B, P> Store<B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     pub fn num_active_streams(&self) -> usize {
 | |
|         self.ids.len()
 | |
|     }
 | |
| 
 | |
|     pub fn num_wired_streams(&self) -> usize {
 | |
|         self.slab.len()
 | |
|     }
 | |
| }
 | |
| 
 | |
| // ===== impl Queue =====
 | |
| 
 | |
| impl<B, N, P> Queue<B, N, P>
 | |
| where
 | |
|     N: Next,
 | |
|     P: Peer,
 | |
| {
 | |
|     pub fn new() -> Self {
 | |
|         Queue {
 | |
|             indices: None,
 | |
|             _p: PhantomData,
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     pub fn is_empty(&self) -> bool {
 | |
|         self.indices.is_none()
 | |
|     }
 | |
| 
 | |
|     pub fn take(&mut self) -> Self {
 | |
|         Queue {
 | |
|             indices: self.indices.take(),
 | |
|             _p: PhantomData,
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /// Queue the stream.
 | |
|     ///
 | |
|     /// If the stream is already contained by the list, return `false`.
 | |
|     pub fn push(&mut self, stream: &mut store::Ptr<B, P>) -> bool {
 | |
|         trace!("Queue::push");
 | |
| 
 | |
|         if N::is_queued(stream) {
 | |
|             trace!(" -> already queued");
 | |
|             return false;
 | |
|         }
 | |
| 
 | |
|         N::set_queued(stream, true);
 | |
| 
 | |
|         // The next pointer shouldn't be set
 | |
|         debug_assert!(N::next(stream).is_none());
 | |
| 
 | |
|         // Queue the stream
 | |
|         match self.indices {
 | |
|             Some(ref mut idxs) => {
 | |
|                 trace!(" -> existing entries");
 | |
| 
 | |
|                 // Update the current tail node to point to `stream`
 | |
|                 let key = stream.key();
 | |
|                 N::set_next(&mut stream.resolve(idxs.tail), Some(key));
 | |
| 
 | |
|                 // Update the tail pointer
 | |
|                 idxs.tail = stream.key();
 | |
|             },
 | |
|             None => {
 | |
|                 trace!(" -> first entry");
 | |
|                 self.indices = Some(store::Indices {
 | |
|                     head: stream.key(),
 | |
|                     tail: stream.key(),
 | |
|                 });
 | |
|             },
 | |
|         }
 | |
| 
 | |
|         true
 | |
|     }
 | |
| 
 | |
|     pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a, B, P>>
 | |
|     where
 | |
|         R: Resolve<B, P>,
 | |
|     {
 | |
|         if let Some(mut idxs) = self.indices {
 | |
|             let mut stream = store.resolve(idxs.head);
 | |
| 
 | |
|             if idxs.head == idxs.tail {
 | |
|                 assert!(N::next(&*stream).is_none());
 | |
|                 self.indices = None;
 | |
|             } else {
 | |
|                 idxs.head = N::take_next(&mut *stream).unwrap();
 | |
|                 self.indices = Some(idxs);
 | |
|             }
 | |
| 
 | |
|             debug_assert!(N::is_queued(&*stream));
 | |
|             N::set_queued(&mut *stream, false);
 | |
| 
 | |
|             return Some(stream);
 | |
|         }
 | |
| 
 | |
|         None
 | |
|     }
 | |
| }
 | |
| 
 | |
| // ===== impl Ptr =====
 | |
| 
 | |
| impl<'a, B: 'a, P> Ptr<'a, B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     /// Returns the Key associated with the stream
 | |
|     pub fn key(&self) -> Key {
 | |
|         self.key
 | |
|     }
 | |
| 
 | |
|     /// Remove the stream from the store
 | |
|     pub fn remove(self) -> StreamId {
 | |
|         // The stream must have been unlinked before this point
 | |
|         debug_assert!(!self.store.ids.contains_key(&self.id));
 | |
| 
 | |
|         // Remove the stream state
 | |
|         self.store.slab.remove(self.key.index).1.id
 | |
|     }
 | |
| 
 | |
|     /// Remove the StreamId -> stream state association.
 | |
|     ///
 | |
|     /// This will effectively remove the stream as far as the H2 protocol is
 | |
|     /// concerned.
 | |
|     pub fn unlink(&mut self) {
 | |
|         let id = self.id;
 | |
|         self.store.ids.remove(&id);
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<'a, B: 'a, P> Resolve<B, P> for Ptr<'a, B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     fn resolve(&mut self, key: Key) -> Ptr<B, P> {
 | |
|         Ptr {
 | |
|             key: key,
 | |
|             store: &mut *self.store,
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     type Target = Stream<B, P>;
 | |
| 
 | |
|     fn deref(&self) -> &Stream<B, P> {
 | |
|         &self.store.slab[self.key.index].1
 | |
|     }
 | |
| }
 | |
| 
 | |
| impl<'a, B: 'a, P> ops::DerefMut for Ptr<'a, B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     fn deref_mut(&mut self) -> &mut Stream<B, P> {
 | |
|         &mut self.store.slab[self.key.index].1
 | |
|     }
 | |
| }
 | |
| 
 | |
| // ===== impl OccupiedEntry =====
 | |
| 
 | |
| impl<'a> OccupiedEntry<'a> {
 | |
|     pub fn key(&self) -> Key {
 | |
|         let tup = self.ids.get();
 | |
|         Key {
 | |
|             index: tup.0,
 | |
|             store_id: tup.1,
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| // ===== impl VacantEntry =====
 | |
| 
 | |
| impl<'a, B, P> VacantEntry<'a, B, P>
 | |
| where
 | |
|     P: Peer,
 | |
| {
 | |
|     pub fn insert(self, value: Stream<B, P>) -> Key {
 | |
|         // Insert the value in the slab
 | |
|         let store_id = *self.counter;
 | |
|         *self.counter = store_id.wrapping_add(1);
 | |
|         let index = self.slab.insert((store_id, value));
 | |
| 
 | |
|         // Insert the handle in the ID map
 | |
|         self.ids.insert((index, store_id));
 | |
| 
 | |
|         Key {
 | |
|             index,
 | |
|             store_id,
 | |
|         }
 | |
|     }
 | |
| }
 |