diff --git a/src/client.rs b/src/client.rs index 89ac90f..20feb73 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,10 @@ use {frame, ConnectionError, StreamId}; use {Body, Chunk}; -use proto::{self, Connection}; +use proto::{self, Connection, WindowSize}; use error::Reason::*; use http::{self, Request, Response}; -use futures::{self, Future, Poll, Sink, AsyncSink}; +use futures::{self, Future, Poll, Sink, Async, AsyncSink}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -147,6 +147,25 @@ impl Stream { Ok(Response::from_parts(parts, body).into()) } + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: usize) + -> Result<(), ConnectionError> + { + // TODO: Check for overflow + self.inner.reserve_capacity(capacity as WindowSize) + } + + /// Returns the stream's current send capacity. + pub fn capacity(&self) -> usize { + self.inner.capacity() as usize + } + + /// Request to be notified when the stream's capacity increases + pub fn poll_capacity(&mut self) -> Poll, ConnectionError> { + let res = try_ready!(self.inner.poll_capacity()); + Ok(Async::Ready(res.map(|v| v as usize))) + } + /// Send data pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index b2689da..f2c3a28 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -1,121 +1,81 @@ use ConnectionError; use proto::*; +use std::cmp; + #[derive(Copy, Clone, Debug)] pub struct FlowControl { - /// Amount that may be claimed. - window_size: WindowSize, + /// Window size as indicated by the peer. This can go negative. + window_size: i32, - /// Amount to be removed by future increments. - underflow: WindowSize, - - /// The amount that has been incremented but not yet advertised (to the - /// application or the remote). - next_window_update: WindowSize, + /// The amount of the window that is currently available to consume. + available: WindowSize, } impl FlowControl { - pub fn new(window_size: WindowSize) -> FlowControl { + pub fn new() -> FlowControl { FlowControl { - window_size, - underflow: 0, - next_window_update: 0, + window_size: 0, + available: 0, } } - pub fn has_capacity(&self) -> bool { - self.effective_window_size() > 0 - } - - pub fn effective_window_size(&self) -> WindowSize { - let plus = self.window_size + self.next_window_update; - - if self.underflow >= plus { - return 0; - } - - plus - self.underflow - } - - /// Returns true iff `claim_window(sz)` would return succeed. - pub fn ensure_window(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError> - where T: Into, - { - if sz <= self.window_size { - Ok(()) + /// Returns the window size as known by the peer + pub fn window_size(&self) -> WindowSize { + if self.window_size < 0 { + 0 } else { - Err(err.into()) + self.window_size as WindowSize } } - /// Reduce future capacity of the window. - /// - /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. - pub fn shrink_window(&mut self, dec: WindowSize) { - /* - if decr < self.next_window_update { - self.next_window_update -= decr - } else { - self.underflow += decr - self.next_window_update; - self.next_window_update = 0; + /// Returns the window size available to the consumer + pub fn available(&self) -> WindowSize { + self.available + } + + /// Returns true if there is unavailable window capacity + pub fn has_unavailable(&self) -> bool { + if self.window_size < 0 { + return false; } - */ + + self.window_size as WindowSize > self.available } + pub fn claim_capacity(&mut self, capacity: WindowSize) { + assert!(self.available >= capacity); + self.available -= capacity; + } - /// Claims the provided amount from the window, if there is enough space. + pub fn assign_capacity(&mut self, capacity: WindowSize) { + assert!(self.window_size() >= self.available + capacity); + self.available += capacity; + } + + /// Update the window size. /// - /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than - /// have been previously claimed. - pub fn claim_window(&mut self, sz: WindowSize, err: T) - -> Result<(), ConnectionError> - where T: Into, - { - self.ensure_window(sz, err)?; - - self.window_size -= sz; - Ok(()) - } - - /// Increase the _unadvertised_ window capacity. - pub fn expand_window(&mut self, sz: WindowSize) - -> Result<(), ConnectionError> - { + /// This is called after receiving a WINDOW_UPDATE frame + pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), ConnectionError> { // TODO: Handle invalid increment - if sz <= self.underflow { - self.underflow -= sz; - return Ok(()); - } - - let added = sz - self.underflow; - self.next_window_update += added; - self.underflow = 0; - + self.window_size += sz as i32; Ok(()) } - /* - /// Obtains the unadvertised window update. - /// - /// This does not apply the window update to `self`. - pub fn peek_window_update(&mut self) -> Option { - if self.next_window_update == 0 { - None - } else { - Some(self.next_window_update) - } - } - */ + /// Decrements the window reflecting data has actually been sent. The caller + /// must ensure that the window has capacity. + pub fn send_data(&mut self, sz: WindowSize) { + trace!("send_data; sz={}; window={}; available={}", + sz, self.window_size, self.available); - /// Obtains and applies an unadvertised window update. - pub fn apply_window_update(&mut self) -> Option { - if self.next_window_update == 0 { - return None; - } + // Available cannot be greater than the window + debug_assert!(self.available as i32 <= self.window_size || self.available == 0); - let incr = self.next_window_update; - self.next_window_update = 0; - self.window_size += incr; - Some(incr) + // Ensure that the argument is correct + assert!(sz <= self.window_size as WindowSize); + + // Update values + self.window_size -= sz as i32; + self.available -= sz; } } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 424b756..358e522 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -6,17 +6,14 @@ use std::{fmt, cmp}; #[derive(Debug)] pub(super) struct Prioritize { - /// Streams that have pending frames - pending_send: store::List, + /// Queue of streams waiting for socket capacity to send a frame + pending_send: store::Queue, - /// Streams that are waiting for connection level flow control capacity - pending_capacity: store::List, + /// Queue of streams waiting for window capacity to produce data. + pending_capacity: store::Queue, /// Connection level flow control governing sent data - flow_control: FlowControl, - - /// Total amount of buffered data in data frames - buffered_data: usize, + flow: FlowControl, /// Holds frames that are waiting to be written to the socket buffer: Buffer, @@ -42,83 +39,226 @@ impl Prioritize where B: Buf, { pub fn new(config: &Config) -> Prioritize { + let mut flow = FlowControl::new(); + + flow.inc_window(config.init_local_window_sz); + flow.assign_capacity(config.init_local_window_sz); + Prioritize { - pending_send: store::List::new(), - pending_capacity: store::List::new(), - flow_control: FlowControl::new(config.init_local_window_sz), - buffered_data: 0, + pending_send: store::Queue::new(), + pending_capacity: store::Queue::new(), + flow: flow, buffer: Buffer::new(), conn_task: None, } } - pub fn available_window(&self) -> WindowSize { - let win = self.flow_control.effective_window_size(); - - if self.buffered_data >= win as usize { - 0 - } else { - win - self.buffered_data as WindowSize - } - } - - pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) - -> Result<(), ConnectionError> - { - // Expand the window - self.flow_control.expand_window(frame.size_increment())?; - - // Imediately apply the update - self.flow_control.apply_window_update(); - - Ok(()) - } - + /// Queue a frame to be sent to the remote pub fn queue_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { - if self.queue_frame2(frame, stream) { - // Notification required - if let Some(ref task) = self.conn_task { - task.notify(); - } - } - } - - /// Queue frame without actually notifying. Returns ture if the queue was - /// succesfful. - fn queue_frame2(&mut self, frame: Frame, stream: &mut store::Ptr) - -> bool - { - self.buffered_data += frame.flow_len(); - - // queue the frame in the buffer + // Queue the frame in the buffer stream.pending_send.push_back(&mut self.buffer, frame); // Queue the stream - !push_sender(&mut self.pending_send, stream) + self.pending_send.push(stream); + + // Notify the connection. + if let Some(task) = self.conn_task.take() { + task.notify(); + } } - /// Push the frame to the front of the stream's deque, scheduling the - /// steream if needed. - fn push_back_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { - // Push the frame to the front of the stream's deque - stream.pending_send.push_front(&mut self.buffer, frame); + /// Send a data frame + pub fn send_data(&mut self, + frame: frame::Data, + stream: &mut store::Ptr) + -> Result<(), ConnectionError> + { + let sz = frame.payload().remaining(); - // If needed, schedule the sender - push_sender(&mut self.pending_capacity, stream); + if sz > MAX_WINDOW_SIZE as usize { + // TODO: handle overflow + unimplemented!(); + } + + let sz = sz as WindowSize; + + if !stream.state.is_send_streaming() { + if stream.state.is_closed() { + return Err(InactiveStreamId.into()); + } else { + return Err(UnexpectedFrameType.into()); + } + } + + // Update the buffered data counter + stream.buffered_send_data += sz; + + // Implicitly request more send capacity if not enough has been + // requested yet. + if stream.requested_send_capacity < stream.buffered_send_data { + // Update the target requested capacity + stream.requested_send_capacity = stream.buffered_send_data; + + self.try_assign_capacity(stream); + } + + if frame.is_end_stream() { + try!(stream.state.send_close()); + } + + if stream.send_flow.available() > stream.buffered_send_data { + // The stream currently has capacity to send the data frame, so + // queue it up and notify the connection task. + self.queue_frame(frame.into(), stream); + } else { + // The stream has no capacity to send the frame now, save it but + // don't notify the conneciton task. Once additional capacity + // becomes available, the frame will be flushed. + stream.pending_send.push_back(&mut self.buffer, frame.into()); + } + + Ok(()) } + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) + -> Result<(), ConnectionError> + { + // Actual capacity is `capacity` + the current amount of buffered data. + // It it were less, then we could never send out the buffered data. + let capacity = capacity + stream.buffered_send_data; + + if capacity == stream.requested_send_capacity { + // Nothing to do + return Ok(()); + } else if capacity < stream.requested_send_capacity { + // TODO: release capacity + unimplemented!(); + } else { + // Update the target requested capacity + stream.requested_send_capacity = capacity; + + // Try to assign additional capacity to the stream. If none is + // currently available, the stream will be queued to receive some + // when more becomes available. + self.try_assign_capacity(stream); + + Ok(()) + } + } + + pub fn recv_stream_window_update(&mut self, + inc: WindowSize, + stream: &mut store::Ptr) + -> Result<(), ConnectionError> + { + if !stream.state.is_send_streaming() { + return Ok(()); + } + + // Update the stream level flow control. + stream.send_flow.inc_window(inc)?; + + // If the stream is waiting on additional capacity, then this will + // assign it (if available on the connection) and notify the producer + self.try_assign_capacity(stream); + + Ok(()) + } + + pub fn recv_connection_window_update(&mut self, + inc: WindowSize, + store: &mut Store) + -> Result<(), ConnectionError> + { + // Update the connection's window + self.flow.inc_window(inc)?; + + // Assign newly acquired capacity to streams pending capacity. + while self.flow.available() > 0 { + let mut stream = match self.pending_capacity.pop(store) { + Some(stream) => stream, + None => return Ok(()), + }; + + // Try to assign capacity to the stream. This will also re-queue the + // stream if there isn't enough connection level capacity to fulfill + // the capacity request. + self.try_assign_capacity(&mut stream); + } + + Ok(()) + } + + /// Request capacity to send data + fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { + let total_requested = stream.requested_send_capacity; + + // Total requested should never go below actual assigned + // (Note: the window size can go lower than assigned) + debug_assert!(total_requested >= stream.send_flow.available()); + + // The amount of additional capacity that the stream requests. + // Don't assign more than the window has available! + let mut additional = cmp::min( + total_requested - stream.send_flow.available(), + stream.send_flow.window_size()); + + trace!("try_assign_capacity; requested={}; additional={}; conn={}", + total_requested, additional, self.flow.available()); + + if additional == 0 { + // Nothing more to do + return; + } + + // The amount of currently available capacity on the connection + let conn_available = self.flow.available(); + + // First check if capacity is immediately available + if conn_available > 0 { + // There should be no streams pending capacity + debug_assert!(self.pending_capacity.is_empty()); + + // The amount of capacity to assign to the stream + // TODO: Should prioritization factor into this? + let assign = cmp::min(conn_available, additional); + + // Assign the capacity to the stream + stream.assign_capacity(assign); + + // Claim the capacity from the connection + self.flow.claim_capacity(assign); + } + + if stream.send_flow.available() < stream.requested_send_capacity { + if stream.send_flow.has_unavailable() { + // The stream requires additional capacity and the stream's + // window has availablel capacity, but the connection window + // does not. + // + // In this case, the stream needs to be queued up for when the + // connection has more capacity. + self.pending_capacity.push(stream); + } + } + + // If data is buffered, then schedule the stream for execution + if stream.buffered_send_data > 0 { + self.pending_send.push(stream); + } + } + + pub fn poll_complete(&mut self, store: &mut Store, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, { - // Track the task - self.conn_task = Some(task::current()); - // Ensure codec is ready try_ready!(dst.poll_ready()); @@ -129,16 +269,10 @@ impl Prioritize let max_frame_len = dst.max_send_frame_size(); trace!("poll_complete"); + loop { match self.pop_frame(store, max_frame_len) { Some(frame) => { - // Figure out the byte size this frame applies to flow - // control - let len = cmp::min(frame.flow_len(), max_frame_len); - - // Subtract the data size - self.buffered_data -= len; - trace!("writing frame={:?}", frame); let res = dst.start_send(frame)?; @@ -160,6 +294,9 @@ impl Prioritize // This might release a data frame... if !self.reclaim_frame(store, dst) { + // Nothing else to do, track the task + self.conn_task = Some(task::current()); + return Ok(().into()); } @@ -170,84 +307,13 @@ impl Prioritize } } - fn pop_frame(&mut self, store: &mut Store, max_len: usize) - -> Option>> - { - loop { - match self.pop_sender(store) { - Some(mut stream) => { - let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { - Frame::Data(frame) => { - let len = frame.payload().remaining(); - - if len > self.flow_control.effective_window_size() as usize { - // TODO: This could be smarter... - self.push_back_frame(frame.into(), &mut stream); - - // Try again w/ the next stream - continue; - } - - frame.into() - } - frame => frame, - }; - - if !stream.pending_send.is_empty() { - push_sender(&mut self.pending_send, &mut stream); - } - - let frame = match frame { - Frame::Data(mut frame) => { - let eos = frame.is_end_stream(); - - if frame.payload().remaining() > max_len { - frame.unset_end_stream(); - } - - Frame::Data(frame.map(|buf| { - Prioritized { - inner: buf.take(max_len), - end_of_stream: eos, - stream: stream.key(), - } - })) - } - frame => frame.map(|_| unreachable!()), - }; - - return Some(frame); - } - None => return None, - } - } - } - - fn pop_sender<'a>(&mut self, store: &'a mut Store) -> Option> { - // If the connection level window has capacity, pop off of the pending - // capacity list first. - - if self.flow_control.has_capacity() && !self.pending_capacity.is_empty() { - let mut stream = self.pending_capacity - .pop::(store) - .unwrap(); - - stream.is_pending_send = false; - Some(stream) - } else { - let stream = self.pending_send - .pop::(store); - - match stream { - Some(mut stream) => { - stream.is_pending_send = false; - Some(stream) - } - None => None, - } - } - } - + /// Tries to reclaim a pending data frame from the codec. + /// + /// Returns true if a frame was reclaimed. + /// + /// When a data frame is written to the codec, it may not be written in its + /// entirety (large chunks are split up into potentially many data frames). + /// In this case, the stream needs to be reprioritized. fn reclaim_frame(&mut self, store: &mut Store, dst: &mut Codec>) -> bool @@ -282,21 +348,100 @@ impl Prioritize false } -} -/// Push the stream onto the `pending_send` list. Returns true if the sender was -/// not already queued. -fn push_sender(list: &mut store::List, stream: &mut store::Ptr) - -> bool -{ - if stream.is_pending_send { - return false; + /// Push the frame to the front of the stream's deque, scheduling the + /// steream if needed. + fn push_back_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { + // Push the frame to the front of the stream's deque + stream.pending_send.push_front(&mut self.buffer, frame); + + // If needed, schedule the sender + self.pending_send.push(stream); } - list.push::(stream); - stream.is_pending_send = true; + // =========== OLD JUNK =========== - true + fn pop_frame(&mut self, store: &mut Store, max_len: usize) + -> Option>> + { + loop { + trace!("pop frame"); + match self.pending_send.pop(store) { + Some(mut stream) => { + let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { + Frame::Data(mut frame) => { + trace!(" --> data frame"); + + // Get the amount of capacity remaining for stream's + // window. + // + // TODO: Is this the right thing to check? + let stream_capacity = stream.send_flow.window_size(); + + if stream_capacity == 0 { + trace!(" --> stream capacity is 0, return"); + // The stream has no more capacity, this can + // happen if the remote reduced the stream + // window. In this case, we need to buffer the + // frame and wait for a window update... + stream.pending_send.push_front(&mut self.buffer, frame.into()); + continue; + } + + // Only send up to the max frame length + let len = cmp::min( + frame.payload().remaining(), + max_len); + + // Only send up to the stream's window capacity + let len = cmp::min(len, stream_capacity as usize); + + // There *must* be be enough connection level + // capacity at this point. + debug_assert!(len <= self.flow.window_size() as usize); + + // Update the flow control + trace!(" -- updating stream flow --"); + stream.send_flow.send_data(len as WindowSize); + + // Assign the capacity back to the connection that + // was just consumed from the stream in the previous + // line. + self.flow.assign_capacity(len as WindowSize); + + trace!(" -- updating connection flow --"); + self.flow.send_data(len as WindowSize); + + // Wrap the frame's data payload to ensure that the + // correct amount of data gets written. + + let eos = frame.is_end_stream(); + + if frame.payload().remaining() > len { + frame.unset_end_stream(); + } + + Frame::Data(frame.map(|buf| { + Prioritized { + inner: buf.take(len), + end_of_stream: eos, + stream: stream.key(), + } + })) + } + frame => frame.map(|_| unreachable!()), + }; + + if !stream.pending_send.is_empty() { + self.pending_send.push(&mut stream); + } + + return Some(frame); + } + None => return None, + } + } + } } // ===== impl Prioritized ===== diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 92ee567..2509a2d 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -29,7 +29,7 @@ pub(super) struct Recv { pending_window_updates: VecDeque, /// New streams to be accepted - pending_accept: store::List, + pending_accept: store::Queue, /// Holds frames that are waiting to be read buffer: Buffer, @@ -60,14 +60,18 @@ impl Recv where B: Buf { 2 }; + let mut flow = FlowControl::new(); + + flow.inc_window(config.init_remote_window_sz); + Recv { max_streams: config.max_remote_initiated, num_streams: 0, init_window_sz: config.init_remote_window_sz, - flow_control: FlowControl::new(config.init_remote_window_sz), + flow_control: flow, next_stream_id: next_stream_id.into(), pending_window_updates: VecDeque::new(), - pending_accept: store::List::new(), + pending_accept: store::Queue::new(), buffer: Buffer::new(), refused: None, _p: PhantomData, @@ -132,7 +136,9 @@ impl Recv where B: Buf { -> Result<(), ConnectionError> { trace!("opening stream; init_window={}", self.init_window_sz); - let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; + let is_initial = stream.state.recv_open(frame.is_end_stream())?; + + // TODO: Update flow control if is_initial { if !self.can_inc_num_streams() { @@ -157,7 +163,7 @@ impl Recv where B: Buf { // Only servers can receive a headers frame that initiates the stream. // This is verified in `Streams` before calling this function. if P::is_server() { - self.pending_accept.push::(stream); + self.pending_accept.push(stream); } Ok(()) @@ -192,6 +198,8 @@ impl Recv where B: Buf { let sz = sz as WindowSize; + // TODO: implement + /* match stream.recv_flow_control() { Some(flow) => { // Ensure there's enough capacity on the connection before @@ -207,6 +215,7 @@ impl Recv where B: Buf { } None => return Err(ProtocolError.into()), } + */ if frame.is_end_stream() { try!(stream.state.recv_close()); @@ -255,7 +264,7 @@ impl Recv where B: Buf { let mut new_stream = store .insert(frame.promised_id(), new_stream); - ppp.push::(&mut new_stream); + ppp.push(&mut new_stream); } let stream = &mut store[stream]; @@ -378,10 +387,13 @@ impl Recv where B: Buf { pub fn expand_connection_window(&mut self, sz: WindowSize) -> Result<(), ConnectionError> { + unimplemented!(); + /* // TODO: handle overflow self.flow_control.expand_window(sz); Ok(()) + */ } pub fn expand_stream_window(&mut self, @@ -390,6 +402,8 @@ impl Recv where B: Buf { stream: &mut store::Ptr) -> Result<(), ConnectionError> { + unimplemented!(); + /* // TODO: handle overflow if let Some(flow) = stream.recv_flow_control() { flow.expand_window(sz); @@ -397,6 +411,7 @@ impl Recv where B: Buf { } Ok(()) + */ } /* @@ -420,7 +435,7 @@ impl Recv where B: Buf { */ pub fn next_incoming(&mut self, store: &mut Store) -> Option { - self.pending_accept.pop::(store) + self.pending_accept.pop(store) .map(|ptr| ptr.key()) } diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 250be77..7cd04a8 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -24,9 +24,6 @@ pub(super) struct Send { /// Initial window size of locally initiated streams init_window_sz: WindowSize, - /// List of streams waiting for outbound connection capacity - pending_capacity: store::List, - /// Task awaiting notification to open a new stream. blocked_open: Option, @@ -44,7 +41,6 @@ impl Send where B: Buf { num_streams: 0, next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, - pending_capacity: store::List::new(), blocked_open: None, prioritize: Prioritize::new(config), } @@ -93,7 +89,11 @@ impl Send where B: Buf { { trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); // Update the state - stream.state.send_open(self.init_window_sz, frame.is_end_stream())?; + stream.state.send_open(frame.is_end_stream())?; + + if stream.state.is_send_streaming() { + stream.send_flow.inc_window(self.init_window_sz)?; + } // Queue the frame for sending self.prioritize.queue_frame(frame.into(), stream); @@ -112,48 +112,7 @@ impl Send where B: Buf { stream: &mut store::Ptr) -> Result<(), ConnectionError> { - let sz = frame.payload().remaining(); - - if sz > MAX_WINDOW_SIZE as usize { - // TODO: handle overflow - unimplemented!(); - } - - let sz = sz as WindowSize; - - // Make borrow checker happy - loop { - let unadvertised = stream.unadvertised_send_window; - - match stream.send_flow_control() { - Some(flow) => { - // Ensure that the size fits within the advertised size - try!(flow.ensure_window( - sz + unadvertised, FlowControlViolation)); - - // Now, claim the window on the stream - flow.claim_window(sz, FlowControlViolation) - .expect("local connection flow control error"); - - break; - } - None => {} - } - - if stream.state.is_closed() { - return Err(InactiveStreamId.into()); - } else { - return Err(UnexpectedFrameType.into()); - } - } - - if frame.is_end_stream() { - try!(stream.state.send_close()); - } - - self.prioritize.queue_frame(frame.into(), stream); - - Ok(()) + self.prioritize.send_data(frame, stream) } pub fn poll_complete(&mut self, @@ -165,63 +124,47 @@ impl Send where B: Buf { self.prioritize.poll_complete(store, dst) } + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) + -> Result<(), ConnectionError> + { + self.prioritize.reserve_capacity(capacity, stream) + } + + pub fn poll_capacity(&mut self, stream: &mut store::Ptr) + -> Poll, ConnectionError> + { + if !stream.state.is_send_streaming() { + return Ok(Async::Ready(None)); + } + + if !stream.send_capacity_inc { + return Ok(Async::NotReady); + } + + stream.send_capacity_inc = false; + + Ok(Async::Ready(Some(self.capacity(stream)))) + } + + /// Current available stream send capacity + pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { + let available = stream.send_flow.available(); + let buffered = stream.buffered_send_data; + + if available <= buffered { + 0 + } else { + available - buffered + } + } + pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate, store: &mut Store) -> Result<(), ConnectionError> { - self.prioritize.recv_window_update(frame)?; - - // Get the current connection capacity - let connection = self.prioritize.available_window(); - - // Walk each stream pending capacity and see if this change to the - // connection window can increase the advertised capacity of the stream. - // - // TODO: This is not a hugely efficient operation. It could be better to - // change the pending_capacity structure to a red-black tree. - // - self.pending_capacity.retain::( - store, - |stream| { - // Make sure that the stream is flagged as queued - debug_assert!(stream.is_pending_send_capacity); - - // Get the current unadvertised window - let unadvertised = stream.unadvertised_send_window; - - if unadvertised == 0 { - stream.is_pending_send_capacity = false; - return false; - } - - let effective_window_size = match stream.state.send_flow_control() { - Some(flow) => flow.effective_window_size(), - None => { - // The state transitioned and this stream is no longer - // waiting for updates - stream.is_pending_send_capacity = false; - return false; - } - }; - - if connection <= effective_window_size - unadvertised { - // The window is not increased, but we remain interested in - // updates in the future. - return true; - } - - if connection >= effective_window_size { - stream.unadvertised_send_window = 0; - } else { - stream.unadvertised_send_window = effective_window_size - connection; - } - - stream.notify_send(); - true - }); - - Ok(()) + self.prioritize.recv_connection_window_update(frame.size_increment(), store) } pub fn recv_stream_window_update(&mut self, @@ -229,60 +172,7 @@ impl Send where B: Buf { stream: &mut store::Ptr) -> Result<(), ConnectionError> { - let connection = self.prioritize.available_window(); - let unadvertised = stream.unadvertised_send_window; - - let effective_window_size = { - let mut flow = match stream.state.send_flow_control() { - Some(flow) => flow, - None => return Ok(()), - }; - - debug_assert!(unadvertised == 0 || connection == 0); - - // Expand the full window - flow.expand_window(frame.size_increment())?; - flow.effective_window_size() - }; - - if connection < effective_window_size { - stream.unadvertised_send_window = effective_window_size - connection; - - if !stream.is_pending_send_capacity { - stream.is_pending_send_capacity = true; - self.pending_capacity.push::(stream); - } - } - - if stream.unadvertised_send_window == frame.size_increment() + unadvertised { - // The entire window update is unadvertised, no need to do anything - // else - return Ok(()); - } - - stream.notify_send(); - - Ok(()) - } - - pub fn window_size(&mut self, stream: &mut Stream) -> usize { - if let Some(flow) = stream.state.send_flow_control() { - // Track the current task - stream.send_task = Some(task::current()); - - // We are observing the window, so apply the pending updates - flow.apply_window_update(); - - let mut window = flow.effective_window_size(); - - if stream.unadvertised_send_window > window { - return 0; - } - - return (window - stream.unadvertised_send_window) as usize; - } - - 0 + self.prioritize.recv_stream_window_update(frame.size_increment(), stream) } pub fn apply_remote_settings(&mut self, @@ -320,6 +210,8 @@ impl Send where B: Buf { store.for_each(|mut stream| { let stream = &mut *stream; + unimplemented!(); + /* if let Some(flow) = stream.state.send_flow_control() { flow.shrink_window(val); @@ -332,14 +224,18 @@ impl Send where B: Buf { unimplemented!(); } + */ }); } else if val > old_val { let inc = val - old_val; store.for_each(|mut stream| { + unimplemented!(); + /* if let Some(flow) = stream.state.send_flow_control() { unimplemented!(); } + */ }); } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 3597bf4..45b4436 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -72,8 +72,7 @@ enum Inner { #[derive(Debug, Copy, Clone)] enum Peer { AwaitingHeaders, - /// Contains a FlowControl representing the _receiver_ of this this data stream. - Streaming(FlowControl), + Streaming, } #[derive(Debug, Copy, Clone)] @@ -84,8 +83,8 @@ enum Cause { impl State { /// Opens the send-half of a stream if it is not already open. - pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { - let local = Peer::streaming(sz); + pub fn send_open(&mut self, eos: bool) -> Result<(), ConnectionError> { + let local = Peer::Streaming; self.inner = match self.inner { Idle => { @@ -128,8 +127,8 @@ impl State { /// frame is received. /// /// Returns true if this transitions the state to Open - pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result { - let remote = Peer::streaming(sz); + pub fn recv_open(&mut self, eos: bool) -> Result { + let remote = Peer::Streaming; let mut initial = false; self.inner = match self.inner { @@ -254,6 +253,22 @@ impl State { } } + pub fn is_send_streaming(&self) -> bool { + match self.inner { + Open { local: Peer::Streaming, .. } => true, + HalfClosedRemote(Peer::Streaming) => true, + _ => false, + } + } + + pub fn is_recv_streaming(&self) -> bool { + match self.inner { + Open { remote: Peer::Streaming, .. } => true, + HalfClosedLocal(Peer::Streaming) => true, + _ => false, + } + } + pub fn is_closed(&self) -> bool { match self.inner { Closed(_) => true, @@ -268,22 +283,6 @@ impl State { } } - pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { - match self.inner { - Open { ref mut remote, .. } | - HalfClosedLocal(ref mut remote) => remote.flow_control(), - _ => None, - } - } - - pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { - match self.inner { - Open { ref mut local, .. } | - HalfClosedRemote(ref mut local) => local.flow_control(), - _ => None, - } - } - pub fn ensure_recv_open(&self) -> Result<(), ConnectionError> { use std::io; @@ -311,16 +310,3 @@ impl Default for Peer { Peer::AwaitingHeaders } } - -impl Peer { - fn streaming(sz: WindowSize) -> Peer { - Peer::Streaming(FlowControl::new(sz)) - } - - fn flow_control(&mut self) -> Option<&mut FlowControl> { - match *self { - Streaming(ref mut flow) => Some(flow), - _ => None, - } - } -} diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 809f344..7c04065 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -24,9 +24,9 @@ pub(super) struct Ptr<'a, B: 'a> { pub(super) struct Key(usize); #[derive(Debug)] -pub(super) struct List { +pub(super) struct Queue { indices: Option, - _p: PhantomData, + _p: PhantomData<(B, N)>, } pub(super) trait Next { @@ -35,6 +35,10 @@ pub(super) trait Next { fn set_next(stream: &mut Stream, key: Option); fn take_next(stream: &mut Stream) -> Option; + + fn is_queued(stream: &Stream) -> bool; + + fn set_queued(stream: &mut Stream, val: bool); } /// A linked list @@ -142,11 +146,13 @@ impl ops::IndexMut for Store { } } -// ===== impl List ===== +// ===== impl Queue ===== -impl List { +impl Queue + where N: Next, +{ pub fn new() -> Self { - List { + Queue { indices: None, _p: PhantomData, } @@ -157,15 +163,22 @@ impl List { } pub fn take(&mut self) -> Self { - List { + Queue { indices: self.indices.take(), _p: PhantomData, } } - pub fn push(&mut self, stream: &mut store::Ptr) - where N: Next, - { + /// Queue the stream. + /// + /// If the stream is already contained by the list, return `false`. + pub fn push(&mut self, stream: &mut store::Ptr) -> bool { + if N::is_queued(stream) { + return false; + } + + N::set_queued(stream, true); + // The next pointer shouldn't be set debug_assert!(N::next(stream).is_none()); @@ -186,10 +199,11 @@ impl List { }); } } + + true } - pub fn pop<'a, N>(&mut self, store: &'a mut Store) -> Option> - where N: Next, + pub fn pop<'a>(&mut self, store: &'a mut Store) -> Option> { if let Some(mut idxs) = self.indices { let mut stream = store.resolve(idxs.head); @@ -202,63 +216,14 @@ impl List { self.indices = Some(idxs); } + debug_assert!(N::is_queued(&*stream)); + N::set_queued(&mut *stream, false); + return Some(stream); } None } - - pub fn retain(&mut self, store: &mut Store, mut f: F) - where N: Next, - F: FnMut(&mut Stream) -> bool, - { - if let Some(mut idxs) = self.indices { - let mut prev = None; - let mut curr = idxs.head; - - loop { - if f(&mut store[curr]) { - // Element is retained, walk to the next - if let Some(next) = N::next(&mut store[curr]) { - prev = Some(curr); - curr = next; - } else { - // Tail - break; - } - } else { - // Element is dropped - if let Some(prev) = prev { - let next = N::take_next(&mut store[curr]); - N::set_next(&mut store[prev], next); - - match next { - Some(next) => { - curr = next; - } - None => { - // current is last element, but guaranteed to not be the - // only one - idxs.tail = prev; - break; - } - } - } else { - if let Some(next) = N::take_next(&mut store[curr]) { - curr = next; - idxs.head = next; - } else { - // Only element - self.indices = None; - return; - } - } - } - } - - self.indices = Some(idxs); - } - } } // ===== impl Ptr ===== @@ -327,169 +292,3 @@ impl<'a, B> VacantEntry<'a, B> { Key(key) } } - -#[cfg(test)] -mod test { - use super::*; - use super::stream::Next; - - #[test] - fn test_retain_empty_list_and_store() { - let mut store = new_store(); - let mut list = List::new(); - - - retain(&mut store, &mut list, |_| panic!()); - - assert!(store.slab.is_empty()); - assert!(list.is_empty()); - } - - #[test] - fn test_retain_one_item() { - let mut store = new_store(); - let mut list = list_with(&mut store, &[1]); - - // Keep - retain(&mut store, &mut list, |s| true); - - let ids = get(&store, &list); - assert_eq!(ids, &[1]); - - // Drop - retain(&mut store, &mut list, |s| false); - - assert!(list.is_empty()); - assert_eq!(1, store.slab.len()); - } - - #[test] - fn test_retain_none_long_list() { - let mut expect = vec![1, 2, 3, 4, 5]; - - let mut store = new_store(); - let mut list = list_with(&mut store, &expect); - - retain(&mut store, &mut list, |s| { - assert_eq!(s.id, expect.remove(0)); - false - }); - - assert!(list.is_empty()); - } - - #[test] - fn test_retain_last_elem_long_list() { - let mut expect = vec![1, 2, 3, 4, 5]; - - let mut store = new_store(); - let mut list = list_with(&mut store, &expect); - - retain(&mut store, &mut list, |s| { - if expect.len() > 1 { - assert_eq!(s.id, expect.remove(0)); - false - } else { - assert_eq!(s.id, 5); - true - } - }); - - let ids = get(&store, &list); - assert_eq!(ids, &[5]); - } - - #[test] - fn test_retain_first_elem_long_list() { - let mut expect = vec![1, 2, 3, 4, 5]; - - let mut store = new_store(); - let mut list = list_with(&mut store, &expect); - - retain(&mut store, &mut list, |s| { - let e = expect.remove(0); - assert_eq!(s.id, e); - e == 1 - }); - - let ids = get(&store, &list); - assert_eq!(ids, &[1]); - } - - #[test] - fn test_drop_middle_elem_long_list() { - let mut expect = vec![1, 2, 3, 4, 5]; - - let mut store = new_store(); - let mut list = list_with(&mut store, &expect); - - retain(&mut store, &mut list, |s| { - let e = expect.remove(0); - assert_eq!(s.id, e); - e != 3 - }); - - let ids = get(&store, &list); - assert_eq!(ids, &[1, 2, 4, 5]); - } - - #[test] - fn test_drop_two_middle_elem_long_list() { - let mut expect = vec![1, 2, 3, 4, 5]; - - let mut store = new_store(); - let mut list = list_with(&mut store, &expect); - - retain(&mut store, &mut list, |s| { - let e = expect.remove(0); - assert_eq!(s.id, e); - e != 3 - }); - - let ids = get(&store, &list); - assert_eq!(ids, &[1, 2, 4, 5]); - } - - fn new_store() -> Store<()> { - Store::new() - } - - fn push(store: &mut Store<()>, list: &mut List<()>, id: u32) { - let id = StreamId::from(id); - let mut ptr = store.insert(id, Stream::new(id)); - list.push::(&mut ptr); - } - - fn list_with(store: &mut Store<()>, ids: &[u32]) -> List<()> { - let mut list = List::new(); - - for &id in ids { - push(store, &mut list, id); - } - - list - } - - fn pop(store: &mut Store<()>, list: &mut List<()>) -> Option { - list.pop::(store).map(|p| p.id) - } - - fn retain(store: &mut Store<()>, list: &mut List<()>, f: F) - where F: FnMut(&mut Stream<()>) -> bool - { - list.retain::(store, f); - } - - fn get(store: &Store<()>, list: &List<()>) -> Vec { - let mut dst = vec![]; - - let mut curr = list.indices.map(|i| i.head); - - while let Some(c) = curr { - dst.push(store[c].id); - curr = store[c].next; - } - - dst - } -} diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 6f328c7..7a6790f 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -8,11 +8,28 @@ pub(super) struct Stream { /// Current state of the stream pub state: State, - /// Frames pending for this stream to read - pub pending_recv: buffer::Deque, + /// Next node in the `Stream` linked list. + /// + /// This field is used in different linked lists depending on the stream + /// state. First, it is used as part of the linked list of streams waiting + /// to be accepted (either by a server or by a client as a push promise). + /// Once the stream is accepted, this is used for the linked list of streams + /// waiting to flush prioritized frames to the socket. + pub next: Option, - /// Task tracking receiving frames - pub recv_task: Option, + /// Set to true when the stream is queued + pub is_queued: bool, + + // ===== Fields related to sending ===== + + /// Send data flow control + pub send_flow: FlowControl, + + /// Amount of send capacity that has been requested, but not yet allocated. + pub requested_send_capacity: WindowSize, + + /// Amount of data buffered at the prioritization layer. + pub buffered_send_data: WindowSize, /// Task tracking additional send capacity (i.e. window updates). pub send_task: Option, @@ -20,63 +37,75 @@ pub(super) struct Stream { /// Frames pending for this stream being sent to the socket pub pending_send: buffer::Deque, - /// Next node in the `Stream` linked list. - /// - /// This field is used in different linked lists depending on the stream - /// state. - pub next: Option, - /// Next node in the linked list of streams waiting for additional /// connection level capacity. - pub next_capacity: Option, + pub next_pending_send_capacity: Option, /// True if the stream is waiting for outbound connection capacity pub is_pending_send_capacity: bool, + /// Set to true when the send capacity has been incremented + pub send_capacity_inc: bool, + + // ===== Fields related to receiving ===== + + /// Receive data flow control + pub recv_flow: FlowControl, + + /// Frames pending for this stream to read + pub pending_recv: buffer::Deque, + + /// Task tracking receiving frames + pub recv_task: Option, + /// The stream's pending push promises - pub pending_push_promises: store::List, - - /// True if the stream is currently pending send - pub is_pending_send: bool, - - /// A stream's capacity is never advertised past the connection's capacity. - /// This value represents the amount of the stream window that has been - /// temporarily withheld. - pub unadvertised_send_window: WindowSize, + pub pending_push_promises: store::Queue, } #[derive(Debug)] pub(super) struct Next; #[derive(Debug)] -pub(super) struct NextCapacity; +pub(super) struct NextSendCapacity; impl Stream { - pub fn new(id: StreamId) -> Stream { + pub fn new(id: StreamId) -> Stream + { Stream { id, state: State::default(), - pending_recv: buffer::Deque::new(), - recv_task: None, + next: None, + is_queued: false, + + // ===== Fields related to sending ===== + + send_flow: FlowControl::new(), + requested_send_capacity: 0, + buffered_send_data: 0, send_task: None, pending_send: buffer::Deque::new(), - next: None, - next_capacity: None, is_pending_send_capacity: false, - pending_push_promises: store::List::new(), - is_pending_send: false, - unadvertised_send_window: 0, + next_pending_send_capacity: None, + send_capacity_inc: false, + + // ===== Fields related to receiving ===== + + recv_flow: FlowControl::new(), + pending_recv: buffer::Deque::new(), + recv_task: None, + pending_push_promises: store::Queue::new(), } } - // TODO: remove? - pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { - self.state.send_flow_control() - } + pub fn assign_capacity(&mut self, capacity: WindowSize) { + debug_assert!(capacity > 0); + self.send_capacity_inc = true; + self.send_flow.assign_capacity(capacity); - // TODO: remove? - pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { - self.state.recv_flow_control() + // Only notify if the capacity exceeds the amount of buffered data + if self.send_flow.available() > self.buffered_send_data { + self.notify_send(); + } } pub fn notify_send(&mut self) { @@ -104,18 +133,34 @@ impl store::Next for Next { fn take_next(stream: &mut Stream) -> Option { stream.next.take() } + + fn is_queued(stream: &Stream) -> bool { + stream.is_queued + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_queued = val; + } } -impl store::Next for NextCapacity { +impl store::Next for NextSendCapacity { fn next(stream: &Stream) -> Option { - stream.next_capacity + stream.next_pending_send_capacity } fn set_next(stream: &mut Stream, key: Option) { - stream.next_capacity = key; + stream.next_pending_send_capacity = key; } fn take_next(stream: &mut Stream) -> Option { - stream.next_capacity.take() + stream.next_pending_send_capacity.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_send_capacity + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_send_capacity = val; } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 29b31d9..bb2a3c5 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -382,13 +382,36 @@ impl StreamRef Ok(chunk.into()) } - /// Returns the current window size - pub fn window_size(&mut self) -> usize { + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: WindowSize) + -> Result<(), ConnectionError> + { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut stream = me.store.resolve(self.key); - me.actions.send.window_size(&mut stream) + + me.actions.send.reserve_capacity(capacity, &mut stream) + } + + /// Returns the stream's current send capacity. + pub fn capacity(&self) -> WindowSize { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.send.capacity(&mut stream) + } + + /// Request to be notified when the stream's capacity increases + pub fn poll_capacity(&mut self) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.send.poll_capacity(&mut stream) } } diff --git a/src/server.rs b/src/server.rs index 422c4f4..7d6b79e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,6 @@ use {frame, ConnectionError, StreamId}; use {Body, Chunk}; -use proto::{self, Connection}; +use proto::{self, Connection, WindowSize}; use error::Reason::*; use http::{self, Request, Response}; @@ -152,20 +152,32 @@ impl fmt::Debug for Server // ===== impl Stream ===== impl Stream { - /// Returns the current window size. - /// - /// This function also registers interest changes. The current task will be - /// notified when the window size is *increased*. - pub fn window_size(&mut self) -> usize { - self.inner.window_size() - } - + /// Send a response pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) -> Result<(), ConnectionError> { self.inner.send_response(response, end_of_stream) } + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: usize) + -> Result<(), ConnectionError> + { + // TODO: Check for overflow + self.inner.reserve_capacity(capacity as WindowSize) + } + + /// Returns the stream's current send capacity. + pub fn capacity(&self) -> usize { + self.inner.capacity() as usize + } + + /// Request to be notified when the stream's capacity increases + pub fn poll_capacity(&mut self) -> Poll, ConnectionError> { + let res = try_ready!(self.inner.poll_capacity()); + Ok(Async::Ready(res.map(|v| v as usize))) + } + /// Send a single data frame pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> @@ -208,25 +220,33 @@ impl Future for Send loop { if self.buf.is_none() { + // Get a chunk to send to the H2 stream self.buf = try_ready!(self.src.poll()); } match self.buf.take() { Some(mut buf) => { - let cap = self.dst.as_mut().unwrap().window_size(); + let dst = self.dst.as_mut().unwrap(); + + // Ask for the amount of capacity needed + dst.reserve_capacity(buf.len()); + + let cap = dst.capacity(); if cap == 0 { self.buf = Some(buf); - return Ok(Async::NotReady); - } if cap >= buf.len() { - self.dst.as_mut().unwrap().send_data(buf, false)?; - } else { - let chunk = buf.split_to(cap); - self.buf = Some(buf); - self.dst.as_mut().unwrap().send_data(chunk, false)?; - - return Ok(Async::NotReady); + // TODO: This seems kind of lame :( + try_ready!(dst.poll_capacity()); + continue; } + + let chunk = buf.split_to(cap); + + if !buf.is_empty() { + self.buf = Some(buf); + } + + dst.send_data(chunk, false)?; } None => { // TODO: It would be nice to not have to send an extra diff --git a/tests/flow_control.rs b/tests/flow_control.rs index b561c22..00a2ef6 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -1,3 +1,53 @@ +pub mod support; +use support::*; + +// In this case, the stream & connection both have capacity, but capacity is not +// explicitly requested. +#[test] +fn send_data_without_requesting_capacity() { + let _ = ::env_logger::init(); + + let payload = [0; 1024]; + + let mock = mock_io::Builder::new() + .handshake() + .write(&[ + // POST / + 0, 0, 16, 1, 4, 0, 0, 0, 1, 131, 135, 65, 139, 157, 41, + 172, 75, 143, 168, 233, 25, 151, 33, 233, 132, + ]) + .write(&[ + // DATA + 0, 4, 0, 0, 1, 0, 0, 0, 1, + ]) + .write(&payload[..]) + .write(frames::SETTINGS_ACK) + // Read response + .read(&[0, 0, 1, 1, 5, 0, 0, 0, 1, 0x89]) + .build(); + + let mut h2 = Client::handshake(mock) + .wait().unwrap(); + + let request = Request::builder() + .method(method::POST) + .uri("https://http2.akamai.com/") + .body(()).unwrap(); + + let mut stream = h2.request(request, false).unwrap(); + + // The capacity should be immediately allocated + assert_eq!(stream.capacity(), 0); + + // Send the data + stream.send_data(payload[..].into(), true).unwrap(); + + // Get the response + let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap(); + assert_eq!(resp.status(), status::NO_CONTENT); + + h2.wait().unwrap(); +} #[test] #[ignore] diff --git a/tests/prioritization.rs b/tests/prioritization.rs index b561bf5..bc71834 100644 --- a/tests/prioritization.rs +++ b/tests/prioritization.rs @@ -34,6 +34,12 @@ fn single_stream_send_large_body() { let mut stream = h2.request(request, false).unwrap(); + // Reserve capacity to send the payload + stream.reserve_capacity(payload.len()).unwrap(); + + // The capacity should be immediately allocated + assert_eq!(stream.capacity(), payload.len()); + // Send the data stream.send_data(payload[..].into(), true).unwrap(); @@ -82,6 +88,11 @@ fn single_stream_send_extra_large_body_multi_frames_one_buffer() { let mut stream = h2.request(request, false).unwrap(); + stream.reserve_capacity(payload.len()).unwrap(); + + // The capacity should be immediately allocated + assert_eq!(stream.capacity(), payload.len()); + // Send the data stream.send_data(payload.into(), true).unwrap(); @@ -142,6 +153,11 @@ fn single_stream_send_extra_large_body_multi_frames_multi_buffer() { let mut stream = h2.request(request, false).unwrap(); + stream.reserve_capacity(payload.len()).unwrap(); + + // The capacity should be immediately allocated + assert_eq!(stream.capacity(), payload.len()); + // Send the data stream.send_data(payload.into(), true).unwrap(); diff --git a/tests/stream_states.rs b/tests/stream_states.rs index 6ef557a..c127e9d 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -74,6 +74,11 @@ fn send_recv_data() { info!("sending request"); let mut stream = h2.request(request, false).unwrap(); + // Reserve send capacity + stream.reserve_capacity(5).unwrap(); + + assert_eq!(stream.capacity(), 5); + // Send the data stream.send_data("hello", true).unwrap();