From 4c7ecf158d40375afedf0ba331c4157bf8edc080 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 9 Oct 2017 14:00:28 -0700 Subject: [PATCH] Add ReleaseCapacity handle. (#148) This enables releasing stream capacity without having the `Body` handle. --- src/client.rs | 48 ++++++++++++++++++++++++++++++++++-------- src/server.rs | 49 +++++++++++++++++++++++++++++++++++-------- tests/flow_control.rs | 6 +++--- 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/client.rs b/src/client.rs index ad7ef18..434e20b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -34,8 +34,12 @@ pub struct Stream { inner: proto::StreamRef, } -#[derive(Debug)] pub struct Body { + inner: ReleaseCapacity, +} + +#[derive(Debug)] +pub struct ReleaseCapacity { inner: proto::StreamRef, } @@ -310,7 +314,7 @@ impl Stream { pub fn poll_response(&mut self) -> Poll>, ::Error> { let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); let body = Body { - inner: self.inner.clone(), + inner: ReleaseCapacity { inner: self.inner.clone() }, }; Ok(Response::from_parts(parts, body).into()) @@ -364,20 +368,18 @@ impl Future for Stream { impl Body { pub fn is_empty(&self) -> bool { // If the recv side is closed and the receive queue is empty, the body is empty. - self.inner.body_is_empty() + self.inner.inner.body_is_empty() } - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { - self.inner - .release_capacity(sz as proto::WindowSize) - .map_err(Into::into) + pub fn release_capacity(&mut self) -> &mut ReleaseCapacity { + &mut self.inner } /// Poll trailers /// /// This function **must** not be called until `Body::poll` returns `None`. pub fn poll_trailers(&mut self) -> Poll, ::Error> { - self.inner.poll_trailers().map_err(Into::into) + self.inner.inner.poll_trailers().map_err(Into::into) } } @@ -386,7 +388,35 @@ impl ::futures::Stream for Body { type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll_data().map_err(Into::into) + self.inner.inner.poll_data().map_err(Into::into) + } +} + +impl fmt::Debug for Body +where B: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Body") + .field("inner", &self.inner) + .finish() + } +} + +// ===== impl ReleaseCapacity ===== + +impl ReleaseCapacity { + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { + self.inner + .release_capacity(sz as proto::WindowSize) + .map_err(Into::into) + } +} + +impl Clone for ReleaseCapacity { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + ReleaseCapacity { inner } } } diff --git a/src/server.rs b/src/server.rs index e327874..329a57f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -31,8 +31,12 @@ pub struct Stream { inner: proto::StreamRef, } -#[derive(Debug)] pub struct Body { + inner: ReleaseCapacity, +} + +#[derive(Debug)] +pub struct ReleaseCapacity { inner: proto::StreamRef, } @@ -147,7 +151,7 @@ where trace!("received incoming"); let (head, _) = inner.take_request().into_parts(); let body = Body { - inner: inner.clone(), + inner: ReleaseCapacity { inner: inner.clone() }, }; let request = Request::from_parts(head, body); @@ -271,20 +275,18 @@ impl Stream { impl Body { pub fn is_empty(&self) -> bool { // If the recv side is closed and the receive queue is empty, the body is empty. - self.inner.body_is_empty() + self.inner.inner.body_is_empty() } - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { - self.inner - .release_capacity(sz as proto::WindowSize) - .map_err(Into::into) + pub fn release_capacity(&mut self) -> &mut ReleaseCapacity { + &mut self.inner } /// Poll trailers /// /// This function **must** not be called until `Body::poll` returns `None`. pub fn poll_trailers(&mut self) -> Poll, ::Error> { - self.inner.poll_trailers().map_err(Into::into) + self.inner.inner.poll_trailers().map_err(Into::into) } } @@ -293,7 +295,36 @@ impl futures::Stream for Body { type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll_data().map_err(Into::into) + self.inner.inner.poll_data().map_err(Into::into) + } +} + + +impl fmt::Debug for Body +where B: fmt::Debug, + B::Buf: fmt::Debug, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Body") + .field("inner", &self.inner) + .finish() + } +} + +// ===== impl ReleaseCapacity ===== + +impl ReleaseCapacity { + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { + self.inner + .release_capacity(sz as proto::WindowSize) + .map_err(Into::into) + } +} + +impl Clone for ReleaseCapacity { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + ReleaseCapacity { inner } } } diff --git a/tests/flow_control.rs b/tests/flow_control.rs index e4122c3..86901ec 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -110,7 +110,7 @@ fn release_capacity_sends_window_update() { .and_then(|(buf, mut body)| { let buf = buf.unwrap(); assert_eq!(buf.len(), payload.len()); - body.release_capacity(buf.len() * 2).unwrap(); + body.release_capacity().release_capacity(buf.len() * 2).unwrap(); body.into_future().unwrap() }) .and_then(|(buf, _)| { @@ -164,7 +164,7 @@ fn release_capacity_of_small_amount_does_not_send_window_update() { .and_then(|(buf, mut body)| { let buf = buf.unwrap(); assert_eq!(buf.len(), 16); - body.release_capacity(buf.len()).unwrap(); + body.release_capacity().release_capacity(buf.len()).unwrap(); body.into_future().unwrap() }) .and_then(|(buf, _)| { @@ -734,7 +734,7 @@ fn connection_notified_on_released_capacity() { thread::sleep(Duration::from_millis(100)); // Release the capacity - a.release_capacity(16_384).unwrap(); + a.release_capacity().release_capacity(16_384).unwrap(); th1.join().unwrap(); th2.join().unwrap();