From 9bb34d907a8c040ee9ab86577171f576a72d1e31 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 25 Aug 2017 23:45:15 -0400 Subject: [PATCH] Thread P generic through all --- examples/client.rs | 2 +- src/client.rs | 41 +++++++++- src/frame/headers.rs | 8 -- src/lib.rs | 41 ---------- src/proto/connection.rs | 22 +++--- src/proto/mod.rs | 2 - src/proto/settings.rs | 7 +- src/proto/streams/buffer.rs | 30 ++++---- src/proto/streams/prioritize.rs | 39 +++++----- src/proto/streams/recv.rs | 73 +++++++++--------- src/proto/streams/send.rs | 45 ++++++----- src/proto/streams/state.rs | 10 +++ src/proto/streams/store.rs | 111 +++++++++++++++++---------- src/proto/streams/stream.rs | 56 +++++++------- src/proto/streams/streams.rs | 128 ++++++++++++++++---------------- src/server.rs | 44 +++++++++-- 16 files changed, 368 insertions(+), 291 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index d0d34dd..d0b3dcb 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -8,7 +8,7 @@ extern crate io_dump; extern crate env_logger; use h2::*; -use h2::client::Client; +use h2::client::{Client, Body}; use http::*; use futures::*; diff --git a/src/client.rs b/src/client.rs index 055de34..308521f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,4 @@ use {frame, HeaderMap, ConnectionError}; -use Body; use frame::StreamId; use proto::{self, Connection, WindowSize}; @@ -23,7 +22,12 @@ pub struct Client { #[derive(Debug)] pub struct Stream { - inner: proto::StreamRef, + inner: proto::StreamRef, +} + +#[derive(Debug)] +pub struct Body { + inner: proto::StreamRef, } #[derive(Debug)] @@ -168,14 +172,14 @@ impl Stream { pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { - self.inner.send_data::(data.into_buf(), end_of_stream) + self.inner.send_data(data.into_buf(), end_of_stream) } /// Send trailers pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> { - self.inner.send_trailers::(trailers) + self.inner.send_trailers(trailers) } } @@ -188,6 +192,35 @@ impl Future for Stream { } } +// ===== impl Body ===== + +impl Body { + pub fn is_empty(&self) -> bool { + // If the recv side is closed and the receive queue is empty, the body is empty. + self.inner.body_is_empty() + } + + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { + self.inner.release_capacity(sz as proto::WindowSize) + } + + /// Poll trailers + /// + /// This function **must** not be called until `Body::poll` returns `None`. + pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + self.inner.poll_trailers() + } +} + +impl ::futures::Stream for Body { + type Item = Bytes; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, Self::Error> { + self.inner.poll_data() + } +} + // ===== impl Peer ===== impl proto::Peer for Peer { diff --git a/src/frame/headers.rs b/src/frame/headers.rs index dfacd0f..fd39963 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -249,14 +249,6 @@ impl Headers { Ok(()) } - /// Returns `true` if the frame represents trailers - /// - /// Trailers are header frames that contain no pseudo headers. - pub fn is_trailers(&self) -> bool { - self.pseudo.method.is_none() && - self.pseudo.status.is_none() - } - pub fn stream_id(&self) -> StreamId { self.stream_id } diff --git a/src/lib.rs b/src/lib.rs index 69a3e0c..be82b58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,46 +33,5 @@ pub mod server; pub use error::{ConnectionError, Reason}; -use bytes::Bytes; - // TODO: remove if carllerche/http#90 lands pub type HeaderMap = http::HeaderMap; - -// TODO: Move into other location - -use bytes::IntoBuf; -use futures::Poll; - -#[derive(Debug)] -pub struct Body { - inner: proto::StreamRef, -} - -// ===== impl Body ===== - -impl Body { - pub fn is_empty(&self) -> bool { - // If the recv side is closed and the receive queue is empty, the body is empty. - self.inner.body_is_empty() - } - - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { - self.inner.release_capacity(sz as proto::WindowSize) - } - - /// Poll trailers - /// - /// This function **must** not be called until `Body::poll` returns `None`. - pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { - self.inner.poll_trailers() - } -} - -impl futures::Stream for Body { - type Item = Bytes; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll_data() - } -} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 1ee47af..9b640f7 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -11,7 +11,9 @@ use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] -pub(crate) struct Connection { +pub(crate) struct Connection + where P: Peer, +{ /// Tracks the connection level state transitions. state: State, @@ -25,7 +27,7 @@ pub(crate) struct Connection { settings: Settings, /// Stream state handler - streams: Streams, + streams: Streams, /// Client or server _phantom: PhantomData

, @@ -53,7 +55,7 @@ impl Connection { pub fn new(codec: Codec>) -> Connection { // TODO: Actually configure - let streams = Streams::new::

(streams::Config { + let streams = Streams::new(streams::Config { max_remote_initiated: None, init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, max_local_initiated: None, @@ -147,7 +149,7 @@ impl Connection // Stream level error, reset the stream Err(Stream { id, reason }) => { trace!("stream level error; id={:?}; reason={:?}", id, reason); - self.streams.send_reset::

(id, reason); + self.streams.send_reset(id, reason); continue; } // I/O error, nothing more can be done @@ -161,19 +163,19 @@ impl Connection match frame { Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); - try!(self.streams.recv_headers::

(frame)); + try!(self.streams.recv_headers(frame)); } Some(Data(frame)) => { trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data::

(frame)); + try!(self.streams.recv_data(frame)); } Some(Reset(frame)) => { trace!("recv RST_STREAM; frame={:?}", frame); - try!(self.streams.recv_reset::

(frame)); + try!(self.streams.recv_reset(frame)); } Some(PushPromise(frame)) => { trace!("recv PUSH_PROMISE; frame={:?}", frame); - self.streams.recv_push_promise::

(frame)?; + self.streams.recv_push_promise(frame)?; } Some(Settings(frame)) => { trace!("recv SETTINGS; frame={:?}", frame); @@ -244,7 +246,7 @@ impl Connection { /// Initialize a new HTTP/2.0 stream and send the message. pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ConnectionError> { self.streams.send_request(request, end_of_stream) } @@ -254,7 +256,7 @@ impl Connection where T: AsyncRead + AsyncWrite, B: IntoBuf, { - pub fn next_incoming(&mut self) -> Option> { + pub fn next_incoming(&mut self) -> Option> { self.streams.next_incoming() } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 8d27d9e..de3365e 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -38,13 +38,11 @@ pub trait Peer { fn is_server() -> bool; - #[doc(hidden)] fn convert_send_message( id: StreamId, headers: Self::Send, end_of_stream: bool) -> frame::Headers; - #[doc(hidden)] fn convert_poll_message(headers: frame::Headers) -> Result; } diff --git a/src/proto/settings.rs b/src/proto/settings.rs index ee405f9..b415651 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -28,13 +28,14 @@ impl Settings { } } - pub fn send_pending_ack(&mut self, - dst: &mut Codec, - streams: &mut Streams) + pub fn send_pending_ack(&mut self, + dst: &mut Codec, + streams: &mut Streams) -> Poll<(), ConnectionError> where T: AsyncWrite, B: Buf, C: Buf, + P: Peer, { trace!("send_pending_ack; pending={:?}", self.pending); diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 2e81501..6ae33f4 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -1,13 +1,11 @@ -use frame::Frame; - use slab::Slab; use std::marker::PhantomData; /// Buffers frames for multiple streams. #[derive(Debug)] -pub struct Buffer { - slab: Slab>, +pub struct Buffer { + slab: Slab>, } /// A sequence of frames in a `Buffer` @@ -25,12 +23,12 @@ struct Indices { } #[derive(Debug)] -struct Slot { - frame: Frame, +struct Slot { + value: T, next: Option, } -impl Buffer { +impl Buffer { pub fn new() -> Self { Buffer { slab: Slab::new(), @@ -38,7 +36,7 @@ impl Buffer { } } -impl Deque { +impl Deque { pub fn new() -> Self { Deque { indices: None, @@ -50,9 +48,9 @@ impl Deque { self.indices.is_none() } - pub fn push_back(&mut self, buf: &mut Buffer, frame: Frame) { + pub fn push_back(&mut self, buf: &mut Buffer, value: T) { let key = buf.slab.insert(Slot { - frame, + value, next: None, }); @@ -70,9 +68,9 @@ impl Deque { } } - pub fn push_front(&mut self, buf: &mut Buffer, frame: Frame) { + pub fn push_front(&mut self, buf: &mut Buffer, value: T) { let key = buf.slab.insert(Slot { - frame, + value, next: None, }); @@ -90,7 +88,7 @@ impl Deque { } } - pub fn pop_front(&mut self, buf: &mut Buffer) -> Option> { + pub fn pop_front(&mut self, buf: &mut Buffer) -> Option { match self.indices { Some(mut idxs) => { let mut slot = buf.slab.remove(idxs.head); @@ -103,16 +101,16 @@ impl Deque { self.indices = Some(idxs); } - return Some(slot.frame); + return Some(slot.value); } None => None, } } - pub fn peek_front<'a>(&self, buf: &'a Buffer) -> Option<&'a Frame> { + pub fn peek_front<'a>(&self, buf: &'a Buffer) -> Option<&'a T> { match self.indices { Some(idxs) => { - Some(&buf.slab[idxs.head].frame) + Some(&buf.slab[idxs.head].value) } None => None, } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 58f5fdf..1411ae6 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -7,18 +7,20 @@ use futures::Sink; use std::{fmt, cmp}; #[derive(Debug)] -pub(super) struct Prioritize { +pub(super) struct Prioritize + where P: Peer, +{ /// Queue of streams waiting for socket capacity to send a frame - pending_send: store::Queue, + pending_send: store::Queue, /// Queue of streams waiting for window capacity to produce data. - pending_capacity: store::Queue, + pending_capacity: store::Queue, /// Connection level flow control governing sent data flow: FlowControl, /// Holds frames that are waiting to be written to the socket - buffer: Buffer, + buffer: Buffer>, } pub(crate) struct Prioritized { @@ -33,10 +35,11 @@ pub(crate) struct Prioritized { // ===== impl Prioritize ===== -impl Prioritize +impl Prioritize where B: Buf, + P: Peer, { - pub fn new(config: &Config) -> Prioritize { + pub fn new(config: &Config) -> Prioritize { let mut flow = FlowControl::new(); flow.inc_window(config.init_local_window_sz) @@ -57,7 +60,7 @@ impl Prioritize /// Queue a frame to be sent to the remote pub fn queue_frame(&mut self, frame: Frame, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) { // Queue the frame in the buffer @@ -75,7 +78,7 @@ impl Prioritize /// Send a data frame pub fn send_data(&mut self, frame: frame::Data, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> { @@ -134,7 +137,7 @@ impl Prioritize } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { // Actual capacity is `capacity` + the current amount of buffered data. // It it were less, then we could never send out the buffered data. let capacity = capacity + stream.buffered_send_data; @@ -157,7 +160,7 @@ impl Prioritize pub fn recv_stream_window_update(&mut self, inc: WindowSize, - stream: &mut store::Ptr) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", @@ -175,7 +178,7 @@ impl Prioritize pub fn recv_connection_window_update(&mut self, inc: WindowSize, - store: &mut Store) + store: &mut Store) -> Result<(), ConnectionError> { // Update the connection's window @@ -188,7 +191,7 @@ impl Prioritize pub fn assign_connection_capacity(&mut self, inc: WindowSize, store: &mut R) - where R: Resolve + where R: Resolve { self.flow.assign_capacity(inc); @@ -207,7 +210,7 @@ impl Prioritize } /// Request capacity to send data - fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { + fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { let total_requested = stream.requested_send_capacity; // Total requested should never go below actual assigned @@ -279,7 +282,7 @@ impl Prioritize pub fn poll_complete(&mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, @@ -337,7 +340,7 @@ impl Prioritize /// entirety (large chunks are split up into potentially many data frames). /// In this case, the stream needs to be reprioritized. fn reclaim_frame(&mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>) -> bool { trace!("try reclaim frame"); @@ -373,7 +376,7 @@ impl Prioritize /// Push the frame to the front of the stream's deque, scheduling the /// steream if needed. - fn push_back_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { + fn push_back_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { // Push the frame to the front of the stream's deque stream.pending_send.push_front(&mut self.buffer, frame); @@ -383,7 +386,7 @@ impl Prioritize } } - pub fn clear_queue(&mut self, stream: &mut store::Ptr) { + pub fn clear_queue(&mut self, stream: &mut store::Ptr) { trace!("clear_queue; stream-id={:?}", stream.id); // TODO: make this more efficient? @@ -392,7 +395,7 @@ impl Prioritize } } - fn pop_frame(&mut self, store: &mut Store, max_len: usize) + fn pop_frame(&mut self, store: &mut Store, max_len: usize) -> Option>> { trace!("pop_frame"); diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 9756162..5fb7cc8 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -8,7 +8,9 @@ use futures::Sink; use std::marker::PhantomData; #[derive(Debug)] -pub(super) struct Recv { +pub(super) struct Recv + where P: Peer, +{ /// Maximum number of remote initiated streams max_streams: Option, @@ -28,13 +30,13 @@ pub(super) struct Recv { last_processed_id: StreamId, /// Streams that have pending window updates - pending_window_updates: store::Queue, + pending_window_updates: store::Queue, /// New streams to be accepted - pending_accept: store::Queue, + pending_accept: store::Queue, /// Holds frames that are waiting to be read - buffer: Buffer, + buffer: Buffer>, /// Refused StreamId, this represents a frame that must be sent out. refused: Option, @@ -48,8 +50,11 @@ struct Indices { tail: store::Key, } -impl Recv where B: Buf { - pub fn new(config: &Config) -> Self { +impl Recv + where B: Buf, + P: Peer, +{ + pub fn new(config: &Config) -> Self { let next_stream_id = if P::is_server() { 1 } else { @@ -90,12 +95,12 @@ impl Recv where B: Buf { /// Update state reflecting a new, remotely opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) + pub fn open(&mut self, id: StreamId) -> Result, ConnectionError> { assert!(self.refused.is_none()); - try!(self.ensure_can_open::

(id)); + try!(self.ensure_can_open(id)); if !self.can_inc_num_streams() { self.refused = Some(id); @@ -105,7 +110,7 @@ impl Recv where B: Buf { Ok(Some(id)) } - pub fn take_request(&mut self, stream: &mut store::Ptr) + pub fn take_request(&mut self, stream: &mut store::Ptr) -> Result, ConnectionError> { match stream.pending_recv.pop_front(&mut self.buffer) { @@ -118,7 +123,7 @@ impl Recv where B: Buf { } } - pub fn poll_response(&mut self, stream: &mut store::Ptr) + pub fn poll_response(&mut self, stream: &mut store::Ptr) -> Poll, ConnectionError> { // If the buffer is not empty, then the first frame must be a HEADERS // frame or the user violated the contract. @@ -139,9 +144,9 @@ impl Recv where B: Buf { } /// Transition the stream state based on receiving headers - pub fn recv_headers(&mut self, - frame: frame::Headers, - stream: &mut store::Ptr) + pub fn recv_headers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { trace!("opening stream; init_window={}", self.init_window_sz); @@ -182,9 +187,9 @@ impl Recv where B: Buf { } /// Transition the stream based on receiving trailers - pub fn recv_trailers(&mut self, - frame: frame::Headers, - stream: &mut store::Ptr) + pub fn recv_trailers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { // Transition the state @@ -199,7 +204,7 @@ impl Recv where B: Buf { pub fn release_capacity(&mut self, capacity: WindowSize, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> { @@ -225,7 +230,7 @@ impl Recv where B: Buf { Ok(()) } - pub fn body_is_empty(&self, stream: &store::Ptr) -> bool { + pub fn body_is_empty(&self, stream: &store::Ptr) -> bool { if !stream.state.is_recv_closed() { return false; } @@ -237,7 +242,7 @@ impl Recv where B: Buf { pub fn recv_data(&mut self, frame: frame::Data, - stream: &mut store::Ptr) + stream: &mut store::Ptr) -> Result<(), ConnectionError> { let sz = frame.payload().len(); @@ -283,15 +288,15 @@ impl Recv where B: Buf { Ok(()) } - pub fn recv_push_promise(&mut self, - frame: frame::PushPromise, - send: &Send, - stream: store::Key, - store: &mut Store) + pub fn recv_push_promise(&mut self, + frame: frame::PushPromise, + send: &Send, + stream: store::Key, + store: &mut Store) -> Result<(), ConnectionError> { // First, make sure that the values are legit - self.ensure_can_reserve::

(frame.promised_id())?; + self.ensure_can_reserve(frame.promised_id())?; // Make sure that the stream state is valid store[stream].state.ensure_recv_open()?; @@ -343,7 +348,7 @@ impl Recv where B: Buf { Ok(()) } - pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) + pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) -> Result<(), ConnectionError> { let err = ConnectionError::Proto(frame.reason()); @@ -355,7 +360,7 @@ impl Recv where B: Buf { } /// Handle a received error - pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { + pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { // Receive an error stream.state.recv_err(err); @@ -388,7 +393,7 @@ impl Recv where B: Buf { } /// Returns true if the remote peer can initiate a stream with the given ID. - fn ensure_can_open(&self, id: StreamId) + fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { if !P::is_server() { @@ -406,7 +411,7 @@ impl Recv where B: Buf { } /// Returns true if the remote peer can reserve a stream with the given ID. - fn ensure_can_reserve(&self, promised_id: StreamId) + fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), ConnectionError> { // TODO: Are there other rules? @@ -446,7 +451,7 @@ impl Recv where B: Buf { } pub fn poll_complete(&mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, @@ -483,7 +488,7 @@ impl Recv where B: Buf { /// Send stream level window update pub fn send_stream_window_updates(&mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, @@ -516,12 +521,12 @@ impl Recv where B: Buf { } } - pub fn next_incoming(&mut self, store: &mut Store) -> Option { + pub fn next_incoming(&mut self, store: &mut Store) -> Option { self.pending_accept.pop(store) .map(|ptr| ptr.key()) } - pub fn poll_data(&mut self, stream: &mut Stream) + pub fn poll_data(&mut self, stream: &mut Stream) -> Poll, ConnectionError> { match stream.pending_recv.pop_front(&mut self.buffer) { @@ -548,7 +553,7 @@ impl Recv where B: Buf { } } - pub fn poll_trailers(&mut self, stream: &mut Stream) + pub fn poll_trailers(&mut self, stream: &mut Stream) -> Poll, ConnectionError> { match stream.pending_recv.pop_front(&mut self.buffer) { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 1390310..725cff9 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -8,7 +8,9 @@ use bytes::Buf; /// Manages state transitions related to outbound frames. #[derive(Debug)] -pub(super) struct Send { +pub(super) struct Send + where P: Peer, +{ /// Maximum number of locally initiated streams max_streams: Option, @@ -25,12 +27,15 @@ pub(super) struct Send { blocked_open: Option, /// Prioritization layer - prioritize: Prioritize, + prioritize: Prioritize, } -impl Send where B: Buf { +impl Send +where B: Buf, + P: Peer, +{ /// Create a new `Send` - pub fn new(config: &Config) -> Self { + pub fn new(config: &Config) -> Self { let next_stream_id = if P::is_server() { 2 } else { 1 }; Send { @@ -48,8 +53,8 @@ impl Send where B: Buf { self.init_window_sz } - pub fn poll_open_ready(&mut self) -> Poll<(), ConnectionError> { - try!(self.ensure_can_open::

()); + pub fn poll_open_ready(&mut self) -> Poll<(), ConnectionError> { + try!(self.ensure_can_open()); if let Some(max) = self.max_streams { if max <= self.num_streams { @@ -64,10 +69,10 @@ impl Send where B: Buf { /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self) + pub fn open(&mut self) -> Result { - try!(self.ensure_can_open::

()); + try!(self.ensure_can_open()); if let Some(max) = self.max_streams { if max <= self.num_streams { @@ -86,7 +91,7 @@ impl Send where B: Buf { pub fn send_headers(&mut self, frame: frame::Headers, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> { @@ -102,7 +107,7 @@ impl Send where B: Buf { pub fn send_reset(&mut self, reason: Reason, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) { if stream.state.is_reset() { @@ -138,7 +143,7 @@ impl Send where B: Buf { pub fn send_data(&mut self, frame: frame::Data, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> { @@ -147,7 +152,7 @@ impl Send where B: Buf { pub fn send_trailers(&mut self, frame: frame::Headers, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> { @@ -165,7 +170,7 @@ impl Send where B: Buf { } pub fn poll_complete(&mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, @@ -174,11 +179,11 @@ impl Send where B: Buf { } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { self.prioritize.reserve_capacity(capacity, stream) } - pub fn poll_capacity(&mut self, stream: &mut store::Ptr) + pub fn poll_capacity(&mut self, stream: &mut store::Ptr) -> Poll, ConnectionError> { if !stream.state.is_send_streaming() { @@ -195,7 +200,7 @@ impl Send where B: Buf { } /// Current available stream send capacity - pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { + pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { let available = stream.send_flow.available(); let buffered = stream.buffered_send_data; @@ -208,7 +213,7 @@ impl Send where B: Buf { pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate, - store: &mut Store) + store: &mut Store) -> Result<(), ConnectionError> { self.prioritize.recv_connection_window_update(frame.size_increment(), store) @@ -216,7 +221,7 @@ impl Send where B: Buf { pub fn recv_stream_window_update(&mut self, sz: WindowSize, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option) -> Result<(), ConnectionError> { @@ -230,7 +235,7 @@ impl Send where B: Buf { pub fn apply_remote_settings(&mut self, settings: &frame::Settings, - store: &mut Store, + store: &mut Store, task: &mut Option) -> Result<(), ConnectionError> { @@ -311,7 +316,7 @@ impl Send where B: Buf { } /// Returns true if the local actor can initiate a stream with the given ID. - fn ensure_can_open(&self) -> Result<(), ConnectionError> { + fn ensure_can_open(&self) -> Result<(), ConnectionError> { if P::is_server() { // Servers cannot open streams. PushPromise must first be reserved. return Err(UnexpectedFrameType.into()); diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 98783a4..8e477a3 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -274,6 +274,16 @@ impl State { } } + /// Returns true when the stream is in a state to receive headers + pub fn is_recv_headers(&self) -> bool { + match self.inner { + Idle => true, + Open { remote: AwaitingHeaders, .. } => true, + HalfClosedLocal(AwaitingHeaders) => true, + _ => false, + } + } + pub fn is_recv_streaming(&self) -> bool { match self.inner { Open { remote: Peer::Streaming, .. } => true, diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 0954c1f..8280214 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -8,15 +8,19 @@ use std::marker::PhantomData; /// Storage for streams #[derive(Debug)] -pub(super) struct Store { - slab: slab::Slab>, +pub(super) struct Store + where P: Peer, +{ + slab: slab::Slab>, ids: HashMap, } /// "Pointer" to an entry in the store -pub(super) struct Ptr<'a, B: 'a> { +pub(super) struct Ptr<'a, B: 'a, P> + where P: Peer + 'a, +{ key: Key, - slab: &'a mut slab::Slab>, + slab: &'a mut slab::Slab>, } /// References an entry in the store. @@ -24,21 +28,23 @@ pub(super) struct Ptr<'a, B: 'a> { pub(super) struct Key(usize); #[derive(Debug)] -pub(super) struct Queue { +pub(super) struct Queue + where P: Peer, +{ indices: Option, - _p: PhantomData<(B, N)>, + _p: PhantomData<(B, N, P)>, } pub(super) trait Next { - fn next(stream: &Stream) -> Option; + fn next(stream: &Stream) -> Option; - fn set_next(stream: &mut Stream, key: Option); + fn set_next(stream: &mut Stream, key: Option); - fn take_next(stream: &mut Stream) -> Option; + fn take_next(stream: &mut Stream) -> Option; - fn is_queued(stream: &Stream) -> bool; + fn is_queued(stream: &Stream) -> bool; - fn set_queued(stream: &mut Stream, val: bool); + fn set_queued(stream: &mut Stream, val: bool); } /// A linked list @@ -48,27 +54,33 @@ struct Indices { pub tail: Key, } -pub(super) enum Entry<'a, B: 'a> { +pub(super) enum Entry<'a, B: 'a, P: Peer + 'a> { Occupied(OccupiedEntry<'a>), - Vacant(VacantEntry<'a, B>), + Vacant(VacantEntry<'a, B, P>), } pub(super) struct OccupiedEntry<'a> { ids: hash_map::OccupiedEntry<'a, StreamId, usize>, } -pub(super) struct VacantEntry<'a, B: 'a> { +pub(super) struct VacantEntry<'a, B: 'a, P> + where P: Peer + 'a, +{ ids: hash_map::VacantEntry<'a, StreamId, usize>, - slab: &'a mut slab::Slab>, + slab: &'a mut slab::Slab>, } -pub(super) trait Resolve { - fn resolve(&mut self, key: Key) -> Ptr; +pub(super) trait Resolve + where P: Peer, +{ + fn resolve(&mut self, key: Key) -> Ptr; } // ===== impl Store ===== -impl Store { +impl Store + where P: Peer, +{ pub fn new() -> Self { Store { slab: slab::Slab::new(), @@ -76,7 +88,7 @@ impl Store { } } - pub fn find_mut(&mut self, id: &StreamId) -> Option> { + pub fn find_mut(&mut self, id: &StreamId) -> Option> { if let Some(&key) = self.ids.get(id) { Some(Ptr { key: Key(key), @@ -87,7 +99,7 @@ impl Store { } } - pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { + pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { let key = self.slab.insert(val); assert!(self.ids.insert(id, key).is_none()); @@ -97,7 +109,7 @@ impl Store { } } - pub fn find_entry(&mut self, id: StreamId) -> Entry { + pub fn find_entry(&mut self, id: StreamId) -> Entry { use self::hash_map::Entry::*; match self.ids.entry(id) { @@ -116,7 +128,7 @@ impl Store { } pub fn for_each(&mut self, mut f: F) -> Result<(), ConnectionError> - where F: FnMut(Ptr) -> Result<(), ConnectionError>, + where F: FnMut(Ptr) -> Result<(), ConnectionError>, { for &key in self.ids.values() { f(Ptr { @@ -129,8 +141,10 @@ impl Store { } } -impl Resolve for Store { - fn resolve(&mut self, key: Key) -> Ptr { +impl Resolve for Store + where P: Peer, +{ + fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, slab: &mut self.slab, @@ -138,15 +152,19 @@ impl Resolve for Store { } } -impl ops::Index for Store { - type Output = Stream; +impl ops::Index for Store + where P: Peer, +{ + type Output = Stream; fn index(&self, key: Key) -> &Self::Output { self.slab.index(key.0) } } -impl ops::IndexMut for Store { +impl ops::IndexMut for Store + where P: Peer, +{ fn index_mut(&mut self, key: Key) -> &mut Self::Output { self.slab.index_mut(key.0) } @@ -154,8 +172,9 @@ impl ops::IndexMut for Store { // ===== impl Queue ===== -impl Queue +impl Queue where N: Next, + P: Peer, { pub fn new() -> Self { Queue { @@ -178,7 +197,7 @@ impl Queue /// Queue the stream. /// /// If the stream is already contained by the list, return `false`. - pub fn push(&mut self, stream: &mut store::Ptr) -> bool { + pub fn push(&mut self, stream: &mut store::Ptr) -> bool { trace!("Queue::push"); if N::is_queued(stream) { @@ -215,8 +234,8 @@ impl Queue true } - pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> - where R: Resolve + pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> + where R: Resolve { if let Some(mut idxs) = self.indices { let mut stream = store.resolve(idxs.head); @@ -241,14 +260,18 @@ impl Queue // ===== impl Ptr ===== -impl<'a, B: 'a> Ptr<'a, B> { +impl<'a, B: 'a, P> Ptr<'a, B, P> + where P: Peer, +{ pub fn key(&self) -> Key { self.key } } -impl<'a, B: 'a> Resolve for Ptr<'a, B> { - fn resolve(&mut self, key: Key) -> Ptr { +impl<'a, B: 'a, P> Resolve for Ptr<'a, B, P> + where P: Peer, +{ + fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, slab: &mut *self.slab, @@ -256,16 +279,20 @@ impl<'a, B: 'a> Resolve for Ptr<'a, B> { } } -impl<'a, B: 'a> ops::Deref for Ptr<'a, B> { - type Target = Stream; +impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P> + where P: Peer, +{ + type Target = Stream; - fn deref(&self) -> &Stream { + fn deref(&self) -> &Stream { &self.slab[self.key.0] } } -impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> { - fn deref_mut(&mut self) -> &mut Stream { +impl<'a, B: 'a, P> ops::DerefMut for Ptr<'a, B, P> + where P: Peer, +{ + fn deref_mut(&mut self) -> &mut Stream { &mut self.slab[self.key.0] } } @@ -280,8 +307,10 @@ impl<'a> OccupiedEntry<'a> { // ===== impl VacantEntry ===== -impl<'a, B> VacantEntry<'a, B> { - pub fn insert(self, value: Stream) -> Key { +impl<'a, B, P> VacantEntry<'a, B, P> + where P: Peer, +{ + pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab let key = self.slab.insert(value); diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 6bf1ca7..cfafbb7 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -1,7 +1,9 @@ use super::*; #[derive(Debug)] -pub(super) struct Stream { +pub(super) struct Stream + where P: Peer, +{ /// The h2 stream identifier pub id: StreamId, @@ -30,7 +32,7 @@ pub(super) struct Stream { pub send_task: Option, /// Frames pending for this stream being sent to the socket - pub pending_send: buffer::Deque, + pub pending_send: buffer::Deque>, /// Next node in the linked list of streams waiting for additional /// connection level capacity. @@ -62,13 +64,13 @@ pub(super) struct Stream { pub is_pending_window_update: bool, /// Frames pending for this stream to read - pub pending_recv: buffer::Deque, + pub pending_recv: buffer::Deque>, /// Task tracking receiving frames pub recv_task: Option, /// The stream's pending push promises - pub pending_push_promises: store::Queue, + pub pending_push_promises: store::Queue, } #[derive(Debug)] @@ -83,10 +85,12 @@ pub(super) struct NextSendCapacity; #[derive(Debug)] pub(super) struct NextWindowUpdate; -impl Stream { +impl Stream + where P: Peer, +{ pub fn new(id: StreamId, init_send_window: WindowSize, - init_recv_window: WindowSize) -> Stream + init_recv_window: WindowSize) -> Stream { let mut send_flow = FlowControl::new(); let mut recv_flow = FlowControl::new(); @@ -154,89 +158,89 @@ impl Stream { } impl store::Next for NextAccept { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_pending_accept } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_accept = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_pending_accept.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_accept } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_accept = val; } } impl store::Next for NextSend { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_pending_send } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_send = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_pending_send.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_send } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_send = val; } } impl store::Next for NextSendCapacity { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_pending_send_capacity } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_send_capacity = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_pending_send_capacity.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_send_capacity } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_send_capacity = val; } } impl store::Next for NextWindowUpdate { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_window_update } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_window_update = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_window_update.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_window_update } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_window_update = val; } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index bea5348..2d5f292 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -6,14 +6,18 @@ use super::store::Resolve; use std::sync::{Arc, Mutex}; #[derive(Debug)] -pub(crate) struct Streams { - inner: Arc>>, +pub(crate) struct Streams + where P: Peer, +{ + inner: Arc>>, } /// Reference to the stream state #[derive(Debug)] -pub(crate) struct StreamRef { - inner: Arc>>, +pub(crate) struct StreamRef + where P: Peer, +{ + inner: Arc>>, key: store::Key, } @@ -22,32 +26,37 @@ pub(crate) struct StreamRef { /// /// TODO: better name #[derive(Debug)] -struct Inner { - actions: Actions, - store: Store, +struct Inner + where P: Peer, +{ + actions: Actions, + store: Store, } #[derive(Debug)] -struct Actions { +struct Actions + where P: Peer, +{ /// Manages state transitions initiated by receiving frames - recv: Recv, + recv: Recv, /// Manages state transitions initiated by sending frames - send: Send, + send: Send, /// Task that calls `poll_complete`. task: Option, } -impl Streams +impl Streams where B: Buf, + P: Peer, { - pub fn new(config: Config) -> Self { + pub fn new(config: Config) -> Self { Streams { inner: Arc::new(Mutex::new(Inner { actions: Actions { - recv: Recv::new::

(&config), - send: Send::new::

(&config), + recv: Recv::new(&config), + send: Send::new(&config), task: None, }, store: Store::new(), @@ -56,7 +65,7 @@ impl Streams } /// Process inbound headers - pub fn recv_headers(&mut self, frame: frame::Headers) + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), ConnectionError> { let id = frame.stream_id(); @@ -66,15 +75,7 @@ impl Streams let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), Entry::Vacant(e) => { - // Trailers cannot open a stream. Trailers are header frames - // that do not contain pseudo headers. Requests MUST contain a - // method and responses MUST contain a status. If they do not,t - // hey are considered to be malformed. - if frame.is_trailers() { - return Err(ProtocolError.into()); - } - - match try!(me.actions.recv.open::

(id)) { + match try!(me.actions.recv.open(id)) { Some(stream_id) => { let stream = Stream::new( stream_id, @@ -90,21 +91,21 @@ impl Streams let stream = me.store.resolve(key); - me.actions.transition::(stream, |actions, stream| { - if frame.is_trailers() { + me.actions.transition(stream, |actions, stream| { + if stream.state.is_recv_headers() { + actions.recv.recv_headers(frame, stream) + } else { if !frame.is_end_stream() { // TODO: Is this the right error return Err(ProtocolError.into()); } - actions.recv.recv_trailers::

(frame, stream) - } else { - actions.recv.recv_headers::

(frame, stream) + actions.recv.recv_trailers(frame, stream) } }) } - pub fn recv_data(&mut self, frame: frame::Data) + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -117,12 +118,12 @@ impl Streams None => return Err(ProtocolError.into()), }; - me.actions.transition::(stream, |actions, stream| { + me.actions.transition(stream, |actions, stream| { actions.recv.recv_data(frame, stream) }) } - pub fn recv_reset(&mut self, frame: frame::Reset) + pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -138,12 +139,12 @@ impl Streams Some(stream) => stream, None => { // TODO: Are there other error cases? - me.actions.ensure_not_idle::

(id)?; + me.actions.ensure_not_idle(id)?; return Ok(()); } }; - me.actions.transition::(stream, |actions, stream| { + me.actions.transition(stream, |actions, stream| { actions.recv.recv_reset(frame, stream)?; assert!(stream.state.is_closed()); Ok(()) @@ -190,7 +191,7 @@ impl Streams Ok(()) } - pub fn recv_push_promise(&mut self, frame: frame::PushPromise) + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -203,11 +204,11 @@ impl Streams None => return Err(ProtocolError.into()), }; - me.actions.recv.recv_push_promise::

( + me.actions.recv.recv_push_promise( frame, &me.actions.send, stream, &mut me.store) } - pub fn next_incoming(&mut self) -> Option> { + pub fn next_incoming(&mut self) -> Option> { let key = { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -268,11 +269,11 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - me.actions.send.poll_open_ready::() + me.actions.send.poll_open_ready() } pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ConnectionError> { // TODO: There is a hazard with assigning a stream ID before the // prioritize layer. If prioritization reorders new streams, this @@ -284,7 +285,7 @@ impl Streams let me = &mut *me; // Initialize a new stream. This fails if the connection is at capacity. - let stream_id = me.actions.send.open::()?; + let stream_id = me.actions.send.open()?; let stream = Stream::new( stream_id, @@ -313,14 +314,14 @@ impl Streams }) } - pub fn send_reset(&mut self, id: StreamId, reason: Reason) { + pub fn send_reset(&mut self, id: StreamId, reason: Reason) { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let key = match me.store.find_entry(id) { Entry::Occupied(e) => e.key(), Entry::Vacant(e) => { - match me.actions.recv.open::

(id) { + match me.actions.recv.open(id) { Ok(Some(stream_id)) => { let stream = Stream::new( stream_id, 0, 0); @@ -335,7 +336,7 @@ impl Streams let stream = me.store.resolve(key); - me.actions.transition::(stream, move |actions, stream| { + me.actions.transition(stream, move |actions, stream| { actions.send.send_reset(reason, stream, &mut actions.task) }) } @@ -343,10 +344,11 @@ impl Streams // ===== impl StreamRef ===== -impl StreamRef +impl StreamRef where B: Buf, + P: Peer, { - pub fn send_data(&mut self, data: B, end_of_stream: bool) + pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -357,13 +359,13 @@ impl StreamRef // Create the data frame let frame = frame::Data::from_buf(stream.id, data, end_of_stream); - me.actions.transition::(stream, |actions, stream| { + me.actions.transition(stream, |actions, stream| { // Send the data frame actions.send.send_data(frame, stream, &mut actions.task) }) } - pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> + pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -373,7 +375,7 @@ impl StreamRef // Create the trailers frame let frame = frame::Headers::trailers(stream.id, trailers); - me.actions.transition::(stream, |actions, stream| { + me.actions.transition(stream, |actions, stream| { // Send the trailers frame actions.send.send_trailers(frame, stream, &mut actions.task) }) @@ -394,12 +396,12 @@ impl StreamRef me.actions.recv.take_request(&mut stream) } - pub fn send_reset(&mut self, reason: Reason) { + pub fn send_reset(&mut self, reason: Reason) { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let stream = me.store.resolve(self.key); - me.actions.transition::(stream, move |actions, stream| { + me.actions.transition(stream, move |actions, stream| { actions.send.send_reset(reason, stream, &mut actions.task) }) } @@ -415,7 +417,7 @@ impl StreamRef let frame = server::Peer::convert_send_message( stream.id, response, end_of_stream); - me.actions.transition::(stream, |actions, stream| { + me.actions.transition(stream, |actions, stream| { actions.send.send_headers(frame, stream, &mut actions.task) }) } @@ -501,7 +503,9 @@ impl StreamRef } } -impl Clone for StreamRef { +impl Clone for StreamRef + where P: Peer, +{ fn clone(&self) -> Self { StreamRef { inner: self.inner.clone(), @@ -512,42 +516,42 @@ impl Clone for StreamRef { // ===== impl Actions ===== -impl Actions +impl Actions where B: Buf, + P: Peer, { - fn ensure_not_idle(&mut self, id: StreamId) + fn ensure_not_idle(&mut self, id: StreamId) -> Result<(), ConnectionError> { - if self.is_local_init::

(id) { + if self.is_local_init(id) { self.send.ensure_not_idle(id) } else { self.recv.ensure_not_idle(id) } } - fn dec_num_streams(&mut self, id: StreamId) { - if self.is_local_init::

(id) { + fn dec_num_streams(&mut self, id: StreamId) { + if self.is_local_init(id) { self.send.dec_num_streams(); } else { self.recv.dec_num_streams(); } } - fn is_local_init(&self, id: StreamId) -> bool { + fn is_local_init(&self, id: StreamId) -> bool { assert!(!id.is_zero()); P::is_server() == id.is_server_initiated() } - fn transition(&mut self, mut stream: store::Ptr, f: F) -> U - where F: FnOnce(&mut Self, &mut store::Ptr) -> U, - P: Peer, + fn transition(&mut self, mut stream: store::Ptr, f: F) -> U + where F: FnOnce(&mut Self, &mut store::Ptr) -> U, { let is_counted = stream.state.is_counted(); let ret = f(self, &mut stream); if is_counted && stream.state.is_closed() { - self.dec_num_streams::

(stream.id); + self.dec_num_streams(stream.id); } ret diff --git a/src/server.rs b/src/server.rs index de5aafa..c9af36c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,4 +1,4 @@ -use {Body, HeaderMap, ConnectionError}; +use {HeaderMap, ConnectionError}; use frame::{self, StreamId}; use proto::{self, Connection, WindowSize}; use error::Reason; @@ -24,7 +24,12 @@ pub struct Server { #[derive(Debug)] pub struct Stream { - inner: proto::StreamRef, + inner: proto::StreamRef, +} + +#[derive(Debug)] +pub struct Body { + inner: proto::StreamRef, } #[derive(Debug)] @@ -181,18 +186,18 @@ impl Stream { pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { - self.inner.send_data::(data.into_buf(), end_of_stream) + self.inner.send_data(data.into_buf(), end_of_stream) } /// Send trailers pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> { - self.inner.send_trailers::(trailers) + self.inner.send_trailers(trailers) } pub fn send_reset(mut self, reason: Reason) { - self.inner.send_reset::(reason) + self.inner.send_reset(reason) } } @@ -210,6 +215,35 @@ impl Stream { } } +// ===== impl Body ===== + +impl Body { + pub fn is_empty(&self) -> bool { + // If the recv side is closed and the receive queue is empty, the body is empty. + self.inner.body_is_empty() + } + + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { + self.inner.release_capacity(sz as proto::WindowSize) + } + + /// Poll trailers + /// + /// This function **must** not be called until `Body::poll` returns `None`. + pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + self.inner.poll_trailers() + } +} + +impl futures::Stream for Body { + type Item = Bytes; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll, Self::Error> { + self.inner.poll_data() + } +} + // ===== impl Send ===== impl Future for Send