Add ReleaseCapacity handle. (#148)
This enables releasing stream capacity without having the `Body` handle.
This commit is contained in:
		| @@ -34,8 +34,12 @@ pub struct Stream<B: IntoBuf> { | |||||||
|     inner: proto::StreamRef<B::Buf, Peer>, |     inner: proto::StreamRef<B::Buf, Peer>, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug)] |  | ||||||
| pub struct Body<B: IntoBuf> { | pub struct Body<B: IntoBuf> { | ||||||
|  |     inner: ReleaseCapacity<B>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[derive(Debug)] | ||||||
|  | pub struct ReleaseCapacity<B: IntoBuf> { | ||||||
|     inner: proto::StreamRef<B::Buf, Peer>, |     inner: proto::StreamRef<B::Buf, Peer>, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -310,7 +314,7 @@ impl<B: IntoBuf> Stream<B> { | |||||||
|     pub fn poll_response(&mut self) -> Poll<Response<Body<B>>, ::Error> { |     pub fn poll_response(&mut self) -> Poll<Response<Body<B>>, ::Error> { | ||||||
|         let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); |         let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); | ||||||
|         let body = Body { |         let body = Body { | ||||||
|             inner: self.inner.clone(), |             inner: ReleaseCapacity { inner: self.inner.clone() }, | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         Ok(Response::from_parts(parts, body).into()) |         Ok(Response::from_parts(parts, body).into()) | ||||||
| @@ -364,20 +368,18 @@ impl<B: IntoBuf> Future for Stream<B> { | |||||||
| impl<B: IntoBuf> Body<B> { | impl<B: IntoBuf> Body<B> { | ||||||
|     pub fn is_empty(&self) -> bool { |     pub fn is_empty(&self) -> bool { | ||||||
|         // If the recv side is closed and the receive queue is empty, the body is empty. |         // If the recv side is closed and the receive queue is empty, the body is empty. | ||||||
|         self.inner.body_is_empty() |         self.inner.inner.body_is_empty() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { |     pub fn release_capacity(&mut self) -> &mut ReleaseCapacity<B> { | ||||||
|         self.inner |         &mut self.inner | ||||||
|             .release_capacity(sz as proto::WindowSize) |  | ||||||
|             .map_err(Into::into) |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Poll trailers |     /// Poll trailers | ||||||
|     /// |     /// | ||||||
|     /// This function **must** not be called until `Body::poll` returns `None`. |     /// This function **must** not be called until `Body::poll` returns `None`. | ||||||
|     pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> { |     pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> { | ||||||
|         self.inner.poll_trailers().map_err(Into::into) |         self.inner.inner.poll_trailers().map_err(Into::into) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -386,7 +388,35 @@ impl<B: IntoBuf> ::futures::Stream for Body<B> { | |||||||
|     type Error = ::Error; |     type Error = ::Error; | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|         self.inner.poll_data().map_err(Into::into) |         self.inner.inner.poll_data().map_err(Into::into) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<B: IntoBuf> fmt::Debug for Body<B> | ||||||
|  | where B: fmt::Debug, | ||||||
|  |       B::Buf: fmt::Debug, | ||||||
|  | { | ||||||
|  |     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         fmt.debug_struct("Body") | ||||||
|  |             .field("inner", &self.inner) | ||||||
|  |             .finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ===== impl ReleaseCapacity ===== | ||||||
|  |  | ||||||
|  | impl<B: IntoBuf> ReleaseCapacity<B> { | ||||||
|  |     pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { | ||||||
|  |         self.inner | ||||||
|  |             .release_capacity(sz as proto::WindowSize) | ||||||
|  |             .map_err(Into::into) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<B: IntoBuf> Clone for ReleaseCapacity<B> { | ||||||
|  |     fn clone(&self) -> Self { | ||||||
|  |         let inner = self.inner.clone(); | ||||||
|  |         ReleaseCapacity { inner } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -31,8 +31,12 @@ pub struct Stream<B: IntoBuf> { | |||||||
|     inner: proto::StreamRef<B::Buf, Peer>, |     inner: proto::StreamRef<B::Buf, Peer>, | ||||||
| } | } | ||||||
|  |  | ||||||
| #[derive(Debug)] |  | ||||||
| pub struct Body<B: IntoBuf> { | pub struct Body<B: IntoBuf> { | ||||||
|  |     inner: ReleaseCapacity<B>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[derive(Debug)] | ||||||
|  | pub struct ReleaseCapacity<B: IntoBuf> { | ||||||
|     inner: proto::StreamRef<B::Buf, Peer>, |     inner: proto::StreamRef<B::Buf, Peer>, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -147,7 +151,7 @@ where | |||||||
|             trace!("received incoming"); |             trace!("received incoming"); | ||||||
|             let (head, _) = inner.take_request().into_parts(); |             let (head, _) = inner.take_request().into_parts(); | ||||||
|             let body = Body { |             let body = Body { | ||||||
|                 inner: inner.clone(), |                 inner: ReleaseCapacity { inner: inner.clone() }, | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|             let request = Request::from_parts(head, body); |             let request = Request::from_parts(head, body); | ||||||
| @@ -271,20 +275,18 @@ impl Stream<Bytes> { | |||||||
| impl<B: IntoBuf> Body<B> { | impl<B: IntoBuf> Body<B> { | ||||||
|     pub fn is_empty(&self) -> bool { |     pub fn is_empty(&self) -> bool { | ||||||
|         // If the recv side is closed and the receive queue is empty, the body is empty. |         // If the recv side is closed and the receive queue is empty, the body is empty. | ||||||
|         self.inner.body_is_empty() |         self.inner.inner.body_is_empty() | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { |     pub fn release_capacity(&mut self) -> &mut ReleaseCapacity<B> { | ||||||
|         self.inner |         &mut self.inner | ||||||
|             .release_capacity(sz as proto::WindowSize) |  | ||||||
|             .map_err(Into::into) |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Poll trailers |     /// Poll trailers | ||||||
|     /// |     /// | ||||||
|     /// This function **must** not be called until `Body::poll` returns `None`. |     /// This function **must** not be called until `Body::poll` returns `None`. | ||||||
|     pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> { |     pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> { | ||||||
|         self.inner.poll_trailers().map_err(Into::into) |         self.inner.inner.poll_trailers().map_err(Into::into) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -293,7 +295,36 @@ impl<B: IntoBuf> futures::Stream for Body<B> { | |||||||
|     type Error = ::Error; |     type Error = ::Error; | ||||||
|  |  | ||||||
|     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | ||||||
|         self.inner.poll_data().map_err(Into::into) |         self.inner.inner.poll_data().map_err(Into::into) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | impl<B: IntoBuf> fmt::Debug for Body<B> | ||||||
|  | where B: fmt::Debug, | ||||||
|  |       B::Buf: fmt::Debug, | ||||||
|  | { | ||||||
|  |     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { | ||||||
|  |         fmt.debug_struct("Body") | ||||||
|  |             .field("inner", &self.inner) | ||||||
|  |             .finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ===== impl ReleaseCapacity ===== | ||||||
|  |  | ||||||
|  | impl<B: IntoBuf> ReleaseCapacity<B> { | ||||||
|  |     pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { | ||||||
|  |         self.inner | ||||||
|  |             .release_capacity(sz as proto::WindowSize) | ||||||
|  |             .map_err(Into::into) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<B: IntoBuf> Clone for ReleaseCapacity<B> { | ||||||
|  |     fn clone(&self) -> Self { | ||||||
|  |         let inner = self.inner.clone(); | ||||||
|  |         ReleaseCapacity { inner } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -110,7 +110,7 @@ fn release_capacity_sends_window_update() { | |||||||
|                 .and_then(|(buf, mut body)| { |                 .and_then(|(buf, mut body)| { | ||||||
|                     let buf = buf.unwrap(); |                     let buf = buf.unwrap(); | ||||||
|                     assert_eq!(buf.len(), payload.len()); |                     assert_eq!(buf.len(), payload.len()); | ||||||
|                     body.release_capacity(buf.len() * 2).unwrap(); |                     body.release_capacity().release_capacity(buf.len() * 2).unwrap(); | ||||||
|                     body.into_future().unwrap() |                     body.into_future().unwrap() | ||||||
|                 }) |                 }) | ||||||
|                 .and_then(|(buf, _)| { |                 .and_then(|(buf, _)| { | ||||||
| @@ -164,7 +164,7 @@ fn release_capacity_of_small_amount_does_not_send_window_update() { | |||||||
|                 .and_then(|(buf, mut body)| { |                 .and_then(|(buf, mut body)| { | ||||||
|                     let buf = buf.unwrap(); |                     let buf = buf.unwrap(); | ||||||
|                     assert_eq!(buf.len(), 16); |                     assert_eq!(buf.len(), 16); | ||||||
|                     body.release_capacity(buf.len()).unwrap(); |                     body.release_capacity().release_capacity(buf.len()).unwrap(); | ||||||
|                     body.into_future().unwrap() |                     body.into_future().unwrap() | ||||||
|                 }) |                 }) | ||||||
|                 .and_then(|(buf, _)| { |                 .and_then(|(buf, _)| { | ||||||
| @@ -734,7 +734,7 @@ fn connection_notified_on_released_capacity() { | |||||||
|     thread::sleep(Duration::from_millis(100)); |     thread::sleep(Duration::from_millis(100)); | ||||||
|  |  | ||||||
|     // Release the capacity |     // Release the capacity | ||||||
|     a.release_capacity(16_384).unwrap(); |     a.release_capacity().release_capacity(16_384).unwrap(); | ||||||
|  |  | ||||||
|     th1.join().unwrap(); |     th1.join().unwrap(); | ||||||
|     th2.join().unwrap(); |     th2.join().unwrap(); | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user