Change ReserveCapacity to expanded FlowControl type (#423)

- Adds `FlowControl::available_capacity` method.
- Adds `FlowControl::used_capacity` method.
This commit is contained in:
Sean McArthur
2019-10-08 11:28:15 -07:00
committed by GitHub
parent 3cfcab016e
commit 86e53054a6
7 changed files with 98 additions and 76 deletions

View File

@@ -97,20 +97,20 @@
//!
//! println!("Received response: {:?}", head);
//!
//! // The `release_capacity` handle allows the caller to manage
//! // The `flow_control` handle allows the caller to manage
//! // flow control.
//! //
//! // Whenever data is received, the caller is responsible for
//! // releasing capacity back to the server once it has freed
//! // the data from memory.
//! let mut release_capacity = body.release_capacity().clone();
//! let mut flow_control = body.flow_control().clone();
//!
//! while let Some(chunk) = body.data().await {
//! let chunk = chunk?;
//! println!("RX: {:?}", chunk);
//!
//! // Let the server send more data.
//! let _ = release_capacity.release_capacity(chunk.len());
//! let _ = flow_control.release_capacity(chunk.len());
//! }
//!
//! Ok(())
@@ -138,7 +138,7 @@
use crate::codec::{Codec, RecvError, SendError, UserError};
use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
use crate::proto;
use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream};
use crate::{FlowControl, PingPong, RecvStream, SendStream};
use bytes::{Bytes, IntoBuf};
use http::{uri, HeaderMap, Method, Request, Response, Version};
@@ -628,11 +628,11 @@ impl Builder {
/// flow control for received data.
///
/// The initial window of a stream is used as part of flow control. For more
/// details, see [`ReleaseCapacity`].
/// details, see [`FlowControl`].
///
/// The default value is 65,535.
///
/// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
/// [`FlowControl`]: ../struct.FlowControl.html
///
/// # Examples
///
@@ -663,11 +663,11 @@ impl Builder {
/// for received data.
///
/// The initial window of a connection is used as part of flow control. For more details,
/// see [`ReleaseCapacity`].
/// see [`FlowControl`].
///
/// The default value is 65,535.
///
/// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
/// [`FlowControl`]: ../struct.FlowControl.html
///
/// # Examples
///
@@ -1187,14 +1187,14 @@ where
///
/// If `size` is less than the current value, nothing will happen
/// immediately. However, as window capacity is released by
/// [`ReleaseCapacity`] instances, no `WINDOW_UPDATE` frames will be sent
/// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
/// out until the number of "in flight" bytes drops below `size`.
///
/// The default value is 65,535.
///
/// See [`ReleaseCapacity`] documentation for more details.
/// See [`FlowControl`] documentation for more details.
///
/// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
/// [`FlowControl`]: ../struct.FlowControl.html
/// [library level]: ../index.html#flow-control
pub fn set_target_window_size(&mut self, size: u32) {
assert!(size <= proto::MAX_WINDOW_SIZE);
@@ -1263,7 +1263,7 @@ impl Future for ResponseFuture {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone()));
let body = RecvStream::new(FlowControl::new(self.inner.clone()));
Poll::Ready(Ok(Response::from_parts(parts, body)))
}

View File

@@ -62,7 +62,7 @@
//! There is also a **connection level** window governing data sent across all
//! streams.
//!
//! Managing flow control for inbound data is done through [`ReleaseCapacity`].
//! Managing flow control for inbound data is done through [`FlowControl`].
//! Managing flow control for outbound data is done through [`SendStream`]. See
//! the struct level documentation for those two types for more details.
//!
@@ -71,7 +71,7 @@
//! [`client`]: client/index.html
//! [`server`]: server/index.html
//! [Flow control]: http://httpwg.org/specs/rfc7540.html#FlowControl
//! [`ReleaseCapacity`]: struct.ReleaseCapacity.html
//! [`FlowControl`]: struct.FlowControl.html
//! [`SendStream`]: struct.SendStream.html
//! [Starting HTTP/2]: http://httpwg.org/specs/rfc7540.html#starting
//! [upgrade]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism
@@ -118,7 +118,7 @@ pub mod server;
mod share;
pub use crate::error::{Error, Reason};
pub use crate::share::{Ping, PingPong, Pong, RecvStream, ReleaseCapacity, SendStream, StreamId};
pub use crate::share::{FlowControl, Ping, PingPong, Pong, RecvStream, SendStream, StreamId};
#[cfg(feature = "unstable")]
pub use codec::{Codec, RecvError, SendError, UserError};

View File

@@ -266,3 +266,9 @@ impl fmt::Display for Window {
fmt::Display::fmt(&self.0, f)
}
}
impl From<Window> for isize {
fn from(w: Window) -> isize {
w.0 as isize
}
}

View File

@@ -1187,6 +1187,22 @@ impl OpaqueStreamRef {
me.actions.recv.poll_trailers(cx, &mut stream)
}
pub(crate) fn available_recv_capacity(&self) -> isize {
let me = self.inner.lock().unwrap();
let me = &*me;
let stream = &me.store[self.key];
stream.recv_flow.available().into()
}
pub(crate) fn used_recv_capacity(&self) -> WindowSize {
let me = self.inner.lock().unwrap();
let me = &*me;
let stream = &me.store[self.key];
stream.in_flight_recv_data
}
/// Releases recv capacity back to the peer. This may result in sending
/// WINDOW_UPDATE frames on both the stream and connection.
pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {

View File

@@ -118,7 +118,7 @@
use crate::codec::{Codec, RecvError, UserError};
use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId};
use crate::proto::{self, Config, Prioritized};
use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream};
use crate::{FlowControl, PingPong, RecvStream, SendStream};
use bytes::{Buf, Bytes, IntoBuf};
use http::{HeaderMap, Request, Response};
@@ -408,7 +408,7 @@ where
if let Some(inner) = self.connection.next_incoming() {
log::trace!("received incoming");
let (head, _) = inner.take_request().into_parts();
let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque()));
let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque()));
let request = Request::from_parts(head, body);
let respond = SendResponse { inner };
@@ -427,14 +427,14 @@ where
///
/// If `size` is less than the current value, nothing will happen
/// immediately. However, as window capacity is released by
/// [`ReleaseCapacity`] instances, no `WINDOW_UPDATE` frames will be sent
/// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
/// out until the number of "in flight" bytes drops below `size`.
///
/// The default value is 65,535.
///
/// See [`ReleaseCapacity`] documentation for more details.
/// See [`FlowControl`] documentation for more details.
///
/// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
/// [`FlowControl`]: ../struct.FlowControl.html
/// [library level]: ../index.html#flow-control
pub fn set_target_window_size(&mut self, size: u32) {
assert!(size <= proto::MAX_WINDOW_SIZE);
@@ -591,11 +591,11 @@ impl Builder {
/// flow control for received data.
///
/// The initial window of a stream is used as part of flow control. For more
/// details, see [`ReleaseCapacity`].
/// details, see [`FlowControl`].
///
/// The default value is 65,535.
///
/// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
/// [`FlowControl`]: ../struct.FlowControl.html
///
/// # Examples
///
@@ -625,11 +625,11 @@ impl Builder {
/// for received data.
///
/// The initial window of a connection is used as part of flow control. For more details,
/// see [`ReleaseCapacity`].
/// see [`FlowControl`].
///
/// The default value is 65,535.
///
/// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html
/// [`FlowControl`]: ../struct.FlowControl.html
///
/// # Examples
///

View File

@@ -123,7 +123,7 @@ pub struct StreamId(u32);
/// control.
///
/// See method level documentation for more details on receiving data. See
/// [`ReleaseCapacity`] for more details on inbound flow control.
/// [`FlowControl`] for more details on inbound flow control.
///
/// Note that this type implements [`Stream`], yielding the received data frames.
/// When this implementation is used, the capacity is immediately released when
@@ -132,11 +132,11 @@ pub struct StreamId(u32);
///
/// [`client::ResponseFuture`]: client/struct.ResponseFuture.html
/// [`server::Connection`]: server/struct.Connection.html
/// [`ReleaseCapacity`]: struct.ReleaseCapacity.html
/// [`FlowControl`]: struct.FlowControl.html
/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html
#[must_use = "streams do nothing unless polled"]
pub struct RecvStream {
inner: ReleaseCapacity,
inner: FlowControl,
}
/// A handle to release window capacity to a remote stream.
@@ -190,9 +190,9 @@ pub struct RecvStream {
/// data.
///
/// [flow control]: ../index.html#flow-control
/// [`release_capacity`]: struct.ReleaseCapacity.html#method.release_capacity
#[derive(Debug)]
pub struct ReleaseCapacity {
/// [`release_capacity`]: struct.FlowControl.html#method.release_capacity
#[derive(Clone, Debug)]
pub struct FlowControl {
inner: proto::OpaqueStreamRef,
}
@@ -392,25 +392,10 @@ impl StreamId {
// ===== impl RecvStream =====
impl RecvStream {
pub(crate) fn new(inner: ReleaseCapacity) -> Self {
pub(crate) fn new(inner: FlowControl) -> Self {
RecvStream { inner }
}
/// Returns true if the receive half has reached the end of stream.
///
/// A return value of `true` means that calls to `poll` and `poll_trailers`
/// will both return `None`.
pub fn is_end_stream(&self) -> bool {
self.inner.inner.is_end_stream()
}
/// Get a mutable reference to this streams `ReleaseCapacity`.
///
/// It can be used immediately, or cloned to be used later.
pub fn release_capacity(&mut self) -> &mut ReleaseCapacity {
&mut self.inner
}
/// Get the next data frame.
pub async fn data(&mut self) -> Option<Result<Bytes, crate::Error>> {
futures_util::future::poll_fn(move |cx| self.poll_data(cx)).await
@@ -438,6 +423,21 @@ impl RecvStream {
}
}
/// Returns true if the receive half has reached the end of stream.
///
/// A return value of `true` means that calls to `poll` and `poll_trailers`
/// will both return `None`.
pub fn is_end_stream(&self) -> bool {
self.inner.inner.is_end_stream()
}
/// Get a mutable reference to this stream's `FlowControl`.
///
/// It can be used immediately, or cloned to be used later.
pub fn flow_control(&mut self) -> &mut FlowControl {
&mut self.inner
}
/// Returns the stream ID of this stream.
///
/// # Panics
@@ -476,23 +476,31 @@ impl Drop for RecvStream {
}
}
// ===== impl ReleaseCapacity =====
// ===== impl FlowControl =====
impl ReleaseCapacity {
impl FlowControl {
pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self {
ReleaseCapacity { inner }
FlowControl { inner }
}
/// Returns the stream ID of the stream whose capacity will
/// be released by this `ReleaseCapacity`.
///
/// # Panics
///
/// If the lock on the stream store has been poisoned.
/// be released by this `FlowControl`.
pub fn stream_id(&self) -> StreamId {
StreamId::from_internal(self.inner.stream_id())
}
/// Get the current available capacity of data this stream *could* receive.
pub fn available_capacity(&self) -> isize {
self.inner.available_recv_capacity()
}
/// Get the currently *used* capacity for this stream.
///
/// This is the amount of bytes that can be released back to the remote.
pub fn used_capacity(&self) -> usize {
self.inner.used_recv_capacity() as usize
}
/// Release window capacity back to remote stream.
///
/// This releases capacity back to the stream level and the connection level
@@ -500,16 +508,15 @@ impl ReleaseCapacity {
///
/// See [struct level] documentation for more details.
///
/// # Panics
/// # Errors
///
/// This function panics if increasing the receive window size by `sz` would
/// result in a window size greater than the target window size set by
/// [`set_target_window_size`]. In other words, the caller cannot release
/// more capacity than data has been received. If 1024 bytes of data have
/// been received, at most 1024 bytes can be released.
/// This function errors if increasing the receive window size by `sz` would
/// result in a window size greater than the target window size. In other
/// words, the caller cannot release more capacity than data has been
/// received. If 1024 bytes of data have been received, at most 1024 bytes
/// can be released.
///
/// [struct level]: #
/// [`set_target_window_size`]: server/struct.Server.html#method.set_target_window_size
pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::Error> {
if sz > proto::MAX_WINDOW_SIZE as usize {
return Err(UserError::ReleaseCapacityTooBig.into());
@@ -520,13 +527,6 @@ impl ReleaseCapacity {
}
}
impl Clone for ReleaseCapacity {
fn clone(&self) -> Self {
let inner = self.inner.clone();
ReleaseCapacity { inner }
}
}
// ===== impl PingPong =====
impl PingPong {

View File

@@ -101,9 +101,7 @@ async fn release_capacity_sends_window_update() {
let buf = body.next().await.unwrap().unwrap();
assert_eq!(buf.len(), payload_len);
body.release_capacity()
.release_capacity(buf.len() * 2)
.unwrap();
body.flow_control().release_capacity(buf.len() * 2).unwrap();
let buf = body.next().await.unwrap().unwrap();
assert_eq!(buf.len(), payload_len);
@@ -158,7 +156,7 @@ async fn release_capacity_of_small_amount_does_not_send_window_update() {
let buf = body.next().await.unwrap().unwrap();
// read the small body and then release it
assert_eq!(buf.len(), 16);
body.release_capacity().release_capacity(buf.len()).unwrap();
body.flow_control().release_capacity(buf.len()).unwrap();
let buf = body.next().await;
assert!(buf.is_none());
};
@@ -342,7 +340,7 @@ async fn stream_error_release_connection_capacity() {
.expect("response");
assert_eq!(resp.status(), StatusCode::OK);
let mut body = resp.into_parts().1;
let mut cap = body.release_capacity().clone();
let mut cap = body.flow_control().clone();
let to_release = 16_384 * 2;
let mut should_recv_bytes = to_release;
let mut should_recv_frames = 2usize;
@@ -787,7 +785,7 @@ async fn connection_notified_on_released_capacity() {
idle_ms(100).await;
// Release the capacity
a.release_capacity().release_capacity(16_384).unwrap();
a.flow_control().release_capacity(16_384).unwrap();
th1_rx.await.unwrap();
th2_rx.await.unwrap();
@@ -1156,7 +1154,7 @@ async fn decrease_target_window_size() {
let res = conn.drive(resp).await.expect("response");
conn.set_target_window_size(16_384);
let mut body = res.into_parts().1;
let mut cap = body.release_capacity().clone();
let mut cap = body.flow_control().clone();
let bytes = conn.drive(body.try_concat()).await.expect("concat");
assert_eq!(bytes.len(), 65_535);
@@ -1341,8 +1339,10 @@ async fn client_decrease_initial_window_size() {
.await;
// stream 5 went negative, so release back to 0
assert_eq!(body5.flow_control().available_capacity(), -100);
assert_eq!(body5.flow_control().used_capacity(), 100);
body5
.release_capacity()
.flow_control()
.release_capacity(100)
.expect("release_capacity");
conn.drive(yield_once()).await;