Add stream_id accessors to public API types (#292)

Problem:

Applications may want to access the underlying h2 stream ID for
diagnostics, etc. Stream IDs were not previously exposed in public APIs.

Solution:

Added a new public `share::StreamId` type, which has a more restricted 
API than the internal `frame::StreamId` type. The public API types 
`SendStream`, `RecvStream`, `ReleaseCapacity`, 
`client::ResponseFuture`, and `server::SendResponse` now all have 
`stream_id` methods which return the stream ID of the corresponding 
stream.

Closes #289.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman
2018-07-12 21:01:57 -07:00
committed by GitHub
parent 41aae14c64
commit f3806d5144
6 changed files with 101 additions and 1 deletions

View File

@@ -1353,6 +1353,17 @@ impl Future for ResponseFuture {
}
}
impl ResponseFuture {
/// Returns the stream ID of the response stream.
///
/// # Panics
///
/// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> ::StreamId {
::StreamId::from_internal(self.inner.stream_id())
}
}
// ===== impl Peer =====
impl Peer {

View File

@@ -1,6 +1,16 @@
use byteorder::{BigEndian, ByteOrder};
use std::u32;
/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
///
/// Streams are identified with an unsigned 31-bit integer. Streams
/// initiated by a client MUST use odd-numbered stream identifiers; those
/// initiated by the server MUST use even-numbered stream identifiers. A
/// stream identifier of zero (0x0) is used for connection control
/// messages; the stream identifier of zero cannot be used to establish a
/// new stream.
///
/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct StreamId(u32);
@@ -10,8 +20,10 @@ pub struct StreamIdOverflow;
const STREAM_ID_MASK: u32 = 1 << 31;
impl StreamId {
/// Stream ID 0.
pub const ZERO: StreamId = StreamId(0);
/// The maximum allowed stream ID.
pub const MAX: StreamId = StreamId(u32::MAX >> 1);
/// Parse the stream ID
@@ -25,25 +37,34 @@ impl StreamId {
(StreamId(unpacked & !STREAM_ID_MASK), flag)
}
/// Returns true if this stream ID corresponds to a stream that
/// was initiated by the client.
pub fn is_client_initiated(&self) -> bool {
let id = self.0;
id != 0 && id % 2 == 1
}
/// Returns true if this stream ID corresponds to a stream that
/// was initiated by the server.
pub fn is_server_initiated(&self) -> bool {
let id = self.0;
id != 0 && id % 2 == 0
}
/// Return a new `StreamId` for stream 0.
#[inline]
pub fn zero() -> StreamId {
StreamId::ZERO
}
/// Returns true if this stream ID is zero.
pub fn is_zero(&self) -> bool {
self.0 == 0
}
/// Returns the next stream ID initiated by the same peer as this stream
/// ID, or an error if incrementing this stream ID would overflow the
/// maximum.
pub fn next_id(&self) -> Result<StreamId, StreamIdOverflow> {
let next = self.0 + 2;
if next > StreamId::MAX.0 {

View File

@@ -128,7 +128,7 @@ pub mod server;
mod share;
pub use error::{Error, Reason};
pub use share::{SendStream, RecvStream, ReleaseCapacity};
pub use share::{SendStream, StreamId, RecvStream, ReleaseCapacity};
#[cfg(feature = "unstable")]
pub use codec::{Codec, RecvError, SendError, UserError};

View File

@@ -946,6 +946,10 @@ impl<B> StreamRef<B> {
{
self.opaque.clone()
}
pub fn stream_id(&self) -> StreamId {
self.opaque.stream_id()
}
}
impl<B> Clone for StreamRef<B> {
@@ -1018,6 +1022,13 @@ impl OpaqueStreamRef {
.recv
.release_capacity(capacity, &mut stream, &mut me.actions.task)
}
pub fn stream_id(&self) -> StreamId {
self.inner.lock()
.unwrap()
.store[self.key]
.id
}
}
impl fmt::Debug for OpaqueStreamRef {

View File

@@ -988,6 +988,15 @@ impl<B: IntoBuf> SendResponse<B> {
self.inner.poll_reset(proto::PollReset::AwaitingHeaders)
}
/// Returns the stream ID of the response stream.
///
/// # Panics
///
/// If the lock on the strean store has been poisoned.
pub fn stream_id(&self) -> ::StreamId {
::StreamId::from_internal(self.inner.stream_id())
}
// TODO: Support reserving push promises.
}

View File

@@ -96,6 +96,19 @@ pub struct SendStream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
}
/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
///
/// Streams are identified with an unsigned 31-bit integer. Streams
/// initiated by a client MUST use odd-numbered stream identifiers; those
/// initiated by the server MUST use even-numbered stream identifiers. A
/// stream identifier of zero (0x0) is used for connection control
/// messages; the stream identifier of zero cannot be used to establish a
/// new stream.
///
/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct StreamId(u32);
/// Receives the body stream and trailers from the remote peer.
///
/// A `RecvStream` is provided by [`client::ResponseFuture`] and
@@ -335,8 +348,24 @@ impl<B: IntoBuf> SendStream<B> {
pub fn poll_reset(&mut self) -> Poll<Reason, ::Error> {
self.inner.poll_reset(proto::PollReset::Streaming)
}
/// Returns the stream ID of this `SendStream`.
///
/// # Panics
///
/// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> StreamId {
StreamId::from_internal(self.inner.stream_id())
}
}
// ===== impl StreamId =====
impl StreamId {
pub(crate) fn from_internal(id: ::frame::StreamId) -> Self {
StreamId(id.into())
}
}
// ===== impl RecvStream =====
impl RecvStream {
@@ -370,6 +399,15 @@ impl RecvStream {
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> {
self.inner.inner.poll_trailers().map_err(Into::into)
}
/// Returns the stream ID of this stream.
///
/// # Panics
///
/// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> StreamId {
self.inner.stream_id()
}
}
impl futures::Stream for RecvStream {
@@ -396,6 +434,16 @@ impl ReleaseCapacity {
ReleaseCapacity { inner }
}
/// Returns the stream ID of the stream whose capacity will
/// be released by this `ReleaseCapacity`.
///
/// # Panics
///
/// If the lock on the stream store has been poisoned.
pub fn stream_id(&self) -> StreamId {
StreamId::from_internal(self.inner.stream_id())
}
/// Release window capacity back to remote stream.
///
/// This releases capacity back to the stream level and the connection level