From dd8412d66096ac78b704e594752e546ef733d74b Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 3 Aug 2017 15:50:13 -0700 Subject: [PATCH] Much work --- src/client.rs | 4 +- src/lib.rs | 2 + src/proto/connection.rs | 4 +- src/proto/mod.rs | 2 +- src/proto/streams/buffer.rs | 60 ++++++++++++++++++++++++++ src/proto/streams/mod.rs | 5 ++- src/proto/streams/recv.rs | 46 ++++++++++---------- src/proto/streams/send.rs | 46 ++++++++++++-------- src/proto/streams/store.rs | 38 ++++++++--------- src/proto/streams/stream.rs | 27 ++++++++++++ src/proto/streams/streams.rs | 81 ++++++++++++++++++++---------------- 11 files changed, 211 insertions(+), 104 deletions(-) create mode 100644 src/proto/streams/buffer.rs diff --git a/src/client.rs b/src/client.rs index b200034..94b043a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -25,8 +25,7 @@ pub struct Client { /// Client half of an active HTTP/2.0 stream. pub struct Stream { - inner: proto::Stream, - _p: ::std::marker::PhantomData, + inner: proto::StreamRef, } impl Client @@ -86,7 +85,6 @@ impl Client self.connection.send_request(request, end_of_stream) .map(|stream| Stream { inner: stream, - _p: ::std::marker::PhantomData, }) } } diff --git a/src/lib.rs b/src/lib.rs index 693a6db..08ac626 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,8 @@ extern crate fnv; extern crate byteorder; +extern crate slab; + #[macro_use] extern crate log; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 24fe62f..3e3d724 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -19,7 +19,7 @@ pub struct Connection { // TODO: Remove ping_pong: PingPong, settings: Settings, - streams: Streams

, + streams: Streams, _phantom: PhantomData

, } @@ -255,7 +255,7 @@ impl Connection { /// Initialize a new HTTP/2.0 stream and send the message. pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ConnectionError> { self.streams.send_request(request, end_of_stream) } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 7129402..aaca6db 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,7 +6,7 @@ mod settings; mod streams; pub use self::connection::Connection; -pub use self::streams::{Streams, Stream}; +pub use self::streams::{Streams, StreamRef}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs new file mode 100644 index 0000000..a8613c6 --- /dev/null +++ b/src/proto/streams/buffer.rs @@ -0,0 +1,60 @@ +use frame::{self, Frame}; + +use slab::Slab; + +use std::marker::PhantomData; + +/// Buffers frames for multiple streams. +#[derive(Debug)] +pub struct Buffer { + slab: Slab>, +} + +/// A sequence of frames in a `Buffer` +#[derive(Debug)] +pub struct Deque { + indices: Option, + _p: PhantomData, +} + +/// Tracks the head & tail for a sequence of frames in a `Buffer`. +#[derive(Debug, Default)] +struct Indices { + head: usize, + tail: usize, +} + +#[derive(Debug)] +struct Slot { + frame: Frame, + next: usize, +} + +impl Buffer { + pub fn new() -> Self { + Buffer { + slab: Slab::new(), + } + } +} + +impl Deque { + pub fn new() -> Self { + Deque { + indices: None, + _p: PhantomData, + } + } + + pub fn is_empty(&self) -> bool { + self.indices.is_none() + } + + pub fn push_back(&mut self, buf: &mut Buffer, val: Frame) { + unimplemented!(); + } + + pub fn pop_front(&mut self, buf: &mut Buffer) -> Option> { + unimplemented!(); + } +} diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 7165bd7..a36602f 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -1,3 +1,4 @@ +mod buffer; mod flow_control; mod recv; mod send; @@ -6,13 +7,15 @@ mod store; mod stream; mod streams; -pub use self::streams::{Streams, Stream}; +pub use self::streams::{Streams, StreamRef}; +use self::buffer::Buffer; use self::flow_control::FlowControl; use self::recv::Recv; use self::send::Send; use self::state::State; use self::store::{Store, Entry}; +use self::stream::Stream; use {frame, StreamId, ConnectionError}; use proto::*; diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index ce12f02..c222afe 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -8,7 +8,7 @@ use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] -pub struct Recv

{ +pub(super) struct Recv { /// Maximum number of remote initiated streams max_streams: Option, @@ -26,10 +26,13 @@ pub struct Recv

{ /// Refused StreamId, this represents a frame that must be sent out. refused: Option, - _p: PhantomData

, + _p: PhantomData<(P, B)>, } -impl Recv

{ +impl Recv + where P: Peer, + B: Buf, +{ pub fn new(config: &Config) -> Self { Recv { max_streams: config.max_remote_initiated, @@ -45,7 +48,7 @@ impl Recv

{ /// Update state reflecting a new, remotely opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result, ConnectionError> { + pub fn open(&mut self, id: StreamId) -> Result>, ConnectionError> { assert!(self.refused.is_none()); try!(self.ensure_can_open(id)); @@ -60,25 +63,25 @@ impl Recv

{ // Increment the number of remote initiated streams self.num_streams += 1; - Ok(Some(State::default())) + Ok(Some(Stream::new())) } /// Transition the stream state based on receiving headers - pub fn recv_headers(&mut self, state: &mut State, eos: bool) + pub fn recv_headers(&mut self, stream: &mut Stream, eos: bool) -> Result<(), ConnectionError> { - state.recv_open(self.init_window_sz, eos) + stream.state.recv_open(self.init_window_sz, eos) } - pub fn recv_eos(&mut self, state: &mut State) + pub fn recv_eos(&mut self, stream: &mut Stream) -> Result<(), ConnectionError> { - state.recv_close() + stream.state.recv_close() } pub fn recv_data(&mut self, frame: &frame::Data, - state: &mut State) + stream: &mut Stream) -> Result<(), ConnectionError> { let sz = frame.payload().len(); @@ -89,7 +92,7 @@ impl Recv

{ let sz = sz as WindowSize; - match state.recv_flow_control() { + match stream.recv_flow_control() { Some(flow) => { // Ensure there's enough capacity on the connection before // acting on the stream. @@ -106,7 +109,7 @@ impl Recv

{ } if frame.is_end_stream() { - try!(state.recv_close()); + try!(stream.state.recv_close()); } Ok(()) @@ -133,10 +136,9 @@ impl Recv

{ } /// Send any pending refusals. - pub fn send_pending_refusal(&mut self, dst: &mut Codec) + pub fn send_pending_refusal(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { if let Some(stream_id) = self.refused.take() { let frame = frame::Reset::new(stream_id, RefusedStream); @@ -168,11 +170,11 @@ impl Recv

{ pub fn expand_stream_window(&mut self, id: StreamId, sz: WindowSize, - state: &mut State) + stream: &mut Stream) -> Result<(), ConnectionError> { // TODO: handle overflow - if let Some(flow) = state.recv_flow_control() { + if let Some(flow) = stream.recv_flow_control() { flow.expand_window(sz); self.pending_window_updates.push_back(id); } @@ -181,10 +183,9 @@ impl Recv

{ } /// Send connection level window update - pub fn send_connection_window_update(&mut self, dst: &mut Codec) + pub fn send_connection_window_update(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { if let Some(incr) = self.flow_control.peek_window_update() { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); @@ -200,16 +201,15 @@ impl Recv

{ } /// Send stream level window update - pub fn send_stream_window_update(&mut self, - streams: &mut Store, - dst: &mut Codec) + pub fn send_stream_window_update(&mut self, + streams: &mut Store, + dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { while let Some(id) = self.pending_window_updates.pop_front() { let flow = streams.get_mut(&id) - .and_then(|state| state.recv_flow_control()); + .and_then(|stream| stream.recv_flow_control()); if let Some(flow) = flow { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index a233b83..bbc363c 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -10,7 +10,7 @@ use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] -pub struct Send

{ +pub(super) struct Send { /// Maximum number of locally initiated streams max_streams: Option, @@ -30,6 +30,9 @@ pub struct Send

{ // XXX It would be cool if this didn't exist. pending_window_updates: VecDeque, + /// Holds frames that are waiting to be written to the socket + buffer: Buffer, + /// When `poll_window_update` is not ready, then the calling task is saved to /// be notified later. Access to poll_window_update must not be shared across tasks, /// as we only track a single task (and *not* i.e. a task per stream id). @@ -38,7 +41,10 @@ pub struct Send

{ _p: PhantomData

, } -impl Send

{ +impl Send + where P: Peer, + B: Buf, +{ pub fn new(config: &Config) -> Self { let next_stream_id = if P::is_server() { 2 @@ -53,6 +59,7 @@ impl Send

{ init_window_sz: config.init_local_window_sz, flow_control: FlowControl::new(config.init_local_window_sz), pending_window_updates: VecDeque::new(), + buffer: Buffer::new(), blocked: None, _p: PhantomData, } @@ -61,7 +68,7 @@ impl Send

{ /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self) -> Result<(StreamId, State), ConnectionError> { + pub fn open(&mut self) -> Result<(StreamId, Stream), ConnectionError> { try!(self.ensure_can_open()); if let Some(max) = self.max_streams { @@ -70,7 +77,7 @@ impl Send

{ } } - let ret = (self.next_stream_id, State::default()); + let ret = (self.next_stream_id, Stream::new()); // Increment the number of locally initiated streams self.num_streams += 1; @@ -79,21 +86,24 @@ impl Send

{ Ok(ret) } - pub fn send_headers(&mut self, state: &mut State, eos: bool) + pub fn send_headers(&mut self, stream: &mut Stream, frame: frame::Headers) -> Result<(), ConnectionError> { - state.send_open(self.init_window_sz, eos) + // Update the state + stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; + // stream.send_buf.headers = Some(frame); + Ok(()) } - pub fn send_eos(&mut self, state: &mut State) + pub fn send_eos(&mut self, stream: &mut Stream) -> Result<(), ConnectionError> { - state.send_close() + stream.state.send_close() } - pub fn send_data(&mut self, - frame: &frame::Data, - state: &mut State) + pub fn send_data(&mut self, + frame: &frame::Data, + stream: &mut Stream) -> Result<(), ConnectionError> { let sz = frame.payload().remaining(); @@ -107,7 +117,7 @@ impl Send

{ // Make borrow checker happy loop { - match state.send_flow_control() { + match stream.send_flow_control() { Some(flow) => { try!(self.flow_control.ensure_window(sz, FlowControlViolation)); @@ -123,7 +133,7 @@ impl Send

{ None => {} } - if state.is_closed() { + if stream.state.is_closed() { return Err(InactiveStreamId.into()) } else { return Err(UnexpectedFrameType.into()) @@ -131,14 +141,14 @@ impl Send

{ } if frame.is_end_stream() { - try!(state.send_close()); + try!(stream.state.send_close()); } Ok(()) } /// Get pending window updates - pub fn poll_window_update(&mut self, streams: &mut Store) + pub fn poll_window_update(&mut self, streams: &mut Store) -> Poll { // This biases connection window updates, which probably makes sense. @@ -152,7 +162,7 @@ impl Send

{ let update = self.pending_window_updates.pop_front() .and_then(|id| { streams.get_mut(&id) - .and_then(|state| state.send_flow_control()) + .and_then(|stream| stream.send_flow_control()) .and_then(|flow| flow.apply_window_update()) .map(|incr| WindowUpdate::new(id, incr)) }); @@ -184,10 +194,10 @@ impl Send

{ pub fn recv_stream_window_update(&mut self, frame: frame::WindowUpdate, - state: &mut State) + stream: &mut Stream) -> Result<(), ConnectionError> { - if let Some(flow) = state.send_flow_control() { + if let Some(flow) = stream.send_flow_control() { // TODO: Handle invalid increment flow.expand_window(frame.size_increment()); } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 3e54530..4c2dbf7 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -1,32 +1,32 @@ -extern crate slab; - use super::*; +use slab; + use std::collections::{HashMap, hash_map}; /// Storage for streams #[derive(Debug)] -pub struct Store { - slab: slab::Slab, +pub(super) struct Store { + slab: slab::Slab>, ids: HashMap, } -pub enum Entry<'a> { - Occupied(OccupiedEntry<'a>), - Vacant(VacantEntry<'a>), +pub(super) enum Entry<'a, B: 'a> { + Occupied(OccupiedEntry<'a, B>), + Vacant(VacantEntry<'a, B>), } -pub struct OccupiedEntry<'a> { +pub(super) struct OccupiedEntry<'a, B: 'a> { ids: hash_map::OccupiedEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab, + slab: &'a mut slab::Slab>, } -pub struct VacantEntry<'a> { +pub(super) struct VacantEntry<'a, B: 'a> { ids: hash_map::VacantEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab, + slab: &'a mut slab::Slab>, } -impl Store { +impl Store { pub fn new() -> Self { Store { slab: slab::Slab::new(), @@ -34,7 +34,7 @@ impl Store { } } - pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut State> { + pub fn get_mut(&mut self, id: &StreamId) -> Option<&mut Stream> { if let Some(handle) = self.ids.get(id) { Some(&mut self.slab[*handle]) } else { @@ -42,12 +42,12 @@ impl Store { } } - pub fn insert(&mut self, id: StreamId, val: State) { + pub fn insert(&mut self, id: StreamId, val: Stream) { let handle = self.slab.insert(val); assert!(self.ids.insert(id, handle).is_none()); } - pub fn entry(&mut self, id: StreamId) -> Entry { + pub fn entry(&mut self, id: StreamId) -> Entry { use self::hash_map::Entry::*; match self.ids.entry(id) { @@ -67,14 +67,14 @@ impl Store { } } -impl<'a> OccupiedEntry<'a> { - pub fn into_mut(self) -> &'a mut State { +impl<'a, B> OccupiedEntry<'a, B> { + pub fn into_mut(self) -> &'a mut Stream { &mut self.slab[*self.ids.get()] } } -impl<'a> VacantEntry<'a> { - pub fn insert(self, value: State) -> &'a mut State { +impl<'a, B> VacantEntry<'a, B> { + pub fn insert(self, value: Stream) -> &'a mut Stream { // Insert the value in the slab let handle = self.slab.insert(value); diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index e69de29..2b3a67b 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -0,0 +1,27 @@ +use super::*; + +#[derive(Debug)] +pub(super) struct Stream { + /// Current state of the stream + pub state: State, + + /// Frames pending for this stream being sent to the socket + pub pending_send: buffer::Deque, +} + +impl Stream { + pub fn new() -> Stream { + Stream { + state: State::default(), + pending_send: buffer::Deque::new(), + } + } + + pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { + self.state.send_flow_control() + } + + pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { + self.state.recv_flow_control() + } +} diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 2022a3c..d385956 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -7,13 +7,14 @@ use std::sync::{Arc, Mutex}; // TODO: All the VecDeques should become linked lists using the State // values. #[derive(Debug)] -pub struct Streams

{ - inner: Arc>>, +pub struct Streams { + inner: Arc>>, } +/// Reference to the stream state #[derive(Debug)] -pub struct Stream

{ - inner: Arc>>, +pub struct StreamRef { + inner: Arc>>, id: StreamId, } @@ -22,21 +23,24 @@ pub struct Stream

{ /// /// TODO: better name #[derive(Debug)] -struct Inner

{ - actions: Actions

, - store: Store, +struct Inner { + actions: Actions, + store: Store, } #[derive(Debug)] -struct Actions

{ +struct Actions { /// Manages state transitions initiated by receiving frames - recv: Recv

, + recv: Recv, /// Manages state transitions initiated by sending frames - send: Send

, + send: Send, } -impl Streams

{ +impl Streams + where P: Peer, + B: Buf, +{ pub fn new(config: Config) -> Self { Streams { inner: Arc::new(Mutex::new(Inner { @@ -56,7 +60,7 @@ impl Streams

{ let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let state = match me.store.entry(id) { + let stream = match me.store.entry(id) { Entry::Occupied(e) => e.into_mut(), Entry::Vacant(e) => { // Trailers cannot open a stream. Trailers are header frames @@ -68,7 +72,7 @@ impl Streams

{ } match try!(me.actions.recv.open(id)) { - Some(state) => e.insert(state), + Some(stream) => e.insert(stream), None => return Ok(None), } } @@ -80,12 +84,12 @@ impl Streams

{ unimplemented!(); } - try!(me.actions.recv.recv_eos(state)); + try!(me.actions.recv.recv_eos(stream)); } else { - try!(me.actions.recv.recv_headers(state, frame.is_end_stream())); + try!(me.actions.recv.recv_headers(stream, frame.is_end_stream())); } - if state.is_closed() { + if stream.state.is_closed() { me.actions.dec_num_streams(id); } @@ -99,16 +103,16 @@ impl Streams

{ let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let state = match me.store.get_mut(&id) { - Some(state) => state, + let stream = match me.store.get_mut(&id) { + Some(stream) => stream, None => return Err(ProtocolError.into()), }; // Ensure there's enough capacity on the connection before acting on the // stream. - try!(me.actions.recv.recv_data(frame, state)); + try!(me.actions.recv.recv_data(frame, stream)); - if state.is_closed() { + if stream.state.is_closed() { me.actions.dec_num_streams(id); } @@ -180,23 +184,23 @@ impl Streams

{ */ } - pub fn send_data(&mut self, frame: &frame::Data) + pub fn send_data(&mut self, frame: &frame::Data) -> Result<(), ConnectionError> { let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); let me = &mut *me; - let state = match me.store.get_mut(&id) { - Some(state) => state, + let stream = match me.store.get_mut(&id) { + Some(stream) => stream, None => return Err(UnexpectedFrameType.into()), }; // Ensure there's enough capacity on the connection before acting on the // stream. - try!(me.actions.send.send_data(frame, state)); + try!(me.actions.send.send_data(frame, stream)); - if state.is_closed() { + if stream.state.is_closed() { me.actions.dec_num_streams(id); } @@ -228,20 +232,18 @@ impl Streams

{ Ok(()) } - pub fn send_pending_refusal(&mut self, dst: &mut Codec) + pub fn send_pending_refusal(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { let mut me = self.inner.lock().unwrap(); let me = &mut *me; me.actions.recv.send_pending_refusal(dst) } - pub fn send_pending_window_updates(&mut self, dst: &mut Codec) + pub fn send_pending_window_updates(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> where T: AsyncWrite, - B: Buf, { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -252,41 +254,46 @@ impl Streams

{ } } -impl Streams { +impl Streams + where B: Buf, +{ pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ConnectionError> { let id = { let mut me = self.inner.lock().unwrap(); let me = &mut *me; // Initialize a new stream. This fails if the connection is at capacity. - let (id, mut state) = me.actions.send.open()?; + let (id, mut stream) = me.actions.send.open()?; // Convert the message let headers = client::Peer::convert_send_message( id, request, end_of_stream); - me.actions.send.send_headers(&mut state, end_of_stream)?; + me.actions.send.send_headers(&mut stream, headers)?; // Given that the stream has been initialized, it should not be in the // closed state. - debug_assert!(!state.is_closed()); + debug_assert!(!stream.state.is_closed()); // Store the state - me.store.insert(id, state); + me.store.insert(id, stream); id }; - Ok(Stream { + Ok(StreamRef { inner: self.inner.clone(), id: id, }) } } -impl Actions

{ +impl Actions + where P: Peer, + B: Buf, +{ fn dec_num_streams(&mut self, id: StreamId) { if self.is_local_init(id) { self.send.dec_num_streams();