Misc streams cleanup

This commit is contained in:
Carl Lerche
2017-08-02 13:09:14 -07:00
parent 22ebf186c6
commit 9d7221e6cf
6 changed files with 141 additions and 146 deletions

View File

@@ -3,6 +3,10 @@ use error::Reason;
use error::Reason::*;
use error::User::*;
use proto::*;
use super::FlowControl;
use self::Inner::*;
use self::Peer::*;
/// Represents the state of an H2 stream
///
@@ -44,8 +48,13 @@ use proto::*;
/// ES: END_STREAM flag
/// R: RST_STREAM frame
/// ```
#[derive(Debug, Copy, Clone)]
pub enum Stream {
#[derive(Debug, Clone)]
pub struct State {
inner: Inner,
}
#[derive(Debug, Clone, Copy)]
enum Inner {
Idle,
// TODO: these states shouldn't count against concurrency limits:
//ReservedLocal,
@@ -61,34 +70,18 @@ pub enum Stream {
}
#[derive(Debug, Copy, Clone)]
pub enum Peer {
enum Peer {
AwaitingHeaders,
/// Contains a FlowControl representing the _receiver_ of this this data stream.
Streaming(FlowControl),
}
#[derive(Copy, Clone, Debug)]
pub struct FlowControl {
/// Amount that may be claimed.
window_size: WindowSize,
/// Amount to be removed by future increments.
underflow: WindowSize,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
next_window_update: WindowSize,
}
impl Stream {
impl State {
/// Opens the send-half of a stream if it is not already open.
pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> {
use self::Stream::*;
use self::Peer::*;
let local = Peer::streaming(sz);
*self = match *self {
self.inner = match self.inner {
Idle => {
if eos {
HalfClosedLocal(AwaitingHeaders)
@@ -128,12 +121,9 @@ impl Stream {
/// Open the receive have of the stream, this action is taken when a HEADERS
/// frame is received.
pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> {
use self::Stream::*;
use self::Peer::*;
let remote = Peer::streaming(sz);
*self = match *self {
self.inner = match self.inner {
Idle => {
if eos {
HalfClosedRemote(AwaitingHeaders)
@@ -172,18 +162,16 @@ impl Stream {
/// Indicates that the remote side will not send more data to the local.
pub fn recv_close(&mut self) -> Result<(), ConnectionError> {
use self::Stream::*;
match *self {
match self.inner {
Open { local, .. } => {
// The remote side will continue to receive data.
trace!("recv_close: Open => HalfClosedRemote({:?})", local);
*self = HalfClosedRemote(local);
self.inner = HalfClosedRemote(local);
Ok(())
}
HalfClosedLocal(..) => {
trace!("recv_close: HalfClosedLocal => Closed");
*self = Closed(None);
self.inner = Closed(None);
Ok(())
}
_ => Err(ProtocolError.into()),
@@ -192,18 +180,16 @@ impl Stream {
/// Indicates that the local side will not send more data to the local.
pub fn send_close(&mut self) -> Result<(), ConnectionError> {
use self::Stream::*;
match *self {
match self.inner {
Open { remote, .. } => {
// The remote side will continue to receive data.
trace!("send_close: Open => HalfClosedLocal({:?})", remote);
*self = HalfClosedLocal(remote);
self.inner = HalfClosedLocal(remote);
Ok(())
}
HalfClosedRemote(..) => {
trace!("send_close: HalfClosedRemote => Closed");
*self = Closed(None);
self.inner = Closed(None);
Ok(())
}
_ => Err(ProtocolError.into()),
@@ -211,18 +197,14 @@ impl Stream {
}
pub fn is_closed(&self) -> bool {
use self::Stream::*;
match *self {
match self.inner {
Closed(_) => true,
_ => false,
}
}
pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
use self::Stream::*;
match *self {
match self.inner {
Open { ref mut remote, .. } |
HalfClosedLocal(ref mut remote) => remote.flow_control(),
_ => None,
@@ -230,9 +212,7 @@ impl Stream {
}
pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> {
use self::Stream::*;
match *self {
match self.inner {
Open { ref mut local, .. } |
HalfClosedRemote(ref mut local) => local.flow_control(),
_ => None,
@@ -240,9 +220,9 @@ impl Stream {
}
}
impl Default for Stream {
fn default() -> Stream {
Stream::Idle
impl Default for State {
fn default() -> State {
State { inner: Inner::Idle }
}
}
@@ -258,81 +238,9 @@ impl Peer {
}
fn flow_control(&mut self) -> Option<&mut FlowControl> {
use self::Peer::*;
match *self {
Streaming(ref mut flow) => Some(flow),
_ => None,
}
}
}
impl FlowControl {
pub fn new(window_size: WindowSize) -> FlowControl {
FlowControl {
window_size,
underflow: 0,
next_window_update: 0,
}
}
/// Returns true iff `claim_window(sz)` would return succeed.
pub fn ensure_window<T>(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError>
where T: Into<ConnectionError>,
{
if sz <= self.window_size {
Ok(())
} else {
Err(err.into())
}
}
/// Claims the provided amount from the window, if there is enough space.
///
/// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than
/// have been previously claimed.
pub fn claim_window<T>(&mut self, sz: WindowSize, err: T)
-> Result<(), ConnectionError>
where T: Into<ConnectionError>,
{
self.ensure_window(sz, err)?;
self.window_size -= sz;
Ok(())
}
/// Increase the _unadvertised_ window capacity.
pub fn expand_window(&mut self, sz: WindowSize) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
let added = sz - self.underflow;
self.next_window_update += added;
self.underflow = 0;
}
/// Obtains the unadvertised window update.
///
/// This does not apply the window update to `self`.
pub fn peek_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 {
None
} else {
Some(self.next_window_update)
}
}
/// Obtains and applies an unadvertised window update.
pub fn apply_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 {
return None;
}
let incr = self.next_window_update;
self.next_window_update = 0;
self.window_size += incr;
Some(incr)
}
}