API cleanup (#155)

* Change `send_reset` to take `&mut self`.

While calling this function is the last thing that should be done with
the instance, the h2 library is not intended to be used directly by end
users; it is meant to serve as an implementation detail for other
libraries.

Requiring `self` by value on `send_reset` is awkward when calling the
function from inside a `Future` implementation. It is also inconsistent:
all the other functions on the type take `&mut self`.
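
For illustration, a minimal sketch of what this enables for a wrapping library (the `Responder` wrapper is hypothetical; `SendStream` is the name this handle ends up with later in this PR):

```rust
use bytes::Bytes;
use h2::{Reason, SendStream};

// Hypothetical wrapper a consuming library might hold.
struct Responder {
    stream: SendStream<Bytes>,
}

impl Responder {
    // With `send_reset(&mut self, ..)` this can be called from any
    // `&mut self` context, e.g. inside a `Future` implementation's
    // `poll`, without having to move the stream out of `self` first.
    fn abort(&mut self) {
        self.stream.send_reset(Reason::CANCEL);
    }
}
```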

* Remove the `P: Peer` generic from the internals

* Split out `Respond` from `server::Stream`

This new type is used to send HTTP responses to the client, as well as
to reserve streams for push promises.
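
A trimmed sketch of the resulting server-side flow, mirroring the examples updated below (`conn` is an accepted server connection; error handling abbreviated):

```rust
conn.for_each(|(request, mut respond)| {
    println!("GOT request: {:?}", request);

    let response = Response::builder().status(StatusCode::OK).body(()).unwrap();

    // `send_response` now returns the handle used to send the body,
    // rather than the response being sent on the stream handle itself.
    match respond.send_response(response, false) {
        Ok(mut send) => {
            if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), true) {
                println!(" -> err={:?}", e);
            }
        },
        Err(e) => println!(" error respond; err={:?}", e),
    }

    Ok(())
})
```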

* Remove unused `Send` helper.

This could be brought back later when the API becomes stable.

* Unite `client` and `server` types

* Remove `B` generic from internal proto structs

This is a first step in removing the `B` generic from public API types
that do not strictly require it.

Currently, all public API types must be generic over `B` even if they do
not actually interact with the send data frame type. The first step is
to remove `B` as a generic parameter from all internal types.
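
The pattern, applied to the `proto` internals throughout the diff below, is to drop `B` from the struct and push it down onto the methods that actually handle frames. A self-contained sketch with stand-in types (not the real h2 definitions):

```rust
// Stand-ins for the internal frame/buffer types; the real ones live in
// `proto` and appear in the diff below.
struct Frame<B>(B);
struct Buffer<T>(Vec<T>);

// Before this change the struct itself was `Prioritize<B, P>`.
struct Prioritize;

impl Prioritize {
    // The generic now appears only on the methods that touch frames, and
    // the caller supplies the buffer instead of the struct owning it.
    fn queue_frame<B>(&mut self, frame: Frame<B>, buffer: &mut Buffer<Frame<B>>) {
        buffer.0.push(frame);
    }
}
```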

* Remove `Buffer<B>` from inner stream state

This is the next step in removing the `B` generic from all public API
types. The send buffer is the only type that requires `B`. It has now
been extracted from the rest of the stream state.

The strategy used in this PR requires an additional `Arc` and `Mutex`,
but this is not a fundamental requirement. The additional overhead could
be avoided with a little bit of unsafe code; however, that optimization
should not be made until it is proven necessary.
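
An illustrative sketch of the resulting ownership split (stand-in types, not the actual h2 layout):

```rust
use std::sync::{Arc, Mutex};

// The only `B`-generic piece, the send buffer, lives behind its own
// `Arc<Mutex<..>>`; everything else is shared, generic-free state.
struct SendBuffer<B> {
    frames: Mutex<Vec<B>>, // stand-in for the internal Buffer<Frame<B>>
}

struct Inner; // counts, store, recv/send state machines: no `B` here

struct Streams<B> {
    send_buffer: Arc<SendBuffer<B>>, // only the send half names `B`
    inner: Arc<Mutex<Inner>>,        // clonable by every handle, including
                                     // the new generic-free `OpaqueStreamRef`
}
```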

* Remove `B` generic from `Body` + `ReleaseCapacity`

This commit removes the generic from these two public API types. Note
that removing the generic requires `B: 'static`: because `Body` and
`ReleaseCapacity` no longer carry the generic, the compiler must be able
to ensure that `B` outlives all `Body` and `ReleaseCapacity` handles.

In practice, in an async world, a non-`'static` `B` is never going to be
used anyway.
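
Concretely, the client impls in the diff below pick up the extra bound (abbreviated excerpt):

```rust
impl<B> Client<B>
where
    B: IntoBuf,
    B::Buf: 'static, // new: the receive-side handles no longer name `B`,
                     // so the buffer type must be able to outlive them
{
    // ...
}
```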

* Remove generic from `ResponseFuture`

This change also makes the generic-free types `Send`. The original
strategy of using a trait object meant that those handles could not be
`Send`. The solution is to avoid touching the send buffer when canceling
a stream; this is done by transitioning the stream state to `Canceled`,
a new `Cause` variant.
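
The new variant (added in `state.rs`, see the last hunk of the diff) looks roughly like:

```rust
enum Cause {
    Proto(Reason),
    Io,
    /// The user dropped all handles to the stream without explicitly
    /// canceling. This tells the connection that a RST_STREAM frame must
    /// be sent once the send queue has been flushed.
    Canceled,
}
```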

* Simplify `Send::send_reset`

Now that implicit cancellation goes through a separate path, the
`send_reset` function can be simplified.
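
Roughly, the two paths now look like this (signatures abbreviated from the `send.rs` hunks below):

```rust
impl Send {
    // Explicit reset requested by the user; the old `clear_queue: bool`
    // flag is gone.
    pub fn send_reset<B>(
        &mut self,
        reason: Reason,
        buffer: &mut Buffer<Frame<B>>,
        stream: &mut store::Ptr,
        task: &mut Option<Task>,
    ) {
        // ...
    }

    // Implicit cancellation when all handles are dropped: mark the state
    // as canceled and schedule the stream; the prioritizer emits the
    // RST_STREAM once the send queue has drained.
    pub fn schedule_cancel(&mut self, stream: &mut store::Ptr, task: &mut Option<Task>) {
        // ...
    }
}
```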

* Export types common to client & server at root

* Rename `Stream` -> `SendStream`, `Body` -> `RecvStream`
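
After the rename and the root-level re-export, downstream imports look like this (per the `lib.rs` hunk below):

```rust
// Shared client/server types now live at the crate root...
use h2::{SendStream, RecvStream, ReleaseCapacity};

// ...where they previously lived per module under different names,
// e.g. `h2::client::Stream<B>` and `h2::client::Body<B>`.
```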

* Implement `send_reset` on `server::Respond`

Author: Carl Lerche
Date: 2017-10-19 20:02:08 -07:00
Committed by: GitHub
Parent: 1e126aa752
Commit: c4fc2928fe
21 changed files with 882 additions and 945 deletions

@@ -1,4 +1,3 @@
extern crate bytes;
extern crate env_logger; extern crate env_logger;
extern crate futures; extern crate futures;
extern crate h2; extern crate h2;
@@ -6,9 +5,9 @@ extern crate http;
extern crate io_dump; extern crate io_dump;
extern crate tokio_core; extern crate tokio_core;
use h2::client::{Body, Client}; use h2::client::{Client};
use h2::RecvStream;
use bytes::*;
use futures::*; use futures::*;
use http::*; use http::*;
@@ -16,7 +15,7 @@ use tokio_core::net::TcpStream;
use tokio_core::reactor; use tokio_core::reactor;
struct Process { struct Process {
body: Body<Bytes>, body: RecvStream,
trailers: bool, trailers: bool,
} }

@@ -32,17 +32,21 @@ pub fn main() {
.and_then(|conn| { .and_then(|conn| {
println!("H2 connection bound"); println!("H2 connection bound");
conn.for_each(|(request, mut stream)| { conn.for_each(|(request, mut respond)| {
println!("GOT request: {:?}", request); println!("GOT request: {:?}", request);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
if let Err(e) = stream.send_response(response, false) { let mut send = match respond.send_response(response, false) {
println!(" error responding; err={:?}", e); Ok(send) => send,
} Err(e) => {
println!(" error respond; err={:?}", e);
return Ok(());
}
};
println!(">>>> sending data"); println!(">>>> sending data");
if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), false) { if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), false) {
println!(" -> err={:?}", e); println!(" -> err={:?}", e);
} }
@@ -50,7 +54,7 @@ pub fn main() {
hdrs.insert("status", "ok".parse().unwrap()); hdrs.insert("status", "ok".parse().unwrap());
println!(">>>> sending trailers"); println!(">>>> sending trailers");
if let Err(e) = stream.send_trailers(hdrs) { if let Err(e) = send.send_trailers(hdrs) {
println!(" -> err={:?}", e); println!(" -> err={:?}", e);
} }

@@ -31,18 +31,21 @@ pub fn main() {
.and_then(|conn| { .and_then(|conn| {
println!("H2 connection bound"); println!("H2 connection bound");
conn.for_each(|(request, mut stream)| { conn.for_each(|(request, mut respond)| {
println!("GOT request: {:?}", request); println!("GOT request: {:?}", request);
let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
if let Err(e) = stream.send_response(response, false) { let mut send = match respond.send_response(response, false) {
println!(" error responding; err={:?}", e); Ok(send) => send,
} Err(e) => {
println!(" error respond; err={:?}", e);
return Ok(());
}
};
println!(">>>> sending data"); println!(">>>> sending data");
if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), true) { if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), true) {
println!(" -> err={:?}", e); println!(" -> err={:?}", e);
} }

@@ -1,10 +1,11 @@
use {SendStream, RecvStream, ReleaseCapacity};
use codec::{Codec, RecvError}; use codec::{Codec, RecvError};
use frame::{Headers, Pseudo, Reason, Settings, StreamId}; use frame::{Headers, Pseudo, Reason, Settings, StreamId};
use proto::{self, WindowSize}; use proto;
use bytes::{Bytes, IntoBuf}; use bytes::{Bytes, IntoBuf};
use futures::{Async, Future, MapErr, Poll}; use futures::{Async, Future, MapErr, Poll};
use http::{HeaderMap, Request, Response}; use http::{Request, Response};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::WriteAll; use tokio_io::io::WriteAll;
@@ -30,22 +31,8 @@ pub struct Connection<T, B: IntoBuf> {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct ResponseFuture<B: IntoBuf> { pub struct ResponseFuture {
inner: proto::StreamRef<B::Buf, Peer>, inner: proto::OpaqueStreamRef,
}
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>,
}
pub struct Body<B: IntoBuf> {
inner: ReleaseCapacity<B>,
}
#[derive(Debug)]
pub struct ReleaseCapacity<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>,
} }
/// Build a Client. /// Build a Client.
@@ -86,6 +73,7 @@ impl Client<Bytes> {
impl<B> Client<B> impl<B> Client<B>
where where
B: IntoBuf, B: IntoBuf,
B::Buf: 'static,
{ {
fn handshake2<T>(io: T, builder: Builder) -> Handshake<T, B> fn handshake2<T>(io: T, builder: Builder) -> Handshake<T, B>
where where
@@ -118,7 +106,7 @@ where
&mut self, &mut self,
request: Request<()>, request: Request<()>,
end_of_stream: bool, end_of_stream: bool,
) -> Result<(ResponseFuture<B>, Stream<B>), ::Error> { ) -> Result<(ResponseFuture, SendStream<B>), ::Error> {
self.inner self.inner
.send_request(request, end_of_stream, self.pending.as_ref()) .send_request(request, end_of_stream, self.pending.as_ref())
.map_err(Into::into) .map_err(Into::into)
@@ -128,12 +116,10 @@ where
} }
let response = ResponseFuture { let response = ResponseFuture {
inner: stream.clone(), inner: stream.clone_to_opaque(),
}; };
let stream = Stream { let stream = SendStream::new(stream);
inner: stream,
};
(response, stream) (response, stream)
}) })
@@ -239,6 +225,7 @@ impl Builder {
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
B: IntoBuf, B: IntoBuf,
B::Buf: 'static,
{ {
Client::handshake2(io, self.clone()) Client::handshake2(io, self.clone())
} }
@@ -297,9 +284,11 @@ where
// ===== impl Handshake ===== // ===== impl Handshake =====
impl<T, B: IntoBuf> Future for Handshake<T, B> impl<T, B> Future for Handshake<T, B>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
B: IntoBuf,
B::Buf: 'static,
{ {
type Item = (Client<B>, Connection<T, B>); type Item = (Client<B>, Connection<T, B>);
type Error = ::Error; type Error = ::Error;
@@ -348,121 +337,29 @@ where
// ===== impl ResponseFuture ===== // ===== impl ResponseFuture =====
impl<B: IntoBuf> Future for ResponseFuture<B> { impl Future for ResponseFuture {
type Item = Response<Body<B>>; type Item = Response<RecvStream>;
type Error = ::Error; type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); let (parts, _) = try_ready!(self.inner.poll_response()).into_parts();
let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone()));
let body = Body {
inner: ReleaseCapacity { inner: self.inner.clone() },
};
Ok(Response::from_parts(parts, body).into()) Ok(Response::from_parts(parts, body).into())
} }
} }
// ===== impl Stream =====
impl<B: IntoBuf> Stream<B> {
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: usize) {
// TODO: Check for overflow
self.inner.reserve_capacity(capacity as WindowSize)
}
/// Returns the stream's current send capacity.
pub fn capacity(&self) -> usize {
self.inner.capacity() as usize
}
/// Request to be notified when the stream's capacity increases
pub fn poll_capacity(&mut self) -> Poll<Option<usize>, ::Error> {
let res = try_ready!(self.inner.poll_capacity());
Ok(Async::Ready(res.map(|v| v as usize)))
}
/// Send data
pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> {
self.inner
.send_data(data.into_buf(), end_of_stream)
.map_err(Into::into)
}
/// Send trailers
pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> {
self.inner.send_trailers(trailers).map_err(Into::into)
}
pub fn send_reset(mut self, reason: Reason) {
self.inner.send_reset(reason)
}
}
// ===== impl Body =====
impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.inner.body_is_empty()
}
pub fn release_capacity(&mut self) -> &mut ReleaseCapacity<B> {
&mut self.inner
}
/// Poll trailers
///
/// This function **must** not be called until `Body::poll` returns `None`.
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> {
self.inner.inner.poll_trailers().map_err(Into::into)
}
}
impl<B: IntoBuf> ::futures::Stream for Body<B> {
type Item = Bytes;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.inner.poll_data().map_err(Into::into)
}
}
impl<B: IntoBuf> fmt::Debug for Body<B>
where B: fmt::Debug,
B::Buf: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Body")
.field("inner", &self.inner)
.finish()
}
}
// ===== impl ReleaseCapacity =====
impl<B: IntoBuf> ReleaseCapacity<B> {
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> {
self.inner
.release_capacity(sz as proto::WindowSize)
.map_err(Into::into)
}
}
impl<B: IntoBuf> Clone for ReleaseCapacity<B> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
ReleaseCapacity { inner }
}
}
// ===== impl Peer ===== // ===== impl Peer =====
impl proto::Peer for Peer { impl proto::Peer for Peer {
type Send = Request<()>; type Send = Request<()>;
type Poll = Response<()>; type Poll = Response<()>;
fn dyn() -> proto::DynPeer {
proto::DynPeer::Client
}
fn is_server() -> bool { fn is_server() -> bool {
false false
} }

@@ -7,26 +7,20 @@ extern crate futures;
extern crate tokio_io; extern crate tokio_io;
// HTTP types // HTTP types
extern crate http; extern crate http;
// Buffer utilities // Buffer utilities
extern crate bytes; extern crate bytes;
// Hash function used for HPACK encoding and tracking stream states. // Hash function used for HPACK encoding and tracking stream states.
extern crate fnv; extern crate fnv;
extern crate byteorder; extern crate byteorder;
extern crate slab; extern crate slab;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate string; extern crate string;
extern crate ordermap; extern crate ordermap;
mod error; mod error;
@@ -42,8 +36,10 @@ pub mod frame;
pub mod client; pub mod client;
pub mod server; pub mod server;
mod share;
pub use error::{Error, Reason}; pub use error::{Error, Reason};
pub use share::{SendStream, RecvStream, ReleaseCapacity};
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
pub use codec::{Codec, RecvError, SendError, UserError}; pub use codec::{Codec, RecvError, SendError, UserError};

@@ -301,7 +301,7 @@ where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
B: IntoBuf, B: IntoBuf,
{ {
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf, server::Peer>> { pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf>> {
self.streams.next_incoming() self.streams.next_incoming()
} }
} }

View File

@@ -7,9 +7,10 @@ mod streams;
pub(crate) use self::connection::Connection; pub(crate) use self::connection::Connection;
pub(crate) use self::error::Error; pub(crate) use self::error::Error;
pub(crate) use self::peer::Peer; pub(crate) use self::peer::{Peer, Dyn as DynPeer};
pub(crate) use self::streams::{Key as StreamKey, StreamRef, Streams}; pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams};
pub(crate) use self::streams::Prioritized; pub(crate) use self::streams::Prioritized;
use codec::Codec; use codec::Codec;
use self::ping_pong::PingPong; use self::ping_pong::PingPong;

@@ -1,6 +1,9 @@
use codec::RecvError; use codec::RecvError;
use error::Reason;
use frame::{Headers, StreamId}; use frame::{Headers, StreamId};
use http::{Request, Response};
use std::fmt; use std::fmt;
/// Either a Client or a Server /// Either a Client or a Server
@@ -11,6 +14,8 @@ pub trait Peer {
/// Message type polled from the transport /// Message type polled from the transport
type Poll: fmt::Debug; type Poll: fmt::Debug;
fn dyn() -> Dyn;
fn is_server() -> bool; fn is_server() -> bool;
fn convert_send_message(id: StreamId, headers: Self::Send, end_of_stream: bool) -> Headers; fn convert_send_message(id: StreamId, headers: Self::Send, end_of_stream: bool) -> Headers;
@@ -22,3 +27,57 @@ pub trait Peer {
Self::is_server() == id.is_server_initiated() Self::is_server() == id.is_server_initiated()
} }
} }
/// A dynamic representation of `Peer`.
///
/// This is used internally to avoid incurring a generic on all internal types.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum Dyn {
Client,
Server,
}
#[derive(Debug)]
pub enum PollMessage {
Client(Response<()>),
Server(Request<()>),
}
// ===== impl Dyn =====
impl Dyn {
pub fn is_server(&self) -> bool {
*self == Dyn::Server
}
pub fn is_local_init(&self, id: StreamId) -> bool {
assert!(!id.is_zero());
self.is_server() == id.is_server_initiated()
}
pub fn convert_poll_message(&self, headers: Headers) -> Result<PollMessage, RecvError> {
if self.is_server() {
::server::Peer::convert_poll_message(headers)
.map(PollMessage::Server)
} else {
::client::Peer::convert_poll_message(headers)
.map(PollMessage::Client)
}
}
/// Returns true if the remote peer can initiate a stream with the given ID.
pub fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> {
if !self.is_server() {
// Remote is a server and cannot open streams. PushPromise is
// registered by reserving, so does not go through this path.
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
// Ensure that the ID is a valid server initiated ID
if !id.is_client_initiated() {
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
Ok(())
}
}

@@ -1,7 +1,5 @@
use slab::Slab; use slab::Slab;
use std::marker::PhantomData;
/// Buffers frames for multiple streams. /// Buffers frames for multiple streams.
#[derive(Debug)] #[derive(Debug)]
pub struct Buffer<T> { pub struct Buffer<T> {
@@ -10,9 +8,8 @@ pub struct Buffer<T> {
/// A sequence of frames in a `Buffer` /// A sequence of frames in a `Buffer`
#[derive(Debug)] #[derive(Debug)]
pub struct Deque<B> { pub struct Deque {
indices: Option<Indices>, indices: Option<Indices>,
_p: PhantomData<B>,
} }
/// Tracks the head & tail for a sequence of frames in a `Buffer`. /// Tracks the head & tail for a sequence of frames in a `Buffer`.
@@ -36,11 +33,10 @@ impl<T> Buffer<T> {
} }
} }
impl<T> Deque<T> { impl Deque {
pub fn new() -> Self { pub fn new() -> Self {
Deque { Deque {
indices: None, indices: None,
_p: PhantomData,
} }
} }
@@ -48,7 +44,7 @@ impl<T> Deque<T> {
self.indices.is_none() self.indices.is_none()
} }
pub fn push_back(&mut self, buf: &mut Buffer<T>, value: T) { pub fn push_back<T>(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot { let key = buf.slab.insert(Slot {
value, value,
next: None, next: None,
@@ -68,7 +64,7 @@ impl<T> Deque<T> {
} }
} }
pub fn push_front(&mut self, buf: &mut Buffer<T>, value: T) { pub fn push_front<T>(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot { let key = buf.slab.insert(Slot {
value, value,
next: None, next: None,
@@ -88,7 +84,7 @@ impl<T> Deque<T> {
} }
} }
pub fn pop_front(&mut self, buf: &mut Buffer<T>) -> Option<T> { pub fn pop_front<T>(&mut self, buf: &mut Buffer<T>) -> Option<T> {
match self.indices { match self.indices {
Some(mut idxs) => { Some(mut idxs) => {
let mut slot = buf.slab.remove(idxs.head); let mut slot = buf.slab.remove(idxs.head);
@@ -107,7 +103,7 @@ impl<T> Deque<T> {
} }
} }
pub fn peek_front<'a>(&self, buf: &'a Buffer<T>) -> Option<&'a T> { pub fn peek_front<'a, T>(&self, buf: &'a Buffer<T>) -> Option<&'a T> {
match self.indices { match self.indices {
Some(idxs) => Some(&buf.slab[idxs.head].value), Some(idxs) => Some(&buf.slab[idxs.head].value),
None => None, None => None,

@@ -1,13 +1,13 @@
use super::*; use super::*;
use std::marker::PhantomData;
use std::usize; use std::usize;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Counts<P> pub(super) struct Counts {
where /// Acting as a client or server. This allows us to track which values to
P: Peer, /// inc / dec.
{ peer: peer::Dyn,
/// Maximum number of locally initiated streams /// Maximum number of locally initiated streams
max_send_streams: usize, max_send_streams: usize,
@@ -19,25 +19,25 @@ where
/// Current number of locally initiated streams /// Current number of locally initiated streams
num_recv_streams: usize, num_recv_streams: usize,
_p: PhantomData<P>,
} }
impl<P> Counts<P> impl Counts {
where
P: Peer,
{
/// Create a new `Counts` using the provided configuration values. /// Create a new `Counts` using the provided configuration values.
pub fn new(config: &Config) -> Self { pub fn new(peer: peer::Dyn, config: &Config) -> Self {
Counts { Counts {
peer,
max_send_streams: config.local_max_initiated.unwrap_or(usize::MAX), max_send_streams: config.local_max_initiated.unwrap_or(usize::MAX),
num_send_streams: 0, num_send_streams: 0,
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
num_recv_streams: 0, num_recv_streams: 0,
_p: PhantomData,
} }
} }
/// Returns the current peer
pub fn peer(&self) -> peer::Dyn {
self.peer
}
/// Returns true if the receive stream concurrency can be incremented /// Returns true if the receive stream concurrency can be incremented
pub fn can_inc_num_recv_streams(&self) -> bool { pub fn can_inc_num_recv_streams(&self) -> bool {
self.max_recv_streams > self.num_recv_streams self.max_recv_streams > self.num_recv_streams
@@ -82,9 +82,9 @@ where
/// ///
/// If the stream state transitions to closed, this function will perform /// If the stream state transitions to closed, this function will perform
/// all necessary cleanup. /// all necessary cleanup.
pub fn transition<F, B, U>(&mut self, mut stream: store::Ptr<B, P>, f: F) -> U pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
where where
F: FnOnce(&mut Self, &mut store::Ptr<B, P>) -> U, F: FnOnce(&mut Self, &mut store::Ptr) -> U,
{ {
let is_counted = stream.is_counted(); let is_counted = stream.is_counted();
@@ -97,7 +97,7 @@ where
} }
// TODO: move this to macro? // TODO: move this to macro?
pub fn transition_after<B>(&mut self, mut stream: store::Ptr<B, P>, is_counted: bool) { pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool) {
if stream.is_closed() { if stream.is_closed() {
stream.unlink(); stream.unlink();
@@ -114,7 +114,7 @@ where
} }
fn dec_num_streams(&mut self, id: StreamId) { fn dec_num_streams(&mut self, id: StreamId) {
if P::is_local_init(id) { if self.peer.is_local_init(id) {
self.num_send_streams -= 1; self.num_send_streams -= 1;
} else { } else {
self.num_recv_streams -= 1; self.num_recv_streams -= 1;

@@ -11,7 +11,7 @@ mod streams;
pub(crate) use self::prioritize::Prioritized; pub(crate) use self::prioritize::Prioritized;
pub(crate) use self::store::Key; pub(crate) use self::store::Key;
pub(crate) use self::streams::{StreamRef, Streams}; pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams};
use self::buffer::Buffer; use self::buffer::Buffer;
use self::counts::Counts; use self::counts::Counts;

@@ -12,24 +12,18 @@ use std::{cmp, fmt};
use std::io; use std::io;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Prioritize<B, P> pub(super) struct Prioritize {
where
P: Peer,
{
/// Queue of streams waiting for socket capacity to send a frame /// Queue of streams waiting for socket capacity to send a frame
pending_send: store::Queue<B, stream::NextSend, P>, pending_send: store::Queue<stream::NextSend>,
/// Queue of streams waiting for window capacity to produce data. /// Queue of streams waiting for window capacity to produce data.
pending_capacity: store::Queue<B, stream::NextSendCapacity, P>, pending_capacity: store::Queue<stream::NextSendCapacity>,
/// Streams waiting for capacity due to max concurrency /// Streams waiting for capacity due to max concurrency
pending_open: store::Queue<B, stream::NextOpen, P>, pending_open: store::Queue<stream::NextOpen>,
/// Connection level flow control governing sent data /// Connection level flow control governing sent data
flow: FlowControl, flow: FlowControl,
/// Holds frames that are waiting to be written to the socket
buffer: Buffer<Frame<B>>,
} }
pub(crate) struct Prioritized<B> { pub(crate) struct Prioritized<B> {
@@ -44,11 +38,8 @@ pub(crate) struct Prioritized<B> {
// ===== impl Prioritize ===== // ===== impl Prioritize =====
impl<B, P> Prioritize<B, P> impl Prioritize {
where pub fn new(config: &Config) -> Prioritize {
P: Peer,
{
pub fn new(config: &Config) -> Prioritize<B, P> {
let mut flow = FlowControl::new(); let mut flow = FlowControl::new();
flow.inc_window(config.remote_init_window_sz) flow.inc_window(config.remote_init_window_sz)
@@ -64,20 +55,23 @@ where
pending_capacity: store::Queue::new(), pending_capacity: store::Queue::new(),
pending_open: store::Queue::new(), pending_open: store::Queue::new(),
flow: flow, flow: flow,
buffer: Buffer::new(),
} }
} }
/// Queue a frame to be sent to the remote /// Queue a frame to be sent to the remote
pub fn queue_frame( pub fn queue_frame<B>(
&mut self, &mut self,
frame: Frame<B>, frame: Frame<B>,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
) { ) {
// Queue the frame in the buffer // Queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame); stream.pending_send.push_back(buffer, frame);
self.schedule_send(stream, task);
}
pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option<Task>) {
// If the stream is waiting to be opened, nothing more to do. // If the stream is waiting to be opened, nothing more to do.
if !stream.is_pending_open { if !stream.is_pending_open {
// Queue the stream // Queue the stream
@@ -90,15 +84,16 @@ where
} }
} }
pub fn queue_open(&mut self, stream: &mut store::Ptr<B, P>) { pub fn queue_open(&mut self, stream: &mut store::Ptr) {
self.pending_open.push(stream); self.pending_open.push(stream);
} }
/// Send a data frame /// Send a data frame
pub fn send_data( pub fn send_data<B>(
&mut self, &mut self,
frame: frame::Data<B>, frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), UserError> ) -> Result<(), UserError>
where where
@@ -153,21 +148,21 @@ where
if stream.send_flow.available() >= stream.buffered_send_data { if stream.send_flow.available() >= stream.buffered_send_data {
// The stream currently has capacity to send the data frame, so // The stream currently has capacity to send the data frame, so
// queue it up and notify the connection task. // queue it up and notify the connection task.
self.queue_frame(frame.into(), stream, task); self.queue_frame(frame.into(), buffer, stream, task);
} else { } else {
// The stream has no capacity to send the frame now, save it but // The stream has no capacity to send the frame now, save it but
// don't notify the conneciton task. Once additional capacity // don't notify the conneciton task. Once additional capacity
// becomes available, the frame will be flushed. // becomes available, the frame will be flushed.
stream stream
.pending_send .pending_send
.push_back(&mut self.buffer, frame.into()); .push_back(buffer, frame.into());
} }
Ok(()) Ok(())
} }
/// Request capacity to send data /// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) { pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) {
trace!( trace!(
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
stream.id, stream.id,
@@ -212,7 +207,7 @@ where
pub fn recv_stream_window_update( pub fn recv_stream_window_update(
&mut self, &mut self,
inc: WindowSize, inc: WindowSize,
stream: &mut store::Ptr<B, P>, stream: &mut store::Ptr,
) -> Result<(), Reason> { ) -> Result<(), Reason> {
trace!( trace!(
"recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
@@ -235,7 +230,7 @@ where
pub fn recv_connection_window_update( pub fn recv_connection_window_update(
&mut self, &mut self,
inc: WindowSize, inc: WindowSize,
store: &mut Store<B, P>, store: &mut Store,
) -> Result<(), Reason> { ) -> Result<(), Reason> {
// Update the connection's window // Update the connection's window
self.flow.inc_window(inc)?; self.flow.inc_window(inc)?;
@@ -246,7 +241,7 @@ where
pub fn assign_connection_capacity<R>(&mut self, inc: WindowSize, store: &mut R) pub fn assign_connection_capacity<R>(&mut self, inc: WindowSize, store: &mut R)
where where
R: Resolve<B, P>, R: Resolve,
{ {
trace!("assign_connection_capacity; inc={}", inc); trace!("assign_connection_capacity; inc={}", inc);
@@ -267,7 +262,7 @@ where
} }
/// Request capacity to send data /// Request capacity to send data
fn try_assign_capacity(&mut self, stream: &mut store::Ptr<B, P>) { fn try_assign_capacity(&mut self, stream: &mut store::Ptr) {
let total_requested = stream.requested_send_capacity; let total_requested = stream.requested_send_capacity;
// Total requested should never go below actual assigned // Total requested should never go below actual assigned
@@ -366,10 +361,11 @@ where
} }
} }
pub fn poll_complete<T>( pub fn poll_complete<T, B>(
&mut self, &mut self,
store: &mut Store<B, P>, buffer: &mut Buffer<Frame<B>>,
counts: &mut Counts<P>, store: &mut Store,
counts: &mut Counts,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error> ) -> Poll<(), io::Error>
where where
@@ -380,7 +376,7 @@ where
try_ready!(dst.poll_ready()); try_ready!(dst.poll_ready());
// Reclaim any frame that has previously been written // Reclaim any frame that has previously been written
self.reclaim_frame(store, dst); self.reclaim_frame(buffer, store, dst);
// The max frame length // The max frame length
let max_frame_len = dst.max_send_frame_size(); let max_frame_len = dst.max_send_frame_size();
@@ -389,7 +385,8 @@ where
loop { loop {
self.schedule_pending_open(store, counts); self.schedule_pending_open(store, counts);
match self.pop_frame(store, max_frame_len, counts) {
match self.pop_frame(buffer, store, max_frame_len, counts) {
Some(frame) => { Some(frame) => {
trace!("writing frame={:?}", frame); trace!("writing frame={:?}", frame);
@@ -399,14 +396,14 @@ where
try_ready!(dst.poll_ready()); try_ready!(dst.poll_ready());
// Because, always try to reclaim... // Because, always try to reclaim...
self.reclaim_frame(store, dst); self.reclaim_frame(buffer, store, dst);
}, },
None => { None => {
// Try to flush the codec. // Try to flush the codec.
try_ready!(dst.flush()); try_ready!(dst.flush());
// This might release a data frame... // This might release a data frame...
if !self.reclaim_frame(store, dst) { if !self.reclaim_frame(buffer, store, dst) {
return Ok(().into()); return Ok(().into());
} }
@@ -424,9 +421,10 @@ where
/// When a data frame is written to the codec, it may not be written in its /// When a data frame is written to the codec, it may not be written in its
/// entirety (large chunks are split up into potentially many data frames). /// entirety (large chunks are split up into potentially many data frames).
/// In this case, the stream needs to be reprioritized. /// In this case, the stream needs to be reprioritized.
fn reclaim_frame<T>( fn reclaim_frame<T, B>(
&mut self, &mut self,
store: &mut Store<B, P>, buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> bool ) -> bool
where where
@@ -458,7 +456,7 @@ where
frame.set_end_stream(true); frame.set_end_stream(true);
} }
self.push_back_frame(frame.into(), &mut stream); self.push_back_frame(frame.into(), buffer, &mut stream);
return true; return true;
} }
@@ -469,9 +467,13 @@ where
/// Push the frame to the front of the stream's deque, scheduling the /// Push the frame to the front of the stream's deque, scheduling the
/// steream if needed. /// steream if needed.
fn push_back_frame(&mut self, frame: Frame<B>, stream: &mut store::Ptr<B, P>) { fn push_back_frame<B>(&mut self,
frame: Frame<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr)
{
// Push the frame to the front of the stream's deque // Push the frame to the front of the stream's deque
stream.pending_send.push_front(&mut self.buffer, frame); stream.pending_send.push_front(buffer, frame);
// If needed, schedule the sender // If needed, schedule the sender
if stream.send_flow.available() > 0 { if stream.send_flow.available() > 0 {
@@ -480,20 +482,21 @@ where
} }
} }
pub fn clear_queue(&mut self, stream: &mut store::Ptr<B, P>) { pub fn clear_queue<B>(&mut self, buffer: &mut Buffer<Frame<B>>, stream: &mut store::Ptr) {
trace!("clear_queue; stream-id={:?}", stream.id); trace!("clear_queue; stream-id={:?}", stream.id);
// TODO: make this more efficient? // TODO: make this more efficient?
while let Some(frame) = stream.pending_send.pop_front(&mut self.buffer) { while let Some(frame) = stream.pending_send.pop_front(buffer) {
trace!("dropping; frame={:?}", frame); trace!("dropping; frame={:?}", frame);
} }
} }
fn pop_frame( fn pop_frame<B>(
&mut self, &mut self,
store: &mut Store<B, P>, buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
max_len: usize, max_len: usize,
counts: &mut Counts<P>, counts: &mut Counts,
) -> Option<Frame<Prioritized<B>>> ) -> Option<Frame<Prioritized<B>>>
where where
B: Buf, B: Buf,
@@ -508,8 +511,8 @@ where
let is_counted = stream.is_counted(); let is_counted = stream.is_counted();
let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { let frame = match stream.pending_send.pop_front(buffer) {
Frame::Data(mut frame) => { Some(Frame::Data(mut frame)) => {
// Get the amount of capacity remaining for stream's // Get the amount of capacity remaining for stream's
// window. // window.
let stream_capacity = stream.send_flow.available(); let stream_capacity = stream.send_flow.available();
@@ -546,7 +549,8 @@ where
// frame and wait for a window update... // frame and wait for a window update...
stream stream
.pending_send .pending_send
.push_front(&mut self.buffer, frame.into()); .push_front(buffer, frame.into());
continue; continue;
} }
@@ -597,12 +601,19 @@ where
} }
})) }))
}, },
frame => frame.map(|_| unreachable!()), Some(frame) => frame.map(|_| unreachable!()),
None => {
assert!(stream.state.is_canceled());
stream.state.set_reset(Reason::CANCEL);
let frame = frame::Reset::new(stream.id, Reason::CANCEL);
Frame::Reset(frame)
}
}; };
trace!("pop_frame; frame={:?}", frame); trace!("pop_frame; frame={:?}", frame);
if !stream.pending_send.is_empty() { if !stream.pending_send.is_empty() || stream.state.is_canceled() {
// TODO: Only requeue the sender IF it is ready to send // TODO: Only requeue the sender IF it is ready to send
// the next frame. i.e. don't requeue it if the next // the next frame. i.e. don't requeue it if the next
// frame is a data frame and the stream does not have // frame is a data frame and the stream does not have
@@ -619,7 +630,7 @@ where
} }
} }
fn schedule_pending_open(&mut self, store: &mut Store<B, P>, counts: &mut Counts<P>) { fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) {
trace!("schedule_pending_open"); trace!("schedule_pending_open");
// check for any pending open streams // check for any pending open streams
while counts.can_inc_num_send_streams() { while counts.can_inc_num_send_streams() {

@@ -1,5 +1,5 @@
use super::*; use super::*;
use {client, frame, proto, server}; use {frame, proto};
use codec::{RecvError, UserError}; use codec::{RecvError, UserError};
use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use proto::*; use proto::*;
@@ -7,13 +7,9 @@ use proto::*;
use http::HeaderMap; use http::HeaderMap;
use std::io; use std::io;
use std::marker::PhantomData;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Recv<B, P> pub(super) struct Recv {
where
P: Peer,
{
/// Initial window size of remote initiated streams /// Initial window size of remote initiated streams
init_window_sz: WindowSize, init_window_sz: WindowSize,
@@ -30,26 +26,24 @@ where
last_processed_id: StreamId, last_processed_id: StreamId,
/// Streams that have pending window updates /// Streams that have pending window updates
pending_window_updates: store::Queue<B, stream::NextWindowUpdate, P>, pending_window_updates: store::Queue<stream::NextWindowUpdate>,
/// New streams to be accepted /// New streams to be accepted
pending_accept: store::Queue<B, stream::NextAccept, P>, pending_accept: store::Queue<stream::NextAccept>,
/// Holds frames that are waiting to be read /// Holds frames that are waiting to be read
buffer: Buffer<Event<P::Poll>>, buffer: Buffer<Event>,
/// Refused StreamId, this represents a frame that must be sent out. /// Refused StreamId, this represents a frame that must be sent out.
refused: Option<StreamId>, refused: Option<StreamId>,
/// If push promises are allowed to be recevied. /// If push promises are allowed to be recevied.
is_push_enabled: bool, is_push_enabled: bool,
_p: PhantomData<B>,
} }
#[derive(Debug)] #[derive(Debug)]
pub(super) enum Event<T> { pub(super) enum Event {
Headers(T), Headers(peer::PollMessage),
Data(Bytes), Data(Bytes),
Trailers(HeaderMap), Trailers(HeaderMap),
} }
@@ -60,12 +54,9 @@ struct Indices {
tail: store::Key, tail: store::Key,
} }
impl<B, P> Recv<B, P> impl Recv {
where pub fn new(peer: peer::Dyn, config: &Config) -> Self {
P: Peer, let next_stream_id = if peer.is_server() { 1 } else { 2 };
{
pub fn new(config: &Config) -> Self {
let next_stream_id = if P::is_server() { 1 } else { 2 };
let mut flow = FlowControl::new(); let mut flow = FlowControl::new();
@@ -86,7 +77,6 @@ where
buffer: Buffer::new(), buffer: Buffer::new(),
refused: None, refused: None,
is_push_enabled: config.local_push_enabled, is_push_enabled: config.local_push_enabled,
_p: PhantomData,
} }
} }
@@ -106,11 +96,11 @@ where
pub fn open( pub fn open(
&mut self, &mut self,
id: StreamId, id: StreamId,
counts: &mut Counts<P>, counts: &mut Counts,
) -> Result<Option<StreamId>, RecvError> { ) -> Result<Option<StreamId>, RecvError> {
assert!(self.refused.is_none()); assert!(self.refused.is_none());
self.ensure_can_open(id)?; counts.peer().ensure_can_open(id)?;
let next_id = self.next_stream_id()?; let next_id = self.next_stream_id()?;
if id < next_id { if id < next_id {
@@ -133,8 +123,8 @@ where
pub fn recv_headers( pub fn recv_headers(
&mut self, &mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr<B, P>, stream: &mut store::Ptr,
counts: &mut Counts<P>, counts: &mut Counts,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
trace!("opening stream; init_window={}", self.init_window_sz); trace!("opening stream; init_window={}", self.init_window_sz);
let is_initial = stream.state.recv_open(frame.is_end_stream())?; let is_initial = stream.state.recv_open(frame.is_end_stream())?;
@@ -168,7 +158,7 @@ where
} }
} }
let message = P::convert_poll_message(frame)?; let message = counts.peer().convert_poll_message(frame)?;
// Push the frame onto the stream's recv buffer // Push the frame onto the stream's recv buffer
stream stream
@@ -178,18 +168,53 @@ where
// Only servers can receive a headers frame that initiates the stream. // Only servers can receive a headers frame that initiates the stream.
// This is verified in `Streams` before calling this function. // This is verified in `Streams` before calling this function.
if P::is_server() { if counts.peer().is_server() {
self.pending_accept.push(stream); self.pending_accept.push(stream);
} }
Ok(()) Ok(())
} }
/// Called by the server to get the request
///
/// TODO: Should this fn return `Result`?
pub fn take_request(&mut self, stream: &mut store::Ptr)
-> Request<()>
{
use super::peer::PollMessage::*;
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(Server(request))) => request,
_ => panic!(),
}
}
/// Called by the client to get the response
pub fn poll_response(
&mut self,
stream: &mut store::Ptr,
) -> Poll<Response<()>, proto::Error> {
use super::peer::PollMessage::*;
// If the buffer is not empty, then the first frame must be a HEADERS
// frame or the user violated the contract.
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(Client(response))) => Ok(response.into()),
Some(_) => panic!("poll_response called after response returned"),
None => {
stream.state.ensure_recv_open()?;
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
},
}
}
/// Transition the stream based on receiving trailers /// Transition the stream based on receiving trailers
pub fn recv_trailers( pub fn recv_trailers(
&mut self, &mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr<B, P>, stream: &mut store::Ptr,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
// Transition the state // Transition the state
stream.state.recv_close()?; stream.state.recv_close()?;
@@ -216,7 +241,7 @@ where
pub fn release_capacity( pub fn release_capacity(
&mut self, &mut self,
capacity: WindowSize, capacity: WindowSize,
stream: &mut store::Ptr<B, P>, stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), UserError> { ) -> Result<(), UserError> {
trace!("release_capacity; size={}", capacity); trace!("release_capacity; size={}", capacity);
@@ -293,7 +318,7 @@ where
} }
} }
pub fn body_is_empty(&self, stream: &store::Ptr<B, P>) -> bool { pub fn body_is_empty(&self, stream: &store::Ptr) -> bool {
if !stream.state.is_recv_closed() { if !stream.state.is_recv_closed() {
return false; return false;
} }
@@ -308,7 +333,7 @@ where
pub fn recv_data( pub fn recv_data(
&mut self, &mut self,
frame: frame::Data, frame: frame::Data,
stream: &mut store::Ptr<B, P>, stream: &mut store::Ptr,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
let sz = frame.payload().len(); let sz = frame.payload().len();
@@ -379,9 +404,9 @@ where
pub fn recv_push_promise( pub fn recv_push_promise(
&mut self, &mut self,
frame: frame::PushPromise, frame: frame::PushPromise,
send: &Send<B, P>, send: &Send,
stream: store::Key, stream: store::Key,
store: &mut Store<B, P>, store: &mut Store,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
// First, make sure that the values are legit // First, make sure that the values are legit
self.ensure_can_reserve(frame.promised_id())?; self.ensure_can_reserve(frame.promised_id())?;
@@ -443,7 +468,7 @@ where
pub fn recv_reset( pub fn recv_reset(
&mut self, &mut self,
frame: frame::Reset, frame: frame::Reset,
stream: &mut Stream<B, P>, stream: &mut Stream,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
let err = proto::Error::Proto(frame.reason()); let err = proto::Error::Proto(frame.reason());
@@ -454,7 +479,7 @@ where
} }
/// Handle a received error /// Handle a received error
pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream<B, P>) { pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) {
// Receive an error // Receive an error
stream.state.recv_err(err); stream.state.recv_err(err);
@@ -462,22 +487,6 @@ where
stream.notify_recv(); stream.notify_recv();
} }
/// Returns true if the remote peer can initiate a stream with the given ID.
fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> {
if !P::is_server() {
// Remote is a server and cannot open streams. PushPromise is
// registered by reserving, so does not go through this path.
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
// Ensure that the ID is a valid server initiated ID
if !id.is_client_initiated() {
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
Ok(())
}
fn next_stream_id(&self) -> Result<StreamId, RecvError> { fn next_stream_id(&self) -> Result<StreamId, RecvError> {
if let Ok(id) = self.next_stream_id { if let Ok(id) = self.next_stream_id {
Ok(id) Ok(id)
@@ -487,14 +496,9 @@ where
} }
/// Returns true if the remote peer can reserve a stream with the given ID. /// Returns true if the remote peer can reserve a stream with the given ID.
fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), RecvError> { fn ensure_can_reserve(&self, promised_id: StreamId)
// TODO: Are there other rules? -> Result<(), RecvError>
if P::is_server() { {
// The remote is a client and cannot reserve
trace!("recv_push_promise; error remote is client");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
if !promised_id.is_server_initiated() { if !promised_id.is_server_initiated() {
trace!( trace!(
"recv_push_promise; error promised id is invalid {:?}", "recv_push_promise; error promised id is invalid {:?}",
@@ -512,7 +516,7 @@ where
} }
/// Send any pending refusals. /// Send any pending refusals.
pub fn send_pending_refusal<T>( pub fn send_pending_refusal<T, B>(
&mut self, &mut self,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error> ) -> Poll<(), io::Error>
@@ -537,9 +541,9 @@ where
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
pub fn poll_complete<T>( pub fn poll_complete<T, B>(
&mut self, &mut self,
store: &mut Store<B, P>, store: &mut Store,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error> ) -> Poll<(), io::Error>
where where
@@ -556,7 +560,7 @@ where
} }
/// Send connection level window update /// Send connection level window update
fn send_connection_window_update<T>( fn send_connection_window_update<T, B>(
&mut self, &mut self,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error> ) -> Poll<(), io::Error>
@@ -587,9 +591,9 @@ where
/// Send stream level window update /// Send stream level window update
pub fn send_stream_window_updates<T>( pub fn send_stream_window_updates<T, B>(
&mut self, &mut self,
store: &mut Store<B, P>, store: &mut Store,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error> ) -> Poll<(), io::Error>
where where
@@ -632,11 +636,11 @@ where
} }
} }
pub fn next_incoming(&mut self, store: &mut Store<B, P>) -> Option<store::Key> { pub fn next_incoming(&mut self, store: &mut Store) -> Option<store::Key> {
self.pending_accept.pop(store).map(|ptr| ptr.key()) self.pending_accept.pop(store).map(|ptr| ptr.key())
} }
pub fn poll_data(&mut self, stream: &mut Stream<B, P>) -> Poll<Option<Bytes>, proto::Error> { pub fn poll_data(&mut self, stream: &mut Stream) -> Poll<Option<Bytes>, proto::Error> {
// TODO: Return error when the stream is reset // TODO: Return error when the stream is reset
match stream.pending_recv.pop_front(&mut self.buffer) { match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Data(payload)) => Ok(Some(payload).into()), Some(Event::Data(payload)) => Ok(Some(payload).into()),
@@ -653,7 +657,7 @@ where
pub fn poll_trailers( pub fn poll_trailers(
&mut self, &mut self,
stream: &mut Stream<B, P>, stream: &mut Stream,
) -> Poll<Option<HeaderMap>, proto::Error> { ) -> Poll<Option<HeaderMap>, proto::Error> {
match stream.pending_recv.pop_front(&mut self.buffer) { match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Trailers(trailers)) => Ok(Some(trailers).into()), Some(Event::Trailers(trailers)) => Ok(Some(trailers).into()),
@@ -667,7 +671,7 @@ where
} }
} }
fn schedule_recv<T>(&mut self, stream: &mut Stream<B, P>) -> Poll<Option<T>, proto::Error> { fn schedule_recv<T>(&mut self, stream: &mut Stream) -> Poll<Option<T>, proto::Error> {
if stream.state.ensure_recv_open()? { if stream.state.ensure_recv_open()? {
// Request to get notified once more frames arrive // Request to get notified once more frames arrive
stream.recv_task = Some(task::current()); stream.recv_task = Some(task::current());
@@ -679,45 +683,9 @@ where
} }
} }
impl<B> Recv<B, server::Peer>
where
B: Buf,
{
/// TODO: Should this fn return `Result`?
pub fn take_request(&mut self, stream: &mut store::Ptr<B, server::Peer>) -> Request<()> {
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(request)) => request,
_ => panic!(),
}
}
}
impl<B> Recv<B, client::Peer>
where
B: Buf,
{
pub fn poll_response(
&mut self,
stream: &mut store::Ptr<B, client::Peer>,
) -> Poll<Response<()>, proto::Error> {
// If the buffer is not empty, then the first frame must be a HEADERS
// frame or the user violated the contract.
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(response)) => Ok(response.into()),
Some(_) => panic!("poll_response called after response returned"),
None => {
stream.state.ensure_recv_open()?;
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
},
}
}
}
// ===== impl Event ===== // ===== impl Event =====
impl<T> Event<T> { impl Event {
fn is_data(&self) -> bool { fn is_data(&self) -> bool {
match *self { match *self {
Event::Data(..) => true, Event::Data(..) => true,

@@ -10,10 +10,7 @@ use std::{cmp, io};
/// Manages state transitions related to outbound frames. /// Manages state transitions related to outbound frames.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Send<B, P> pub(super) struct Send {
where
P: Peer,
{
/// Stream identifier to use for next initialized stream. /// Stream identifier to use for next initialized stream.
next_stream_id: Result<StreamId, StreamIdOverflow>, next_stream_id: Result<StreamId, StreamIdOverflow>,
@@ -21,13 +18,10 @@ where
init_window_sz: WindowSize, init_window_sz: WindowSize,
/// Prioritization layer /// Prioritization layer
prioritize: Prioritize<B, P>, prioritize: Prioritize,
} }
impl<B, P> Send<B, P> impl Send {
where
P: Peer,
{
/// Create a new `Send` /// Create a new `Send`
pub fn new(config: &Config) -> Self { pub fn new(config: &Config) -> Self {
Send { Send {
@@ -43,16 +37,17 @@ where
} }
pub fn open(&mut self) -> Result<StreamId, UserError> { pub fn open(&mut self) -> Result<StreamId, UserError> {
let stream_id = self.try_open()?; let stream_id = self.ensure_next_stream_id()?;
self.next_stream_id = stream_id.next_id(); self.next_stream_id = stream_id.next_id();
Ok(stream_id) Ok(stream_id)
} }
pub fn send_headers( pub fn send_headers<B>(
&mut self, &mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
counts: &mut Counts<P>, stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), UserError> { ) -> Result<(), UserError> {
trace!( trace!(
@@ -66,7 +61,7 @@ where
// Update the state // Update the state
stream.state.send_open(end_stream)?; stream.state.send_open(end_stream)?;
if P::is_local_init(frame.stream_id()) { if counts.peer().is_local_init(frame.stream_id()) {
if counts.can_inc_num_send_streams() { if counts.can_inc_num_send_streams() {
counts.inc_num_send_streams(); counts.inc_num_send_streams();
} else { } else {
@@ -75,42 +70,42 @@ where
} }
// Queue the frame for sending // Queue the frame for sending
self.prioritize.queue_frame(frame.into(), stream, task); self.prioritize.queue_frame(frame.into(), buffer, stream, task);
Ok(()) Ok(())
} }
/// Send an RST_STREAM frame /// Send an explicit RST_STREAM frame
/// ///
/// # Arguments /// # Arguments
/// + `reason`: the error code for the RST_STREAM frame /// + `reason`: the error code for the RST_STREAM frame
/// + `clear_queue`: if true, all pending outbound frames will be cleared, /// + `clear_queue`: if true, all pending outbound frames will be cleared,
/// if false, the RST_STREAM frame will be appended to the end of the /// if false, the RST_STREAM frame will be appended to the end of the
/// send queue. /// send queue.
pub fn send_reset( pub fn send_reset<B>(
&mut self, &mut self,
reason: Reason, reason: Reason,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
clear_queue: bool,
) { ) {
let is_reset = stream.state.is_reset(); let is_reset = stream.state.is_reset();
let is_closed = stream.state.is_closed(); let is_closed = stream.state.is_closed();
let is_empty = stream.pending_send.is_empty(); let is_empty = stream.pending_send.is_empty();
trace!( trace!(
"send_reset(..., reason={:?}, stream={:?}, ..., \ "send_reset(..., reason={:?}, stream={:?}, ..., \
clear_queue={:?});\n\
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
state={:?} \ state={:?} \
", ",
stream.id, stream.id,
reason, reason,
clear_queue,
is_reset, is_reset,
is_closed, is_closed,
is_empty, is_empty,
stream.state stream.state
); );
if is_reset { if is_reset {
// Don't double reset // Don't double reset
trace!( trace!(
@@ -122,7 +117,7 @@ where
// If closed AND the send queue is flushed, then the stream cannot be // If closed AND the send queue is flushed, then the stream cannot be
// reset explicitly, either. Implicit resets can still be queued. // reset explicitly, either. Implicit resets can still be queued.
if is_closed && (is_empty || !clear_queue) { if is_closed && is_empty {
trace!( trace!(
" -> not sending explicit RST_STREAM ({:?} was closed \ " -> not sending explicit RST_STREAM ({:?} was closed \
and send queue was flushed)", and send queue was flushed)",
@@ -134,41 +129,43 @@ where
// Transition the state // Transition the state
stream.state.set_reset(reason); stream.state.set_reset(reason);
// TODO: this could be a call to `recv_err`, but that will always self.recv_err(buffer, stream);
// clear the send queue. could we pass whether or not to clear
// the send queue to that method?
if clear_queue {
// Clear all pending outbound frames
self.prioritize.clear_queue(stream);
}
// Reclaim all capacity assigned to the stream and re-assign it to the
// connection
let available = stream.send_flow.available().as_size();
stream.send_flow.claim_capacity(available);
let frame = frame::Reset::new(stream.id, reason); let frame = frame::Reset::new(stream.id, reason);
trace!("send_reset -- queueing; frame={:?}", frame); trace!("send_reset -- queueing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task); self.prioritize.queue_frame(frame.into(), buffer, stream, task);
} }
pub fn send_data( pub fn schedule_cancel(&mut self, stream: &mut store::Ptr, task: &mut Option<Task>) {
if stream.state.is_closed() {
// Stream is already closed, nothing more to do
return;
}
stream.state.set_canceled();
self.reclaim_capacity(stream);
self.prioritize.schedule_send(stream, task);
}
pub fn send_data<B>(
&mut self, &mut self,
frame: frame::Data<B>, frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), UserError> ) -> Result<(), UserError>
where where B: Buf,
B: Buf,
{ {
self.prioritize.send_data(frame, stream, task) self.prioritize.send_data(frame, buffer, stream, task)
} }
pub fn send_trailers( pub fn send_trailers<B>(
&mut self, &mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), UserError> { ) -> Result<(), UserError> {
// TODO: Should this logic be moved into state.rs? // TODO: Should this logic be moved into state.rs?
@@ -179,7 +176,7 @@ where
stream.state.send_close(); stream.state.send_close();
trace!("send_trailers -- queuing; frame={:?}", frame); trace!("send_trailers -- queuing; frame={:?}", frame);
self.prioritize.queue_frame(frame.into(), stream, task); self.prioritize.queue_frame(frame.into(), buffer, stream, task);
// Release any excess capacity // Release any excess capacity
self.prioritize.reserve_capacity(0, stream); self.prioritize.reserve_capacity(0, stream);
@@ -187,27 +184,27 @@ where
Ok(()) Ok(())
} }
pub fn poll_complete<T>( pub fn poll_complete<T, B>(
&mut self, &mut self,
store: &mut Store<B, P>, buffer: &mut Buffer<Frame<B>>,
counts: &mut Counts<P>, store: &mut Store,
counts: &mut Counts,
dst: &mut Codec<T, Prioritized<B>>, dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error> ) -> Poll<(), io::Error>
where where T: AsyncWrite,
T: AsyncWrite, B: Buf,
B: Buf,
{ {
self.prioritize.poll_complete(store, counts, dst) self.prioritize.poll_complete(buffer, store, counts, dst)
} }
/// Request capacity to send data /// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr<B, P>) { pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) {
self.prioritize.reserve_capacity(capacity, stream) self.prioritize.reserve_capacity(capacity, stream)
} }
pub fn poll_capacity( pub fn poll_capacity(
&mut self, &mut self,
stream: &mut store::Ptr<B, P>, stream: &mut store::Ptr,
) -> Poll<Option<WindowSize>, UserError> { ) -> Poll<Option<WindowSize>, UserError> {
if !stream.state.is_send_streaming() { if !stream.state.is_send_streaming() {
return Ok(Async::Ready(None)); return Ok(Async::Ready(None));
@@ -224,7 +221,7 @@ where
} }
/// Current available stream send capacity /// Current available stream send capacity
pub fn capacity(&self, stream: &mut store::Ptr<B, P>) -> WindowSize { pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize {
let available = stream.send_flow.available().as_size(); let available = stream.send_flow.available().as_size();
let buffered = stream.buffered_send_data; let buffered = stream.buffered_send_data;
@@ -238,21 +235,25 @@ where
pub fn recv_connection_window_update( pub fn recv_connection_window_update(
&mut self, &mut self,
frame: frame::WindowUpdate, frame: frame::WindowUpdate,
store: &mut Store<B, P>, store: &mut Store,
) -> Result<(), Reason> { ) -> Result<(), Reason> {
self.prioritize self.prioritize
.recv_connection_window_update(frame.size_increment(), store) .recv_connection_window_update(frame.size_increment(), store)
} }
pub fn recv_stream_window_update( pub fn recv_stream_window_update<B>(
&mut self, &mut self,
sz: WindowSize, sz: WindowSize,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), Reason> { ) -> Result<(), Reason> {
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
debug!("recv_stream_window_update !!; err={:?}", e); debug!("recv_stream_window_update !!; err={:?}", e);
self.send_reset(Reason::FLOW_CONTROL_ERROR.into(), stream, task, true);
self.send_reset(
Reason::FLOW_CONTROL_ERROR.into(),
buffer, stream, task);
return Err(e); return Err(e);
} }
@@ -260,10 +261,17 @@ where
Ok(()) Ok(())
} }
pub fn recv_err(&mut self, stream: &mut store::Ptr<B, P>) { pub fn recv_err<B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr
) {
// Clear all pending outbound frames // Clear all pending outbound frames
self.prioritize.clear_queue(stream); self.prioritize.clear_queue(buffer, stream);
self.reclaim_capacity(stream);
}
fn reclaim_capacity(&mut self, stream: &mut store::Ptr) {
// Reclaim all capacity assigned to the stream and re-assign it to the // Reclaim all capacity assigned to the stream and re-assign it to the
// connection // connection
let available = stream.send_flow.available().as_size(); let available = stream.send_flow.available().as_size();
@@ -273,10 +281,11 @@ where
.assign_connection_capacity(available, stream); .assign_connection_capacity(available, stream);
} }
pub fn apply_remote_settings( pub fn apply_remote_settings<B>(
&mut self, &mut self,
settings: &frame::Settings, settings: &frame::Settings,
store: &mut Store<B, P>, buffer: &mut Buffer<Frame<B>>,
store: &mut Store,
task: &mut Option<Task>, task: &mut Option<Task>,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
// Applies an update to the remote endpoint's initial window size. // Applies an update to the remote endpoint's initial window size.
@@ -336,7 +345,7 @@ where
let inc = val - old_val; let inc = val - old_val;
store.for_each(|mut stream| { store.for_each(|mut stream| {
self.recv_stream_window_update(inc, &mut stream, task) self.recv_stream_window_update(inc, buffer, &mut stream, task)
.map_err(RecvError::Connection) .map_err(RecvError::Connection)
})?; })?;
} }
@@ -359,14 +368,4 @@ where
pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> { pub fn ensure_next_stream_id(&self) -> Result<StreamId, UserError> {
self.next_stream_id.map_err(|_| OverflowedStreamId) self.next_stream_id.map_err(|_| OverflowedStreamId)
} }
/// Returns a new StreamId if the local actor can initiate a new stream.
fn try_open(&self) -> Result<StreamId, UserError> {
if P::is_server() {
// Servers cannot open streams. PushPromise must first be reserved.
return Err(UnexpectedFrameType);
}
self.ensure_next_stream_id()
}
} }


@@ -74,6 +74,11 @@ enum Peer {
enum Cause { enum Cause {
Proto(Reason), Proto(Reason),
Io, Io,
/// The user dropped all handles to the stream without explicitly canceling.
/// This indicates to the connection that a reset frame must be sent out
/// once the send queue has been flushed.
Canceled,
} }
impl State { impl State {
@@ -238,10 +243,24 @@ impl State {
/// Set the stream state to reset /// Set the stream state to reset
pub fn set_reset(&mut self, reason: Reason) { pub fn set_reset(&mut self, reason: Reason) {
debug_assert!(!self.is_reset());
self.inner = Closed(Some(Cause::Proto(reason))); self.inner = Closed(Some(Cause::Proto(reason)));
} }
/// Set the stream state to canceled
pub fn set_canceled(&mut self) {
debug_assert!(!self.is_closed());
self.inner = Closed(Some(Cause::Canceled));
}
pub fn is_canceled(&self) -> bool {
use self::Cause::Canceled;
match self.inner {
Closed(Some(Canceled)) => true,
_ => false,
}
}
/// Returns true if the stream is already reset. /// Returns true if the stream is already reset.
pub fn is_reset(&self) -> bool { pub fn is_reset(&self) -> bool {
match self.inner { match self.inner {
@@ -318,6 +337,7 @@ impl State {
// TODO: Is this correct? // TODO: Is this correct?
match self.inner { match self.inner {
Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)), Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)),
Closed(Some(Cause::Canceled)) => Err(proto::Error::Proto(Reason::CANCEL)),
Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
Closed(None) | HalfClosedRemote(..) => Ok(false), Closed(None) | HalfClosedRemote(..) => Ok(false),
_ => Ok(true), _ => Ok(true),


@@ -9,22 +9,16 @@ use std::ops;
/// Storage for streams /// Storage for streams
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Store<B, P> pub(super) struct Store {
where slab: slab::Slab<(StoreId, Stream)>,
P: Peer,
{
slab: slab::Slab<(StoreId, Stream<B, P>)>,
ids: OrderMap<StreamId, (usize, StoreId)>, ids: OrderMap<StreamId, (usize, StoreId)>,
counter: StoreId, counter: StoreId,
} }
/// "Pointer" to an entry in the store /// "Pointer" to an entry in the store
pub(super) struct Ptr<'a, B: 'a, P> pub(super) struct Ptr<'a> {
where
P: Peer + 'a,
{
key: Key, key: Key,
store: &'a mut Store<B, P>, store: &'a mut Store,
} }
/// References an entry in the store. /// References an entry in the store.
@@ -37,24 +31,21 @@ pub(crate) struct Key {
type StoreId = usize; type StoreId = usize;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Queue<B, N, P> pub(super) struct Queue<N> {
where
P: Peer,
{
indices: Option<store::Indices>, indices: Option<store::Indices>,
_p: PhantomData<(B, N, P)>, _p: PhantomData<N>,
} }
pub(super) trait Next { pub(super) trait Next {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<Key>; fn next(stream: &Stream) -> Option<Key>;
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<Key>); fn set_next(stream: &mut Stream, key: Option<Key>);
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<Key>; fn take_next(stream: &mut Stream) -> Option<Key>;
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool; fn is_queued(stream: &Stream) -> bool;
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool); fn set_queued(stream: &mut Stream, val: bool);
} }
/// A linked list /// A linked list
@@ -64,37 +55,28 @@ struct Indices {
pub tail: Key, pub tail: Key,
} }
pub(super) enum Entry<'a, B: 'a, P: Peer + 'a> { pub(super) enum Entry<'a> {
Occupied(OccupiedEntry<'a>), Occupied(OccupiedEntry<'a>),
Vacant(VacantEntry<'a, B, P>), Vacant(VacantEntry<'a>),
} }
pub(super) struct OccupiedEntry<'a> { pub(super) struct OccupiedEntry<'a> {
ids: ordermap::OccupiedEntry<'a, StreamId, (usize, StoreId)>, ids: ordermap::OccupiedEntry<'a, StreamId, (usize, StoreId)>,
} }
pub(super) struct VacantEntry<'a, B: 'a, P> pub(super) struct VacantEntry<'a> {
where
P: Peer + 'a,
{
ids: ordermap::VacantEntry<'a, StreamId, (usize, StoreId)>, ids: ordermap::VacantEntry<'a, StreamId, (usize, StoreId)>,
slab: &'a mut slab::Slab<(StoreId, Stream<B, P>)>, slab: &'a mut slab::Slab<(StoreId, Stream)>,
counter: &'a mut usize, counter: &'a mut usize,
} }
pub(super) trait Resolve<B, P> pub(super) trait Resolve {
where fn resolve(&mut self, key: Key) -> Ptr;
P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P>;
} }
// ===== impl Store ===== // ===== impl Store =====
impl<B, P> Store<B, P> impl Store {
where
P: Peer,
{
pub fn new() -> Self { pub fn new() -> Self {
Store { Store {
slab: slab::Slab::new(), slab: slab::Slab::new(),
@@ -103,7 +85,7 @@ where
} }
} }
pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr<B, P>> { pub fn find_mut(&mut self, id: &StreamId) -> Option<Ptr> {
let key = match self.ids.get(id) { let key = match self.ids.get(id) {
Some(key) => *key, Some(key) => *key,
None => return None, None => return None,
@@ -118,7 +100,7 @@ where
}) })
} }
pub fn insert(&mut self, id: StreamId, val: Stream<B, P>) -> Ptr<B, P> { pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr {
let store_id = self.counter; let store_id = self.counter;
self.counter = self.counter.wrapping_add(1); self.counter = self.counter.wrapping_add(1);
let key = self.slab.insert((store_id, val)); let key = self.slab.insert((store_id, val));
@@ -133,7 +115,7 @@ where
} }
} }
pub fn find_entry(&mut self, id: StreamId) -> Entry<B, P> { pub fn find_entry(&mut self, id: StreamId) -> Entry {
use self::ordermap::Entry::*; use self::ordermap::Entry::*;
match self.ids.entry(id) { match self.ids.entry(id) {
@@ -150,7 +132,7 @@ where
pub fn for_each<F, E>(&mut self, mut f: F) -> Result<(), E> pub fn for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
where where
F: FnMut(Ptr<B, P>) -> Result<(), E>, F: FnMut(Ptr) -> Result<(), E>,
{ {
let mut len = self.ids.len(); let mut len = self.ids.len();
let mut i = 0; let mut i = 0;
@@ -182,11 +164,8 @@ where
} }
} }
impl<B, P> Resolve<B, P> for Store<B, P> impl Resolve for Store {
where fn resolve(&mut self, key: Key) -> Ptr {
P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P> {
Ptr { Ptr {
key: key, key: key,
store: self, store: self,
@@ -194,11 +173,8 @@ where
} }
} }
impl<B, P> ops::Index<Key> for Store<B, P> impl ops::Index<Key> for Store {
where type Output = Stream;
P: Peer,
{
type Output = Stream<B, P>;
fn index(&self, key: Key) -> &Self::Output { fn index(&self, key: Key) -> &Self::Output {
let slot = self.slab.index(key.index); let slot = self.slab.index(key.index);
@@ -207,10 +183,7 @@ where
} }
} }
impl<B, P> ops::IndexMut<Key> for Store<B, P> impl ops::IndexMut<Key> for Store {
where
P: Peer,
{
fn index_mut(&mut self, key: Key) -> &mut Self::Output { fn index_mut(&mut self, key: Key) -> &mut Self::Output {
let slot = self.slab.index_mut(key.index); let slot = self.slab.index_mut(key.index);
assert_eq!(slot.0, key.store_id); assert_eq!(slot.0, key.store_id);
@@ -218,10 +191,7 @@ where
} }
} }
impl<B, P> Store<B, P> impl Store {
where
P: Peer,
{
pub fn num_active_streams(&self) -> usize { pub fn num_active_streams(&self) -> usize {
self.ids.len() self.ids.len()
} }
@@ -234,10 +204,9 @@ where
// ===== impl Queue ===== // ===== impl Queue =====
impl<B, N, P> Queue<B, N, P> impl<N> Queue<N>
where where
N: Next, N: Next,
P: Peer,
{ {
pub fn new() -> Self { pub fn new() -> Self {
Queue { Queue {
@@ -260,7 +229,7 @@ where
/// Queue the stream. /// Queue the stream.
/// ///
/// If the stream is already contained by the list, return `false`. /// If the stream is already contained by the list, return `false`.
pub fn push(&mut self, stream: &mut store::Ptr<B, P>) -> bool { pub fn push(&mut self, stream: &mut store::Ptr) -> bool {
trace!("Queue::push"); trace!("Queue::push");
if N::is_queued(stream) { if N::is_queued(stream) {
@@ -297,9 +266,9 @@ where
true true
} }
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a, B, P>> pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a>>
where where
R: Resolve<B, P>, R: Resolve,
{ {
if let Some(mut idxs) = self.indices { if let Some(mut idxs) = self.indices {
let mut stream = store.resolve(idxs.head); let mut stream = store.resolve(idxs.head);
@@ -324,10 +293,7 @@ where
// ===== impl Ptr ===== // ===== impl Ptr =====
impl<'a, B: 'a, P> Ptr<'a, B, P> impl<'a> Ptr<'a> {
where
P: Peer,
{
/// Returns the Key associated with the stream /// Returns the Key associated with the stream
pub fn key(&self) -> Key { pub fn key(&self) -> Key {
self.key self.key
@@ -352,11 +318,8 @@ where
} }
} }
impl<'a, B: 'a, P> Resolve<B, P> for Ptr<'a, B, P> impl<'a> Resolve for Ptr<'a> {
where fn resolve(&mut self, key: Key) -> Ptr {
P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P> {
Ptr { Ptr {
key: key, key: key,
store: &mut *self.store, store: &mut *self.store,
@@ -364,22 +327,16 @@ where
} }
} }
impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P> impl<'a> ops::Deref for Ptr<'a> {
where type Target = Stream;
P: Peer,
{
type Target = Stream<B, P>;
fn deref(&self) -> &Stream<B, P> { fn deref(&self) -> &Stream {
&self.store.slab[self.key.index].1 &self.store.slab[self.key.index].1
} }
} }
impl<'a, B: 'a, P> ops::DerefMut for Ptr<'a, B, P> impl<'a> ops::DerefMut for Ptr<'a> {
where fn deref_mut(&mut self) -> &mut Stream {
P: Peer,
{
fn deref_mut(&mut self) -> &mut Stream<B, P> {
&mut self.store.slab[self.key.index].1 &mut self.store.slab[self.key.index].1
} }
} }
@@ -398,11 +355,8 @@ impl<'a> OccupiedEntry<'a> {
// ===== impl VacantEntry ===== // ===== impl VacantEntry =====
impl<'a, B, P> VacantEntry<'a, B, P> impl<'a> VacantEntry<'a> {
where pub fn insert(self, value: Stream) -> Key {
P: Peer,
{
pub fn insert(self, value: Stream<B, P>) -> Key {
// Insert the value in the slab // Insert the value in the slab
let store_id = *self.counter; let store_id = *self.counter;
*self.counter = store_id.wrapping_add(1); *self.counter = store_id.wrapping_add(1);


@@ -14,10 +14,7 @@ use std::usize;
/// (such as an accept queue), this is **not** tracked by a reference count. /// (such as an accept queue), this is **not** tracked by a reference count.
/// Thus, `ref_count` can be zero and the stream still has to be kept around. /// Thus, `ref_count` can be zero and the stream still has to be kept around.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Stream<B, P> pub(super) struct Stream {
where
P: Peer,
{
/// The h2 stream identifier /// The h2 stream identifier
pub id: StreamId, pub id: StreamId,
@@ -48,7 +45,7 @@ where
pub send_task: Option<task::Task>, pub send_task: Option<task::Task>,
/// Frames pending for this stream being sent to the socket /// Frames pending for this stream being sent to the socket
pub pending_send: buffer::Deque<Frame<B>>, pub pending_send: buffer::Deque,
/// Next node in the linked list of streams waiting for additional /// Next node in the linked list of streams waiting for additional
/// connection level capacity. /// connection level capacity.
@@ -88,13 +85,13 @@ where
pub is_pending_window_update: bool, pub is_pending_window_update: bool,
/// Frames pending for this stream to read /// Frames pending for this stream to read
pub pending_recv: buffer::Deque<recv::Event<P::Poll>>, pub pending_recv: buffer::Deque,
/// Task tracking receiving frames /// Task tracking receiving frames
pub recv_task: Option<task::Task>, pub recv_task: Option<task::Task>,
/// The stream's pending push promises /// The stream's pending push promises
pub pending_push_promises: store::Queue<B, NextAccept, P>, pub pending_push_promises: store::Queue<NextAccept>,
/// Validate content-length headers /// Validate content-length headers
pub content_length: ContentLength, pub content_length: ContentLength,
@@ -123,15 +120,12 @@ pub(super) struct NextWindowUpdate;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct NextOpen; pub(super) struct NextOpen;
impl<B, P> Stream<B, P> impl Stream {
where
P: Peer,
{
pub fn new( pub fn new(
id: StreamId, id: StreamId,
init_send_window: WindowSize, init_send_window: WindowSize,
init_recv_window: WindowSize, init_recv_window: WindowSize,
) -> Stream<B, P> { ) -> Stream {
let mut send_flow = FlowControl::new(); let mut send_flow = FlowControl::new();
let mut recv_flow = FlowControl::new(); let mut recv_flow = FlowControl::new();
@@ -224,6 +218,15 @@ where
!self.is_pending_accept && !self.is_pending_window_update !self.is_pending_accept && !self.is_pending_window_update
} }
/// Returns true when the consumer of the stream has dropped all handles
/// (indicating no further interest in the stream) and the stream state is
/// not actually closed.
///
/// In this case, a reset should be sent.
pub fn is_canceled_interest(&self) -> bool {
self.ref_count == 0 && !self.state.is_recv_closed()
}
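The cancel-on-drop rule described by `Cause::Canceled` and `is_canceled_interest` can be illustrated with a small standalone model. This is a toy, not the h2 internals: the types and field names below are made up, and `reset_pending` merely stands in for the RST_STREAM frame that the real connection queues once the send buffer has been flushed.

    use std::cell::RefCell;
    use std::rc::Rc;

    #[derive(Debug, PartialEq)]
    enum State { Open, RecvClosed, Canceled }

    #[derive(Debug)]
    struct Shared {
        state: State,
        ref_count: usize,
        reset_pending: bool, // stand-in for the queued RST_STREAM
    }

    struct Handle(Rc<RefCell<Shared>>);

    impl Drop for Handle {
        fn drop(&mut self) {
            let mut s = self.0.borrow_mut();
            s.ref_count -= 1;
            // Same shape as `is_canceled_interest`: last handle gone, recv side still open.
            if s.ref_count == 0 && s.state != State::RecvClosed {
                s.state = State::Canceled;
                s.reset_pending = true;
            }
        }
    }

    fn main() {
        let shared = Rc::new(RefCell::new(Shared {
            state: State::Open,
            ref_count: 1,
            reset_pending: false,
        }));
        drop(Handle(shared.clone()));
        assert_eq!(shared.borrow().state, State::Canceled);
        assert!(shared.borrow().reset_pending);
    }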
pub fn assign_capacity(&mut self, capacity: WindowSize) { pub fn assign_capacity(&mut self, capacity: WindowSize) {
debug_assert!(capacity > 0); debug_assert!(capacity > 0);
self.send_capacity_inc = true; self.send_capacity_inc = true;
@@ -279,111 +282,111 @@ where
} }
impl store::Next for NextAccept { impl store::Next for NextAccept {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> { fn next(stream: &Stream) -> Option<store::Key> {
stream.next_pending_accept stream.next_pending_accept
} }
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) { fn set_next(stream: &mut Stream, key: Option<store::Key>) {
stream.next_pending_accept = key; stream.next_pending_accept = key;
} }
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> { fn take_next(stream: &mut Stream) -> Option<store::Key> {
stream.next_pending_accept.take() stream.next_pending_accept.take()
} }
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool { fn is_queued(stream: &Stream) -> bool {
stream.is_pending_accept stream.is_pending_accept
} }
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) { fn set_queued(stream: &mut Stream, val: bool) {
stream.is_pending_accept = val; stream.is_pending_accept = val;
} }
} }
impl store::Next for NextSend { impl store::Next for NextSend {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> { fn next(stream: &Stream) -> Option<store::Key> {
stream.next_pending_send stream.next_pending_send
} }
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) { fn set_next(stream: &mut Stream, key: Option<store::Key>) {
stream.next_pending_send = key; stream.next_pending_send = key;
} }
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> { fn take_next(stream: &mut Stream) -> Option<store::Key> {
stream.next_pending_send.take() stream.next_pending_send.take()
} }
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool { fn is_queued(stream: &Stream) -> bool {
stream.is_pending_send stream.is_pending_send
} }
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) { fn set_queued(stream: &mut Stream, val: bool) {
stream.is_pending_send = val; stream.is_pending_send = val;
} }
} }
impl store::Next for NextSendCapacity { impl store::Next for NextSendCapacity {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> { fn next(stream: &Stream) -> Option<store::Key> {
stream.next_pending_send_capacity stream.next_pending_send_capacity
} }
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) { fn set_next(stream: &mut Stream, key: Option<store::Key>) {
stream.next_pending_send_capacity = key; stream.next_pending_send_capacity = key;
} }
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> { fn take_next(stream: &mut Stream) -> Option<store::Key> {
stream.next_pending_send_capacity.take() stream.next_pending_send_capacity.take()
} }
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool { fn is_queued(stream: &Stream) -> bool {
stream.is_pending_send_capacity stream.is_pending_send_capacity
} }
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) { fn set_queued(stream: &mut Stream, val: bool) {
stream.is_pending_send_capacity = val; stream.is_pending_send_capacity = val;
} }
} }
impl store::Next for NextWindowUpdate { impl store::Next for NextWindowUpdate {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> { fn next(stream: &Stream) -> Option<store::Key> {
stream.next_window_update stream.next_window_update
} }
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) { fn set_next(stream: &mut Stream, key: Option<store::Key>) {
stream.next_window_update = key; stream.next_window_update = key;
} }
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> { fn take_next(stream: &mut Stream) -> Option<store::Key> {
stream.next_window_update.take() stream.next_window_update.take()
} }
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool { fn is_queued(stream: &Stream) -> bool {
stream.is_pending_window_update stream.is_pending_window_update
} }
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) { fn set_queued(stream: &mut Stream, val: bool) {
stream.is_pending_window_update = val; stream.is_pending_window_update = val;
} }
} }
impl store::Next for NextOpen { impl store::Next for NextOpen {
fn next<B, P: Peer>(stream: &Stream<B, P>) -> Option<store::Key> { fn next(stream: &Stream) -> Option<store::Key> {
stream.next_open stream.next_open
} }
fn set_next<B, P: Peer>(stream: &mut Stream<B, P>, key: Option<store::Key>) { fn set_next(stream: &mut Stream, key: Option<store::Key>) {
stream.next_open = key; stream.next_open = key;
} }
fn take_next<B, P: Peer>(stream: &mut Stream<B, P>) -> Option<store::Key> { fn take_next(stream: &mut Stream) -> Option<store::Key> {
stream.next_open.take() stream.next_open.take()
} }
fn is_queued<B, P: Peer>(stream: &Stream<B, P>) -> bool { fn is_queued(stream: &Stream) -> bool {
stream.is_pending_open stream.is_pending_open
} }
fn set_queued<B, P: Peer>(stream: &mut Stream<B, P>, val: bool) { fn set_queued(stream: &mut Stream, val: bool) {
stream.is_pending_open = val; stream.is_pending_open = val;
} }
} }


@@ -15,15 +15,33 @@ pub(crate) struct Streams<B, P>
where where
P: Peer, P: Peer,
{ {
inner: Arc<Mutex<Inner<B, P>>>, /// Holds most of the connection and stream related state for processing
/// HTTP/2.0 frames associated with streams.
inner: Arc<Mutex<Inner>>,
/// This is the queue of frames to be written to the wire. This is split out
/// to avoid requiring a `B` generic on all public API types even if `B` is
/// not technically required.
///
/// Currently, splitting this out requires a second `Arc` + `Mutex`.
/// However, it should be possible to avoid this duplication with a little
/// bit of unsafe code. This optimization has been postponed until it has
/// been shown to be necessary.
send_buffer: Arc<SendBuffer<B>>,
_p: ::std::marker::PhantomData<P>,
} }
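The comments above describe the pattern used to keep `B` off of the shared state: everything that never touches outbound data sits behind one `Arc<Mutex<..>>`, and only the frame buffer keeps the generic. A minimal sketch of that shape, with simplified names and `Vec<B>` standing in for `Buffer<Frame<B>>`:

    use std::sync::{Arc, Mutex};

    // Connection + stream state; note that `B` does not appear here.
    struct Inner {
        next_stream_id: u32,
    }

    // The only piece that needs the user's buffer type.
    struct SendBuffer<B> {
        inner: Mutex<Vec<B>>,
    }

    struct Streams<B> {
        inner: Arc<Mutex<Inner>>,
        send_buffer: Arc<SendBuffer<B>>,
    }

    // Handles that never write data clone only `inner`, so they stay
    // generic-free (this is what lets the recv-side types drop `B`).
    struct OpaqueStreamRef {
        inner: Arc<Mutex<Inner>>,
    }

    impl<B> Streams<B> {
        fn opaque_ref(&self) -> OpaqueStreamRef {
            OpaqueStreamRef { inner: self.inner.clone() }
        }
    }

    fn main() {
        let streams = Streams::<Vec<u8>> {
            inner: Arc::new(Mutex::new(Inner { next_stream_id: 1 })),
            send_buffer: Arc::new(SendBuffer { inner: Mutex::new(Vec::new()) }),
        };
        let _opaque = streams.opaque_ref();
        let _ = streams.inner.lock().unwrap().next_stream_id;
    }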
/// Reference to the stream state /// Reference to the stream state
pub(crate) struct StreamRef<B, P> #[derive(Debug)]
where pub(crate) struct StreamRef<B> {
P: Peer, opaque: OpaqueStreamRef,
{ send_buffer: Arc<SendBuffer<B>>,
inner: Arc<Mutex<Inner<B, P>>>, }
/// Reference to the stream state that hides the send data chunk generic
pub(crate) struct OpaqueStreamRef {
inner: Arc<Mutex<Inner>>,
key: store::Key, key: store::Key,
} }
@@ -32,26 +50,24 @@ where
/// ///
/// TODO: better name /// TODO: better name
#[derive(Debug)] #[derive(Debug)]
struct Inner<B, P> struct Inner {
where
P: Peer,
{
/// Tracks send & recv stream concurrency. /// Tracks send & recv stream concurrency.
counts: Counts<P>, counts: Counts,
actions: Actions<B, P>,
store: Store<B, P>, /// Connection level state and performs actions on streams
actions: Actions,
/// Stores stream state
store: Store,
} }
#[derive(Debug)] #[derive(Debug)]
struct Actions<B, P> struct Actions {
where
P: Peer,
{
/// Manages state transitions initiated by receiving frames /// Manages state transitions initiated by receiving frames
recv: Recv<B, P>, recv: Recv,
/// Manages state transitions initiated by sending frames /// Manages state transitions initiated by sending frames
send: Send<B, P>, send: Send,
/// Task that calls `poll_complete`. /// Task that calls `poll_complete`.
task: Option<task::Task>, task: Option<task::Task>,
@@ -60,23 +76,35 @@ where
conn_error: Option<proto::Error>, conn_error: Option<proto::Error>,
} }
/// Contains the buffer of frames to be written to the wire.
#[derive(Debug)]
struct SendBuffer<B> {
inner: Mutex<Buffer<Frame<B>>>,
}
// ===== impl Streams =====
impl<B, P> Streams<B, P> impl<B, P> Streams<B, P>
where where
B: Buf, B: Buf,
P: Peer, P: Peer,
{ {
pub fn new(config: Config) -> Self { pub fn new(config: Config) -> Self {
let peer = P::dyn();
Streams { Streams {
inner: Arc::new(Mutex::new(Inner { inner: Arc::new(Mutex::new(Inner {
counts: Counts::new(&config), counts: Counts::new(peer, &config),
actions: Actions { actions: Actions {
recv: Recv::new(&config), recv: Recv::new(peer, &config),
send: Send::new(&config), send: Send::new(&config),
task: None, task: None,
conn_error: None, conn_error: None,
}, },
store: Store::new(), store: Store::new(),
})), })),
send_buffer: Arc::new(SendBuffer::new()),
_p: ::std::marker::PhantomData,
} }
} }
@@ -113,6 +141,8 @@ where
let stream = me.store.resolve(key); let stream = me.store.resolve(key);
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |counts, stream| { me.counts.transition(stream, |counts, stream| {
trace!( trace!(
@@ -132,7 +162,7 @@ where
actions.recv.recv_trailers(frame, stream) actions.recv.recv_trailers(frame, stream)
}; };
actions.reset_on_recv_stream_err(stream, res) actions.reset_on_recv_stream_err(send_buffer, stream, res)
}) })
} }
@@ -148,10 +178,12 @@ where
}; };
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| { me.counts.transition(stream, |_, stream| {
let res = actions.recv.recv_data(frame, stream); let res = actions.recv.recv_data(frame, stream);
actions.reset_on_recv_stream_err(stream, res) actions.reset_on_recv_stream_err(send_buffer, stream, res)
}) })
} }
@@ -170,7 +202,7 @@ where
None => { None => {
// TODO: Are there other error cases? // TODO: Are there other error cases?
me.actions me.actions
.ensure_not_idle(id) .ensure_not_idle(me.counts.peer(), id)
.map_err(RecvError::Connection)?; .map_err(RecvError::Connection)?;
return Ok(()); return Ok(());
@@ -193,6 +225,8 @@ where
let actions = &mut me.actions; let actions = &mut me.actions;
let counts = &mut me.counts; let counts = &mut me.counts;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
let last_processed_id = actions.recv.last_processed_id(); let last_processed_id = actions.recv.last_processed_id();
@@ -200,7 +234,7 @@ where
.for_each(|stream| { .for_each(|stream| {
counts.transition(stream, |_, stream| { counts.transition(stream, |_, stream| {
actions.recv.recv_err(err, &mut *stream); actions.recv.recv_err(err, &mut *stream);
actions.send.recv_err(stream); actions.send.recv_err(send_buffer, stream);
Ok::<_, ()>(()) Ok::<_, ()>(())
}) })
}) })
@@ -217,6 +251,8 @@ where
let actions = &mut me.actions; let actions = &mut me.actions;
let counts = &mut me.counts; let counts = &mut me.counts;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
let last_stream_id = frame.last_stream_id(); let last_stream_id = frame.last_stream_id();
let err = frame.reason().into(); let err = frame.reason().into();
@@ -225,7 +261,7 @@ where
.for_each(|stream| if stream.id > last_stream_id { .for_each(|stream| if stream.id > last_stream_id {
counts.transition(stream, |_, stream| { counts.transition(stream, |_, stream| {
actions.recv.recv_err(&err, &mut *stream); actions.recv.recv_err(&err, &mut *stream);
actions.send.recv_err(stream); actions.send.recv_err(send_buffer, stream);
Ok::<_, ()>(()) Ok::<_, ()>(())
}) })
} else { } else {
@@ -245,6 +281,9 @@ where
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
if id.is_zero() { if id.is_zero() {
me.actions me.actions
.send .send
@@ -259,6 +298,7 @@ where
// the error is informational. // the error is informational.
let _ = me.actions.send.recv_stream_window_update( let _ = me.actions.send.recv_stream_window_update(
frame.size_increment(), frame.size_increment(),
send_buffer,
&mut stream, &mut stream,
&mut me.actions.task, &mut me.actions.task,
); );
@@ -284,12 +324,19 @@ where
None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)),
}; };
me.actions if me.counts.peer().is_server() {
.recv // The remote is a client and cannot reserve
.recv_push_promise(frame, &me.actions.send, stream, &mut me.store) trace!("recv_push_promise; error remote is client");
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
}
me.actions.recv.recv_push_promise(frame,
&me.actions.send,
stream,
&mut me.store)
} }
pub fn next_incoming(&mut self) -> Option<StreamRef<B, P>> { pub fn next_incoming(&mut self) -> Option<StreamRef<B>> {
let key = { let key = {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -308,8 +355,11 @@ where
key.map(|key| { key.map(|key| {
StreamRef { StreamRef {
inner: self.inner.clone(), opaque: OpaqueStreamRef {
key, inner: self.inner.clone(),
key,
},
send_buffer: self.send_buffer.clone(),
} }
}) })
} }
@@ -333,6 +383,9 @@ where
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
// Send WINDOW_UPDATE frames first // Send WINDOW_UPDATE frames first
// //
// TODO: It would probably be better to interleave updates w/ data // TODO: It would probably be better to interleave updates w/ data
@@ -341,6 +394,7 @@ where
// Send any other pending frames // Send any other pending frames
try_ready!(me.actions.send.poll_complete( try_ready!(me.actions.send.poll_complete(
send_buffer,
&mut me.store, &mut me.store,
&mut me.counts, &mut me.counts,
dst dst
@@ -356,11 +410,13 @@ where
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.apply_remote_settings(frame); me.counts.apply_remote_settings(frame);
me.actions me.actions.send.apply_remote_settings(
.send frame, send_buffer, &mut me.store, &mut me.actions.task)
.apply_remote_settings(frame, &mut me.store, &mut me.actions.task)
} }
pub fn send_request( pub fn send_request(
@@ -368,7 +424,7 @@ where
request: Request<()>, request: Request<()>,
end_of_stream: bool, end_of_stream: bool,
pending: Option<&store::Key>, pending: Option<&store::Key>,
) -> Result<StreamRef<B, P>, SendError> { ) -> Result<StreamRef<B>, SendError> {
use super::stream::ContentLength; use super::stream::ContentLength;
use http::Method; use http::Method;
@@ -381,6 +437,9 @@ where
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.actions.ensure_no_conn_error()?; me.actions.ensure_no_conn_error()?;
me.actions.send.ensure_next_stream_id()?; me.actions.send.ensure_next_stream_id()?;
@@ -396,6 +455,11 @@ where
} }
} }
if me.counts.peer().is_server() {
// Servers cannot open streams. PushPromise must first be reserved.
return Err(UserError::UnexpectedFrameType.into());
}
let stream_id = me.actions.send.open()?; let stream_id = me.actions.send.open()?;
let mut stream = Stream::new( let mut stream = Stream::new(
@@ -415,6 +479,7 @@ where
me.actions.send.send_headers( me.actions.send.send_headers(
headers, headers,
send_buffer,
&mut stream, &mut stream,
&mut me.counts, &mut me.counts,
&mut me.actions.task, &mut me.actions.task,
@@ -431,8 +496,11 @@ where
}; };
Ok(StreamRef { Ok(StreamRef {
inner: self.inner.clone(), opaque: OpaqueStreamRef {
key: key, inner: self.inner.clone(),
key: key,
},
send_buffer: self.send_buffer.clone(),
}) })
} }
@@ -454,11 +522,12 @@ where
let stream = me.store.resolve(key); let stream = me.store.resolve(key);
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| { me.counts.transition(stream, |_, stream| {
actions actions.send.send_reset(
.send reason, send_buffer, stream, &mut actions.task)
.send_reset(reason, stream, &mut actions.task, true)
}) })
} }
} }
@@ -510,25 +579,26 @@ where
fn clone(&self) -> Self { fn clone(&self) -> Self {
Streams { Streams {
inner: self.inner.clone(), inner: self.inner.clone(),
send_buffer: self.send_buffer.clone(),
_p: ::std::marker::PhantomData,
} }
} }
} }
// ===== impl StreamRef ===== // ===== impl StreamRef =====
impl<B, P> StreamRef<B, P> impl<B> StreamRef<B> {
where
P: Peer,
{
pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError>
where where
B: Buf, B: Buf,
{ {
let mut me = self.inner.lock().unwrap(); let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = me.store.resolve(self.key); let stream = me.store.resolve(self.opaque.key);
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| { me.counts.transition(stream, |_, stream| {
// Create the data frame // Create the data frame
@@ -536,37 +606,41 @@ where
frame.set_end_stream(end_stream); frame.set_end_stream(end_stream);
// Send the data frame // Send the data frame
actions.send.send_data(frame, stream, &mut actions.task) actions.send.send_data(frame, send_buffer, stream, &mut actions.task)
}) })
} }
pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> { pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
let mut me = self.inner.lock().unwrap(); let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = me.store.resolve(self.key); let stream = me.store.resolve(self.opaque.key);
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| { me.counts.transition(stream, |_, stream| {
// Create the trailers frame // Create the trailers frame
let frame = frame::Headers::trailers(stream.id, trailers); let frame = frame::Headers::trailers(stream.id, trailers);
// Send the trailers frame // Send the trailers frame
actions.send.send_trailers(frame, stream, &mut actions.task) actions.send.send_trailers(
frame, send_buffer, stream, &mut actions.task)
}) })
} }
pub fn send_reset(&mut self, reason: Reason) { pub fn send_reset(&mut self, reason: Reason) {
let mut me = self.inner.lock().unwrap(); let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = me.store.resolve(self.key); let stream = me.store.resolve(self.opaque.key);
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |_, stream| { me.counts.transition(stream, |_, stream| {
actions actions.send.send_reset(
.send reason, send_buffer, stream, &mut actions.task)
.send_reset(reason, stream, &mut actions.task, true)
}) })
} }
@@ -575,25 +649,107 @@ where
response: Response<()>, response: Response<()>,
end_of_stream: bool, end_of_stream: bool,
) -> Result<(), UserError> { ) -> Result<(), UserError> {
let mut me = self.inner.lock().unwrap(); let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let stream = me.store.resolve(self.key); let stream = me.store.resolve(self.opaque.key);
let actions = &mut me.actions; let actions = &mut me.actions;
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
me.counts.transition(stream, |counts, stream| { me.counts.transition(stream, |counts, stream| {
let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream); let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
actions actions.send.send_headers(
.send frame, send_buffer, stream, counts, &mut actions.task)
.send_headers(frame, stream, counts, &mut actions.task)
}) })
} }
pub fn body_is_empty(&self) -> bool /// Called by the server after the stream is accepted. Given that clients
where /// initialize streams by sending HEADERS, the request will always be
B: Buf, /// available.
///
/// # Panics
///
/// This function panics if the request isn't present.
pub fn take_request(&self) -> Request<()> {
let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.opaque.key);
me.actions.recv.take_request(&mut stream)
}
/// Called by a client to see if the current stream is pending open
pub fn is_pending_open(&self) -> bool {
let mut me = self.opaque.inner.lock().unwrap();
me.store.resolve(self.opaque.key).is_pending_open
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize) {
let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.opaque.key);
me.actions.send.reserve_capacity(capacity, &mut stream)
}
/// Returns the stream's current send capacity.
pub fn capacity(&self) -> WindowSize {
let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.opaque.key);
me.actions.send.capacity(&mut stream)
}
/// Request to be notified when the stream's capacity increases
pub fn poll_capacity(&mut self) -> Poll<Option<WindowSize>, UserError> {
let mut me = self.opaque.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.opaque.key);
me.actions.send.poll_capacity(&mut stream)
}
pub(crate) fn key(&self) -> store::Key {
self.opaque.key
}
pub fn clone_to_opaque(&self) -> OpaqueStreamRef
where B: 'static,
{ {
self.opaque.clone()
}
}
impl<B> Clone for StreamRef<B> {
fn clone(&self) -> Self {
StreamRef {
opaque: self.opaque.clone(),
send_buffer: self.send_buffer.clone(),
}
}
}
// ===== impl OpaqueStreamRef =====
impl OpaqueStreamRef {
/// Called by a client to check for a received response.
pub fn poll_response(&mut self) -> Poll<Response<()>, proto::Error> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.poll_response(&mut stream)
}
pub fn body_is_empty(&self) -> bool {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -602,10 +758,7 @@ where
me.actions.recv.body_is_empty(&stream) me.actions.recv.body_is_empty(&stream)
} }
pub fn poll_data(&mut self) -> Poll<Option<Bytes>, proto::Error> pub fn poll_data(&mut self) -> Poll<Option<Bytes>, proto::Error> {
where
B: Buf,
{
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -614,10 +767,7 @@ where
me.actions.recv.poll_data(&mut stream) me.actions.recv.poll_data(&mut stream)
} }
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, proto::Error> pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, proto::Error> {
where
B: Buf,
{
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -628,10 +778,7 @@ where
/// Releases recv capacity back to the peer. This may result in sending /// Releases recv capacity back to the peer. This may result in sending
/// WINDOW_UPDATE frames on both the stream and connection. /// WINDOW_UPDATE frames on both the stream and connection.
pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
where
B: Buf,
{
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
@@ -641,171 +788,88 @@ where
.recv .recv
.release_capacity(capacity, &mut stream, &mut me.actions.task) .release_capacity(capacity, &mut stream, &mut me.actions.task)
} }
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.send.reserve_capacity(capacity, &mut stream)
}
/// Returns the stream's current send capacity.
pub fn capacity(&self) -> WindowSize {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.send.capacity(&mut stream)
}
/// Request to be notified when the stream's capacity increases
pub fn poll_capacity(&mut self) -> Poll<Option<WindowSize>, UserError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.send.poll_capacity(&mut stream)
}
pub(crate) fn key(&self) -> store::Key {
self.key
}
} }
impl<B> StreamRef<B, server::Peer> impl fmt::Debug for OpaqueStreamRef {
where
B: Buf,
{
/// Called by the server after the stream is accepted. Given that clients
/// initialize streams by sending HEADERS, the request will always be
/// available.
///
/// # Panics
///
/// This function panics if the request isn't present.
pub fn take_request(&self) -> Request<()> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.take_request(&mut stream)
}
}
impl<B> StreamRef<B, client::Peer>
where
B: Buf,
{
pub fn poll_response(&mut self) -> Poll<Response<()>, proto::Error> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.poll_response(&mut stream)
}
pub fn is_pending_open(&self) -> bool {
let mut me = self.inner.lock().unwrap();
me.store.resolve(self.key).is_pending_open
}
}
impl<B, P> Clone for StreamRef<B, P>
where
P: Peer,
{
fn clone(&self) -> Self {
// Increment the ref count
self.inner.lock().unwrap().store.resolve(self.key).ref_inc();
StreamRef {
inner: self.inner.clone(),
key: self.key.clone(),
}
}
}
impl<B, P> fmt::Debug for StreamRef<B, P>
where
P: Peer,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self.inner.lock() { match self.inner.lock() {
Ok(me) => { Ok(me) => {
let stream = &me.store[self.key]; let stream = &me.store[self.key];
fmt.debug_struct("StreamRef") fmt.debug_struct("OpaqueStreamRef")
.field("stream_id", &stream.id) .field("stream_id", &stream.id)
.field("ref_count", &stream.ref_count) .field("ref_count", &stream.ref_count)
.finish() .finish()
}, },
Err(_poisoned) => fmt.debug_struct("StreamRef") Err(_poisoned) => fmt.debug_struct("OpaqueStreamRef")
.field("inner", &"<Poisoned>") .field("inner", &"<Poisoned>")
.finish(), .finish(),
} }
} }
} }
impl<B, P> Drop for StreamRef<B, P> impl Clone for OpaqueStreamRef {
where fn clone(&self) -> Self {
P: Peer, // Increment the ref count
{ self.inner.lock().unwrap().store.resolve(self.key).ref_inc();
OpaqueStreamRef {
inner: self.inner.clone(),
key: self.key.clone(),
}
}
}
impl Drop for OpaqueStreamRef {
fn drop(&mut self) { fn drop(&mut self) {
trace!("StreamRef::drop({:?})", self); drop_stream_ref(&self.inner, self.key);
let mut me = match self.inner.lock() { }
Ok(inner) => inner, }
Err(_) => if ::std::thread::panicking() {
trace!("StreamRef::drop; mutex poisoned");
return;
} else {
panic!("StreamRef::drop; mutex poisoned");
},
};
let me = &mut *me; // TODO: Move back in fn above
fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
let mut me = match inner.lock() {
Ok(inner) => inner,
Err(_) => if ::std::thread::panicking() {
trace!("StreamRef::drop; mutex poisoned");
return;
} else {
panic!("StreamRef::drop; mutex poisoned");
},
};
let mut stream = me.store.resolve(self.key); let me = &mut *me;
// decrement the stream's ref count by 1.
stream.ref_dec();
let actions = &mut me.actions; let mut stream = me.store.resolve(key);
// the reset must be sent inside a `transition` block. // decrement the stream's ref count by 1.
// `transition_after` will release the stream if it is stream.ref_dec();
// released.
let recv_closed = stream.state.is_recv_closed(); let actions = &mut me.actions;
me.counts.transition(stream, |_, stream|
// if this is the last reference to the stream, reset the stream. me.counts.transition(stream, |_, mut stream| {
if stream.ref_count == 0 && !recv_closed { if stream.is_canceled_interest() {
trace!( actions.send.schedule_cancel(
" -> last reference to {:?} was dropped, trying to reset", &mut stream,
stream.id, &mut actions.task);
); }
actions.send.send_reset( });
Reason::CANCEL, }
stream,
&mut actions.task, // ===== impl SendBuffer =====
false
); impl<B> SendBuffer<B> {
}); fn new() -> Self {
let inner = Mutex::new(Buffer::new());
SendBuffer { inner }
} }
} }
// ===== impl Actions ===== // ===== impl Actions =====
impl<B, P> Actions<B, P> impl Actions {
where fn reset_on_recv_stream_err<B>(
B: Buf,
P: Peer,
{
fn reset_on_recv_stream_err(
&mut self, &mut self,
stream: &mut store::Ptr<B, P>, buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
res: Result<(), RecvError>, res: Result<(), RecvError>,
) -> Result<(), RecvError> { ) -> Result<(), RecvError> {
if let Err(RecvError::Stream { if let Err(RecvError::Stream {
@@ -813,15 +877,15 @@ where
}) = res }) = res
{ {
// Reset the stream. // Reset the stream.
self.send.send_reset(reason, stream, &mut self.task, true); self.send.send_reset(reason, buffer, stream, &mut self.task);
Ok(()) Ok(())
} else { } else {
res res
} }
} }
fn ensure_not_idle(&mut self, id: StreamId) -> Result<(), Reason> { fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> {
if P::is_local_init(id) { if peer.is_local_init(id) {
self.send.ensure_not_idle(id) self.send.ensure_not_idle(id)
} else { } else {
self.recv.ensure_not_idle(id) self.recv.ensure_not_idle(id)


@@ -1,10 +1,11 @@
use {SendStream, RecvStream, ReleaseCapacity};
use codec::{Codec, RecvError}; use codec::{Codec, RecvError};
use frame::{self, Reason, Settings, StreamId}; use frame::{self, Reason, Settings, StreamId};
use proto::{self, Connection, WindowSize, Prioritized}; use proto::{self, Connection, Prioritized};
use bytes::{Buf, Bytes, IntoBuf}; use bytes::{Buf, Bytes, IntoBuf};
use futures::{self, Async, Future, Poll}; use futures::{self, Async, Future, Poll};
use http::{HeaderMap, Request, Response}; use http::{Request, Response};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use std::{convert, fmt, mem}; use std::{convert, fmt, mem};
@@ -27,28 +28,13 @@ pub struct Builder {
settings: Settings, settings: Settings,
} }
/// Respond to a request
///
/// Instances of `Respond` are used to send a response or reserve push promises.
#[derive(Debug)] #[derive(Debug)]
pub struct Stream<B: IntoBuf> { pub struct Respond<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>, inner: proto::StreamRef<B::Buf>,
}
pub struct Body<B: IntoBuf> {
inner: ReleaseCapacity<B>,
}
#[derive(Debug)]
pub struct ReleaseCapacity<B: IntoBuf> {
inner: proto::StreamRef<B::Buf, Peer>,
}
#[derive(Debug)]
pub struct Send<T> {
src: T,
dst: Option<Stream<Bytes>>,
// Pending data
buf: Option<Bytes>,
// True when this is the end of the stream
eos: bool,
} }
/// Stages of an in-progress handshake. /// Stages of an in-progress handshake.
@@ -103,6 +89,7 @@ impl<T, B> Server<T, B>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static, B: IntoBuf + 'static,
B::Buf: 'static,
{ {
fn handshake2(io: T, settings: Settings) -> Handshake<T, B> { fn handshake2(io: T, settings: Settings) -> Handshake<T, B> {
// Create the codec. // Create the codec.
@@ -141,8 +128,9 @@ impl<T, B> futures::Stream for Server<T, B>
where where
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static, B: IntoBuf + 'static,
B::Buf: 'static,
{ {
type Item = (Request<Body<B>>, Stream<B>); type Item = (Request<RecvStream>, Respond<B>);
type Error = ::Error; type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, ::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, ::Error> {
@@ -160,16 +148,12 @@ where
if let Some(inner) = self.connection.next_incoming() { if let Some(inner) = self.connection.next_incoming() {
trace!("received incoming"); trace!("received incoming");
let (head, _) = inner.take_request().into_parts(); let (head, _) = inner.take_request().into_parts();
let body = Body { let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque()));
inner: ReleaseCapacity { inner: inner.clone() },
};
let request = Request::from_parts(head, body); let request = Request::from_parts(head, body);
let incoming = Stream { let respond = Respond { inner };
inner,
};
return Ok(Some((request, incoming)).into()); return Ok(Some((request, respond)).into());
} }
Ok(Async::NotReady) Ok(Async::NotReady)
@@ -229,179 +213,27 @@ impl Builder {
} }
} }
// ===== impl Stream ===== // ===== impl Respond =====
impl<B: IntoBuf> Stream<B> { impl<B: IntoBuf> Respond<B> {
/// Send a response /// Send a response
pub fn send_response( pub fn send_response(
&mut self, &mut self,
response: Response<()>, response: Response<()>,
end_of_stream: bool, end_of_stream: bool,
) -> Result<(), ::Error> { ) -> Result<SendStream<B>, ::Error> {
self.inner self.inner
.send_response(response, end_of_stream) .send_response(response, end_of_stream)
.map(|_| SendStream::new(self.inner.clone()))
.map_err(Into::into) .map_err(Into::into)
} }
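A hedged usage sketch of the reworked server surface, assuming a `(Request<RecvStream>, Respond<Bytes>)` pair has already been produced by the accept loop shown earlier in this diff; the point is that `send_response` now yields a `SendStream` for the body. `h2::Error` is assumed here to be the crate-root error type that `::Error` refers to throughout the diff.

    extern crate bytes;
    extern crate h2;
    extern crate http;

    use bytes::Bytes;
    use h2::RecvStream;
    use h2::server::Respond;
    use http::{Request, Response};

    fn handle(_request: Request<RecvStream>, mut respond: Respond<Bytes>) -> Result<(), h2::Error> {
        let response = Response::builder().status(200).body(()).unwrap();

        // `false` keeps the stream open so the body can follow.
        let mut send = respond.send_response(response, false)?;

        // One data frame, then end the stream.
        send.send_data(Bytes::from_static(b"hello"), true)?;
        Ok(())
    }

    fn main() {}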
/// Request capacity to send data /// Reset the stream
pub fn reserve_capacity(&mut self, capacity: usize) { pub fn send_reset(&mut self, reason: Reason) {
// TODO: Check for overflow
self.inner.reserve_capacity(capacity as WindowSize)
}
/// Returns the stream's current send capacity.
pub fn capacity(&self) -> usize {
self.inner.capacity() as usize
}
/// Request to be notified when the stream's capacity increases
pub fn poll_capacity(&mut self) -> Poll<Option<usize>, ::Error> {
let res = try_ready!(self.inner.poll_capacity());
Ok(Async::Ready(res.map(|v| v as usize)))
}
/// Send a single data frame
pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> {
self.inner
.send_data(data.into_buf(), end_of_stream)
.map_err(Into::into)
}
/// Send trailers
pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> {
self.inner.send_trailers(trailers).map_err(Into::into)
}
pub fn send_reset(mut self, reason: Reason) {
self.inner.send_reset(reason) self.inner.send_reset(reason)
} }
}
impl Stream<Bytes> { // TODO: Support reserving push promises.
/// Send the body
pub fn send<T>(self, src: T, end_of_stream: bool) -> Send<T>
where
T: futures::Stream<Item = Bytes, Error = ::Error>,
{
Send {
src: src,
dst: Some(self),
buf: None,
eos: end_of_stream,
}
}
}
// ===== impl Body =====
impl<B: IntoBuf> Body<B> {
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.inner.body_is_empty()
}
pub fn release_capacity(&mut self) -> &mut ReleaseCapacity<B> {
&mut self.inner
}
/// Poll trailers
///
/// This function **must** not be called until `Body::poll` returns `None`.
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> {
self.inner.inner.poll_trailers().map_err(Into::into)
}
}
impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Bytes;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.inner.poll_data().map_err(Into::into)
}
}
impl<B: IntoBuf> fmt::Debug for Body<B>
where B: fmt::Debug,
B::Buf: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Body")
.field("inner", &self.inner)
.finish()
}
}
// ===== impl ReleaseCapacity =====
impl<B: IntoBuf> ReleaseCapacity<B> {
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> {
self.inner
.release_capacity(sz as proto::WindowSize)
.map_err(Into::into)
}
}
impl<B: IntoBuf> Clone for ReleaseCapacity<B> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
ReleaseCapacity { inner }
}
}
// ===== impl Send =====
impl<T> Future for Send<T>
where
T: futures::Stream<Item = Bytes, Error = ::Error>,
{
type Item = Stream<Bytes>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
if self.buf.is_none() {
// Get a chunk to send to the H2 stream
self.buf = try_ready!(self.src.poll());
}
match self.buf.take() {
Some(mut buf) => {
let dst = self.dst.as_mut().unwrap();
// Ask for the amount of capacity needed
dst.reserve_capacity(buf.len());
let cap = dst.capacity();
if cap == 0 {
self.buf = Some(buf);
// TODO: This seems kind of lame :(
try_ready!(dst.poll_capacity());
continue;
}
let chunk = buf.split_to(cap);
if !buf.is_empty() {
self.buf = Some(buf);
}
dst.send_data(chunk, false)?;
},
None => {
// TODO: It would be nice to not have to send an extra
// frame...
if self.eos {
self.dst.as_mut().unwrap().send_data(Bytes::new(), true)?;
}
return Ok(Async::Ready(self.dst.take().unwrap()));
},
}
}
}
} }
// ===== impl Flush ===== // ===== impl Flush =====
@@ -546,6 +378,10 @@ impl proto::Peer for Peer {
true true
} }
fn dyn() -> proto::DynPeer {
proto::DynPeer::Server
}
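The `dyn()` hook above is how each peer advertises itself now that the `P: Peer` type parameter is gone from the internals. A rough standalone sketch of what an enum-based `proto::DynPeer` presumably looks like, inferred only from the calls visible in this diff (`is_server`, `is_local_init`) plus the RFC 7540 stream-id parity rule; the real type may differ.

    #[derive(Copy, Clone, Debug, PartialEq)]
    enum DynPeer {
        Client,
        Server,
    }

    impl DynPeer {
        fn is_server(&self) -> bool {
            *self == DynPeer::Server
        }

        // Client-initiated stream ids are odd, server-initiated ids are even (RFC 7540 §5.1.1).
        fn is_local_init(&self, id: u32) -> bool {
            match *self {
                DynPeer::Client => id % 2 == 1,
                DynPeer::Server => id != 0 && id % 2 == 0,
            }
        }
    }

    fn main() {
        assert!(DynPeer::Server.is_server());
        assert!(!DynPeer::Client.is_server());
        assert!(DynPeer::Client.is_local_init(1));
        assert!(DynPeer::Server.is_local_init(2));
    }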
fn convert_send_message( fn convert_send_message(
id: StreamId, id: StreamId,
response: Self::Send, response: Self::Send,

src/share.rs (new file)

@@ -0,0 +1,127 @@
use frame::Reason;
use proto::{self, WindowSize};
use bytes::{Bytes, IntoBuf};
use futures::{self, Poll, Async};
use http::{HeaderMap};
use std::fmt;
#[derive(Debug)]
pub struct SendStream<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>,
}
pub struct RecvStream {
inner: ReleaseCapacity,
}
#[derive(Debug)]
pub struct ReleaseCapacity {
inner: proto::OpaqueStreamRef,
}
// ===== impl SendStream =====
impl<B: IntoBuf> SendStream<B> {
pub(crate) fn new(inner: proto::StreamRef<B::Buf>) -> Self {
SendStream { inner }
}
/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: usize) {
// TODO: Check for overflow
self.inner.reserve_capacity(capacity as WindowSize)
}
/// Returns the stream's current send capacity.
pub fn capacity(&self) -> usize {
self.inner.capacity() as usize
}
/// Request to be notified when the stream's capacity increases
pub fn poll_capacity(&mut self) -> Poll<Option<usize>, ::Error> {
let res = try_ready!(self.inner.poll_capacity());
Ok(Async::Ready(res.map(|v| v as usize)))
}
/// Send a single data frame
pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> {
self.inner
.send_data(data.into_buf(), end_of_stream)
.map_err(Into::into)
}
/// Send trailers
pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> {
self.inner.send_trailers(trailers).map_err(Into::into)
}
/// Reset the stream
pub fn send_reset(&mut self, reason: Reason) {
self.inner.send_reset(reason)
}
}
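The capacity methods above replace the `Send` helper deleted from server.rs earlier in this diff. A sketch of the same request-capacity/send loop written directly against `SendStream`, in futures 0.1 poll style as used by the rest of the crate; `h2::Error` is assumed to be the crate-root error type.

    extern crate bytes;
    extern crate futures;
    extern crate h2;

    use std::cmp;

    use bytes::Bytes;
    use futures::{Async, Poll};
    use h2::SendStream;

    // Push as much of `buf` as the window allows; register interest and return
    // NotReady when no capacity is available yet.
    fn poll_send(stream: &mut SendStream<Bytes>, buf: &mut Bytes, eos: bool) -> Poll<(), h2::Error> {
        while !buf.is_empty() {
            // Ask for enough capacity to send the rest of the buffer.
            stream.reserve_capacity(buf.len());

            let cap = stream.capacity();
            if cap == 0 {
                match stream.poll_capacity()? {
                    Async::Ready(Some(_)) => continue,
                    // The send side is no longer streaming; nothing more to do.
                    Async::Ready(None) => return Ok(Async::Ready(())),
                    Async::NotReady => return Ok(Async::NotReady),
                }
            }

            let chunk = buf.split_to(cmp::min(cap, buf.len()));
            stream.send_data(chunk, false)?;
        }

        if eos {
            // An empty frame carries the END_STREAM flag.
            stream.send_data(Bytes::new(), true)?;
        }

        Ok(Async::Ready(()))
    }

    fn main() {}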
// ===== impl RecvStream =====
impl RecvStream {
pub(crate) fn new(inner: ReleaseCapacity) -> Self {
RecvStream { inner }
}
// TODO: Rename to "is_end_stream"
pub fn is_empty(&self) -> bool {
// If the recv side is closed and the receive queue is empty, the body is empty.
self.inner.inner.body_is_empty()
}
pub fn release_capacity(&mut self) -> &mut ReleaseCapacity {
&mut self.inner
}
/// Poll trailers
///
/// This function **must** not be called until `RecvStream::poll` returns `None`.
pub fn poll_trailers(&mut self) -> Poll<Option<HeaderMap>, ::Error> {
self.inner.inner.poll_trailers().map_err(Into::into)
}
}
impl futures::Stream for RecvStream {
type Item = Bytes;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.inner.inner.poll_data().map_err(Into::into)
}
}
impl fmt::Debug for RecvStream {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("RecvStream")
.field("inner", &self.inner)
.finish()
}
}
// ===== impl ReleaseCapacity =====
impl ReleaseCapacity {
pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self {
ReleaseCapacity { inner }
}
pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> {
self.inner
.release_capacity(sz as proto::WindowSize)
.map_err(Into::into)
}
}
impl Clone for ReleaseCapacity {
fn clone(&self) -> Self {
let inner = self.inner.clone();
ReleaseCapacity { inner }
}
}
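And the matching receive side: `RecvStream` is a `futures::Stream` of `Bytes`, and each consumed chunk should be handed back through `ReleaseCapacity` so the peer's flow-control window reopens. A small poll-style sketch, again assuming `h2::Error` as the crate-root error type:

    extern crate futures;
    extern crate h2;

    use futures::{Async, Poll, Stream};
    use h2::RecvStream;

    // Drain a body, releasing window for every chunk and returning the total
    // number of body bytes once the stream ends.
    fn poll_drain(body: &mut RecvStream, total: &mut usize) -> Poll<usize, h2::Error> {
        loop {
            match body.poll()? {
                Async::Ready(Some(chunk)) => {
                    *total += chunk.len();
                    // Give the stream/connection window back to the peer.
                    body.release_capacity().release_capacity(chunk.len())?;
                },
                Async::Ready(None) => return Ok(Async::Ready(*total)),
                Async::NotReady => return Ok(Async::NotReady),
            }
        }
    }

    fn main() {}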


@@ -1,4 +1,4 @@
use h2::client; use h2;
use super::string::{String, TryFrom}; use super::string::{String, TryFrom};
use bytes::Bytes; use bytes::Bytes;
@@ -8,7 +8,7 @@ pub fn byte_str(s: &str) -> String<Bytes> {
String::try_from(Bytes::from(s)).unwrap() String::try_from(Bytes::from(s)).unwrap()
} }
pub fn wait_for_capacity(stream: client::Stream<Bytes>, target: usize) -> WaitForCapacity { pub fn wait_for_capacity(stream: h2::SendStream<Bytes>, target: usize) -> WaitForCapacity {
WaitForCapacity { WaitForCapacity {
stream: Some(stream), stream: Some(stream),
target: target, target: target,
@@ -16,18 +16,18 @@ pub fn wait_for_capacity(stream: client::Stream<Bytes>, target: usize) -> WaitFo
} }
pub struct WaitForCapacity { pub struct WaitForCapacity {
stream: Option<client::Stream<Bytes>>, stream: Option<h2::SendStream<Bytes>>,
target: usize, target: usize,
} }
impl WaitForCapacity { impl WaitForCapacity {
fn stream(&mut self) -> &mut client::Stream<Bytes> { fn stream(&mut self) -> &mut h2::SendStream<Bytes> {
self.stream.as_mut().unwrap() self.stream.as_mut().unwrap()
} }
} }
impl Future for WaitForCapacity { impl Future for WaitForCapacity {
type Item = client::Stream<Bytes>; type Item = h2::SendStream<Bytes>;
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, ()> { fn poll(&mut self) -> Poll<Self::Item, ()> {