reintroduce state tracking, separate from flow tracking
This commit is contained in:
@@ -47,21 +47,37 @@ impl<T, U> FlowControl<T>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> FlowControl<T> {
|
|
||||||
#[inline]
|
|
||||||
fn claim_local_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
|
|
||||||
self.local_flow_controller.claim_window(len)
|
|
||||||
.map_err(|_| error::Reason::FlowControlError.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn claim_remote_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
|
|
||||||
self.remote_flow_controller.claim_window(len)
|
|
||||||
.map_err(|_| error::User::FlowControlViolation.into())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: StreamTransporter> FlowControl<T> {
|
impl<T: StreamTransporter> FlowControl<T> {
|
||||||
|
fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> {
|
||||||
|
if id.is_zero() {
|
||||||
|
return self.local_flow_controller.claim_window(len)
|
||||||
|
.map_err(|_| error::Reason::FlowControlError.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut stream) = self.streams_mut().get_mut(&id) {
|
||||||
|
return stream.claim_local_window(len)
|
||||||
|
.map_err(|_| error::Reason::FlowControlError.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ignore updates for non-existent streams.
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn claim_remote_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> {
|
||||||
|
if id.is_zero() {
|
||||||
|
return self.local_flow_controller.claim_window(len)
|
||||||
|
.map_err(|_| error::Reason::FlowControlError.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut stream) = self.streams_mut().get_mut(&id) {
|
||||||
|
return stream.claim_remote_window(len)
|
||||||
|
.map_err(|_| error::Reason::FlowControlError.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ignore updates for non-existent streams.
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Handles a window update received from the remote, indicating that the local may
|
/// Handles a window update received from the remote, indicating that the local may
|
||||||
/// send `incr` additional bytes.
|
/// send `incr` additional bytes.
|
||||||
///
|
///
|
||||||
@@ -75,7 +91,7 @@ impl<T: StreamTransporter> FlowControl<T> {
|
|||||||
self.remote_flow_controller.grow_window(incr);
|
self.remote_flow_controller.grow_window(incr);
|
||||||
true
|
true
|
||||||
} else if let Some(mut s) = self.streams_mut().get_mut(&id) {
|
} else if let Some(mut s) = self.streams_mut().get_mut(&id) {
|
||||||
s.grow_send_window(incr);
|
s.grow_remote_window(incr);
|
||||||
true
|
true
|
||||||
} else {
|
} else {
|
||||||
false
|
false
|
||||||
@@ -137,10 +153,10 @@ impl<T> ConnectionTransporter for FlowControl<T>
|
|||||||
let mut streams = self.streams_mut();
|
let mut streams = self.streams_mut();
|
||||||
if new_window_size < old_window_size {
|
if new_window_size < old_window_size {
|
||||||
let decr = old_window_size - new_window_size;
|
let decr = old_window_size - new_window_size;
|
||||||
streams.shrink_local_window(decr);
|
streams.shrink_all_local_windows(decr);
|
||||||
} else {
|
} else {
|
||||||
let incr = new_window_size - old_window_size;
|
let incr = new_window_size - old_window_size;
|
||||||
streams.grow_local_window(incr);
|
streams.grow_all_local_windows(incr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,10 +177,10 @@ impl<T> ConnectionTransporter for FlowControl<T>
|
|||||||
let mut streams = self.streams_mut();
|
let mut streams = self.streams_mut();
|
||||||
if new_window_size < old_window_size {
|
if new_window_size < old_window_size {
|
||||||
let decr = old_window_size - new_window_size;
|
let decr = old_window_size - new_window_size;
|
||||||
streams.shrink_remote_window(decr);
|
streams.shrink_all_remote_windows(decr);
|
||||||
} else {
|
} else {
|
||||||
let incr = new_window_size - old_window_size;
|
let incr = new_window_size - old_window_size;
|
||||||
streams.grow_remote_window(incr);
|
streams.grow_all_remote_windows(incr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -201,7 +217,7 @@ impl<T> Stream for FlowControl<T>
|
|||||||
}
|
}
|
||||||
|
|
||||||
Some(Data(v)) => {
|
Some(Data(v)) => {
|
||||||
self.claim_local_window(v.len())?;
|
self.claim_local_window(&v.stream_id(), v.len())?;
|
||||||
return Ok(Async::Ready(Some(Data(v))));
|
return Ok(Async::Ready(Some(Data(v))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -223,7 +239,7 @@ impl<T, U> Sink for FlowControl<T>
|
|||||||
use frame::Frame::*;
|
use frame::Frame::*;
|
||||||
|
|
||||||
if let &Data(ref v) = &item {
|
if let &Data(ref v) = &item {
|
||||||
self.claim_remote_window(v.len())?;
|
self.claim_remote_window(&v.stream_id(), v.len())?;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.inner.start_send(item)
|
self.inner.start_send(item)
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ mod stream_tracker;
|
|||||||
|
|
||||||
pub use self::connection::Connection;
|
pub use self::connection::Connection;
|
||||||
pub use self::flow_control::FlowControl;
|
pub use self::flow_control::FlowControl;
|
||||||
pub use self::flow_controller::FlowController;
|
pub use self::flow_controller::{FlowController, WindowUnderflow};
|
||||||
pub use self::framed_read::FramedRead;
|
pub use self::framed_read::FramedRead;
|
||||||
pub use self::framed_write::FramedWrite;
|
pub use self::framed_write::FramedWrite;
|
||||||
pub use self::ping_pong::PingPong;
|
pub use self::ping_pong::PingPong;
|
||||||
@@ -74,27 +74,27 @@ impl StreamMap {
|
|||||||
self.inner.entry(id)
|
self.inner.entry(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shrink_local_window(&mut self, decr: u32) {
|
fn shrink_all_local_windows(&mut self, decr: u32) {
|
||||||
for (_, mut s) in &mut self.inner {
|
for (_, mut s) in &mut self.inner {
|
||||||
s.shrink_recv_window(decr)
|
s.shrink_local_window(decr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn grow_local_window(&mut self, incr: u32) {
|
fn grow_all_local_windows(&mut self, incr: u32) {
|
||||||
for (_, mut s) in &mut self.inner {
|
for (_, mut s) in &mut self.inner {
|
||||||
s.grow_recv_window(incr)
|
s.grow_local_window(incr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shrink_remote_window(&mut self, decr: u32) {
|
fn shrink_all_remote_windows(&mut self, decr: u32) {
|
||||||
for (_, mut s) in &mut self.inner {
|
for (_, mut s) in &mut self.inner {
|
||||||
s.shrink_send_window(decr)
|
s.shrink_remote_window(decr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn grow_remote_window(&mut self, incr: u32) {
|
fn grow_all_remote_windows(&mut self, incr: u32) {
|
||||||
for (_, mut s) in &mut self.inner {
|
for (_, mut s) in &mut self.inner {
|
||||||
s.grow_send_window(incr)
|
s.grow_remote_window(incr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
use {FrameSize, Peer};
|
use Peer;
|
||||||
use error::ConnectionError;
|
use error::ConnectionError;
|
||||||
use error::Reason::*;
|
use error::Reason::*;
|
||||||
use error::User::*;
|
use error::User::*;
|
||||||
use proto::FlowController;
|
use proto::{FlowController, WindowSize, WindowUnderflow};
|
||||||
|
|
||||||
/// Represents the state of an H2 stream
|
/// Represents the state of an H2 stream
|
||||||
///
|
///
|
||||||
@@ -60,111 +60,13 @@ pub enum StreamState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl StreamState {
|
impl StreamState {
|
||||||
/// Updates the local flow controller so that the remote may send `incr` more bytes.
|
|
||||||
///
|
|
||||||
/// Returns the amount of capacity created, accounting for window size changes. The
|
|
||||||
/// caller should send the the returned window size increment to the remote.
|
|
||||||
///
|
|
||||||
/// If the remote is closed, None is returned.
|
|
||||||
pub fn grow_send_window(&mut self, incr: u32) {
|
|
||||||
use self::StreamState::*;
|
|
||||||
use self::PeerState::*;
|
|
||||||
|
|
||||||
if incr == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
match self {
|
|
||||||
&mut Open { remote: Data(ref mut fc), .. } |
|
|
||||||
&mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr),
|
|
||||||
_ => {},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn shrink_send_window(&mut self, decr: u32) {
|
|
||||||
use self::StreamState::*;
|
|
||||||
use self::PeerState::*;
|
|
||||||
|
|
||||||
if decr == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
match self {
|
|
||||||
&mut Open { local: Data(ref mut fc), .. } |
|
|
||||||
&mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr),
|
|
||||||
_ => {},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/// Consumes newly-advertised capacity to inform the local endpoint it may send more
|
|
||||||
/// data.
|
|
||||||
pub fn take_send_window_update(&mut self) -> Option<u32> {
|
|
||||||
use self::StreamState::*;
|
|
||||||
use self::PeerState::*;
|
|
||||||
|
|
||||||
match self {
|
|
||||||
&mut Open { remote: Data(ref mut fc), .. } |
|
|
||||||
&mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Updates the remote flow controller so that the remote may receive `incr`
|
|
||||||
/// additional bytes.
|
|
||||||
///
|
|
||||||
/// Returns the amount of capacity created, accounting for window size changes. The
|
|
||||||
/// caller should send the the returned window size increment to the remote.
|
|
||||||
pub fn grow_recv_window(&mut self, incr: u32) {
|
|
||||||
use self::StreamState::*;
|
|
||||||
use self::PeerState::*;
|
|
||||||
|
|
||||||
if incr == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
match self {
|
|
||||||
&mut Open { local: Data(ref mut fc), .. } |
|
|
||||||
&mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr),
|
|
||||||
_ => {},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn shrink_recv_window(&mut self, decr: u32) {
|
|
||||||
use self::StreamState::*;
|
|
||||||
use self::PeerState::*;
|
|
||||||
|
|
||||||
if decr == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
match self {
|
|
||||||
&mut Open { local: Data(ref mut fc), .. } |
|
|
||||||
&mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr),
|
|
||||||
_ => {},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Consumes newly-advertised capacity to inform the local endpoint it may send more
|
|
||||||
/// data.
|
|
||||||
pub fn take_recv_window_update(&mut self) -> Option<u32> {
|
|
||||||
use self::StreamState::*;
|
|
||||||
use self::PeerState::*;
|
|
||||||
|
|
||||||
match self {
|
|
||||||
&mut Open { local: Data(ref mut fc), .. } |
|
|
||||||
&mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Transition the state to represent headers being received.
|
/// Transition the state to represent headers being received.
|
||||||
///
|
///
|
||||||
/// Returns true if this state transition results in iniitializing the
|
/// Returns true if this state transition results in iniitializing the
|
||||||
/// stream id. `Err` is returned if this is an invalid state transition.
|
/// stream id. `Err` is returned if this is an invalid state transition.
|
||||||
pub fn recv_headers<P: Peer>(&mut self,
|
pub fn recv_headers<P: Peer>(&mut self,
|
||||||
eos: bool,
|
eos: bool,
|
||||||
initial_recv_window_size: u32)
|
initial_recv_window_size: WindowSize)
|
||||||
-> Result<bool, ConnectionError>
|
-> Result<bool, ConnectionError>
|
||||||
{
|
{
|
||||||
use self::StreamState::*;
|
use self::StreamState::*;
|
||||||
@@ -207,22 +109,20 @@ impl StreamState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn recv_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> {
|
pub fn recv_data(&mut self, eos: bool) -> Result<(), ConnectionError> {
|
||||||
use self::StreamState::*;
|
use self::StreamState::*;
|
||||||
|
|
||||||
match *self {
|
match *self {
|
||||||
Open { local, mut remote } => {
|
Open { local, remote } => {
|
||||||
try!(remote.check_is_data(ProtocolError.into()));
|
try!(remote.check_is_data(ProtocolError.into()));
|
||||||
try!(remote.claim_window_size(len, FlowControlError.into()));
|
|
||||||
if eos {
|
if eos {
|
||||||
*self = HalfClosedRemote(local);
|
*self = HalfClosedRemote(local);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
HalfClosedLocal(mut remote) => {
|
HalfClosedLocal(remote) => {
|
||||||
try!(remote.check_is_data(ProtocolError.into()));
|
try!(remote.check_is_data(ProtocolError.into()));
|
||||||
try!(remote.claim_window_size(len, FlowControlError.into()));
|
|
||||||
if eos {
|
if eos {
|
||||||
*self = Closed;
|
*self = Closed;
|
||||||
}
|
}
|
||||||
@@ -243,7 +143,7 @@ impl StreamState {
|
|||||||
/// id. `Err` is returned if this is an invalid state transition.
|
/// id. `Err` is returned if this is an invalid state transition.
|
||||||
pub fn send_headers<P: Peer>(&mut self,
|
pub fn send_headers<P: Peer>(&mut self,
|
||||||
eos: bool,
|
eos: bool,
|
||||||
initial_window_size: u32)
|
initial_window_size: WindowSize)
|
||||||
-> Result<bool, ConnectionError>
|
-> Result<bool, ConnectionError>
|
||||||
{
|
{
|
||||||
use self::StreamState::*;
|
use self::StreamState::*;
|
||||||
@@ -294,33 +194,156 @@ impl StreamState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn send_data(&mut self, eos: bool, len: FrameSize) -> Result<(), ConnectionError> {
|
pub fn send_data(&mut self, eos: bool) -> Result<(), ConnectionError> {
|
||||||
use self::StreamState::*;
|
use self::StreamState::*;
|
||||||
|
|
||||||
match *self {
|
match *self {
|
||||||
Open { mut local, remote } => {
|
Open { local, remote } => {
|
||||||
try!(local.check_is_data(UnexpectedFrameType.into()));
|
try!(local.check_is_data(UnexpectedFrameType.into()));
|
||||||
try!(local.claim_window_size(len, FlowControlViolation.into()));
|
|
||||||
if eos {
|
if eos {
|
||||||
*self = HalfClosedLocal(remote);
|
*self = HalfClosedLocal(remote);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
HalfClosedRemote(mut local) => {
|
HalfClosedRemote(local) => {
|
||||||
try!(local.check_is_data(UnexpectedFrameType.into()));
|
try!(local.check_is_data(UnexpectedFrameType.into()));
|
||||||
try!(local.claim_window_size(len, FlowControlViolation.into()));
|
|
||||||
if eos {
|
if eos {
|
||||||
*self = Closed;
|
*self = Closed;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
Closed | HalfClosedLocal(..) => {
|
Idle | Closed | HalfClosedLocal(..) => {
|
||||||
Err(UnexpectedFrameType.into())
|
Err(UnexpectedFrameType.into())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_ => unimplemented!(),
|
/// Updates the local flow controller so that the remote may send `incr` more bytes.
|
||||||
|
///
|
||||||
|
/// Returns the amount of capacity created, accounting for window size changes. The
|
||||||
|
/// caller should send the the returned window size increment to the remote.
|
||||||
|
///
|
||||||
|
/// If the remote is closed, None is returned.
|
||||||
|
pub fn grow_remote_window(&mut self, incr: WindowSize) {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
if incr == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { remote: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedLocal(Data(ref mut fc)) => fc.grow_window(incr),
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn claim_remote_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
if decr == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { remote: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedLocal(Data(ref mut fc)) => fc.claim_window(decr),
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shrink_remote_window(&mut self, decr: WindowSize) {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
if decr == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { local: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedLocal(Data(ref mut fc)) => fc.shrink_window(decr),
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consumes newly-advertised capacity to inform the local endpoint it may send more
|
||||||
|
/// data.
|
||||||
|
pub fn take_remote_window_update(&mut self) -> Option<WindowSize> {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { remote: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedLocal(Data(ref mut fc)) => fc.take_window_update(),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Updates the remote flow controller so that the remote may receive `incr`
|
||||||
|
/// additional bytes.
|
||||||
|
///
|
||||||
|
/// Returns the amount of capacity created, accounting for window size changes. The
|
||||||
|
/// caller should send the the returned window size increment to the remote.
|
||||||
|
pub fn grow_local_window(&mut self, incr: WindowSize) {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
if incr == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { local: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedRemote(Data(ref mut fc)) => fc.grow_window(incr),
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn claim_local_window(&mut self, decr: WindowSize) -> Result<(), WindowUnderflow> {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
if decr == 0 {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { local: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedRemote(Data(ref mut fc)) => fc.claim_window(decr),
|
||||||
|
_ => Ok(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn shrink_local_window(&mut self, decr: WindowSize) {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
if decr == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { local: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedRemote(Data(ref mut fc)) => fc.shrink_window(decr),
|
||||||
|
_ => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consumes newly-advertised capacity to inform the local endpoint it may send more
|
||||||
|
/// data.
|
||||||
|
pub fn take_local_window_update(&mut self) -> Option<WindowSize> {
|
||||||
|
use self::StreamState::*;
|
||||||
|
use self::PeerState::*;
|
||||||
|
|
||||||
|
match self {
|
||||||
|
&mut Open { local: Data(ref mut fc), .. } |
|
||||||
|
&mut HalfClosedRemote(Data(ref mut fc)) => fc.take_window_update(),
|
||||||
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -356,13 +379,4 @@ impl PeerState {
|
|||||||
_ => Err(err),
|
_ => Err(err),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
|
||||||
fn claim_window_size(&mut self, sz: FrameSize, err: ConnectionError) -> Result<(), ConnectionError> {
|
|
||||||
use self::PeerState::*;
|
|
||||||
match self {
|
|
||||||
&mut Data(ref mut fc) => fc.claim_window(sz).map_err(|_| err),
|
|
||||||
_ => Err(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
use ConnectionError;
|
use ConnectionError;
|
||||||
use error::User::*;
|
use error::Reason::ProtocolError;
|
||||||
|
use error::User::InvalidStreamId;
|
||||||
use frame::{self, Frame};
|
use frame::{self, Frame};
|
||||||
use proto::*;
|
use proto::*;
|
||||||
|
|
||||||
@@ -119,8 +120,15 @@ impl<T, P> Stream for StreamTracker<T, P>
|
|||||||
Ok(Async::Ready(Some(Headers(v))))
|
Ok(Async::Ready(Some(Headers(v))))
|
||||||
}
|
}
|
||||||
|
|
||||||
f => Ok(Async::Ready(f))
|
Some(Data(v)) => {
|
||||||
|
match self.streams.get_mut(&v.stream_id()) {
|
||||||
|
None => return Err(ProtocolError.into()),
|
||||||
|
Some(state) => state.recv_data(v.is_end_stream())?,
|
||||||
|
}
|
||||||
|
Ok(Async::Ready(Some(Data(v))))
|
||||||
|
}
|
||||||
|
|
||||||
|
f => Ok(Async::Ready(f))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -136,7 +144,8 @@ impl<T, P, U> Sink for StreamTracker<T, P>
|
|||||||
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
|
fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
|
||||||
use frame::Frame::*;
|
use frame::Frame::*;
|
||||||
|
|
||||||
if let &Headers(ref v) = &item {
|
match &item {
|
||||||
|
&Headers(ref v) => {
|
||||||
let id = v.stream_id();
|
let id = v.stream_id();
|
||||||
let eos = v.is_end_stream();
|
let eos = v.is_end_stream();
|
||||||
|
|
||||||
@@ -163,6 +172,16 @@ impl<T, P, U> Sink for StreamTracker<T, P>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
&Data(ref v) => {
|
||||||
|
match self.streams.get_mut(&v.stream_id()) {
|
||||||
|
None => return Err(ProtocolError.into()),
|
||||||
|
Some(state) => state.send_data(v.is_end_stream())?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
self.inner.start_send(item)
|
self.inner.start_send(item)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user