ignore received frames on a stream locally reset for some time (#174)

- Adds a config duration for how long to ignore frames on a reset stream
- Adds a config for how many reset streams can be held at a time
Sean McArthur
2017-12-18 11:09:38 -08:00
committed by GitHub
parent edaeaa8941
commit 1ea9a8fc7e
19 changed files with 684 additions and 125 deletions
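Stripped of h2's internals, the mechanism this commit implements is a bounded set of recently reset streams with time-based expiry: remember a stream when it is reset locally, ignore frames for it while it is remembered, and forget it after a configured duration (or evict early when a configured cap is hit). A minimal standalone sketch of that idea; the ResetStreams type and all names here are illustrative, not h2's API:

use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Streams that were reset locally are remembered here so that frames
// still in flight from the peer can be ignored instead of being
// treated as connection errors.
struct ResetStreams {
    max: usize,
    duration: Duration,
    entries: VecDeque<(u32, Instant)>, // (stream id, time of reset)
}

impl ResetStreams {
    /// Remember a locally reset stream, evicting the oldest entry when
    /// the cap is reached. With `max == 0`, nothing is ever held.
    fn insert(&mut self, id: u32) {
        if self.entries.len() >= self.max {
            self.entries.pop_front();
        }
        if self.entries.len() < self.max {
            self.entries.push_back((id, Instant::now()));
        }
    }

    /// Drop entries older than `duration`. The head is always the
    /// oldest entry, so the sweep stops at the first fresh one.
    fn clear_expired(&mut self) {
        let now = Instant::now();
        while self.entries.front().map_or(false, |&(_, at)| now - at > self.duration) {
            self.entries.pop_front();
        }
    }

    /// Should a frame for this stream be ignored right now?
    fn contains(&self, id: u32) -> bool {
        self.entries.iter().any(|&(entry, _)| entry == id)
    }
}

fn main() {
    let mut reset = ResetStreams {
        max: 10,
        duration: Duration::from_secs(30),
        entries: VecDeque::new(),
    };
    reset.insert(1);
    assert!(reset.contains(1)); // frames for stream 1 are ignored
    reset.clear_expired(); // nothing is 30 seconds old yet
    assert!(reset.contains(1));
}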

src/proto/streams/counts.rs

@@ -19,6 +19,12 @@ pub(super) struct Counts {
/// Current number of remote initiated streams
num_recv_streams: usize,
/// Maximum number of pending locally reset streams
max_reset_streams: usize,
/// Current number of pending locally reset streams
num_reset_streams: usize,
}
impl Counts {
@@ -30,6 +36,8 @@ impl Counts {
num_send_streams: 0,
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
}
}
@@ -72,6 +80,22 @@ impl Counts {
self.num_send_streams += 1;
}
/// Returns true if the number of pending reset streams can be incremented.
pub fn can_inc_num_reset_streams(&self) -> bool {
self.max_reset_streams > self.num_reset_streams
}
/// Increments the number of pending reset streams.
///
/// # Panics
///
/// Panics on failure as this should have been validated beforehand.
pub fn inc_num_reset_streams(&mut self) {
assert!(self.can_inc_num_reset_streams());
self.num_reset_streams += 1;
}
pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
@@ -87,19 +111,26 @@ impl Counts {
F: FnOnce(&mut Self, &mut store::Ptr) -> U,
{
let is_counted = stream.is_counted();
let is_pending_reset = stream.is_pending_reset_expiration();
// Run the action
let ret = f(self, &mut stream);
- self.transition_after(stream, is_counted);
+ self.transition_after(stream, is_counted, is_pending_reset);
ret
}
// TODO: move this to macro?
- pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool) {
+ pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool, is_reset_counted: bool) {
if stream.is_closed() {
- stream.unlink();
+ if !stream.is_pending_reset_expiration() {
+ stream.unlink();
+ if is_reset_counted {
+ self.dec_num_reset_streams();
+ }
+ }
if is_counted {
// Decrement the number of active streams.
@@ -115,9 +146,16 @@ impl Counts {
fn dec_num_streams(&mut self, id: StreamId) {
if self.peer.is_local_init(id) {
assert!(self.num_send_streams > 0);
self.num_send_streams -= 1;
} else {
assert!(self.num_recv_streams > 0);
self.num_recv_streams -= 1;
}
}
fn dec_num_reset_streams(&mut self) {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
}
}
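The # Panics documentation above implies a check-then-increment discipline at every call site. A hedged sketch of that call-site shape; track_reset is a hypothetical caller, not a function in this commit:

struct Counts {
    max_reset_streams: usize,
    num_reset_streams: usize,
}

impl Counts {
    fn can_inc_num_reset_streams(&self) -> bool {
        self.max_reset_streams > self.num_reset_streams
    }

    fn inc_num_reset_streams(&mut self) {
        // Panics if the caller skipped the check above.
        assert!(self.can_inc_num_reset_streams());
        self.num_reset_streams += 1;
    }
}

// Hypothetical caller: validate first, increment second.
fn track_reset(counts: &mut Counts) -> bool {
    if counts.can_inc_num_reset_streams() {
        counts.inc_num_reset_streams();
        true // the stream will be held and its frames ignored
    } else {
        false // over the cap: the stream must be released immediately
    }
}

fn main() {
    let mut counts = Counts { max_reset_streams: 1, num_reset_streams: 0 };
    assert!(track_reset(&mut counts));
    assert!(!track_reset(&mut counts)); // cap reached
}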

src/proto/streams/mod.rs

@@ -20,12 +20,13 @@ use self::prioritize::Prioritize;
use self::recv::Recv;
use self::send::Send;
use self::state::State;
- use self::store::{Entry, Store};
+ use self::store::Store;
use self::stream::Stream;
use frame::{StreamId, StreamIdOverflow};
use proto::*;
use std::time::Duration;
use bytes::Bytes;
use http::{Request, Response};
@@ -43,6 +44,12 @@ pub struct Config {
/// If the local peer is willing to receive push promises
pub local_push_enabled: bool,
/// How long a locally reset stream should ignore frames
pub local_reset_duration: Duration,
/// Maximum number of locally reset streams to keep at a time
pub local_reset_max: usize,
/// Initial window size of remote initiated streams
pub remote_init_window_sz: WindowSize,
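For a sense of how the two new fields combine, here is an illustrative sketch; ResetConfig is a stand-in struct and both values are made up for the example, since the defaults h2 picks are not shown in this diff:

use std::time::Duration;

// Stand-in for the two new Config fields above.
struct ResetConfig {
    local_reset_duration: Duration,
    local_reset_max: usize,
}

fn main() {
    let cfg = ResetConfig {
        // Ignore frames on a locally reset stream for this long...
        local_reset_duration: Duration::from_secs(30),
        // ...while holding at most this many reset streams at once.
        local_reset_max: 10,
    };
    println!(
        "hold up to {} reset streams for {:?} each",
        cfg.local_reset_max, cfg.local_reset_duration
    );
}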

src/proto/streams/prioritize.rs

@@ -529,7 +529,13 @@ impl Prioritize {
Some(mut stream) => {
trace!("pop_frame; stream={:?}", stream.id);
// It's possible that this stream, besides having data to send,
// is also queued to send a reset, and thus is already in the queue
// to wait for "some time" after a reset.
//
// To be safe, we just always ask the stream.
let is_counted = stream.is_counted();
let is_pending_reset = stream.is_pending_reset_expiration();
let frame = match stream.pending_send.pop_front(buffer) {
Some(Frame::Data(mut frame)) => {
@@ -651,7 +657,7 @@ impl Prioritize {
self.pending_send.push(&mut stream);
}
- counts.transition_after(stream, is_counted);
+ counts.transition_after(stream, is_counted, is_pending_reset);
return Some(frame);
},

src/proto/streams/recv.rs

@@ -2,11 +2,11 @@ use super::*;
use {frame, proto};
use codec::{RecvError, UserError};
use frame::{Reason, DEFAULT_INITIAL_WINDOW_SIZE};
use proto::*;
use http::HeaderMap;
use std::io;
use std::time::{Duration, Instant};
#[derive(Debug)]
pub(super) struct Recv {
@@ -31,6 +31,12 @@ pub(super) struct Recv {
/// New streams to be accepted
pending_accept: store::Queue<stream::NextAccept>,
/// Locally reset streams that should be reaped when they expire
pending_reset_expired: store::Queue<stream::NextResetExpire>,
/// How long locally reset streams should ignore received frames
reset_duration: Duration,
/// Holds frames that are waiting to be read
buffer: Buffer<Event>,
@@ -74,6 +80,8 @@ impl Recv {
pending_window_updates: store::Queue::new(),
last_processed_id: StreamId::zero(),
pending_accept: store::Queue::new(),
pending_reset_expired: store::Queue::new(),
reset_duration: config.local_reset_duration,
buffer: Buffer::new(),
refused: None,
is_push_enabled: config.local_push_enabled,
@@ -237,7 +245,28 @@ impl Recv {
Ok(())
}
- /// Releases capacity back to the connection
+ /// Releases capacity of the connection
fn release_connection_capacity(
&mut self,
capacity: WindowSize,
task: &mut Option<Task>,
) {
trace!("release_connection_capacity; size={}", capacity);
// Decrement in-flight data
self.in_flight_data -= capacity;
// Assign capacity to connection
self.flow.assign_capacity(capacity);
if self.flow.unclaimed_capacity().is_some() {
if let Some(task) = task.take() {
task.notify();
}
}
}
/// Releases capacity back to the connection & stream
pub fn release_capacity(
&mut self,
capacity: WindowSize,
@@ -250,19 +279,14 @@ impl Recv {
return Err(UserError::ReleaseCapacityTooBig);
}
+ self.release_connection_capacity(capacity, task);
// Decrement in-flight data
stream.in_flight_recv_data -= capacity;
- self.in_flight_data -= capacity;
- // Assign capacity to connection & stream
- self.flow.assign_capacity(capacity);
+ // Assign capacity to stream
stream.recv_flow.assign_capacity(capacity);
- if self.flow.unclaimed_capacity().is_some() {
- if let Some(task) = task.take() {
- task.notify();
- }
- }
if stream.recv_flow.unclaimed_capacity().is_some() {
// Queue the stream for sending the WINDOW_UPDATE frame.
@@ -353,8 +377,12 @@ impl Recv {
let sz = sz as WindowSize;
- if !stream.state.is_recv_streaming() {
+ let is_ignoring_frame = stream.state.is_local_reset();
+ if !is_ignoring_frame && !stream.state.is_recv_streaming() {
trace!("stream is not in receiving state; state={:?}", stream.state);
// TODO: There are cases where this can be a stream error of
// STREAM_CLOSED instead...
// Receiving a DATA frame when not expecting one is a protocol
// error.
return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
@@ -369,19 +397,46 @@ impl Recv {
// Ensure that there is enough capacity on the connection before acting
// on the stream.
- if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz {
- return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR));
+ self.consume_connection_window(sz)?;
if is_ignoring_frame {
trace!(
"recv_data frame ignored on locally reset {:?} for some time",
stream.id,
);
// we just checked for enough connection window capacity, and
// consumed it. Since we are ignoring this frame "for some time",
// we aren't returning the frame to the user. That means they
// have no way to release the capacity back to the connection. So
// we have to release it automatically.
//
// This call doesn't send a WINDOW_UPDATE immediately, just marks
// the capacity as available to be reclaimed. When the available
// capacity meets a threshold, a WINDOW_UPDATE is then sent.
self.release_connection_capacity(sz, &mut None);
return Ok(());
}
- // Update connection level flow control
- self.flow.send_data(sz);
if stream.recv_flow.window_size() < sz {
// http://httpwg.org/specs/rfc7540.html#WINDOW_UPDATE
// > A receiver MAY respond with a stream error (Section 5.4.2) or
// > connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR if
// > it is unable to accept a frame.
//
// So, for violating the **stream** window, we can send either a
// stream or connection error. We've opted to send a stream
// error.
return Err(RecvError::Stream {
id: stream.id,
reason: Reason::FLOW_CONTROL_ERROR,
});
}
// Update stream level flow control
stream.recv_flow.send_data(sz);
// Track the data as in-flight
stream.in_flight_recv_data += sz;
- self.in_flight_data += sz;
if stream.dec_content_length(frame.payload().len()).is_err() {
trace!("content-length overflow");
@@ -415,6 +470,19 @@ impl Recv {
Ok(())
}
pub fn consume_connection_window(&mut self, sz: WindowSize) -> Result<(), RecvError> {
if self.flow.window_size() < sz {
return Err(RecvError::Connection(Reason::FLOW_CONTROL_ERROR));
}
// Update connection level flow control
self.flow.send_data(sz);
// Track the data as in-flight
self.in_flight_data += sz;
Ok(())
}
pub fn recv_push_promise(
&mut self,
frame: frame::PushPromise,
@@ -480,15 +548,14 @@ impl Recv {
Ok(())
}
/// Handle remote sending an explicit RST_STREAM.
pub fn recv_reset(
&mut self,
frame: frame::Reset,
stream: &mut Stream,
) -> Result<(), RecvError> {
- let err = proto::Error::Proto(frame.reason());
// Notify the stream
- stream.state.recv_err(&err);
+ stream.state.recv_reset(frame.reason());
stream.notify_recv();
Ok(())
}
@@ -536,6 +603,38 @@ impl Recv {
Ok(())
}
/// Add a locally reset stream to the queue to be eventually reaped.
pub fn enqueue_reset_expiration(
&mut self,
stream: &mut store::Ptr,
counts: &mut Counts,
) {
assert!(stream.state.is_local_reset());
if stream.is_pending_reset_expiration() {
return;
}
if !counts.can_inc_num_reset_streams() {
// Try to evict 1 stream if possible.
// If the max allowed is 0, this won't be able to evict,
// and then we'll just bail after.
if let Some(evicted) = self.pending_reset_expired.pop(stream.store_mut()) {
// It's possible that this stream is still sitting in a send queue,
// such as if some data is to be sent and then a CANCEL. In this case,
// it could still be "counted", so we just make sure to always ask the
// stream instead of assuming.
let is_counted = evicted.is_counted();
counts.transition_after(evicted, is_counted, true);
}
}
if counts.can_inc_num_reset_streams() {
counts.inc_num_reset_streams();
self.pending_reset_expired.push(stream);
}
}
/// Send any pending refusals.
pub fn send_pending_refusal<T, B>(
&mut self,
@@ -562,6 +661,18 @@ impl Recv {
Ok(Async::Ready(()))
}
pub fn clear_expired_reset_streams(&mut self, store: &mut Store, counts: &mut Counts) {
let now = Instant::now();
let reset_duration = self.reset_duration;
while let Some(stream) = self.pending_reset_expired.pop_if(store, |stream| {
let reset_at = stream.reset_at.expect("reset_at must be set if in queue");
now - reset_at > reset_duration
}) {
let is_counted = stream.is_counted();
counts.transition_after(stream, is_counted, true);
}
}
pub fn poll_complete<T, B>(
&mut self,
store: &mut Store,
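The subtle part of recv_data above is the flow-control bookkeeping for an ignored frame: the bytes genuinely arrived, so they must be charged against the connection window, but the frame never reaches a user who could release that capacity, so it is released automatically. A condensed, hedged sketch of just that bookkeeping, with Flow as a simplified stand-in for h2's flow-control window:

// Simplified stand-in for h2's connection-level flow-control window.
struct Flow {
    window: u32,    // capacity the peer may still use
    unclaimed: u32, // capacity waiting to go back out in a WINDOW_UPDATE
}

impl Flow {
    fn consume(&mut self, sz: u32) -> Result<(), &'static str> {
        if self.window < sz {
            return Err("FLOW_CONTROL_ERROR"); // peer overran the window
        }
        self.window -= sz;
        Ok(())
    }

    fn release(&mut self, sz: u32) {
        // Doesn't send a WINDOW_UPDATE immediately; the capacity is only
        // marked reclaimable, and a WINDOW_UPDATE goes out once the
        // unclaimed total crosses a threshold.
        self.unclaimed += sz;
    }
}

fn recv_data_on_reset_stream(conn: &mut Flow, sz: u32) -> Result<(), &'static str> {
    // The bytes were really sent, so they count against the window...
    conn.consume(sz)?;
    // ...but no user will ever see this frame, so nobody else can
    // release the capacity. Release it on their behalf.
    conn.release(sz);
    Ok(())
}

fn main() {
    let mut conn = Flow { window: 65_535, unclaimed: 0 };
    recv_data_on_reset_stream(&mut conn, 1024).unwrap();
    assert_eq!(conn.unclaimed, 1024);
}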

src/proto/streams/send.rs

@@ -3,7 +3,6 @@ use super::*;
use codec::{RecvError, UserError};
use codec::UserError::*;
use frame::{self, Reason};
use proto::*;
use bytes::Buf;

src/proto/streams/state.rs

@@ -60,8 +60,7 @@ enum Inner {
Open { local: Peer, remote: Peer },
HalfClosedLocal(Peer), // TODO: explicitly name this value
HalfClosedRemote(Peer),
- // When reset, a reason is provided
- Closed(Option<Cause>),
+ Closed(Cause),
}
#[derive(Debug, Copy, Clone)]
@@ -72,7 +71,9 @@ enum Peer {
#[derive(Debug, Copy, Clone)]
enum Cause {
EndStream,
Proto(Reason),
LocallyReset(Reason),
Io,
/// The user dropped all handles to the stream without explicitly canceling.
@@ -84,7 +85,7 @@ enum Cause {
impl State {
/// Opens the send-half of a stream if it is not already open.
pub fn send_open(&mut self, eos: bool) -> Result<(), UserError> {
- let local = Peer::Streaming;
+ let local = Streaming;
self.inner = match self.inner {
Idle => if eos {
@@ -107,7 +108,7 @@ impl State {
}
},
HalfClosedRemote(AwaitingHeaders) => if eos {
- Closed(None)
+ Closed(Cause::EndStream)
} else {
HalfClosedRemote(local)
},
@@ -124,7 +125,7 @@ impl State {
///
/// Returns true if this transitions the state to Open.
pub fn recv_open(&mut self, eos: bool) -> Result<bool, RecvError> {
- let remote = Peer::Streaming;
+ let remote = Streaming;
let mut initial = false;
self.inner = match self.inner {
@@ -144,7 +145,7 @@ impl State {
initial = true;
if eos {
- Closed(None)
+ Closed(Cause::EndStream)
} else {
Open {
local: AwaitingHeaders,
@@ -164,7 +165,7 @@ impl State {
}
},
HalfClosedLocal(AwaitingHeaders) => if eos {
- Closed(None)
+ Closed(Cause::EndStream)
} else {
HalfClosedLocal(remote)
},
@@ -201,13 +202,25 @@ impl State {
},
HalfClosedLocal(..) => {
trace!("recv_close: HalfClosedLocal => Closed");
- self.inner = Closed(None);
+ self.inner = Closed(Cause::EndStream);
Ok(())
},
_ => Err(RecvError::Connection(Reason::PROTOCOL_ERROR)),
}
}
/// The remote explicitly sent a RST_STREAM.
pub fn recv_reset(&mut self, reason: Reason) {
match self.inner {
Closed(..) => {},
_ => {
trace!("recv_reset; reason={:?}", reason);
self.inner = Closed(Cause::Proto(reason));
},
}
}
/// We noticed a protocol error.
pub fn recv_err(&mut self, err: &proto::Error) {
use proto::Error::*;
@@ -216,8 +229,8 @@ impl State {
_ => {
trace!("recv_err; err={:?}", err);
self.inner = Closed(match *err {
- Proto(reason) => Some(Cause::Proto(reason)),
- Io(..) => Some(Cause::Io),
+ Proto(reason) => Cause::LocallyReset(reason),
+ Io(..) => Cause::Io,
});
},
}
@@ -228,7 +241,7 @@ impl State {
Closed(..) => {},
s => {
trace!("recv_eof; state={:?}", s);
- self.inner = Closed(Some(Cause::Io));
+ self.inner = Closed(Cause::Io);
}
}
}
@@ -245,28 +258,34 @@ impl State {
},
HalfClosedRemote(..) => {
trace!("send_close: HalfClosedRemote => Closed");
- self.inner = Closed(None);
+ self.inner = Closed(Cause::EndStream);
},
_ => panic!("transition send_close on unexpected state"),
}
}
- /// Set the stream state to reset
+ /// Set the stream state to reset locally.
pub fn set_reset(&mut self, reason: Reason) {
- self.inner = Closed(Some(Cause::Proto(reason)));
+ self.inner = Closed(Cause::LocallyReset(reason));
}
/// Set the stream state to canceled
pub fn set_canceled(&mut self) {
debug_assert!(!self.is_closed());
- self.inner = Closed(Some(Cause::Canceled));
+ self.inner = Closed(Cause::Canceled);
}
pub fn is_canceled(&self) -> bool {
use self::Cause::Canceled;
match self.inner {
- Closed(Some(Canceled)) => true,
+ Closed(Cause::Canceled) => true,
_ => false,
}
}
pub fn is_local_reset(&self) -> bool {
match self.inner {
Closed(Cause::LocallyReset(_)) => true,
Closed(Cause::Canceled) => true,
_ => false,
}
}
@@ -274,7 +293,8 @@ impl State {
/// Returns true if the stream is already reset.
pub fn is_reset(&self) -> bool {
match self.inner {
- Closed(Some(_)) => true,
+ Closed(Cause::EndStream) => false,
+ Closed(_) => true,
_ => false,
}
}
@@ -294,10 +314,10 @@ impl State {
pub fn is_send_streaming(&self) -> bool {
match self.inner {
Open {
- local: Peer::Streaming,
+ local: Streaming,
..
} => true,
- HalfClosedRemote(Peer::Streaming) => true,
+ HalfClosedRemote(Streaming) => true,
_ => false,
}
}
@@ -319,10 +339,10 @@ impl State {
pub fn is_recv_streaming(&self) -> bool {
match self.inner {
Open {
- remote: Peer::Streaming,
+ remote: Streaming,
..
} => true,
- HalfClosedLocal(Peer::Streaming) => true,
+ HalfClosedLocal(Streaming) => true,
_ => false,
}
}
@@ -353,10 +373,12 @@ impl State {
// TODO: Is this correct?
match self.inner {
- Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)),
- Closed(Some(Cause::Canceled)) => Err(proto::Error::Proto(Reason::CANCEL)),
- Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
- Closed(None) | HalfClosedRemote(..) => Ok(false),
+ Closed(Cause::Proto(reason)) |
+ Closed(Cause::LocallyReset(reason)) => Err(proto::Error::Proto(reason)),
+ Closed(Cause::Canceled) => Err(proto::Error::Proto(Reason::CANCEL)),
+ Closed(Cause::Io) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
+ Closed(Cause::EndStream) |
+ HalfClosedRemote(..) => Ok(false),
_ => Ok(true),
}
}
@@ -372,6 +394,6 @@ impl Default for State {
impl Default for Peer {
fn default() -> Self {
- Peer::AwaitingHeaders
+ AwaitingHeaders
}
}
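Most of the state.rs changes above are a single refactor: Closed(Option<Cause>) becomes Closed(Cause), with Cause::EndStream replacing the old None, and Cause::LocallyReset split out of Cause::Proto so local resets are distinguishable from resets sent by the peer; that distinction is what makes is_local_reset possible. A minimal sketch of the resulting shape, with Reason simplified to a u32 code:

// Reason is simplified to a u32 error code in this sketch.
#[derive(Debug, Copy, Clone)]
enum Cause {
    EndStream,         // clean close; replaces the old Closed(None)
    Proto(u32),        // the remote sent RST_STREAM
    LocallyReset(u32), // we sent RST_STREAM
    Io,
    Canceled,          // user dropped all handles without canceling
}

// Only locally initiated resets should ignore incoming frames.
fn is_local_reset(cause: Cause) -> bool {
    match cause {
        Cause::LocallyReset(_) | Cause::Canceled => true,
        _ => false,
    }
}

// Everything except a clean end-of-stream counts as "reset".
fn is_reset(cause: Cause) -> bool {
    match cause {
        Cause::EndStream => false,
        _ => true,
    }
}

fn main() {
    assert!(is_local_reset(Cause::LocallyReset(0x8)));
    assert!(is_local_reset(Cause::Canceled));
    assert!(!is_local_reset(Cause::Proto(0x8))); // RST_STREAM from the peer
    assert!(!is_reset(Cause::EndStream));
    assert!(is_reset(Cause::Io));
}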

src/proto/streams/store.rs

@@ -289,6 +289,21 @@ where
None
}
pub fn pop_if<'a, R, F>(&mut self, store: &'a mut R, f: F) -> Option<store::Ptr<'a>>
where
R: Resolve,
F: Fn(&Stream) -> bool,
{
if let Some(idxs) = self.indices {
let should_pop = f(&store.resolve(idxs.head));
if should_pop {
return self.pop(store);
}
}
None
}
}
// ===== impl Ptr =====
@@ -299,6 +314,10 @@ impl<'a> Ptr<'a> {
self.key
}
pub fn store_mut(&mut self) -> &mut Store {
&mut self.store
}
/// Remove the stream from the store
pub fn remove(self) -> StreamId {
// The stream must have been unlinked before this point
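Note that pop_if above only ever examines the head of the queue, which is what keeps the expiry sweep cheap: streams are queued in reset order, so the first non-expired head stops the scan. A hedged equivalent over a plain VecDeque:

use std::collections::VecDeque;

// Equivalent of the new `pop_if`, over a plain VecDeque: pop the head
// only if the predicate approves it; otherwise leave the queue alone.
fn pop_if<T, F>(queue: &mut VecDeque<T>, f: F) -> Option<T>
where
    F: Fn(&T) -> bool,
{
    let should_pop = queue.front().map_or(false, |head| f(head));
    if should_pop {
        queue.pop_front()
    } else {
        None
    }
}

fn main() {
    let mut q: VecDeque<u32> = (1..=4).collect();
    // The same loop shape clear_expired_reset_streams uses: keep
    // popping while the head satisfies the predicate, then stop.
    while let Some(v) = pop_if(&mut q, |&v| v < 3) {
        println!("expired: {}", v);
    }
    assert_eq!(q.front(), Some(&3));
}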

src/proto/streams/stream.rs

@@ -1,5 +1,6 @@
use super::*;
use std::time::Instant;
use std::usize;
/// Tracks Stream related state
@@ -84,6 +85,12 @@ pub(super) struct Stream {
/// True if the stream is waiting to send a window update
pub is_pending_window_update: bool,
/// The time when this stream may have been locally reset.
pub reset_at: Option<Instant>,
/// Next node in list of reset streams that should expire eventually
pub next_reset_expire: Option<store::Key>,
/// Frames pending for this stream to read
pub pending_recv: buffer::Deque,
@@ -120,6 +127,9 @@ pub(super) struct NextWindowUpdate;
#[derive(Debug)]
pub(super) struct NextOpen;
#[derive(Debug)]
pub(super) struct NextResetExpire;
impl Stream {
pub fn new(
id: StreamId,
@@ -167,6 +177,8 @@ impl Stream {
in_flight_recv_data: 0,
next_window_update: None,
is_pending_window_update: false,
reset_at: None,
next_reset_expire: None,
pending_recv: buffer::Deque::new(),
recv_task: None,
pending_push_promises: store::Queue::new(),
@@ -192,6 +204,12 @@ impl Stream {
!self.is_pending_open && self.state.is_at_least_half_open()
}
/// Returns true if the stream is currently being held for some time because of
/// a local reset.
pub fn is_pending_reset_expiration(&self) -> bool {
self.reset_at.is_some()
}
/// Returns true if the stream is closed
pub fn is_closed(&self) -> bool {
// The state has fully transitioned to closed.
@@ -215,7 +233,8 @@ impl Stream {
self.ref_count == 0 &&
// The stream is not in any queue
!self.is_pending_send && !self.is_pending_send_capacity &&
- !self.is_pending_accept && !self.is_pending_window_update
+ !self.is_pending_accept && !self.is_pending_window_update &&
+ !self.reset_at.is_some()
}
/// Returns true when the consumer of the stream has dropped all handles
@@ -391,6 +410,32 @@ impl store::Next for NextOpen {
}
}
impl store::Next for NextResetExpire {
fn next(stream: &Stream) -> Option<store::Key> {
stream.next_reset_expire
}
fn set_next(stream: &mut Stream, key: Option<store::Key>) {
stream.next_reset_expire = key;
}
fn take_next(stream: &mut Stream) -> Option<store::Key> {
stream.next_reset_expire.take()
}
fn is_queued(stream: &Stream) -> bool {
stream.reset_at.is_some()
}
fn set_queued(stream: &mut Stream, val: bool) {
if val {
stream.reset_at = Some(Instant::now());
} else {
stream.reset_at = None;
}
}
}
// ===== impl ContentLength =====
impl ContentLength {
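A detail worth calling out in NextResetExpire above: set_queued stamps reset_at with Instant::now(), so queue membership and the expiry timestamp are the same piece of state, and is_pending_reset_expiration is simply reset_at.is_some(). The same pattern in an isolated, hedged sketch:

use std::time::{Duration, Instant};

struct Stream {
    // None = not queued; Some(t) = queued for expiration since t.
    reset_at: Option<Instant>,
}

impl Stream {
    fn set_queued(&mut self, val: bool) {
        // Enqueueing starts the expiry clock; dequeueing clears it.
        self.reset_at = if val { Some(Instant::now()) } else { None };
    }

    fn is_pending_reset_expiration(&self) -> bool {
        self.reset_at.is_some()
    }

    fn is_expired(&self, dur: Duration) -> bool {
        self.reset_at.map_or(false, |at| Instant::now() - at > dur)
    }
}

fn main() {
    let mut stream = Stream { reset_at: None };
    stream.set_queued(true);
    assert!(stream.is_pending_reset_expiration());
    assert!(!stream.is_expired(Duration::from_secs(30)));
}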

src/proto/streams/streams.rs

@@ -1,11 +1,14 @@
use super::*;
- use super::store::Resolve;
+ use super::{Buffer, Config, Counts, Prioritized, Recv, Send, Stream, StreamId};
+ use super::store::{self, Entry, Resolve, Store};
use {client, proto, server};
- use codec::{RecvError, SendError, UserError};
- use frame::Reason;
- use proto::*;
+ use codec::{Codec, RecvError, SendError, UserError};
+ use frame::{self, Frame, Reason};
+ use proto::{peer, Peer, WindowSize};
- use http::HeaderMap;
use bytes::{Buf, Bytes};
use futures::{task, Async, Poll};
+ use http::{HeaderMap, Request, Response};
use tokio_io::AsyncWrite;
use std::{fmt, io};
use std::sync::{Arc, Mutex};
@@ -174,7 +177,10 @@ where
let stream = match me.store.find_mut(&id) {
Some(stream) => stream,
- None => return Err(RecvError::Connection(Reason::PROTOCOL_ERROR)),
+ None => {
+ trace!("recv_data; stream not found: {:?}", id);
+ return Err(RecvError::Connection(Reason::PROTOCOL_ERROR));
+ },
};
let actions = &mut me.actions;
@@ -364,8 +370,10 @@ where
match me.actions.recv.next_incoming(&mut me.store) {
Some(key) => {
let mut stream = me.store.resolve(key);
trace!("next_incoming; id={:?}, state={:?}", stream.id, stream.state);
// Increment the ref count
- me.store.resolve(key).ref_inc();
+ stream.ref_inc();
// Return the key
Some(key)
@@ -397,6 +405,12 @@ where
me.actions.recv.send_pending_refusal(dst)
}
pub fn clear_expired_reset_streams(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.recv.clear_expired_reset_streams(&mut me.store, &mut me.counts);
}
pub fn poll_complete<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>) -> Poll<(), io::Error>
where
T: AsyncWrite,
@@ -547,9 +561,10 @@ where
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;
- me.counts.transition(stream, |_, stream| {
+ me.counts.transition(stream, |counts, stream| {
actions.send.send_reset(
- reason, send_buffer, stream, &mut actions.task)
+ reason, send_buffer, stream, &mut actions.task);
+ actions.recv.enqueue_reset_expiration(stream, counts)
})
}
}
@@ -876,11 +891,12 @@ fn drop_stream_ref(inner: &Mutex<Inner>, key: store::Key) {
let actions = &mut me.actions;
- me.counts.transition(stream, |_, mut stream| {
+ me.counts.transition(stream, |counts, mut stream| {
if stream.is_canceled_interest() {
actions.send.schedule_cancel(
&mut stream,
&mut actions.task);
actions.recv.enqueue_reset_expiration(stream, counts);
}
});
}
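Tying the streams.rs wiring together: a stream enters the expiration queue either when the user sends an explicit RST_STREAM (send_reset) or when all handles are dropped with canceled interest, and it leaves the queue when the periodic sweep reaps it. A hedged, self-contained walk-through of that lifecycle; all names are illustrative and the timing is shortened for the demo:

use std::collections::VecDeque;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    let duration = Duration::from_millis(10); // stands in for local_reset_duration
    let mut pending: VecDeque<(u32, Instant)> = VecDeque::new();

    // 1. send_reset (or drop-with-canceled-interest) enqueues the
    //    stream for eventual expiration.
    pending.push_back((1, Instant::now()));

    // 2. A DATA frame for stream 1 arrives meanwhile: the stream is in
    //    the queue, so the frame is ignored (after charging the
    //    connection window) instead of raising PROTOCOL_ERROR.
    assert!(pending.iter().any(|&(id, _)| id == 1));

    // 3. Later, clear_expired_reset_streams-style reaping removes
    //    entries older than the configured duration; after that,
    //    frames for stream 1 are treated as errors again.
    thread::sleep(duration * 2);
    let now = Instant::now();
    while pending.front().map_or(false, |&(_, at)| now - at > duration) {
        pending.pop_front();
    }
    assert!(pending.is_empty());
}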