Promote SendRequest::pending to an OpaqueStreamRef. (#281)
Because `self.pending` doesn't necessarily get cleaned up in a timely fashion - rather, only when the user calls `poll_ready()` - it was possible for it to refer to a stream that has already been closed. This would lead to a panic the next time that `poll_ready()` was called. Instead, use an `OpaqueStreamRef`, bumping the refcount. A change to an existing test is included which demonstrates the issue.
This commit is contained in:
		
				
					committed by
					
						 Sean McArthur
						Sean McArthur
					
				
			
			
				
	
			
			
			
						parent
						
							1b9469ff75
						
					
				
				
					commit
					23234fa14f
				
			| @@ -223,7 +223,7 @@ pub struct Handshake<T, B: IntoBuf = Bytes> { | |||||||
| /// [`Error`]: ../struct.Error.html | /// [`Error`]: ../struct.Error.html | ||||||
| pub struct SendRequest<B: IntoBuf> { | pub struct SendRequest<B: IntoBuf> { | ||||||
|     inner: proto::Streams<B::Buf, Peer>, |     inner: proto::Streams<B::Buf, Peer>, | ||||||
|     pending: Option<proto::StreamKey>, |     pending: Option<proto::OpaqueStreamRef>, | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Returns a `SendRequest` instance once it is ready to send at least one | /// Returns a `SendRequest` instance once it is ready to send at least one | ||||||
| @@ -568,7 +568,7 @@ where | |||||||
|             .map_err(Into::into) |             .map_err(Into::into) | ||||||
|             .map(|stream| { |             .map(|stream| { | ||||||
|                 if stream.is_pending_open() { |                 if stream.is_pending_open() { | ||||||
|                     self.pending = Some(stream.key()); |                     self.pending = Some(stream.clone_to_opaque()); | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
|                 let response = ResponseFuture { |                 let response = ResponseFuture { | ||||||
|   | |||||||
| @@ -9,7 +9,7 @@ mod streams; | |||||||
| pub(crate) use self::connection::{Config, Connection}; | pub(crate) use self::connection::{Config, Connection}; | ||||||
| pub(crate) use self::error::Error; | pub(crate) use self::error::Error; | ||||||
| pub(crate) use self::peer::{Peer, Dyn as DynPeer}; | pub(crate) use self::peer::{Peer, Dyn as DynPeer}; | ||||||
| pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams}; | pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; | ||||||
| pub(crate) use self::streams::{PollReset, Prioritized, Open}; | pub(crate) use self::streams::{PollReset, Prioritized, Open}; | ||||||
|  |  | ||||||
| use codec::Codec; | use codec::Codec; | ||||||
|   | |||||||
| @@ -12,7 +12,6 @@ mod streams; | |||||||
| pub(crate) use self::prioritize::Prioritized; | pub(crate) use self::prioritize::Prioritized; | ||||||
| pub(crate) use self::recv::Open; | pub(crate) use self::recv::Open; | ||||||
| pub(crate) use self::send::PollReset; | pub(crate) use self::send::PollReset; | ||||||
| pub(crate) use self::store::Key; |  | ||||||
| pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; | pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; | ||||||
|  |  | ||||||
| use self::buffer::Buffer; | use self::buffer::Buffer; | ||||||
|   | |||||||
| @@ -567,7 +567,7 @@ where | |||||||
|         &mut self, |         &mut self, | ||||||
|         request: Request<()>, |         request: Request<()>, | ||||||
|         end_of_stream: bool, |         end_of_stream: bool, | ||||||
|         pending: Option<&store::Key>, |         pending: Option<&OpaqueStreamRef>, | ||||||
|     ) -> Result<StreamRef<B>, SendError> { |     ) -> Result<StreamRef<B>, SendError> { | ||||||
|         use http::Method; |         use http::Method; | ||||||
|         use super::stream::ContentLength; |         use super::stream::ContentLength; | ||||||
| @@ -593,8 +593,8 @@ where | |||||||
|             // |             // | ||||||
|             // If that stream is still pending, the Client isn't allowed to |             // If that stream is still pending, the Client isn't allowed to | ||||||
|             // queue up another pending stream. They should use `poll_ready`. |             // queue up another pending stream. They should use `poll_ready`. | ||||||
|             if let Some(key) = pending { |             if let Some(stream) = pending { | ||||||
|                 if me.store.resolve(*key).is_pending_open { |                 if me.store.resolve(stream.key).is_pending_open { | ||||||
|                     return Err(UserError::Rejected.into()); |                     return Err(UserError::Rejected.into()); | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
| @@ -697,15 +697,15 @@ impl<B> Streams<B, client::Peer> | |||||||
| where | where | ||||||
|     B: Buf, |     B: Buf, | ||||||
| { | { | ||||||
|     pub fn poll_pending_open(&mut self, key: Option<&store::Key>) -> Poll<(), ::Error> { |     pub fn poll_pending_open(&mut self, pending: Option<&OpaqueStreamRef>) -> Poll<(), ::Error> { | ||||||
|         let mut me = self.inner.lock().unwrap(); |         let mut me = self.inner.lock().unwrap(); | ||||||
|         let me = &mut *me; |         let me = &mut *me; | ||||||
|  |  | ||||||
|         me.actions.ensure_no_conn_error()?; |         me.actions.ensure_no_conn_error()?; | ||||||
|         me.actions.send.ensure_next_stream_id()?; |         me.actions.send.ensure_next_stream_id()?; | ||||||
|  |  | ||||||
|         if let Some(key) = key { |         if let Some(pending) = pending { | ||||||
|             let mut stream = me.store.resolve(*key); |             let mut stream = me.store.resolve(pending.key); | ||||||
|             trace!("poll_pending_open; stream = {:?}", stream.is_pending_open); |             trace!("poll_pending_open; stream = {:?}", stream.is_pending_open); | ||||||
|             if stream.is_pending_open { |             if stream.is_pending_open { | ||||||
|                 stream.wait_send(); |                 stream.wait_send(); | ||||||
| @@ -941,10 +941,6 @@ impl<B> StreamRef<B> { | |||||||
|             .map_err(From::from) |             .map_err(From::from) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub(crate) fn key(&self) -> store::Key { |  | ||||||
|         self.opaque.key |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     pub fn clone_to_opaque(&self) -> OpaqueStreamRef |     pub fn clone_to_opaque(&self) -> OpaqueStreamRef | ||||||
|         where B: 'static, |         where B: 'static, | ||||||
|     { |     { | ||||||
|   | |||||||
| @@ -916,6 +916,8 @@ fn notify_on_send_capacity() { | |||||||
|                     assert_eq!(response.status(), StatusCode::OK); |                     assert_eq!(response.status(), StatusCode::OK); | ||||||
|                 } |                 } | ||||||
|  |  | ||||||
|  |                 poll_fn(|| client.poll_ready()).wait().unwrap(); | ||||||
|  |  | ||||||
|                 done_tx.send(()).unwrap(); |                 done_tx.send(()).unwrap(); | ||||||
|             }); |             }); | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user