Move to custom stream storage

This commit is contained in:
Carl Lerche
2017-08-02 12:55:41 -07:00
parent 19e562f9e0
commit 341e15769e
6 changed files with 90 additions and 13 deletions

View File

@@ -12,7 +12,7 @@ http = { git = "https://github.com/carllerche/http" }
byteorder = "1.0" byteorder = "1.0"
log = "0.3.8" log = "0.3.8"
fnv = "1.0.5" fnv = "1.0.5"
ordermap = "0.2.10" slab = "0.4.0"
string = { git = "https://github.com/carllerche/string" } string = { git = "https://github.com/carllerche/string" }
[dev-dependencies] [dev-dependencies]

View File

@@ -18,8 +18,6 @@ extern crate bytes;
// Hash function used for HPACK encoding and tracking stream states. // Hash function used for HPACK encoding and tracking stream states.
extern crate fnv; extern crate fnv;
extern crate ordermap;
extern crate byteorder; extern crate byteorder;
#[macro_use] #[macro_use]

View File

@@ -1,16 +1,16 @@
mod recv; mod recv;
mod send; mod send;
mod store;
use self::recv::Recv; use self::recv::Recv;
use self::send::Send; use self::send::Send;
use self::store::{Store, Entry};
use {frame, Peer, StreamId, ConnectionError}; use {frame, Peer, StreamId, ConnectionError};
use proto::*; use proto::*;
use error::Reason::*; use error::Reason::*;
use error::User::*; use error::User::*;
use ordermap::{OrderMap, Entry};
// TODO: All the VecDeques should become linked lists using the state::Stream // TODO: All the VecDeques should become linked lists using the state::Stream
// values. // values.
#[derive(Debug)] #[derive(Debug)]
@@ -19,11 +19,9 @@ pub struct Streams<P> {
inner: Inner<P>, inner: Inner<P>,
/// Streams /// Streams
streams: StreamMap, streams: Store,
} }
type StreamMap = OrderMap<StreamId, state::Stream>;
/// Fields needed to manage state related to managing the set of streams. This /// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy. /// is mostly split out to make ownership happy.
/// ///
@@ -59,7 +57,7 @@ impl<P: Peer> Streams<P> {
recv: Recv::new(&config), recv: Recv::new(&config),
send: Send::new(&config), send: Send::new(&config),
}, },
streams: OrderMap::default(), streams: Store::new(),
} }
} }

View File

@@ -1,6 +1,6 @@
use {frame, Peer, ConnectionError}; use {frame, Peer, ConnectionError};
use proto::*; use proto::*;
use super::{Config, StreamMap}; use super::{Config, Store};
use error::Reason::*; use error::Reason::*;
@@ -201,7 +201,7 @@ impl<P: Peer> Recv<P> {
/// Send stream level window update /// Send stream level window update
pub fn send_stream_window_update<T, B>(&mut self, pub fn send_stream_window_update<T, B>(&mut self,
streams: &mut StreamMap, streams: &mut Store,
dst: &mut Codec<T, B>) dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,

View File

@@ -1,6 +1,6 @@
use {frame, Peer, ConnectionError}; use {frame, Peer, ConnectionError};
use proto::*; use proto::*;
use super::{Config, StreamMap}; use super::{Config, Store};
use error::User::*; use error::User::*;
@@ -125,7 +125,7 @@ impl<P: Peer> Send<P> {
} }
/// Get pending window updates /// Get pending window updates
pub fn poll_window_update(&mut self, streams: &mut StreamMap) pub fn poll_window_update(&mut self, streams: &mut Store)
-> Poll<WindowUpdate, ConnectionError> -> Poll<WindowUpdate, ConnectionError>
{ {
// This biases connection window updates, which probably makes sense. // This biases connection window updates, which probably makes sense.

View File

@@ -0,0 +1,81 @@
extern crate slab;
use proto::*;
use std::collections::{HashMap, hash_map};
/// Storage for streams
#[derive(Debug)]
pub struct Store {
slab: slab::Slab<state::Stream>,
ids: HashMap<StreamId, usize>,
}
pub enum Entry<'a> {
Occupied(OccupiedEntry<'a>),
Vacant(VacantEntry<'a>),
}
pub struct OccupiedEntry<'a> {
ids: hash_map::OccupiedEntry<'a, StreamId, usize>,
slab: &'a mut slab::Slab<state::Stream>,
}
pub struct VacantEntry<'a> {
ids: hash_map::VacantEntry<'a, StreamId, usize>,
slab: &'a mut slab::Slab<state::Stream>,
}
impl Store {
pub fn new() -> Self {
Store {
slab: slab::Slab::new(),
ids: HashMap::new(),
}
}
pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut state::Stream> {
if let Some(handle) = self.ids.get(id) {
Some(&mut self.slab[*handle])
} else {
None
}
}
pub fn entry(&mut self, id: StreamId) -> Entry {
use self::hash_map::Entry::*;
match self.ids.entry(id) {
Occupied(e) => {
Entry::Occupied(OccupiedEntry {
ids: e,
slab: &mut self.slab,
})
}
Vacant(e) => {
Entry::Vacant(VacantEntry {
ids: e,
slab: &mut self.slab,
})
}
}
}
}
impl<'a> OccupiedEntry<'a> {
pub fn into_mut(self) -> &'a mut state::Stream {
&mut self.slab[*self.ids.get()]
}
}
impl<'a> VacantEntry<'a> {
pub fn insert(self, value: state::Stream) -> &'a mut state::Stream {
// Insert the value in the slab
let handle = self.slab.insert(value);
// Insert the handle in the ID map
self.ids.insert(handle);
&mut self.slab[handle]
}
}