Thread P generic through all

This commit is contained in:
Carl Lerche
2017-08-25 23:45:15 -04:00
parent 88a7d56a60
commit 9bb34d907a
16 changed files with 368 additions and 291 deletions

View File

@@ -8,7 +8,7 @@ extern crate io_dump;
extern crate env_logger;
use h2::*;
use h2::client::Client;
use h2::client::{Client, Body};
use http::*;
use futures::*;

View File

@@ -1,5 +1,4 @@
use {frame, HeaderMap, ConnectionError};
use Body;
use frame::StreamId;
use proto::{self, Connection, WindowSize};
@@ -23,7 +22,12 @@ pub struct Client<T, B: IntoBuf> {
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
inner: proto::StreamRef<B::Buf, Peer>,
}
#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>,
}
#[derive(Debug)]
@@ -168,14 +172,14 @@ impl<B: IntoBuf> Stream<B> {
pub fn send_data(&mut self, data: B, end_of_stream: bool)
-> Result<(), ConnectionError>
{
self.inner.send_data::<Peer>(data.into_buf(), end_of_stream)
self.inner.send_data(data.into_buf(), end_of_stream)
}
/// Send trailers
pub fn send_trailers(&mut self, trailers: HeaderMap)
-> Result<(), ConnectionError>
{
self.inner.send_trailers::<Peer>(trailers)
self.inner.send_trailers(trailers)
}
}
@@ -188,6 +192,35 @@ impl<B: IntoBuf> Future for Stream<B> {
}
}
// ===== impl Body =====
impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.body_is_empty()
}
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> {
self.inner.release_capacity(sz as proto::WindowSize)
}
/// Poll trailers
///
/// This function **must** not be called until `Body::poll` returns `None`.
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ConnectionError> {
self.inner.poll_trailers()
}
}
impl<B: IntoBuf> ::futures::Stream for Body<B> {
type Item = Bytes;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll_data()
}
}
// ===== impl Peer =====
impl proto::Peer for Peer {

View File

@@ -249,14 +249,6 @@ impl Headers {
Ok(())
}
/// Returns `true` if the frame represents trailers
///
/// Trailers are header frames that contain no pseudo headers.
pub fn is_trailers(&self) -> bool {
self.pseudo.method.is_none() &&
self.pseudo.status.is_none()
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}

View File

@@ -33,46 +33,5 @@ pub mod server;
pub use error::{ConnectionError, Reason};
use bytes::Bytes;
// TODO: remove if carllerche/http#90 lands
pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>;
// TODO: Move into other location
use bytes::IntoBuf;
use futures::Poll;
#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
}
// ===== impl Body =====
impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.body_is_empty()
}
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> {
self.inner.release_capacity(sz as proto::WindowSize)
}
/// Poll trailers
///
/// This function **must** not be called until `Body::poll` returns `None`.
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ConnectionError> {
self.inner.poll_trailers()
}
}
impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Bytes;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll_data()
}
}

View File

@@ -11,7 +11,9 @@ use std::marker::PhantomData;
/// An H2 connection
#[derive(Debug)]
pub(crate) struct Connection<T, P, B: IntoBuf = Bytes> {
pub(crate) struct Connection<T, P, B: IntoBuf = Bytes>
where P: Peer,
{
/// Tracks the connection level state transitions.
state: State,
@@ -25,7 +27,7 @@ pub(crate) struct Connection<T, P, B: IntoBuf = Bytes> {
settings: Settings,
/// Stream state handler
streams: Streams<B::Buf>,
streams: Streams<B::Buf, P>,
/// Client or server
_phantom: PhantomData<P>,
@@ -53,7 +55,7 @@ impl<T, P, B> Connection<T, P, B>
{
pub fn new(codec: Codec<T, Prioritized<B::Buf>>) -> Connection<T, P, B> {
// TODO: Actually configure
let streams = Streams::new::<P>(streams::Config {
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
@@ -147,7 +149,7 @@ impl<T, P, B> Connection<T, P, B>
// Stream level error, reset the stream
Err(Stream { id, reason }) => {
trace!("stream level error; id={:?}; reason={:?}", id, reason);
self.streams.send_reset::<P>(id, reason);
self.streams.send_reset(id, reason);
continue;
}
// I/O error, nothing more can be done
@@ -161,19 +163,19 @@ impl<T, P, B> Connection<T, P, B>
match frame {
Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame);
try!(self.streams.recv_headers::<P>(frame));
try!(self.streams.recv_headers(frame));
}
Some(Data(frame)) => {
trace!("recv DATA; frame={:?}", frame);
try!(self.streams.recv_data::<P>(frame));
try!(self.streams.recv_data(frame));
}
Some(Reset(frame)) => {
trace!("recv RST_STREAM; frame={:?}", frame);
try!(self.streams.recv_reset::<P>(frame));
try!(self.streams.recv_reset(frame));
}
Some(PushPromise(frame)) => {
trace!("recv PUSH_PROMISE; frame={:?}", frame);
self.streams.recv_push_promise::<P>(frame)?;
self.streams.recv_push_promise(frame)?;
}
Some(Settings(frame)) => {
trace!("recv SETTINGS; frame={:?}", frame);
@@ -244,7 +246,7 @@ impl<T, B> Connection<T, client::Peer, B>
{
/// Initialize a new HTTP/2.0 stream and send the message.
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<B::Buf>, ConnectionError>
-> Result<StreamRef<B::Buf, client::Peer>, ConnectionError>
{
self.streams.send_request(request, end_of_stream)
}
@@ -254,7 +256,7 @@ impl<T, B> Connection<T, server::Peer, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf>> {
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf, server::Peer>> {
self.streams.next_incoming()
}
}

View File

@@ -38,13 +38,11 @@ pub trait Peer {
fn is_server() -> bool;
#[doc(hidden)]
fn convert_send_message(
id: StreamId,
headers: Self::Send,
end_of_stream: bool) -> frame::Headers;
#[doc(hidden)]
fn convert_poll_message(headers: frame::Headers) -> Result<Self::Poll, ConnectionError>;
}

View File

@@ -28,13 +28,14 @@ impl Settings {
}
}
pub fn send_pending_ack<T, B, C>(&mut self,
dst: &mut Codec<T, B>,
streams: &mut Streams<C>)
pub fn send_pending_ack<T, B, C, P>(&mut self,
dst: &mut Codec<T, B>,
streams: &mut Streams<C, P>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
B: Buf,
C: Buf,
P: Peer,
{
trace!("send_pending_ack; pending={:?}", self.pending);

View File

@@ -1,13 +1,11 @@
use frame::Frame;
use slab::Slab;
use std::marker::PhantomData;
/// Buffers frames for multiple streams.
#[derive(Debug)]
pub struct Buffer<B> {
slab: Slab<Slot<B>>,
pub struct Buffer<T> {
slab: Slab<Slot<T>>,
}
/// A sequence of frames in a `Buffer`
@@ -25,12 +23,12 @@ struct Indices {
}
#[derive(Debug)]
struct Slot<B> {
frame: Frame<B>,
struct Slot<T> {
value: T,
next: Option<usize>,
}
impl<B> Buffer<B> {
impl<T> Buffer<T> {
pub fn new() -> Self {
Buffer {
slab: Slab::new(),
@@ -38,7 +36,7 @@ impl<B> Buffer<B> {
}
}
impl<B> Deque<B> {
impl<T> Deque<T> {
pub fn new() -> Self {
Deque {
indices: None,
@@ -50,9 +48,9 @@ impl<B> Deque<B> {
self.indices.is_none()
}
pub fn push_back(&mut self, buf: &mut Buffer<B>, frame: Frame<B>) {
pub fn push_back(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot {
frame,
value,
next: None,
});
@@ -70,9 +68,9 @@ impl<B> Deque<B> {
}
}
pub fn push_front(&mut self, buf: &mut Buffer<B>, frame: Frame<B>) {
pub fn push_front(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot {
frame,
value,
next: None,
});
@@ -90,7 +88,7 @@ impl<B> Deque<B> {
}
}
pub fn pop_front(&mut self, buf: &mut Buffer<B>) -> Option<Frame<B>> {
pub fn pop_front(&mut self, buf: &mut Buffer<T>) -> Option<T> {
match self.indices {
Some(mut idxs) => {
let mut slot = buf.slab.remove(idxs.head);
@@ -103,16 +101,16 @@ impl<B> Deque<B> {
self.indices = Some(idxs);
}
return Some(slot.frame);
return Some(slot.value);
}
None => None,
}
}
pub fn peek_front<'a>(&self, buf: &'a Buffer<B>) -> Option<&'a Frame<B>> {
pub fn peek_front<'a>(&self, buf: &'a Buffer<T>) -> Option<&'a T> {
match self.indices {
Some(idxs) => {
Some(&buf.slab[idxs.head].frame)
Some(&buf.slab[idxs.head].value)
}
None => None,
}

View File

@@ -7,18 +7,20 @@ use futures::Sink;
use std::{fmt, cmp};
#[derive(Debug)]
pub(super) struct Prioritize<B> {
pub(super) struct Prioritize<B, P>
where P: Peer,
{
/// Queue of streams waiting for socket capacity to send a frame
pending_send: store::Queue<B, stream::NextSend>,
pending_send: store::Queue<B, stream::NextSend, P>,
/// Queue of streams waiting for window capacity to produce data.
pending_capacity: store::Queue<B, stream::NextSendCapacity>,
pending_capacity: store::Queue<B, stream::NextSendCapacity, P>,
/// Connection level flow control governing sent data
flow: FlowControl,
/// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>,
buffer: Buffer<Frame<B>>,
}
pub(crate) struct Prioritized<B> {
@@ -33,10 +35,11 @@ pub(crate) struct Prioritized<B> {
// ===== impl Prioritize =====
impl<B> Prioritize<B>
impl<B, P> Prioritize<B, P>
where B: Buf,
P: Peer,
{
pub fn new(config: &Config) -> Prioritize<B> {
pub fn new(config: &Config) -> Prioritize<B, P> {
let mut flow = FlowControl::new();
flow.inc_window(config.init_local_window_sz)
@@ -57,7 +60,7 @@ impl<B> Prioritize<B>
/// Queue a frame to be sent to the remote
pub fn queue_frame(&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
{
// Queue the frame in the buffer
@@ -75,7 +78,7 @@ impl<B> Prioritize<B>
/// Send a data frame
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -134,7 +137,7 @@ impl<B> Prioritize<B>
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B>) {
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) {
// Actual capacity is `capacity` + the current amount of buffered data.
// It it were less, then we could never send out the buffered data.
let capacity = capacity + stream.buffered_send_data;
@@ -157,7 +160,7 @@ impl<B> Prioritize<B>
pub fn recv_stream_window_update(&mut self,
inc: WindowSize,
stream: &mut store::Ptr<B>)
stream: &mut store::Ptr<B, P>)
-> Result<(), ConnectionError>
{
trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
@@ -175,7 +178,7 @@ impl<B> Prioritize<B>
pub fn recv_connection_window_update(&mut self,
inc: WindowSize,
store: &mut Store<B>)
store: &mut Store<B, P>)
-> Result<(), ConnectionError>
{
// Update the connection's window
@@ -188,7 +191,7 @@ impl<B> Prioritize<B>
pub fn assign_connection_capacity<R>(&mut self,
inc: WindowSize,
store: &mut R)
where R: Resolve<B>
where R: Resolve<B, P>
{
self.flow.assign_capacity(inc);
@@ -207,7 +210,7 @@ impl<B> Prioritize<B>
}
/// Request capacity to send data
fn try_assign_capacity(&mut self, stream: &mut store::Ptr<B>) {
fn try_assign_capacity(&mut self, stream: &mut store::Ptr<B, P>) {
let total_requested = stream.requested_send_capacity;
// Total requested should never go below actual assigned
@@ -279,7 +282,7 @@ impl<B> Prioritize<B>
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
@@ -337,7 +340,7 @@ impl<B> Prioritize<B>
/// entirety (large chunks are split up into potentially many data frames).
/// In this case, the stream needs to be reprioritized.
fn reclaim_frame<T>(&mut self,
store: &mut Store<B>,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>) -> bool
{
trace!("try reclaim frame");
@@ -373,7 +376,7 @@ impl<B> Prioritize<B>
/// Push the frame to the front of the stream's deque, scheduling the
/// steream if needed.
fn push_back_frame(&mut self, frame: Frame<B>, stream: &mut store::Ptr<B>) {
fn push_back_frame(&mut self, frame: Frame<B>, stream: &mut store::Ptr<B, P>) {
// Push the frame to the front of the stream's deque
stream.pending_send.push_front(&mut self.buffer, frame);
@@ -383,7 +386,7 @@ impl<B> Prioritize<B>
}
}
pub fn clear_queue(&mut self, stream: &mut store::Ptr<B>) {
pub fn clear_queue(&mut self, stream: &mut store::Ptr<B, P>) {
trace!("clear_queue; stream-id={:?}", stream.id);
// TODO: make this more efficient?
@@ -392,7 +395,7 @@ impl<B> Prioritize<B>
}
}
fn pop_frame(&mut self, store: &mut Store<B>, max_len: usize)
fn pop_frame(&mut self, store: &mut Store<B, P>, max_len: usize)
-> Option<Frame<Prioritized<B>>>
{
trace!("pop_frame");

View File

@@ -8,7 +8,9 @@ use futures::Sink;
use std::marker::PhantomData;
#[derive(Debug)]
pub(super) struct Recv<B> {
pub(super) struct Recv<B, P>
where P: Peer,
{
/// Maximum number of remote initiated streams
max_streams: Option<usize>,
@@ -28,13 +30,13 @@ pub(super) struct Recv<B> {
last_processed_id: StreamId,
/// Streams that have pending window updates
pending_window_updates: store::Queue<B, stream::NextWindowUpdate>,
pending_window_updates: store::Queue<B, stream::NextWindowUpdate, P>,
/// New streams to be accepted
pending_accept: store::Queue<B, stream::NextAccept>,
pending_accept: store::Queue<B, stream::NextAccept, P>,
/// Holds frames that are waiting to be read
buffer: Buffer<Bytes>,
buffer: Buffer<Frame<Bytes>>,
/// Refused StreamId, this represents a frame that must be sent out.
refused: Option<StreamId>,
@@ -48,8 +50,11 @@ struct Indices {
tail: store::Key,
}
impl<B> Recv<B> where B: Buf {
pub fn new<P: Peer>(config: &Config) -> Self {
impl<B, P> Recv<B, P>
where B: Buf,
P: Peer,
{
pub fn new(config: &Config) -> Self {
let next_stream_id = if P::is_server() {
1
} else {
@@ -90,12 +95,12 @@ impl<B> Recv<B> where B: Buf {
/// Update state reflecting a new, remotely opened stream
///
/// Returns the stream state if successful. `None` if refused
pub fn open<P: Peer>(&mut self, id: StreamId)
pub fn open(&mut self, id: StreamId)
-> Result<Option<StreamId>, ConnectionError>
{
assert!(self.refused.is_none());
try!(self.ensure_can_open::<P>(id));
try!(self.ensure_can_open(id));
if !self.can_inc_num_streams() {
self.refused = Some(id);
@@ -105,7 +110,7 @@ impl<B> Recv<B> where B: Buf {
Ok(Some(id))
}
pub fn take_request(&mut self, stream: &mut store::Ptr<B>)
pub fn take_request(&mut self, stream: &mut store::Ptr<B, P>)
-> Result<Request<()>, ConnectionError>
{
match stream.pending_recv.pop_front(&mut self.buffer) {
@@ -118,7 +123,7 @@ impl<B> Recv<B> where B: Buf {
}
}
pub fn poll_response(&mut self, stream: &mut store::Ptr<B>)
pub fn poll_response(&mut self, stream: &mut store::Ptr<B, P>)
-> Poll<Response<()>, ConnectionError> {
// If the buffer is not empty, then the first frame must be a HEADERS
// frame or the user violated the contract.
@@ -139,9 +144,9 @@ impl<B> Recv<B> where B: Buf {
}
/// Transition the stream state based on receiving headers
pub fn recv_headers<P: Peer>(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>)
pub fn recv_headers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>)
-> Result<(), ConnectionError>
{
trace!("opening stream; init_window={}", self.init_window_sz);
@@ -182,9 +187,9 @@ impl<B> Recv<B> where B: Buf {
}
/// Transition the stream based on receiving trailers
pub fn recv_trailers<P: Peer>(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>)
pub fn recv_trailers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>)
-> Result<(), ConnectionError>
{
// Transition the state
@@ -199,7 +204,7 @@ impl<B> Recv<B> where B: Buf {
pub fn release_capacity(&mut self,
capacity: WindowSize,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -225,7 +230,7 @@ impl<B> Recv<B> where B: Buf {
Ok(())
}
pub fn body_is_empty(&self, stream: &store::Ptr<B>) -> bool {
pub fn body_is_empty(&self, stream: &store::Ptr<B, P>) -> bool {
if !stream.state.is_recv_closed() {
return false;
}
@@ -237,7 +242,7 @@ impl<B> Recv<B> where B: Buf {
pub fn recv_data(&mut self,
frame: frame::Data,
stream: &mut store::Ptr<B>)
stream: &mut store::Ptr<B, P>)
-> Result<(), ConnectionError>
{
let sz = frame.payload().len();
@@ -283,15 +288,15 @@ impl<B> Recv<B> where B: Buf {
Ok(())
}
pub fn recv_push_promise<P: Peer>(&mut self,
frame: frame::PushPromise,
send: &Send<B>,
stream: store::Key,
store: &mut Store<B>)
pub fn recv_push_promise(&mut self,
frame: frame::PushPromise,
send: &Send<B, P>,
stream: store::Key,
store: &mut Store<B, P>)
-> Result<(), ConnectionError>
{
// First, make sure that the values are legit
self.ensure_can_reserve::<P>(frame.promised_id())?;
self.ensure_can_reserve(frame.promised_id())?;
// Make sure that the stream state is valid
store[stream].state.ensure_recv_open()?;
@@ -343,7 +348,7 @@ impl<B> Recv<B> where B: Buf {
Ok(())
}
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream<B>)
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream<B, P>)
-> Result<(), ConnectionError>
{
let err = ConnectionError::Proto(frame.reason());
@@ -355,7 +360,7 @@ impl<B> Recv<B> where B: Buf {
}
/// Handle a received error
pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream<B>) {
pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream<B, P>) {
// Receive an error
stream.state.recv_err(err);
@@ -388,7 +393,7 @@ impl<B> Recv<B> where B: Buf {
}
/// Returns true if the remote peer can initiate a stream with the given ID.
fn ensure_can_open<P: Peer>(&self, id: StreamId)
fn ensure_can_open(&self, id: StreamId)
-> Result<(), ConnectionError>
{
if !P::is_server() {
@@ -406,7 +411,7 @@ impl<B> Recv<B> where B: Buf {
}
/// Returns true if the remote peer can reserve a stream with the given ID.
fn ensure_can_reserve<P: Peer>(&self, promised_id: StreamId)
fn ensure_can_reserve(&self, promised_id: StreamId)
-> Result<(), ConnectionError>
{
// TODO: Are there other rules?
@@ -446,7 +451,7 @@ impl<B> Recv<B> where B: Buf {
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
@@ -483,7 +488,7 @@ impl<B> Recv<B> where B: Buf {
/// Send stream level window update
pub fn send_stream_window_updates<T>(&mut self,
store: &mut Store<B>,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
@@ -516,12 +521,12 @@ impl<B> Recv<B> where B: Buf {
}
}
pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> {
pub fn next_incoming(&mut self, store: &mut Store<B, P>) -> Option<store::Key> {
self.pending_accept.pop(store)
.map(|ptr| ptr.key())
}
pub fn poll_data(&mut self, stream: &mut Stream<B>)
pub fn poll_data(&mut self, stream: &mut Stream<B, P>)
-> Poll<Option<Bytes>, ConnectionError>
{
match stream.pending_recv.pop_front(&mut self.buffer) {
@@ -548,7 +553,7 @@ impl<B> Recv<B> where B: Buf {
}
}
pub fn poll_trailers(&mut self, stream: &mut Stream<B>)
pub fn poll_trailers(&mut self, stream: &mut Stream<B, P>)
-> Poll<Option<HeaderMap>, ConnectionError>
{
match stream.pending_recv.pop_front(&mut self.buffer) {

View File

@@ -8,7 +8,9 @@ use bytes::Buf;
/// Manages state transitions related to outbound frames.
#[derive(Debug)]
pub(super) struct Send<B> {
pub(super) struct Send<B, P>
where P: Peer,
{
/// Maximum number of locally initiated streams
max_streams: Option<usize>,
@@ -25,12 +27,15 @@ pub(super) struct Send<B> {
blocked_open: Option<task::Task>,
/// Prioritization layer
prioritize: Prioritize<B>,
prioritize: Prioritize<B, P>,
}
impl<B> Send<B> where B: Buf {
impl<B, P> Send<B, P>
where B: Buf,
P: Peer,
{
/// Create a new `Send`
pub fn new<P: Peer>(config: &Config) -> Self {
pub fn new(config: &Config) -> Self {
let next_stream_id = if P::is_server() { 2 } else { 1 };
Send {
@@ -48,8 +53,8 @@ impl<B> Send<B> where B: Buf {
self.init_window_sz
}
pub fn poll_open_ready<P: Peer>(&mut self) -> Poll<(), ConnectionError> {
try!(self.ensure_can_open::<P>());
pub fn poll_open_ready(&mut self) -> Poll<(), ConnectionError> {
try!(self.ensure_can_open());
if let Some(max) = self.max_streams {
if max <= self.num_streams {
@@ -64,10 +69,10 @@ impl<B> Send<B> where B: Buf {
/// Update state reflecting a new, locally opened stream
///
/// Returns the stream state if successful. `None` if refused
pub fn open<P: Peer>(&mut self)
pub fn open(&mut self)
-> Result<StreamId, ConnectionError>
{
try!(self.ensure_can_open::<P>());
try!(self.ensure_can_open());
if let Some(max) = self.max_streams {
if max <= self.num_streams {
@@ -86,7 +91,7 @@ impl<B> Send<B> where B: Buf {
pub fn send_headers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -102,7 +107,7 @@ impl<B> Send<B> where B: Buf {
pub fn send_reset(&mut self,
reason: Reason,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
{
if stream.state.is_reset() {
@@ -138,7 +143,7 @@ impl<B> Send<B> where B: Buf {
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -147,7 +152,7 @@ impl<B> Send<B> where B: Buf {
pub fn send_trailers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -165,7 +170,7 @@ impl<B> Send<B> where B: Buf {
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B>,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
@@ -174,11 +179,11 @@ impl<B> Send<B> where B: Buf {
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B>) {
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) {
self.prioritize.reserve_capacity(capacity, stream)
}
pub fn poll_capacity(&mut self, stream: &mut store::Ptr<B>)
pub fn poll_capacity(&mut self, stream: &mut store::Ptr<B, P>)
-> Poll<Option<WindowSize>, ConnectionError>
{
if !stream.state.is_send_streaming() {
@@ -195,7 +200,7 @@ impl<B> Send<B> where B: Buf {
}
/// Current available stream send capacity
pub fn capacity(&self, stream: &mut store::Ptr<B>) -> WindowSize {
pub fn capacity(&self, stream: &mut store::Ptr<B, P>) -> WindowSize {
let available = stream.send_flow.available();
let buffered = stream.buffered_send_data;
@@ -208,7 +213,7 @@ impl<B> Send<B> where B: Buf {
pub fn recv_connection_window_update(&mut self,
frame: frame::WindowUpdate,
store: &mut Store<B>)
store: &mut Store<B, P>)
-> Result<(), ConnectionError>
{
self.prioritize.recv_connection_window_update(frame.size_increment(), store)
@@ -216,7 +221,7 @@ impl<B> Send<B> where B: Buf {
pub fn recv_stream_window_update(&mut self,
sz: WindowSize,
stream: &mut store::Ptr<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -230,7 +235,7 @@ impl<B> Send<B> where B: Buf {
pub fn apply_remote_settings(&mut self,
settings: &frame::Settings,
store: &mut Store<B>,
store: &mut Store<B, P>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
@@ -311,7 +316,7 @@ impl<B> Send<B> where B: Buf {
}
/// Returns true if the local actor can initiate a stream with the given ID.
fn ensure_can_open<P: Peer>(&self) -> Result<(), ConnectionError> {
fn ensure_can_open(&self) -> Result<(), ConnectionError> {
if P::is_server() {
// Servers cannot open streams. PushPromise must first be reserved.
return Err(UnexpectedFrameType.into());

View File

@@ -274,6 +274,16 @@ impl State {
}
}
/// Returns true when the stream is in a state to receive headers
pub fn is_recv_headers(&self) -> bool {
match self.inner {
Idle => true,
Open { remote: AwaitingHeaders, .. } => true,
HalfClosedLocal(AwaitingHeaders) => true,
_ => false,
}
}
pub fn is_recv_streaming(&self) -> bool {
match self.inner {
Open { remote: Peer::Streaming, .. } => true,

View File

@@ -8,15 +8,19 @@ use std::marker::PhantomData;
/// Storage for streams
#[derive(Debug)]
pub(super) struct Store<B> {
slab: slab::Slab<Stream<B>>,
pub(super) struct Store<B, P>
where P: Peer,
{
slab: slab::Slab<Stream<B, P>>,
ids: HashMap<StreamId, usize>,
}
/// "Pointer" to an entry in the store
pub(super) struct Ptr<'a, B: 'a> {
pub(super) struct Ptr<'a, B: 'a, P>
where P: Peer + 'a,
{
key: Key,
slab: &'a mut slab::Slab<Stream<B>>,
slab: &'a mut slab::Slab<Stream<B, P>>,
}
/// References an entry in the store.
@@ -24,21 +28,23 @@ pub(super) struct Ptr<'a, B: 'a> {
pub(super) struct Key(usize);
#[derive(Debug)]
pub(super) struct Queue<B, N> {
pub(super) struct Queue<B, N, P>
where P: Peer,
{
indices: Option<store::Indices>,
_p: PhantomData<(B, N)>,
_p: PhantomData<(B, N, P)>,
}
pub(super) trait Next {
fn next<B>(stream: &Stream<B>) -> Option<Key>;
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<Key>;
fn set_next<B>(stream: &mut Stream<B>, key: Option<Key>);
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<Key>);
fn take_next<B>(stream: &mut Stream<B>) -> Option<Key>;
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<Key>;
fn is_queued<B>(stream: &Stream<B>) -> bool;
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool;
fn set_queued<B>(stream: &mut Stream<B>, val: bool);
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool);
}
/// A linked list
@@ -48,27 +54,33 @@ struct Indices {
pub tail: Key,
}
pub(super) enum Entry<'a, B: 'a> {
pub(super) enum Entry<'a, B: 'a, P: Peer + 'a> {
Occupied(OccupiedEntry<'a>),
Vacant(VacantEntry<'a, B>),
Vacant(VacantEntry<'a, B, P>),
}
pub(super) struct OccupiedEntry<'a> {
ids: hash_map::OccupiedEntry<'a, StreamId, usize>,
}
pub(super) struct VacantEntry<'a, B: 'a> {
pub(super) struct VacantEntry<'a, B: 'a, P>
where P: Peer + 'a,
{
ids: hash_map::VacantEntry<'a, StreamId, usize>,
slab: &'a mut slab::Slab<Stream<B>>,
slab: &'a mut slab::Slab<Stream<B, P>>,
}
pub(super) trait Resolve<B> {
fn resolve(&mut self, key: Key) -> Ptr<B>;
pub(super) trait Resolve<B, P>
where P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P>;
}
// ===== impl Store =====
impl<B> Store<B> {
impl<B, P> Store<B, P>
where P: Peer,
{
pub fn new() -> Self {
Store {
slab: slab::Slab::new(),
@@ -76,7 +88,7 @@ impl<B> Store<B> {
}
}
pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<B>> {
pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<B, P>> {
if let Some(&key) = self.ids.get(id) {
Some(Ptr {
key: Key(key),
@@ -87,7 +99,7 @@ impl<B> Store<B> {
}
}
pub fn insert(&mut self, id: StreamId, val: Stream<B>) -> Ptr<B> {
pub fn insert(&mut self, id: StreamId, val: Stream<B, P>) -> Ptr<B, P> {
let key = self.slab.insert(val);
assert!(self.ids.insert(id, key).is_none());
@@ -97,7 +109,7 @@ impl<B> Store<B> {
}
}
pub fn find_entry(&mut self, id: StreamId) -> Entry<B> {
pub fn find_entry(&mut self, id: StreamId) -> Entry<B, P> {
use self::hash_map::Entry::*;
match self.ids.entry(id) {
@@ -116,7 +128,7 @@ impl<B> Store<B> {
}
pub fn for_each<F>(&mut self, mut f: F) -> Result<(), ConnectionError>
where F: FnMut(Ptr<B>) -> Result<(), ConnectionError>,
where F: FnMut(Ptr<B, P>) -> Result<(), ConnectionError>,
{
for &key in self.ids.values() {
f(Ptr {
@@ -129,8 +141,10 @@ impl<B> Store<B> {
}
}
impl<B> Resolve<B> for Store<B> {
fn resolve(&mut self, key: Key) -> Ptr<B> {
impl<B, P> Resolve<B, P> for Store<B, P>
where P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P> {
Ptr {
key: key,
slab: &mut self.slab,
@@ -138,15 +152,19 @@ impl<B> Resolve<B> for Store<B> {
}
}
impl<B> ops::Index<Key> for Store<B> {
type Output = Stream<B>;
impl<B, P> ops::Index<Key> for Store<B, P>
where P: Peer,
{
type Output = Stream<B, P>;
fn index(&self, key: Key) -> &Self::Output {
self.slab.index(key.0)
}
}
impl<B> ops::IndexMut<Key> for Store<B> {
impl<B, P> ops::IndexMut<Key> for Store<B, P>
where P: Peer,
{
fn index_mut(&mut self, key: Key) -> &mut Self::Output {
self.slab.index_mut(key.0)
}
@@ -154,8 +172,9 @@ impl<B> ops::IndexMut<Key> for Store<B> {
// ===== impl Queue =====
impl<B, N> Queue<B, N>
impl<B, N, P> Queue<B, N, P>
where N: Next,
P: Peer,
{
pub fn new() -> Self {
Queue {
@@ -178,7 +197,7 @@ impl<B, N> Queue<B, N>
/// Queue the stream.
///
/// If the stream is already contained by the list, return `false`.
pub fn push(&mut self, stream: &mut store::Ptr<B>) -> bool {
pub fn push(&mut self, stream: &mut store::Ptr<B, P>) -> bool {
trace!("Queue::push");
if N::is_queued(stream) {
@@ -215,8 +234,8 @@ impl<B, N> Queue<B, N>
true
}
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a, B>>
where R: Resolve<B>
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a, B, P>>
where R: Resolve<B, P>
{
if let Some(mut idxs) = self.indices {
let mut stream = store.resolve(idxs.head);
@@ -241,14 +260,18 @@ impl<B, N> Queue<B, N>
// ===== impl Ptr =====
impl<'a, B: 'a> Ptr<'a, B> {
impl<'a, B: 'a, P> Ptr<'a, B, P>
where P: Peer,
{
pub fn key(&self) -> Key {
self.key
}
}
impl<'a, B: 'a> Resolve<B> for Ptr<'a, B> {
fn resolve(&mut self, key: Key) -> Ptr<B> {
impl<'a, B: 'a, P> Resolve<B, P> for Ptr<'a, B, P>
where P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P> {
Ptr {
key: key,
slab: &mut *self.slab,
@@ -256,16 +279,20 @@ impl<'a, B: 'a> Resolve<B> for Ptr<'a, B> {
}
}
impl<'a, B: 'a> ops::Deref for Ptr<'a, B> {
type Target = Stream<B>;
impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P>
where P: Peer,
{
type Target = Stream<B, P>;
fn deref(&self) -> &Stream<B> {
fn deref(&self) -> &Stream<B, P> {
&self.slab[self.key.0]
}
}
impl<'a, B: 'a> ops::DerefMut for Ptr<'a, B> {
fn deref_mut(&mut self) -> &mut Stream<B> {
impl<'a, B: 'a, P> ops::DerefMut for Ptr<'a, B, P>
where P: Peer,
{
fn deref_mut(&mut self) -> &mut Stream<B, P> {
&mut self.slab[self.key.0]
}
}
@@ -280,8 +307,10 @@ impl<'a> OccupiedEntry<'a> {
// ===== impl VacantEntry =====
impl<'a, B> VacantEntry<'a, B> {
pub fn insert(self, value: Stream<B>) -> Key {
impl<'a, B, P> VacantEntry<'a, B, P>
where P: Peer,
{
pub fn insert(self, value: Stream<B, P>) -> Key {
// Insert the value in the slab
let key = self.slab.insert(value);

View File

@@ -1,7 +1,9 @@
use super::*;
#[derive(Debug)]
pub(super) struct Stream<B> {
pub(super) struct Stream<B, P>
where P: Peer,
{
/// The h2 stream identifier
pub id: StreamId,
@@ -30,7 +32,7 @@ pub(super) struct Stream<B> {
pub send_task: Option<task::Task>,
/// Frames pending for this stream being sent to the socket
pub pending_send: buffer::Deque<B>,
pub pending_send: buffer::Deque<Frame<B>>,
/// Next node in the linked list of streams waiting for additional
/// connection level capacity.
@@ -62,13 +64,13 @@ pub(super) struct Stream<B> {
pub is_pending_window_update: bool,
/// Frames pending for this stream to read
pub pending_recv: buffer::Deque<Bytes>,
pub pending_recv: buffer::Deque<Frame<Bytes>>,
/// Task tracking receiving frames
pub recv_task: Option<task::Task>,
/// The stream's pending push promises
pub pending_push_promises: store::Queue<B, NextAccept>,
pub pending_push_promises: store::Queue<B, NextAccept, P>,
}
#[derive(Debug)]
@@ -83,10 +85,12 @@ pub(super) struct NextSendCapacity;
#[derive(Debug)]
pub(super) struct NextWindowUpdate;
impl<B> Stream<B> {
impl<B, P> Stream<B, P>
where P: Peer,
{
pub fn new(id: StreamId,
init_send_window: WindowSize,
init_recv_window: WindowSize) -> Stream<B>
init_recv_window: WindowSize) -> Stream<B, P>
{
let mut send_flow = FlowControl::new();
let mut recv_flow = FlowControl::new();
@@ -154,89 +158,89 @@ impl<B> Stream<B> {
}
impl store::Next for NextAccept {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> {
stream.next_pending_accept
}
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) {
stream.next_pending_accept = key;
}
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> {
stream.next_pending_accept.take()
}
fn is_queued<B>(stream: &Stream<B>) -> bool {
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool {
stream.is_pending_accept
}
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) {
stream.is_pending_accept = val;
}
}
impl store::Next for NextSend {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> {
stream.next_pending_send
}
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) {
stream.next_pending_send = key;
}
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> {
stream.next_pending_send.take()
}
fn is_queued<B>(stream: &Stream<B>) -> bool {
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool {
stream.is_pending_send
}
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) {
stream.is_pending_send = val;
}
}
impl store::Next for NextSendCapacity {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> {
stream.next_pending_send_capacity
}
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) {
stream.next_pending_send_capacity = key;
}
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> {
stream.next_pending_send_capacity.take()
}
fn is_queued<B>(stream: &Stream<B>) -> bool {
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool {
stream.is_pending_send_capacity
}
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) {
stream.is_pending_send_capacity = val;
}
}
impl store::Next for NextWindowUpdate {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> {
stream.next_window_update
}
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) {
stream.next_window_update = key;
}
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> {
stream.next_window_update.take()
}
fn is_queued<B>(stream: &Stream<B>) -> bool {
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool {
stream.is_pending_window_update
}
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) {
stream.is_pending_window_update = val;
}
}

View File

@@ -6,14 +6,18 @@ use super::store::Resolve;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub(crate) struct Streams<B> {
inner: Arc<Mutex<Inner<B>>>,
pub(crate) struct Streams<B, P>
where P: Peer,
{
inner: Arc<Mutex<Inner<B, P>>>,
}
/// Reference to the stream state
#[derive(Debug)]
pub(crate) struct StreamRef<B> {
inner: Arc<Mutex<Inner<B>>>,
pub(crate) struct StreamRef<B, P>
where P: Peer,
{
inner: Arc<Mutex<Inner<B, P>>>,
key: store::Key,
}
@@ -22,32 +26,37 @@ pub(crate) struct StreamRef<B> {
///
/// TODO: better name
#[derive(Debug)]
struct Inner<B> {
actions: Actions<B>,
store: Store<B>,
struct Inner<B, P>
where P: Peer,
{
actions: Actions<B, P>,
store: Store<B, P>,
}
#[derive(Debug)]
struct Actions<B> {
struct Actions<B, P>
where P: Peer,
{
/// Manages state transitions initiated by receiving frames
recv: Recv<B>,
recv: Recv<B, P>,
/// Manages state transitions initiated by sending frames
send: Send<B>,
send: Send<B, P>,
/// Task that calls `poll_complete`.
task: Option<task::Task>,
}
impl<B> Streams<B>
impl<B, P> Streams<B, P>
where B: Buf,
P: Peer,
{
pub fn new<P: Peer>(config: Config) -> Self {
pub fn new(config: Config) -> Self {
Streams {
inner: Arc::new(Mutex::new(Inner {
actions: Actions {
recv: Recv::new::<P>(&config),
send: Send::new::<P>(&config),
recv: Recv::new(&config),
send: Send::new(&config),
task: None,
},
store: Store::new(),
@@ -56,7 +65,7 @@ impl<B> Streams<B>
}
/// Process inbound headers
pub fn recv_headers<P: Peer>(&mut self, frame: frame::Headers)
pub fn recv_headers(&mut self, frame: frame::Headers)
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
@@ -66,15 +75,7 @@ impl<B> Streams<B>
let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => {
// Trailers cannot open a stream. Trailers are header frames
// that do not contain pseudo headers. Requests MUST contain a
// method and responses MUST contain a status. If they do not,t
// hey are considered to be malformed.
if frame.is_trailers() {
return Err(ProtocolError.into());
}
match try!(me.actions.recv.open::<P>(id)) {
match try!(me.actions.recv.open(id)) {
Some(stream_id) => {
let stream = Stream::new(
stream_id,
@@ -90,21 +91,21 @@ impl<B> Streams<B>
let stream = me.store.resolve(key);
me.actions.transition::<P, _, _>(stream, |actions, stream| {
if frame.is_trailers() {
me.actions.transition(stream, |actions, stream| {
if stream.state.is_recv_headers() {
actions.recv.recv_headers(frame, stream)
} else {
if !frame.is_end_stream() {
// TODO: Is this the right error
return Err(ProtocolError.into());
}
actions.recv.recv_trailers::<P>(frame, stream)
} else {
actions.recv.recv_headers::<P>(frame, stream)
actions.recv.recv_trailers(frame, stream)
}
})
}
pub fn recv_data<P: Peer>(&mut self, frame: frame::Data)
pub fn recv_data(&mut self, frame: frame::Data)
-> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
@@ -117,12 +118,12 @@ impl<B> Streams<B>
None => return Err(ProtocolError.into()),
};
me.actions.transition::<P, _, _>(stream, |actions, stream| {
me.actions.transition(stream, |actions, stream| {
actions.recv.recv_data(frame, stream)
})
}
pub fn recv_reset<P: Peer>(&mut self, frame: frame::Reset)
pub fn recv_reset(&mut self, frame: frame::Reset)
-> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
@@ -138,12 +139,12 @@ impl<B> Streams<B>
Some(stream) => stream,
None => {
// TODO: Are there other error cases?
me.actions.ensure_not_idle::<P>(id)?;
me.actions.ensure_not_idle(id)?;
return Ok(());
}
};
me.actions.transition::<P, _, _>(stream, |actions, stream| {
me.actions.transition(stream, |actions, stream| {
actions.recv.recv_reset(frame, stream)?;
assert!(stream.state.is_closed());
Ok(())
@@ -190,7 +191,7 @@ impl<B> Streams<B>
Ok(())
}
pub fn recv_push_promise<P: Peer>(&mut self, frame: frame::PushPromise)
pub fn recv_push_promise(&mut self, frame: frame::PushPromise)
-> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
@@ -203,11 +204,11 @@ impl<B> Streams<B>
None => return Err(ProtocolError.into()),
};
me.actions.recv.recv_push_promise::<P>(
me.actions.recv.recv_push_promise(
frame, &me.actions.send, stream, &mut me.store)
}
pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
pub fn next_incoming(&mut self) -> Option<StreamRef<B, P>> {
let key = {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -268,11 +269,11 @@ impl<B> Streams<B>
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.send.poll_open_ready::<client::Peer>()
me.actions.send.poll_open_ready()
}
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<B>, ConnectionError>
-> Result<StreamRef<B, P>, ConnectionError>
{
// TODO: There is a hazard with assigning a stream ID before the
// prioritize layer. If prioritization reorders new streams, this
@@ -284,7 +285,7 @@ impl<B> Streams<B>
let me = &mut *me;
// Initialize a new stream. This fails if the connection is at capacity.
let stream_id = me.actions.send.open::<client::Peer>()?;
let stream_id = me.actions.send.open()?;
let stream = Stream::new(
stream_id,
@@ -313,14 +314,14 @@ impl<B> Streams<B>
})
}
pub fn send_reset<P: Peer>(&mut self, id: StreamId, reason: Reason) {
pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => {
match me.actions.recv.open::<P>(id) {
match me.actions.recv.open(id) {
Ok(Some(stream_id)) => {
let stream = Stream::new(
stream_id, 0, 0);
@@ -335,7 +336,7 @@ impl<B> Streams<B>
let stream = me.store.resolve(key);
me.actions.transition::<P, _, _>(stream, move |actions, stream| {
me.actions.transition(stream, move |actions, stream| {
actions.send.send_reset(reason, stream, &mut actions.task)
})
}
@@ -343,10 +344,11 @@ impl<B> Streams<B>
// ===== impl StreamRef =====
impl<B> StreamRef<B>
impl<B, P> StreamRef<B, P>
where B: Buf,
P: Peer,
{
pub fn send_data<P: Peer>(&mut self, data: B, end_of_stream: bool)
pub fn send_data(&mut self, data: B, end_of_stream: bool)
-> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
@@ -357,13 +359,13 @@ impl<B> StreamRef<B>
// Create the data frame
let frame = frame::Data::from_buf(stream.id, data, end_of_stream);
me.actions.transition::<P, _, _>(stream, |actions, stream| {
me.actions.transition(stream, |actions, stream| {
// Send the data frame
actions.send.send_data(frame, stream, &mut actions.task)
})
}
pub fn send_trailers<P: Peer>(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError>
pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -373,7 +375,7 @@ impl<B> StreamRef<B>
// Create the trailers frame
let frame = frame::Headers::trailers(stream.id, trailers);
me.actions.transition::<P, _, _>(stream, |actions, stream| {
me.actions.transition(stream, |actions, stream| {
// Send the trailers frame
actions.send.send_trailers(frame, stream, &mut actions.task)
})
@@ -394,12 +396,12 @@ impl<B> StreamRef<B>
me.actions.recv.take_request(&mut stream)
}
pub fn send_reset<P: Peer>(&mut self, reason: Reason) {
pub fn send_reset(&mut self, reason: Reason) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let stream = me.store.resolve(self.key);
me.actions.transition::<P, _, _>(stream, move |actions, stream| {
me.actions.transition(stream, move |actions, stream| {
actions.send.send_reset(reason, stream, &mut actions.task)
})
}
@@ -415,7 +417,7 @@ impl<B> StreamRef<B>
let frame = server::Peer::convert_send_message(
stream.id, response, end_of_stream);
me.actions.transition::<server::Peer, _, _>(stream, |actions, stream| {
me.actions.transition(stream, |actions, stream| {
actions.send.send_headers(frame, stream, &mut actions.task)
})
}
@@ -501,7 +503,9 @@ impl<B> StreamRef<B>
}
}
impl<B> Clone for StreamRef<B> {
impl<B, P> Clone for StreamRef<B, P>
where P: Peer,
{
fn clone(&self) -> Self {
StreamRef {
inner: self.inner.clone(),
@@ -512,42 +516,42 @@ impl<B> Clone for StreamRef<B> {
// ===== impl Actions =====
impl<B> Actions<B>
impl<B, P> Actions<B, P>
where B: Buf,
P: Peer,
{
fn ensure_not_idle<P: Peer>(&mut self, id: StreamId)
fn ensure_not_idle(&mut self, id: StreamId)
-> Result<(), ConnectionError>
{
if self.is_local_init::<P>(id) {
if self.is_local_init(id) {
self.send.ensure_not_idle(id)
} else {
self.recv.ensure_not_idle(id)
}
}
fn dec_num_streams<P: Peer>(&mut self, id: StreamId) {
if self.is_local_init::<P>(id) {
fn dec_num_streams(&mut self, id: StreamId) {
if self.is_local_init(id) {
self.send.dec_num_streams();
} else {
self.recv.dec_num_streams();
}
}
fn is_local_init<P: Peer>(&self, id: StreamId) -> bool {
fn is_local_init(&self, id: StreamId) -> bool {
assert!(!id.is_zero());
P::is_server() == id.is_server_initiated()
}
fn transition<P, F, U>(&mut self, mut stream: store::Ptr<B>, f: F) -> U
where F: FnOnce(&mut Self, &mut store::Ptr<B>) -> U,
P: Peer,
fn transition<F, U>(&mut self, mut stream: store::Ptr<B, P>, f: F) -> U
where F: FnOnce(&mut Self, &mut store::Ptr<B, P>) -> U,
{
let is_counted = stream.state.is_counted();
let ret = f(self, &mut stream);
if is_counted && stream.state.is_closed() {
self.dec_num_streams::<P>(stream.id);
self.dec_num_streams(stream.id);
}
ret

View File

@@ -1,4 +1,4 @@
use {Body, HeaderMap, ConnectionError};
use {HeaderMap, ConnectionError};
use frame::{self, StreamId};
use proto::{self, Connection, WindowSize};
use error::Reason;
@@ -24,7 +24,12 @@ pub struct Server<T, B: IntoBuf> {
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
inner: proto::StreamRef<B::Buf, Peer>,
}
#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>,
}
#[derive(Debug)]
@@ -181,18 +186,18 @@ impl<B: IntoBuf> Stream<B> {
pub fn send_data(&mut self, data: B, end_of_stream: bool)
-> Result<(), ConnectionError>
{
self.inner.send_data::<Peer>(data.into_buf(), end_of_stream)
self.inner.send_data(data.into_buf(), end_of_stream)
}
/// Send trailers
pub fn send_trailers(&mut self, trailers: HeaderMap)
-> Result<(), ConnectionError>
{
self.inner.send_trailers::<Peer>(trailers)
self.inner.send_trailers(trailers)
}
pub fn send_reset(mut self, reason: Reason) {
self.inner.send_reset::<Peer>(reason)
self.inner.send_reset(reason)
}
}
@@ -210,6 +215,35 @@ impl Stream<Bytes> {
}
}
// ===== impl Body =====
impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.body_is_empty()
}
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> {
self.inner.release_capacity(sz as proto::WindowSize)
}
/// Poll trailers
///
/// This function **must** not be called until `Body::poll` returns `None`.
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ConnectionError> {
self.inner.poll_trailers()
}
}
impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Bytes;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.poll_data()
}
}
// ===== impl Send =====
impl<T> Future for Send<T>