From 86e53054a6fa37890f18bce4011f060c3026e2fc Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 8 Oct 2019 11:28:15 -0700 Subject: [PATCH] Change `ReserveCapacity` to expanded `FlowControl` type (#423) - Adds `FlowControl::available_capacity` method. - Adds `FlowControl::used_capacity` method. --- src/client.rs | 24 ++++---- src/lib.rs | 6 +- src/proto/streams/flow_control.rs | 6 ++ src/proto/streams/streams.rs | 16 +++++ src/server.rs | 18 +++--- src/share.rs | 88 ++++++++++++++-------------- tests/h2-tests/tests/flow_control.rs | 16 ++--- 7 files changed, 98 insertions(+), 76 deletions(-) diff --git a/src/client.rs b/src/client.rs index e5c535e..8c8cc44 100644 --- a/src/client.rs +++ b/src/client.rs @@ -97,20 +97,20 @@ //! //! println!("Received response: {:?}", head); //! -//! // The `release_capacity` handle allows the caller to manage +//! // The `flow_control` handle allows the caller to manage //! // flow control. //! // //! // Whenever data is received, the caller is responsible for //! // releasing capacity back to the server once it has freed //! // the data from memory. -//! let mut release_capacity = body.release_capacity().clone(); +//! let mut flow_control = body.flow_control().clone(); //! //! while let Some(chunk) = body.data().await { //! let chunk = chunk?; //! println!("RX: {:?}", chunk); //! //! // Let the server send more data. -//! let _ = release_capacity.release_capacity(chunk.len()); +//! let _ = flow_control.release_capacity(chunk.len()); //! } //! //! Ok(()) @@ -138,7 +138,7 @@ use crate::codec::{Codec, RecvError, SendError, UserError}; use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId}; use crate::proto; -use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; +use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Bytes, IntoBuf}; use http::{uri, HeaderMap, Method, Request, Response, Version}; @@ -628,11 +628,11 @@ impl Builder { /// flow control for received data. /// /// The initial window of a stream is used as part of flow control. For more - /// details, see [`ReleaseCapacity`]. + /// details, see [`FlowControl`]. /// /// The default value is 65,535. /// - /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html + /// [`FlowControl`]: ../struct.FlowControl.html /// /// # Examples /// @@ -663,11 +663,11 @@ impl Builder { /// for received data. /// /// The initial window of a connection is used as part of flow control. For more details, - /// see [`ReleaseCapacity`]. + /// see [`FlowControl`]. /// /// The default value is 65,535. /// - /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html + /// [`FlowControl`]: ../struct.FlowControl.html /// /// # Examples /// @@ -1187,14 +1187,14 @@ where /// /// If `size` is less than the current value, nothing will happen /// immediately. However, as window capacity is released by - /// [`ReleaseCapacity`] instances, no `WINDOW_UPDATE` frames will be sent + /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent /// out until the number of "in flight" bytes drops below `size`. /// /// The default value is 65,535. /// - /// See [`ReleaseCapacity`] documentation for more details. + /// See [`FlowControl`] documentation for more details. /// - /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html + /// [`FlowControl`]: ../struct.FlowControl.html /// [library level]: ../index.html#flow-control pub fn set_target_window_size(&mut self, size: u32) { assert!(size <= proto::MAX_WINDOW_SIZE); @@ -1263,7 +1263,7 @@ impl Future for ResponseFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts(); - let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone())); + let body = RecvStream::new(FlowControl::new(self.inner.clone())); Poll::Ready(Ok(Response::from_parts(parts, body))) } diff --git a/src/lib.rs b/src/lib.rs index de17e6f..8601262 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,7 +62,7 @@ //! There is also a **connection level** window governing data sent across all //! streams. //! -//! Managing flow control for inbound data is done through [`ReleaseCapacity`]. +//! Managing flow control for inbound data is done through [`FlowControl`]. //! Managing flow control for outbound data is done through [`SendStream`]. See //! the struct level documentation for those two types for more details. //! @@ -71,7 +71,7 @@ //! [`client`]: client/index.html //! [`server`]: server/index.html //! [Flow control]: http://httpwg.org/specs/rfc7540.html#FlowControl -//! [`ReleaseCapacity`]: struct.ReleaseCapacity.html +//! [`FlowControl`]: struct.FlowControl.html //! [`SendStream`]: struct.SendStream.html //! [Starting HTTP/2]: http://httpwg.org/specs/rfc7540.html#starting //! [upgrade]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Protocol_upgrade_mechanism @@ -118,7 +118,7 @@ pub mod server; mod share; pub use crate::error::{Error, Reason}; -pub use crate::share::{Ping, PingPong, Pong, RecvStream, ReleaseCapacity, SendStream, StreamId}; +pub use crate::share::{FlowControl, Ping, PingPong, Pong, RecvStream, SendStream, StreamId}; #[cfg(feature = "unstable")] pub use codec::{Codec, RecvError, SendError, UserError}; diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index 6bb1966..f3cea16 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -266,3 +266,9 @@ impl fmt::Display for Window { fmt::Display::fmt(&self.0, f) } } + +impl From for isize { + fn from(w: Window) -> isize { + w.0 as isize + } +} diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index b1d49fb..10e9a56 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1187,6 +1187,22 @@ impl OpaqueStreamRef { me.actions.recv.poll_trailers(cx, &mut stream) } + pub(crate) fn available_recv_capacity(&self) -> isize { + let me = self.inner.lock().unwrap(); + let me = &*me; + + let stream = &me.store[self.key]; + stream.recv_flow.available().into() + } + + pub(crate) fn used_recv_capacity(&self) -> WindowSize { + let me = self.inner.lock().unwrap(); + let me = &*me; + + let stream = &me.store[self.key]; + stream.in_flight_recv_data + } + /// Releases recv capacity back to the peer. This may result in sending /// WINDOW_UPDATE frames on both the stream and connection. pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> { diff --git a/src/server.rs b/src/server.rs index e68de6e..379b307 100644 --- a/src/server.rs +++ b/src/server.rs @@ -118,7 +118,7 @@ use crate::codec::{Codec, RecvError, UserError}; use crate::frame::{self, Pseudo, PushPromiseHeaderError, Reason, Settings, StreamId}; use crate::proto::{self, Config, Prioritized}; -use crate::{PingPong, RecvStream, ReleaseCapacity, SendStream}; +use crate::{FlowControl, PingPong, RecvStream, SendStream}; use bytes::{Buf, Bytes, IntoBuf}; use http::{HeaderMap, Request, Response}; @@ -408,7 +408,7 @@ where if let Some(inner) = self.connection.next_incoming() { log::trace!("received incoming"); let (head, _) = inner.take_request().into_parts(); - let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque())); + let body = RecvStream::new(FlowControl::new(inner.clone_to_opaque())); let request = Request::from_parts(head, body); let respond = SendResponse { inner }; @@ -427,14 +427,14 @@ where /// /// If `size` is less than the current value, nothing will happen /// immediately. However, as window capacity is released by - /// [`ReleaseCapacity`] instances, no `WINDOW_UPDATE` frames will be sent + /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent /// out until the number of "in flight" bytes drops below `size`. /// /// The default value is 65,535. /// - /// See [`ReleaseCapacity`] documentation for more details. + /// See [`FlowControl`] documentation for more details. /// - /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html + /// [`FlowControl`]: ../struct.FlowControl.html /// [library level]: ../index.html#flow-control pub fn set_target_window_size(&mut self, size: u32) { assert!(size <= proto::MAX_WINDOW_SIZE); @@ -591,11 +591,11 @@ impl Builder { /// flow control for received data. /// /// The initial window of a stream is used as part of flow control. For more - /// details, see [`ReleaseCapacity`]. + /// details, see [`FlowControl`]. /// /// The default value is 65,535. /// - /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html + /// [`FlowControl`]: ../struct.FlowControl.html /// /// # Examples /// @@ -625,11 +625,11 @@ impl Builder { /// for received data. /// /// The initial window of a connection is used as part of flow control. For more details, - /// see [`ReleaseCapacity`]. + /// see [`FlowControl`]. /// /// The default value is 65,535. /// - /// [`ReleaseCapacity`]: ../struct.ReleaseCapacity.html + /// [`FlowControl`]: ../struct.FlowControl.html /// /// # Examples /// diff --git a/src/share.rs b/src/share.rs index 8c2f211..6dc574c 100644 --- a/src/share.rs +++ b/src/share.rs @@ -123,7 +123,7 @@ pub struct StreamId(u32); /// control. /// /// See method level documentation for more details on receiving data. See -/// [`ReleaseCapacity`] for more details on inbound flow control. +/// [`FlowControl`] for more details on inbound flow control. /// /// Note that this type implements [`Stream`], yielding the received data frames. /// When this implementation is used, the capacity is immediately released when @@ -132,11 +132,11 @@ pub struct StreamId(u32); /// /// [`client::ResponseFuture`]: client/struct.ResponseFuture.html /// [`server::Connection`]: server/struct.Connection.html -/// [`ReleaseCapacity`]: struct.ReleaseCapacity.html +/// [`FlowControl`]: struct.FlowControl.html /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html #[must_use = "streams do nothing unless polled"] pub struct RecvStream { - inner: ReleaseCapacity, + inner: FlowControl, } /// A handle to release window capacity to a remote stream. @@ -190,9 +190,9 @@ pub struct RecvStream { /// data. /// /// [flow control]: ../index.html#flow-control -/// [`release_capacity`]: struct.ReleaseCapacity.html#method.release_capacity -#[derive(Debug)] -pub struct ReleaseCapacity { +/// [`release_capacity`]: struct.FlowControl.html#method.release_capacity +#[derive(Clone, Debug)] +pub struct FlowControl { inner: proto::OpaqueStreamRef, } @@ -392,25 +392,10 @@ impl StreamId { // ===== impl RecvStream ===== impl RecvStream { - pub(crate) fn new(inner: ReleaseCapacity) -> Self { + pub(crate) fn new(inner: FlowControl) -> Self { RecvStream { inner } } - /// Returns true if the receive half has reached the end of stream. - /// - /// A return value of `true` means that calls to `poll` and `poll_trailers` - /// will both return `None`. - pub fn is_end_stream(&self) -> bool { - self.inner.inner.is_end_stream() - } - - /// Get a mutable reference to this streams `ReleaseCapacity`. - /// - /// It can be used immediately, or cloned to be used later. - pub fn release_capacity(&mut self) -> &mut ReleaseCapacity { - &mut self.inner - } - /// Get the next data frame. pub async fn data(&mut self) -> Option> { futures_util::future::poll_fn(move |cx| self.poll_data(cx)).await @@ -438,6 +423,21 @@ impl RecvStream { } } + /// Returns true if the receive half has reached the end of stream. + /// + /// A return value of `true` means that calls to `poll` and `poll_trailers` + /// will both return `None`. + pub fn is_end_stream(&self) -> bool { + self.inner.inner.is_end_stream() + } + + /// Get a mutable reference to this stream's `FlowControl`. + /// + /// It can be used immediately, or cloned to be used later. + pub fn flow_control(&mut self) -> &mut FlowControl { + &mut self.inner + } + /// Returns the stream ID of this stream. /// /// # Panics @@ -476,23 +476,31 @@ impl Drop for RecvStream { } } -// ===== impl ReleaseCapacity ===== +// ===== impl FlowControl ===== -impl ReleaseCapacity { +impl FlowControl { pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self { - ReleaseCapacity { inner } + FlowControl { inner } } /// Returns the stream ID of the stream whose capacity will - /// be released by this `ReleaseCapacity`. - /// - /// # Panics - /// - /// If the lock on the stream store has been poisoned. + /// be released by this `FlowControl`. pub fn stream_id(&self) -> StreamId { StreamId::from_internal(self.inner.stream_id()) } + /// Get the current available capacity of data this stream *could* receive. + pub fn available_capacity(&self) -> isize { + self.inner.available_recv_capacity() + } + + /// Get the currently *used* capacity for this stream. + /// + /// This is the amount of bytes that can be released back to the remote. + pub fn used_capacity(&self) -> usize { + self.inner.used_recv_capacity() as usize + } + /// Release window capacity back to remote stream. /// /// This releases capacity back to the stream level and the connection level @@ -500,16 +508,15 @@ impl ReleaseCapacity { /// /// See [struct level] documentation for more details. /// - /// # Panics + /// # Errors /// - /// This function panics if increasing the receive window size by `sz` would - /// result in a window size greater than the target window size set by - /// [`set_target_window_size`]. In other words, the caller cannot release - /// more capacity than data has been received. If 1024 bytes of data have - /// been received, at most 1024 bytes can be released. + /// This function errors if increasing the receive window size by `sz` would + /// result in a window size greater than the target window size. In other + /// words, the caller cannot release more capacity than data has been + /// received. If 1024 bytes of data have been received, at most 1024 bytes + /// can be released. /// /// [struct level]: # - /// [`set_target_window_size`]: server/struct.Server.html#method.set_target_window_size pub fn release_capacity(&mut self, sz: usize) -> Result<(), crate::Error> { if sz > proto::MAX_WINDOW_SIZE as usize { return Err(UserError::ReleaseCapacityTooBig.into()); @@ -520,13 +527,6 @@ impl ReleaseCapacity { } } -impl Clone for ReleaseCapacity { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - ReleaseCapacity { inner } - } -} - // ===== impl PingPong ===== impl PingPong { diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 1695edc..d9c6c3b 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -101,9 +101,7 @@ async fn release_capacity_sends_window_update() { let buf = body.next().await.unwrap().unwrap(); assert_eq!(buf.len(), payload_len); - body.release_capacity() - .release_capacity(buf.len() * 2) - .unwrap(); + body.flow_control().release_capacity(buf.len() * 2).unwrap(); let buf = body.next().await.unwrap().unwrap(); assert_eq!(buf.len(), payload_len); @@ -158,7 +156,7 @@ async fn release_capacity_of_small_amount_does_not_send_window_update() { let buf = body.next().await.unwrap().unwrap(); // read the small body and then release it assert_eq!(buf.len(), 16); - body.release_capacity().release_capacity(buf.len()).unwrap(); + body.flow_control().release_capacity(buf.len()).unwrap(); let buf = body.next().await; assert!(buf.is_none()); }; @@ -342,7 +340,7 @@ async fn stream_error_release_connection_capacity() { .expect("response"); assert_eq!(resp.status(), StatusCode::OK); let mut body = resp.into_parts().1; - let mut cap = body.release_capacity().clone(); + let mut cap = body.flow_control().clone(); let to_release = 16_384 * 2; let mut should_recv_bytes = to_release; let mut should_recv_frames = 2usize; @@ -787,7 +785,7 @@ async fn connection_notified_on_released_capacity() { idle_ms(100).await; // Release the capacity - a.release_capacity().release_capacity(16_384).unwrap(); + a.flow_control().release_capacity(16_384).unwrap(); th1_rx.await.unwrap(); th2_rx.await.unwrap(); @@ -1156,7 +1154,7 @@ async fn decrease_target_window_size() { let res = conn.drive(resp).await.expect("response"); conn.set_target_window_size(16_384); let mut body = res.into_parts().1; - let mut cap = body.release_capacity().clone(); + let mut cap = body.flow_control().clone(); let bytes = conn.drive(body.try_concat()).await.expect("concat"); assert_eq!(bytes.len(), 65_535); @@ -1341,8 +1339,10 @@ async fn client_decrease_initial_window_size() { .await; // stream 5 went negative, so release back to 0 + assert_eq!(body5.flow_control().available_capacity(), -100); + assert_eq!(body5.flow_control().used_capacity(), 100); body5 - .release_capacity() + .flow_control() .release_capacity(100) .expect("release_capacity"); conn.drive(yield_once()).await;