diff --git a/src/client.rs b/src/client.rs index 3b84cbb..b200034 100644 --- a/src/client.rs +++ b/src/client.rs @@ -16,13 +16,19 @@ pub struct Handshake { } #[derive(Debug)] -struct Peer; +pub(crate) struct Peer; /// Marker type indicating a client peer pub struct Client { connection: Connection, } +/// Client half of an active HTTP/2.0 stream. +pub struct Stream { + inner: proto::Stream, + _p: ::std::marker::PhantomData, +} + impl Client where T: AsyncRead + AsyncWrite + 'static, { @@ -67,11 +73,82 @@ impl Client Handshake { inner: Box::new(handshake) } } - pub fn request(&mut self) { + /// Returns `Ready` when the connection can initialize a new HTTP 2.0 + /// stream. + pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + unimplemented!(); + } + + /// Send a request on a new HTTP 2.0 stream + pub fn request(&mut self, request: Request<()>, end_of_stream: bool) + -> Result, ConnectionError> + { + self.connection.send_request(request, end_of_stream) + .map(|stream| Stream { + inner: stream, + _p: ::std::marker::PhantomData, + }) + } +} + +impl fmt::Debug for Client + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug + IntoBuf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Client") + .field("connection", &self.connection) + .finish() + } +} + +// ===== impl Handshake ===== + +impl Future for Handshake { + type Item = Client; + type Error = ConnectionError; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} + +impl fmt::Debug for Handshake + where T: fmt::Debug, + B: fmt::Debug + IntoBuf, + B::Buf: fmt::Debug + IntoBuf, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "client::Handshake") + } +} + +// ===== impl Stream ===== + +impl Stream { + /// Receive the HTTP/2.0 response, if it is ready. + pub fn poll_response(&mut self) -> Poll<(), ConnectionError> { + unimplemented!(); + } + + /// Send data + pub fn send_data(&mut self, data: B, end_of_stream: bool) + -> Result<(), ConnectionError> + { + unimplemented!(); + } + + /// Send trailers + pub fn send_trailers(&mut self, trailers: ()) + -> Result<(), ConnectionError> + { unimplemented!(); } } +// ===== impl Peer ===== + impl proto::Peer for Peer { type Send = Request<()>; type Poll = Response<()>; @@ -109,34 +186,3 @@ impl proto::Peer for Peer { .map_err(|_| ProtocolError.into()) } } - -impl Future for Handshake { - type Item = Client; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll { - self.inner.poll() - } -} - -impl fmt::Debug for Handshake - where T: fmt::Debug, - B: fmt::Debug + IntoBuf, - B::Buf: fmt::Debug + IntoBuf, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "client::Handshake") - } -} - -impl fmt::Debug for Client - where T: fmt::Debug, - B: fmt::Debug + IntoBuf, - B::Buf: fmt::Debug + IntoBuf, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.debug_struct("Client") - .field("connection", &self.connection) - .finish() - } -} diff --git a/src/frame/stream_id.rs b/src/frame/stream_id.rs index 6150bef..417a1e0 100644 --- a/src/frame/stream_id.rs +++ b/src/frame/stream_id.rs @@ -39,6 +39,10 @@ impl StreamId { pub fn is_zero(&self) -> bool { self.0 == 0 } + + pub fn increment(&mut self) { + self.0 += 2; + } } impl From for StreamId { diff --git a/src/lib.rs b/src/lib.rs index 1eee29b..693a6db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ -// #![allow(warnings)] -#![deny(missing_debug_implementations)] +#![allow(warnings)] +// #![deny(missing_debug_implementations)] #[macro_use] extern crate futures; diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 2ee2240..24fe62f 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,10 +1,10 @@ -use {ConnectionError, Frame}; +use {client, ConnectionError, Frame}; use HeaderMap; use frame::{self, StreamId}; use proto::*; -use http::{request, response}; +use http::{Request, Response}; use bytes::{Bytes, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -80,6 +80,7 @@ impl Connection unimplemented!(); } + /// Returns `Ready` when the connection is ready to receive a frame. pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { try_ready!(self.poll_send_ready()); @@ -89,6 +90,7 @@ impl Connection Ok(().into()) } + /* pub fn send_data(self, id: StreamId, data: B, @@ -112,10 +114,7 @@ impl Connection headers, }) } - - pub fn start_ping(&mut self, _body: PingPayload) -> StartSend { - unimplemented!(); - } + */ // ===== Private ===== @@ -227,6 +226,13 @@ impl Connection } } + fn poll_complete(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_send_ready()); + try_ready!(self.codec.poll_complete()); + + Ok(().into()) + } + fn convert_poll_message(frame: frame::Headers) -> Result, ConnectionError> { if frame.is_trailers() { Ok(Frame::Trailers { @@ -243,6 +249,18 @@ impl Connection } } +impl Connection + where T: AsyncRead + AsyncWrite, + B: IntoBuf, +{ + /// Initialize a new HTTP/2.0 stream and send the message. + pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) + -> Result, ConnectionError> + { + self.streams.send_request(request, end_of_stream) + } +} + /* impl Connection where T: AsyncRead + AsyncWrite, @@ -292,20 +310,7 @@ impl Connection } */ -impl Stream for Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - // TODO: intercept errors and flag the connection - self.recv_frame() - } -} - +/* impl Sink for Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -379,11 +384,5 @@ impl Sink for Connection // Return success Ok(AsyncSink::Ready) } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_send_ready()); - try_ready!(self.codec.poll_complete()); - - Ok(().into()) - } } +*/ diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index ae2d059..3b30333 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -117,7 +117,7 @@ impl ApplySettings for FramedRead { } */ -impl Stream for FramedRead +impl futures::Stream for FramedRead where T: AsyncRead, { type Item = Frame; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index a032e91..7129402 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -6,18 +6,18 @@ mod settings; mod streams; pub use self::connection::Connection; +pub use self::streams::{Streams, Stream}; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use self::ping_pong::PingPong; use self::settings::Settings; -use self::streams::Streams; use {StreamId, ConnectionError}; use error::Reason; use frame::{self, Frame}; -use futures::*; +use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2}; use bytes::{Buf, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 3cbc39b..7165bd7 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -6,7 +6,7 @@ mod store; mod stream; mod streams; -pub use self::streams::Streams; +pub use self::streams::{Streams, Stream}; use self::flow_control::FlowControl; use self::recv::Recv; @@ -19,6 +19,8 @@ use proto::*; use error::Reason::*; use error::User::*; +use http::{Request, Response}; + #[derive(Debug)] pub struct Config { /// Maximum number of remote initiated streams diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index b36dc32..a233b83 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -17,6 +17,9 @@ pub struct Send

{ /// Current number of locally initiated streams num_streams: usize, + /// Stream identifier to use for next initialized stream. + next_stream_id: StreamId, + /// Initial window size of locally initiated streams init_window_sz: WindowSize, @@ -37,9 +40,16 @@ pub struct Send

{ impl Send

{ pub fn new(config: &Config) -> Self { + let next_stream_id = if P::is_server() { + 2 + } else { + 1 + }; + Send { max_streams: config.max_local_initiated, num_streams: 0, + next_stream_id: next_stream_id.into(), init_window_sz: config.init_local_window_sz, flow_control: FlowControl::new(config.init_local_window_sz), pending_window_updates: VecDeque::new(), @@ -51,8 +61,8 @@ impl Send

{ /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused - pub fn open(&mut self, id: StreamId) -> Result { - try!(self.ensure_can_open(id)); + pub fn open(&mut self) -> Result<(StreamId, State), ConnectionError> { + try!(self.ensure_can_open()); if let Some(max) = self.max_streams { if max <= self.num_streams { @@ -60,10 +70,13 @@ impl Send

{ } } + let ret = (self.next_stream_id, State::default()); + // Increment the number of locally initiated streams self.num_streams += 1; + self.next_stream_id.increment(); - Ok(State::default()) + Ok(ret) } pub fn send_headers(&mut self, state: &mut State, eos: bool) @@ -191,15 +204,13 @@ impl Send

{ } /// Returns true if the local actor can initiate a stream with the given ID. - fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { + fn ensure_can_open(&self) -> Result<(), ConnectionError> { if P::is_server() { // Servers cannot open streams. PushPromise must first be reserved. return Err(UnexpectedFrameType.into()); } - if !id.is_client_initiated() { - return Err(InvalidStreamId.into()); - } + // TODO: Handle StreamId overflow Ok(()) } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 3962529..3e54530 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -42,6 +42,11 @@ impl Store { } } + pub fn insert(&mut self, id: StreamId, val: State) { + let handle = self.slab.insert(val); + assert!(self.ids.insert(id, handle).is_none()); + } + pub fn entry(&mut self, id: StreamId) -> Entry { use self::hash_map::Entry::*; diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 11da177..2022a3c 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,3 +1,4 @@ +use client; use proto::*; use super::*; @@ -10,6 +11,12 @@ pub struct Streams

{ inner: Arc>>, } +#[derive(Debug)] +pub struct Stream

{ + inner: Arc>>, + id: StreamId, +} + /// Fields needed to manage state related to managing the set of streams. This /// is mostly split out to make ownership happy. /// @@ -139,28 +146,22 @@ impl Streams

{ unimplemented!(); } - pub fn send_headers(&mut self, frame: &frame::Headers) + pub fn send_headers(&mut self, headers: frame::Headers) -> Result<(), ConnectionError> { + unimplemented!(); + /* let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); let me = &mut *me; - trace!("send_headers; id={:?}", id); + // let (id, state) = me.actions.send.open()); + let state = match me.store.entry(id) { Entry::Occupied(e) => e.into_mut(), Entry::Vacant(e) => { - // Trailers cannot open a stream. Trailers are header frames - // that do not contain pseudo headers. Requests MUST contain a - // method and responses MUST contain a status. If they do not,t - // hey are considered to be malformed. - if frame.is_trailers() { - // TODO: Should this be a different error? - return Err(UnexpectedFrameType.into()); - } - - let state = try!(me.actions.send.open(id)); + let (id, state) = try!(me.actions.send.open()); e.insert(state) } }; @@ -176,6 +177,7 @@ impl Streams

{ } Ok(()) + */ } pub fn send_data(&mut self, frame: &frame::Data) @@ -250,6 +252,40 @@ impl Streams

{ } } +impl Streams { + pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) + -> Result, ConnectionError> + { + let id = { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + // Initialize a new stream. This fails if the connection is at capacity. + let (id, mut state) = me.actions.send.open()?; + + // Convert the message + let headers = client::Peer::convert_send_message( + id, request, end_of_stream); + + me.actions.send.send_headers(&mut state, end_of_stream)?; + + // Given that the stream has been initialized, it should not be in the + // closed state. + debug_assert!(!state.is_closed()); + + // Store the state + me.store.insert(id, state); + + id + }; + + Ok(Stream { + inner: self.inner.clone(), + id: id, + }) + } +} + impl Actions

{ fn dec_num_streams(&mut self, id: StreamId) { if self.is_local_init(id) { diff --git a/tests/client_request.rs b/tests/client_request.rs index 49250b2..6e897eb 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -117,6 +117,11 @@ fn recv_invalid_server_stream_id() { assert_proto_err!(err, ProtocolError); } +#[test] +#[ignore] +fn sending_request_on_closed_soket() { +} + const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];