diff --git a/Cargo.toml b/Cargo.toml index 07c6a9d..db5b002 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ futures = "0.1" tokio-io = "0.1.3" tokio-timer = "0.1" bytes = "0.4" -http = { git = "https://github.com/carllerche/http", branch = "lower-case-header-name-parsing" } +http = { git = "https://github.com/carllerche/http", rev = "2dd15d9" } byteorder = "1.0" log = "0.3.8" fnv = "1.0.5" diff --git a/examples/client.rs b/examples/client.rs index d0b3dcb..ffe31fa 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -24,9 +24,9 @@ struct Process { impl Future for Process { type Item = (); - type Error = ConnectionError; + type Error = h2::Error; - fn poll(&mut self) -> Poll<(), ConnectionError> { + fn poll(&mut self) -> Poll<(), h2::Error> { loop { if self.trailers { let trailers = try_ready!(self.body.poll_trailers()); @@ -71,7 +71,7 @@ pub fn main() { .uri("https://http2.akamai.com/") .body(()).unwrap(); - let mut trailers = h2::HeaderMap::new(); + let mut trailers = HeaderMap::new(); trailers.insert("zomg", "hello".parse().unwrap()); let mut stream = client.request(request, false).unwrap(); diff --git a/src/client.rs b/src/client.rs index 8a4ba14..0895306 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,9 +1,9 @@ -use {frame, HeaderMap, ConnectionError}; -use frame::StreamId; -use proto::{self, Connection, WindowSize, ProtoError}; -use error::Reason::*; +use frame::{StreamId, Headers, Pseudo, Settings}; +use frame::Reason::*; +use codec::{Codec, RecvError}; +use proto::{self, Connection, WindowSize}; -use http::{Request, Response}; +use http::{Request, Response, HeaderMap}; use futures::{Future, Poll, Sink, Async, AsyncSink, AndThen, MapErr}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::io::WriteAll; @@ -16,9 +16,9 @@ use std::io::Error as IoError; pub struct Handshake { inner: AndThen< - MapErr, fn(IoError) -> ConnectionError>, - Result, ConnectionError>, - fn((T, &'static [u8])) -> Result, ConnectionError> + MapErr, fn(IoError) -> ::Error>, + Result, ::Error>, + fn((T, &'static [u8])) -> Result, ::Error> > } @@ -63,28 +63,31 @@ impl Client debug!("binding client connection"); let bind: fn((T, &'static [u8])) - -> Result, ConnectionError> = + -> Result, ::Error> = |(io, _)| { debug!("client connection bound"); - let mut framed_write = proto::framed_write(io); - let settings = frame::Settings::default(); + // Create the codec + let mut codec = Codec::new(io); + + // Create the initial SETTINGS frame + let settings = Settings::default(); // Send initial settings frame - match framed_write.start_send(settings.into()) { + match codec.start_send(settings.into()) { Ok(AsyncSink::Ready) => { - let connection = proto::from_framed_write(framed_write); + let connection = Connection::new(codec); Ok(Client { connection }) } Ok(_) => unreachable!(), - Err(e) => Err(ConnectionError::from(e)), + Err(e) => Err(::Error::from(e)), } }; let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; let handshake = io::write_all(io, msg) - .map_err(ConnectionError::from as - fn(IoError) -> ConnectionError + .map_err(::Error::from as + fn(IoError) -> ::Error ) .and_then(bind); @@ -93,15 +96,16 @@ impl Client /// Returns `Ready` when the connection can initialize a new HTTP 2.0 /// stream. - pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.connection.poll_send_request_ready() + pub fn poll_ready(&mut self) -> Poll<(), ::Error> { + Ok(self.connection.poll_send_request_ready()) } /// Send a request on a new HTTP 2.0 stream pub fn request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, ::Error> { self.connection.send_request(request, end_of_stream) + .map_err(Into::into) .map(|stream| Stream { inner: stream, }) @@ -114,10 +118,11 @@ impl Future for Client B: IntoBuf, { type Item = (); - type Error = ConnectionError; + type Error = ::Error; - fn poll(&mut self) -> Poll<(), ConnectionError> { + fn poll(&mut self) -> Poll<(), ::Error> { self.connection.poll() + .map_err(Into::into) } } @@ -139,7 +144,7 @@ impl fmt::Debug for Client impl Future for Handshake where T: AsyncRead + AsyncWrite { type Item = Client; - type Error = ConnectionError; + type Error = ::Error; fn poll(&mut self) -> Poll { self.inner.poll() @@ -161,7 +166,7 @@ impl fmt::Debug for Handshake impl Stream { /// Receive the HTTP/2.0 response, if it is ready. - pub fn poll_response(&mut self) -> Poll>, ConnectionError> { + pub fn poll_response(&mut self) -> Poll>, ::Error> { let (parts, _) = try_ready!(self.inner.poll_response()).into_parts(); let body = Body { inner: self.inner.clone() }; @@ -180,29 +185,31 @@ impl Stream { } /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, ConnectionError> { + pub fn poll_capacity(&mut self) -> Poll, ::Error> { let res = try_ready!(self.inner.poll_capacity()); Ok(Async::Ready(res.map(|v| v as usize))) } /// Send data pub fn send_data(&mut self, data: B, end_of_stream: bool) - -> Result<(), ConnectionError> + -> Result<(), ::Error> { self.inner.send_data(data.into_buf(), end_of_stream) + .map_err(Into::into) } /// Send trailers pub fn send_trailers(&mut self, trailers: HeaderMap) - -> Result<(), ConnectionError> + -> Result<(), ::Error> { self.inner.send_trailers(trailers) + .map_err(Into::into) } } impl Future for Stream { type Item = Response>; - type Error = ConnectionError; + type Error = ::Error; fn poll(&mut self) -> Poll { self.poll_response() @@ -217,24 +224,27 @@ impl Body { self.inner.body_is_empty() } - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { self.inner.release_capacity(sz as proto::WindowSize) + .map_err(Into::into) } /// Poll trailers /// /// This function **must** not be called until `Body::poll` returns `None`. - pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + pub fn poll_trailers(&mut self) -> Poll, ::Error> { self.inner.poll_trailers() + .map_err(Into::into) } } impl ::futures::Stream for Body { type Item = Bytes; - type Error = ConnectionError; + type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { self.inner.poll_data() + .map_err(Into::into) } } @@ -251,7 +261,7 @@ impl proto::Peer for Peer { fn convert_send_message( id: StreamId, request: Self::Send, - end_of_stream: bool) -> frame::Headers + end_of_stream: bool) -> Headers { use http::request::Parts; @@ -259,10 +269,10 @@ impl proto::Peer for Peer { // Build the set pseudo header set. All requests will include `method` // and `path`. - let pseudo = frame::Pseudo::request(method, uri); + let pseudo = Pseudo::request(method, uri); // Create the HEADERS frame - let mut frame = frame::Headers::new(id, pseudo, headers); + let mut frame = Headers::new(id, pseudo, headers); if end_of_stream { frame.set_end_stream() @@ -271,7 +281,7 @@ impl proto::Peer for Peer { frame } - fn convert_poll_message(headers: frame::Headers) -> Result { + fn convert_poll_message(headers: Headers) -> Result { let mut b = Response::builder(); let stream_id = headers.stream_id(); @@ -286,7 +296,7 @@ impl proto::Peer for Peer { Err(_) => { // TODO: Should there be more specialized handling for different // kinds of errors - return Err(ProtoError::Stream { + return Err(RecvError::Stream { id: stream_id, reason: ProtocolError, }); diff --git a/src/codec/error.rs b/src/codec/error.rs new file mode 100644 index 0000000..12105a4 --- /dev/null +++ b/src/codec/error.rs @@ -0,0 +1,121 @@ +use frame::{Reason, StreamId}; + +use std::{error, fmt, io}; + +/// Errors that are received +#[derive(Debug)] +pub enum RecvError { + Connection(Reason), + Stream { + id: StreamId, + reason: Reason, + }, + Io(io::Error), +} + +/// Errors caused by sending a message +#[derive(Debug)] +pub enum SendError { + /// User error + User(UserError), + + /// I/O error + Io(io::Error), +} + +/// Errors caused by users of the library +#[derive(Debug)] +pub enum UserError { + /// The stream ID is no longer accepting frames. + InactiveStreamId, + + /// The stream is not currently expecting a frame of this type. + UnexpectedFrameType, + + /// The payload size is too big + PayloadTooBig, + + /// The application attempted to initiate too many streams to remote. + Rejected, +} + +// ===== impl RecvError ===== + +impl From for RecvError { + fn from(src: io::Error) -> Self { + RecvError::Io(src) + } +} + +impl error::Error for RecvError { + fn description(&self) -> &str { + use self::RecvError::*; + + match *self { + Connection(ref reason) => reason.description(), + Stream { ref reason, .. } => reason.description(), + Io(ref e) => e.description(), + } + } +} + +impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::error::Error; + write!(fmt, "{}", self.description()) + } +} + +// ===== impl SendError ===== + +impl error::Error for SendError { + fn description(&self) -> &str { + use self::SendError::*; + + match *self { + User(ref e) => e.description(), + Io(ref e) => e.description(), + } + } +} + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::error::Error; + write!(fmt, "{}", self.description()) + } +} + +impl From for SendError { + fn from(src: io::Error) -> Self { + SendError::Io(src) + } +} + +impl From for SendError { + fn from(src: UserError) -> Self { + SendError::User(src) + } +} + +// ===== impl UserError ===== + +impl error::Error for UserError { + fn description(&self) -> &str { + use self::UserError::*; + + match *self { + InactiveStreamId => "inactive stream", + UnexpectedFrameType => "unexpected frame type", + PayloadTooBig => "payload too big", + Rejected => "rejected", + } + } +} + +impl fmt::Display for UserError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + use std::error::Error; + write!(fmt, "{}", self.description()) + } +} diff --git a/src/proto/framed_read.rs b/src/codec/framed_read.rs similarity index 89% rename from src/proto/framed_read.rs rename to src/codec/framed_read.rs index 2e6562b..2432e6c 100644 --- a/src/proto/framed_read.rs +++ b/src/codec/framed_read.rs @@ -1,8 +1,9 @@ -use {hpack, ConnectionError}; +use codec::RecvError; use frame::{self, Frame, Kind}; use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; -use proto::*; -use error::Reason::*; +use frame::Reason::*; + +use hpack; use futures::*; @@ -11,8 +12,6 @@ use bytes::BytesMut; use tokio_io::AsyncRead; use tokio_io::codec::length_delimited; -use std::io; - #[derive(Debug)] pub struct FramedRead { inner: length_delimited::FramedRead, @@ -54,8 +53,8 @@ impl FramedRead { // TODO: Is this needed? } - fn decode_frame(&mut self, mut bytes: BytesMut) -> Result, ProtoError> { - use self::ProtoError::*; + fn decode_frame(&mut self, mut bytes: BytesMut) -> Result, RecvError> { + use self::RecvError::*; trace!("decoding frame from {}B", bytes.len()); @@ -226,11 +225,11 @@ impl FramedRead { } } -impl futures::Stream for FramedRead +impl Stream for FramedRead where T: AsyncRead, { type Item = Frame; - type Error = ProtoError; + type Error = RecvError; fn poll(&mut self) -> Poll, Self::Error> { loop { @@ -247,32 +246,3 @@ impl futures::Stream for FramedRead } } } - -impl Sink for FramedRead { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, item: T::SinkItem) -> StartSend { - self.inner.get_mut().start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.get_mut().poll_complete() - } -} - -impl FramedRead> { - pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.get_mut().poll_ready() - } -} - -impl io::Write for FramedRead { - fn write(&mut self, src: &[u8]) -> io::Result { - self.inner.get_mut().write(src) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.get_mut().flush() - } -} diff --git a/src/proto/framed_write.rs b/src/codec/framed_write.rs similarity index 85% rename from src/proto/framed_write.rs rename to src/codec/framed_write.rs index 3a173f4..df3fd97 100644 --- a/src/proto/framed_write.rs +++ b/src/codec/framed_write.rs @@ -1,6 +1,7 @@ -use {hpack, ConnectionError}; -use error::User::*; +use codec::UserError; +use codec::UserError::*; use frame::{self, Frame, FrameSize}; +use hpack; use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; @@ -64,10 +65,14 @@ impl FramedWrite } } - pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + /// Returns `Ready` when `send` is able to accept a frame + /// + /// Calling this function may result in the current contents of the buffer + /// to be flushed to `T`. + pub fn poll_ready(&mut self) -> Poll<(), io::Error> { if !self.has_capacity() { // Try flushing - try!(self.poll_complete()); + try!(self.flush()); if !self.has_capacity() { return Ok(Async::NotReady); @@ -77,47 +82,13 @@ impl FramedWrite Ok(Async::Ready(())) } - fn has_capacity(&self) -> bool { - self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY - } - - fn is_empty(&self) -> bool { - match self.next { - Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), - _ => !self.buf.has_remaining(), - } - } -} - -impl FramedWrite { - pub fn max_frame_size(&self) -> usize { - self.max_frame_size as usize - } - - pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { - if let Some(val) = settings.max_frame_size() { - self.max_frame_size = val; - } - } - - pub fn take_last_data_frame(&mut self) -> Option> { - self.last_data_frame.take() - } -} - -impl Sink for FramedWrite - where T: AsyncWrite, - B: Buf, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Self::SinkItem) - -> StartSend - { - if !try!(self.poll_ready()).is_ready() { - return Ok(AsyncSink::NotReady(item)); - } + /// Buffer a frame. + /// + /// `poll_ready` must be called first to ensure that a frame may be + /// accepted. + pub fn buffer(&mut self, item: Frame) -> Result<(), UserError> { + // Ensure that we have enough capacity to accept the write. + assert!(self.has_capacity()); debug!("send; frame={:?}", item); @@ -127,7 +98,7 @@ impl Sink for FramedWrite let len = v.payload().remaining(); if len > self.max_frame_size() { - return Err(PayloadTooBig.into()); + return Err(PayloadTooBig); } if len >= CHAIN_THRESHOLD { @@ -188,11 +159,12 @@ impl Sink for FramedWrite } } - Ok(AsyncSink::Ready) + Ok(()) } - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - trace!("poll_complete"); + /// Flush buffered data to the wire + pub fn flush(&mut self) -> Poll<(), io::Error> { + trace!("flush"); while !self.is_empty() { match self.next { @@ -228,18 +200,44 @@ impl Sink for FramedWrite Ok(Async::Ready(())) } - fn close(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_complete()); + /// Close the codec + pub fn shutdown(&mut self) -> Poll<(), io::Error> { + try_ready!(self.flush()); self.inner.shutdown().map_err(Into::into) } + + fn has_capacity(&self) -> bool { + self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY + } + + fn is_empty(&self) -> bool { + match self.next { + Some(Next::Data(ref frame)) => !frame.payload().has_remaining(), + _ => !self.buf.has_remaining(), + } + } } -impl Stream for FramedWrite { - type Item = T::Item; - type Error = T::Error; +impl FramedWrite { + /// Returns the max frame size that can be sent + pub fn max_frame_size(&self) -> usize { + self.max_frame_size as usize + } - fn poll(&mut self) -> Poll, T::Error> { - self.inner.poll() + /// Apply settings received by the peer + pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { + if let Some(val) = settings.max_frame_size() { + self.max_frame_size = val; + } + } + + /// Retrieve the last data frame that has been sent + pub fn take_last_data_frame(&mut self) -> Option> { + self.last_data_frame.take() + } + + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner } } diff --git a/src/codec/mod.rs b/src/codec/mod.rs new file mode 100644 index 0000000..7f84e30 --- /dev/null +++ b/src/codec/mod.rs @@ -0,0 +1,148 @@ +mod error; +mod framed_read; +mod framed_write; + +pub use self::error::{SendError, RecvError, UserError}; + +use self::framed_read::FramedRead; +use self::framed_write::FramedWrite; + +use frame::{self, Frame, Data, Settings}; + +use futures::*; + +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; + +use bytes::Buf; + +use std::io; + +#[derive(Debug)] +pub struct Codec { + inner: FramedRead>, +} + +impl Codec + where T: AsyncRead + AsyncWrite, + B: Buf, +{ + pub fn new(io: T) -> Self { + // Wrap with writer + let framed_write = FramedWrite::new(io); + + // Delimit the frames + let delimited = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + // TODO: make this configurable and allow it to be changed during + // runtime. + .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize) + .new_read(framed_write); + + let inner = FramedRead::new(delimited); + + Codec { inner } + } +} + +impl Codec { + /// Apply a settings received from the peer + pub fn apply_remote_settings(&mut self, frame: &Settings) { + self.framed_read().apply_remote_settings(frame); + self.framed_write().apply_remote_settings(frame); + } + + /// Takes the data payload value that was fully written to the socket + pub fn take_last_data_frame(&mut self) -> Option> { + self.framed_write().take_last_data_frame() + } + + /// Returns the max frame size that can be sent to the peer + pub fn max_send_frame_size(&self) -> usize { + self.inner.get_ref().max_frame_size() + } + + pub fn get_mut(&mut self) -> &mut T { + self.inner.get_mut().get_mut() + } + + fn framed_read(&mut self) -> &mut FramedRead> { + &mut self.inner + } + + fn framed_write(&mut self) -> &mut FramedWrite { + self.inner.get_mut() + } +} + +impl Codec + where T: AsyncWrite, + B: Buf, +{ + /// Returns `Ready` when the codec can buffer a frame + pub fn poll_ready(&mut self) -> Poll<(), io::Error> { + self.framed_write().poll_ready() + } + + /// Buffer a frame. + /// + /// `poll_ready` must be called first to ensure that a frame may be + /// accepted. + /// + /// TODO: Rename this to avoid conflicts with Sink::buffer + pub fn buffer(&mut self, item: Frame) -> Result<(), UserError> + { + self.framed_write().buffer(item) + } + + /// Flush buffered data to the wire + pub fn flush(&mut self) -> Poll<(), io::Error> { + self.framed_write().flush() + } + + /// Shutdown the send half + pub fn shutdown(&mut self) -> Poll<(), io::Error> { + self.framed_write().shutdown() + } +} + +impl Stream for Codec + where T: AsyncRead, +{ + type Item = Frame; + type Error = RecvError; + + fn poll(&mut self) -> Poll, Self::Error> { + self.inner.poll() + } +} + +impl Sink for Codec + where T: AsyncWrite, + B: Buf, +{ + type SinkItem = Frame; + type SinkError = SendError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + if !self.poll_ready()?.is_ready() { + return Ok(AsyncSink::NotReady(item)); + } + + self.buffer(item)?; + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.flush()?; + Ok(Async::Ready(())) + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.shutdown()?; + Ok(Async::Ready(())) + } +} diff --git a/src/error.rs b/src/error.rs index 7d98d24..3488dbd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,263 +1,101 @@ -use http; +use codec::{SendError, UserError}; +use proto; + use std::{error, fmt, io}; +pub use frame::Reason; + /// The error type for HTTP/2 operations -/// -/// XXX does this sufficiently destinguish stream-level errors from connection-level errors? #[derive(Debug)] -pub enum ConnectionError { +pub struct Error { + kind: Kind, +} + +#[derive(Debug)] +enum Kind { /// An error caused by an action taken by the remote peer. /// /// This is either an error received by the peer or caused by an invalid /// action taken by the peer (i.e. a protocol error). Proto(Reason), - /// An `io::Error` occurred while trying to read or write. - Io(io::Error), - /// An error resulting from an invalid action taken by the user of this /// library. - User(User), + User(UserError), - // TODO: reserve additional variants + /// An `io::Error` occurred while trying to read or write. + Io(io::Error), } -#[derive(Debug)] -pub struct StreamError(Reason); +// ===== impl Error ===== -impl StreamError { - pub fn new(r: Reason) -> StreamError { - StreamError(r) - } +impl From for Error { + fn from(src: proto::Error) -> Error { + use proto::Error::*; - pub fn reason(&self) -> Reason { - self.0 - } -} - -impl From for StreamError { - fn from(r: Reason) -> Self { - StreamError(r) - } -} - -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum Reason { - NoError, - ProtocolError, - InternalError, - FlowControlError, - SettingsTimeout, - StreamClosed, - FrameSizeError, - RefusedStream, - Cancel, - CompressionError, - ConnectError, - EnhanceYourCalm, - InadequateSecurity, - Http11Required, - Other(u32), - // TODO: reserve additional variants -} - -#[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum User { - /// The specified stream ID is invalid. - /// - /// For example, using a stream ID reserved for a push promise from the - /// client or using a non-zero stream ID for settings. - InvalidStreamId, - - /// The stream ID is no longer accepting frames. - InactiveStreamId, - - /// The stream is not currently expecting a frame of this type. - UnexpectedFrameType, - - /// The connection or stream does not have a sufficient flow control window to - /// transmit a Data frame to the remote. - FlowControlViolation, - - /// The payload size is too big - PayloadTooBig, - - /// The connection state is corrupt and the connection should be dropped. - Corrupt, - - /// The stream state has been reset. - StreamReset(Reason), - - /// The application attempted to initiate too many streams to remote. - Rejected, - - // TODO: reserve additional variants -} - -macro_rules! reason_desc { - ($reason:expr) => (reason_desc!($reason, "")); - ($reason:expr, $prefix:expr) => ({ - use self::Reason::*; - - match $reason { - NoError => concat!($prefix, "not a result of an error"), - ProtocolError => concat!($prefix, "unspecific protocol error detected"), - InternalError => concat!($prefix, "unexpected internal error encountered"), - FlowControlError => concat!($prefix, "flow-control protocol violated"), - SettingsTimeout => concat!($prefix, "settings ACK not received in timely manner"), - StreamClosed => concat!($prefix, "received frame when stream half-closed"), - FrameSizeError => concat!($prefix, "frame sent with invalid size"), - RefusedStream => concat!($prefix, "refused stream before processing any application logic"), - Cancel => concat!($prefix, "stream no longer needed"), - CompressionError => concat!($prefix, "unable to maintain the header compression context"), - ConnectError => concat!($prefix, "connection established in response to a CONNECT request was reset or abnormally closed"), - EnhanceYourCalm => concat!($prefix, "detected excessive load generating behavior"), - InadequateSecurity => concat!($prefix, "security properties do not meet minimum requirements"), - Http11Required => concat!($prefix, "endpoint requires HTTP/1.1"), - Other(_) => concat!($prefix, "other reason (ain't no tellin')"), + Error { + kind: match src { + Proto(reason) => Kind::Proto(reason), + Io(e) => Kind::Io(e), + }, } - }); + } } -macro_rules! user_desc { - ($reason:expr) => (user_desc!($reason, "")); - ($reason:expr, $prefix:expr) => ({ - use self::User::*; +impl From for Error { + fn from(src: io::Error) -> Error { + Error { kind: Kind::Io(src) } + } +} - match $reason { - InvalidStreamId => concat!($prefix, "invalid stream ID"), - InactiveStreamId => concat!($prefix, "inactive stream ID"), - UnexpectedFrameType => concat!($prefix, "unexpected frame type"), - FlowControlViolation => concat!($prefix, "flow control violation"), - StreamReset(_) => concat!($prefix, "frame sent on reset stream"), - Corrupt => concat!($prefix, "connection state corrupt"), - Rejected => concat!($prefix, "stream would exceed remote max concurrency"), - PayloadTooBig => concat!($prefix, "payload too big"), +impl From for Error { + fn from(src: Reason) -> Error { + Error { kind: Kind::Proto(src) } + } +} + +impl From for Error { + fn from(src: SendError) -> Error { + match src { + SendError::User(e) => e.into(), + SendError::Io(e) => e.into(), } - }); -} - -// ===== impl ConnectionError ===== - -impl From for ConnectionError { - fn from(src: io::Error) -> ConnectionError { - ConnectionError::Io(src) } } -impl From for ConnectionError { - fn from(src: Reason) -> ConnectionError { - ConnectionError::Proto(src) +impl From for Error { + fn from(src: UserError) -> Error { + Error { kind: Kind::User(src) } } } -impl From for ConnectionError { - fn from(src: User) -> ConnectionError { - ConnectionError::User(src) - } -} - -impl From for io::Error { - fn from(src: ConnectionError) -> io::Error { - io::Error::new(io::ErrorKind::Other, src) - } -} - -impl From for ConnectionError { - fn from(_: http::Error) -> Self { - // TODO: Should this always be a protocol error? - Reason::ProtocolError.into() - } -} - -impl fmt::Display for ConnectionError { +impl fmt::Display for Error { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - use self::ConnectionError::*; + use self::Kind::*; - match *self { - Proto(reason) => write!(fmt, "protocol error: {}", reason), + match self.kind { + Proto(ref reason) => write!(fmt, "protocol error: {}", reason), + User(ref e) => write!(fmt, "user error: {}", e), Io(ref e) => fmt::Display::fmt(e, fmt), - User(e) => write!(fmt, "user error: {}", e), } } } -impl error::Error for ConnectionError { +impl error::Error for Error { fn description(&self) -> &str { - use self::ConnectionError::*; + use self::Kind::*; - match *self { + match self.kind { Io(ref e) => error::Error::description(e), - Proto(reason) => reason_desc!(reason, "protocol error: "), - User(user) => user_desc!(user, "user error: "), + Proto(ref reason) => reason.description(), + User(ref user) => user.description(), } } } -// ===== impl Reason ===== - -impl Reason { - pub fn description(&self) -> &str { - reason_desc!(*self) - } -} - -impl From for Reason { - fn from(src: u32) -> Reason { - use self::Reason::*; - - match src { - 0x0 => NoError, - 0x1 => ProtocolError, - 0x2 => InternalError, - 0x3 => FlowControlError, - 0x4 => SettingsTimeout, - 0x5 => StreamClosed, - 0x6 => FrameSizeError, - 0x7 => RefusedStream, - 0x8 => Cancel, - 0x9 => CompressionError, - 0xa => ConnectError, - 0xb => EnhanceYourCalm, - 0xc => InadequateSecurity, - 0xd => Http11Required, - _ => Other(src), - } - } -} - -impl From for u32 { - fn from(src: Reason) -> u32 { - use self::Reason::*; - - match src { - NoError => 0x0, - ProtocolError => 0x1, - InternalError => 0x2, - FlowControlError => 0x3, - SettingsTimeout => 0x4, - StreamClosed => 0x5, - FrameSizeError => 0x6, - RefusedStream => 0x7, - Cancel => 0x8, - CompressionError => 0x9, - ConnectError => 0xa, - EnhanceYourCalm => 0xb, - InadequateSecurity => 0xc, - Http11Required => 0xd, - Other(v) => v, - } - } -} - -impl fmt::Display for Reason { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{}", self.description()) - } -} - // ===== impl User ===== +/* impl User { pub fn description(&self) -> &str { user_desc!(*self) @@ -269,3 +107,4 @@ impl fmt::Display for User { write!(fmt, "{}", self.description()) } } +*/ diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 0063d2e..0188111 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -1,5 +1,4 @@ -use error::Reason; -use frame::{self, Head, Error, Kind, StreamId}; +use frame::{self, Head, Error, Kind, StreamId, Reason}; use bytes::{BufMut, BigEndian}; diff --git a/src/frame/headers.rs b/src/frame/headers.rs index e913c54..4694710 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -1,9 +1,8 @@ use super::{StreamId, StreamDependency}; use hpack; use frame::{self, Frame, Head, Kind, Error}; -use HeaderMap; -use http::{uri, Method, StatusCode, Uri}; +use http::{uri, Method, StatusCode, Uri, HeaderMap}; use http::header::{self, HeaderName, HeaderValue}; use bytes::{BytesMut, Bytes}; diff --git a/src/frame/mod.rs b/src/frame/mod.rs index b064f9b..7d75565 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -1,5 +1,4 @@ use hpack; -use error::{ConnectionError, Reason}; use bytes::Bytes; @@ -33,6 +32,7 @@ mod head; mod headers; mod ping; mod priority; +mod reason; mod reset; mod settings; mod stream_id; @@ -45,6 +45,7 @@ pub use self::head::{Head, Kind}; pub use self::headers::{Headers, PushPromise, Continuation, Pseudo}; pub use self::ping::Ping; pub use self::priority::{Priority, StreamDependency}; +pub use self::reason::Reason; pub use self::reset::Reset; pub use self::settings::Settings; pub use self::stream_id::StreamId; @@ -113,15 +114,6 @@ impl fmt::Debug for Frame { /// Errors that can occur during parsing an HTTP/2 frame. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Error { - /// A full frame header was not passed. - Short, - - /// An unsupported value was set for the flag value. - BadFlag, - - /// An unsupported value was set for the frame kind. - BadKind, - /// A length value other than 8 was set on a PING message. BadFrameSize, @@ -129,19 +121,6 @@ pub enum Error { /// length of the payload. TooMuchPadding, - /// The payload length specified by the frame header was shorter than - /// necessary for the parser settings specified and the frame type. - /// - /// This happens if, for instance, the priority flag is set and the - /// header length is shorter than a stream dependency. - /// - /// `PayloadLengthTooShort` should be treated as a protocol error. - PayloadLengthTooShort, - - /// The payload length specified by the frame header of a settings frame - /// was not a round multiple of the size of a single setting. - PartialSettingLength, - /// An invalid setting value was provided InvalidSettingValue, @@ -173,14 +152,3 @@ pub enum Error { /// Failed to perform HPACK decoding Hpack(hpack::DecoderError), } - -// ===== impl Error ===== - -impl From for ConnectionError { - fn from(src: Error) -> ConnectionError { - match src { - // TODO: implement - _ => ConnectionError::Proto(Reason::ProtocolError), - } - } -} diff --git a/src/frame/reader.rs b/src/frame/reader.rs deleted file mode 100644 index 6d5e11b..0000000 --- a/src/frame/reader.rs +++ /dev/null @@ -1,40 +0,0 @@ -use ConnectionError; -use super::Frame; -use futures::*; -use bytes::BytesMut; -use std::io; - -pub struct Reader { - inner: T, -} - -impl Stream for Reader - where T: Stream, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - match try_ready!(self.inner.poll()) { - Some(bytes) => { - Frame::load(bytes.freeze()) - .map(|frame| Async::Ready(Some(frame))) - .map_err(ConnectionError::from) - } - None => Ok(Async::Ready(None)), - } - } -} - -impl Sink for Reader { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, item: T::SinkItem) -> StartSend { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() - } -} diff --git a/src/frame/reason.rs b/src/frame/reason.rs new file mode 100644 index 0000000..894cd87 --- /dev/null +++ b/src/frame/reason.rs @@ -0,0 +1,101 @@ +use std::fmt; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Reason { + NoError, + ProtocolError, + InternalError, + FlowControlError, + SettingsTimeout, + StreamClosed, + FrameSizeError, + RefusedStream, + Cancel, + CompressionError, + ConnectError, + EnhanceYourCalm, + InadequateSecurity, + Http11Required, + Other(u32), + // TODO: reserve additional variants +} + +// ===== impl Reason ===== + +impl Reason { + pub fn description(&self) -> &str { + use self::Reason::*; + + match *self { + NoError => "not a result of an error", + ProtocolError => "unspecific protocol error detected", + InternalError => "unexpected internal error encountered", + FlowControlError => "flow-control protocol violated", + SettingsTimeout => "settings ACK not received in timely manner", + StreamClosed => "received frame when stream half-closed", + FrameSizeError => "frame sent with invalid size", + RefusedStream => "refused stream before processing any application logic", + Cancel => "stream no longer needed", + CompressionError => "unable to maintain the header compression context", + ConnectError => "connection established in response to a CONNECT request was reset or abnormally closed", + EnhanceYourCalm => "detected excessive load generating behavior", + InadequateSecurity => "security properties do not meet minimum requirements", + Http11Required => "endpoint requires HTTP/1.1", + Other(_) => "other reason (ain't no tellin')", + } + } +} + +impl From for Reason { + fn from(src: u32) -> Reason { + use self::Reason::*; + + match src { + 0x0 => NoError, + 0x1 => ProtocolError, + 0x2 => InternalError, + 0x3 => FlowControlError, + 0x4 => SettingsTimeout, + 0x5 => StreamClosed, + 0x6 => FrameSizeError, + 0x7 => RefusedStream, + 0x8 => Cancel, + 0x9 => CompressionError, + 0xa => ConnectError, + 0xb => EnhanceYourCalm, + 0xc => InadequateSecurity, + 0xd => Http11Required, + _ => Other(src), + } + } +} + +impl From for u32 { + fn from(src: Reason) -> u32 { + use self::Reason::*; + + match src { + NoError => 0x0, + ProtocolError => 0x1, + InternalError => 0x2, + FlowControlError => 0x3, + SettingsTimeout => 0x4, + StreamClosed => 0x5, + FrameSizeError => 0x6, + RefusedStream => 0x7, + Cancel => 0x8, + CompressionError => 0x9, + ConnectError => 0xa, + EnhanceYourCalm => 0xb, + InadequateSecurity => 0xc, + Http11Required => 0xd, + Other(v) => v, + } + } +} + +impl fmt::Display for Reason { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{}", self.description()) + } +} diff --git a/src/frame/reset.rs b/src/frame/reset.rs index b513250..c819569 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -1,5 +1,4 @@ -use error::Reason; -use frame::{self, Head, Error, Kind, StreamId}; +use frame::{self, Head, Error, Kind, StreamId, Reason}; use bytes::{BufMut, BigEndian}; diff --git a/src/frame/unknown.rs b/src/frame/unknown.rs deleted file mode 100644 index 77cbb67..0000000 --- a/src/frame/unknown.rs +++ /dev/null @@ -1,33 +0,0 @@ -use frame::{Frame, Head, Error}; -use bytes::{Bytes, BytesMut, BufMut}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Unknown { - head: Head, - payload: Bytes, -} - -impl Unknown { - pub fn new(head: Head, payload: Bytes) -> Unknown { - Unknown { - head: head, - payload: payload, - } - } - - pub fn encode_len(&self) -> usize { - self.head.encode_len() + self.payload.len() - } - - pub fn encode(&self, dst: &mut BytesMut) -> Result<(), Error> { - try!(self.head.encode(self.payload.len(), dst)); - dst.put(&self.payload); - Ok(()) - } -} - -impl From for Frame { - fn from(src: Unknown) -> Frame { - Frame::Unknown(src) - } -} diff --git a/src/frame/writer.rs b/src/frame/writer.rs deleted file mode 100644 index 5a13dca..0000000 --- a/src/frame/writer.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct Writer; diff --git a/src/lib.rs b/src/lib.rs index a5e69ca..942a889 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,14 +24,13 @@ extern crate log; extern crate string; -pub mod client; -pub mod error; +mod error; +mod codec; mod hpack; mod proto; mod frame; + +pub mod client; pub mod server; -pub use error::{ConnectionError, Reason}; - -// TODO: remove if carllerche/http#90 lands -pub type HeaderMap = http::HeaderMap; +pub use error::{Error, Reason}; diff --git a/src/proto/codec.rs b/src/proto/codec.rs deleted file mode 100644 index 699f170..0000000 --- a/src/proto/codec.rs +++ /dev/null @@ -1,77 +0,0 @@ -use super::*; -use futures::*; - -#[derive(Debug)] -pub struct Codec { - inner: FramedRead>, -} - -impl Codec { - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { - self.framed_read().apply_remote_settings(frame); - self.framed_write().apply_remote_settings(frame); - } - - /// Takes the data payload value that was fully written to the socket - pub(crate) fn take_last_data_frame(&mut self) -> Option> { - self.framed_write().take_last_data_frame() - } - - pub fn max_send_frame_size(&self) -> usize { - self.inner.get_ref().max_frame_size() - } - - fn framed_read(&mut self) -> &mut FramedRead> { - &mut self.inner - } - - fn framed_write(&mut self) -> &mut FramedWrite { - self.inner.get_mut() - } -} - -impl Codec - where T: AsyncRead + AsyncWrite, - B: Buf, -{ - pub fn from_framed(inner: FramedRead>) -> Self { - Codec { inner } - } -} - -impl Codec - where T: AsyncWrite, - B: Buf, -{ - pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() - } - -} - -impl futures::Stream for Codec - where T: AsyncRead, -{ - type Item = Frame; - type Error = ProtoError; - - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() - } -} - -impl Sink for Codec - where T: AsyncWrite, - B: Buf, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() - } -} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 9b640f7..b090f79 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,9 +1,11 @@ -use {client, frame, server, ConnectionError}; +use {client, frame, server, proto}; +use frame::Reason; +use codec::{SendError, RecvError}; use proto::*; use http::Request; -use futures::{Sink, Stream}; +use futures::{Stream}; use bytes::{Bytes, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -73,7 +75,10 @@ impl Connection } /// Returns `Ready` when the connection is ready to receive a frame. - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + /// + /// Returns `RecvError` as this may raise errors that are caused by delayed + /// processing of received frames. + fn poll_ready(&mut self) -> Poll<(), RecvError> { // The order of these calls don't really matter too much as only one // should have pending work. try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); @@ -83,84 +88,97 @@ impl Connection Ok(().into()) } - /// Returns `Ready` when new the connection is able to support a new request stream. - pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> { - self.streams.poll_send_request_ready() - } - /// Advances the internal state of the connection. - pub fn poll(&mut self) -> Poll<(), ConnectionError> { - use error::ConnectionError::*; + pub fn poll(&mut self) -> Poll<(), proto::Error> { + use codec::RecvError::*; loop { + // TODO: probably clean up this glob of code match self.state { // When open, continue to poll a frame - State::Open => {}, - // In an error state - _ => { - try_ready!(self.poll_complete()); + State::Open => { + match self.poll2() { + // The connection has shutdown normally + Ok(Async::Ready(())) => return Ok(().into()), + // The connection is not ready to make progress + Ok(Async::NotReady) => { + // Ensure all window updates have been sent. + // + // This will also handle flushing `self.codec` + try_ready!(self.streams.poll_complete(&mut self.codec)); - // GO_AWAY frame has been sent, return the error - return Err(self.state.error().unwrap().into()); - } - } + return Ok(Async::NotReady); + } + // Attempting to read a frame resulted in a connection level + // error. This is handled by setting a GO_AWAY frame followed by + // terminating the connection. + Err(Connection(e)) => { + debug!("Connection::poll; err={:?}", e); - match self.poll2() { - Err(Proto(e)) => { - debug!("Connection::poll; err={:?}", e); - let last_processed_id = self.streams.recv_err(&e.into()); - let frame = frame::GoAway::new(last_processed_id, e); + // Reset all active streams + let last_processed_id = self.streams.recv_err(&e.into()); - self.state = State::GoAway(frame); + // Create the GO_AWAY frame with the last_processed_id + let frame = frame::GoAway::new(last_processed_id, e); + + // Transition to the going away state. + self.state = State::GoAway(frame); + } + // Attempting to read a frame resulted in a stream level error. + // This is handled by resetting the frame then trying to read + // another frame. + Err(Stream { id, reason }) => { + trace!("stream level error; id={:?}; reason={:?}", id, reason); + self.streams.send_reset(id, reason); + } + // Attempting to read a frame resulted in an I/O error. All + // active streams must be reset. + // + // TODO: Are I/O errors recoverable? + Err(Io(e)) => { + let e = e.into(); + + // Reset all active streams + self.streams.recv_err(&e); + + // Return the error + return Err(e); + } + } + }, + State::GoAway(frame) => { + // Ensure the codec is ready to accept the frame + try_ready!(self.codec.poll_ready()); + + // Buffer the GO_AWAY frame + self.codec.buffer(frame.into()) + .ok().expect("invalid GO_AWAY frame"); + + // GO_AWAY sent, transition the connection to an errored state + self.state = State::Flush(frame.reason()); } - Err(e) => { - // TODO: Are I/O errors recoverable? - self.streams.recv_err(&e); - return Err(e); + State::Flush(reason) => { + // Flush the codec + try_ready!(self.codec.flush()); + + // Transition the state to error + self.state = State::Error(reason); + } + State::Error(reason) => { + return Err(reason.into()); } - ret => return ret, } } } - fn poll2(&mut self) -> Poll<(), ConnectionError> { + fn poll2(&mut self) -> Poll<(), RecvError> { use frame::Frame::*; - use proto::ProtoError::*; loop { // First, ensure that the `Connection` is able to receive a frame try_ready!(self.poll_ready()); - trace!("polling codec"); - - let frame = match self.codec.poll() { - // Receive a frame - Ok(Async::Ready(frame)) => frame, - // Socket not ready, try to flush any pending data - Ok(Async::NotReady) => { - // Flush any pending writes - let _ = try!(self.poll_complete()); - return Ok(Async::NotReady); - } - // Connection level error, set GO_AWAY and close connection - Err(Connection(reason)) => { - return Err(ConnectionError::Proto(reason)); - } - // Stream level error, reset the stream - Err(Stream { id, reason }) => { - trace!("stream level error; id={:?}; reason={:?}", id, reason); - self.streams.send_reset(id, reason); - continue; - } - // I/O error, nothing more can be done - Err(Io(err)) => { - return Err(err.into()); - } - }; - - debug!("recv; frame={:?}", frame); - - match frame { + match try_ready!(self.codec.poll()) { Some(Headers(frame)) => { trace!("recv HEADERS; frame={:?}", frame); try!(self.streams.recv_headers(frame)); @@ -184,7 +202,7 @@ impl Connection Some(GoAway(_)) => { // TODO: handle the last_processed_id. Also, should this be // handled as an error? - // let _ = ConnectionError::Proto(frame.reason()); + // let _ = RecvError::Proto(frame.reason()); return Ok(().into()); } Some(Ping(frame)) => { @@ -207,46 +225,20 @@ impl Connection } } } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - loop { - match self.state { - State::Open => { - try_ready!(self.poll_ready()); - - // Ensure all window updates have been sent. - try_ready!(self.streams.poll_complete(&mut self.codec)); - - return Ok(().into()); - } - State::GoAway(frame) => { - if !self.codec.start_send(frame.into())?.is_ready() { - // Not ready to send the frame... try again later. - return Ok(Async::NotReady); - } - - // GO_AWAY sent, transition the connection to an errored state - self.state = State::Flush(frame.reason()); - } - State::Flush(reason) => { - try_ready!(self.codec.poll_complete()); - self.state = State::Error(reason); - } - State::Error(..) => { - return Ok(().into()); - } - } - } - } } impl Connection where T: AsyncRead + AsyncWrite, B: IntoBuf, { + /// Returns `Ready` when new the connection is able to support a new request stream. + pub fn poll_send_request_ready(&mut self) -> Async<()> { + self.streams.poll_send_request_ready() + } + /// Initialize a new HTTP/2.0 stream and send the message. pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, SendError> { self.streams.send_request(request, end_of_stream) } @@ -260,14 +252,3 @@ impl Connection self.streams.next_incoming() } } - -// ====== impl State ===== - -impl State { - fn error(&self) -> Option { - match *self { - State::Error(reason) => Some(reason), - _ => None, - } - } -} diff --git a/src/proto/error.rs b/src/proto/error.rs new file mode 100644 index 0000000..7615874 --- /dev/null +++ b/src/proto/error.rs @@ -0,0 +1,34 @@ +use frame::Reason; +use codec::RecvError; + +use std::io; + +/// Either an H2 reason or an I/O error +#[derive(Debug)] +pub enum Error { + Proto(Reason), + Io(io::Error), +} + +impl Error { + pub fn into_connection_recv_error(self) -> RecvError { + use self::Error::*; + + match self { + Proto(reason) => RecvError::Connection(reason), + Io(e) => RecvError::Io(e), + } + } +} + +impl From for Error { + fn from(src: Reason) -> Self { + Error::Proto(src) + } +} + +impl From for Error { + fn from(src: io::Error) -> Self { + Error::Io(src) + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 1775c4c..a48a26b 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,109 +1,35 @@ -mod codec; mod connection; -mod framed_read; -mod framed_write; +mod error; +mod peer; mod ping_pong; mod settings; mod streams; pub(crate) use self::connection::Connection; +pub(crate) use self::error::Error; +pub(crate) use self::peer::Peer; pub(crate) use self::streams::{Streams, StreamRef}; -use self::codec::Codec; -use self::framed_read::FramedRead; -use self::framed_write::FramedWrite; +use codec::Codec; + use self::ping_pong::PingPong; use self::settings::Settings; use self::streams::Prioritized; -use ConnectionError; -use error::Reason; -use frame::{self, Frame, StreamId}; +use frame::{self, Frame}; -use futures::{self, task, Poll, Async, AsyncSink}; +use futures::{task, Poll, Async}; use futures::task::Task; -use bytes::{Buf, IntoBuf}; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; -use std::{fmt, io}; +use bytes::Buf; -/// Either a Client or a Server -pub trait Peer { - /// Message type sent into the transport - type Send; - - /// Message type polled from the transport - type Poll: fmt::Debug; - - fn is_server() -> bool; - - fn convert_send_message( - id: StreamId, - headers: Self::Send, - end_of_stream: bool) -> frame::Headers; - - fn convert_poll_message(headers: frame::Headers) -> Result; -} +use tokio_io::AsyncWrite; pub type PingPayload = [u8; 8]; pub type WindowSize = u32; -/// Errors that are received -#[derive(Debug)] -pub enum ProtoError { - Connection(Reason), - Stream { - id: StreamId, - reason: Reason, - }, - Io(io::Error), -} - // Constants +// TODO: Move these into `frame` pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; - -/// Create a transport prepared to handle the server handshake. -/// -/// When the server is performing the handshake, it is able to only send -/// `Settings` frames and is expected to receive the client preface as a byte -/// stream. To represent this, `Settings>` is returned. -pub(crate) fn framed_write(io: T) -> FramedWrite - where T: AsyncRead + AsyncWrite, - B: Buf, -{ - FramedWrite::new(io) -} - -/// Create a full H2 transport from the server handshaker -pub(crate) fn from_framed_write(framed_write: FramedWrite>) - -> Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - // Delimit the frames. - let framed = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(9) - .num_skip(0) // Don't skip the header - // TODO: make this configurable and allow it to be changed during - // runtime. - .max_frame_length(frame::DEFAULT_MAX_FRAME_SIZE as usize) - .new_read(framed_write); - - let codec = Codec::from_framed(FramedRead::new(framed)); - - Connection::new(codec) -} - -// ===== impl ProtoError ===== - -impl From for ProtoError { - fn from(src: io::Error) -> Self { - ProtoError::Io(src) - } -} diff --git a/src/proto/peer.rs b/src/proto/peer.rs new file mode 100644 index 0000000..2ab847b --- /dev/null +++ b/src/proto/peer.rs @@ -0,0 +1,22 @@ +use frame::{Headers, StreamId}; +use codec::RecvError; + +use std::fmt; + +/// Either a Client or a Server +pub trait Peer { + /// Message type sent into the transport + type Send; + + /// Message type polled from the transport + type Poll: fmt::Debug; + + fn is_server() -> bool; + + fn convert_send_message( + id: StreamId, + headers: Self::Send, + end_of_stream: bool) -> Headers; + + fn convert_poll_message(headers: Headers) -> Result; +} diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index 4baa1b8..1a6f096 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,7 +1,7 @@ use frame::Ping; use proto::*; -use futures::Sink; +use std::io; /// Acknowledges ping requests from the remote. #[derive(Debug)] @@ -45,15 +45,16 @@ impl PingPong } /// Send any pending pongs. - pub fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> + pub fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), io::Error> where T: AsyncWrite, { if let Some(pong) = self.sending_pong.take() { - if let AsyncSink::NotReady(pong) = dst.start_send(pong)? { - // If the pong can't be sent, save it. + if !dst.poll_ready()?.is_ready() { self.sending_pong = Some(pong); return Ok(Async::NotReady); } + + dst.buffer(pong).ok().expect("invalid pong frame"); } Ok(Async::Ready(())) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index b415651..ff6a80b 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,8 +1,7 @@ use frame; +use codec::RecvError; use proto::*; -use futures::Sink; - #[derive(Debug)] pub(crate) struct Settings { /// Received SETTINGS frame pending processing. The ACK must be written to @@ -31,7 +30,7 @@ impl Settings { pub fn send_pending_ack(&mut self, dst: &mut Codec, streams: &mut Streams) - -> Poll<(), ConnectionError> + -> Poll<(), RecvError> where T: AsyncWrite, B: Buf, C: Buf, @@ -40,13 +39,17 @@ impl Settings { trace!("send_pending_ack; pending={:?}", self.pending); if let Some(ref settings) = self.pending { - let frame = frame::Settings::ack(); - - if let AsyncSink::NotReady(_) = dst.start_send(frame.into())? { + if !dst.poll_ready()?.is_ready() { trace!("failed to send ACK"); return Ok(Async::NotReady); } + // Create an ACK settings frame + let frame = frame::Settings::ack(); + + // Buffer the settings frame + dst.buffer(frame.into()).ok().expect("invalid settings frame"); + trace!("ACK sent; applying settings"); dst.apply_remote_settings(settings); diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index fef8b47..3e85b4b 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -1,6 +1,6 @@ -use ConnectionError; +use frame::Reason; +use frame::Reason::*; use proto::*; -use error::Reason::*; #[derive(Copy, Clone, Debug)] pub struct FlowControl { @@ -67,15 +67,15 @@ impl FlowControl { /// Increase the window size. /// /// This is called after receiving a WINDOW_UPDATE frame - pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), ConnectionError> { + pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), Reason> { let (val, overflow) = self.window_size.overflowing_add(sz as i32); if overflow { - return Err(FlowControlError.into()); + return Err(FlowControlError); } if val > MAX_WINDOW_SIZE as i32 { - return Err(FlowControlError.into()); + return Err(FlowControlError); } trace!("inc_window; sz={}; old={}; new={}", sz, self.window_size, val); diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index 9e09002..c875c0f 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -20,11 +20,9 @@ use self::state::State; use self::store::{Store, Entry}; use self::stream::Stream; -use {frame, ConnectionError}; use frame::StreamId; use proto::*; use error::Reason::*; -use error::User::*; use http::{Request, Response}; use bytes::Bytes; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 1411ae6..bc878bc 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -1,9 +1,14 @@ use super::*; use super::store::Resolve; -use bytes::buf::Take; -use futures::Sink; +use frame::Reason; +use codec::UserError; +use codec::UserError::*; + +use bytes::buf::Take; + +use std::io; use std::{fmt, cmp}; #[derive(Debug)] @@ -80,7 +85,7 @@ impl Prioritize frame: frame::Data, stream: &mut store::Ptr, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), UserError> { let sz = frame.payload().remaining(); @@ -93,9 +98,9 @@ impl Prioritize if !stream.state.is_send_streaming() { if stream.state.is_closed() { - return Err(InactiveStreamId.into()); + return Err(InactiveStreamId); } else { - return Err(UnexpectedFrameType.into()); + return Err(UnexpectedFrameType); } } @@ -115,7 +120,7 @@ impl Prioritize } if frame.is_end_stream() { - try!(stream.state.send_close()); + stream.state.send_close(); } trace!("send_data (2); available={}; buffered={}", @@ -161,7 +166,7 @@ impl Prioritize pub fn recv_stream_window_update(&mut self, inc: WindowSize, stream: &mut store::Ptr) - -> Result<(), ConnectionError> + -> Result<(), Reason> { trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}", stream.id, stream.state, inc, stream.send_flow); @@ -179,7 +184,7 @@ impl Prioritize pub fn recv_connection_window_update(&mut self, inc: WindowSize, store: &mut Store) - -> Result<(), ConnectionError> + -> Result<(), Reason> { // Update the connection's window self.flow.inc_window(inc)?; @@ -284,7 +289,7 @@ impl Prioritize pub fn poll_complete(&mut self, store: &mut Store, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { // Ensure codec is ready @@ -303,22 +308,17 @@ impl Prioritize Some(frame) => { trace!("writing frame={:?}", frame); - let res = dst.start_send(frame)?; - - // We already verified that `dst` is ready to accept the - // write - assert!(res.is_ready()); + dst.buffer(frame).ok().expect("invalid frame"); // Ensure the codec is ready to try the loop again. try_ready!(dst.poll_ready()); // Because, always try to reclaim... self.reclaim_frame(store, dst); - } None => { // Try to flush the codec. - try_ready!(dst.poll_complete()); + try_ready!(dst.flush()); // This might release a data frame... if !self.reclaim_frame(store, dst) { diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 9d75fbc..40db92d 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -1,10 +1,12 @@ -use {client, server, frame, HeaderMap, ConnectionError}; +use {client, server, frame, proto}; +use frame::Reason; +use codec::{RecvError, UserError}; use proto::*; use super::*; -use error::Reason::*; -use futures::Sink; +use http::HeaderMap; +use std::io; use std::marker::PhantomData; #[derive(Debug)] @@ -41,14 +43,14 @@ pub(super) struct Recv /// Refused StreamId, this represents a frame that must be sent out. refused: Option, - _p: PhantomData<(B)>, + _p: PhantomData, } #[derive(Debug)] pub(super) enum Event { Headers(T), Data(Bytes), - Trailers(::HeaderMap), + Trailers(HeaderMap), } #[derive(Debug, Clone, Copy)] @@ -103,7 +105,7 @@ impl Recv /// /// Returns the stream state if successful. `None` if refused pub fn open(&mut self, id: StreamId) - -> Result, ConnectionError> + -> Result, RecvError> { assert!(self.refused.is_none()); @@ -123,7 +125,7 @@ impl Recv pub fn recv_headers(&mut self, frame: frame::Headers, stream: &mut store::Ptr) - -> Result<(), ProtoError> + -> Result<(), RecvError> { trace!("opening stream; init_window={}", self.init_window_sz); let is_initial = stream.state.recv_open(frame.is_end_stream())?; @@ -137,7 +139,7 @@ impl Recv self.next_stream_id = frame.stream_id(); self.next_stream_id.increment(); } else { - return Err(ProtoError::Connection(ProtocolError)); + return Err(RecvError::Connection(ProtocolError)); } // TODO: be smarter about this logic @@ -184,13 +186,13 @@ impl Recv pub fn recv_trailers(&mut self, frame: frame::Headers, stream: &mut store::Ptr) - -> Result<(), ProtoError> + -> Result<(), RecvError> { // Transition the state stream.state.recv_close()?; if stream.ensure_content_length_zero().is_err() { - return Err(ProtoError::Stream { + return Err(RecvError::Stream { id: stream.id, reason: ProtocolError, }); @@ -205,11 +207,12 @@ impl Recv Ok(()) } + /// Releases capacity back to the connection pub fn release_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), UserError> { if capacity > stream.in_flight_recv_data { // TODO: Handle error @@ -246,7 +249,7 @@ impl Recv pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) - -> Result<(), ProtoError> + -> Result<(), RecvError> { let sz = frame.payload().len(); @@ -259,7 +262,7 @@ impl Recv if !stream.state.is_recv_streaming() { // Receiving a DATA frame when not expecting one is a protocol // error. - return Err(ProtoError::Connection(ProtocolError)); + return Err(RecvError::Connection(ProtocolError)); } trace!("recv_data; size={}; connection={}; stream={}", @@ -268,7 +271,7 @@ impl Recv // Ensure that there is enough capacity on the connection before acting // on the stream. if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz { - return Err(ProtoError::Connection(FlowControlError)); + return Err(RecvError::Connection(FlowControlError)); } // Update connection level flow control @@ -281,7 +284,7 @@ impl Recv stream.in_flight_recv_data += sz; if stream.dec_content_length(frame.payload().len()).is_err() { - return Err(ProtoError::Stream { + return Err(RecvError::Stream { id: stream.id, reason: ProtocolError, }); @@ -289,14 +292,14 @@ impl Recv if frame.is_end_stream() { if stream.ensure_content_length_zero().is_err() { - return Err(ProtoError::Stream { + return Err(RecvError::Stream { id: stream.id, reason: ProtocolError, }); } if stream.state.recv_close().is_err() { - return Err(ProtoError::Connection(ProtocolError)); + return Err(RecvError::Connection(ProtocolError)); } } @@ -314,13 +317,14 @@ impl Recv send: &Send, stream: store::Key, store: &mut Store) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { // First, make sure that the values are legit self.ensure_can_reserve(frame.promised_id())?; // Make sure that the stream state is valid - store[stream].state.ensure_recv_open()?; + store[stream].state.ensure_recv_open() + .map_err(|e| e.into_connection_recv_error())?; // TODO: Streams in the reserved states do not count towards the concurrency // limit. However, it seems like there should be a cap otherwise this @@ -361,18 +365,19 @@ impl Recv Ok(()) } - pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> { + /// Ensures that `id` is not in the `Idle` state. + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { if id >= self.next_stream_id { - return Err(ProtocolError.into()); + return Err(ProtocolError); } Ok(()) } pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { - let err = ConnectionError::Proto(frame.reason()); + let err = proto::Error::Proto(frame.reason()); // Notify the stream stream.state.recv_err(&err); @@ -381,7 +386,7 @@ impl Recv } /// Handle a received error - pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { + pub fn recv_err(&mut self, err: &proto::Error, stream: &mut Stream) { // Receive an error stream.state.recv_err(err); @@ -415,17 +420,17 @@ impl Recv /// Returns true if the remote peer can initiate a stream with the given ID. fn ensure_can_open(&self, id: StreamId) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { if !P::is_server() { // Remote is a server and cannot open streams. PushPromise is // registered by reserving, so does not go through this path. - return Err(ProtocolError.into()); + return Err(RecvError::Connection(ProtocolError)); } // Ensure that the ID is a valid server initiated ID if !id.is_client_initiated() { - return Err(ProtocolError.into()); + return Err(RecvError::Connection(ProtocolError)); } Ok(()) @@ -433,16 +438,16 @@ impl Recv /// Returns true if the remote peer can reserve a stream with the given ID. fn ensure_can_reserve(&self, promised_id: StreamId) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { // TODO: Are there other rules? if P::is_server() { // The remote is a client and cannot reserve - return Err(ProtocolError.into()); + return Err(RecvError::Connection(ProtocolError)); } if !promised_id.is_server_initiated() { - return Err(ProtocolError.into()); + return Err(RecvError::Connection(ProtocolError)); } Ok(()) @@ -450,31 +455,28 @@ impl Recv /// Send any pending refusals. pub fn send_pending_refusal(&mut self, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { - if let Some(stream_id) = self.refused.take() { + if let Some(stream_id) = self.refused { + try_ready!(dst.poll_ready()); + + // Create the RST_STREAM frame let frame = frame::Reset::new(stream_id, RefusedStream); - match dst.start_send(frame.into())? { - AsyncSink::Ready => { - self.reset(stream_id, RefusedStream); - return Ok(Async::Ready(())); - } - AsyncSink::NotReady(_) => { - self.refused = Some(stream_id); - return Ok(Async::NotReady); - } - } + // Buffer the frame + dst.buffer(frame.into()).ok().expect("invalid RST_STREAM frame"); } + self.refused = None; + Ok(Async::Ready(())) } pub fn poll_complete(&mut self, store: &mut Store, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { // Send any pending connection level window updates @@ -488,7 +490,7 @@ impl Recv /// Send connection level window update fn send_connection_window_update(&mut self, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { let incr = self.flow.unclaimed_capacity(); @@ -496,11 +498,14 @@ impl Recv if incr > 0 { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); - if dst.start_send(frame.into())?.is_ready() { - self.flow.inc_window(incr).ok().expect("unexpected flow control state"); - } else { - return Ok(Async::NotReady); - } + // Ensure the codec has capacity + try_ready!(dst.poll_ready()); + + // Buffer the WINDOW_UPDATE frame + dst.buffer(frame.into()).ok().expect("invalid WINDOW_UPDATE frame"); + + // Update flow control + self.flow.inc_window(incr).ok().expect("unexpected flow control state"); } Ok(().into()) @@ -511,7 +516,7 @@ impl Recv pub fn send_stream_window_updates(&mut self, store: &mut Store, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { loop { @@ -534,10 +539,11 @@ impl Recv let incr = stream.recv_flow.unclaimed_capacity(); if incr > 0 { + // Create the WINDOW_UPDATE frame let frame = frame::WindowUpdate::new(stream.id, incr); - let res = dst.start_send(frame.into())?; - assert!(res.is_ready()); + // Buffer it + dst.buffer(frame.into()).ok().expect("invalid WINDOW_UPDATE frame"); } } } @@ -548,8 +554,9 @@ impl Recv } pub fn poll_data(&mut self, stream: &mut Stream) - -> Poll, ConnectionError> + -> Poll, proto::Error> { + // TODO: Return error when the stream is reset match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Data(payload)) => { Ok(Some(payload).into()) @@ -575,7 +582,7 @@ impl Recv } pub fn poll_trailers(&mut self, stream: &mut Stream) - -> Poll, ConnectionError> + -> Poll, proto::Error> { match stream.pending_recv.pop_front(&mut self.buffer) { Some(Event::Trailers(trailers)) => { @@ -599,10 +606,6 @@ impl Recv } } } - - fn reset(&mut self, _stream_id: StreamId, _reason: Reason) { - unimplemented!(); - } } impl Recv @@ -610,15 +613,10 @@ impl Recv { /// TODO: Should this fn return `Result`? pub fn take_request(&mut self, stream: &mut store::Ptr) - -> Result, ConnectionError> + -> Request<()> { match stream.pending_recv.pop_front(&mut self.buffer) { - Some(Event::Headers(request)) => Ok(request), - /* - // TODO: This error should probably be caught on receipt of the - // frame vs. now. - Ok(server::Peer::convert_poll_message(frame)?) - */ + Some(Event::Headers(request)) => request, _ => panic!(), } } @@ -628,7 +626,7 @@ impl Recv where B: Buf, { pub fn poll_response(&mut self, stream: &mut store::Ptr) - -> Poll, ConnectionError> { + -> Poll, proto::Error> { // If the buffer is not empty, then the first frame must be a HEADERS // frame or the user violated the contract. match stream.pending_recv.pop_front(&mut self.buffer) { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 725cff9..b49d09e 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -1,11 +1,14 @@ -use {frame, ConnectionError}; +use client; +use frame::{self, Reason}; +use codec::{RecvError, UserError}; +use codec::UserError::*; use proto::*; use super::*; -use error::User::*; - use bytes::Buf; +use std::io; + /// Manages state transitions related to outbound frames. #[derive(Debug)] pub(super) struct Send @@ -53,24 +56,11 @@ where B: Buf, self.init_window_sz } - pub fn poll_open_ready(&mut self) -> Poll<(), ConnectionError> { - try!(self.ensure_can_open()); - - if let Some(max) = self.max_streams { - if max <= self.num_streams { - self.blocked_open = Some(task::current()); - return Ok(Async::NotReady); - } - } - - return Ok(Async::Ready(())); - } - /// Update state reflecting a new, locally opened stream /// /// Returns the stream state if successful. `None` if refused pub fn open(&mut self) - -> Result + -> Result { try!(self.ensure_can_open()); @@ -93,7 +83,7 @@ where B: Buf, frame: frame::Headers, stream: &mut store::Ptr, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), UserError> { trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); // Update the state @@ -145,7 +135,7 @@ where B: Buf, frame: frame::Data, stream: &mut store::Ptr, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), UserError> { self.prioritize.send_data(frame, stream, task) } @@ -154,14 +144,14 @@ where B: Buf, frame: frame::Headers, stream: &mut store::Ptr, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), UserError> { // TODO: Should this logic be moved into state.rs? if !stream.state.is_send_streaming() { return Err(UnexpectedFrameType.into()); } - stream.state.send_close()?; + stream.state.send_close(); trace!("send_trailers -- queuing; frame={:?}", frame); self.prioritize.queue_frame(frame.into(), stream, task); @@ -172,7 +162,7 @@ where B: Buf, pub fn poll_complete(&mut self, store: &mut Store, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { self.prioritize.poll_complete(store, dst) @@ -184,7 +174,7 @@ where B: Buf, } pub fn poll_capacity(&mut self, stream: &mut store::Ptr) - -> Poll, ConnectionError> + -> Poll, UserError> { if !stream.state.is_send_streaming() { return Ok(Async::Ready(None)); @@ -214,7 +204,7 @@ where B: Buf, pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate, store: &mut Store) - -> Result<(), ConnectionError> + -> Result<(), Reason> { self.prioritize.recv_connection_window_update(frame.size_increment(), store) } @@ -223,11 +213,13 @@ where B: Buf, sz: WindowSize, stream: &mut store::Ptr, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), Reason> { if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { debug!("recv_stream_window_update !!; err={:?}", e); self.send_reset(FlowControlError.into(), stream, task); + + return Err(e); } Ok(()) @@ -237,7 +229,7 @@ where B: Buf, settings: &frame::Settings, store: &mut Store, task: &mut Option) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { if let Some(val) = settings.max_concurrent_streams() { self.max_streams = Some(val as usize); @@ -283,13 +275,14 @@ where B: Buf, // TODO: Should this notify the producer? - Ok(()) + Ok::<_, RecvError>(()) })?; } else if val > old_val { let inc = val - old_val; store.for_each(|mut stream| { self.recv_stream_window_update(inc, &mut stream, task) + .map_err(RecvError::Connection) })?; } } @@ -297,9 +290,9 @@ where B: Buf, Ok(()) } - pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), ConnectionError> { + pub fn ensure_not_idle(&self, id: StreamId) -> Result<(), Reason> { if id >= self.next_stream_id { - return Err(ProtocolError.into()); + return Err(ProtocolError); } Ok(()) @@ -316,10 +309,10 @@ where B: Buf, } /// Returns true if the local actor can initiate a stream with the given ID. - fn ensure_can_open(&self) -> Result<(), ConnectionError> { + fn ensure_can_open(&self) -> Result<(), UserError> { if P::is_server() { // Servers cannot open streams. PushPromise must first be reserved. - return Err(UnexpectedFrameType.into()); + return Err(UnexpectedFrameType); } // TODO: Handle StreamId overflow @@ -327,3 +320,18 @@ where B: Buf, Ok(()) } } + +impl Send +where B: Buf, +{ + pub fn poll_open_ready(&mut self) -> Async<()> { + if let Some(max) = self.max_streams { + if max <= self.num_streams { + self.blocked_open = Some(task::current()); + return Async::NotReady; + } + } + + return Async::Ready(()); + } +} diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index f28c603..ce31f55 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -1,8 +1,8 @@ -use ConnectionError; -use proto::ProtoError; -use error::Reason; -use error::Reason::*; -use error::User::*; +use frame::Reason; +use frame::Reason::*; +use codec::{RecvError, UserError}; +use codec::UserError::*; +use proto; use self::Inner::*; use self::Peer::*; @@ -82,7 +82,7 @@ enum Cause { impl State { /// Opens the send-half of a stream if it is not already open. - pub fn send_open(&mut self, eos: bool) -> Result<(), ConnectionError> { + pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { let local = Peer::Streaming; self.inner = match self.inner { @@ -115,7 +115,7 @@ impl State { } _ => { // All other transitions result in a protocol error - return Err(UnexpectedFrameType.into()); + return Err(UnexpectedFrameType); } }; @@ -126,7 +126,7 @@ impl State { /// frame is received. /// /// Returns true if this transitions the state to Open - pub fn recv_open(&mut self, eos: bool) -> Result { + pub fn recv_open(&mut self, eos: bool) -> Result { let remote = Peer::Streaming; let mut initial = false; @@ -174,7 +174,7 @@ impl State { } _ => { // All other transitions result in a protocol error - return Err(ProtoError::Connection(ProtocolError)); + return Err(RecvError::Connection(ProtocolError)); } }; @@ -182,18 +182,18 @@ impl State { } /// Transition from Idle -> ReservedRemote - pub fn reserve_remote(&mut self) -> Result<(), ConnectionError> { + pub fn reserve_remote(&mut self) -> Result<(), RecvError> { match self.inner { Idle => { self.inner = ReservedRemote; Ok(()) } - _ => Err(ProtocolError.into()), + _ => Err(RecvError::Connection(ProtocolError)), } } /// Indicates that the remote side will not send more data to the local. - pub fn recv_close(&mut self) -> Result<(), ProtoError> { + pub fn recv_close(&mut self) -> Result<(), RecvError> { match self.inner { Open { local, .. } => { // The remote side will continue to receive data. @@ -206,39 +206,38 @@ impl State { self.inner = Closed(None); Ok(()) } - _ => Err(ProtoError::Connection(ProtocolError)), + _ => Err(RecvError::Connection(ProtocolError)), } } - pub fn recv_err(&mut self, err: &ConnectionError) { + pub fn recv_err(&mut self, err: &proto::Error) { + use proto::Error::*; + match self.inner { Closed(..) => {} _ => { trace!("recv_err; err={:?}", err); self.inner = Closed(match *err { - ConnectionError::Proto(reason) => Some(Cause::Proto(reason)), - ConnectionError::Io(..) => Some(Cause::Io), - ref e => panic!("cannot terminate stream with user error; err={:?}", e), + Proto(reason) => Some(Cause::Proto(reason)), + Io(..) => Some(Cause::Io), }); } } } /// Indicates that the local side will not send more data to the local. - pub fn send_close(&mut self) -> Result<(), ConnectionError> { + pub fn send_close(&mut self) { match self.inner { Open { remote, .. } => { // The remote side will continue to receive data. trace!("send_close: Open => HalfClosedLocal({:?})", remote); self.inner = HalfClosedLocal(remote); - Ok(()) } HalfClosedRemote(..) => { trace!("send_close: HalfClosedRemote => Closed"); self.inner = Closed(None); - Ok(()) } - _ => Err(ProtocolError.into()), + _ => panic!("transition send_close on unexpected state"), } } @@ -307,16 +306,16 @@ impl State { } } - pub fn ensure_recv_open(&self) -> Result<(), ConnectionError> { + pub fn ensure_recv_open(&self) -> Result<(), proto::Error> { use std::io; // TODO: Is this correct? match self.inner { Closed(Some(Cause::Proto(reason))) => { - Err(ConnectionError::Proto(reason)) + Err(proto::Error::Proto(reason)) } Closed(Some(Cause::Io)) => { - Err(ConnectionError::Io(io::ErrorKind::BrokenPipe.into())) + Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())) } _ => Ok(()), } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 8280214..1eb3e56 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -127,8 +127,8 @@ impl Store } } - pub fn for_each(&mut self, mut f: F) -> Result<(), ConnectionError> - where F: FnMut(Ptr) -> Result<(), ConnectionError>, + pub fn for_each(&mut self, mut f: F) -> Result<(), E> + where F: FnMut(Ptr) -> Result<(), E>, { for &key in self.ids.values() { f(Ptr { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f810913..db7d524 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,8 +1,13 @@ -use {client, server, HeaderMap}; +use {client, server, proto}; +use frame::Reason; +use codec::{SendError, RecvError, UserError}; use proto::*; use super::*; use super::store::Resolve; +use http::HeaderMap; + +use std::io; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -66,7 +71,7 @@ impl Streams /// Process inbound headers pub fn recv_headers(&mut self, frame: frame::Headers) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); @@ -97,7 +102,7 @@ impl Streams } else { if !frame.is_end_stream() { // TODO: Is this the right error - return Err(ProtocolError.into()); + return Err(RecvError::Connection(ProtocolError)); } actions.recv.recv_trailers(frame, stream) @@ -105,20 +110,18 @@ impl Streams // TODO: extract this match res { - Ok(()) => Ok(()), - Err(ProtoError::Connection(reason)) => Err(reason.into()), - Err(ProtoError::Stream { reason, .. }) => { + Err(RecvError::Stream { reason, .. }) => { // Reset the stream. actions.send.send_reset(reason, stream, &mut actions.task); Ok(()) } - Err(ProtoError::Io(_)) => unreachable!(), + res => res, } }) } pub fn recv_data(&mut self, frame: frame::Data) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -127,25 +130,23 @@ impl Streams let stream = match me.store.find_mut(&id) { Some(stream) => stream, - None => return Err(ProtocolError.into()), + None => return Err(RecvError::Connection(ProtocolError)), }; me.actions.transition(stream, |actions, stream| { match actions.recv.recv_data(frame, stream) { - Ok(()) => Ok(()), - Err(ProtoError::Connection(reason)) => Err(reason.into()), - Err(ProtoError::Stream { reason, .. }) => { + Err(RecvError::Stream { reason, .. }) => { // Reset the stream. actions.send.send_reset(reason, stream, &mut actions.task); Ok(()) } - Err(ProtoError::Io(_)) => unreachable!(), + res => res, } }) } pub fn recv_reset(&mut self, frame: frame::Reset) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -153,14 +154,16 @@ impl Streams let id = frame.stream_id(); if id.is_zero() { - return Err(ProtocolError.into()); + return Err(RecvError::Connection(ProtocolError)); } let stream = match me.store.find_mut(&id) { Some(stream) => stream, None => { // TODO: Are there other error cases? - me.actions.ensure_not_idle(id)?; + me.actions.ensure_not_idle(id) + .map_err(RecvError::Connection)?; + return Ok(()); } }; @@ -173,7 +176,7 @@ impl Streams } /// Handle a received error and return the ID of the last processed stream. - pub fn recv_err(&mut self, err: &ConnectionError) -> StreamId { + pub fn recv_err(&mut self, err: &proto::Error) -> StreamId { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -182,14 +185,14 @@ impl Streams me.store.for_each(|mut stream| { actions.recv.recv_err(err, &mut *stream); - Ok(()) + Ok::<_, ()>(()) }).ok().expect("unexpected error processing error"); last_processed_id } pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { let id = frame.stream_id(); let mut me = self.inner.lock().unwrap(); @@ -197,15 +200,22 @@ impl Streams if id.is_zero() { me.actions.send.recv_connection_window_update( - frame, &mut me.store)?; + frame, &mut me.store) + .map_err(RecvError::Connection)?; } else { // The remote may send window updates for streams that the local now // considers closed. It's ok... if let Some(mut stream) = me.store.find_mut(&id) { - me.actions.send.recv_stream_window_update( - frame.size_increment(), &mut stream, &mut me.actions.task)?; + // This result is ignored as there is nothing to do when there + // is an error. The stream is reset by the function on error and + // the error is informational. + let _ = me.actions.send.recv_stream_window_update( + frame.size_increment(), + &mut stream, + &mut me.actions.task); } else { - me.actions.recv.ensure_not_idle(id)?; + me.actions.recv.ensure_not_idle(id) + .map_err(RecvError::Connection)?; } } @@ -213,7 +223,7 @@ impl Streams } pub fn recv_push_promise(&mut self, frame: frame::PushPromise) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -222,7 +232,7 @@ impl Streams let stream = match me.store.find_mut(&id) { Some(stream) => stream.key(), - None => return Err(ProtocolError.into()), + None => return Err(RecvError::Connection(ProtocolError)), }; me.actions.recv.recv_push_promise( @@ -246,7 +256,7 @@ impl Streams } pub fn send_pending_refusal(&mut self, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { let mut me = self.inner.lock().unwrap(); @@ -255,7 +265,7 @@ impl Streams } pub fn poll_complete(&mut self, dst: &mut Codec>) - -> Poll<(), ConnectionError> + -> Poll<(), io::Error> where T: AsyncWrite, { let mut me = self.inner.lock().unwrap(); @@ -277,7 +287,7 @@ impl Streams } pub fn apply_remote_settings(&mut self, frame: &frame::Settings) - -> Result<(), ConnectionError> + -> Result<(), RecvError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -286,15 +296,8 @@ impl Streams frame, &mut me.store, &mut me.actions.task) } - pub fn poll_send_request_ready(&mut self) -> Poll<(), ConnectionError> { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - me.actions.send.poll_open_ready() - } - pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool) - -> Result, ConnectionError> + -> Result, SendError> { use http::method; use super::stream::ContentLength; @@ -370,6 +373,17 @@ impl Streams } } +impl Streams + where B: Buf, +{ + pub fn poll_send_request_ready(&mut self) -> Async<()> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + + me.actions.send.poll_open_ready() + } +} + // ===== impl StreamRef ===== impl StreamRef @@ -377,7 +391,7 @@ impl StreamRef P: Peer, { pub fn send_data(&mut self, data: B, end_of_stream: bool) - -> Result<(), ConnectionError> + -> Result<(), UserError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -393,7 +407,8 @@ impl StreamRef }) } - pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), ConnectionError> + pub fn send_trailers(&mut self, trailers: HeaderMap) + -> Result<(), UserError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -420,7 +435,7 @@ impl StreamRef } pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) - -> Result<(), ConnectionError> + -> Result<(), UserError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -444,7 +459,7 @@ impl StreamRef me.actions.recv.body_is_empty(&stream) } - pub fn poll_data(&mut self) -> Poll, ConnectionError> { + pub fn poll_data(&mut self) -> Poll, proto::Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -453,7 +468,7 @@ impl StreamRef me.actions.recv.poll_data(&mut stream) } - pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + pub fn poll_trailers(&mut self) -> Poll, proto::Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -465,7 +480,7 @@ impl StreamRef /// Releases recv capacity back to the peer. This will result in sending /// WINDOW_UPDATE frames on both the stream and connection. pub fn release_capacity(&mut self, capacity: WindowSize) - -> Result<(), ConnectionError> + -> Result<(), UserError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -497,7 +512,7 @@ impl StreamRef } /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, ConnectionError> { + pub fn poll_capacity(&mut self) -> Poll, UserError> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -517,7 +532,7 @@ impl StreamRef /// # Panics /// /// This function panics if the request isn't present. - pub fn take_request(&self) -> Result, ConnectionError> { + pub fn take_request(&self) -> Request<()> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -529,7 +544,7 @@ impl StreamRef impl StreamRef where B: Buf, { - pub fn poll_response(&mut self) -> Poll, ConnectionError> { + pub fn poll_response(&mut self) -> Poll, proto::Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; @@ -557,7 +572,7 @@ impl Actions P: Peer, { fn ensure_not_idle(&mut self, id: StreamId) - -> Result<(), ConnectionError> + -> Result<(), Reason> { if self.is_local_init(id) { self.send.ensure_not_idle(id) diff --git a/src/server.rs b/src/server.rs index 9afc40c..93a4598 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,20 +1,19 @@ -use {HeaderMap, ConnectionError}; -use frame::{self, StreamId}; -use proto::{self, Connection, WindowSize, ProtoError}; -use error::Reason; -use error::Reason::*; +use frame::{self, StreamId, Reason}; +use frame::Reason::*; +use codec::{Codec, RecvError}; +use proto::{self, Connection, WindowSize}; -use http::{Request, Response}; -use futures::{self, Future, Sink, Poll, Async, AsyncSink, IntoFuture}; +use http::{Request, Response, HeaderMap}; +use futures::{self, Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; -use bytes::{Bytes, IntoBuf}; +use bytes::{Bytes, Buf, IntoBuf}; use std::fmt; /// In progress H2 connection binding pub struct Handshake { // TODO: unbox - inner: Box, Error = ConnectionError>>, + inner: Box, Error = ::Error>>, } /// Marker type indicating a client peer @@ -43,13 +42,13 @@ pub struct Send { } /// Flush a Sink -struct Flush { - inner: Option, +struct Flush { + codec: Option>, } /// Read the client connection preface -struct ReadPreface { - inner: Option, +struct ReadPreface { + codec: Option>, pos: usize, } @@ -77,25 +76,21 @@ impl Server /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. pub fn handshake2(io: T) -> Handshake { - let mut framed_write = proto::framed_write(io); + // Create the codec + let mut codec = Codec::new(io); + + // Create the initial SETTINGS frame let settings = frame::Settings::default(); // Send initial settings frame - match framed_write.start_send(settings.into()) { - Ok(AsyncSink::Ready) => {} - Ok(_) => unreachable!(), - Err(e) => { - return Handshake { - inner: Box::new(Err(ConnectionError::from(e)).into_future()), - } - } - } + codec.buffer(settings.into()) + .ok().expect("invalid SETTINGS frame"); // Flush pending settings frame and then wait for the client preface - let handshake = Flush::new(framed_write) + let handshake = Flush::new(codec) .and_then(ReadPreface::new) - .map(move |framed_write| { - let connection = proto::from_framed_write(framed_write); + .map(move |codec| { + let connection = Connection::new(codec); Server { connection } }) ; @@ -104,8 +99,9 @@ impl Server } /// Returns `Ready` when the underlying connection has closed. - pub fn poll_close(&mut self) -> Poll<(), ConnectionError> { + pub fn poll_close(&mut self) -> Poll<(), ::Error> { self.connection.poll() + .map_err(Into::into) } } @@ -114,9 +110,9 @@ impl futures::Stream for Server B: IntoBuf + 'static, { type Item = (Request>, Stream); - type Error = ConnectionError; + type Error = ::Error; - fn poll(&mut self) -> Poll, ConnectionError> { + fn poll(&mut self) -> Poll, ::Error> { // Always try to advance the internal state. Getting NotReady also is // needed to allow this function to return NotReady. match self.poll_close()? { @@ -130,7 +126,7 @@ impl futures::Stream for Server if let Some(inner) = self.connection.next_incoming() { trace!("received incoming"); - let (head, _) = inner.take_request()?.into_parts(); + let (head, _) = inner.take_request().into_parts(); let body = Body { inner: inner.clone() }; let request = Request::from_parts(head, body); @@ -160,9 +156,10 @@ impl fmt::Debug for Server impl Stream { /// Send a response pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool) - -> Result<(), ConnectionError> + -> Result<(), ::Error> { self.inner.send_response(response, end_of_stream) + .map_err(Into::into) } /// Request capacity to send data @@ -177,23 +174,25 @@ impl Stream { } /// Request to be notified when the stream's capacity increases - pub fn poll_capacity(&mut self) -> Poll, ConnectionError> { + pub fn poll_capacity(&mut self) -> Poll, ::Error> { let res = try_ready!(self.inner.poll_capacity()); Ok(Async::Ready(res.map(|v| v as usize))) } /// Send a single data frame pub fn send_data(&mut self, data: B, end_of_stream: bool) - -> Result<(), ConnectionError> + -> Result<(), ::Error> { self.inner.send_data(data.into_buf(), end_of_stream) + .map_err(Into::into) } /// Send trailers pub fn send_trailers(&mut self, trailers: HeaderMap) - -> Result<(), ConnectionError> + -> Result<(), ::Error> { self.inner.send_trailers(trailers) + .map_err(Into::into) } pub fn send_reset(mut self, reason: Reason) { @@ -204,7 +203,7 @@ impl Stream { impl Stream { /// Send the body pub fn send(self, src: T, end_of_stream: bool,) -> Send - where T: futures::Stream, + where T: futures::Stream, { Send { src: src, @@ -223,34 +222,37 @@ impl Body { self.inner.body_is_empty() } - pub fn release_capacity(&mut self, sz: usize) -> Result<(), ConnectionError> { + pub fn release_capacity(&mut self, sz: usize) -> Result<(), ::Error> { self.inner.release_capacity(sz as proto::WindowSize) + .map_err(Into::into) } /// Poll trailers /// /// This function **must** not be called until `Body::poll` returns `None`. - pub fn poll_trailers(&mut self) -> Poll, ConnectionError> { + pub fn poll_trailers(&mut self) -> Poll, ::Error> { self.inner.poll_trailers() + .map_err(Into::into) } } impl futures::Stream for Body { type Item = Bytes; - type Error = ConnectionError; + type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { self.inner.poll_data() + .map_err(Into::into) } } // ===== impl Send ===== impl Future for Send - where T: futures::Stream, + where T: futures::Stream, { type Item = Stream; - type Error = ConnectionError; + type Error = ::Error; fn poll(&mut self) -> Poll { loop { @@ -299,41 +301,54 @@ impl Future for Send // ===== impl Flush ===== -impl Flush { - fn new(inner: T) -> Self { - Flush { inner: Some(inner) } +impl Flush { + fn new(codec: Codec) -> Self { + Flush { codec: Some(codec) } } } -impl Future for Flush { - type Item = T; - type Error = T::SinkError; +impl Future for Flush + where T: AsyncWrite, + B: Buf, +{ + type Item = Codec; + type Error = ::Error; - fn poll(&mut self) -> Poll { - try_ready!(self.inner.as_mut().unwrap().poll_complete()); - Ok(Async::Ready(self.inner.take().unwrap())) + fn poll(&mut self) -> Poll { + // Flush the codec + try_ready!(self.codec.as_mut().unwrap().flush()); + + // Return the codec + Ok(Async::Ready(self.codec.take().unwrap())) } } -impl ReadPreface { - fn new(inner: T) -> Self { +impl ReadPreface { + fn new(codec: Codec) -> Self { ReadPreface { - inner: Some(inner), + codec: Some(codec), pos: 0, } } + + fn inner_mut(&mut self) -> &mut T { + self.codec.as_mut().unwrap().get_mut() + } } -impl Future for ReadPreface { - type Item = T; - type Error = ConnectionError; +impl Future for ReadPreface + where T: AsyncRead, + B: Buf, +{ + type Item = Codec; + type Error = ::Error; - fn poll(&mut self) -> Poll { + fn poll(&mut self) -> Poll { let mut buf = [0; 24]; let mut rem = PREFACE.len() - self.pos; while rem > 0 { - let n = try_nb!(self.inner.as_mut().unwrap().read(&mut buf[..rem])); + let n = try_nb!(self.inner_mut().read(&mut buf[..rem])); if PREFACE[self.pos..self.pos+n] != buf[..n] { // TODO: Should this just write the GO_AWAY frame directly? @@ -344,7 +359,7 @@ impl Future for ReadPreface { rem -= n; // TODO test } - Ok(Async::Ready(self.inner.take().unwrap())) + Ok(Async::Ready(self.codec.take().unwrap())) } } @@ -352,7 +367,7 @@ impl Future for ReadPreface { impl Future for Handshake { type Item = Server; - type Error = ConnectionError; + type Error = ::Error; fn poll(&mut self) -> Poll { self.inner.poll() @@ -401,7 +416,7 @@ impl proto::Peer for Peer { } fn convert_poll_message(headers: frame::Headers) - -> Result + -> Result { use http::{version, uri}; @@ -412,7 +427,7 @@ impl proto::Peer for Peer { macro_rules! malformed { () => { - return Err(ProtoError::Stream { + return Err(RecvError::Stream { id: stream_id, reason: ProtocolError, }); @@ -429,7 +444,7 @@ impl proto::Peer for Peer { // Specifying :status for a request is a protocol error if pseudo.status.is_some() { - return Err(ProtoError::Connection(ProtocolError)); + return Err(RecvError::Connection(ProtocolError)); } // Convert the URI @@ -464,7 +479,7 @@ impl proto::Peer for Peer { Err(_) => { // TODO: Should there be more specialized handling for different // kinds of errors - return Err(ProtoError::Stream { + return Err(RecvError::Stream { id: stream_id, reason: ProtocolError, }); diff --git a/tests/client_request.rs b/tests/client_request.rs index 414e323..ddb19db 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -53,10 +53,10 @@ fn recv_invalid_server_stream_id() { let stream = h2.request(request, true).unwrap(); // The connection errors - assert_proto_err!(h2.wait().unwrap_err(), ProtocolError); + assert!(h2.wait().is_err()); // The stream errors - assert_proto_err!(stream.wait().unwrap_err(), ProtocolError); + assert!(stream.wait().is_err()); } #[test] diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 2797ae1..b80e61a 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -89,27 +89,3 @@ pub mod frames { pub const SETTINGS: &'static [u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; pub const SETTINGS_ACK: &'static [u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; } - -#[macro_export] -macro_rules! assert_user_err { - ($actual:expr, $err:ident) => {{ - use h2::error::{ConnectionError, User}; - - match $actual { - ConnectionError::User(e) => assert_eq!(e, User::$err), - _ => panic!("unexpected connection error type"), - } - }}; -} - -#[macro_export] -macro_rules! assert_proto_err { - ($actual:expr, $err:ident) => {{ - use h2::error::{ConnectionError, Reason}; - - match $actual { - ConnectionError::Proto(e) => assert_eq!(e, Reason::$err), - _ => panic!("unexpected connection error type"), - } - }}; -} diff --git a/util/h2-codec/Cargo.toml b/util/h2-codec/Cargo.toml new file mode 100644 index 0000000..5f60ddf --- /dev/null +++ b/util/h2-codec/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "h2-codec" +version = "0.1.0" +authors = ["Carl Lerche "] + +[dependencies] +http = { git = "https://github.com/carllerche/http" } +log = "0.3.8" +fnv = "1.0.5" +bytes = "0.4" +string = { git = "https://github.com/carllerche/string" } +byteorder = "1.0" +futures = "0.1" +tokio-io = "0.1.3" + +# tokio-timer = "0.1" +# http = { git = "https://github.com/carllerche/http", branch = "lower-case-header-name-parsing" } +# slab = "0.4.0" diff --git a/util/h2-codec/src/lib.rs b/util/h2-codec/src/lib.rs new file mode 100644 index 0000000..9f7374a --- /dev/null +++ b/util/h2-codec/src/lib.rs @@ -0,0 +1,22 @@ +extern crate http; +extern crate fnv; +extern crate bytes; +extern crate string; +extern crate byteorder; + +extern crate futures; + +#[macro_use] +extern crate tokio_io; + +#[macro_use] +extern crate log; + +#[path = "../../../src/hpack/mod.rs"] +mod hpack; + +#[path = "../../../src/frame/mod.rs"] +mod frame; + +#[path = "../../../src/codec/mod.rs"] +mod codec;