Wire in recv flow control (#26)

This commit is contained in:
Carl Lerche
2017-08-23 11:22:24 -07:00
committed by GitHub
parent a623ab68b5
commit 807d2b7317
18 changed files with 452 additions and 345 deletions

View File

@@ -1,5 +1,5 @@
use {frame, ConnectionError, StreamId}; use {frame, ConnectionError, StreamId};
use {Body, Chunk}; use Body;
use proto::{self, Connection, WindowSize}; use proto::{self, Connection, WindowSize};
use error::Reason::*; use error::Reason::*;

View File

@@ -3,13 +3,20 @@ use frame::{self, Head, Error, Kind, StreamId};
use bytes::{BufMut, BigEndian}; use bytes::{BufMut, BigEndian};
#[derive(Debug)] #[derive(Debug, Clone, Copy)]
pub struct GoAway { pub struct GoAway {
last_stream_id: StreamId, last_stream_id: StreamId,
error_code: u32, error_code: u32,
} }
impl GoAway { impl GoAway {
pub fn new(last_stream_id: StreamId, reason: Reason) -> Self {
GoAway {
last_stream_id,
error_code: reason.into(),
}
}
pub fn reason(&self) -> Reason { pub fn reason(&self) -> Reason {
self.error_code.into() self.error_code.into()
} }

View File

@@ -165,6 +165,9 @@ pub enum Error {
/// An invalid setting value was provided /// An invalid setting value was provided
InvalidSettingValue, InvalidSettingValue,
/// An invalid window update value
InvalidWindowUpdateValue,
/// The payload length specified by the frame header was not the /// The payload length specified by the frame header was not the
/// value necessary for the specific frame type. /// value necessary for the specific frame type.
InvalidPayloadLength, InvalidPayloadLength,

View File

@@ -38,7 +38,9 @@ impl WindowUpdate {
// when received. // when received.
let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK; let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK;
// TODO: the size_increment must be greater than 0 if size_increment == 0 {
return Err(Error::InvalidWindowUpdateValue.into());
}
Ok(WindowUpdate { Ok(WindowUpdate {
stream_id: head.stream_id(), stream_id: head.stream_id(),

View File

@@ -53,30 +53,14 @@ pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<B::Buf>, inner: proto::StreamRef<B::Buf>,
} }
#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<B::Buf>,
}
// ===== impl Body ===== // ===== impl Body =====
impl<B: IntoBuf> futures::Stream for Body<B> { impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>; type Item = Bytes;
type Error = ConnectionError; type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let chunk = try_ready!(self.inner.poll_data()) self.inner.poll_data()
.map(|inner| Chunk { inner });
Ok(chunk.into())
}
}
// ===== impl Chunk =====
impl<B: IntoBuf> Chunk<B> {
pub fn pop_bytes(&mut self) -> Option<Bytes> {
self.inner.pop_bytes()
} }
} }

View File

@@ -13,14 +13,40 @@ use std::marker::PhantomData;
/// An H2 connection /// An H2 connection
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Connection<T, P, B: IntoBuf = Bytes> { pub(crate) struct Connection<T, P, B: IntoBuf = Bytes> {
// Codec /// Tracks the connection level state transitions.
state: State,
/// Read / write frame values
codec: Codec<T, Prioritized<B::Buf>>, codec: Codec<T, Prioritized<B::Buf>>,
/// Ping/pong handler
ping_pong: PingPong<Prioritized<B::Buf>>, ping_pong: PingPong<Prioritized<B::Buf>>,
/// Connection settings
settings: Settings, settings: Settings,
/// Stream state handler
streams: Streams<B::Buf>, streams: Streams<B::Buf>,
/// Client or server
_phantom: PhantomData<P>, _phantom: PhantomData<P>,
} }
#[derive(Debug)]
enum State {
/// Currently open in a sane state
Open,
/// Waiting to send a GO_AWAY frame
GoAway(frame::GoAway),
/// The codec must be flushed
Flush(Reason),
/// In an errored state
Error(Reason),
}
impl<T, P, B> Connection<T, P, B> impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
@@ -36,6 +62,7 @@ impl<T, P, B> Connection<T, P, B>
}); });
Connection { Connection {
state: State::Open,
codec: codec, codec: codec,
ping_pong: PingPong::new(), ping_pong: PingPong::new(),
settings: Settings::new(), settings: Settings::new(),
@@ -62,13 +89,36 @@ impl<T, P, B> Connection<T, P, B>
/// Advances the internal state of the connection. /// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), ConnectionError> { pub fn poll(&mut self) -> Poll<(), ConnectionError> {
match self.poll2() { use error::ConnectionError::*;
Err(e) => {
debug!("Connection::poll; err={:?}", e); loop {
self.streams.recv_err(&e); match self.state {
Err(e) // When open, continue to poll a frame
State::Open => {},
// In an error state
_ => {
try_ready!(self.poll_complete());
// GO_AWAY frame has been sent, return the error
return Err(self.state.error().unwrap().into());
}
}
match self.poll2() {
Err(Proto(e)) => {
debug!("Connection::poll; err={:?}", e);
let last_processed_id = self.streams.recv_err(&e.into());
let frame = frame::GoAway::new(last_processed_id, e);
self.state = State::GoAway(frame);
}
Err(e) => {
// TODO: Are I/O errors recoverable?
self.streams.recv_err(&e);
return Err(e);
}
ret => return ret,
} }
ret => ret,
} }
} }
@@ -114,7 +164,7 @@ impl<T, P, B> Connection<T, P, B>
self.settings.recv_settings(frame); self.settings.recv_settings(frame);
} }
Some(GoAway(frame)) => { Some(GoAway(frame)) => {
// TODO: handle the last_stream_id. Also, should this be // TODO: handle the last_processed_id. Also, should this be
// handled as an error? // handled as an error?
let e = ConnectionError::Proto(frame.reason()); let e = ConnectionError::Proto(frame.reason());
return Ok(().into()); return Ok(().into());
@@ -141,12 +191,34 @@ impl<T, P, B> Connection<T, P, B>
} }
fn poll_complete(&mut self) -> Poll<(), ConnectionError> { fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.poll_ready()); loop {
match self.state {
State::Open => {
try_ready!(self.poll_ready());
// Ensure all window updates have been sent. // Ensure all window updates have been sent.
try_ready!(self.streams.poll_complete(&mut self.codec)); try_ready!(self.streams.poll_complete(&mut self.codec));
Ok(().into()) return Ok(().into());
}
State::GoAway(frame) => {
if !self.codec.start_send(frame.into())?.is_ready() {
// Not ready to send the frame... try again later.
return Ok(Async::NotReady);
}
// GO_AWAY sent, transition the connection to an errored state
self.state = State::Flush(frame.reason());
}
State::Flush(reason) => {
try_ready!(self.codec.poll_complete());
self.state = State::Error(reason);
}
State::Error(..) => {
return Ok(().into());
}
}
}
} }
fn convert_poll_message(frame: frame::Headers) -> Result<Frame<P::Poll>, ConnectionError> { fn convert_poll_message(frame: frame::Headers) -> Result<Frame<P::Poll>, ConnectionError> {
@@ -185,3 +257,21 @@ impl<T, B> Connection<T, server::Peer, B>
self.streams.next_incoming() self.streams.next_incoming()
} }
} }
// ====== impl State =====
impl State {
fn is_open(&self) -> bool {
match *self {
State::Open => true,
_ => false,
}
}
fn error(&self) -> Option<Reason> {
match *self {
State::Error(reason) => Some(reason),
_ => None,
}
}
}

View File

@@ -7,7 +7,7 @@ mod settings;
mod streams; mod streams;
pub(crate) use self::connection::Connection; pub(crate) use self::connection::Connection;
pub(crate) use self::streams::{Streams, StreamRef, Chunk}; pub(crate) use self::streams::{Streams, StreamRef};
use self::codec::Codec; use self::codec::Codec;
use self::framed_read::FramedRead; use self::framed_read::FramedRead;
@@ -21,6 +21,7 @@ use error::Reason;
use frame::{self, Frame}; use frame::{self, Frame};
use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2}; use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2};
use futures::task::Task;
use bytes::{Buf, IntoBuf}; use bytes::{Buf, IntoBuf};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited; use tokio_io::codec::length_delimited;
@@ -57,7 +58,7 @@ pub struct WindowUpdate {
// Constants // Constants
pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535;
pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX; pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
/// Create a transport prepared to handle the server handshake. /// Create a transport prepared to handle the server handshake.
/// ///

View File

@@ -108,53 +108,4 @@ impl<B> Deque<B> {
None => None, None => None,
} }
} }
pub fn take_while<F>(&mut self, buf: &mut Buffer<B>, mut f: F) -> Self
where F: FnMut(&Frame<B>) -> bool
{
match self.indices {
Some(mut idxs) => {
if !f(&buf.slab[idxs.head].frame) {
return Deque::new();
}
let head = idxs.head;
let mut tail = idxs.head;
loop {
let next = match buf.slab[tail].next {
Some(next) => next,
None => {
self.indices = None;
return Deque {
indices: Some(idxs),
_p: PhantomData,
};
}
};
if !f(&buf.slab[next].frame) {
// Split the linked list
buf.slab[tail].next = None;
self.indices = Some(Indices {
head: next,
tail: idxs.tail,
});
return Deque {
indices: Some(Indices {
head: head,
tail: tail,
}),
_p: PhantomData,
}
}
tail = next;
}
}
None => Deque::new(),
}
}
} }

View File

@@ -1,5 +1,6 @@
use ConnectionError; use ConnectionError;
use proto::*; use proto::*;
use error::Reason::*;
use std::cmp; use std::cmp;
@@ -48,29 +49,69 @@ impl FlowControl {
self.available -= capacity; self.available -= capacity;
} }
pub fn assign_capacity(&mut self, capacity: WindowSize) { pub fn assign_capacity(&mut self, capacity: WindowSize)
assert!(self.window_size() >= self.available + capacity); -> Result<(), ConnectionError>
self.available += capacity; {
let (val, overflow) = self.available.overflowing_add(capacity);
if overflow {
return Err(FlowControlError.into());
}
if val > MAX_WINDOW_SIZE {
return Err(FlowControlError.into());
}
self.available = val;
Ok(())
} }
/// Update the window size. /// Returns the number of bytes available but not assigned to the window.
///
/// This represents pending outbound WINDOW_UPDATE frames.
pub fn unclaimed_capacity(&self) -> WindowSize {
let available = self.available as i32;
if self.window_size >= available {
return 0;
}
(available - self.window_size) as WindowSize
}
/// Increase the window size.
/// ///
/// This is called after receiving a WINDOW_UPDATE frame /// This is called after receiving a WINDOW_UPDATE frame
pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), ConnectionError> { pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), ConnectionError> {
// TODO: Handle invalid increment let (val, overflow) = self.window_size.overflowing_add(sz as i32);
self.window_size += sz as i32;
if overflow {
return Err(FlowControlError.into());
}
if val > MAX_WINDOW_SIZE as i32 {
return Err(FlowControlError.into());
}
self.window_size = val;
Ok(()) Ok(())
} }
/// Decrement the window size.
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_window(&mut self, sz: WindowSize) {
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz as i32;
}
/// Decrements the window reflecting data has actually been sent. The caller /// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity. /// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) { pub fn send_data(&mut self, sz: WindowSize) {
trace!("send_data; sz={}; window={}; available={}", trace!("send_data; sz={}; window={}; available={}",
sz, self.window_size, self.available); sz, self.window_size, self.available);
// Available cannot be greater than the window
debug_assert!(self.available as i32 <= self.window_size || self.available == 0);
// Ensure that the argument is correct // Ensure that the argument is correct
assert!(sz <= self.window_size as WindowSize); assert!(sz <= self.window_size as WindowSize);

View File

@@ -8,7 +8,7 @@ mod store;
mod stream; mod stream;
mod streams; mod streams;
pub(crate) use self::streams::{Streams, StreamRef, Chunk}; pub(crate) use self::streams::{Streams, StreamRef};
pub(crate) use self::prioritize::Prioritized; pub(crate) use self::prioritize::Prioritized;
use self::buffer::Buffer; use self::buffer::Buffer;

View File

@@ -17,10 +17,6 @@ pub(super) struct Prioritize<B> {
/// Holds frames that are waiting to be written to the socket /// Holds frames that are waiting to be written to the socket
buffer: Buffer<B>, buffer: Buffer<B>,
/// Holds the connection task. This signals the connection that there is
/// data to flush.
conn_task: Option<task::Task>,
} }
pub(crate) struct Prioritized<B> { pub(crate) struct Prioritized<B> {
@@ -41,22 +37,25 @@ impl<B> Prioritize<B>
pub fn new(config: &Config) -> Prioritize<B> { pub fn new(config: &Config) -> Prioritize<B> {
let mut flow = FlowControl::new(); let mut flow = FlowControl::new();
flow.inc_window(config.init_local_window_sz); flow.inc_window(config.init_local_window_sz)
flow.assign_capacity(config.init_local_window_sz); .ok().expect("invalid initial window size");
flow.assign_capacity(config.init_local_window_sz)
.ok().expect("invalid initial window size");
Prioritize { Prioritize {
pending_send: store::Queue::new(), pending_send: store::Queue::new(),
pending_capacity: store::Queue::new(), pending_capacity: store::Queue::new(),
flow: flow, flow: flow,
buffer: Buffer::new(), buffer: Buffer::new(),
conn_task: None,
} }
} }
/// Queue a frame to be sent to the remote /// Queue a frame to be sent to the remote
pub fn queue_frame(&mut self, pub fn queue_frame(&mut self,
frame: Frame<B>, frame: Frame<B>,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
{ {
// Queue the frame in the buffer // Queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame); stream.pending_send.push_back(&mut self.buffer, frame);
@@ -65,7 +64,7 @@ impl<B> Prioritize<B>
self.pending_send.push(stream); self.pending_send.push(stream);
// Notify the connection. // Notify the connection.
if let Some(task) = self.conn_task.take() { if let Some(task) = task.take() {
task.notify(); task.notify();
} }
} }
@@ -73,7 +72,8 @@ impl<B> Prioritize<B>
/// Send a data frame /// Send a data frame
pub fn send_data(&mut self, pub fn send_data(&mut self,
frame: frame::Data<B>, frame: frame::Data<B>,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
let sz = frame.payload().remaining(); let sz = frame.payload().remaining();
@@ -112,7 +112,7 @@ impl<B> Prioritize<B>
if stream.send_flow.available() > stream.buffered_send_data { if stream.send_flow.available() > stream.buffered_send_data {
// The stream currently has capacity to send the data frame, so // The stream currently has capacity to send the data frame, so
// queue it up and notify the connection task. // queue it up and notify the connection task.
self.queue_frame(frame.into(), stream); self.queue_frame(frame.into(), stream, task);
} else { } else {
// The stream has no capacity to send the frame now, save it but // The stream has no capacity to send the frame now, save it but
// don't notify the conneciton task. Once additional capacity // don't notify the conneciton task. Once additional capacity
@@ -155,10 +155,6 @@ impl<B> Prioritize<B>
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
if !stream.state.is_send_streaming() {
return Ok(());
}
// Update the stream level flow control. // Update the stream level flow control.
stream.send_flow.inc_window(inc)?; stream.send_flow.inc_window(inc)?;
@@ -215,6 +211,8 @@ impl<B> Prioritize<B>
return; return;
} }
debug_assert!(stream.state.is_send_streaming());
// The amount of currently available capacity on the connection // The amount of currently available capacity on the connection
let conn_available = self.flow.available(); let conn_available = self.flow.available();
@@ -294,9 +292,6 @@ impl<B> Prioritize<B>
// This might release a data frame... // This might release a data frame...
if !self.reclaim_frame(store, dst) { if !self.reclaim_frame(store, dst) {
// Nothing else to do, track the task
self.conn_task = Some(task::current());
return Ok(().into()); return Ok(().into());
} }

View File

@@ -19,14 +19,16 @@ pub(super) struct Recv<B> {
init_window_sz: WindowSize, init_window_sz: WindowSize,
/// Connection level flow control governing received data /// Connection level flow control governing received data
flow_control: FlowControl, flow: FlowControl,
/// The lowest stream ID that is still idle /// The lowest stream ID that is still idle
next_stream_id: StreamId, next_stream_id: StreamId,
/// The stream ID of the last processed stream
last_processed_id: StreamId,
/// Streams that have pending window updates /// Streams that have pending window updates
/// TODO: don't use a VecDeque pending_window_updates: store::Queue<B, stream::NextWindowUpdate>,
pending_window_updates: VecDeque<StreamId>,
/// New streams to be accepted /// New streams to be accepted
pending_accept: store::Queue<B, stream::Next>, pending_accept: store::Queue<B, stream::Next>,
@@ -40,12 +42,6 @@ pub(super) struct Recv<B> {
_p: PhantomData<(B)>, _p: PhantomData<(B)>,
} }
#[derive(Debug)]
pub(super) struct Chunk {
/// Data frames pending receival
pub pending_recv: buffer::Deque<Bytes>,
}
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
struct Indices { struct Indices {
head: store::Key, head: store::Key,
@@ -63,14 +59,16 @@ impl<B> Recv<B> where B: Buf {
let mut flow = FlowControl::new(); let mut flow = FlowControl::new();
flow.inc_window(config.init_remote_window_sz); flow.inc_window(config.init_remote_window_sz);
flow.assign_capacity(config.init_remote_window_sz);
Recv { Recv {
max_streams: config.max_remote_initiated, max_streams: config.max_remote_initiated,
num_streams: 0, num_streams: 0,
init_window_sz: config.init_remote_window_sz, init_window_sz: config.init_remote_window_sz,
flow_control: flow, flow: flow,
next_stream_id: next_stream_id.into(), next_stream_id: next_stream_id.into(),
pending_window_updates: VecDeque::new(), pending_window_updates: store::Queue::new(),
last_processed_id: StreamId::zero(),
pending_accept: store::Queue::new(), pending_accept: store::Queue::new(),
buffer: Buffer::new(), buffer: Buffer::new(),
refused: None, refused: None,
@@ -78,6 +76,11 @@ impl<B> Recv<B> where B: Buf {
} }
} }
/// Returns the ID of the last processed stream
pub fn last_processed_id(&self) -> StreamId {
self.last_processed_id
}
/// Update state reflecting a new, remotely opened stream /// Update state reflecting a new, remotely opened stream
/// ///
/// Returns the stream state if successful. `None` if refused /// Returns the stream state if successful. `None` if refused
@@ -138,7 +141,10 @@ impl<B> Recv<B> where B: Buf {
trace!("opening stream; init_window={}", self.init_window_sz); trace!("opening stream; init_window={}", self.init_window_sz);
let is_initial = stream.state.recv_open(frame.is_end_stream())?; let is_initial = stream.state.recv_open(frame.is_end_stream())?;
// TODO: Update flow control if stream.state.is_recv_streaming() {
stream.recv_flow.inc_window(self.init_window_sz)?;
stream.recv_flow.assign_capacity(self.init_window_sz);
}
if is_initial { if is_initial {
if !self.can_inc_num_streams() { if !self.can_inc_num_streams() {
@@ -152,6 +158,11 @@ impl<B> Recv<B> where B: Buf {
return Err(ProtocolError.into()); return Err(ProtocolError.into());
} }
// TODO: be smarter about this logic
if frame.stream_id() > self.last_processed_id {
self.last_processed_id = frame.stream_id();
}
// Increment the number of concurrent streams // Increment the number of concurrent streams
self.inc_num_streams(); self.inc_num_streams();
} }
@@ -185,6 +196,35 @@ impl<B> Recv<B> where B: Buf {
Ok(()) Ok(())
} }
pub fn release_capacity(&mut self,
capacity: WindowSize,
stream: &mut store::Ptr<B>,
send: &mut Send<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError>
{
if capacity > stream.in_flight_recv_data {
// TODO: Handle error
unimplemented!();
}
// Decrement in-flight data
stream.in_flight_recv_data -= capacity;
// Assign capacity to connection & stream
self.flow.assign_capacity(capacity);
stream.recv_flow.assign_capacity(capacity);
// Queue the stream for sending the WINDOW_UPDATE frame.
self.pending_window_updates.push(stream);
if let Some(task) = task.take() {
task.notify();
}
Ok(())
}
pub fn recv_data(&mut self, pub fn recv_data(&mut self,
frame: frame::Data, frame: frame::Data,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>)
@@ -198,24 +238,29 @@ impl<B> Recv<B> where B: Buf {
let sz = sz as WindowSize; let sz = sz as WindowSize;
// TODO: implement if !stream.state.is_recv_streaming() {
/* // Receiving a DATA frame when not expecting one is a protocol
match stream.recv_flow_control() { // error.
Some(flow) => { return Err(ProtocolError.into());
// Ensure there's enough capacity on the connection before
// acting on the stream.
try!(self.flow_control.ensure_window(sz, FlowControlError));
// Claim the window on the stream
try!(flow.claim_window(sz, FlowControlError));
// Claim the window on the connection.
self.flow_control.claim_window(sz, FlowControlError)
.expect("local connection flow control error");
}
None => return Err(ProtocolError.into()),
} }
*/
trace!("recv_data; size={}; connection={}; stream={}",
sz, self.flow.window_size(), stream.recv_flow.window_size());
// Ensure that there is enough capacity on the connection before acting
// on the stream.
if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz {
return Err(FlowControlError.into());
}
// Update connection level flow control
self.flow.send_data(sz);
// Update stream level flow control
stream.recv_flow.send_data(sz);
// Track the data as in-flight
stream.in_flight_recv_data += sz;
if frame.is_end_stream() { if frame.is_end_stream() {
try!(stream.state.recv_close()); try!(stream.state.recv_close());
@@ -294,6 +339,7 @@ impl<B> Recv<B> where B: Buf {
Ok(()) Ok(())
} }
/// Handle a received error
pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream<B>) { pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream<B>) {
// Receive an error // Receive an error
stream.state.recv_err(err); stream.state.recv_err(err);
@@ -384,47 +430,33 @@ impl<B> Recv<B> where B: Buf {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} }
pub fn expand_connection_window(&mut self, sz: WindowSize) pub fn poll_complete<T>(&mut self,
-> Result<(), ConnectionError> store: &mut Store<B>,
{ dst: &mut Codec<T, Prioritized<B>>)
unimplemented!();
/*
// TODO: handle overflow
self.flow_control.expand_window(sz);
Ok(())
*/
}
pub fn expand_stream_window(&mut self,
id: StreamId,
sz: WindowSize,
stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{
unimplemented!();
/*
// TODO: handle overflow
if let Some(flow) = stream.recv_flow_control() {
flow.expand_window(sz);
self.pending_window_updates.push_back(id);
}
Ok(())
*/
}
/*
/// Send connection level window update
pub fn send_connection_window_update<T>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
{ {
if let Some(incr) = self.flow_control.peek_window_update() { // Send any pending connection level window updates
try_ready!(self.send_connection_window_update(dst));
// Send any pending stream level window updates
try_ready!(self.send_stream_window_updates(store, dst));
Ok(().into())
}
/// Send connection level window update
fn send_connection_window_update<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
let incr = self.flow.unclaimed_capacity();
if incr > 0 {
let frame = frame::WindowUpdate::new(StreamId::zero(), incr); let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
if dst.start_send(frame.into())?.is_ready() { if dst.start_send(frame.into())?.is_ready() {
assert_eq!(Some(incr), self.flow_control.apply_window_update()); self.flow.inc_window(incr);
} else { } else {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
@@ -432,73 +464,78 @@ impl<B> Recv<B> where B: Buf {
Ok(().into()) Ok(().into())
} }
*/
/// Send stream level window update
pub fn send_stream_window_updates<T>(&mut self,
store: &mut Store<B>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
loop {
// Ensure the codec has capacity
try_ready!(dst.poll_ready());
// Get the next stream
let stream = match self.pending_window_updates.pop(store) {
Some(stream) => stream,
None => return Ok(().into()),
};
if !stream.state.is_recv_streaming() {
// No need to send window updates on the stream if the stream is
// no longer receiving data.
continue;
}
// TODO: de-dup
let incr = stream.recv_flow.unclaimed_capacity();
if incr > 0 {
let frame = frame::WindowUpdate::new(stream.id, incr);
let res = dst.start_send(frame.into())?;
assert!(res.is_ready());
}
}
}
pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> { pub fn next_incoming(&mut self, store: &mut Store<B>) -> Option<store::Key> {
self.pending_accept.pop(store) self.pending_accept.pop(store)
.map(|ptr| ptr.key()) .map(|ptr| ptr.key())
} }
pub fn poll_chunk(&mut self, stream: &mut Stream<B>) pub fn poll_data(&mut self, stream: &mut Stream<B>)
-> Poll<Option<Chunk>, ConnectionError> -> Poll<Option<Bytes>, ConnectionError>
{ {
let frames = stream.pending_recv match stream.pending_recv.pop_front(&mut self.buffer) {
.take_while(&mut self.buffer, |frame| frame.is_data()); Some(frame) => {
match frame {
Frame::Data(frame) => {
Ok(Some(frame.into_payload()).into())
}
frame => {
// Frame is trailer
stream.pending_recv.push_front(&mut self.buffer, frame);
if frames.is_empty() { // No more data frames
if stream.state.is_recv_closed() { Ok(None.into())
Ok(None.into())
} else {
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
} else {
Ok(Some(Chunk {
pending_recv: frames,
}).into())
}
}
pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option<Bytes> {
match chunk.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Data(frame)) => {
Some(frame.into_payload())
}
None => None,
_ => panic!("unexpected frame type"),
}
}
/*
/// Send stream level window update
pub fn send_stream_window_update<T>(&mut self,
streams: &mut Store<B>,
dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
{
while let Some(id) = self.pending_window_updates.pop_front() {
let flow = streams.find_mut(&id)
.and_then(|stream| stream.into_mut().recv_flow_control());
if let Some(flow) = flow {
if let Some(incr) = flow.peek_window_update() {
let frame = frame::WindowUpdate::new(id, incr);
if dst.start_send(frame.into())?.is_ready() {
assert_eq!(Some(incr), flow.apply_window_update());
} else {
self.pending_window_updates.push_front(id);
return Ok(Async::NotReady);
} }
} }
} }
None => {
if stream.state.is_recv_closed() {
// No more data frames will be received
Ok(None.into())
} else {
// Request to get notified once more data frames arrive
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
}
} }
Ok(().into())
} }
*/
fn reset(&mut self, _stream_id: StreamId, _reason: Reason) { fn reset(&mut self, _stream_id: StreamId, _reason: Reason) {
unimplemented!(); unimplemented!();

View File

@@ -84,7 +84,8 @@ impl<B> Send<B> where B: Buf {
pub fn send_headers(&mut self, pub fn send_headers(&mut self,
frame: frame::Headers, frame: frame::Headers,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz);
@@ -96,7 +97,7 @@ impl<B> Send<B> where B: Buf {
} }
// Queue the frame for sending // Queue the frame for sending
self.prioritize.queue_frame(frame.into(), stream); self.prioritize.queue_frame(frame.into(), stream, task);
Ok(()) Ok(())
} }
@@ -109,10 +110,11 @@ impl<B> Send<B> where B: Buf {
pub fn send_data(&mut self, pub fn send_data(&mut self,
frame: frame::Data<B>, frame: frame::Data<B>,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>,
task: &mut Option<Task>)
-> Result<(), ConnectionError> -> Result<(), ConnectionError>
{ {
self.prioritize.send_data(frame, stream) self.prioritize.send_data(frame, stream, task)
} }
pub fn poll_complete<T>(&mut self, pub fn poll_complete<T>(&mut self,
@@ -168,11 +170,13 @@ impl<B> Send<B> where B: Buf {
} }
pub fn recv_stream_window_update(&mut self, pub fn recv_stream_window_update(&mut self,
frame: frame::WindowUpdate, sz: WindowSize,
stream: &mut store::Ptr<B>) stream: &mut store::Ptr<B>)
-> Result<(), ConnectionError>
{ {
self.prioritize.recv_stream_window_update(frame.size_increment(), stream) if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
// TODO: Send reset
unimplemented!();
}
} }
pub fn apply_remote_settings(&mut self, pub fn apply_remote_settings(&mut self,
@@ -210,32 +214,20 @@ impl<B> Send<B> where B: Buf {
store.for_each(|mut stream| { store.for_each(|mut stream| {
let stream = &mut *stream; let stream = &mut *stream;
unimplemented!(); if stream.state.is_send_streaming() {
/* stream.send_flow.dec_window(dec);
if let Some(flow) = stream.state.send_flow_control() {
flow.shrink_window(val);
// Update the unadvertised number as well // TODO: Handle reclaiming connection level window
if stream.unadvertised_send_window < dec { // capacity.
stream.unadvertised_send_window = 0;
} else {
stream.unadvertised_send_window -= dec;
}
unimplemented!(); // TODO: Should this notify the producer?
} }
*/
}); });
} else if val > old_val { } else if val > old_val {
let inc = val - old_val; let inc = val - old_val;
store.for_each(|mut stream| { store.for_each(|mut stream| {
unimplemented!(); self.recv_stream_window_update(inc, &mut stream);
/*
if let Some(flow) = stream.state.send_flow_control() {
unimplemented!();
}
*/
}); });
} }
} }

View File

@@ -52,6 +52,14 @@ pub(super) struct Stream<B> {
/// Receive data flow control /// Receive data flow control
pub recv_flow: FlowControl, pub recv_flow: FlowControl,
pub in_flight_recv_data: WindowSize,
/// Next node in the linked list of streams waiting to send window updates.
pub next_window_update: Option<store::Key>,
/// True if the stream is waiting to send a window update
pub is_pending_window_update: bool,
/// Frames pending for this stream to read /// Frames pending for this stream to read
pub pending_recv: buffer::Deque<Bytes>, pub pending_recv: buffer::Deque<Bytes>,
@@ -68,6 +76,9 @@ pub(super) struct Next;
#[derive(Debug)] #[derive(Debug)]
pub(super) struct NextSendCapacity; pub(super) struct NextSendCapacity;
#[derive(Debug)]
pub(super) struct NextWindowUpdate;
impl<B> Stream<B> { impl<B> Stream<B> {
pub fn new(id: StreamId) -> Stream<B> pub fn new(id: StreamId) -> Stream<B>
{ {
@@ -91,6 +102,9 @@ impl<B> Stream<B> {
// ===== Fields related to receiving ===== // ===== Fields related to receiving =====
recv_flow: FlowControl::new(), recv_flow: FlowControl::new(),
in_flight_recv_data: 0,
next_window_update: None,
is_pending_window_update: false,
pending_recv: buffer::Deque::new(), pending_recv: buffer::Deque::new(),
recv_task: None, recv_task: None,
pending_push_promises: store::Queue::new(), pending_push_promises: store::Queue::new(),
@@ -164,3 +178,25 @@ impl store::Next for NextSendCapacity {
stream.is_pending_send_capacity = val; stream.is_pending_send_capacity = val;
} }
} }
impl store::Next for NextWindowUpdate {
fn next<B>(stream: &Stream<B>) -> Option<store::Key> {
stream.next_window_update
}
fn set_next<B>(stream: &mut Stream<B>, key: Option<store::Key>) {
stream.next_window_update = key;
}
fn take_next<B>(stream: &mut Stream<B>) -> Option<store::Key> {
stream.next_window_update.take()
}
fn is_queued<B>(stream: &Stream<B>) -> bool {
stream.is_pending_window_update
}
fn set_queued<B>(stream: &mut Stream<B>, val: bool) {
stream.is_pending_window_update = val;
}
}

View File

@@ -17,14 +17,6 @@ pub(crate) struct StreamRef<B> {
key: store::Key, key: store::Key,
} }
#[derive(Debug)]
pub(crate) struct Chunk<B>
where B: Buf,
{
inner: Arc<Mutex<Inner<B>>>,
recv: recv::Chunk,
}
/// Fields needed to manage state related to managing the set of streams. This /// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy. /// is mostly split out to make ownership happy.
/// ///
@@ -42,6 +34,9 @@ struct Actions<B> {
/// Manages state transitions initiated by sending frames /// Manages state transitions initiated by sending frames
send: Send<B>, send: Send<B>,
/// Task that calls `poll_complete`.
task: Option<task::Task>,
} }
impl<B> Streams<B> impl<B> Streams<B>
@@ -53,6 +48,7 @@ impl<B> Streams<B>
actions: Actions { actions: Actions {
recv: Recv::new::<P>(&config), recv: Recv::new::<P>(&config),
send: Send::new::<P>(&config), send: Send::new::<P>(&config),
task: None,
}, },
store: Store::new(), store: Store::new(),
})), })),
@@ -147,14 +143,19 @@ impl<B> Streams<B>
}) })
} }
pub fn recv_err(&mut self, err: &ConnectionError) { /// Handle a received error and return the ID of the last processed stream.
pub fn recv_err(&mut self, err: &ConnectionError) -> StreamId {
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
let actions = &mut me.actions; let actions = &mut me.actions;
let last_processed_id = actions.recv.last_processed_id();
me.store.for_each(|mut stream| { me.store.for_each(|mut stream| {
actions.recv.recv_err(err, &mut *stream) actions.recv.recv_err(err, &mut *stream)
}); });
last_processed_id
} }
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) pub fn recv_window_update(&mut self, frame: frame::WindowUpdate)
@@ -171,7 +172,8 @@ impl<B> Streams<B>
// The remote may send window updates for streams that the local now // The remote may send window updates for streams that the local now
// considers closed. It's ok... // considers closed. It's ok...
if let Some(mut stream) = me.store.find_mut(&id) { if let Some(mut stream) = me.store.find_mut(&id) {
try!(me.actions.send.recv_stream_window_update(frame, &mut stream)); me.actions.send.recv_stream_window_update(
frame.size_increment(), &mut stream);
} else { } else {
me.actions.recv.ensure_not_idle(id)?; me.actions.recv.ensure_not_idle(id)?;
} }
@@ -212,23 +214,6 @@ impl<B> Streams<B>
}) })
} }
pub fn expand_window(&mut self, id: StreamId, sz: WindowSize)
-> Result<(), ConnectionError>
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
if id.is_zero() {
try!(me.actions.recv.expand_connection_window(sz));
} else {
if let Some(mut stream) = me.store.find_mut(&id) {
try!(me.actions.recv.expand_stream_window(id, sz, &mut stream));
}
}
Ok(())
}
pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>) pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), ConnectionError> -> Poll<(), ConnectionError>
where T: AsyncWrite, where T: AsyncWrite,
@@ -245,7 +230,19 @@ impl<B> Streams<B>
let mut me = self.inner.lock().unwrap(); let mut me = self.inner.lock().unwrap();
let me = &mut *me; let me = &mut *me;
me.actions.send.poll_complete(&mut me.store, dst) // Send WINDOW_UPDATE frames first
//
// TODO: It would probably be better to interleave updates w/ data
// frames.
try_ready!(me.actions.recv.poll_complete(&mut me.store, dst));
// Send any other pending frames
try_ready!(me.actions.send.poll_complete(&mut me.store, dst));
// Nothing else to do, track the task
me.actions.task = Some(task::current());
Ok(().into())
} }
pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { pub fn apply_remote_settings(&mut self, frame: &frame::Settings) {
@@ -283,7 +280,8 @@ impl<B> Streams<B>
let mut stream = me.store.insert(stream.id, stream); let mut stream = me.store.insert(stream.id, stream);
me.actions.send.send_headers(headers, &mut stream)?; me.actions.send.send_headers(
headers, &mut stream, &mut me.actions.task)?;
// Given that the stream has been initialized, it should not be in the // Given that the stream has been initialized, it should not be in the
// closed state. // closed state.
@@ -317,7 +315,7 @@ impl<B> StreamRef<B>
me.actions.transition::<P, _, _>(stream, |actions, stream| { me.actions.transition::<P, _, _>(stream, |actions, stream| {
// Send the data frame // Send the data frame
actions.send.send_data(frame, stream) actions.send.send_data(frame, stream, &mut actions.task)
}) })
} }
@@ -348,7 +346,7 @@ impl<B> StreamRef<B>
stream.id, response, end_of_stream); stream.id, response, end_of_stream);
me.actions.transition::<server::Peer, _, _>(stream, |actions, stream| { me.actions.transition::<server::Peer, _, _>(stream, |actions, stream| {
actions.send.send_headers(frame, stream) actions.send.send_headers(frame, stream, &mut actions.task)
}) })
} }
@@ -361,25 +359,27 @@ impl<B> StreamRef<B>
me.actions.recv.poll_response(&mut stream) me.actions.recv.poll_response(&mut stream)
} }
pub fn poll_data(&mut self) -> Poll<Option<Chunk<B>>, ConnectionError> { pub fn poll_data(&mut self) -> Poll<Option<Bytes>, ConnectionError> {
let recv = { let mut me = self.inner.lock().unwrap();
let mut me = self.inner.lock().unwrap(); let me = &mut *me;
let me = &mut *me;
let mut stream = me.store.resolve(self.key); let mut stream = me.store.resolve(self.key);
try_ready!(me.actions.recv.poll_chunk(&mut stream)) me.actions.recv.poll_data(&mut stream)
}; }
// Convert to a chunk /// Releases recv capacity back to the peer. This will result in sending
let chunk = recv.map(|recv| { /// WINDOW_UPDATE frames on both the stream and connection.
Chunk { pub fn release_capacity(&mut self, capacity: WindowSize)
inner: self.inner.clone(), -> Result<(), ConnectionError>
recv: recv, {
} let mut me = self.inner.lock().unwrap();
}); let me = &mut *me;
Ok(chunk.into()) let mut stream = me.store.resolve(self.key);
me.actions.recv.release_capacity(
capacity, &mut stream, &mut me.actions.send, &mut me.actions.task)
} }
/// Request capacity to send data /// Request capacity to send data
@@ -424,32 +424,6 @@ impl<B> Clone for StreamRef<B> {
} }
} }
// ===== impl Chunk =====
impl<B> Chunk<B>
where B: Buf,
{
// TODO: Come up w/ a better API
pub fn pop_bytes(&mut self) -> Option<Bytes> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.recv.pop_bytes(&mut self.recv)
}
}
impl<B> Drop for Chunk<B>
where B: Buf,
{
fn drop(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) {
}
}
}
// ===== impl Actions ===== // ===== impl Actions =====
impl<B> Actions<B> impl<B> Actions<B>

View File

@@ -1,5 +1,5 @@
use {frame, ConnectionError, StreamId}; use {frame, ConnectionError, StreamId};
use {Body, Chunk}; use Body;
use proto::{self, Connection, WindowSize}; use proto::{self, Connection, WindowSize};
use error::Reason::*; use error::Reason::*;

View File

@@ -37,6 +37,8 @@ fn recv_invalid_server_stream_id() {
.write(SETTINGS_ACK) .write(SETTINGS_ACK)
// Read response // Read response
.read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137]) .read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137])
// Write GO_AWAY
.write(&[0, 0, 8, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])
.build(); .build();
let mut h2 = Client::handshake(mock) let mut h2 = Client::handshake(mock)

View File

@@ -90,15 +90,12 @@ fn send_recv_data() {
let (_, body) = resp.into_parts(); let (_, body) = resp.into_parts();
// Wait for all the data frames to be received // Wait for all the data frames to be received
let mut chunks = h2.run(body.collect()).unwrap(); let bytes = h2.run(body.collect()).unwrap();
// Only one chunk since two frames are coalesced. // One byte chunk
assert_eq!(1, chunks.len()); assert_eq!(1, bytes.len());
let data = chunks[0].pop_bytes().unwrap(); assert_eq!(bytes[0], &b"world"[..]);
assert_eq!(data, &b"world"[..]);
assert!(chunks[0].pop_bytes().is_none());
// The H2 connection is closed // The H2 connection is closed
h2.wait().unwrap(); h2.wait().unwrap();
@@ -141,18 +138,13 @@ fn send_headers_recv_data_single_frame() {
let (_, body) = resp.into_parts(); let (_, body) = resp.into_parts();
// Wait for all the data frames to be received // Wait for all the data frames to be received
let mut chunks = h2.run(body.collect()).unwrap(); let bytes = h2.run(body.collect()).unwrap();
// Only one chunk since two frames are coalesced. // Two data frames
assert_eq!(1, chunks.len()); assert_eq!(2, bytes.len());
let data = chunks[0].pop_bytes().unwrap(); assert_eq!(bytes[0], &b"hello"[..]);
assert_eq!(data, &b"hello"[..]); assert_eq!(bytes[1], &b"world"[..]);
let data = chunks[0].pop_bytes().unwrap();
assert_eq!(data, &b"world"[..]);
assert!(chunks[0].pop_bytes().is_none());
// The H2 connection is closed // The H2 connection is closed
h2.wait().unwrap(); h2.wait().unwrap();