From fc5efe73d672b3fe857f68862f85dc58e56a486a Mon Sep 17 00:00:00 2001 From: Michael Beaumont Date: Thu, 18 Oct 2018 08:09:28 +0200 Subject: [PATCH] Add OpaqueStreamRef constructor (#325) Closes #318 --- src/proto/streams/streams.rs | 153 +++++++++++++++-------------------- 1 file changed, 67 insertions(+), 86 deletions(-) diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 0b41530..cb8a6d5 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -482,30 +482,15 @@ where } pub fn next_incoming(&mut self) -> Option> { - let key = { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - match me.actions.recv.next_incoming(&mut me.store) { - Some(key) => { - let mut stream = me.store.resolve(key); - trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state); - // Increment the ref count - stream.ref_inc(); - - // Return the key - Some(key) - }, - None => None, - } - }; + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + let key = me.actions.recv.next_incoming(&mut me.store); key.map(|key| { + let stream = &mut me.store.resolve(key); + trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state); StreamRef { - opaque: OpaqueStreamRef { - inner: self.inner.clone(), - key, - }, + opaque: OpaqueStreamRef::new(self.inner.clone(), stream), send_buffer: self.send_buffer.clone(), } }) @@ -586,82 +571,75 @@ where // implicitly closes the earlier stream IDs. // // See: carllerche/h2#11 - let key = { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; - let mut send_buffer = self.send_buffer.inner.lock().unwrap(); - let send_buffer = &mut *send_buffer; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; - me.actions.ensure_no_conn_error()?; - me.actions.send.ensure_next_stream_id()?; + me.actions.ensure_no_conn_error()?; + me.actions.send.ensure_next_stream_id()?; - // The `pending` argument is provided by the `Client`, and holds - // a store `Key` of a `Stream` that may have been not been opened - // yet. - // - // If that stream is still pending, the Client isn't allowed to - // queue up another pending stream. They should use `poll_ready`. - if let Some(stream) = pending { - if me.store.resolve(stream.key).is_pending_open { - return Err(UserError::Rejected.into()); - } + // The `pending` argument is provided by the `Client`, and holds + // a store `Key` of a `Stream` that may have been not been opened + // yet. + // + // If that stream is still pending, the Client isn't allowed to + // queue up another pending stream. They should use `poll_ready`. + if let Some(stream) = pending { + if me.store.resolve(stream.key).is_pending_open { + return Err(UserError::Rejected.into()); } + } - if me.counts.peer().is_server() { - // Servers cannot open streams. PushPromise must first be reserved. - return Err(UserError::UnexpectedFrameType.into()); - } + if me.counts.peer().is_server() { + // Servers cannot open streams. PushPromise must first be reserved. + return Err(UserError::UnexpectedFrameType.into()); + } - let stream_id = me.actions.send.open()?; + let stream_id = me.actions.send.open()?; - let mut stream = Stream::new( - stream_id, - me.actions.send.init_window_sz(), - me.actions.recv.init_window_sz(), - ); + let mut stream = Stream::new( + stream_id, + me.actions.send.init_window_sz(), + me.actions.recv.init_window_sz(), + ); - if *request.method() == Method::HEAD { - stream.content_length = ContentLength::Head; - } + if *request.method() == Method::HEAD { + stream.content_length = ContentLength::Head; + } - // Convert the message - let headers = client::Peer::convert_send_message( - stream_id, request, end_of_stream)?; + // Convert the message + let headers = client::Peer::convert_send_message( + stream_id, request, end_of_stream)?; - let mut stream = me.store.insert(stream.id, stream); + let mut stream = me.store.insert(stream.id, stream); - let sent = me.actions.send.send_headers( - headers, - send_buffer, - &mut stream, - &mut me.counts, - &mut me.actions.task, - ); + let sent = me.actions.send.send_headers( + headers, + send_buffer, + &mut stream, + &mut me.counts, + &mut me.actions.task, + ); - // send_headers can return a UserError, if it does, - // we should forget about this stream. - if let Err(err) = sent { - stream.unlink(); - stream.remove(); - return Err(err.into()); - } + // send_headers can return a UserError, if it does, + // we should forget about this stream. + if let Err(err) = sent { + stream.unlink(); + stream.remove(); + return Err(err.into()); + } - // Given that the stream has been initialized, it should not be in the - // closed state. - debug_assert!(!stream.state.is_closed()); - - // Increment the stream ref count as we will be returning a handle. - stream.ref_inc(); - - stream.key() - }; + // Given that the stream has been initialized, it should not be in the + // closed state. + debug_assert!(!stream.state.is_closed()); Ok(StreamRef { - opaque: OpaqueStreamRef { - inner: self.inner.clone(), - key: key, - }, + opaque: OpaqueStreamRef::new( + self.inner.clone(), + &mut stream, + ), send_buffer: self.send_buffer.clone(), }) } @@ -973,6 +951,12 @@ impl Clone for StreamRef { // ===== impl OpaqueStreamRef ===== impl OpaqueStreamRef { + fn new(inner: Arc>, stream: &mut store::Ptr) -> OpaqueStreamRef { + stream.ref_inc(); + OpaqueStreamRef { + inner, key: stream.key() + } + } /// Called by a client to check for a received response. pub fn poll_response(&mut self) -> Poll, proto::Error> { let mut me = self.inner.lock().unwrap(); @@ -994,11 +978,8 @@ impl OpaqueStreamRef { try_ready!(me.actions.recv.poll_pushed(&mut stream)) }; Ok(Async::Ready(res.map(|(h, key)| { - me.store.resolve(key).ref_inc(); let opaque_ref = - OpaqueStreamRef { - inner: self.inner.clone(), key, - }; + OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key)); (h, opaque_ref) }))) }