Add OpaqueStreamRef constructor (#325)

Closes #318
This commit is contained in:
Michael Beaumont
2018-10-18 08:09:28 +02:00
committed by Carl Lerche
parent 80b4ec5073
commit fc5efe73d6

View File

@@ -482,30 +482,15 @@ where
} }
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> { pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
let key = { let mut me = self.inner.lock().unwrap();
let mut me = self.inner.lock().unwrap(); let me = &mut *me;
let me = &mut *me; let key = me.actions.recv.next_incoming(&mut me.store);
match me.actions.recv.next_incoming(&mut me.store) {
Some(key) => {
let mut stream = me.store.resolve(key);
trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state);
// Increment the ref count
stream.ref_inc();
// Return the key
Some(key)
},
None => None,
}
};
key.map(|key| { key.map(|key| {
let stream = &mut me.store.resolve(key);
trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state);
StreamRef { StreamRef {
opaque: OpaqueStreamRef { opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
inner: self.inner.clone(),
key,
},
send_buffer: self.send_buffer.clone(), send_buffer: self.send_buffer.clone(),
} }
}) })
@@ -586,82 +571,75 @@ where
// implicitly closes the earlier stream IDs. // implicitly closes the earlier stream IDs.
// //
// See: carllerche/h2#11 // See: carllerche/h2#11
let key = { let mut me = self.inner.lock().unwrap();
let mut me = self.inner.lock().unwrap(); let me = &mut *me;
let me = &mut *me;
let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer; let send_buffer = &mut *send_buffer;
me.actions.ensure_no_conn_error()?; me.actions.ensure_no_conn_error()?;
me.actions.send.ensure_next_stream_id()?; me.actions.send.ensure_next_stream_id()?;
// The `pending` argument is provided by the `Client`, and holds // The `pending` argument is provided by the `Client`, and holds
// a store `Key` of a `Stream` that may have been not been opened // a store `Key` of a `Stream` that may have been not been opened
// yet. // yet.
// //
// If that stream is still pending, the Client isn't allowed to // If that stream is still pending, the Client isn't allowed to
// queue up another pending stream. They should use `poll_ready`. // queue up another pending stream. They should use `poll_ready`.
if let Some(stream) = pending { if let Some(stream) = pending {
if me.store.resolve(stream.key).is_pending_open { if me.store.resolve(stream.key).is_pending_open {
return Err(UserError::Rejected.into()); return Err(UserError::Rejected.into());
}
} }
}
if me.counts.peer().is_server() { if me.counts.peer().is_server() {
// Servers cannot open streams. PushPromise must first be reserved. // Servers cannot open streams. PushPromise must first be reserved.
return Err(UserError::UnexpectedFrameType.into()); return Err(UserError::UnexpectedFrameType.into());
} }
let stream_id = me.actions.send.open()?; let stream_id = me.actions.send.open()?;
let mut stream = Stream::new( let mut stream = Stream::new(
stream_id, stream_id,
me.actions.send.init_window_sz(), me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz(), me.actions.recv.init_window_sz(),
); );
if *request.method() == Method::HEAD { if *request.method() == Method::HEAD {
stream.content_length = ContentLength::Head; stream.content_length = ContentLength::Head;
} }
// Convert the message // Convert the message
let headers = client::Peer::convert_send_message( let headers = client::Peer::convert_send_message(
stream_id, request, end_of_stream)?; stream_id, request, end_of_stream)?;
let mut stream = me.store.insert(stream.id, stream); let mut stream = me.store.insert(stream.id, stream);
let sent = me.actions.send.send_headers( let sent = me.actions.send.send_headers(
headers, headers,
send_buffer, send_buffer,
&mut stream, &mut stream,
&mut me.counts, &mut me.counts,
&mut me.actions.task, &mut me.actions.task,
); );
// send_headers can return a UserError, if it does, // send_headers can return a UserError, if it does,
// we should forget about this stream. // we should forget about this stream.
if let Err(err) = sent { if let Err(err) = sent {
stream.unlink(); stream.unlink();
stream.remove(); stream.remove();
return Err(err.into()); return Err(err.into());
} }
// Given that the stream has been initialized, it should not be in the // Given that the stream has been initialized, it should not be in the
// closed state. // closed state.
debug_assert!(!stream.state.is_closed()); debug_assert!(!stream.state.is_closed());
// Increment the stream ref count as we will be returning a handle.
stream.ref_inc();
stream.key()
};
Ok(StreamRef { Ok(StreamRef {
opaque: OpaqueStreamRef { opaque: OpaqueStreamRef::new(
inner: self.inner.clone(), self.inner.clone(),
key: key, &mut stream,
}, ),
send_buffer: self.send_buffer.clone(), send_buffer: self.send_buffer.clone(),
}) })
} }
@@ -973,6 +951,12 @@ impl<B> Clone for StreamRef<B> {
// ===== impl OpaqueStreamRef ===== // ===== impl OpaqueStreamRef =====
impl OpaqueStreamRef { impl OpaqueStreamRef {
fn new(inner: Arc<Mutex<Inner>>, stream: &mut store::Ptr) -> OpaqueStreamRef {
stream.ref_inc();
OpaqueStreamRef {
inner, key: stream.key()
}
}
/// Called by a client to check for a received response. /// Called by a client to check for a received response.
pub fn poll_response(&mut self) -> Poll<Response<()>, proto::Error> { pub fn poll_response(&mut self) -> Poll<Response<()>, proto::Error> {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
@@ -994,11 +978,8 @@ impl OpaqueStreamRef {
try_ready!(me.actions.recv.poll_pushed(&mut stream)) try_ready!(me.actions.recv.poll_pushed(&mut stream))
}; };
Ok(Async::Ready(res.map(|(h, key)| { Ok(Async::Ready(res.map(|(h, key)| {
me.store.resolve(key).ref_inc();
let opaque_ref = let opaque_ref =
OpaqueStreamRef { OpaqueStreamRef::new(self.inner.clone(), &mut me.store.resolve(key));
inner: self.inner.clone(), key,
};
(h, opaque_ref) (h, opaque_ref)
}))) })))
} }