diff --git a/src/error.rs b/src/error.rs index 16c3ca0..91993d6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -22,7 +22,23 @@ pub enum ConnectionError { } #[derive(Debug)] -pub struct Stream(Reason); +pub struct StreamError(Reason); + +impl StreamError { + pub fn new(r: Reason) -> StreamError { + StreamError(r) + } + + pub fn reason(&self) -> Reason { + self.0 + } +} + +impl From for StreamError { + fn from(r: Reason) -> Self { + StreamError(r) + } +} #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum Reason { @@ -65,6 +81,9 @@ pub enum User { /// The connection state is corrupt and the connection should be dropped. Corrupt, + /// The stream state has been reset. + StreamReset, + // TODO: reserve additional variants } @@ -103,6 +122,7 @@ macro_rules! user_desc { InactiveStreamId => concat!($prefix, "inactive stream ID"), UnexpectedFrameType => concat!($prefix, "unexpected frame type"), FlowControlViolation => concat!($prefix, "flow control violation"), + StreamReset => concat!($prefix, "frame sent on reset stream"), Corrupt => concat!($prefix, "connection state corrupt"), } }); diff --git a/src/frame/data.rs b/src/frame/data.rs index 8945595..7e63748 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -4,7 +4,6 @@ use bytes::{BufMut, Bytes, Buf}; #[derive(Debug)] pub struct Data { stream_id: StreamId, - //data_len: FrameSize, data: T, flags: DataFlag, pad_len: Option, @@ -29,7 +28,6 @@ impl Data { }; Ok(Data { stream_id: head.stream_id(), - //data_len: payload.len() as FrameSize, data: payload, flags: flags, pad_len: pad_len, @@ -71,7 +69,6 @@ impl Data { } Data { stream_id, - //data_len: data.remaining() as FrameSize, data, flags, pad_len: None, diff --git a/src/frame/headers.rs b/src/frame/headers.rs index fc3d46c..48aa18b 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -48,6 +48,12 @@ pub struct PushPromise { flags: HeadersFlag, } +impl PushPromise { + pub fn stream_id(&self) -> StreamId { + self.stream_id + } +} + #[derive(Debug)] pub struct Continuation { /// Stream ID of continuation frame diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 24590e2..58a85b3 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -61,7 +61,25 @@ pub enum Frame { PushPromise(PushPromise), Settings(Settings), Ping(Ping), - WindowUpdate(WindowUpdate) + WindowUpdate(WindowUpdate), + Reset(Reset) +} + +impl Frame { + pub fn stream_id(&self) -> StreamId { + use self::Frame::*; + + match self { + &Headers(ref v) => v.stream_id(), + &Data(ref v) => v.stream_id(), + &PushPromise(ref v) => v.stream_id(), + &WindowUpdate(ref v) => v.stream_id(), + &Reset(ref v) => v.stream_id(), + + &Ping(_) | + &Settings(_) => StreamId::zero(), + } + } } /// Errors that can occur during parsing an HTTP/2 frame. diff --git a/src/frame/reset.rs b/src/frame/reset.rs index 2cd7d92..925dd56 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -1,5 +1,7 @@ -use frame::{Head, Error}; -use super::{StreamId}; +use error::Reason; +use frame::{self, Head, Error, Kind, StreamId}; + +use bytes::{BufMut, BigEndian}; #[derive(Debug)] pub struct Reset { @@ -8,11 +10,15 @@ pub struct Reset { } impl Reset { + pub fn new(stream_id: StreamId, error: Reason) -> Reset { + Reset { + stream_id, + error_code: error.into(), + } + } pub fn load(head: Head, payload: &[u8]) -> Result { if payload.len() != 4 { - // Invalid payload len - // TODO: Handle error - unimplemented!(); + return Err(Error::InvalidPayloadLength); } let error_code = unpack_octets_4!(payload, 0, u32); @@ -22,4 +28,25 @@ impl Reset { error_code: error_code, }) } + + pub fn encode(&self, dst: &mut B) { + trace!("encoding RESET; id={:?} code={}", self.stream_id, self.error_code); + let head = Head::new(Kind::Reset, 0, self.stream_id); + head.encode(4, dst); + dst.put_u32::(self.error_code); + } + + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + + pub fn reason(&self) -> Reason { + self.error_code.into() + } +} + +impl From for frame::Frame { + fn from(src: Reset) -> Self { + frame::Frame::Reset(src) + } } diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index b3443dd..1c4293d 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -1,8 +1,8 @@ use StreamId; -use byteorder::NetworkEndian; -use bytes::{BufMut}; use frame::{self, Head, Kind, Error}; +use bytes::{BufMut, BigEndian}; + const SIZE_INCREMENT_MASK: u32 = 1 << 31; #[derive(Copy, Clone, Debug)] @@ -48,7 +48,7 @@ impl WindowUpdate { trace!("encoding WINDOW_UPDATE; id={:?}", self.stream_id); let head = Head::new(Kind::Ping, 0, self.stream_id); head.encode(4, dst); - dst.put_u32::(self.size_increment); + dst.put_u32::(self.size_increment); } } diff --git a/src/hpack/table.rs b/src/hpack/table.rs index 3819ed1..8382b72 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -515,7 +515,7 @@ impl Table { } #[cfg(test)] - fn assert_valid_state(&self, msg: &'static str) -> bool { + fn assert_valid_state(&self, _msg: &'static str) -> bool { /* // Checks that the internal map structure is valid // diff --git a/src/lib.rs b/src/lib.rs index 00956cf..f05ccba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ pub mod server; mod util; -pub use error::ConnectionError; +pub use error::{ConnectionError, Reason}; pub use frame::{StreamId}; pub use proto::Connection; @@ -63,9 +63,9 @@ pub enum Frame { id: StreamId, promise: (), }, - Error { + Reset { id: StreamId, - error: (), + error: Reason, }, } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 92f09ba..f23312b 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -2,25 +2,24 @@ use {ConnectionError, Frame}; use client::Client; use error; use frame::{self, SettingSet, StreamId}; -use proto::{self, ControlFlow, ControlPing, ControlSettings, Peer, PingPayload, ReadySink, WindowSize}; +use proto::*; use server::Server; use bytes::{Bytes, IntoBuf}; use http::{request, response}; -use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub struct Connection { - inner: proto::Transport, + inner: Transport, // Set to `true` as long as the connection is in a valid state. active: bool, _phantom: PhantomData<(P, B)>, } -pub fn new(transport: proto::Transport) +pub fn new(transport: Transport) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -178,6 +177,11 @@ impl Stream for Connection data: v.into_payload(), }, + Some(Reset(v)) => Frame::Reset { + id: v.stream_id(), + error: v.reason(), + }, + Some(frame) => panic!("unexpected frame; frame={:?}", frame), None => return Ok(Async::Ready(None)), }; @@ -213,6 +217,10 @@ impl Sink for Connection match item { Frame::Headers { id, headers, end_of_stream } => { + if self.inner.stream_is_reset(id) { + return Err(error::User::StreamReset.into()); + } + // This is a one-way conversion. By checking `poll_ready` first (above), // it's already been determined that the inner `Sink` can accept the item. // If the item is rejected, then there is a bug. @@ -222,20 +230,28 @@ impl Sink for Connection Ok(AsyncSink::Ready) } - Frame::Data { id, data, end_of_stream, .. } => { + Frame::Data { id, data, end_of_stream } => { + if self.inner.stream_is_reset(id) { + return Err(error::User::StreamReset.into()); + } + let frame = frame::Data::from_buf(id, data.into_buf(), end_of_stream); let res = try!(self.inner.start_send(frame.into())); assert!(res.is_ready()); Ok(AsyncSink::Ready) } + Frame::Reset { id, error } => { + let f = frame::Reset::new(id, error); + let res = self.inner.start_send(f.into())?; + assert!(res.is_ready()); + Ok(AsyncSink::Ready) + } + /* Frame::Trailers { id, headers } => { unimplemented!(); } - Frame::Body { id, chunk, end_of_stream } => { - unimplemented!(); - } Frame::PushPromise { id, promise } => { unimplemented!(); } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index cb4f20e..913fe71 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -76,26 +76,33 @@ impl FlowControl { /// Proxies access to streams. impl ControlStreams for FlowControl { - #[inline] fn streams(&self) -> &StreamMap { self.inner.streams() } - #[inline] fn streams_mut(&mut self) -> &mut StreamMap { self.inner.streams_mut() } + + fn stream_is_reset(&self, id: StreamId) -> bool { + self.inner.stream_is_reset(id) + } } /// Exposes a public upward API for flow control. impl ControlFlow for FlowControl { fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { - if let Some(mut flow) = self.remote_flow_controller(id) { - if let Some(sz) = flow.apply_window_update() { - return Ok(Async::Ready(sz)); + if self.stream_is_reset(id) { + return Err(error::User::StreamReset.into()); + } + + match self.remote_flow_controller(id) { + None => return Err(error::User::InvalidStreamId.into()), + Some(mut flow) => { + if let Some(sz) = flow.apply_window_update() { + return Ok(Async::Ready(sz)); + } } - } else { - return Err(error::User::InvalidStreamId.into()); } self.blocked_remote_window_update = Some(task::current()); @@ -103,18 +110,26 @@ impl ControlFlow for FlowControl { } fn expand_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - if let Some(mut fc) = self.local_flow_controller(id) { - fc.expand_window(incr); - } else { - return Err(error::User::InvalidStreamId.into()); - } + let added = match self.local_flow_controller(id) { + None => false, + Some(mut fc) => { + fc.expand_window(incr); + true + } + }; - if id.is_zero() { - self.pending_local_connection_window_update = true; + if added { + if id.is_zero() { + self.pending_local_connection_window_update = true; + } else { + self.pending_local_window_updates.push_back(id); + } + Ok(()) + } else if self.stream_is_reset(id) { + Err(error::User::StreamReset.into()) } else { - self.pending_local_window_updates.push_back(id); + Err(error::User::InvalidStreamId.into()) } - Ok(()) } } @@ -145,9 +160,11 @@ impl FlowControl } while let Some(id) = self.pending_local_window_updates.pop_front() { - let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); - if let Some(incr) = update { - try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); + if !self.stream_is_reset(id) { + let update = self.local_flow_controller(id).and_then(|s| s.apply_window_update()); + if let Some(incr) = update { + try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); + } } } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index c6b9615..71b8d69 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -80,12 +80,12 @@ impl FramedRead { frame.into() } + Kind::Reset => { + frame::Reset::load(head, &bytes[frame::HEADER_LEN..])?.into() + } + // TODO - Kind::Reset => { - let _todo = try!(frame::Reset::load(head, &bytes[frame::HEADER_LEN..])); - unimplemented!(); - } Kind::GoAway => { let _todo = try!(frame::GoAway::load(&bytes[frame::HEADER_LEN..])); unimplemented!(); diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index fa0bc4d..6a1ee6f 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -145,6 +145,10 @@ impl Sink for FramedWrite v.encode(self.buf.get_mut()); trace!("encoded window_update; rem={:?}", self.buf.remaining()); } + Frame::Reset(v) => { + v.encode(self.buf.get_mut()); + trace!("encoded reset; rem={:?}", self.buf.remaining()); + } } Ok(AsyncSink::Ready) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index eb1b7f4..c59db0a 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -128,6 +128,7 @@ pub trait ControlFlow { pub trait ControlStreams { fn streams(&self)-> &StreamMap; fn streams_mut(&mut self) -> &mut StreamMap; + fn stream_is_reset(&self, id: StreamId) -> bool; } pub type PingPayload = [u8; 8]; diff --git a/src/proto/settings.rs b/src/proto/settings.rs index a2383c2..d5ae257 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -103,6 +103,10 @@ impl ControlStreams for Settings { fn streams_mut(&mut self) -> &mut StreamMap { self.inner.streams_mut() } + + fn stream_is_reset(&self, id: StreamId) -> bool { + self.inner.stream_is_reset(id) + } } impl ControlFlow for Settings { diff --git a/src/proto/state.rs b/src/proto/state.rs index ffcd7ef..f7afb0a 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -64,6 +64,14 @@ pub enum StreamState { } impl StreamState { + pub fn is_closed(&self) -> bool { + use self::StreamState::*; + + match self { + &Closed => true, + _ => false, + } + } /// Transition the state to represent headers being received. /// /// Returns true if this state transition results in iniitializing the @@ -296,6 +304,10 @@ impl StreamMap { self.inner.entry(id) } + pub fn remove(&mut self, id: &StreamId) -> Option { + self.inner.remove(id) + } + pub fn shrink_all_local_windows(&mut self, decr: u32) { for (_, mut s) in &mut self.inner { if let Some(fc) = s.local_flow_controller() { diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index 20cd3d8..cb0f62d 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -1,16 +1,21 @@ -use ConnectionError; +use {ConnectionError}; use error::Reason; use error::User; use frame::{self, Frame}; use proto::*; +use fnv::FnvHasher; +use ordermap::OrderMap; +use std::hash::BuildHasherDefault; use std::marker::PhantomData; #[derive(Debug)] pub struct StreamTracker { inner: T, peer: PhantomData

, - streams: StreamMap, + active_streams: StreamMap, + // TODO reserved_streams: HashSet + reset_streams: OrderMap>, local_max_concurrency: Option, remote_max_concurrency: Option, initial_local_window_size: WindowSize, @@ -32,7 +37,8 @@ impl StreamTracker StreamTracker { inner, peer: PhantomData, - streams: StreamMap::default(), + active_streams: StreamMap::default(), + reset_streams: OrderMap::default(), local_max_concurrency, remote_max_concurrency, initial_local_window_size, @@ -44,12 +50,16 @@ impl StreamTracker impl ControlStreams for StreamTracker { #[inline] fn streams(&self) -> &StreamMap { - &self.streams + &self.active_streams } #[inline] fn streams_mut(&mut self) -> &mut StreamMap { - &mut self.streams + &mut self.active_streams + } + + fn stream_is_reset(&self, id: StreamId) -> bool { + self.reset_streams.contains_key(&id) } } @@ -109,40 +119,93 @@ impl Stream for StreamTracker fn poll(&mut self) -> Poll, T::Error> { use frame::Frame::*; - match try_ready!(self.inner.poll()) { - Some(Headers(v)) => { - let id = v.stream_id(); - let eos = v.is_end_stream(); + loop { + match try_ready!(self.inner.poll()) { + Some(Headers(v)) => { + let id = v.stream_id(); + let eos = v.is_end_stream(); - let initialized = self.streams - .entry(id) - .or_insert_with(|| StreamState::default()) - .recv_headers::

(eos, self.initial_local_window_size)?; + if self.reset_streams.contains_key(&id) { + continue; + } - if initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. + let is_closed = { + let stream = self.active_streams.entry(id) + .or_insert_with(|| StreamState::default()); - if !P::is_valid_remote_stream_id(id) { - return Err(Reason::ProtocolError.into()); + let initialized = + stream.recv_headers::

(eos, self.initial_local_window_size)?; + + if initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + + if !P::is_valid_remote_stream_id(id) { + return Err(Reason::ProtocolError.into()); + } + } + + stream.is_closed() + }; + + if is_closed { + self.active_streams.remove(&id); + self.reset_streams.insert(id, Reason::NoError); + } + + return Ok(Async::Ready(Some(Headers(v)))); + } + + Some(Data(v)) => { + let id = v.stream_id(); + + if self.reset_streams.contains_key(&id) { + continue; + } + + let is_closed = { + let stream = match self.active_streams.get_mut(&id) { + None => return Err(Reason::ProtocolError.into()), + Some(s) => s, + }; + stream.recv_data(v.is_end_stream())?; + stream.is_closed() + }; + + if is_closed { + self.active_streams.remove(&id); + self.reset_streams.insert(id, Reason::NoError); + } + + return Ok(Async::Ready(Some(Data(v)))); + } + + Some(Reset(v)) => { + let id = v.stream_id(); + + // Set or update the reset reason. + self.reset_streams.insert(id, v.reason()); + + if self.active_streams.remove(&id).is_some() { + return Ok(Async::Ready(Some(Reset(v)))); } } - Ok(Async::Ready(Some(Headers(v)))) - } + Some(f) => { + let id = f.stream_id(); - Some(Data(v)) => { - match self.streams.get_mut(&v.stream_id()) { - None => Err(Reason::ProtocolError.into()), - Some(state) => { - state.recv_data(v.is_end_stream())?; - Ok(Async::Ready(Some(Data(v)))) + if self.reset_streams.contains_key(&id) { + continue; } + + return Ok(Async::Ready(Some(f))); + } + + None => { + return Ok(Async::Ready(None)); } } - - f => Ok(Async::Ready(f)), } } } @@ -158,6 +221,9 @@ impl Sink for StreamTracker fn start_send(&mut self, item: T::SinkItem) -> StartSend { use frame::Frame::*; + // Must be enforced through higher levels. + debug_assert!(!self.stream_is_reset(item.stream_id())); + match &item { &Headers(ref v) => { let id = v.stream_id(); @@ -170,34 +236,51 @@ impl Sink for StreamTracker // // ACTUALLY(ver), maybe not? // https://github.com/http2/http2-spec/commit/c83c8d911e6b6226269877e446a5cad8db921784 - let initialized = self.streams - .entry(id) - .or_insert_with(|| StreamState::default()) - .send_headers::

(eos, self.initial_remote_window_size)?; - if initialized { - // TODO: Ensure available capacity for a new stream - // This won't be as simple as self.streams.len() as closed - // connections should not be factored. - if !P::is_valid_local_stream_id(id) { - // TODO: clear state - return Err(User::InvalidStreamId.into()); + let is_closed = { + let stream = self.active_streams.entry(id) + .or_insert_with(|| StreamState::default()); + + let initialized = + stream.send_headers::

(eos, self.initial_remote_window_size)?; + + if initialized { + // TODO: Ensure available capacity for a new stream + // This won't be as simple as self.streams.len() as closed + // connections should not be factored. + if !P::is_valid_local_stream_id(id) { + // TODO: clear state + return Err(User::InvalidStreamId.into()); + } } + + stream.is_closed() + }; + + if is_closed { + self.active_streams.remove(&id); + self.reset_streams.insert(id, Reason::NoError); } } &Data(ref v) => { - match self.streams.get_mut(&v.stream_id()) { + match self.active_streams.get_mut(&v.stream_id()) { None => return Err(User::InactiveStreamId.into()), - Some(state) => state.send_data(v.is_end_stream())?, + Some(stream) => stream.send_data(v.is_end_stream())?, } } + + &Reset(ref v) => { + let id = v.stream_id(); + self.active_streams.remove(&id); + self.reset_streams.insert(id, v.reason()); + } + _ => {} } self.inner.start_send(item) - } fn poll_complete(&mut self) -> Poll<(), T::SinkError> {