diff --git a/examples/client.rs b/examples/client.rs index 2a4c6c0..1ae8053 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,4 +1,3 @@ -extern crate bytes; extern crate env_logger; extern crate futures; extern crate h2; @@ -6,9 +5,9 @@ extern crate http; extern crate io_dump; extern crate tokio_core; -use h2::client::{Body, Client}; +use h2::client::{Client}; +use h2::RecvStream; -use bytes::*; use futures::*; use http::*; @@ -16,7 +15,7 @@ use tokio_core::net::TcpStream; use tokio_core::reactor; struct Process { - body: Body, + body: RecvStream, trailers: bool, } diff --git a/examples/server-tr.rs b/examples/server-tr.rs index 56a472f..ec53324 100644 --- a/examples/server-tr.rs +++ b/examples/server-tr.rs @@ -32,17 +32,21 @@ pub fn main() { .and_then(|conn| { println!("H2 connection bound"); - conn.for_each(|(request, mut stream)| { + conn.for_each(|(request, mut respond)| { println!("GOT request: {:?}", request); let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); - if let Err(e) = stream.send_response(response, false) { - println!(" error responding; err={:?}", e); - } + let mut send = match respond.send_response(response, false) { + Ok(send) => send, + Err(e) => { + println!(" error respond; err={:?}", e); + return Ok(()); + } + }; println!(">>>> sending data"); - if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), false) { + if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), false) { println!(" -> err={:?}", e); } @@ -50,7 +54,7 @@ pub fn main() { hdrs.insert("status", "ok".parse().unwrap()); println!(">>>> sending trailers"); - if let Err(e) = stream.send_trailers(hdrs) { + if let Err(e) = send.send_trailers(hdrs) { println!(" -> err={:?}", e); } diff --git a/examples/server.rs b/examples/server.rs index d49fada..5cdc3d0 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -31,18 +31,21 @@ pub fn main() { .and_then(|conn| { println!("H2 connection bound"); - conn.for_each(|(request, mut stream)| { + conn.for_each(|(request, mut respond)| { println!("GOT request: {:?}", request); - let response = Response::builder().status(StatusCode::OK).body(()).unwrap(); - if let Err(e) = stream.send_response(response, false) { - println!(" error responding; err={:?}", e); - } + let mut send = match respond.send_response(response, false) { + Ok(send) => send, + Err(e) => { + println!(" error respond; err={:?}", e); + return Ok(()); + } + }; println!(">>>> sending data"); - if let Err(e) = stream.send_data(Bytes::from_static(b"hello world"), true) { + if let Err(e) = send.send_data(Bytes::from_static(b"hello world"), true) { println!(" -> err={:?}", e); } diff --git a/src/client.rs b/src/client.rs index 39b18de..b819e94 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,11 @@ +use {SendStream, RecvStream, ReleaseCapacity}; use codec::{Codec, RecvError}; use frame::{Headers, Pseudo, Reason, Settings, StreamId}; -use proto::{self, WindowSize}; +use proto; use bytes::{Bytes, IntoBuf}; use futures::{Async, Future, MapErr, Poll}; -use http::{HeaderMap, Request, Response}; +use http::{Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::WriteAll; @@ -30,22 +31,8 @@ pub struct Connection { } #[derive(Debug)] -pub struct ResponseFuture { - inner: proto::StreamRef, -} - -#[derive(Debug)] -pub struct Stream { - inner: proto::StreamRef, -} - -pub struct Body { - inner: ReleaseCapacity, -} - -#[derive(Debug)] -pub struct ReleaseCapacity { - inner: proto::StreamRef, +pub struct ResponseFuture { + inner: 
proto::OpaqueStreamRef, } /// Build a Client. @@ -86,6 +73,7 @@ impl Client { impl Client where B: IntoBuf, + B::Buf: 'static, { fn handshake2(io: T, builder: Builder) -> Handshake where @@ -118,7 +106,7 @@ where &mut self, request: Request<()>, end_of_stream: bool, - ) -> Result<(ResponseFuture, Stream), ::Error> { + ) -> Result<(ResponseFuture, SendStream), ::Error> { self.inner .send_request(request, end_of_stream, self.pending.as_ref()) .map_err(Into::into) @@ -128,12 +116,10 @@ where } let response = ResponseFuture { - inner: stream.clone(), + inner: stream.clone_to_opaque(), }; - let stream = Stream { - inner: stream, - }; + let stream = SendStream::new(stream); (response, stream) }) @@ -239,6 +225,7 @@ impl Builder { where T: AsyncRead + AsyncWrite, B: IntoBuf, + B::Buf: 'static, { Client::handshake2(io, self.clone()) } @@ -297,9 +284,11 @@ where // ===== impl Handshake ===== -impl Future for Handshake +impl Future for Handshake where T: AsyncRead + AsyncWrite, + B: IntoBuf, + B::Buf: 'static, { type Item = (Client, Connection); type Error = ::Error; @@ -348,121 +337,29 @@ where // ===== impl ResponseFuture ===== -impl Future for ResponseFuture { - type Item = Response>; +impl Future for ResponseFuture { + type Item = Response; type Error = ::Error; fn poll(&mut self) -> Poll { let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); - - let body = Body { - inner: ReleaseCapacity { inner: self.inner.clone() }, - }; + let body = RecvStream::new(ReleaseCapacity::new(self.inner.clone())); Ok(Response::from_parts(parts, body).into()) } } -// ===== impl Stream ===== - -impl Stream { - /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: usize) { - // TODO: Check for overflow - self.inner.reserve_capacity(capacity as WindowSize) - } - - /// Returns the stream's current send capacity. - pub fn capacity(&self) -> usize { - self.inner.capacity() as usize - } - - /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, ::Error> { - let res = try_ready!(self.inner.poll_capacity()); - Ok(Async::Ready(res.map(|v| v as usize))) - } - - /// Send data - pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> { - self.inner - .send_data(data.into_buf(), end_of_stream) - .map_err(Into::into) - } - - /// Send trailers - pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> { - self.inner.send_trailers(trailers).map_err(Into::into) - } - - pub fn send_reset(mut self, reason: Reason) { - self.inner.send_reset(reason) - } -} - -// ===== impl Body ===== - -impl Body { - pub fn is_empty(&self) -> bool { - // If the recv side is closed and the receive queue is empty, the body is empty. - self.inner.inner.body_is_empty() - } - - pub fn release_capacity(&mut self) -> &mut ReleaseCapacity { - &mut self.inner - } - - /// Poll trailers - /// - /// This function **must** not be called until `Body::poll` returns `None`. 
- pub fn poll_trailers(&mut self) -> Poll, ::Error> { - self.inner.inner.poll_trailers().map_err(Into::into) - } -} - -impl ::futures::Stream for Body { - type Item = Bytes; - type Error = ::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.inner.poll_data().map_err(Into::into) - } -} - -impl fmt::Debug for Body -where B: fmt::Debug, - B::Buf: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Body") - .field("inner", &self.inner) - .finish() - } -} - -// ===== impl ReleaseCapacity ===== - -impl ReleaseCapacity { - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { - self.inner - .release_capacity(sz as proto::WindowSize) - .map_err(Into::into) - } -} - -impl Clone for ReleaseCapacity { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - ReleaseCapacity { inner } - } -} - // ===== impl Peer ===== impl proto::Peer for Peer { type Send = Request<()>; type Poll = Response<()>; + + fn dyn() -> proto::DynPeer { + proto::DynPeer::Client + } + fn is_server() -> bool { false } diff --git a/src/lib.rs b/src/lib.rs index 08eee52..3629b5b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,26 +7,20 @@ extern crate futures; extern crate tokio_io; // HTTP types - extern crate http; // Buffer utilities - extern crate bytes; // Hash function used for HPACK encoding and tracking stream states. - extern crate fnv; extern crate byteorder; - extern crate slab; #[macro_use] extern crate log; - extern crate string; - extern crate ordermap; mod error; @@ -42,8 +36,10 @@ pub mod frame; pub mod client; pub mod server; +mod share; pub use error::{Error, Reason}; +pub use share::{SendStream, RecvStream, ReleaseCapacity}; #[cfg(feature = "unstable")] pub use codec::{Codec, RecvError, SendError, UserError}; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 985eb7a..11cfb04 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -301,7 +301,7 @@ where T: AsyncRead + AsyncWrite, B: IntoBuf, { - pub fn next_incoming(&mut self) -> Option> { + pub fn next_incoming(&mut self) -> Option> { self.streams.next_incoming() } } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d946144..d2611b7 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -7,9 +7,10 @@ mod streams; pub(crate) use self::connection::Connection; pub(crate) use self::error::Error; -pub(crate) use self::peer::Peer; -pub(crate) use self::streams::{Key as StreamKey, StreamRef, Streams}; +pub(crate) use self::peer::{Peer, Dyn as DynPeer}; +pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams}; pub(crate) use self::streams::Prioritized; + use codec::Codec; use self::ping_pong::PingPong; diff --git a/src/proto/peer.rs b/src/proto/peer.rs index ff5073c..7f1c2c9 100644 --- a/src/proto/peer.rs +++ b/src/proto/peer.rs @@ -1,6 +1,9 @@ use codec::RecvError; +use error::Reason; use frame::{Headers, StreamId}; +use http::{Request, Response}; + use std::fmt; /// Either a Client or a Server @@ -11,6 +14,8 @@ pub trait Peer { /// Message type polled from the transport type Poll: fmt::Debug; + fn dyn() -> Dyn; + fn is_server() -> bool; fn convert_send_message(id: StreamId, headers: Self::Send, end_of_stream: bool) -> Headers; @@ -22,3 +27,57 @@ pub trait Peer { Self::is_server() == id.is_server_initiated() } } + +/// A dynamic representation of `Peer`. +/// +/// This is used internally to avoid incurring a generic on all internal types. 
+#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum Dyn { + Client, + Server, +} + +#[derive(Debug)] +pub enum PollMessage { + Client(Response<()>), + Server(Request<()>), +} + +// ===== impl Dyn ===== + +impl Dyn { + pub fn is_server(&self) -> bool { + *self == Dyn::Server + } + + pub fn is_local_init(&self, id: StreamId) -> bool { + assert!(!id.is_zero()); + self.is_server() == id.is_server_initiated() + } + + pub fn convert_poll_message(&self, headers: Headers) -> Result { + if self.is_server() { + ::server::Peer::convert_poll_message(headers) + .map(PollMessage::Server) + } else { + ::client::Peer::convert_poll_message(headers) + .map(PollMessage::Client) + } + } + + /// Returns true if the remote peer can initiate a stream with the given ID. + pub fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> { + if !self.is_server() { + // Remote is a server and cannot open streams. PushPromise is + // registered by reserving, so does not go through this path. + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } + + // Ensure that the ID is a valid server initiated ID + if !id.is_client_initiated() { + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } + + Ok(()) + } +} diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 8b31a4d..4a42d95 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -1,7 +1,5 @@ use slab::Slab; -use std::marker::PhantomData; - /// Buffers frames for multiple streams. #[derive(Debug)] pub struct Buffer { @@ -10,9 +8,8 @@ pub struct Buffer { /// A sequence of frames in a `Buffer` #[derive(Debug)] -pub struct Deque { +pub struct Deque { indices: Option, - _p: PhantomData, } /// Tracks the head & tail for a sequence of frames in a `Buffer`. @@ -36,11 +33,10 @@ impl Buffer { } } -impl Deque { +impl Deque { pub fn new() -> Self { Deque { indices: None, - _p: PhantomData, } } @@ -48,7 +44,7 @@ impl Deque { self.indices.is_none() } - pub fn push_back(&mut self, buf: &mut Buffer, value: T) { + pub fn push_back(&mut self, buf: &mut Buffer, value: T) { let key = buf.slab.insert(Slot { value, next: None, @@ -68,7 +64,7 @@ impl Deque { } } - pub fn push_front(&mut self, buf: &mut Buffer, value: T) { + pub fn push_front(&mut self, buf: &mut Buffer, value: T) { let key = buf.slab.insert(Slot { value, next: None, @@ -88,7 +84,7 @@ impl Deque { } } - pub fn pop_front(&mut self, buf: &mut Buffer) -> Option { + pub fn pop_front(&mut self, buf: &mut Buffer) -> Option { match self.indices { Some(mut idxs) => { let mut slot = buf.slab.remove(idxs.head); @@ -107,7 +103,7 @@ impl Deque { } } - pub fn peek_front<'a>(&self, buf: &'a Buffer) -> Option<&'a T> { + pub fn peek_front<'a, T>(&self, buf: &'a Buffer) -> Option<&'a T> { match self.indices { Some(idxs) => Some(&buf.slab[idxs.head].value), None => None, diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index cd7d5b9..5f8fcde 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -1,13 +1,13 @@ use super::*; -use std::marker::PhantomData; use std::usize; #[derive(Debug)] -pub(super) struct Counts
<P>
-where - P: Peer, -{ +pub(super) struct Counts { + /// Acting as a client or server. This allows us to track which values to + /// inc / dec. + peer: peer::Dyn, + /// Maximum number of locally initiated streams max_send_streams: usize, @@ -19,25 +19,25 @@ where /// Current number of locally initiated streams num_recv_streams: usize, - - _p: PhantomData
<P>, } -impl<P> Counts<P>
-where - P: Peer, -{ +impl Counts { /// Create a new `Counts` using the provided configuration values. - pub fn new(config: &Config) -> Self { + pub fn new(peer: peer::Dyn, config: &Config) -> Self { Counts { + peer, max_send_streams: config.local_max_initiated.unwrap_or(usize::MAX), num_send_streams: 0, max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), num_recv_streams: 0, - _p: PhantomData, } } + /// Returns the current peer + pub fn peer(&self) -> peer::Dyn { + self.peer + } + /// Returns true if the receive stream concurrency can be incremented pub fn can_inc_num_recv_streams(&self) -> bool { self.max_recv_streams > self.num_recv_streams @@ -82,9 +82,9 @@ where /// /// If the stream state transitions to closed, this function will perform /// all necessary cleanup. - pub fn transition(&mut self, mut stream: store::Ptr, f: F) -> U + pub fn transition(&mut self, mut stream: store::Ptr, f: F) -> U where - F: FnOnce(&mut Self, &mut store::Ptr) -> U, + F: FnOnce(&mut Self, &mut store::Ptr) -> U, { let is_counted = stream.is_counted(); @@ -97,7 +97,7 @@ where } // TODO: move this to macro? - pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool) { + pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool) { if stream.is_closed() { stream.unlink(); @@ -114,7 +114,7 @@ where } fn dec_num_streams(&mut self, id: StreamId) { - if P::is_local_init(id) { + if self.peer.is_local_init(id) { self.num_send_streams -= 1; } else { self.num_recv_streams -= 1; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 9a5a2c1..db33f6d 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -11,7 +11,7 @@ mod streams; pub(crate) use self::prioritize::Prioritized; pub(crate) use self::store::Key; -pub(crate) use self::streams::{StreamRef, Streams}; +pub(crate) use self::streams::{StreamRef, OpaqueStreamRef, Streams}; use self::buffer::Buffer; use self::counts::Counts; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index c1d42e1..0814a1e 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -12,24 +12,18 @@ use std::{cmp, fmt}; use std::io; #[derive(Debug)] -pub(super) struct Prioritize -where - P: Peer, -{ +pub(super) struct Prioritize { /// Queue of streams waiting for socket capacity to send a frame - pending_send: store::Queue, + pending_send: store::Queue, /// Queue of streams waiting for window capacity to produce data. 
- pending_capacity: store::Queue, + pending_capacity: store::Queue, /// Streams waiting for capacity due to max concurrency - pending_open: store::Queue, + pending_open: store::Queue, /// Connection level flow control governing sent data flow: FlowControl, - - /// Holds frames that are waiting to be written to the socket - buffer: Buffer>, } pub(crate) struct Prioritized { @@ -44,11 +38,8 @@ pub(crate) struct Prioritized { // ===== impl Prioritize ===== -impl Prioritize -where - P: Peer, -{ - pub fn new(config: &Config) -> Prioritize { +impl Prioritize { + pub fn new(config: &Config) -> Prioritize { let mut flow = FlowControl::new(); flow.inc_window(config.remote_init_window_sz) @@ -64,20 +55,23 @@ where pending_capacity: store::Queue::new(), pending_open: store::Queue::new(), flow: flow, - buffer: Buffer::new(), } } /// Queue a frame to be sent to the remote - pub fn queue_frame( + pub fn queue_frame( &mut self, frame: Frame, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, task: &mut Option, ) { // Queue the frame in the buffer - stream.pending_send.push_back(&mut self.buffer, frame); + stream.pending_send.push_back(buffer, frame); + self.schedule_send(stream, task); + } + pub fn schedule_send(&mut self, stream: &mut store::Ptr, task: &mut Option) { // If the stream is waiting to be opened, nothing more to do. if !stream.is_pending_open { // Queue the stream @@ -90,15 +84,16 @@ where } } - pub fn queue_open(&mut self, stream: &mut store::Ptr) { + pub fn queue_open(&mut self, stream: &mut store::Ptr) { self.pending_open.push(stream); } /// Send a data frame - pub fn send_data( + pub fn send_data( &mut self, frame: frame::Data, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), UserError> where @@ -153,21 +148,21 @@ where if stream.send_flow.available() >= stream.buffered_send_data { // The stream currently has capacity to send the data frame, so // queue it up and notify the connection task. - self.queue_frame(frame.into(), stream, task); + self.queue_frame(frame.into(), buffer, stream, task); } else { // The stream has no capacity to send the frame now, save it but // don't notify the conneciton task. Once additional capacity // becomes available, the frame will be flushed. 
stream .pending_send - .push_back(&mut self.buffer, frame.into()); + .push_back(buffer, frame.into()); } Ok(()) } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { trace!( "reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}", stream.id, @@ -212,7 +207,7 @@ where pub fn recv_stream_window_update( &mut self, inc: WindowSize, - stream: &mut store::Ptr, + stream: &mut store::Ptr, ) -> Result<(), Reason> { trace!( "recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", @@ -235,7 +230,7 @@ where pub fn recv_connection_window_update( &mut self, inc: WindowSize, - store: &mut Store, + store: &mut Store, ) -> Result<(), Reason> { // Update the connection's window self.flow.inc_window(inc)?; @@ -246,7 +241,7 @@ where pub fn assign_connection_capacity(&mut self, inc: WindowSize, store: &mut R) where - R: Resolve, + R: Resolve, { trace!("assign_connection_capacity; inc={}", inc); @@ -267,7 +262,7 @@ where } /// Request capacity to send data - fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { + fn try_assign_capacity(&mut self, stream: &mut store::Ptr) { let total_requested = stream.requested_send_capacity; // Total requested should never go below actual assigned @@ -366,10 +361,11 @@ where } } - pub fn poll_complete( + pub fn poll_complete( &mut self, - store: &mut Store, - counts: &mut Counts
<P>
, + buffer: &mut Buffer>, + store: &mut Store, + counts: &mut Counts, dst: &mut Codec>, ) -> Poll<(), io::Error> where @@ -380,7 +376,7 @@ where try_ready!(dst.poll_ready()); // Reclaim any frame that has previously been written - self.reclaim_frame(store, dst); + self.reclaim_frame(buffer, store, dst); // The max frame length let max_frame_len = dst.max_send_frame_size(); @@ -389,7 +385,8 @@ where loop { self.schedule_pending_open(store, counts); - match self.pop_frame(store, max_frame_len, counts) { + + match self.pop_frame(buffer, store, max_frame_len, counts) { Some(frame) => { trace!("writing frame={:?}", frame); @@ -399,14 +396,14 @@ where try_ready!(dst.poll_ready()); // Because, always try to reclaim... - self.reclaim_frame(store, dst); + self.reclaim_frame(buffer, store, dst); }, None => { // Try to flush the codec. try_ready!(dst.flush()); // This might release a data frame... - if !self.reclaim_frame(store, dst) { + if !self.reclaim_frame(buffer, store, dst) { return Ok(().into()); } @@ -424,9 +421,10 @@ where /// When a data frame is written to the codec, it may not be written in its /// entirety (large chunks are split up into potentially many data frames). /// In this case, the stream needs to be reprioritized. - fn reclaim_frame( + fn reclaim_frame( &mut self, - store: &mut Store, + buffer: &mut Buffer>, + store: &mut Store, dst: &mut Codec>, ) -> bool where @@ -458,7 +456,7 @@ where frame.set_end_stream(true); } - self.push_back_frame(frame.into(), &mut stream); + self.push_back_frame(frame.into(), buffer, &mut stream); return true; } @@ -469,9 +467,13 @@ where /// Push the frame to the front of the stream's deque, scheduling the /// steream if needed. - fn push_back_frame(&mut self, frame: Frame, stream: &mut store::Ptr) { + fn push_back_frame(&mut self, + frame: Frame, + buffer: &mut Buffer>, + stream: &mut store::Ptr) + { // Push the frame to the front of the stream's deque - stream.pending_send.push_front(&mut self.buffer, frame); + stream.pending_send.push_front(buffer, frame); // If needed, schedule the sender if stream.send_flow.available() > 0 { @@ -480,20 +482,21 @@ where } } - pub fn clear_queue(&mut self, stream: &mut store::Ptr) { + pub fn clear_queue(&mut self, buffer: &mut Buffer>, stream: &mut store::Ptr) { trace!("clear_queue; stream-id={:?}", stream.id); // TODO: make this more efficient? - while let Some(frame) = stream.pending_send.pop_front(&mut self.buffer) { + while let Some(frame) = stream.pending_send.pop_front(buffer) { trace!("dropping; frame={:?}", frame); } } - fn pop_frame( + fn pop_frame( &mut self, - store: &mut Store, + buffer: &mut Buffer>, + store: &mut Store, max_len: usize, - counts: &mut Counts
<P>
, + counts: &mut Counts, ) -> Option>> where B: Buf, @@ -508,8 +511,8 @@ where let is_counted = stream.is_counted(); - let frame = match stream.pending_send.pop_front(&mut self.buffer).unwrap() { - Frame::Data(mut frame) => { + let frame = match stream.pending_send.pop_front(buffer) { + Some(Frame::Data(mut frame)) => { // Get the amount of capacity remaining for stream's // window. let stream_capacity = stream.send_flow.available(); @@ -546,7 +549,8 @@ where // frame and wait for a window update... stream .pending_send - .push_front(&mut self.buffer, frame.into()); + .push_front(buffer, frame.into()); + continue; } @@ -597,12 +601,19 @@ where } })) }, - frame => frame.map(|_| unreachable!()), + Some(frame) => frame.map(|_| unreachable!()), + None => { + assert!(stream.state.is_canceled()); + stream.state.set_reset(Reason::CANCEL); + + let frame = frame::Reset::new(stream.id, Reason::CANCEL); + Frame::Reset(frame) + } }; trace!("pop_frame; frame={:?}", frame); - if !stream.pending_send.is_empty() { + if !stream.pending_send.is_empty() || stream.state.is_canceled() { // TODO: Only requeue the sender IF it is ready to send // the next frame. i.e. don't requeue it if the next // frame is a data frame and the stream does not have @@ -619,7 +630,7 @@ where } } - fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts
<P>
) { + fn schedule_pending_open(&mut self, store: &mut Store, counts: &mut Counts) { trace!("schedule_pending_open"); // check for any pending open streams while counts.can_inc_num_send_streams() { diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index ebb2d88..5d3e24b 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,5 +1,5 @@ use super::*; -use {client, frame, proto, server}; +use {frame, proto}; use codec::{RecvError, UserError}; use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; use proto::*; @@ -7,13 +7,9 @@ use proto::*; use http::HeaderMap; use std::io; -use std::marker::PhantomData; #[derive(Debug)] -pub(super) struct Recv -where - P: Peer, -{ +pub(super) struct Recv { /// Initial window size of remote initiated streams init_window_sz: WindowSize, @@ -30,26 +26,24 @@ where last_processed_id: StreamId, /// Streams that have pending window updates - pending_window_updates: store::Queue, + pending_window_updates: store::Queue, /// New streams to be accepted - pending_accept: store::Queue, + pending_accept: store::Queue, /// Holds frames that are waiting to be read - buffer: Buffer>, + buffer: Buffer, /// Refused StreamId, this represents a frame that must be sent out. refused: Option, /// If push promises are allowed to be recevied. is_push_enabled: bool, - - _p: PhantomData, } #[derive(Debug)] -pub(super) enum Event { - Headers(T), +pub(super) enum Event { + Headers(peer::PollMessage), Data(Bytes), Trailers(HeaderMap), } @@ -60,12 +54,9 @@ struct Indices { tail: store::Key, } -impl Recv -where - P: Peer, -{ - pub fn new(config: &Config) -> Self { - let next_stream_id = if P::is_server() { 1 } else { 2 }; +impl Recv { + pub fn new(peer: peer::Dyn, config: &Config) -> Self { + let next_stream_id = if peer.is_server() { 1 } else { 2 }; let mut flow = FlowControl::new(); @@ -86,7 +77,6 @@ where buffer: Buffer::new(), refused: None, is_push_enabled: config.local_push_enabled, - _p: PhantomData, } } @@ -106,11 +96,11 @@ where pub fn open( &mut self, id: StreamId, - counts: &mut Counts
<P>
, + counts: &mut Counts, ) -> Result, RecvError> { assert!(self.refused.is_none()); - self.ensure_can_open(id)?; + counts.peer().ensure_can_open(id)?; let next_id = self.next_stream_id()?; if id < next_id { @@ -133,8 +123,8 @@ where pub fn recv_headers( &mut self, frame: frame::Headers, - stream: &mut store::Ptr, - counts: &mut Counts
<P>
, + stream: &mut store::Ptr, + counts: &mut Counts, ) -> Result<(), RecvError> { trace!("opening stream; init_window={}", self.init_window_sz); let is_initial = stream.state.recv_open(frame.is_end_stream())?; @@ -168,7 +158,7 @@ where } } - let message = P::convert_poll_message(frame)?; + let message = counts.peer().convert_poll_message(frame)?; // Push the frame onto the stream's recv buffer stream @@ -178,18 +168,53 @@ where // Only servers can receive a headers frame that initiates the stream. // This is verified in `Streams` before calling this function. - if P::is_server() { + if counts.peer().is_server() { self.pending_accept.push(stream); } Ok(()) } + /// Called by the server to get the request + /// + /// TODO: Should this fn return `Result`? + pub fn take_request(&mut self, stream: &mut store::Ptr) + -> Request<()> + { + use super::peer::PollMessage::*; + + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Server(request))) => request, + _ => panic!(), + } + } + + /// Called by the client to get the response + pub fn poll_response( + &mut self, + stream: &mut store::Ptr, + ) -> Poll, proto::Error> { + use super::peer::PollMessage::*; + + // If the buffer is not empty, then the first frame must be a HEADERS + // frame or the user violated the contract. + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Event::Headers(Client(response))) => Ok(response.into()), + Some(_) => panic!("poll_response called after response returned"), + None => { + stream.state.ensure_recv_open()?; + + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + }, + } + } + /// Transition the stream based on receiving trailers pub fn recv_trailers( &mut self, frame: frame::Headers, - stream: &mut store::Ptr, + stream: &mut store::Ptr, ) -> Result<(), RecvError> { // Transition the state stream.state.recv_close()?; @@ -216,7 +241,7 @@ where pub fn release_capacity( &mut self, capacity: WindowSize, - stream: &mut store::Ptr, + stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), UserError> { trace!("release_capacity; size={}", capacity); @@ -293,7 +318,7 @@ where } } - pub fn body_is_empty(&self, stream: &store::Ptr) -> bool { + pub fn body_is_empty(&self, stream: &store::Ptr) -> bool { if !stream.state.is_recv_closed() { return false; } @@ -308,7 +333,7 @@ where pub fn recv_data( &mut self, frame: frame::Data, - stream: &mut store::Ptr, + stream: &mut store::Ptr, ) -> Result<(), RecvError> { let sz = frame.payload().len(); @@ -379,9 +404,9 @@ where pub fn recv_push_promise( &mut self, frame: frame::PushPromise, - send: &Send, + send: &Send, stream: store::Key, - store: &mut Store, + store: &mut Store, ) -> Result<(), RecvError> { // First, make sure that the values are legit self.ensure_can_reserve(frame.promised_id())?; @@ -443,7 +468,7 @@ where pub fn recv_reset( &mut self, frame: frame::Reset, - stream: &mut Stream, + stream: &mut Stream, ) -> Result<(), RecvError> { let err = proto::Error::Proto(frame.reason()); @@ -454,7 +479,7 @@ where } /// Handle a received error - pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { + pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { // Receive an error stream.state.recv_err(err); @@ -462,22 +487,6 @@ where stream.notify_recv(); } - /// Returns true if the remote peer can initiate a stream with the given ID. - fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> { - if !P::is_server() { - // Remote is a server and cannot open streams. 
PushPromise is - // registered by reserving, so does not go through this path. - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } - - // Ensure that the ID is a valid server initiated ID - if !id.is_client_initiated() { - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } - - Ok(()) - } - fn next_stream_id(&self) -> Result { if let Ok(id) = self.next_stream_id { Ok(id) @@ -487,14 +496,9 @@ where } /// Returns true if the remote peer can reserve a stream with the given ID. - fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), RecvError> { - // TODO: Are there other rules? - if P::is_server() { - // The remote is a client and cannot reserve - trace!("recv_push_promise; error remote is client"); - return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); - } - + fn ensure_can_reserve(&self, promised_id: StreamId) + -> Result<(), RecvError> + { if !promised_id.is_server_initiated() { trace!( "recv_push_promise; error promised id is invalid {:?}", @@ -512,7 +516,7 @@ where } /// Send any pending refusals. - pub fn send_pending_refusal( + pub fn send_pending_refusal( &mut self, dst: &mut Codec>, ) -> Poll<(), io::Error> @@ -537,9 +541,9 @@ where Ok(Async::Ready(())) } - pub fn poll_complete( + pub fn poll_complete( &mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>, ) -> Poll<(), io::Error> where @@ -556,7 +560,7 @@ where } /// Send connection level window update - fn send_connection_window_update( + fn send_connection_window_update( &mut self, dst: &mut Codec>, ) -> Poll<(), io::Error> @@ -587,9 +591,9 @@ where /// Send stream level window update - pub fn send_stream_window_updates( + pub fn send_stream_window_updates( &mut self, - store: &mut Store, + store: &mut Store, dst: &mut Codec>, ) -> Poll<(), io::Error> where @@ -632,11 +636,11 @@ where } } - pub fn next_incoming(&mut self, store: &mut Store) -> Option { + pub fn next_incoming(&mut self, store: &mut Store) -> Option { self.pending_accept.pop(store).map(|ptr| ptr.key()) } - pub fn poll_data(&mut self, stream: &mut Stream) -> Poll, proto::Error> { + pub fn poll_data(&mut self, stream: &mut Stream) -> Poll, proto::Error> { // TODO: Return error when the stream is reset match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Data(payload)) => Ok(Some(payload).into()), @@ -653,7 +657,7 @@ where pub fn poll_trailers( &mut self, - stream: &mut Stream, + stream: &mut Stream, ) -> Poll, proto::Error> { match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Trailers(trailers)) => Ok(Some(trailers).into()), @@ -667,7 +671,7 @@ where } } - fn schedule_recv(&mut self, stream: &mut Stream) -> Poll, proto::Error> { + fn schedule_recv(&mut self, stream: &mut Stream) -> Poll, proto::Error> { if stream.state.ensure_recv_open()? { // Request to get notified once more frames arrive stream.recv_task = Some(task::current()); @@ -679,45 +683,9 @@ where } } -impl Recv -where - B: Buf, -{ - /// TODO: Should this fn return `Result`? - pub fn take_request(&mut self, stream: &mut store::Ptr) -> Request<()> { - match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(request)) => request, - _ => panic!(), - } - } -} - -impl Recv -where - B: Buf, -{ - pub fn poll_response( - &mut self, - stream: &mut store::Ptr, - ) -> Poll, proto::Error> { - // If the buffer is not empty, then the first frame must be a HEADERS - // frame or the user violated the contract. 
- match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(response)) => Ok(response.into()), - Some(_) => panic!("poll_response called after response returned"), - None => { - stream.state.ensure_recv_open()?; - - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - }, - } - } -} - // ===== impl Event ===== -impl Event { +impl Event { fn is_data(&self) -> bool { match *self { Event::Data(..) => true, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 9ed6b40..027d20c 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -10,10 +10,7 @@ use std::{cmp, io}; /// Manages state transitions related to outbound frames. #[derive(Debug)] -pub(super) struct Send -where - P: Peer, -{ +pub(super) struct Send { /// Stream identifier to use for next initialized stream. next_stream_id: Result, @@ -21,13 +18,10 @@ where init_window_sz: WindowSize, /// Prioritization layer - prioritize: Prioritize, + prioritize: Prioritize, } -impl Send -where - P: Peer, -{ +impl Send { /// Create a new `Send` pub fn new(config: &Config) -> Self { Send { @@ -43,16 +37,17 @@ where } pub fn open(&mut self) -> Result { - let stream_id = self.try_open()?; + let stream_id = self.ensure_next_stream_id()?; self.next_stream_id = stream_id.next_id(); Ok(stream_id) } - pub fn send_headers( + pub fn send_headers( &mut self, frame: frame::Headers, - stream: &mut store::Ptr, - counts: &mut Counts
<P>
, + buffer: &mut Buffer>, + stream: &mut store::Ptr, + counts: &mut Counts, task: &mut Option, ) -> Result<(), UserError> { trace!( @@ -66,7 +61,7 @@ where // Update the state stream.state.send_open(end_stream)?; - if P::is_local_init(frame.stream_id()) { + if counts.peer().is_local_init(frame.stream_id()) { if counts.can_inc_num_send_streams() { counts.inc_num_send_streams(); } else { @@ -75,42 +70,42 @@ where } // Queue the frame for sending - self.prioritize.queue_frame(frame.into(), stream, task); + self.prioritize.queue_frame(frame.into(), buffer, stream, task); Ok(()) } - /// Send an RST_STREAM frame + /// Send an explicit RST_STREAM frame /// /// # Arguments /// + `reason`: the error code for the RST_STREAM frame /// + `clear_queue`: if true, all pending outbound frames will be cleared, /// if false, the RST_STREAM frame will be appended to the end of the /// send queue. - pub fn send_reset( + pub fn send_reset( &mut self, reason: Reason, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, task: &mut Option, - clear_queue: bool, ) { let is_reset = stream.state.is_reset(); let is_closed = stream.state.is_closed(); let is_empty = stream.pending_send.is_empty(); + trace!( "send_reset(..., reason={:?}, stream={:?}, ..., \ - clear_queue={:?});\n\ is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \ state={:?} \ ", stream.id, reason, - clear_queue, is_reset, is_closed, is_empty, stream.state ); + if is_reset { // Don't double reset trace!( @@ -122,7 +117,7 @@ where // If closed AND the send queue is flushed, then the stream cannot be // reset explicitly, either. Implicit resets can still be queued. - if is_closed && (is_empty || !clear_queue) { + if is_closed && is_empty { trace!( " -> not sending explicit RST_STREAM ({:?} was closed \ and send queue was flushed)", @@ -134,41 +129,43 @@ where // Transition the state stream.state.set_reset(reason); - // TODO: this could be a call to `recv_err`, but that will always - // clear the send queue. could we pass whether or not to clear - // the send queue to that method? - if clear_queue { - // Clear all pending outbound frames - self.prioritize.clear_queue(stream); - } - - // Reclaim all capacity assigned to the stream and re-assign it to the - // connection - let available = stream.send_flow.available().as_size(); - stream.send_flow.claim_capacity(available); + self.recv_err(buffer, stream); let frame = frame::Reset::new(stream.id, reason); trace!("send_reset -- queueing; frame={:?}", frame); - self.prioritize.queue_frame(frame.into(), stream, task); + self.prioritize.queue_frame(frame.into(), buffer, stream, task); } - pub fn send_data( + pub fn schedule_cancel(&mut self, stream: &mut store::Ptr, task: &mut Option) { + if stream.state.is_closed() { + // Stream is already closed, nothing more to do + return; + } + + stream.state.set_canceled(); + + self.reclaim_capacity(stream); + self.prioritize.schedule_send(stream, task); + } + + pub fn send_data( &mut self, frame: frame::Data, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), UserError> - where - B: Buf, + where B: Buf, { - self.prioritize.send_data(frame, stream, task) + self.prioritize.send_data(frame, buffer, stream, task) } - pub fn send_trailers( + pub fn send_trailers( &mut self, frame: frame::Headers, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), UserError> { // TODO: Should this logic be moved into state.rs? 
@@ -179,7 +176,7 @@ where stream.state.send_close(); trace!("send_trailers -- queuing; frame={:?}", frame); - self.prioritize.queue_frame(frame.into(), stream, task); + self.prioritize.queue_frame(frame.into(), buffer, stream, task); // Release any excess capacity self.prioritize.reserve_capacity(0, stream); @@ -187,27 +184,27 @@ where Ok(()) } - pub fn poll_complete( + pub fn poll_complete( &mut self, - store: &mut Store, - counts: &mut Counts
<P>
, + buffer: &mut Buffer>, + store: &mut Store, + counts: &mut Counts, dst: &mut Codec>, ) -> Poll<(), io::Error> - where - T: AsyncWrite, - B: Buf, + where T: AsyncWrite, + B: Buf, { - self.prioritize.poll_complete(store, counts, dst) + self.prioritize.poll_complete(buffer, store, counts, dst) } /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { + pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) { self.prioritize.reserve_capacity(capacity, stream) } pub fn poll_capacity( &mut self, - stream: &mut store::Ptr, + stream: &mut store::Ptr, ) -> Poll, UserError> { if !stream.state.is_send_streaming() { return Ok(Async::Ready(None)); @@ -224,7 +221,7 @@ where } /// Current available stream send capacity - pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { + pub fn capacity(&self, stream: &mut store::Ptr) -> WindowSize { let available = stream.send_flow.available().as_size(); let buffered = stream.buffered_send_data; @@ -238,21 +235,25 @@ where pub fn recv_connection_window_update( &mut self, frame: frame::WindowUpdate, - store: &mut Store, + store: &mut Store, ) -> Result<(), Reason> { self.prioritize .recv_connection_window_update(frame.size_increment(), store) } - pub fn recv_stream_window_update( + pub fn recv_stream_window_update( &mut self, sz: WindowSize, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, task: &mut Option, ) -> Result<(), Reason> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { debug!("recv_stream_window_update !!; err={:?}", e); - self.send_reset(Reason::FLOW_CONTROL_ERROR.into(), stream, task, true); + + self.send_reset( + Reason::FLOW_CONTROL_ERROR.into(), + buffer, stream, task); return Err(e); } @@ -260,10 +261,17 @@ where Ok(()) } - pub fn recv_err(&mut self, stream: &mut store::Ptr) { + pub fn recv_err( + &mut self, + buffer: &mut Buffer>, + stream: &mut store::Ptr + ) { // Clear all pending outbound frames - self.prioritize.clear_queue(stream); + self.prioritize.clear_queue(buffer, stream); + self.reclaim_capacity(stream); + } + fn reclaim_capacity(&mut self, stream: &mut store::Ptr) { // Reclaim all capacity assigned to the stream and re-assign it to the // connection let available = stream.send_flow.available().as_size(); @@ -273,10 +281,11 @@ where .assign_connection_capacity(available, stream); } - pub fn apply_remote_settings( + pub fn apply_remote_settings( &mut self, settings: &frame::Settings, - store: &mut Store, + buffer: &mut Buffer>, + store: &mut Store, task: &mut Option, ) -> Result<(), RecvError> { // Applies an update to the remote endpoint's initial window size. @@ -336,7 +345,7 @@ where let inc = val - old_val; store.for_each(|mut stream| { - self.recv_stream_window_update(inc, &mut stream, task) + self.recv_stream_window_update(inc, buffer, &mut stream, task) .map_err(RecvError::Connection) })?; } @@ -359,14 +368,4 @@ where pub fn ensure_next_stream_id(&self) -> Result { self.next_stream_id.map_err(|_| OverflowedStreamId) } - - /// Returns a new StreamId if the local actor can initiate a new stream. - fn try_open(&self) -> Result { - if P::is_server() { - // Servers cannot open streams. PushPromise must first be reserved. 
- return Err(UnexpectedFrameType); - } - - self.ensure_next_stream_id() - } } diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index adfd7a6..d72dc84 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -74,6 +74,11 @@ enum Peer { enum Cause { Proto(Reason), Io, + + /// The user droped all handles to the stream without explicitly canceling. + /// This indicates to the connection that a reset frame must be sent out + /// once the send queue has been flushed. + Canceled, } impl State { @@ -238,10 +243,24 @@ impl State { /// Set the stream state to reset pub fn set_reset(&mut self, reason: Reason) { - debug_assert!(!self.is_reset()); self.inner = Closed(Some(Cause::Proto(reason))); } + /// Set the stream state to canceled + pub fn set_canceled(&mut self) { + debug_assert!(!self.is_closed()); + self.inner = Closed(Some(Cause::Canceled)); + } + + pub fn is_canceled(&self) -> bool { + use self::Cause::Canceled; + + match self.inner { + Closed(Some(Canceled)) => true, + _ => false, + } + } + /// Returns true if the stream is already reset. pub fn is_reset(&self) -> bool { match self.inner { @@ -318,6 +337,7 @@ impl State { // TODO: Is this correct? match self.inner { Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)), + Closed(Some(Cause::Canceled)) => Err(proto::Error::Proto(Reason::CANCEL)), Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), Closed(None) | HalfClosedRemote(..) => Ok(false), _ => Ok(true), diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 1a87f0f..0cad36a 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -9,22 +9,16 @@ use std::ops; /// Storage for streams #[derive(Debug)] -pub(super) struct Store -where - P: Peer, -{ - slab: slab::Slab<(StoreId, Stream)>, +pub(super) struct Store { + slab: slab::Slab<(StoreId, Stream)>, ids: OrderMap, counter: StoreId, } /// "Pointer" to an entry in the store -pub(super) struct Ptr<'a, B: 'a, P> -where - P: Peer + 'a, -{ +pub(super) struct Ptr<'a> { key: Key, - store: &'a mut Store, + store: &'a mut Store, } /// References an entry in the store. 
@@ -37,24 +31,21 @@ pub(crate) struct Key { type StoreId = usize; #[derive(Debug)] -pub(super) struct Queue -where - P: Peer, -{ +pub(super) struct Queue { indices: Option, - _p: PhantomData<(B, N, P)>, + _p: PhantomData, } pub(super) trait Next { - fn next(stream: &Stream) -> Option; + fn next(stream: &Stream) -> Option; - fn set_next(stream: &mut Stream, key: Option); + fn set_next(stream: &mut Stream, key: Option); - fn take_next(stream: &mut Stream) -> Option; + fn take_next(stream: &mut Stream) -> Option; - fn is_queued(stream: &Stream) -> bool; + fn is_queued(stream: &Stream) -> bool; - fn set_queued(stream: &mut Stream, val: bool); + fn set_queued(stream: &mut Stream, val: bool); } /// A linked list @@ -64,37 +55,28 @@ struct Indices { pub tail: Key, } -pub(super) enum Entry<'a, B: 'a, P: Peer + 'a> { +pub(super) enum Entry<'a> { Occupied(OccupiedEntry<'a>), - Vacant(VacantEntry<'a, B, P>), + Vacant(VacantEntry<'a>), } pub(super) struct OccupiedEntry<'a> { ids: ordermap::OccupiedEntry<'a, StreamId, (usize, StoreId)>, } -pub(super) struct VacantEntry<'a, B: 'a, P> -where - P: Peer + 'a, -{ +pub(super) struct VacantEntry<'a> { ids: ordermap::VacantEntry<'a, StreamId, (usize, StoreId)>, - slab: &'a mut slab::Slab<(StoreId, Stream)>, + slab: &'a mut slab::Slab<(StoreId, Stream)>, counter: &'a mut usize, } -pub(super) trait Resolve -where - P: Peer, -{ - fn resolve(&mut self, key: Key) -> Ptr; +pub(super) trait Resolve { + fn resolve(&mut self, key: Key) -> Ptr; } // ===== impl Store ===== -impl Store -where - P: Peer, -{ +impl Store { pub fn new() -> Self { Store { slab: slab::Slab::new(), @@ -103,7 +85,7 @@ where } } - pub fn find_mut(&mut self, id: &StreamId) -> Option> { + pub fn find_mut(&mut self, id: &StreamId) -> Option { let key = match self.ids.get(id) { Some(key) => *key, None => return None, @@ -118,7 +100,7 @@ where }) } - pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { + pub fn insert(&mut self, id: StreamId, val: Stream) -> Ptr { let store_id = self.counter; self.counter = self.counter.wrapping_add(1); let key = self.slab.insert((store_id, val)); @@ -133,7 +115,7 @@ where } } - pub fn find_entry(&mut self, id: StreamId) -> Entry { + pub fn find_entry(&mut self, id: StreamId) -> Entry { use self::ordermap::Entry::*; match self.ids.entry(id) { @@ -150,7 +132,7 @@ where pub fn for_each(&mut self, mut f: F) -> Result<(), E> where - F: FnMut(Ptr) -> Result<(), E>, + F: FnMut(Ptr) -> Result<(), E>, { let mut len = self.ids.len(); let mut i = 0; @@ -182,11 +164,8 @@ where } } -impl Resolve for Store -where - P: Peer, -{ - fn resolve(&mut self, key: Key) -> Ptr { +impl Resolve for Store { + fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, store: self, @@ -194,11 +173,8 @@ where } } -impl ops::Index for Store -where - P: Peer, -{ - type Output = Stream; +impl ops::Index for Store { + type Output = Stream; fn index(&self, key: Key) -> &Self::Output { let slot = self.slab.index(key.index); @@ -207,10 +183,7 @@ where } } -impl ops::IndexMut for Store -where - P: Peer, -{ +impl ops::IndexMut for Store { fn index_mut(&mut self, key: Key) -> &mut Self::Output { let slot = self.slab.index_mut(key.index); assert_eq!(slot.0, key.store_id); @@ -218,10 +191,7 @@ where } } -impl Store -where - P: Peer, -{ +impl Store { pub fn num_active_streams(&self) -> usize { self.ids.len() } @@ -234,10 +204,9 @@ where // ===== impl Queue ===== -impl Queue +impl Queue where N: Next, - P: Peer, { pub fn new() -> Self { Queue { @@ -260,7 +229,7 @@ where /// Queue the stream. 
/// /// If the stream is already contained by the list, return `false`. - pub fn push(&mut self, stream: &mut store::Ptr) -> bool { + pub fn push(&mut self, stream: &mut store::Ptr) -> bool { trace!("Queue::push"); if N::is_queued(stream) { @@ -297,9 +266,9 @@ where true } - pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> + pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option> where - R: Resolve, + R: Resolve, { if let Some(mut idxs) = self.indices { let mut stream = store.resolve(idxs.head); @@ -324,10 +293,7 @@ where // ===== impl Ptr ===== -impl<'a, B: 'a, P> Ptr<'a, B, P> -where - P: Peer, -{ +impl<'a> Ptr<'a> { /// Returns the Key associated with the stream pub fn key(&self) -> Key { self.key @@ -352,11 +318,8 @@ where } } -impl<'a, B: 'a, P> Resolve for Ptr<'a, B, P> -where - P: Peer, -{ - fn resolve(&mut self, key: Key) -> Ptr { +impl<'a> Resolve for Ptr<'a> { + fn resolve(&mut self, key: Key) -> Ptr { Ptr { key: key, store: &mut *self.store, @@ -364,22 +327,16 @@ where } } -impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P> -where - P: Peer, -{ - type Target = Stream; +impl<'a> ops::Deref for Ptr<'a> { + type Target = Stream; - fn deref(&self) -> &Stream { + fn deref(&self) -> &Stream { &self.store.slab[self.key.index].1 } } -impl<'a, B: 'a, P> ops::DerefMut for Ptr<'a, B, P> -where - P: Peer, -{ - fn deref_mut(&mut self) -> &mut Stream { +impl<'a> ops::DerefMut for Ptr<'a> { + fn deref_mut(&mut self) -> &mut Stream { &mut self.store.slab[self.key.index].1 } } @@ -398,11 +355,8 @@ impl<'a> OccupiedEntry<'a> { // ===== impl VacantEntry ===== -impl<'a, B, P> VacantEntry<'a, B, P> -where - P: Peer, -{ - pub fn insert(self, value: Stream) -> Key { +impl<'a> VacantEntry<'a> { + pub fn insert(self, value: Stream) -> Key { // Insert the value in the slab let store_id = *self.counter; *self.counter = store_id.wrapping_add(1); diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 9e4b304..4a57f21 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -14,10 +14,7 @@ use std::usize; /// (such as an accept queue), this is **not** tracked by a reference count. /// Thus, `ref_count` can be zero and the stream still has to be kept around. #[derive(Debug)] -pub(super) struct Stream -where - P: Peer, -{ +pub(super) struct Stream { /// The h2 stream identifier pub id: StreamId, @@ -48,7 +45,7 @@ where pub send_task: Option, /// Frames pending for this stream being sent to the socket - pub pending_send: buffer::Deque>, + pub pending_send: buffer::Deque, /// Next node in the linked list of streams waiting for additional /// connection level capacity. 
@@ -88,13 +85,13 @@ where pub is_pending_window_update: bool, /// Frames pending for this stream to read - pub pending_recv: buffer::Deque>, + pub pending_recv: buffer::Deque, /// Task tracking receiving frames pub recv_task: Option, /// The stream's pending push promises - pub pending_push_promises: store::Queue, + pub pending_push_promises: store::Queue, /// Validate content-length headers pub content_length: ContentLength, @@ -123,15 +120,12 @@ pub(super) struct NextWindowUpdate; #[derive(Debug)] pub(super) struct NextOpen; -impl Stream -where - P: Peer, -{ +impl Stream { pub fn new( id: StreamId, init_send_window: WindowSize, init_recv_window: WindowSize, - ) -> Stream { + ) -> Stream { let mut send_flow = FlowControl::new(); let mut recv_flow = FlowControl::new(); @@ -224,6 +218,15 @@ where !self.is_pending_accept && !self.is_pending_window_update } + /// Returns true when the consumer of the stream has dropped all handles + /// (indicating no further interest in the stream) and the stream state is + /// not actually closed. + /// + /// In this case, a reset should be sent. + pub fn is_canceled_interest(&self) -> bool { + self.ref_count == 0 && !self.state.is_recv_closed() + } + pub fn assign_capacity(&mut self, capacity: WindowSize) { debug_assert!(capacity > 0); self.send_capacity_inc = true; @@ -279,111 +282,111 @@ where } impl store::Next for NextAccept { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_pending_accept } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_accept = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_pending_accept.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_accept } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_accept = val; } } impl store::Next for NextSend { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_pending_send } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_send = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_pending_send.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_send } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_send = val; } } impl store::Next for NextSendCapacity { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_pending_send_capacity } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_pending_send_capacity = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_pending_send_capacity.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_send_capacity } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_send_capacity = val; } } impl store::Next for NextWindowUpdate { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_window_update } - fn set_next(stream: &mut Stream, key: Option) { + fn 
set_next(stream: &mut Stream, key: Option) { stream.next_window_update = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_window_update.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_window_update } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_window_update = val; } } impl store::Next for NextOpen { - fn next(stream: &Stream) -> Option { + fn next(stream: &Stream) -> Option { stream.next_open } - fn set_next(stream: &mut Stream, key: Option) { + fn set_next(stream: &mut Stream, key: Option) { stream.next_open = key; } - fn take_next(stream: &mut Stream) -> Option { + fn take_next(stream: &mut Stream) -> Option { stream.next_open.take() } - fn is_queued(stream: &Stream) -> bool { + fn is_queued(stream: &Stream) -> bool { stream.is_pending_open } - fn set_queued(stream: &mut Stream, val: bool) { + fn set_queued(stream: &mut Stream, val: bool) { stream.is_pending_open = val; } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 31e1697..cc926fc 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -15,15 +15,33 @@ pub(crate) struct Streams where P: Peer, { - inner: Arc>>, + /// Holds most of the connection and stream related state for processing + /// HTTP/2.0 frames associated with streams. + inner: Arc>, + + /// This is the queue of frames to be written to the wire. This is split out + /// to avoid requiring a `B` generic on all public API types even if `B` is + /// not technically required. + /// + /// Currently, splitting this out requires a second `Arc` + `Mutex`. + /// However, it should be possible to avoid this duplication with a little + /// bit of unsafe code. This optimization has been postponed until it has + /// been shown to be necessary. + send_buffer: Arc>, + + _p: ::std::marker::PhantomData
<P>
, } /// Reference to the stream state -pub(crate) struct StreamRef -where - P: Peer, -{ - inner: Arc>>, +#[derive(Debug)] +pub(crate) struct StreamRef { + opaque: OpaqueStreamRef, + send_buffer: Arc>, +} + +/// Reference to the stream state that hides the send data chunk generic +pub(crate) struct OpaqueStreamRef { + inner: Arc>, key: store::Key, } @@ -32,26 +50,24 @@ where /// /// TODO: better name #[derive(Debug)] -struct Inner -where - P: Peer, -{ +struct Inner { /// Tracks send & recv stream concurrency. - counts: Counts
<P>
, - actions: Actions, - store: Store, + counts: Counts, + + /// Connection level state and performs actions on streams + actions: Actions, + + /// Stores stream state + store: Store, } #[derive(Debug)] -struct Actions -where - P: Peer, -{ +struct Actions { /// Manages state transitions initiated by receiving frames - recv: Recv, + recv: Recv, /// Manages state transitions initiated by sending frames - send: Send, + send: Send, /// Task that calls `poll_complete`. task: Option, @@ -60,23 +76,35 @@ where conn_error: Option, } +/// Contains the buffer of frames to be written to the wire. +#[derive(Debug)] +struct SendBuffer { + inner: Mutex>>, +} + +// ===== impl Streams ===== + impl Streams where B: Buf, P: Peer, { pub fn new(config: Config) -> Self { + let peer = P::dyn(); + Streams { inner: Arc::new(Mutex::new(Inner { - counts: Counts::new(&config), + counts: Counts::new(peer, &config), actions: Actions { - recv: Recv::new(&config), + recv: Recv::new(peer, &config), send: Send::new(&config), task: None, conn_error: None, }, store: Store::new(), })), + send_buffer: Arc::new(SendBuffer::new()), + _p: ::std::marker::PhantomData, } } @@ -113,6 +141,8 @@ where let stream = me.store.resolve(key); let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |counts, stream| { trace!( @@ -132,7 +162,7 @@ where actions.recv.recv_trailers(frame, stream) }; - actions.reset_on_recv_stream_err(stream, res) + actions.reset_on_recv_stream_err(send_buffer, stream, res) }) } @@ -148,10 +178,12 @@ where }; let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |_, stream| { let res = actions.recv.recv_data(frame, stream); - actions.reset_on_recv_stream_err(stream, res) + actions.reset_on_recv_stream_err(send_buffer, stream, res) }) } @@ -170,7 +202,7 @@ where None => { // TODO: Are there other error cases? me.actions - .ensure_not_idle(id) + .ensure_not_idle(me.counts.peer(), id) .map_err(RecvError::Connection)?; return Ok(()); @@ -193,6 +225,8 @@ where let actions = &mut me.actions; let counts = &mut me.counts; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; let last_processed_id = actions.recv.last_processed_id(); @@ -200,7 +234,7 @@ where .for_each(|stream| { counts.transition(stream, |_, stream| { actions.recv.recv_err(err, &mut *stream); - actions.send.recv_err(stream); + actions.send.recv_err(send_buffer, stream); Ok::<_, ()>(()) }) }) @@ -217,6 +251,8 @@ where let actions = &mut me.actions; let counts = &mut me.counts; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; let last_stream_id = frame.last_stream_id(); let err = frame.reason().into(); @@ -225,7 +261,7 @@ where .for_each(|stream| if stream.id > last_stream_id { counts.transition(stream, |_, stream| { actions.recv.recv_err(&err, &mut *stream); - actions.send.recv_err(stream); + actions.send.recv_err(send_buffer, stream); Ok::<_, ()>(()) }) } else { @@ -245,6 +281,9 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + if id.is_zero() { me.actions .send @@ -259,6 +298,7 @@ where // the error is informational. 
let _ = me.actions.send.recv_stream_window_update( frame.size_increment(), + send_buffer, &mut stream, &mut me.actions.task, ); @@ -284,12 +324,19 @@ where None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), }; - me.actions - .recv - .recv_push_promise(frame, &me.actions.send, stream, &mut me.store) + if me.counts.peer().is_server() { + // The remote is a client and cannot reserve + trace!("recv_push_promise; error remote is client"); + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + } + + me.actions.recv.recv_push_promise(frame, + &me.actions.send, + stream, + &mut me.store) } - pub fn next_incoming(&mut self) -> Option> { + pub fn next_incoming(&mut self) -> Option> { let key = { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -308,8 +355,11 @@ where key.map(|key| { StreamRef { - inner: self.inner.clone(), - key, + opaque: OpaqueStreamRef { + inner: self.inner.clone(), + key, + }, + send_buffer: self.send_buffer.clone(), } }) } @@ -333,6 +383,9 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + // Send WINDOW_UPDATE frames first // // TODO: It would probably be better to interleave updates w/ data @@ -341,6 +394,7 @@ where // Send any other pending frames try_ready!(me.actions.send.poll_complete( + send_buffer, &mut me.store, &mut me.counts, dst @@ -356,11 +410,13 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + me.counts.apply_remote_settings(frame); - me.actions - .send - .apply_remote_settings(frame, &mut me.store, &mut me.actions.task) + me.actions.send.apply_remote_settings( + frame, send_buffer, &mut me.store, &mut me.actions.task) } pub fn send_request( @@ -368,7 +424,7 @@ where request: Request<()>, end_of_stream: bool, pending: Option<&store::Key>, - ) -> Result, SendError> { + ) -> Result, SendError> { use super::stream::ContentLength; use http::Method; @@ -381,6 +437,9 @@ where let mut me = self.inner.lock().unwrap(); let me = &mut *me; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; + me.actions.ensure_no_conn_error()?; me.actions.send.ensure_next_stream_id()?; @@ -396,6 +455,11 @@ where } } + if me.counts.peer().is_server() { + // Servers cannot open streams. PushPromise must first be reserved. 
+ return Err(UserError::UnexpectedFrameType.into()); + } + let stream_id = me.actions.send.open()?; let mut stream = Stream::new( @@ -415,6 +479,7 @@ where me.actions.send.send_headers( headers, + send_buffer, &mut stream, &mut me.counts, &mut me.actions.task, @@ -431,8 +496,11 @@ where }; Ok(StreamRef { - inner: self.inner.clone(), - key: key, + opaque: OpaqueStreamRef { + inner: self.inner.clone(), + key: key, + }, + send_buffer: self.send_buffer.clone(), }) } @@ -454,11 +522,12 @@ where let stream = me.store.resolve(key); let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |_, stream| { - actions - .send - .send_reset(reason, stream, &mut actions.task, true) + actions.send.send_reset( + reason, send_buffer, stream, &mut actions.task) }) } } @@ -510,25 +579,26 @@ where fn clone(&self) -> Self { Streams { inner: self.inner.clone(), + send_buffer: self.send_buffer.clone(), + _p: ::std::marker::PhantomData, } } } // ===== impl StreamRef ===== -impl StreamRef -where - P: Peer, -{ +impl StreamRef { pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> where B: Buf, { - let mut me = self.inner.lock().unwrap(); + let mut me = self.opaque.inner.lock().unwrap(); let me = &mut *me; - let stream = me.store.resolve(self.key); + let stream = me.store.resolve(self.opaque.key); let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |_, stream| { // Create the data frame @@ -536,37 +606,41 @@ where frame.set_end_stream(end_stream); // Send the data frame - actions.send.send_data(frame, stream, &mut actions.task) + actions.send.send_data(frame, send_buffer, stream, &mut actions.task) }) } pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> { - let mut me = self.inner.lock().unwrap(); + let mut me = self.opaque.inner.lock().unwrap(); let me = &mut *me; - let stream = me.store.resolve(self.key); + let stream = me.store.resolve(self.opaque.key); let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |_, stream| { // Create the trailers frame let frame = frame::Headers::trailers(stream.id, trailers); // Send the trailers frame - actions.send.send_trailers(frame, stream, &mut actions.task) + actions.send.send_trailers( + frame, send_buffer, stream, &mut actions.task) }) } pub fn send_reset(&mut self, reason: Reason) { - let mut me = self.inner.lock().unwrap(); + let mut me = self.opaque.inner.lock().unwrap(); let me = &mut *me; - let stream = me.store.resolve(self.key); + let stream = me.store.resolve(self.opaque.key); let actions = &mut me.actions; + let mut send_buffer = self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |_, stream| { - actions - .send - .send_reset(reason, stream, &mut actions.task, true) + actions.send.send_reset( + reason, send_buffer, stream, &mut actions.task) }) } @@ -575,25 +649,107 @@ where response: Response<()>, end_of_stream: bool, ) -> Result<(), UserError> { - let mut me = self.inner.lock().unwrap(); + let mut me = self.opaque.inner.lock().unwrap(); let me = &mut *me; - let stream = me.store.resolve(self.key); + let stream = me.store.resolve(self.opaque.key); let actions = &mut me.actions; + let mut send_buffer = 
self.send_buffer.inner.lock().unwrap(); + let send_buffer = &mut *send_buffer; me.counts.transition(stream, |counts, stream| { let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream); - actions - .send - .send_headers(frame, stream, counts, &mut actions.task) + actions.send.send_headers( + frame, send_buffer, stream, counts, &mut actions.task) }) } - pub fn body_is_empty(&self) -> bool - where - B: Buf, + /// Called by the server after the stream is accepted. Given that clients + /// initialize streams by sending HEADERS, the request will always be + /// available. + /// + /// # Panics + /// + /// This function panics if the request isn't present. + pub fn take_request(&self) -> Request<()> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + me.actions.recv.take_request(&mut stream) + } + + /// Called by a client to see if the current stream is pending open + pub fn is_pending_open(&self) -> bool { + let mut me = self.opaque.inner.lock().unwrap(); + me.store.resolve(self.opaque.key).is_pending_open + } + + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: WindowSize) { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions.send.reserve_capacity(capacity, &mut stream) + } + + /// Returns the stream's current send capacity. + pub fn capacity(&self) -> WindowSize { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions.send.capacity(&mut stream) + } + + /// Request to be notified when the stream's capacity increases + pub fn poll_capacity(&mut self) -> Poll, UserError> { + let mut me = self.opaque.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.opaque.key); + + me.actions.send.poll_capacity(&mut stream) + } + + pub(crate) fn key(&self) -> store::Key { + self.opaque.key + } + + pub fn clone_to_opaque(&self) -> OpaqueStreamRef + where B: 'static, { + self.opaque.clone() + } +} + +impl Clone for StreamRef { + fn clone(&self) -> Self { + StreamRef { + opaque: self.opaque.clone(), + send_buffer: self.send_buffer.clone(), + } + } +} + +// ===== impl OpaqueStreamRef ===== + +impl OpaqueStreamRef { + /// Called by a client to check for a received response. + pub fn poll_response(&mut self) -> Poll, proto::Error> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_response(&mut stream) + } + + pub fn body_is_empty(&self) -> bool { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -602,10 +758,7 @@ where me.actions.recv.body_is_empty(&stream) } - pub fn poll_data(&mut self) -> Poll, proto::Error> - where - B: Buf, - { + pub fn poll_data(&mut self) -> Poll, proto::Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -614,10 +767,7 @@ where me.actions.recv.poll_data(&mut stream) } - pub fn poll_trailers(&mut self) -> Poll, proto::Error> - where - B: Buf, - { + pub fn poll_trailers(&mut self) -> Poll, proto::Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -628,10 +778,7 @@ where /// Releases recv capacity back to the peer. This may result in sending /// WINDOW_UPDATE frames on both the stream and connection. 
- pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> - where - B: Buf, - { + pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -641,171 +788,88 @@ where .recv .release_capacity(capacity, &mut stream, &mut me.actions.task) } - - /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: WindowSize) { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let mut stream = me.store.resolve(self.key); - - me.actions.send.reserve_capacity(capacity, &mut stream) - } - - /// Returns the stream's current send capacity. - pub fn capacity(&self) -> WindowSize { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let mut stream = me.store.resolve(self.key); - - me.actions.send.capacity(&mut stream) - } - - /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, UserError> { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let mut stream = me.store.resolve(self.key); - - me.actions.send.poll_capacity(&mut stream) - } - - pub(crate) fn key(&self) -> store::Key { - self.key - } } -impl StreamRef -where - B: Buf, -{ - /// Called by the server after the stream is accepted. Given that clients - /// initialize streams by sending HEADERS, the request will always be - /// available. - /// - /// # Panics - /// - /// This function panics if the request isn't present. - pub fn take_request(&self) -> Request<()> { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let mut stream = me.store.resolve(self.key); - me.actions.recv.take_request(&mut stream) - } -} - -impl StreamRef -where - B: Buf, -{ - pub fn poll_response(&mut self) -> Poll, proto::Error> { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let mut stream = me.store.resolve(self.key); - - me.actions.recv.poll_response(&mut stream) - } - - - pub fn is_pending_open(&self) -> bool { - let mut me = self.inner.lock().unwrap(); - me.store.resolve(self.key).is_pending_open - } -} - -impl Clone for StreamRef -where - P: Peer, -{ - fn clone(&self) -> Self { - // Increment the ref count - self.inner.lock().unwrap().store.resolve(self.key).ref_inc(); - - StreamRef { - inner: self.inner.clone(), - key: self.key.clone(), - } - } -} - -impl fmt::Debug for StreamRef -where - P: Peer, -{ +impl fmt::Debug for OpaqueStreamRef { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { match self.inner.lock() { Ok(me) => { let stream = &me.store[self.key]; - fmt.debug_struct("StreamRef") + fmt.debug_struct("OpaqueStreamRef") .field("stream_id", &stream.id) .field("ref_count", &stream.ref_count) .finish() }, - Err(_poisoned) => fmt.debug_struct("StreamRef") + Err(_poisoned) => fmt.debug_struct("OpaqueStreamRef") .field("inner", &"") .finish(), } } } -impl Drop for StreamRef -where - P: Peer, -{ +impl Clone for OpaqueStreamRef { + fn clone(&self) -> Self { + // Increment the ref count + self.inner.lock().unwrap().store.resolve(self.key).ref_inc(); + + OpaqueStreamRef { + inner: self.inner.clone(), + key: self.key.clone(), + } + } +} + +impl Drop for OpaqueStreamRef { fn drop(&mut self) { - trace!("StreamRef::drop({:?})", self); - let mut me = match self.inner.lock() { - Ok(inner) => inner, - Err(_) => if ::std::thread::panicking() { - trace!("StreamRef::drop; mutex poisoned"); - return; - } else { - panic!("StreamRef::drop; mutex poisoned"); - }, - }; + drop_stream_ref(&self.inner, self.key); + 
} +} - let me = &mut *me; +// TODO: Move back in fn above +fn drop_stream_ref(inner: &Mutex, key: store::Key) { + let mut me = match inner.lock() { + Ok(inner) => inner, + Err(_) => if ::std::thread::panicking() { + trace!("StreamRef::drop; mutex poisoned"); + return; + } else { + panic!("StreamRef::drop; mutex poisoned"); + }, + }; - let mut stream = me.store.resolve(self.key); - // decrement the stream's ref count by 1. - stream.ref_dec(); + let me = &mut *me; - let actions = &mut me.actions; - // the reset must be sent inside a `transition` block. - // `transition_after` will release the stream if it is - // released. - let recv_closed = stream.state.is_recv_closed(); - me.counts.transition(stream, |_, stream| - // if this is the last reference to the stream, reset the stream. - if stream.ref_count == 0 && !recv_closed { - trace!( - " -> last reference to {:?} was dropped, trying to reset", - stream.id, - ); - actions.send.send_reset( - Reason::CANCEL, - stream, - &mut actions.task, - false - ); - }); + let mut stream = me.store.resolve(key); + // decrement the stream's ref count by 1. + stream.ref_dec(); + + let actions = &mut me.actions; + + me.counts.transition(stream, |_, mut stream| { + if stream.is_canceled_interest() { + actions.send.schedule_cancel( + &mut stream, + &mut actions.task); + } + }); +} + +// ===== impl SendBuffer ===== + +impl SendBuffer { + fn new() -> Self { + let inner = Mutex::new(Buffer::new()); + SendBuffer { inner } } } // ===== impl Actions ===== -impl Actions -where - B: Buf, - P: Peer, -{ - fn reset_on_recv_stream_err( +impl Actions { + fn reset_on_recv_stream_err( &mut self, - stream: &mut store::Ptr, + buffer: &mut Buffer>, + stream: &mut store::Ptr, res: Result<(), RecvError>, ) -> Result<(), RecvError> { if let Err(RecvError::Stream { @@ -813,15 +877,15 @@ where }) = res { // Reset the stream. - self.send.send_reset(reason, stream, &mut self.task, true); + self.send.send_reset(reason, buffer, stream, &mut self.task); Ok(()) } else { res } } - fn ensure_not_idle(&mut self, id: StreamId) -> Result<(), Reason> { - if P::is_local_init(id) { + fn ensure_not_idle(&mut self, peer: peer::Dyn, id: StreamId) -> Result<(), Reason> { + if peer.is_local_init(id) { self.send.ensure_not_idle(id) } else { self.recv.ensure_not_idle(id) diff --git a/src/server.rs b/src/server.rs index b554996..73ce40e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,11 @@ +use {SendStream, RecvStream, ReleaseCapacity}; use codec::{Codec, RecvError}; use frame::{self, Reason, Settings, StreamId}; -use proto::{self, Connection, WindowSize, Prioritized}; +use proto::{self, Connection, Prioritized}; use bytes::{Buf, Bytes, IntoBuf}; use futures::{self, Async, Future, Poll}; -use http::{HeaderMap, Request, Response}; +use http::{Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; use std::{convert, fmt, mem}; @@ -27,28 +28,13 @@ pub struct Builder { settings: Settings, } +/// Respond to a request +/// +/// +/// Instances of `Respond` are used to send a respond or reserve push promises. #[derive(Debug)] -pub struct Stream { - inner: proto::StreamRef, -} - -pub struct Body { - inner: ReleaseCapacity, -} - -#[derive(Debug)] -pub struct ReleaseCapacity { - inner: proto::StreamRef, -} - -#[derive(Debug)] -pub struct Send { - src: T, - dst: Option>, - // Pending data - buf: Option, - // True when this is the end of the stream - eos: bool, +pub struct Respond { + inner: proto::StreamRef, } /// Stages of an in-progress handshake. 
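A minimal usage sketch (not part of the patch itself) of the server-side API introduced above: `Respond::send_response` now returns a `SendStream` for writing the response body instead of reusing the old server `Stream` handle. The helper name `start_response`, the `bytes::Bytes` type parameter, and the `h2::server::Respond` / `h2::SendStream` paths are assumptions for illustration.

// Hypothetical helper, for illustration only; the error type is the crate's `h2::Error`.
fn start_response(
    respond: &mut h2::server::Respond<bytes::Bytes>,
    body: bytes::Bytes,
) -> Result<(), h2::Error> {
    let response = http::Response::builder()
        .status(http::StatusCode::OK)
        .body(())
        .unwrap();

    // `send_response` no longer writes data itself; on success it hands back
    // the send half of the stream.
    let mut send = respond.send_response(response, false)?;

    // Ask for send window up front; a real task would drive `poll_capacity`
    // and check `capacity()` before writing large bodies.
    send.reserve_capacity(body.len());
    send.send_data(body, true)?;

    Ok(())
}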
@@ -103,6 +89,7 @@ impl Server where T: AsyncRead + AsyncWrite + 'static, B: IntoBuf + 'static, + B::Buf: 'static, { fn handshake2(io: T, settings: Settings) -> Handshake { // Create the codec. @@ -141,8 +128,9 @@ impl futures::Stream for Server where T: AsyncRead + AsyncWrite + 'static, B: IntoBuf + 'static, + B::Buf: 'static, { - type Item = (Request>, Stream); + type Item = (Request, Respond); type Error = ::Error; fn poll(&mut self) -> Poll, ::Error> { @@ -160,16 +148,12 @@ where if let Some(inner) = self.connection.next_incoming() { trace!("received incoming"); let (head, _) = inner.take_request().into_parts(); - let body = Body { - inner: ReleaseCapacity { inner: inner.clone() }, - }; + let body = RecvStream::new(ReleaseCapacity::new(inner.clone_to_opaque())); let request = Request::from_parts(head, body); - let incoming = Stream { - inner, - }; + let respond = Respond { inner }; - return Ok(Some((request, incoming)).into()); + return Ok(Some((request, respond)).into()); } Ok(Async::NotReady) @@ -229,179 +213,27 @@ impl Builder { } } -// ===== impl Stream ===== +// ===== impl Respond ===== -impl Stream { +impl Respond { /// Send a response pub fn send_response( &mut self, response: Response<()>, end_of_stream: bool, - ) -> Result<(), ::Error> { + ) -> Result, ::Error> { self.inner .send_response(response, end_of_stream) + .map(|_| SendStream::new(self.inner.clone())) .map_err(Into::into) } - /// Request capacity to send data - pub fn reserve_capacity(&mut self, capacity: usize) { - // TODO: Check for overflow - self.inner.reserve_capacity(capacity as WindowSize) - } - - /// Returns the stream's current send capacity. - pub fn capacity(&self) -> usize { - self.inner.capacity() as usize - } - - /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, ::Error> { - let res = try_ready!(self.inner.poll_capacity()); - Ok(Async::Ready(res.map(|v| v as usize))) - } - - /// Send a single data frame - pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> { - self.inner - .send_data(data.into_buf(), end_of_stream) - .map_err(Into::into) - } - - /// Send trailers - pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> { - self.inner.send_trailers(trailers).map_err(Into::into) - } - - pub fn send_reset(mut self, reason: Reason) { + /// Reset the stream + pub fn send_reset(&mut self, reason: Reason) { self.inner.send_reset(reason) } -} -impl Stream { - /// Send the body - pub fn send(self, src: T, end_of_stream: bool) -> Send - where - T: futures::Stream, - { - Send { - src: src, - dst: Some(self), - buf: None, - eos: end_of_stream, - } - } -} - -// ===== impl Body ===== - -impl Body { - pub fn is_empty(&self) -> bool { - // If the recv side is closed and the receive queue is empty, the body is empty. - self.inner.inner.body_is_empty() - } - - pub fn release_capacity(&mut self) -> &mut ReleaseCapacity { - &mut self.inner - } - - /// Poll trailers - /// - /// This function **must** not be called until `Body::poll` returns `None`. 
- pub fn poll_trailers(&mut self) -> Poll, ::Error> { - self.inner.inner.poll_trailers().map_err(Into::into) - } -} - -impl futures::Stream for Body { - type Item = Bytes; - type Error = ::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.inner.poll_data().map_err(Into::into) - } -} - - -impl fmt::Debug for Body -where B: fmt::Debug, - B::Buf: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Body") - .field("inner", &self.inner) - .finish() - } -} - -// ===== impl ReleaseCapacity ===== - -impl ReleaseCapacity { - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { - self.inner - .release_capacity(sz as proto::WindowSize) - .map_err(Into::into) - } -} - -impl Clone for ReleaseCapacity { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - ReleaseCapacity { inner } - } -} - -// ===== impl Send ===== - -impl Future for Send -where - T: futures::Stream, -{ - type Item = Stream; - type Error = ::Error; - - fn poll(&mut self) -> Poll { - loop { - if self.buf.is_none() { - // Get a chunk to send to the H2 stream - self.buf = try_ready!(self.src.poll()); - } - - match self.buf.take() { - Some(mut buf) => { - let dst = self.dst.as_mut().unwrap(); - - // Ask for the amount of capacity needed - dst.reserve_capacity(buf.len()); - - let cap = dst.capacity(); - - if cap == 0 { - self.buf = Some(buf); - // TODO: This seems kind of lame :( - try_ready!(dst.poll_capacity()); - continue; - } - - let chunk = buf.split_to(cap); - - if !buf.is_empty() { - self.buf = Some(buf); - } - - dst.send_data(chunk, false)?; - }, - None => { - // TODO: It would be nice to not have to send an extra - // frame... - if self.eos { - self.dst.as_mut().unwrap().send_data(Bytes::new(), true)?; - } - - return Ok(Async::Ready(self.dst.take().unwrap())); - }, - } - } - } + // TODO: Support reserving push promises. } // ===== impl Flush ===== @@ -546,6 +378,10 @@ impl proto::Peer for Peer { true } + fn dyn() -> proto::DynPeer { + proto::DynPeer::Server + } + fn convert_send_message( id: StreamId, response: Self::Send, diff --git a/src/share.rs b/src/share.rs new file mode 100644 index 0000000..98bf3cb --- /dev/null +++ b/src/share.rs @@ -0,0 +1,127 @@ +use frame::Reason; +use proto::{self, WindowSize}; + +use bytes::{Bytes, IntoBuf}; +use futures::{self, Poll, Async}; +use http::{HeaderMap}; + +use std::fmt; + +#[derive(Debug)] +pub struct SendStream { + inner: proto::StreamRef, +} + +pub struct RecvStream { + inner: ReleaseCapacity, +} + +#[derive(Debug)] +pub struct ReleaseCapacity { + inner: proto::OpaqueStreamRef, +} + +// ===== impl Stream ===== + +impl SendStream { + pub(crate) fn new(inner: proto::StreamRef) -> Self { + SendStream { inner } + } + + /// Request capacity to send data + pub fn reserve_capacity(&mut self, capacity: usize) { + // TODO: Check for overflow + self.inner.reserve_capacity(capacity as WindowSize) + } + + /// Returns the stream's current send capacity. 
+ pub fn capacity(&self) -> usize { + self.inner.capacity() as usize + } + + /// Request to be notified when the stream's capacity increases + pub fn poll_capacity(&mut self) -> Poll, ::Error> { + let res = try_ready!(self.inner.poll_capacity()); + Ok(Async::Ready(res.map(|v| v as usize))) + } + + /// Send a single data frame + pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ::Error> { + self.inner + .send_data(data.into_buf(), end_of_stream) + .map_err(Into::into) + } + + /// Send trailers + pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ::Error> { + self.inner.send_trailers(trailers).map_err(Into::into) + } + + /// Reset the stream + pub fn send_reset(&mut self, reason: Reason) { + self.inner.send_reset(reason) + } +} + +// ===== impl Body ===== + +impl RecvStream { + pub(crate) fn new(inner: ReleaseCapacity) -> Self { + RecvStream { inner } + } + + // TODO: Rename to "is_end_stream" + pub fn is_empty(&self) -> bool { + // If the recv side is closed and the receive queue is empty, the body is empty. + self.inner.inner.body_is_empty() + } + + pub fn release_capacity(&mut self) -> &mut ReleaseCapacity { + &mut self.inner + } + + /// Poll trailers + /// + /// This function **must** not be called until `Body::poll` returns `None`. + pub fn poll_trailers(&mut self) -> Poll, ::Error> { + self.inner.inner.poll_trailers().map_err(Into::into) + } +} + +impl futures::Stream for RecvStream { + type Item = Bytes; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + self.inner.inner.poll_data().map_err(Into::into) + } +} + +impl fmt::Debug for RecvStream { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("RecvStream") + .field("inner", &self.inner) + .finish() + } +} + +// ===== impl ReleaseCapacity ===== + +impl ReleaseCapacity { + pub(crate) fn new(inner: proto::OpaqueStreamRef) -> Self { + ReleaseCapacity { inner } + } + + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { + self.inner + .release_capacity(sz as proto::WindowSize) + .map_err(Into::into) + } +} + +impl Clone for ReleaseCapacity { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + ReleaseCapacity { inner } + } +} diff --git a/tests/support/util.rs b/tests/support/util.rs index ddf656c..430dc91 100644 --- a/tests/support/util.rs +++ b/tests/support/util.rs @@ -1,4 +1,4 @@ -use h2::client; +use h2; use super::string::{String, TryFrom}; use bytes::Bytes; @@ -8,7 +8,7 @@ pub fn byte_str(s: &str) -> String { String::try_from(Bytes::from(s)).unwrap() } -pub fn wait_for_capacity(stream: client::Stream, target: usize) -> WaitForCapacity { +pub fn wait_for_capacity(stream: h2::SendStream, target: usize) -> WaitForCapacity { WaitForCapacity { stream: Some(stream), target: target, @@ -16,18 +16,18 @@ pub fn wait_for_capacity(stream: client::Stream, target: usize) -> WaitFo } pub struct WaitForCapacity { - stream: Option>, + stream: Option>, target: usize, } impl WaitForCapacity { - fn stream(&mut self) -> &mut client::Stream { + fn stream(&mut self) -> &mut h2::SendStream { self.stream.as_mut().unwrap() } } impl Future for WaitForCapacity { - type Item = client::Stream; + type Item = h2::SendStream; type Error = (); fn poll(&mut self) -> Poll {
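On the receive side, the new src/share.rs pairs `RecvStream` (a futures 0.1 `Stream` of `Bytes` chunks) with a `ReleaseCapacity` handle (which implements `Clone`) that hands consumed bytes back to the peer so its flow-control window reopens. A minimal sketch under those assumptions; the helper name `drain_body` is hypothetical.

// Hypothetical helper, for illustration only.
fn drain_body(mut body: h2::RecvStream) -> Box<futures::Future<Item = (), Error = h2::Error>> {
    use futures::Stream;

    // Grab a clone of the capacity handle first, since `for_each` consumes
    // the stream itself.
    let mut release = body.release_capacity().clone();

    Box::new(body.for_each(move |chunk| {
        // Returning the consumed bytes is what lets the sender keep going
        // once the stream/connection windows fill up.
        release.release_capacity(chunk.len())
    }))
}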