From 341e15769ec8b10f481a3b3894ea78e92f4f3cd4 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 2 Aug 2017 12:55:41 -0700 Subject: [PATCH] Move to custom stream storage --- Cargo.toml | 2 +- src/lib.rs | 2 - src/proto/streams/mod.rs | 10 ++--- src/proto/streams/recv.rs | 4 +- src/proto/streams/send.rs | 4 +- src/proto/streams/store.rs | 81 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 90 insertions(+), 13 deletions(-) create mode 100644 src/proto/streams/store.rs diff --git a/Cargo.toml b/Cargo.toml index 7fc0c99..2d1199c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ http = { git = "https://github.com/carllerche/http" } byteorder = "1.0" log = "0.3.8" fnv = "1.0.5" -ordermap = "0.2.10" +slab = "0.4.0" string = { git = "https://github.com/carllerche/string" } [dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index 3badfb7..b4fdf4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,8 +18,6 @@ extern crate bytes; // Hash function used for HPACK encoding and tracking stream states. extern crate fnv; -extern crate ordermap; - extern crate byteorder; #[macro_use] diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index da53607..c6ddbfa 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -1,16 +1,16 @@ mod recv; mod send; +mod store; use self::recv::Recv; use self::send::Send; +use self::store::{Store, Entry}; use {frame, Peer, StreamId, ConnectionError}; use proto::*; use error::Reason::*; use error::User::*; -use ordermap::{OrderMap, Entry}; - // TODO: All the VecDeques should become linked lists using the state::Stream // values. #[derive(Debug)] @@ -19,11 +19,9 @@ pub struct Streams

{ inner: Inner

, /// Streams - streams: StreamMap, + streams: Store, } -type StreamMap = OrderMap; - /// Fields needed to manage state related to managing the set of streams. This /// is mostly split out to make ownership happy. /// @@ -59,7 +57,7 @@ impl Streams

{ recv: Recv::new(&config), send: Send::new(&config), }, - streams: OrderMap::default(), + streams: Store::new(), } } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 83a9aff..f9f97a0 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,6 +1,6 @@ use {frame, Peer, ConnectionError}; use proto::*; -use super::{Config, StreamMap}; +use super::{Config, Store}; use error::Reason::*; @@ -201,7 +201,7 @@ impl Recv

{ /// Send stream level window update pub fn send_stream_window_update(&mut self, - streams: &mut StreamMap, + streams: &mut Store, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 1084dc8..f7a752f 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,6 +1,6 @@ use {frame, Peer, ConnectionError}; use proto::*; -use super::{Config, StreamMap}; +use super::{Config, Store}; use error::User::*; @@ -125,7 +125,7 @@ impl Send

{ } /// Get pending window updates - pub fn poll_window_update(&mut self, streams: &mut StreamMap) + pub fn poll_window_update(&mut self, streams: &mut Store) -> Poll { // This biases connection window updates, which probably makes sense. diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs new file mode 100644 index 0000000..494fd8f --- /dev/null +++ b/src/proto/streams/store.rs @@ -0,0 +1,81 @@ +extern crate slab; + +use proto::*; + +use std::collections::{HashMap, hash_map}; + +/// Storage for streams +#[derive(Debug)] +pub struct Store { + slab: slab::Slab, + ids: HashMap, +} + +pub enum Entry<'a> { + Occupied(OccupiedEntry<'a>), + Vacant(VacantEntry<'a>), +} + +pub struct OccupiedEntry<'a> { + ids: hash_map::OccupiedEntry<'a, StreamId, usize>, + slab: &'a mut slab::Slab, +} + +pub struct VacantEntry<'a> { + ids: hash_map::VacantEntry<'a, StreamId, usize>, + slab: &'a mut slab::Slab, +} + +impl Store { + pub fn new() -> Self { + Store { + slab: slab::Slab::new(), + ids: HashMap::new(), + } + } + + pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut state::Stream> { + if let Some(handle) = self.ids.get(id) { + Some(&mut self.slab[*handle]) + } else { + None + } + } + + pub fn entry(&mut self, id: StreamId) -> Entry { + use self::hash_map::Entry::*; + + match self.ids.entry(id) { + Occupied(e) => { + Entry::Occupied(OccupiedEntry { + ids: e, + slab: &mut self.slab, + }) + } + Vacant(e) => { + Entry::Vacant(VacantEntry { + ids: e, + slab: &mut self.slab, + }) + } + } + } +} + +impl<'a> OccupiedEntry<'a> { + pub fn into_mut(self) -> &'a mut state::Stream { + &mut self.slab[*self.ids.get()] + } +} + +impl<'a> VacantEntry<'a> { + pub fn insert(self, value: state::Stream) -> &'a mut state::Stream { + // Insert the value in the slab + let handle = self.slab.insert(value); + + // Insert the handle in the ID map + self.ids.insert(handle); + + &mut self.slab[handle] + } +}