diff --git a/src/frame/mod.rs b/src/frame/mod.rs index b7a8022..c0a5270 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,7 +1,7 @@ use hpack; use error::{ConnectionError, Reason}; -use bytes::Bytes; +use bytes::{Bytes, Buf}; use std::fmt; @@ -78,6 +78,18 @@ impl Frame { } } +impl Frame { + /// Returns the length of the frame as it applies to flow control. + pub fn flow_len(&self) -> usize { + use self::Frame::*; + + match *self { + Data(ref frame) => frame.payload().remaining(), + _ => 0, + } + } +} + impl fmt::Debug for Frame { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use self::Frame::*; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 78fbc2f..1fb11ac 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -47,23 +47,6 @@ impl Connection } } - /// Polls for the next update to a remote flow control window. - pub fn poll_window_update(&mut self) -> Poll { - self.streams.poll_window_update() - } - - /// Increases the capacity of a local flow control window. - /// - /// # Panics - /// - /// THis function panics if `incr` is not a valid window size. - pub fn expand_window(&mut self, id: StreamId, incr: usize) - -> Result<(), ConnectionError> - { - assert!(incr <= MAX_WINDOW_SIZE as usize); - self.streams.expand_window(id, incr as WindowSize) - } - pub fn update_local_settings(&mut self, _local: frame::SettingSet) -> Result<(), ConnectionError> { unimplemented!(); } @@ -149,8 +132,7 @@ impl Connection } Some(WindowUpdate(frame)) => { trace!("recv WINDOW_UPDATE; frame={:?}", frame); - // TODO: implement - // try!(self.streams.recv_window_update(frame)); + self.streams.recv_window_update(frame)?; } None => { // TODO: Is this correct? diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 077d10d..4d30918 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -70,6 +70,26 @@ impl Deque { } } + pub fn push_front(&mut self, buf: &mut Buffer, frame: Frame) { + let key = buf.slab.insert(Slot { + frame, + next: None, + }); + + match self.indices { + Some(ref mut idxs) => { + buf.slab[key].next = Some(idxs.head); + idxs.head = key; + } + None => { + self.indices = Some(Indices { + head: key, + tail: key, + }); + } + } + } + pub fn pop_front(&mut self, buf: &mut Buffer) -> Option> { match self.indices { Some(mut idxs) => { diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 81b57f5..1f43780 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -9,8 +9,8 @@ pub struct FlowControl { /// Amount to be removed by future increments. underflow: WindowSize, - /// The amount that has been incremented but not yet advertised (to the application or - /// the remote). + /// The amount that has been incremented but not yet advertised (to the + /// application or the remote). next_window_update: WindowSize, } @@ -23,6 +23,14 @@ impl FlowControl { } } + pub fn has_capacity(&self) -> bool { + self.window_size > 0 + } + + pub fn window_size(&self) -> WindowSize { + self.window_size + } + /// Returns true iff `claim_window(sz)` would return succeed. pub fn ensure_window(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError> where T: Into, @@ -49,7 +57,10 @@ impl FlowControl { } /// Increase the _unadvertised_ window capacity. - pub fn expand_window(&mut self, sz: WindowSize) { + pub fn expand_window(&mut self, sz: WindowSize) + -> Result<(), ConnectionError> + { + // TODO: Handle invalid increment if sz <= self.underflow { self.underflow -= sz; return; @@ -60,6 +71,7 @@ impl FlowControl { self.underflow = 0; } + /* /// Obtains the unadvertised window update. /// /// This does not apply the window update to `self`. @@ -70,6 +82,7 @@ impl FlowControl { Some(self.next_window_update) } } + */ /// Obtains and applies an unadvertised window update. pub fn apply_window_update(&mut self) -> Option { diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 667bbf3..2977023 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -2,8 +2,18 @@ use super::*; #[derive(Debug)] pub(super) struct Prioritize { + /// Streams that have pending frames pending_send: store::List, + /// Streams that are waiting for connection level flow control capacity + pending_capacity: store::List, + + /// Connection level flow control governing sent data + flow_control: FlowControl, + + /// Total amount of buffered data in data frames + buffered_data: usize, + /// Holds frames that are waiting to be written to the socket buffer: Buffer, } @@ -11,17 +21,44 @@ pub(super) struct Prioritize { impl Prioritize where B: Buf, { - pub fn new() -> Prioritize { + pub fn new(config: &Config) -> Prioritize { Prioritize { pending_send: store::List::new(), + pending_capacity: store::List::new(), + flow_control: FlowControl::new(config.init_local_window_sz), + buffered_data: 0, buffer: Buffer::new(), } } + pub fn available_window(&self) -> WindowSize { + let win = self.flow_control.window_size(); + + if self.buffered_data >= win as usize { + 0 + } else { + win - self.buffered_data as WindowSize + } + } + + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) + -> Result<(), ConnectionError> + { + // Expand the window + self.flow_control.expand_window(frame.size_increment())?; + + // Imediately apply the update + self.flow.apply_window_update(); + + Ok(()) + } + pub fn queue_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { + self.buffered_data += frame.flow_len(); + // queue the frame in the buffer stream.pending_send.push_back(&mut self.buffer, frame); @@ -33,7 +70,7 @@ impl Prioritize } // Queue the stream - self.push_sender(stream); + push_sender(&mut self.pending_send, stream); } pub fn poll_complete(&mut self, @@ -48,7 +85,9 @@ impl Prioritize match self.pop_frame(store) { Some(frame) => { - // TODO: data frames should be handled specially... + // Subtract the data size + self.buffered_data -= frame.flow_len(); + let res = dst.start_send(frame)?; // We already verified that `dst` is ready to accept the @@ -63,35 +102,63 @@ impl Prioritize } fn pop_frame(&mut self, store: &mut Store) -> Option> { - match self.pop_sender(store) { - Some(mut stream) => { - let frame = stream.pending_send.pop_front(&mut self.buffer).unwrap(); + loop { + match self.pop_sender(store) { + Some(mut stream) => { + let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { + Frame::Data(frame) => { + let len = frame.payload().remaining(); - if !stream.pending_send.is_empty() { - self.push_sender(&mut stream); + if len > self.flow_control.window_size() as usize { + // TODO: This could be smarter... + stream.pending_send.push_front(&mut self.buffer, frame.into()); + + // Push the stream onto the list of streams + // waiting for connection capacity + push_sender(&mut self.pending_capacity, &mut stream); + + // Try again w/ the next stream + continue; + } + + frame.into() + } + frame => frame, + }; + + if !stream.pending_send.is_empty() { + push_sender(&mut self.pending_send, &mut stream); + } + + return Some(frame); } - - Some(frame) + None => return None, } - None => None, } } - fn push_sender(&mut self, stream: &mut store::Ptr) { - debug_assert!(!stream.is_pending_send); - - self.pending_send.push(stream); - - stream.is_pending_send = true; - } - fn pop_sender<'a>(&mut self, store: &'a mut Store) -> Option> { - match self.pending_send.pop(store) { - Some(mut stream) => { - stream.is_pending_send = false; - Some(stream) + // If the connection level window has capacity, pop off of the pending + // capacity list first. + + if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() { + let mut stream = self.pending_capacity.pop(store).unwrap(); + stream.is_pending_send = false; + Some(stream) + } else { + match self.pending_send.pop(store) { + Some(mut stream) => { + stream.is_pending_send = false; + Some(stream) + } + None => None, } - None => None, } } } + +fn push_sender(list: &mut store::List, stream: &mut store::Ptr) { + debug_assert!(!stream.is_pending_send); + list.push(stream); + stream.is_pending_send = true; +} diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 81b1251..3e11a5a 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -360,6 +360,7 @@ impl Recv where B: Buf { Ok(()) } + /* /// Send connection level window update pub fn send_connection_window_update(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> @@ -377,6 +378,7 @@ impl Recv where B: Buf { Ok(().into()) } + */ pub fn next_incoming(&mut self, store: &mut Store) -> Option { self.pending_accept.pop(store) @@ -413,6 +415,7 @@ impl Recv where B: Buf { } } + /* /// Send stream level window update pub fn send_stream_window_update(&mut self, streams: &mut Store, @@ -441,6 +444,7 @@ impl Recv where B: Buf { Ok(().into()) } + */ fn reset(&mut self, _stream_id: StreamId, _reason: Reason) { unimplemented!(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index b0130bd..a7b15e4 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -9,6 +9,7 @@ use bytes::Buf; use std::collections::VecDeque; use std::marker::PhantomData; +/// Manages state transitions related to outbound frames. #[derive(Debug)] pub(super) struct Send { /// Maximum number of locally initiated streams @@ -23,19 +24,7 @@ pub(super) struct Send { /// Initial window size of locally initiated streams init_window_sz: WindowSize, - /// Connection level flow control governing sent data - flow_control: FlowControl, - - /// Holds the list of streams on which local window updates may be sent. - // XXX It would be cool if this didn't exist. - pending_window_updates: VecDeque, - prioritize: Prioritize, - - /// When `poll_window_update` is not ready, then the calling task is saved to - /// be notified later. Access to poll_window_update must not be shared across tasks, - /// as we only track a single task (and *not* i.e. a task per stream id). - blocked: Option, } impl Send where B: Buf { @@ -53,10 +42,7 @@ impl Send where B: Buf { num_streams: 0, next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, - flow_control: FlowControl::new(config.init_local_window_sz), - prioritize: Prioritize::new(), - pending_window_updates: VecDeque::new(), - blocked: None, + prioritize: Prioritize::new(config), } } @@ -119,15 +105,16 @@ impl Send where B: Buf { // Make borrow checker happy loop { + let unadvertised = stream.unadvertised_send_window; + match stream.send_flow_control() { Some(flow) => { - try!(self.flow_control.ensure_window(sz, FlowControlViolation)); + // Ensure that the size fits within the advertised size + try!(flow.ensure_window( + sz + unadvertised, FlowControlViolation)); - // Claim the window on the stream - try!(flow.claim_window(sz, FlowControlViolation)); - - // Claim the window on the connection - self.flow_control.claim_window(sz, FlowControlViolation) + // Now, claim the window on the stream + flow.claim_window(sz, FlowControlViolation) .expect("local connection flow control error"); break; @@ -160,6 +147,7 @@ impl Send where B: Buf { self.prioritize.poll_complete(store, dst) } + /* /// Get pending window updates pub fn poll_window_update(&mut self, streams: &mut Store) -> Poll @@ -191,16 +179,15 @@ impl Send where B: Buf { return Ok(Async::NotReady); } + */ pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), ConnectionError> { - // TODO: Handle invalid increment - self.flow_control.expand_window(frame.size_increment()); + self.priority.recv_window_update(frame)?; - if let Some(task) = self.blocked.take() { - task.notify(); - } + // TODO: If there is available connection capacity, release pending + // streams. Ok(()) } @@ -210,6 +197,8 @@ impl Send where B: Buf { stream: &mut store::Ptr) -> Result<(), ConnectionError> { + unimplemented!(); + /* if let Some(flow) = stream.send_flow_control() { // TODO: Handle invalid increment flow.expand_window(frame.size_increment()); @@ -220,6 +209,7 @@ impl Send where B: Buf { } Ok(()) + */ } pub fn dec_num_streams(&mut self) { diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 7518a48..351e055 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -28,6 +28,11 @@ pub(super) struct Stream { /// True if the stream is currently pending send pub is_pending_send: bool, + + /// A stream's capacity is never advertised past the connection's capacity. + /// This value represents the amount of the stream window that has been + /// temporarily withheld. + pub unadvertised_send_window: WindowSize, } impl Stream { @@ -41,6 +46,7 @@ impl Stream { next: None, pending_push_promises: store::List::new(), is_pending_send: false, + unadvertised_send_window: 0, } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 20d8b3f..6ba4c60 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -154,7 +154,8 @@ impl Streams } pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) - -> Result<(), ConnectionError> { + -> Result<(), ConnectionError> + { let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -238,14 +239,6 @@ impl Streams }) } - pub fn poll_window_update(&mut self) - -> Poll - { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - me.actions.send.poll_window_update(&mut me.store) - } - pub fn expand_window(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { @@ -279,12 +272,6 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - // TODO: sending window updates should be part of Prioritize - /* - try_ready!(me.actions.recv.send_connection_window_update(dst)); - try_ready!(me.actions.recv.send_stream_window_update(&mut me.store, dst)); - */ - me.actions.send.poll_complete(&mut me.store, dst) } }