diff --git a/src/client.rs b/src/client.rs index 9d98092..69ee050 100644 --- a/src/client.rs +++ b/src/client.rs @@ -12,6 +12,7 @@ use tokio_io::io::WriteAll; use std::fmt; use std::marker::PhantomData; +use std::time::Duration; /// In progress H2 connection binding #[must_use = "futures do nothing unless polled"] @@ -45,6 +46,12 @@ pub struct ResponseFuture { /// Build a Client. #[derive(Clone, Debug)] pub struct Builder { + /// Time to keep locally reset streams around before reaping. + reset_stream_duration: Duration, + + /// Maximum number of locally reset streams to keep at a time. + reset_stream_max: usize, + /// Initial `Settings` frame to send as part of the handshake. settings: Settings, @@ -208,6 +215,26 @@ impl Builder { self } + /// Set the maximum number of concurrent locally reset streams. + /// + /// Locally reset streams are to "ignore frames from the peer for some + /// time". While waiting for that time, locally reset streams "waste" + /// space in order to be able to ignore those frames. This setting + /// can limit how many extra streams are left waiting for "some time". + pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { + self.reset_stream_max = max; + self + } + + /// Set the maximum number of concurrent locally reset streams. + /// + /// Locally reset streams are to "ignore frames from the peer for some + /// time", but that time is unspecified. Set that time with this setting. + pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { + self.reset_stream_duration = dur; + self + } + /// Enable or disable the server to send push promises. pub fn enable_push(&mut self, enabled: bool) -> &mut Self { self.settings.set_enable_push(enabled); @@ -245,6 +272,8 @@ impl Builder { impl Default for Builder { fn default() -> Builder { Builder { + reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), + reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, settings: Default::default(), stream_id: 1.into(), } @@ -324,8 +353,12 @@ where .buffer(self.builder.settings.clone().into()) .expect("invalid SETTINGS frame"); - let connection = - proto::Connection::new(codec, &self.builder.settings, self.builder.stream_id); + let connection = proto::Connection::new(codec, proto::Config { + next_stream_id: self.builder.stream_id, + reset_stream_duration: self.builder.reset_stream_duration, + reset_stream_max: self.builder.reset_stream_max, + settings: self.builder.settings.clone(), + }); let client = Client { inner: connection.streams().clone(), pending: None, diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 1af7acb..f81147e 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -5,14 +5,14 @@ use bytes::{BigEndian, BufMut}; #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct GoAway { last_stream_id: StreamId, - error_code: u32, + error_code: Reason, } impl GoAway { pub fn new(last_stream_id: StreamId, reason: Reason) -> Self { GoAway { last_stream_id, - error_code: reason.into(), + error_code: reason, } } @@ -21,7 +21,7 @@ impl GoAway { } pub fn reason(&self) -> Reason { - self.error_code.into() + self.error_code } pub fn load(payload: &[u8]) -> Result { @@ -34,16 +34,16 @@ impl GoAway { Ok(GoAway { last_stream_id: last_stream_id, - error_code: error_code, + error_code: error_code.into(), }) } pub fn encode(&self, dst: &mut B) { - trace!("encoding GO_AWAY; code={}", self.error_code); + trace!("encoding GO_AWAY; code={:?}", self.error_code); let head = Head::new(Kind::GoAway, 0, StreamId::zero()); head.encode(8, dst); dst.put_u32::(self.last_stream_id.into()); - dst.put_u32::(self.error_code); + dst.put_u32::(self.error_code.into()); } } diff --git a/src/frame/reset.rs b/src/frame/reset.rs index 2023023..be22a1f 100644 --- a/src/frame/reset.rs +++ b/src/frame/reset.rs @@ -5,14 +5,14 @@ use bytes::{BigEndian, BufMut}; #[derive(Debug, Eq, PartialEq)] pub struct Reset { stream_id: StreamId, - error_code: u32, + error_code: Reason, } impl Reset { pub fn new(stream_id: StreamId, error: Reason) -> Reset { Reset { stream_id, - error_code: error.into(), + error_code: error, } } @@ -21,7 +21,7 @@ impl Reset { } pub fn reason(&self) -> Reason { - self.error_code.into() + self.error_code } pub fn load(head: Head, payload: &[u8]) -> Result { @@ -33,19 +33,19 @@ impl Reset { Ok(Reset { stream_id: head.stream_id(), - error_code: error_code, + error_code: error_code.into(), }) } pub fn encode(&self, dst: &mut B) { trace!( - "encoding RESET; id={:?} code={}", + "encoding RESET; id={:?} code={:?}", self.stream_id, self.error_code ); let head = Head::new(Kind::Reset, 0, self.stream_id); head.encode(4, dst); - dst.put_u32::(self.error_code); + dst.put_u32::(self.error_code.into()); } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 6db375a..08a58f3 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,6 +1,6 @@ use {client, frame, proto, server}; use codec::RecvError; -use frame::Reason; +use frame::{Reason, StreamId}; use frame::DEFAULT_INITIAL_WINDOW_SIZE; use proto::*; @@ -10,6 +10,7 @@ use futures::Stream; use tokio_io::{AsyncRead, AsyncWrite}; use std::marker::PhantomData; +use std::time::Duration; /// An H2 connection #[derive(Debug)] @@ -42,6 +43,14 @@ where _phantom: PhantomData

, } +#[derive(Debug, Clone)] +pub(crate) struct Config { + pub next_stream_id: StreamId, + pub reset_stream_duration: Duration, + pub reset_stream_max: usize, + pub settings: frame::Settings, +} + #[derive(Debug)] enum State { /// Currently open in a sane state @@ -65,18 +74,19 @@ where { pub fn new( codec: Codec>, - settings: &frame::Settings, - next_stream_id: frame::StreamId, + config: Config, ) -> Connection { let streams = Streams::new(streams::Config { - local_init_window_sz: settings + local_init_window_sz: config.settings .initial_window_size() .unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE), local_max_initiated: None, - local_next_stream_id: next_stream_id, - local_push_enabled: settings.is_push_enabled(), + local_next_stream_id: config.next_stream_id, + local_push_enabled: config.settings.is_push_enabled(), + local_reset_duration: config.reset_stream_duration, + local_reset_max: config.reset_stream_max, remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, - remote_max_initiated: settings + remote_max_initiated: config.settings .max_concurrent_streams() .map(|max| max as usize), }); @@ -230,6 +240,11 @@ where fn poll2(&mut self) -> Poll<(), RecvError> { use frame::Frame::*; + // This happens outside of the loop to prevent needing to do a clock + // check and then comparison of the queue possibly multiple times a + // second (and thus, the clock wouldn't have changed enough to matter). + self.clear_expired_reset_streams(); + loop { // First, ensure that the `Connection` is able to receive a frame try_ready!(self.poll_ready()); @@ -284,6 +299,10 @@ where } } } + + fn clear_expired_reset_streams(&mut self) { + self.streams.clear_expired_reset_streams(); + } } impl Connection diff --git a/src/proto/mod.rs b/src/proto/mod.rs index d2611b7..e1aec8d 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -5,7 +5,7 @@ mod ping_pong; mod settings; mod streams; -pub(crate) use self::connection::Connection; +pub(crate) use self::connection::{Config, Connection}; pub(crate) use self::error::Error; pub(crate) use self::peer::{Peer, Dyn as DynPeer}; pub(crate) use self::streams::{Key as StreamKey, StreamRef, OpaqueStreamRef, Streams}; @@ -31,3 +31,5 @@ pub type WindowSize = u32; // Constants pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; +pub const DEFAULT_RESET_STREAM_MAX: usize = 10; +pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index 5f8fcde..6aed6d2 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -19,6 +19,12 @@ pub(super) struct Counts { /// Current number of locally initiated streams num_recv_streams: usize, + + /// Maximum number of pending locally reset streams + max_reset_streams: usize, + + /// Current number of pending locally reset streams + num_reset_streams: usize, } impl Counts { @@ -30,6 +36,8 @@ impl Counts { num_send_streams: 0, max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX), num_recv_streams: 0, + max_reset_streams: config.local_reset_max, + num_reset_streams: 0, } } @@ -72,6 +80,22 @@ impl Counts { self.num_send_streams += 1; } + /// Returns true if the number of pending reset streams can be incremented. + pub fn can_inc_num_reset_streams(&self) -> bool { + self.max_reset_streams > self.num_reset_streams + } + + /// Increments the number of pending reset streams. + /// + /// # Panics + /// + /// Panics on failure as this should have been validated before hand. + pub fn inc_num_reset_streams(&mut self) { + assert!(self.can_inc_num_reset_streams()); + + self.num_reset_streams += 1; + } + pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { if let Some(val) = settings.max_concurrent_streams() { self.max_send_streams = val as usize; @@ -87,19 +111,26 @@ impl Counts { F: FnOnce(&mut Self, &mut store::Ptr) -> U, { let is_counted = stream.is_counted(); + let is_pending_reset = stream.is_pending_reset_expiration(); // Run the action let ret = f(self, &mut stream); - self.transition_after(stream, is_counted); + self.transition_after(stream, is_counted, is_pending_reset); ret } // TODO: move this to macro? - pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool) { + pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool, is_reset_counted: bool) { if stream.is_closed() { - stream.unlink(); + if !stream.is_pending_reset_expiration() { + stream.unlink(); + + if is_reset_counted { + self.dec_num_reset_streams(); + } + } if is_counted { // Decrement the number of active streams. @@ -115,9 +146,16 @@ impl Counts { fn dec_num_streams(&mut self, id: StreamId) { if self.peer.is_local_init(id) { + assert!(self.num_send_streams > 0); self.num_send_streams -= 1; } else { + assert!(self.num_recv_streams > 0); self.num_recv_streams -= 1; } } + + fn dec_num_reset_streams(&mut self) { + assert!(self.num_reset_streams > 0); + self.num_reset_streams -= 1; + } } diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index db33f6d..13ce2eb 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -20,12 +20,13 @@ use self::prioritize::Prioritize; use self::recv::Recv; use self::send::Send; use self::state::State; -use self::store::{Entry, Store}; +use self::store::Store; use self::stream::Stream; use frame::{StreamId, StreamIdOverflow}; use proto::*; +use std::time::Duration; use bytes::Bytes; use http::{Request, Response}; @@ -43,6 +44,12 @@ pub struct Config { /// If the local peer is willing to receive push promises pub local_push_enabled: bool, + /// How long a locally reset stream should ignore frames + pub local_reset_duration: Duration, + + /// Maximum number of locally reset streams to keep at a time + pub local_reset_max: usize, + /// Initial window size of remote initiated streams pub remote_init_window_sz: WindowSize, diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index a310de1..10ecff7 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -529,7 +529,13 @@ impl Prioritize { Some(mut stream) => { trace!("pop_frame; stream={:?}", stream.id); + // It's possible that this stream, besides having data to send, + // is also queued to send a reset, and thus is already in the queue + // to wait for "some time" after a reset. + // + // To be safe, we just always ask the stream. let is_counted = stream.is_counted(); + let is_pending_reset = stream.is_pending_reset_expiration(); let frame = match stream.pending_send.pop_front(buffer) { Some(Frame::Data(mut frame)) => { @@ -651,7 +657,7 @@ impl Prioritize { self.pending_send.push(&mut stream); } - counts.transition_after(stream, is_counted); + counts.transition_after(stream, is_counted, is_pending_reset); return Some(frame); }, diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 1fde2b9..9f5cd6b 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -2,11 +2,11 @@ use super::*; use {frame, proto}; use codec::{RecvError, UserError}; use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE}; -use proto::*; use http::HeaderMap; use std::io; +use std::time::{Duration, Instant}; #[derive(Debug)] pub(super) struct Recv { @@ -31,6 +31,12 @@ pub(super) struct Recv { /// New streams to be accepted pending_accept: store::Queue, + /// Locally reset streams that should be reaped when they expire + pending_reset_expired: store::Queue, + + /// How long locally reset streams should ignore received frames + reset_duration: Duration, + /// Holds frames that are waiting to be read buffer: Buffer, @@ -74,6 +80,8 @@ impl Recv { pending_window_updates: store::Queue::new(), last_processed_id: StreamId::zero(), pending_accept: store::Queue::new(), + pending_reset_expired: store::Queue::new(), + reset_duration: config.local_reset_duration, buffer: Buffer::new(), refused: None, is_push_enabled: config.local_push_enabled, @@ -237,7 +245,28 @@ impl Recv { Ok(()) } - /// Releases capacity back to the connection + /// Releases capacity of the connection + fn release_connection_capacity( + &mut self, + capacity: WindowSize, + task: &mut Option, + ) { + trace!("release_connection_capacity; size={}", capacity); + + // Decrement in-flight data + self.in_flight_data -= capacity; + + // Assign capacity to connection + self.flow.assign_capacity(capacity); + + if self.flow.unclaimed_capacity().is_some() { + if let Some(task) = task.take() { + task.notify(); + } + } + } + + /// Releases capacity back to the connection & stream pub fn release_capacity( &mut self, capacity: WindowSize, @@ -250,19 +279,14 @@ impl Recv { return Err(UserError::ReleaseCapacityTooBig); } + self.release_connection_capacity(capacity, task); + // Decrement in-flight data stream.in_flight_recv_data -= capacity; - self.in_flight_data -= capacity; - // Assign capacity to connection & stream - self.flow.assign_capacity(capacity); + // Assign capacity to stream stream.recv_flow.assign_capacity(capacity); - if self.flow.unclaimed_capacity().is_some() { - if let Some(task) = task.take() { - task.notify(); - } - } if stream.recv_flow.unclaimed_capacity().is_some() { // Queue the stream for sending the WINDOW_UPDATE frame. @@ -353,8 +377,12 @@ impl Recv { let sz = sz as WindowSize; - if !stream.state.is_recv_streaming() { - trace!("stream is not in receiving state; state={:?}", stream.state); + let is_ignoring_frame = stream.state.is_local_reset(); + + if !is_ignoring_frame && !stream.state.is_recv_streaming() { + // TODO: There are cases where this can be a stream error of + // STREAM_CLOSED instead... + // Receiving a DATA frame when not expecting one is a protocol // error. return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); @@ -369,19 +397,46 @@ impl Recv { // Ensure that there is enough capacity on the connection before acting // on the stream. - if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz { - return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); + self.consume_connection_window(sz)?; + + if is_ignoring_frame { + trace!( + "recv_data frame ignored on locally reset {:?} for some time", + stream.id, + ); + // we just checked for enough connection window capacity, and + // consumed it. Since we are ignoring this frame "for some time", + // we aren't returning the frame to the user. That means they + // have no way to release the capacity back to the connection. So + // we have to release it automatically. + // + // This call doesn't send a WINDOW_UPDATE immediately, just marks + // the capacity as available to be reclaimed. When the available + // capacity meets a threshold, a WINDOW_UPDATE is then sent. + self.release_connection_capacity(sz, &mut None); + return Ok(()); } - // Update connection level flow control - self.flow.send_data(sz); + if stream.recv_flow.window_size() < sz { + // http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE + // > A receiver MAY respond with a stream error (Section 5.4.2) or + // > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if + // > it is unable to accept a frame. + // + // So, for violating the **stream** window, we can send either a + // stream or connection error. We've opted to send a stream + // error. + return Err(RecvError::Stream { + id: stream.id, + reason: Reason::FLOW_CONTROL_ERROR, + }); + } // Update stream level flow control stream.recv_flow.send_data(sz); // Track the data as in-flight stream.in_flight_recv_data += sz; - self.in_flight_data += sz; if stream.dec_content_length(frame.payload().len()).is_err() { trace!("content-length overflow"); @@ -415,6 +470,19 @@ impl Recv { Ok(()) } + pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> { + if self.flow.window_size() < sz { + return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR)); + } + + // Update connection level flow control + self.flow.send_data(sz); + + // Track the data as in-flight + self.in_flight_data += sz; + Ok(()) + } + pub fn recv_push_promise( &mut self, frame: frame::PushPromise, @@ -480,15 +548,14 @@ impl Recv { Ok(()) } + /// Handle remote sending an explicit RST_STREAM. pub fn recv_reset( &mut self, frame: frame::Reset, stream: &mut Stream, ) -> Result<(), RecvError> { - let err = proto::Error::Proto(frame.reason()); - // Notify the stream - stream.state.recv_err(&err); + stream.state.recv_reset(frame.reason()); stream.notify_recv(); Ok(()) } @@ -536,6 +603,38 @@ impl Recv { Ok(()) } + /// Add a locally reset stream to queue to be eventually reaped. + pub fn enqueue_reset_expiration( + &mut self, + stream: &mut store::Ptr, + counts: &mut Counts, + ) { + assert!(stream.state.is_local_reset()); + + if stream.is_pending_reset_expiration() { + return; + } + + if !counts.can_inc_num_reset_streams() { + // try to evict 1 stream if possible + // if max allow is 0, this won't be able to evict, + // and then we'll just bail after + if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) { + // It's possible that this stream is still sitting in a send queue, + // such as if some data is to be sent and then a CANCEL. In this case, + // it could still be "counted", so we just make sure to always ask the + // stream instead of assuming. + let is_counted = evicted.is_counted(); + counts.transition_after(evicted, is_counted, true); + } + } + + if counts.can_inc_num_reset_streams() { + counts.inc_num_reset_streams(); + self.pending_reset_expired.push(stream); + } + } + /// Send any pending refusals. pub fn send_pending_refusal( &mut self, @@ -562,6 +661,18 @@ impl Recv { Ok(Async::Ready(())) } + pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) { + let now = Instant::now(); + let reset_duration = self.reset_duration; + while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| { + let reset_at = stream.reset_at.expect("reset_at must be set if in queue"); + now - reset_at > reset_duration + }) { + let is_counted = stream.is_counted(); + counts.transition_after(stream, is_counted, true); + } + } + pub fn poll_complete( &mut self, store: &mut Store, diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index a7af550..218954e 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -3,7 +3,6 @@ use super::*; use codec::{RecvError, UserError}; use codec::UserError::*; use frame::{self, Reason}; -use proto::*; use bytes::Buf; diff --git a/src/proto/streams/state.rs b/src/proto/streams/state.rs index 1c4a28e..3fdda30 100644 --- a/src/proto/streams/state.rs +++ b/src/proto/streams/state.rs @@ -60,8 +60,7 @@ enum Inner { Open { local: Peer, remote: Peer }, HalfClosedLocal(Peer), // TODO: explicitly name this value HalfClosedRemote(Peer), - // When reset, a reason is provided - Closed(Option), + Closed(Cause), } #[derive(Debug, Copy, Clone)] @@ -72,7 +71,9 @@ enum Peer { #[derive(Debug, Copy, Clone)] enum Cause { + EndStream, Proto(Reason), + LocallyReset(Reason), Io, /// The user droped all handles to the stream without explicitly canceling. @@ -84,7 +85,7 @@ enum Cause { impl State { /// Opens the send-half of a stream if it is not already open. pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> { - let local = Peer::Streaming; + let local = Streaming; self.inner = match self.inner { Idle => if eos { @@ -107,7 +108,7 @@ impl State { } }, HalfClosedRemote(AwaitingHeaders) => if eos { - Closed(None) + Closed(Cause::EndStream) } else { HalfClosedRemote(local) }, @@ -124,7 +125,7 @@ impl State { /// /// Returns true if this transitions the state to Open. pub fn recv_open(&mut self, eos: bool) -> Result { - let remote = Peer::Streaming; + let remote = Streaming; let mut initial = false; self.inner = match self.inner { @@ -144,7 +145,7 @@ impl State { initial = true; if eos { - Closed(None) + Closed(Cause::EndStream) } else { Open { local: AwaitingHeaders, @@ -164,7 +165,7 @@ impl State { } }, HalfClosedLocal(AwaitingHeaders) => if eos { - Closed(None) + Closed(Cause::EndStream) } else { HalfClosedLocal(remote) }, @@ -201,13 +202,25 @@ impl State { }, HalfClosedLocal(..) => { trace!("recv_close: HalfClosedLocal => Closed"); - self.inner = Closed(None); + self.inner = Closed(Cause::EndStream); Ok(()) }, _ => Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), } } + /// The remote explicitly sent a RST_STREAM. + pub fn recv_reset(&mut self, reason: Reason) { + match self.inner { + Closed(..) => {}, + _ => { + trace!("recv_reset; reason={:?}", reason); + self.inner = Closed(Cause::Proto(reason)); + }, + } + } + + /// We noticed a protocol error. pub fn recv_err(&mut self, err: &proto::Error) { use proto::Error::*; @@ -216,8 +229,8 @@ impl State { _ => { trace!("recv_err; err={:?}", err); self.inner = Closed(match *err { - Proto(reason) => Some(Cause::Proto(reason)), - Io(..) => Some(Cause::Io), + Proto(reason) => Cause::LocallyReset(reason), + Io(..) => Cause::Io, }); }, } @@ -228,7 +241,7 @@ impl State { Closed(..) => {}, s => { trace!("recv_eof; state={:?}", s); - self.inner = Closed(Some(Cause::Io)); + self.inner = Closed(Cause::Io); } } } @@ -245,28 +258,34 @@ impl State { }, HalfClosedRemote(..) => { trace!("send_close: HalfClosedRemote => Closed"); - self.inner = Closed(None); + self.inner = Closed(Cause::EndStream); }, _ => panic!("transition send_close on unexpected state"), } } - /// Set the stream state to reset + /// Set the stream state to reset locally. pub fn set_reset(&mut self, reason: Reason) { - self.inner = Closed(Some(Cause::Proto(reason))); + self.inner = Closed(Cause::LocallyReset(reason)); } /// Set the stream state to canceled pub fn set_canceled(&mut self) { debug_assert!(!self.is_closed()); - self.inner = Closed(Some(Cause::Canceled)); + self.inner = Closed(Cause::Canceled); } pub fn is_canceled(&self) -> bool { - use self::Cause::Canceled; - match self.inner { - Closed(Some(Canceled)) => true, + Closed(Cause::Canceled) => true, + _ => false, + } + } + + pub fn is_local_reset(&self) -> bool { + match self.inner { + Closed(Cause::LocallyReset(_)) => true, + Closed(Cause::Canceled) => true, _ => false, } } @@ -274,7 +293,8 @@ impl State { /// Returns true if the stream is already reset. pub fn is_reset(&self) -> bool { match self.inner { - Closed(Some(_)) => true, + Closed(Cause::EndStream) => false, + Closed(_) => true, _ => false, } } @@ -294,10 +314,10 @@ impl State { pub fn is_send_streaming(&self) -> bool { match self.inner { Open { - local: Peer::Streaming, + local: Streaming, .. } => true, - HalfClosedRemote(Peer::Streaming) => true, + HalfClosedRemote(Streaming) => true, _ => false, } } @@ -319,10 +339,10 @@ impl State { pub fn is_recv_streaming(&self) -> bool { match self.inner { Open { - remote: Peer::Streaming, + remote: Streaming, .. } => true, - HalfClosedLocal(Peer::Streaming) => true, + HalfClosedLocal(Streaming) => true, _ => false, } } @@ -353,10 +373,12 @@ impl State { // TODO: Is this correct? match self.inner { - Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)), - Closed(Some(Cause::Canceled)) => Err(proto::Error::Proto(Reason::CANCEL)), - Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), - Closed(None) | HalfClosedRemote(..) => Ok(false), + Closed(Cause::Proto(reason)) | + Closed(Cause::LocallyReset(reason)) => Err(proto::Error::Proto(reason)), + Closed(Cause::Canceled) => Err(proto::Error::Proto(Reason::CANCEL)), + Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())), + Closed(Cause::EndStream) | + HalfClosedRemote(..) => Ok(false), _ => Ok(true), } } @@ -372,6 +394,6 @@ impl Default for State { impl Default for Peer { fn default() -> Self { - Peer::AwaitingHeaders + AwaitingHeaders } } diff --git a/src/proto/streams/store.rs b/src/proto/streams/store.rs index 0cad36a..dc8834f 100644 --- a/src/proto/streams/store.rs +++ b/src/proto/streams/store.rs @@ -289,6 +289,21 @@ where None } + + pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option> + where + R: Resolve, + F: Fn(&Stream) -> bool, + { + if let Some(idxs) = self.indices { + let should_pop = f(&store.resolve(idxs.head)); + if should_pop { + return self.pop(store); + } + } + + None + } } // ===== impl Ptr ===== @@ -299,6 +314,10 @@ impl<'a> Ptr<'a> { self.key } + pub fn store_mut(&mut self) -> &mut Store { + &mut self.store + } + /// Remove the stream from the store pub fn remove(self) -> StreamId { // The stream must have been unlinked before this point diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 4a57f21..e37c65b 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -1,5 +1,6 @@ use super::*; +use std::time::Instant; use std::usize; /// Tracks Stream related state @@ -84,6 +85,12 @@ pub(super) struct Stream { /// True if the stream is waiting to send a window update pub is_pending_window_update: bool, + /// The time when this stream may have been locally reset. + pub reset_at: Option, + + /// Next node in list of reset streams that should expire eventually + pub next_reset_expire: Option, + /// Frames pending for this stream to read pub pending_recv: buffer::Deque, @@ -120,6 +127,9 @@ pub(super) struct NextWindowUpdate; #[derive(Debug)] pub(super) struct NextOpen; +#[derive(Debug)] +pub(super) struct NextResetExpire; + impl Stream { pub fn new( id: StreamId, @@ -167,6 +177,8 @@ impl Stream { in_flight_recv_data: 0, next_window_update: None, is_pending_window_update: false, + reset_at: None, + next_reset_expire: None, pending_recv: buffer::Deque::new(), recv_task: None, pending_push_promises: store::Queue::new(), @@ -192,6 +204,12 @@ impl Stream { !self.is_pending_open && self.state.is_at_least_half_open() } + /// Returns true if stream is currently being held for some time because of + /// a local reset. + pub fn is_pending_reset_expiration(&self) -> bool { + self.reset_at.is_some() + } + /// Returns true if the stream is closed pub fn is_closed(&self) -> bool { // The state has fully transitioned to closed. @@ -215,7 +233,8 @@ impl Stream { self.ref_count == 0 && // The stream is not in any queue !self.is_pending_send && !self.is_pending_send_capacity && - !self.is_pending_accept && !self.is_pending_window_update + !self.is_pending_accept && !self.is_pending_window_update && + !self.reset_at.is_some() } /// Returns true when the consumer of the stream has dropped all handles @@ -391,6 +410,32 @@ impl store::Next for NextOpen { } } +impl store::Next for NextResetExpire { + fn next(stream: &Stream) -> Option { + stream.next_reset_expire + } + + fn set_next(stream: &mut Stream, key: Option) { + stream.next_reset_expire = key; + } + + fn take_next(stream: &mut Stream) -> Option { + stream.next_reset_expire.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.reset_at.is_some() + } + + fn set_queued(stream: &mut Stream, val: bool) { + if val { + stream.reset_at = Some(Instant::now()); + } else { + stream.reset_at = None; + } + } +} + // ===== impl ContentLength ===== impl ContentLength { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 9ec3f0a..9acab33 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -1,11 +1,14 @@ -use super::*; -use super::store::Resolve; +use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId}; +use super::store::{self, Entry, Resolve, Store}; use {client, proto, server}; -use codec::{RecvError, SendError, UserError}; -use frame::Reason; -use proto::*; +use codec::{Codec, RecvError, SendError, UserError}; +use frame::{self, Frame, Reason}; +use proto::{peer, Peer, WindowSize}; -use http::HeaderMap; +use bytes::{Buf, Bytes}; +use futures::{task, Async, Poll}; +use http::{HeaderMap, Request, Response}; +use tokio_io::AsyncWrite; use std::{fmt, io}; use std::sync::{Arc, Mutex}; @@ -174,7 +177,10 @@ where let stream = match me.store.find_mut(&id) { Some(stream) => stream, - None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)), + None => { + trace!("recv_data; stream not found: {:?}", id); + return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)); + }, }; let actions = &mut me.actions; @@ -364,8 +370,10 @@ where match me.actions.recv.next_incoming(&mut me.store) { Some(key) => { + let mut stream = me.store.resolve(key); + trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state); // Increment the ref count - me.store.resolve(key).ref_inc(); + stream.ref_inc(); // Return the key Some(key) @@ -397,6 +405,12 @@ where me.actions.recv.send_pending_refusal(dst) } + pub fn clear_expired_reset_streams(&mut self) { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; + me.actions.recv.clear_expired_reset_streams(&mut me.store, &mut me.counts); + } + pub fn poll_complete(&mut self, dst: &mut Codec>) -> Poll<(), io::Error> where T: AsyncWrite, @@ -547,9 +561,10 @@ where let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.transition(stream, |_, stream| { + me.counts.transition(stream, |counts, stream| { actions.send.send_reset( - reason, send_buffer, stream, &mut actions.task) + reason, send_buffer, stream, &mut actions.task); + actions.recv.enqueue_reset_expiration(stream, counts) }) } } @@ -876,11 +891,12 @@ fn drop_stream_ref(inner: &Mutex, key: store::Key) { let actions = &mut me.actions; - me.counts.transition(stream, |_, mut stream| { + me.counts.transition(stream, |counts, mut stream| { if stream.is_canceled_interest() { actions.send.schedule_cancel( &mut stream, &mut actions.task); + actions.recv.enqueue_reset_expiration(stream, counts); } }); } diff --git a/src/server.rs b/src/server.rs index abdbd15..101c023 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,19 +3,20 @@ use {SendStream, RecvStream, ReleaseCapacity}; use codec::{Codec, RecvError}; use frame::{self, Reason, Settings, StreamId}; -use proto::{self, Connection, Prioritized}; +use proto::{self, Config, Connection, Prioritized}; use bytes::{Buf, Bytes, IntoBuf}; use futures::{self, Async, Future, Poll}; use http::{Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; use std::{convert, fmt, mem}; +use std::time::Duration; /// In progress H2 connection binding #[must_use = "futures do nothing unless polled"] pub struct Handshake { - /// SETTINGS frame that will be sent once the connection is established. - settings: Settings, + /// The config to pass to Connection::new after handshake succeeds. + builder: Builder, /// The current state of the handshake. state: Handshaking } @@ -27,8 +28,15 @@ pub struct Server { } /// Build a Server -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct Builder { + /// Time to keep locally reset streams around before reaping. + reset_stream_duration: Duration, + + /// Maximum number of locally reset streams to keep at a time. + reset_stream_max: usize, + + /// Initial `Settings` frame to send as part of the handshake. settings: Settings, } @@ -95,23 +103,23 @@ where B: IntoBuf, B::Buf: 'static, { - fn handshake2(io: T, settings: Settings) -> Handshake { + fn handshake2(io: T, builder: Builder) -> Handshake { // Create the codec. let mut codec = Codec::new(io); - if let Some(max) = settings.max_frame_size() { + if let Some(max) = builder.settings.max_frame_size() { codec.set_max_recv_frame_size(max as usize); } // Send initial settings frame. codec - .buffer(settings.clone().into()) + .buffer(builder.settings.clone().into()) .expect("invalid SETTINGS frame"); // Create the handshake future. let state = Handshaking::from(codec); - Handshake { settings, state } + Handshake { builder, state } } /// Sets the target window size for the whole connection. @@ -204,6 +212,26 @@ impl Builder { self } + /// Set the maximum number of concurrent locally reset streams. + /// + /// Locally reset streams are to "ignore frames from the peer for some + /// time". While waiting for that time, locally reset streams "waste" + /// space in order to be able to ignore those frames. This setting + /// can limit how many extra streams are left waiting for "some time". + pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { + self.reset_stream_max = max; + self + } + + /// Set the maximum number of concurrent locally reset streams. + /// + /// Locally reset streams are to "ignore frames from the peer for some + /// time", but that time is unspecified. Set that time with this setting. + pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self { + self.reset_stream_duration = dur; + self + } + /// Bind an H2 server connection. /// /// Returns a future which resolves to the connection value once the H2 @@ -214,7 +242,17 @@ impl Builder { B: IntoBuf, B::Buf: 'static, { - Server::handshake2(io, self.settings.clone()) + Server::handshake2(io, self.clone()) + } +} + +impl Default for Builder { + fn default() -> Builder { + Builder { + reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), + reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, + settings: Settings::default(), + } } } @@ -357,8 +395,12 @@ impl Future for Handshake unreachable!("Handshake::poll() state was not advanced completely!") }; let server = poll?.map(|codec| { - let connection = - Connection::new(codec, &self.settings, 2.into()); + let connection = Connection::new(codec, Config { + next_stream_id: 2.into(), + reset_stream_duration: self.builder.reset_stream_duration, + reset_stream_max: self.builder.reset_stream_max, + settings: self.builder.settings.clone(), + }); trace!("Handshake::poll(); connection established!"); Server { connection } }); diff --git a/tests/flow_control.rs b/tests/flow_control.rs index 44d7004..2c4b112 100644 --- a/tests/flow_control.rs +++ b/tests/flow_control.rs @@ -276,17 +276,14 @@ fn recv_data_overflows_stream_window() { .send_frame(frames::data(1, vec![0u8; 16_384])) // this frame overflows the window! .send_frame(frames::data(1, &[0; 16][..]).eos()) - // expecting goaway for the conn - // TODO: change to a RST_STREAM eventually - .recv_frame(frames::go_away(0).flow_control()) - // close the connection - .map(drop); + .recv_frame(frames::reset(1).flow_control()) + .close(); let h2 = Client::builder() .initial_window_size(16_384) .handshake::<_, Bytes>(io) .unwrap() - .and_then(|(mut client, h2)| { + .and_then(|(mut client, conn)| { let request = Request::builder() .method(Method::GET) .uri("https://http2.akamai.com/") @@ -310,15 +307,6 @@ fn recv_data_overflows_stream_window() { }) }); - // client should see a flow control error - let conn = h2.then(|res| { - let err = res.unwrap_err(); - assert_eq!( - err.to_string(), - "protocol error: flow-control protocol violated" - ); - Ok::<(), ()>(()) - }); conn.unwrap().join(req) }); h2.join(mock).wait().unwrap(); diff --git a/tests/stream_states.rs b/tests/stream_states.rs index f18e1af..4832226 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -465,6 +465,209 @@ fn skipped_stream_ids_are_implicitly_closed() { h2.join(srv).wait().expect("wait"); } + +#[test] +fn send_rst_stream_allows_recv_frames() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200)) + .recv_frame(frames::reset(1).cancel()) + // sending frames after canceled! + // note: sending 2 to cosume 50% of connection window + .send_frame(frames::data(1, vec![0; 16_384])) + .send_frame(frames::data(1, vec![0; 16_384]).eos()) + // make sure we automatically free the connection window + .recv_frame(frames::window_update(0, 16_384 * 2)) + // do a pingpong to ensure no other frames were sent + .ping_pong([1; 8]) + .close(); + + let client = Client::handshake(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); + + let req = client.send_request(request, true) + .unwrap() + .0.expect("response") + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + // drop resp will send a reset + Ok(()) + }); + + conn.expect("client") + .drive(req) + .and_then(|(conn, _)| conn) + }); + + + client.join(srv).wait().expect("wait"); +} + +#[test] +fn rst_stream_expires() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200)) + .send_frame(frames::data(1, vec![0; 16_384])) + .recv_frame(frames::reset(1).cancel()) + // wait till after the configured duration + .idle_ms(15) + .ping_pong([1; 8]) + // sending frame after canceled! + .send_frame(frames::data(1, vec![0; 16_384]).eos()) + .recv_frame(frames::go_away(0).protocol_error()) + .close(); + + let client = Client::builder() + .reset_stream_duration(Duration::from_millis(10)) + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); + + let req = client.send_request(request, true) + .unwrap() + .0.expect("response") + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + // drop resp will send a reset + Ok(()) + }) + .map_err(|()| -> Error { + unreachable!() + }); + + conn.drive(req) + .and_then(|(conn, _)| conn.expect_err("client")) + .map(|err| { + assert_eq!( + err.to_string(), + "protocol error: unspecific protocol error detected" + ); + }) + }); + + + client.join(srv).wait().expect("wait"); +} + +#[test] +fn rst_stream_max() { + let _ = ::env_logger::init(); + let (io, srv) = mock::new(); + + let srv = srv.assert_client_handshake() + .unwrap() + .recv_settings() + .recv_frame( + frames::headers(1) + .request("GET", "https://example.com/") + .eos(), + ) + .recv_frame( + frames::headers(3) + .request("GET", "https://example.com/") + .eos(), + ) + .send_frame(frames::headers(1).response(200)) + .send_frame(frames::data(1, vec![0; 16])) + .send_frame(frames::headers(3).response(200)) + .send_frame(frames::data(3, vec![0; 16])) + .recv_frame(frames::reset(1).cancel()) + .recv_frame(frames::reset(3).cancel()) + // sending frame after canceled! + // newer streams trump older streams + // 3 is still being ignored + .send_frame(frames::data(3, vec![0; 16]).eos()) + // ping pong to be sure of no goaway + .ping_pong([1; 8]) + // 1 has been evicted, will get a goaway + .send_frame(frames::data(1, vec![0; 16]).eos()) + .recv_frame(frames::go_away(0).protocol_error()) + .close(); + + let client = Client::builder() + .max_concurrent_reset_streams(1) + .handshake::<_, Bytes>(io) + .expect("handshake") + .and_then(|(mut client, conn)| { + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); + + let req1 = client.send_request(request, true) + .unwrap() + .0.expect("response1") + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + // drop resp will send a reset + Ok(()) + }) + .map_err(|()| -> Error { + unreachable!() + }); + + let request = Request::builder() + .method(Method::GET) + .uri("https://example.com/") + .body(()) + .unwrap(); + + let req2 = client.send_request(request, true) + .unwrap() + .0.expect("response2") + .and_then(|resp| { + assert_eq!(resp.status(), StatusCode::OK); + // drop resp will send a reset + Ok(()) + }) + .map_err(|()| -> Error { + unreachable!() + }); + + conn.drive(req1.join(req2)) + .and_then(|(conn, _)| conn.expect_err("client")) + .map(|err| { + assert_eq!( + err.to_string(), + "protocol error: unspecific protocol error detected" + ); + }) + }); + + + client.join(srv).wait().expect("wait"); +} /* #[test] fn send_data_after_headers_eos() { diff --git a/tests/support/future_ext.rs b/tests/support/future_ext.rs index 9575bc1..2419ead 100644 --- a/tests/support/future_ext.rs +++ b/tests/support/future_ext.rs @@ -109,11 +109,11 @@ where type Error = (); fn poll(&mut self) -> Poll { - let poll = - self.inner.poll() - .map_err(Async::Ready) - .unwrap_err(); - Ok(poll) + match self.inner.poll() { + Ok(Async::Ready(v)) => panic!("Future::unwrap_err() on an Ok value: {:?}", v), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Ok(Async::Ready(e)), + } } } @@ -159,11 +159,11 @@ where type Error = (); fn poll(&mut self) -> Poll { - let poll = - self.inner.poll() - .map_err(Async::Ready) - .expect_err(&self.msg); - Ok(poll) + match self.inner.poll() { + Ok(Async::Ready(v)) => panic!("{}: {:?}", self.msg, v), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Ok(Async::Ready(e)), + } } } diff --git a/tests/support/mock.rs b/tests/support/mock.rs index 296e18f..3614359 100644 --- a/tests/support/mock.rs +++ b/tests/support/mock.rs @@ -1,4 +1,4 @@ -use {FutureExt, SendFrame}; +use {frames, FutureExt, SendFrame}; use h2::{self, RecvError, SendError}; use h2::frame::{self, Frame}; @@ -441,6 +441,15 @@ pub trait HandleFutureExt { } } + fn ping_pong(self, payload: [u8; 8]) -> RecvFrame< as IntoRecvFrame>::Future> + where + Self: Future + Sized + 'static, + Self::Error: fmt::Debug, + { + self.send_frame(frames::ping(payload)) + .recv_frame(frames::ping(payload).pong()) + } + fn idle_ms(self, ms: usize) -> Box> where Self: Sized + 'static,