From 26df3a36980ae4a91bafe3fd74e890ed28376b3e Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 8 Aug 2017 22:11:11 -0700 Subject: [PATCH] Remove P generic from type --- src/client.rs | 14 ++-- src/proto/connection.rs | 14 ++-- src/proto/streams/recv.rs | 79 +++++++++++----------- src/proto/streams/send.rs | 22 +++---- src/proto/streams/streams.rs | 124 ++++++++++++++++------------------- src/server.rs | 13 ++-- 6 files changed, 129 insertions(+), 137 deletions(-) diff --git a/src/client.rs b/src/client.rs index c9ab889..ead699f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -15,9 +15,6 @@ pub struct Handshake { inner: Box, Error = ConnectionError>>, } -#[derive(Debug)] -pub(crate) struct Peer; - /// Marker type indicating a client peer pub struct Client { connection: Connection, @@ -25,19 +22,22 @@ pub struct Client { #[derive(Debug)] pub struct Stream { - inner: proto::StreamRef, + inner: proto::StreamRef, } #[derive(Debug)] pub struct Body { - inner: proto::StreamRef, + inner: proto::StreamRef, } #[derive(Debug)] pub struct Chunk { - inner: proto::Chunk, + inner: proto::Chunk, } +#[derive(Debug)] +pub(crate) struct Peer; + impl Client where T: AsyncRead + AsyncWrite + 'static, { @@ -160,7 +160,7 @@ impl Stream { pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { - self.inner.send_data(data.into_buf(), end_of_stream) + self.inner.send_data::(data.into_buf(), end_of_stream) } /// Send trailers diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 7ae9139..e8284c4 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -19,7 +19,7 @@ pub struct Connection { // TODO: Remove ping_pong: PingPong, settings: Settings, - streams: Streams, + streams: Streams, _phantom: PhantomData

, } @@ -31,7 +31,7 @@ impl Connection { pub fn new(codec: Codec) -> Connection { // TODO: Actually configure - let streams = Streams::new(streams::Config { + let streams = Streams::new::

(streams::Config { max_remote_initiated: None, init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, max_local_initiated: None, @@ -123,7 +123,7 @@ impl Connection Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); - if let Some(frame) = try!(self.streams.recv_headers(frame)) { + if let Some(frame) = try!(self.streams.recv_headers::

(frame)) { unimplemented!(); } @@ -138,15 +138,15 @@ impl Connection } Some(Data(frame)) => { trace!("recv DATA; frame={:?}", frame); - try!(self.streams.recv_data(frame)); + try!(self.streams.recv_data::

(frame)); } Some(Reset(frame)) => { trace!("recv RST_STREAM; frame={:?}", frame); - try!(self.streams.recv_reset(frame)); + try!(self.streams.recv_reset::

(frame)); } Some(PushPromise(frame)) => { trace!("recv PUSH_PROMISE; frame={:?}", frame); - self.streams.recv_push_promise(frame)?; + self.streams.recv_push_promise::

(frame)?; } Some(Settings(frame)) => { trace!("recv SETTINGS; frame={:?}", frame); @@ -263,7 +263,7 @@ impl Connection { /// Initialize a new HTTP/2.0 stream and send the message. pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ConnectionError> { self.streams.send_request(request, end_of_stream) } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index cfb813f..3980a6c 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -8,7 +8,7 @@ use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] -pub(super) struct Recv { +pub(super) struct Recv { /// Maximum number of remote initiated streams max_streams: Option, @@ -31,7 +31,7 @@ pub(super) struct Recv { /// Refused StreamId, this represents a frame that must be sent out. refused: Option, - _p: PhantomData<(P, B)>, + _p: PhantomData<(B)>, } #[derive(Debug)] @@ -46,10 +46,7 @@ struct Indices { tail: store::Key, } -impl Recv - where P: Peer, - B: Buf, -{ +impl Recv where B: Buf { pub fn new(config: &Config) -> Self { Recv { max_streams: config.max_remote_initiated, @@ -66,10 +63,12 @@ impl Recv /// Update state reflecting a new, remotely opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result>, ConnectionError> { + pub fn open(&mut self, id: StreamId) + -> Result>, ConnectionError> + { assert!(self.refused.is_none()); - try!(self.ensure_can_open(id)); + try!(self.ensure_can_open::

(id)); if !self.can_inc_num_streams() { self.refused = Some(id); @@ -79,10 +78,30 @@ impl Recv Ok(Some(Stream::new(id))) } + pub fn poll_response(&mut self, stream: &mut store::Ptr) + -> Poll, ConnectionError> { + // If the buffer is not empty, then the first frame must be a HEADERS + // frame or the user violated the contract. + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(Frame::Headers(v)) => { + // TODO: This error should probably be caught on receipt of the + // frame vs. now. + Ok(client::Peer::convert_poll_message(v)?.into()) + } + Some(frame) => unimplemented!(), + None => { + stream.state.ensure_recv_open()?; + + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } + } + /// Transition the stream state based on receiving headers - pub fn recv_headers(&mut self, - frame: frame::Headers, - stream: &mut store::Ptr) + pub fn recv_headers(&mut self, + frame: frame::Headers, + stream: &mut store::Ptr) -> Result, ConnectionError> { let is_initial = stream.state.recv_open(self.init_window_sz, frame.is_end_stream())?; @@ -155,11 +174,13 @@ impl Recv Ok(()) } - pub fn recv_push_promise(&mut self, frame: frame::PushPromise, stream: &mut store::Ptr) + pub fn recv_push_promise(&mut self, + frame: frame::PushPromise, + stream: &mut store::Ptr) -> Result<(), ConnectionError> { // First, make sure that the values are legit - self.ensure_can_reserve(frame.promised_id())?; + self.ensure_can_reserve::

(frame.promised_id())?; // Make sure that the stream state is valid stream.state.ensure_recv_open()?; @@ -241,7 +262,9 @@ impl Recv } /// Returns true if the remote peer can initiate a stream with the given ID. - fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { + fn ensure_can_open(&self, id: StreamId) + -> Result<(), ConnectionError> + { if !P::is_server() { // Remote is a server and cannot open streams. PushPromise is // registered by reserving, so does not go through this path. @@ -257,7 +280,9 @@ impl Recv } /// Returns true if the remote peer can reserve a stream with the given ID. - fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), ConnectionError> { + fn ensure_can_reserve(&self, promised_id: StreamId) + -> Result<(), ConnectionError> + { // TODO: Are there other rules? if P::is_server() { // The remote is a client and cannot reserve @@ -400,27 +425,3 @@ impl Recv unimplemented!(); } } - -impl Recv - where B: Buf, -{ - pub fn poll_response(&mut self, stream: &mut store::Ptr) - -> Poll, ConnectionError> { - // If the buffer is not empty, then the first frame must be a HEADERS - // frame or the user violated the contract. - match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Frame::Headers(v)) => { - // TODO: This error should probably be caught on receipt of the - // frame vs. now. - Ok(client::Peer::convert_poll_message(v)?.into()) - } - Some(frame) => unimplemented!(), - None => { - stream.state.ensure_recv_open()?; - - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - } - } - } -} diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index de7bc6c..b0130bd 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -10,7 +10,7 @@ use std::collections::VecDeque; use std::marker::PhantomData; #[derive(Debug)] -pub(super) struct Send { +pub(super) struct Send { /// Maximum number of locally initiated streams max_streams: Option, @@ -36,15 +36,12 @@ pub(super) struct Send { /// be notified later. Access to poll_window_update must not be shared across tasks, /// as we only track a single task (and *not* i.e. a task per stream id). blocked: Option, - - _p: PhantomData

, } -impl Send - where P: Peer, - B: Buf, -{ - pub fn new(config: &Config) -> Self { +impl Send where B: Buf { + + /// Create a new `Send` + pub fn new(config: &Config) -> Self { let next_stream_id = if P::is_server() { 2 } else { @@ -60,15 +57,16 @@ impl Send prioritize: Prioritize::new(), pending_window_updates: VecDeque::new(), blocked: None, - _p: PhantomData, } } /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self) -> Result, ConnectionError> { - try!(self.ensure_can_open()); + pub fn open(&mut self) + -> Result, ConnectionError> + { + try!(self.ensure_can_open::

()); if let Some(max) = self.max_streams { if max <= self.num_streams { @@ -229,7 +227,7 @@ impl Send } /// Returns true if the local actor can initiate a stream with the given ID. - fn ensure_can_open(&self) -> Result<(), ConnectionError> { + fn ensure_can_open(&self) -> Result<(), ConnectionError> { if P::is_server() { // Servers cannot open streams. PushPromise must first be reserved. return Err(UnexpectedFrameType.into()); diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index fd3a47c..de23fcb 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -2,28 +2,28 @@ use client; use proto::*; use super::*; +use std::marker::PhantomData; use std::sync::{Arc, Mutex}; // TODO: All the VecDeques should become linked lists using the State // values. #[derive(Debug)] -pub struct Streams { - inner: Arc>>, +pub struct Streams { + inner: Arc>>, } /// Reference to the stream state #[derive(Debug)] -pub struct StreamRef { - inner: Arc>>, +pub struct StreamRef { + inner: Arc>>, key: store::Key, } #[derive(Debug)] -pub struct Chunk - where P: Peer, - B: Buf, +pub struct Chunk + where B: Buf, { - inner: Arc>>, + inner: Arc>>, recv: recv::Chunk, } @@ -32,30 +32,29 @@ pub struct Chunk /// /// TODO: better name #[derive(Debug)] -struct Inner { - actions: Actions, +struct Inner { + actions: Actions, store: Store, } #[derive(Debug)] -struct Actions { +struct Actions { /// Manages state transitions initiated by receiving frames - recv: Recv, + recv: Recv, /// Manages state transitions initiated by sending frames - send: Send, + send: Send, } -impl Streams - where P: Peer, - B: Buf, +impl Streams + where B: Buf, { - pub fn new(config: Config) -> Self { + pub fn new(config: Config) -> Self { Streams { inner: Arc::new(Mutex::new(Inner { actions: Actions { recv: Recv::new(&config), - send: Send::new(&config), + send: Send::new::

(&config), }, store: Store::new(), })), @@ -63,7 +62,7 @@ impl Streams } /// Process inbound headers - pub fn recv_headers(&mut self, frame: frame::Headers) + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result, ConnectionError> { let id = frame.stream_id(); @@ -81,7 +80,7 @@ impl Streams return Err(ProtocolError.into()); } - match try!(me.actions.recv.open(id)) { + match try!(me.actions.recv.open::

(id)) { Some(stream) => e.insert(stream), None => return Ok(None), } @@ -90,7 +89,7 @@ impl Streams let stream = me.store.resolve(key); - me.actions.transition(stream, |actions, stream| { + me.actions.transition::(stream, |actions, stream| { if frame.is_trailers() { unimplemented!(); /* @@ -102,12 +101,12 @@ impl Streams try!(me.actions.recv.recv_eos(stream)); */ } else { - actions.recv.recv_headers(frame, stream) + actions.recv.recv_headers::

(frame, stream) } }) } - pub fn recv_data(&mut self, frame: frame::Data) + pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -120,12 +119,12 @@ impl Streams None => return Err(ProtocolError.into()), }; - me.actions.transition(stream, |actions, stream| { + me.actions.transition::(stream, |actions, stream| { actions.recv.recv_data(frame, stream) }) } - pub fn recv_reset(&mut self, frame: frame::Reset) + pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -139,7 +138,7 @@ impl Streams None => return Ok(()), }; - me.actions.transition(stream, |actions, stream| { + me.actions.transition::(stream, |actions, stream| { actions.recv.recv_reset(frame, stream)?; assert!(stream.state.is_closed()); Ok(()) @@ -173,7 +172,7 @@ impl Streams Ok(()) } - pub fn recv_push_promise(&mut self, frame: frame::PushPromise) + pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -186,7 +185,7 @@ impl Streams None => return Err(ProtocolError.into()), }; - me.actions.recv.recv_push_promise(frame, &mut stream) + me.actions.recv.recv_push_promise::

(frame, &mut stream) } pub fn send_headers(&mut self, headers: frame::Headers) @@ -274,11 +273,11 @@ impl Streams } } -impl Streams +impl Streams where B: Buf, { pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ConnectionError> { // TODO: There is a hazard with assigning a stream ID before the // prioritize layer. If prioritization reorders new streams, this @@ -290,7 +289,7 @@ impl Streams let me = &mut *me; // Initialize a new stream. This fails if the connection is at capacity. - let mut stream = me.actions.send.open()?; + let mut stream = me.actions.send.open::()?; // Convert the message let headers = client::Peer::convert_send_message( @@ -316,11 +315,10 @@ impl Streams // ===== impl StreamRef ===== -impl StreamRef - where P: Peer, - B: Buf, +impl StreamRef + where B: Buf, { - pub fn send_data(&mut self, data: B, end_of_stream: bool) + pub fn send_data(&mut self, data: B, end_of_stream: bool) -> Result<(), ConnectionError> { let mut me = self.inner.lock().unwrap(); @@ -331,13 +329,22 @@ impl StreamRef // Create the data frame let frame = frame::Data::from_buf(stream.id, data, end_of_stream); - me.actions.transition(stream, |actions, stream| { + me.actions.transition::(stream, |actions, stream| { // Send the data frame actions.send.send_data(frame, stream) }) } - pub fn poll_data(&mut self) -> Poll>, ConnectionError> { + pub fn poll_response(&mut self) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + let mut stream = me.store.resolve(self.key); + + me.actions.recv.poll_response(&mut stream) + } + + pub fn poll_data(&mut self) -> Poll>, ConnectionError> { let recv = { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -359,22 +366,7 @@ impl StreamRef } } -impl StreamRef - where B: Buf, -{ - pub fn poll_response(&mut self) -> Poll, ConnectionError> { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - let mut stream = me.store.resolve(self.key); - - me.actions.recv.poll_response(&mut stream) - } -} - - - -impl Clone for StreamRef { +impl Clone for StreamRef { fn clone(&self) -> Self { StreamRef { inner: self.inner.clone(), @@ -385,9 +377,8 @@ impl Clone for StreamRef { // ===== impl Chunk ===== -impl Chunk - where P: Peer, - B: Buf, +impl Chunk + where B: Buf, { // TODO: Come up w/ a better API pub fn pop_bytes(&mut self) -> Option { @@ -398,9 +389,8 @@ impl Chunk } } -impl Drop for Chunk - where P: Peer, - B: Buf, +impl Drop for Chunk + where B: Buf, { fn drop(&mut self) { let mut me = self.inner.lock().unwrap(); @@ -413,32 +403,32 @@ impl Drop for Chunk // ===== impl Actions ===== -impl Actions - where P: Peer, - B: Buf, +impl Actions + where B: Buf, { - fn dec_num_streams(&mut self, id: StreamId) { - if self.is_local_init(id) { + fn dec_num_streams(&mut self, id: StreamId) { + if self.is_local_init::

(id) { self.send.dec_num_streams(); } else { self.recv.dec_num_streams(); } } - fn is_local_init(&self, id: StreamId) -> bool { + fn is_local_init(&self, id: StreamId) -> bool { assert!(!id.is_zero()); P::is_server() == id.is_server_initiated() } - fn transition(&mut self, mut stream: store::Ptr, f: F) -> U + fn transition(&mut self, mut stream: store::Ptr, f: F) -> U where F: FnOnce(&mut Self, &mut store::Ptr) -> U, + P: Peer, { let is_counted = stream.state.is_counted(); let ret = f(self, &mut stream); if is_counted && stream.state.is_closed() { - self.dec_num_streams(stream.id); + self.dec_num_streams::

(stream.id); } ret diff --git a/src/server.rs b/src/server.rs index a3b0466..5293a7b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,3 @@ -#![allow(warnings)] - use {frame, proto, Peer, ConnectionError, StreamId}; use http; @@ -12,14 +10,19 @@ use std::fmt; /// In progress H2 connection binding pub struct Handshake { // TODO: unbox - inner: Box, Error = ConnectionError>>, + inner: Box, Error = ConnectionError>>, } /// Marker type indicating a client peer #[derive(Debug)] -pub struct Server; +pub struct Server { + connection: Connection, +} -pub type Connection = super::Connection; +#[derive(Debug)] +pub struct Stream { + inner: proto::StreamRef, +} /// Flush a Sink struct Flush {