diff --git a/src/client.rs b/src/client.rs index d008274..a94f26f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1353,6 +1353,17 @@ impl Future for ResponseFuture { } } +impl ResponseFuture { + /// Returns the stream ID of the response stream. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> ::StreamId { + ::StreamId::from_internal(self.inner.stream_id()) + } +} + // ===== impl Peer ===== impl Peer { diff --git a/src/frame/stream_id.rs b/src/frame/stream_id.rs index 7406e70..039936f 100644 --- a/src/frame/stream_id.rs +++ b/src/frame/stream_id.rs @@ -1,6 +1,16 @@ use byteorder::{BigEndian, ByteOrder}; use std::u32; +/// A stream identifier, as described in [Section 5.1.1] of RFC 7540. +/// +/// Streams are identified with an unsigned 31-bit integer. Streams +/// initiated by a client MUST use odd-numbered stream identifiers; those +/// initiated by the server MUST use even-numbered stream identifiers. A +/// stream identifier of zero (0x0) is used for connection control +/// messages; the stream identifier of zero cannot be used to establish a +/// new stream. +/// +/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1 #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct StreamId(u32); @@ -10,8 +20,10 @@ pub struct StreamIdOverflow; const STREAM_ID_MASK: u32 = 1 << 31; impl StreamId { + /// Stream ID 0. pub const ZERO: StreamId = StreamId(0); + /// The maximum allowed stream ID. pub const MAX: StreamId = StreamId(u32::MAX >> 1); /// Parse the stream ID @@ -25,25 +37,34 @@ impl StreamId { (StreamId(unpacked & !STREAM_ID_MASK), flag) } + /// Returns true if this stream ID corresponds to a stream that + /// was initiated by the client. pub fn is_client_initiated(&self) -> bool { let id = self.0; id != 0 && id % 2 == 1 } + /// Returns true if this stream ID corresponds to a stream that + /// was initiated by the server. pub fn is_server_initiated(&self) -> bool { let id = self.0; id != 0 && id % 2 == 0 } + /// Return a new `StreamId` for stream 0. #[inline] pub fn zero() -> StreamId { StreamId::ZERO } + /// Returns true if this stream ID is zero. pub fn is_zero(&self) -> bool { self.0 == 0 } + /// Returns the next stream ID initiated by the same peer as this stream + /// ID, or an error if incrementing this stream ID would overflow the + /// maximum. pub fn next_id(&self) -> Result { let next = self.0 + 2; if next > StreamId::MAX.0 { diff --git a/src/lib.rs b/src/lib.rs index b1c381c..f723ec3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -128,7 +128,7 @@ pub mod server; mod share; pub use error::{Error, Reason}; -pub use share::{SendStream, RecvStream, ReleaseCapacity}; +pub use share::{SendStream, StreamId, RecvStream, ReleaseCapacity}; #[cfg(feature = "unstable")] pub use codec::{Codec, RecvError, SendError, UserError}; diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 2310143..de59576 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -946,6 +946,10 @@ impl StreamRef { { self.opaque.clone() } + + pub fn stream_id(&self) -> StreamId { + self.opaque.stream_id() + } } impl Clone for StreamRef { @@ -1018,6 +1022,13 @@ impl OpaqueStreamRef { .recv .release_capacity(capacity, &mut stream, &mut me.actions.task) } + + pub fn stream_id(&self) -> StreamId { + self.inner.lock() + .unwrap() + .store[self.key] + .id + } } impl fmt::Debug for OpaqueStreamRef { diff --git a/src/server.rs b/src/server.rs index bd2185c..e07946e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -988,6 +988,15 @@ impl SendResponse { self.inner.poll_reset(proto::PollReset::AwaitingHeaders) } + /// Returns the stream ID of the response stream. + /// + /// # Panics + /// + /// If the lock on the strean store has been poisoned. + pub fn stream_id(&self) -> ::StreamId { + ::StreamId::from_internal(self.inner.stream_id()) + } + // TODO: Support reserving push promises. } diff --git a/src/share.rs b/src/share.rs index dc628b7..d98a592 100644 --- a/src/share.rs +++ b/src/share.rs @@ -96,6 +96,19 @@ pub struct SendStream { inner: proto::StreamRef, } +/// A stream identifier, as described in [Section 5.1.1] of RFC 7540. +/// +/// Streams are identified with an unsigned 31-bit integer. Streams +/// initiated by a client MUST use odd-numbered stream identifiers; those +/// initiated by the server MUST use even-numbered stream identifiers. A +/// stream identifier of zero (0x0) is used for connection control +/// messages; the stream identifier of zero cannot be used to establish a +/// new stream. +/// +/// [Section 5.1.1]: https://tools.ietf.org/html/rfc7540#section-5.1.1 +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct StreamId(u32); + /// Receives the body stream and trailers from the remote peer. /// /// A `RecvStream` is provided by [`client::ResponseFuture`] and @@ -335,8 +348,24 @@ impl SendStream { pub fn poll_reset(&mut self) -> Poll { self.inner.poll_reset(proto::PollReset::Streaming) } + + /// Returns the stream ID of this `SendStream`. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> StreamId { + StreamId::from_internal(self.inner.stream_id()) + } } +// ===== impl StreamId ===== + +impl StreamId { + pub(crate) fn from_internal(id: ::frame::StreamId) -> Self { + StreamId(id.into()) + } +} // ===== impl RecvStream ===== impl RecvStream { @@ -370,6 +399,15 @@ impl RecvStream { pub fn poll_trailers(&mut self) -> Poll, ::Error> { self.inner.inner.poll_trailers().map_err(Into::into) } + + /// Returns the stream ID of this stream. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> StreamId { + self.inner.stream_id() + } } impl futures::Stream for RecvStream { @@ -396,6 +434,16 @@ impl ReleaseCapacity { ReleaseCapacity { inner } } + /// Returns the stream ID of the stream whose capacity will + /// be released by this `ReleaseCapacity`. + /// + /// # Panics + /// + /// If the lock on the stream store has been poisoned. + pub fn stream_id(&self) -> StreamId { + StreamId::from_internal(self.inner.stream_id()) + } + /// Release window capacity back to remote stream. /// /// This releases capacity back to the stream level and the connection level