From 807d2b73172072905f600f1f95bd0aaee5106c07 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 23 Aug 2017 11:22:24 -0700 Subject: [PATCH] Wire in recv flow control (#26) --- src/client.rs | 2 +- src/frame/go_away.rs | 9 +- src/frame/mod.rs | 3 + src/frame/window_update.rs | 4 +- src/lib.rs | 20 +-- src/proto/connection.rs | 114 +++++++++++-- src/proto/mod.rs | 5 +- src/proto/streams/buffer.rs | 49 ------ src/proto/streams/flow_control.rs | 59 ++++++- src/proto/streams/mod.rs | 2 +- src/proto/streams/prioritize.rs | 31 ++-- src/proto/streams/recv.rs | 273 +++++++++++++++++------------- src/proto/streams/send.rs | 42 ++--- src/proto/streams/stream.rs | 36 ++++ src/proto/streams/streams.rs | 118 +++++-------- src/server.rs | 2 +- tests/client_request.rs | 2 + tests/stream_states.rs | 26 +-- 18 files changed, 452 insertions(+), 345 deletions(-) diff --git a/src/client.rs b/src/client.rs index 20feb73..e2049c2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,5 @@ use {frame, ConnectionError, StreamId}; -use {Body, Chunk}; +use Body; use proto::{self, Connection, WindowSize}; use error::Reason::*; diff --git a/src/frame/go_away.rs b/src/frame/go_away.rs index 444e3d6..0063d2e 100644 --- a/src/frame/go_away.rs +++ b/src/frame/go_away.rs @@ -3,13 +3,20 @@ use frame::{self, Head, Error, Kind, StreamId}; use bytes::{BufMut, BigEndian}; -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct GoAway { last_stream_id: StreamId, error_code: u32, } impl GoAway { + pub fn new(last_stream_id: StreamId, reason: Reason) -> Self { + GoAway { + last_stream_id, + error_code: reason.into(), + } + } + pub fn reason(&self) -> Reason { self.error_code.into() } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 45594b4..c310850 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -165,6 +165,9 @@ pub enum Error { /// An invalid setting value was provided InvalidSettingValue, + /// An invalid window update value + InvalidWindowUpdateValue, + /// The payload length specified by the frame header was not the /// value necessary for the specific frame type. InvalidPayloadLength, diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs index 29f8c2a..fc4bd39 100644 --- a/src/frame/window_update.rs +++ b/src/frame/window_update.rs @@ -38,7 +38,9 @@ impl WindowUpdate { // when received. 
let size_increment = unpack_octets_4!(payload, 0, u32) & !SIZE_INCREMENT_MASK; - // TODO: the size_increment must be greater than 0 + if size_increment == 0 { + return Err(Error::InvalidWindowUpdateValue.into()); + } Ok(WindowUpdate { stream_id: head.stream_id(), diff --git a/src/lib.rs b/src/lib.rs index 479f549..a52567d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,30 +53,14 @@ pub struct Body { inner: proto::StreamRef, } -#[derive(Debug)] -pub struct Chunk { - inner: proto::Chunk, -} - // ===== impl Body ===== impl futures::Stream for Body { - type Item = Chunk; + type Item = Bytes; type Error = ConnectionError; fn poll(&mut self) -> Poll, Self::Error> { - let chunk = try_ready!(self.inner.poll_data()) - .map(|inner| Chunk { inner }); - - Ok(chunk.into()) - } -} - -// ===== impl Chunk ===== - -impl Chunk { - pub fn pop_bytes(&mut self) -> Option { - self.inner.pop_bytes() + self.inner.poll_data() } } diff --git a/src/proto/connection.rs b/src/proto/connection.rs index adb800c..9bfd9fb 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -13,14 +13,40 @@ use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub(crate) struct Connection { - // Codec + /// Tracks the connection level state transitions. + state: State, + + /// Read / write frame values codec: Codec>, + + /// Ping/pong handler ping_pong: PingPong>, + + /// Connection settings settings: Settings, + + /// Stream state handler streams: Streams, + + /// Client or server _phantom: PhantomData

, } +#[derive(Debug)] +enum State { + /// Currently open in a sane state + Open, + + /// Waiting to send a GO_AWAY frame + GoAway(frame::GoAway), + + /// The codec must be flushed + Flush(Reason), + + /// In an errored state + Error(Reason), +} + impl Connection where T: AsyncRead + AsyncWrite, P: Peer, @@ -36,6 +62,7 @@ impl Connection }); Connection { + state: State::Open, codec: codec, ping_pong: PingPong::new(), settings: Settings::new(), @@ -62,13 +89,36 @@ impl Connection /// Advances the internal state of the connection. pub fn poll(&mut self) -> Poll<(), ConnectionError> { - match self.poll2() { - Err(e) => { - debug!("Connection::poll; err={:?}", e); - self.streams.recv_err(&e); - Err(e) + use error::ConnectionError::*; + + loop { + match self.state { + // When open, continue to poll a frame + State::Open => {}, + // In an error state + _ => { + try_ready!(self.poll_complete()); + + // GO_AWAY frame has been sent, return the error + return Err(self.state.error().unwrap().into()); + } + } + + match self.poll2() { + Err(Proto(e)) => { + debug!("Connection::poll; err={:?}", e); + let last_processed_id = self.streams.recv_err(&e.into()); + let frame = frame::GoAway::new(last_processed_id, e); + + self.state = State::GoAway(frame); + } + Err(e) => { + // TODO: Are I/O errors recoverable? + self.streams.recv_err(&e); + return Err(e); + } + ret => return ret, } - ret => ret, } } @@ -114,7 +164,7 @@ impl Connection self.settings.recv_settings(frame); } Some(GoAway(frame)) => { - // TODO: handle the last_stream_id. Also, should this be + // TODO: handle the last_processed_id. Also, should this be // handled as an error? let e = ConnectionError::Proto(frame.reason()); return Ok(().into()); @@ -141,12 +191,34 @@ impl Connection } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_ready()); + loop { + match self.state { + State::Open => { + try_ready!(self.poll_ready()); - // Ensure all window updates have been sent. - try_ready!(self.streams.poll_complete(&mut self.codec)); + // Ensure all window updates have been sent. + try_ready!(self.streams.poll_complete(&mut self.codec)); - Ok(().into()) + return Ok(().into()); + } + State::GoAway(frame) => { + if !self.codec.start_send(frame.into())?.is_ready() { + // Not ready to send the frame... try again later. + return Ok(Async::NotReady); + } + + // GO_AWAY sent, transition the connection to an errored state + self.state = State::Flush(frame.reason()); + } + State::Flush(reason) => { + try_ready!(self.codec.poll_complete()); + self.state = State::Error(reason); + } + State::Error(..) 
=> { + return Ok(().into()); + } + } + } } fn convert_poll_message(frame: frame::Headers) -> Result, ConnectionError> { @@ -185,3 +257,21 @@ impl Connection self.streams.next_incoming() } } + +// ====== impl State ===== + +impl State { + fn is_open(&self) -> bool { + match *self { + State::Open => true, + _ => false, + } + } + + fn error(&self) -> Option { + match *self { + State::Error(reason) => Some(reason), + _ => None, + } + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 9b39877..c8429d7 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -7,7 +7,7 @@ mod settings; mod streams; pub(crate) use self::connection::Connection; -pub(crate) use self::streams::{Streams, StreamRef, Chunk}; +pub(crate) use self::streams::{Streams, StreamRef}; use self::codec::Codec; use self::framed_read::FramedRead; @@ -21,6 +21,7 @@ use error::Reason; use frame::{self, Frame}; use futures::{self, task, Poll, Async, AsyncSink, Sink, Stream as Stream2}; +use futures::task::Task; use bytes::{Buf, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; @@ -57,7 +58,7 @@ pub struct WindowUpdate { // Constants pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; -pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX; +pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; /// Create a transport prepared to handle the server handshake. /// diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 4d30918..1011304 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -108,53 +108,4 @@ impl Deque { None => None, } } - - pub fn take_while(&mut self, buf: &mut Buffer, mut f: F) -> Self - where F: FnMut(&Frame) -> bool - { - match self.indices { - Some(mut idxs) => { - if !f(&buf.slab[idxs.head].frame) { - return Deque::new(); - } - - let head = idxs.head; - let mut tail = idxs.head; - - loop { - let next = match buf.slab[tail].next { - Some(next) => next, - None => { - self.indices = None; - return Deque { - indices: Some(idxs), - _p: PhantomData, - }; - } - }; - - if !f(&buf.slab[next].frame) { - // Split the linked list - buf.slab[tail].next = None; - - self.indices = Some(Indices { - head: next, - tail: idxs.tail, - }); - - return Deque { - indices: Some(Indices { - head: head, - tail: tail, - }), - _p: PhantomData, - } - } - - tail = next; - } - } - None => Deque::new(), - } - } } diff --git a/src/proto/streams/flow_control.rs b/src/proto/streams/flow_control.rs index f2c3a28..d404474 100644 --- a/src/proto/streams/flow_control.rs +++ b/src/proto/streams/flow_control.rs @@ -1,5 +1,6 @@ use ConnectionError; use proto::*; +use error::Reason::*; use std::cmp; @@ -48,29 +49,69 @@ impl FlowControl { self.available -= capacity; } - pub fn assign_capacity(&mut self, capacity: WindowSize) { - assert!(self.window_size() >= self.available + capacity); - self.available += capacity; + pub fn assign_capacity(&mut self, capacity: WindowSize) + -> Result<(), ConnectionError> + { + let (val, overflow) = self.available.overflowing_add(capacity); + + if overflow { + return Err(FlowControlError.into()); + } + + if val > MAX_WINDOW_SIZE { + return Err(FlowControlError.into()); + } + + self.available = val; + Ok(()) } - /// Update the window size. + /// Returns the number of bytes available but not assigned to the window. + /// + /// This represents pending outbound WINDOW_UPDATE frames. 
+ pub fn unclaimed_capacity(&self) -> WindowSize { + let available = self.available as i32; + + if self.window_size >= available { + return 0; + } + + (available - self.window_size) as WindowSize + } + + /// Increase the window size. /// /// This is called after receiving a WINDOW_UPDATE frame pub fn inc_window(&mut self, sz: WindowSize) -> Result<(), ConnectionError> { - // TODO: Handle invalid increment - self.window_size += sz as i32; + let (val, overflow) = self.window_size.overflowing_add(sz as i32); + + if overflow { + return Err(FlowControlError.into()); + } + + if val > MAX_WINDOW_SIZE as i32 { + return Err(FlowControlError.into()); + } + + self.window_size = val; Ok(()) } + /// Decrement the window size. + /// + /// This is called after receiving a SETTINGS frame with a lower + /// INITIAL_WINDOW_SIZE value. + pub fn dec_window(&mut self, sz: WindowSize) { + // This should not be able to overflow `window_size` from the bottom. + self.window_size -= sz as i32; + } + /// Decrements the window reflecting data has actually been sent. The caller /// must ensure that the window has capacity. pub fn send_data(&mut self, sz: WindowSize) { trace!("send_data; sz={}; window={}; available={}", sz, self.window_size, self.available); - // Available cannot be greater than the window - debug_assert!(self.available as i32 <= self.window_size || self.available == 0); - // Ensure that the argument is correct assert!(sz <= self.window_size as WindowSize); diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs index d53fdc9..2f52cd6 100644 --- a/src/proto/streams/mod.rs +++ b/src/proto/streams/mod.rs @@ -8,7 +8,7 @@ mod store; mod stream; mod streams; -pub(crate) use self::streams::{Streams, StreamRef, Chunk}; +pub(crate) use self::streams::{Streams, StreamRef}; pub(crate) use self::prioritize::Prioritized; use self::buffer::Buffer; diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 358e522..ba30461 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -17,10 +17,6 @@ pub(super) struct Prioritize { /// Holds frames that are waiting to be written to the socket buffer: Buffer, - - /// Holds the connection task. This signals the connection that there is - /// data to flush. - conn_task: Option, } pub(crate) struct Prioritized { @@ -41,22 +37,25 @@ impl Prioritize pub fn new(config: &Config) -> Prioritize { let mut flow = FlowControl::new(); - flow.inc_window(config.init_local_window_sz); - flow.assign_capacity(config.init_local_window_sz); + flow.inc_window(config.init_local_window_sz) + .ok().expect("invalid initial window size"); + + flow.assign_capacity(config.init_local_window_sz) + .ok().expect("invalid initial window size"); Prioritize { pending_send: store::Queue::new(), pending_capacity: store::Queue::new(), flow: flow, buffer: Buffer::new(), - conn_task: None, } } /// Queue a frame to be sent to the remote pub fn queue_frame(&mut self, frame: Frame, - stream: &mut store::Ptr) + stream: &mut store::Ptr, + task: &mut Option) { // Queue the frame in the buffer stream.pending_send.push_back(&mut self.buffer, frame); @@ -65,7 +64,7 @@ impl Prioritize self.pending_send.push(stream); // Notify the connection. 
- if let Some(task) = self.conn_task.take() { + if let Some(task) = task.take() { task.notify(); } } @@ -73,7 +72,8 @@ impl Prioritize /// Send a data frame pub fn send_data(&mut self, frame: frame::Data, - stream: &mut store::Ptr) + stream: &mut store::Ptr, + task: &mut Option) -> Result<(), ConnectionError> { let sz = frame.payload().remaining(); @@ -112,7 +112,7 @@ impl Prioritize if stream.send_flow.available() > stream.buffered_send_data { // The stream currently has capacity to send the data frame, so // queue it up and notify the connection task. - self.queue_frame(frame.into(), stream); + self.queue_frame(frame.into(), stream, task); } else { // The stream has no capacity to send the frame now, save it but // don't notify the conneciton task. Once additional capacity @@ -155,10 +155,6 @@ impl Prioritize stream: &mut store::Ptr) -> Result<(), ConnectionError> { - if !stream.state.is_send_streaming() { - return Ok(()); - } - // Update the stream level flow control. stream.send_flow.inc_window(inc)?; @@ -215,6 +211,8 @@ impl Prioritize return; } + debug_assert!(stream.state.is_send_streaming()); + // The amount of currently available capacity on the connection let conn_available = self.flow.available(); @@ -294,9 +292,6 @@ impl Prioritize // This might release a data frame... if !self.reclaim_frame(store, dst) { - // Nothing else to do, track the task - self.conn_task = Some(task::current()); - return Ok(().into()); } diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 2509a2d..6b63095 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -19,14 +19,16 @@ pub(super) struct Recv { init_window_sz: WindowSize, /// Connection level flow control governing received data - flow_control: FlowControl, + flow: FlowControl, /// The lowest stream ID that is still idle next_stream_id: StreamId, + /// The stream ID of the last processed stream + last_processed_id: StreamId, + /// Streams that have pending window updates - /// TODO: don't use a VecDeque - pending_window_updates: VecDeque, + pending_window_updates: store::Queue, /// New streams to be accepted pending_accept: store::Queue, @@ -40,12 +42,6 @@ pub(super) struct Recv { _p: PhantomData<(B)>, } -#[derive(Debug)] -pub(super) struct Chunk { - /// Data frames pending receival - pub pending_recv: buffer::Deque, -} - #[derive(Debug, Clone, Copy)] struct Indices { head: store::Key, @@ -63,14 +59,16 @@ impl Recv where B: Buf { let mut flow = FlowControl::new(); flow.inc_window(config.init_remote_window_sz); + flow.assign_capacity(config.init_remote_window_sz); Recv { max_streams: config.max_remote_initiated, num_streams: 0, init_window_sz: config.init_remote_window_sz, - flow_control: flow, + flow: flow, next_stream_id: next_stream_id.into(), - pending_window_updates: VecDeque::new(), + pending_window_updates: store::Queue::new(), + last_processed_id: StreamId::zero(), pending_accept: store::Queue::new(), buffer: Buffer::new(), refused: None, @@ -78,6 +76,11 @@ impl Recv where B: Buf { } } + /// Returns the ID of the last processed stream + pub fn last_processed_id(&self) -> StreamId { + self.last_processed_id + } + /// Update state reflecting a new, remotely opened stream /// /// Returns the stream state if successful. 
`None` if refused @@ -138,7 +141,10 @@ impl Recv where B: Buf { trace!("opening stream; init_window={}", self.init_window_sz); let is_initial = stream.state.recv_open(frame.is_end_stream())?; - // TODO: Update flow control + if stream.state.is_recv_streaming() { + stream.recv_flow.inc_window(self.init_window_sz)?; + stream.recv_flow.assign_capacity(self.init_window_sz); + } if is_initial { if !self.can_inc_num_streams() { @@ -152,6 +158,11 @@ impl Recv where B: Buf { return Err(ProtocolError.into()); } + // TODO: be smarter about this logic + if frame.stream_id() > self.last_processed_id { + self.last_processed_id = frame.stream_id(); + } + // Increment the number of concurrent streams self.inc_num_streams(); } @@ -185,6 +196,35 @@ impl Recv where B: Buf { Ok(()) } + pub fn release_capacity(&mut self, + capacity: WindowSize, + stream: &mut store::Ptr, + send: &mut Send, + task: &mut Option) + -> Result<(), ConnectionError> + { + if capacity > stream.in_flight_recv_data { + // TODO: Handle error + unimplemented!(); + } + + // Decrement in-flight data + stream.in_flight_recv_data -= capacity; + + // Assign capacity to connection & stream + self.flow.assign_capacity(capacity); + stream.recv_flow.assign_capacity(capacity); + + // Queue the stream for sending the WINDOW_UPDATE frame. + self.pending_window_updates.push(stream); + + if let Some(task) = task.take() { + task.notify(); + } + + Ok(()) + } + pub fn recv_data(&mut self, frame: frame::Data, stream: &mut store::Ptr) @@ -198,24 +238,29 @@ impl Recv where B: Buf { let sz = sz as WindowSize; - // TODO: implement - /* - match stream.recv_flow_control() { - Some(flow) => { - // Ensure there's enough capacity on the connection before - // acting on the stream. - try!(self.flow_control.ensure_window(sz, FlowControlError)); - - // Claim the window on the stream - try!(flow.claim_window(sz, FlowControlError)); - - // Claim the window on the connection. - self.flow_control.claim_window(sz, FlowControlError) - .expect("local connection flow control error"); - } - None => return Err(ProtocolError.into()), + if !stream.state.is_recv_streaming() { + // Receiving a DATA frame when not expecting one is a protocol + // error. + return Err(ProtocolError.into()); } - */ + + trace!("recv_data; size={}; connection={}; stream={}", + sz, self.flow.window_size(), stream.recv_flow.window_size()); + + // Ensure that there is enough capacity on the connection before acting + // on the stream. 
+ if self.flow.window_size() < sz || stream.recv_flow.window_size() < sz { + return Err(FlowControlError.into()); + } + + // Update connection level flow control + self.flow.send_data(sz); + + // Update stream level flow control + stream.recv_flow.send_data(sz); + + // Track the data as in-flight + stream.in_flight_recv_data += sz; if frame.is_end_stream() { try!(stream.state.recv_close()); @@ -294,6 +339,7 @@ impl Recv where B: Buf { Ok(()) } + /// Handle a received error pub fn recv_err(&mut self, err: &ConnectionError, stream: &mut Stream) { // Receive an error stream.state.recv_err(err); @@ -384,47 +430,33 @@ impl Recv where B: Buf { Ok(Async::Ready(())) } - pub fn expand_connection_window(&mut self, sz: WindowSize) - -> Result<(), ConnectionError> - { - unimplemented!(); - /* - // TODO: handle overflow - self.flow_control.expand_window(sz); - - Ok(()) - */ - } - - pub fn expand_stream_window(&mut self, - id: StreamId, - sz: WindowSize, - stream: &mut store::Ptr) - -> Result<(), ConnectionError> - { - unimplemented!(); - /* - // TODO: handle overflow - if let Some(flow) = stream.recv_flow_control() { - flow.expand_window(sz); - self.pending_window_updates.push_back(id); - } - - Ok(()) - */ - } - - /* - /// Send connection level window update - pub fn send_connection_window_update(&mut self, dst: &mut Codec) + pub fn poll_complete(&mut self, + store: &mut Store, + dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, { - if let Some(incr) = self.flow_control.peek_window_update() { + // Send any pending connection level window updates + try_ready!(self.send_connection_window_update(dst)); + + // Send any pending stream level window updates + try_ready!(self.send_stream_window_updates(store, dst)); + + Ok(().into()) + } + + /// Send connection level window update + fn send_connection_window_update(&mut self, dst: &mut Codec>) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + let incr = self.flow.unclaimed_capacity(); + + if incr > 0 { let frame = frame::WindowUpdate::new(StreamId::zero(), incr); if dst.start_send(frame.into())?.is_ready() { - assert_eq!(Some(incr), self.flow_control.apply_window_update()); + self.flow.inc_window(incr); } else { return Ok(Async::NotReady); } @@ -432,73 +464,78 @@ impl Recv where B: Buf { Ok(().into()) } - */ + + + /// Send stream level window update + pub fn send_stream_window_updates(&mut self, + store: &mut Store, + dst: &mut Codec>) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + { + loop { + // Ensure the codec has capacity + try_ready!(dst.poll_ready()); + + // Get the next stream + let stream = match self.pending_window_updates.pop(store) { + Some(stream) => stream, + None => return Ok(().into()), + }; + + if !stream.state.is_recv_streaming() { + // No need to send window updates on the stream if the stream is + // no longer receiving data. 
+ continue; + } + + // TODO: de-dup + let incr = stream.recv_flow.unclaimed_capacity(); + + if incr > 0 { + let frame = frame::WindowUpdate::new(stream.id, incr); + let res = dst.start_send(frame.into())?; + + assert!(res.is_ready()); + } + } + } pub fn next_incoming(&mut self, store: &mut Store) -> Option { self.pending_accept.pop(store) .map(|ptr| ptr.key()) } - pub fn poll_chunk(&mut self, stream: &mut Stream) - -> Poll, ConnectionError> + pub fn poll_data(&mut self, stream: &mut Stream) + -> Poll, ConnectionError> { - let frames = stream.pending_recv - .take_while(&mut self.buffer, |frame| frame.is_data()); + match stream.pending_recv.pop_front(&mut self.buffer) { + Some(frame) => { + match frame { + Frame::Data(frame) => { + Ok(Some(frame.into_payload()).into()) + } + frame => { + // Frame is trailer + stream.pending_recv.push_front(&mut self.buffer, frame); - if frames.is_empty() { - if stream.state.is_recv_closed() { - Ok(None.into()) - } else { - stream.recv_task = Some(task::current()); - Ok(Async::NotReady) - } - } else { - Ok(Some(Chunk { - pending_recv: frames, - }).into()) - } - } - - pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option { - match chunk.pending_recv.pop_front(&mut self.buffer) { - Some(Frame::Data(frame)) => { - Some(frame.into_payload()) - } - None => None, - _ => panic!("unexpected frame type"), - } - } - - /* - /// Send stream level window update - pub fn send_stream_window_update(&mut self, - streams: &mut Store, - dst: &mut Codec) - -> Poll<(), ConnectionError> - where T: AsyncWrite, - { - while let Some(id) = self.pending_window_updates.pop_front() { - let flow = streams.find_mut(&id) - .and_then(|stream| stream.into_mut().recv_flow_control()); - - - if let Some(flow) = flow { - if let Some(incr) = flow.peek_window_update() { - let frame = frame::WindowUpdate::new(id, incr); - - if dst.start_send(frame.into())?.is_ready() { - assert_eq!(Some(incr), flow.apply_window_update()); - } else { - self.pending_window_updates.push_front(id); - return Ok(Async::NotReady); + // No more data frames + Ok(None.into()) } } } + None => { + if stream.state.is_recv_closed() { + // No more data frames will be received + Ok(None.into()) + } else { + // Request to get notified once more data frames arrive + stream.recv_task = Some(task::current()); + Ok(Async::NotReady) + } + } } - - Ok(().into()) } - */ fn reset(&mut self, _stream_id: StreamId, _reason: Reason) { unimplemented!(); diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index 7cd04a8..0f9af08 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -84,7 +84,8 @@ impl Send where B: Buf { pub fn send_headers(&mut self, frame: frame::Headers, - stream: &mut store::Ptr) + stream: &mut store::Ptr, + task: &mut Option) -> Result<(), ConnectionError> { trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz); @@ -96,7 +97,7 @@ impl Send where B: Buf { } // Queue the frame for sending - self.prioritize.queue_frame(frame.into(), stream); + self.prioritize.queue_frame(frame.into(), stream, task); Ok(()) } @@ -109,10 +110,11 @@ impl Send where B: Buf { pub fn send_data(&mut self, frame: frame::Data, - stream: &mut store::Ptr) + stream: &mut store::Ptr, + task: &mut Option) -> Result<(), ConnectionError> { - self.prioritize.send_data(frame, stream) + self.prioritize.send_data(frame, stream, task) } pub fn poll_complete(&mut self, @@ -168,11 +170,13 @@ impl Send where B: Buf { } pub fn recv_stream_window_update(&mut self, - frame: frame::WindowUpdate, + sz: 
WindowSize, stream: &mut store::Ptr) - -> Result<(), ConnectionError> { - self.prioritize.recv_stream_window_update(frame.size_increment(), stream) + if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) { + // TODO: Send reset + unimplemented!(); + } } pub fn apply_remote_settings(&mut self, @@ -210,32 +214,20 @@ impl Send where B: Buf { store.for_each(|mut stream| { let stream = &mut *stream; - unimplemented!(); - /* - if let Some(flow) = stream.state.send_flow_control() { - flow.shrink_window(val); + if stream.state.is_send_streaming() { + stream.send_flow.dec_window(dec); - // Update the unadvertised number as well - if stream.unadvertised_send_window < dec { - stream.unadvertised_send_window = 0; - } else { - stream.unadvertised_send_window -= dec; - } + // TODO: Handle reclaiming connection level window + // capacity. - unimplemented!(); + // TODO: Should this notify the producer? } - */ }); } else if val > old_val { let inc = val - old_val; store.for_each(|mut stream| { - unimplemented!(); - /* - if let Some(flow) = stream.state.send_flow_control() { - unimplemented!(); - } - */ + self.recv_stream_window_update(inc, &mut stream); }); } } diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 7a6790f..5bf2abd 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -52,6 +52,14 @@ pub(super) struct Stream { /// Receive data flow control pub recv_flow: FlowControl, + pub in_flight_recv_data: WindowSize, + + /// Next node in the linked list of streams waiting to send window updates. + pub next_window_update: Option, + + /// True if the stream is waiting to send a window update + pub is_pending_window_update: bool, + /// Frames pending for this stream to read pub pending_recv: buffer::Deque, @@ -68,6 +76,9 @@ pub(super) struct Next; #[derive(Debug)] pub(super) struct NextSendCapacity; +#[derive(Debug)] +pub(super) struct NextWindowUpdate; + impl Stream { pub fn new(id: StreamId) -> Stream { @@ -91,6 +102,9 @@ impl Stream { // ===== Fields related to receiving ===== recv_flow: FlowControl::new(), + in_flight_recv_data: 0, + next_window_update: None, + is_pending_window_update: false, pending_recv: buffer::Deque::new(), recv_task: None, pending_push_promises: store::Queue::new(), @@ -164,3 +178,25 @@ impl store::Next for NextSendCapacity { stream.is_pending_send_capacity = val; } } + +impl store::Next for NextWindowUpdate { + fn next(stream: &Stream) -> Option { + stream.next_window_update + } + + fn set_next(stream: &mut Stream, key: Option) { + stream.next_window_update = key; + } + + fn take_next(stream: &mut Stream) -> Option { + stream.next_window_update.take() + } + + fn is_queued(stream: &Stream) -> bool { + stream.is_pending_window_update + } + + fn set_queued(stream: &mut Stream, val: bool) { + stream.is_pending_window_update = val; + } +} diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index bb2a3c5..9f63196 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -17,14 +17,6 @@ pub(crate) struct StreamRef { key: store::Key, } -#[derive(Debug)] -pub(crate) struct Chunk - where B: Buf, -{ - inner: Arc>>, - recv: recv::Chunk, -} - /// Fields needed to manage state related to managing the set of streams. This /// is mostly split out to make ownership happy. /// @@ -42,6 +34,9 @@ struct Actions { /// Manages state transitions initiated by sending frames send: Send, + + /// Task that calls `poll_complete`. 
+ task: Option, } impl Streams @@ -53,6 +48,7 @@ impl Streams actions: Actions { recv: Recv::new::

(&config), send: Send::new::

(&config), + task: None, }, store: Store::new(), })), @@ -147,14 +143,19 @@ impl Streams }) } - pub fn recv_err(&mut self, err: &ConnectionError) { + /// Handle a received error and return the ID of the last processed stream. + pub fn recv_err(&mut self, err: &ConnectionError) -> StreamId { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let actions = &mut me.actions; + let last_processed_id = actions.recv.last_processed_id(); + me.store.for_each(|mut stream| { actions.recv.recv_err(err, &mut *stream) }); + + last_processed_id } pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) @@ -171,7 +172,8 @@ impl Streams // The remote may send window updates for streams that the local now // considers closed. It's ok... if let Some(mut stream) = me.store.find_mut(&id) { - try!(me.actions.send.recv_stream_window_update(frame, &mut stream)); + me.actions.send.recv_stream_window_update( + frame.size_increment(), &mut stream); } else { me.actions.recv.ensure_not_idle(id)?; } @@ -212,23 +214,6 @@ impl Streams }) } - pub fn expand_window(&mut self, id: StreamId, sz: WindowSize) - -> Result<(), ConnectionError> - { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - if id.is_zero() { - try!(me.actions.recv.expand_connection_window(sz)); - } else { - if let Some(mut stream) = me.store.find_mut(&id) { - try!(me.actions.recv.expand_stream_window(id, sz, &mut stream)); - } - } - - Ok(()) - } - pub fn send_pending_refusal(&mut self, dst: &mut Codec>) -> Poll<(), ConnectionError> where T: AsyncWrite, @@ -245,7 +230,19 @@ impl Streams let mut me = self.inner.lock().unwrap(); let me = &mut *me; - me.actions.send.poll_complete(&mut me.store, dst) + // Send WINDOW_UPDATE frames first + // + // TODO: It would probably be better to interleave updates w/ data + // frames. + try_ready!(me.actions.recv.poll_complete(&mut me.store, dst)); + + // Send any other pending frames + try_ready!(me.actions.send.poll_complete(&mut me.store, dst)); + + // Nothing else to do, track the task + me.actions.task = Some(task::current()); + + Ok(().into()) } pub fn apply_remote_settings(&mut self, frame: &frame::Settings) { @@ -283,7 +280,8 @@ impl Streams let mut stream = me.store.insert(stream.id, stream); - me.actions.send.send_headers(headers, &mut stream)?; + me.actions.send.send_headers( + headers, &mut stream, &mut me.actions.task)?; // Given that the stream has been initialized, it should not be in the // closed state. 
@@ -317,7 +315,7 @@ impl StreamRef me.actions.transition::(stream, |actions, stream| { // Send the data frame - actions.send.send_data(frame, stream) + actions.send.send_data(frame, stream, &mut actions.task) }) } @@ -348,7 +346,7 @@ impl StreamRef stream.id, response, end_of_stream); me.actions.transition::(stream, |actions, stream| { - actions.send.send_headers(frame, stream) + actions.send.send_headers(frame, stream, &mut actions.task) }) } @@ -361,25 +359,27 @@ impl StreamRef me.actions.recv.poll_response(&mut stream) } - pub fn poll_data(&mut self) -> Poll>, ConnectionError> { - let recv = { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; + pub fn poll_data(&mut self) -> Poll, ConnectionError> { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; - let mut stream = me.store.resolve(self.key); + let mut stream = me.store.resolve(self.key); - try_ready!(me.actions.recv.poll_chunk(&mut stream)) - }; + me.actions.recv.poll_data(&mut stream) + } - // Convert to a chunk - let chunk = recv.map(|recv| { - Chunk { - inner: self.inner.clone(), - recv: recv, - } - }); + /// Releases recv capacity back to the peer. This will result in sending + /// WINDOW_UPDATE frames on both the stream and connection. + pub fn release_capacity(&mut self, capacity: WindowSize) + -> Result<(), ConnectionError> + { + let mut me = self.inner.lock().unwrap(); + let me = &mut *me; - Ok(chunk.into()) + let mut stream = me.store.resolve(self.key); + + me.actions.recv.release_capacity( + capacity, &mut stream, &mut me.actions.send, &mut me.actions.task) } /// Request capacity to send data @@ -424,32 +424,6 @@ impl Clone for StreamRef { } } -// ===== impl Chunk ===== - -impl Chunk - where B: Buf, -{ - // TODO: Come up w/ a better API - pub fn pop_bytes(&mut self) -> Option { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - me.actions.recv.pop_bytes(&mut self.recv) - } -} - -impl Drop for Chunk - where B: Buf, -{ - fn drop(&mut self) { - let mut me = self.inner.lock().unwrap(); - let me = &mut *me; - - while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) { - } - } -} - // ===== impl Actions ===== impl Actions diff --git a/src/server.rs b/src/server.rs index 7d6b79e..9126f35 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,5 @@ use {frame, ConnectionError, StreamId}; -use {Body, Chunk}; +use Body; use proto::{self, Connection, WindowSize}; use error::Reason::*; diff --git a/tests/client_request.rs b/tests/client_request.rs index 59eab1c..414e323 100644 --- a/tests/client_request.rs +++ b/tests/client_request.rs @@ -37,6 +37,8 @@ fn recv_invalid_server_stream_id() { .write(SETTINGS_ACK) // Read response .read(&[0, 0, 1, 1, 5, 0, 0, 0, 2, 137]) + // Write GO_AWAY + .write(&[0, 0, 8, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]) .build(); let mut h2 = Client::handshake(mock) diff --git a/tests/stream_states.rs b/tests/stream_states.rs index c127e9d..ef37652 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -90,15 +90,12 @@ fn send_recv_data() { let (_, body) = resp.into_parts(); // Wait for all the data frames to be received - let mut chunks = h2.run(body.collect()).unwrap(); + let bytes = h2.run(body.collect()).unwrap(); - // Only one chunk since two frames are coalesced. 
- assert_eq!(1, chunks.len()); + // One byte chunk + assert_eq!(1, bytes.len()); - let data = chunks[0].pop_bytes().unwrap(); - assert_eq!(data, &b"world"[..]); - - assert!(chunks[0].pop_bytes().is_none()); + assert_eq!(bytes[0], &b"world"[..]); // The H2 connection is closed h2.wait().unwrap(); @@ -141,18 +138,13 @@ fn send_headers_recv_data_single_frame() { let (_, body) = resp.into_parts(); // Wait for all the data frames to be received - let mut chunks = h2.run(body.collect()).unwrap(); + let bytes = h2.run(body.collect()).unwrap(); - // Only one chunk since two frames are coalesced. - assert_eq!(1, chunks.len()); + // Two data frames + assert_eq!(2, bytes.len()); - let data = chunks[0].pop_bytes().unwrap(); - assert_eq!(data, &b"hello"[..]); - - let data = chunks[0].pop_bytes().unwrap(); - assert_eq!(data, &b"world"[..]); - - assert!(chunks[0].pop_bytes().is_none()); + assert_eq!(bytes[0], &b"hello"[..]); + assert_eq!(bytes[1], &b"world"[..]); // The H2 connection is closed h2.wait().unwrap();
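
Note (illustrative, not part of the patch): the receive-side accounting above is split across Recv, the per-stream FlowControl in Stream, and the connection-level FlowControl. The standalone sketch below collapses that split into one toy type to show the arithmetic the patch relies on: received DATA shrinks the window, capacity released by the consumer accumulates in `available`, and the gap between `available` and `window_size` (unclaimed_capacity) is exactly the WINDOW_UPDATE that poll_complete will emit. The struct and method names echo the patch, but the simplified type, the error strings, and the `window_update_sent` helper are invented for the example and are not h2's API.

// Standalone sketch of the window accounting wired in above. A simplified
// toy, not h2's API: the per-stream / connection split and the in-flight
// tracking are folded away.

const MAX_WINDOW_SIZE: u32 = (1 << 31) - 1; // RFC 7540 flow-control limit

#[derive(Debug)]
struct FlowControl {
    /// Window currently advertised to the peer. Shrinks as DATA frames are
    /// received, grows again once a WINDOW_UPDATE is written out.
    window_size: i32,
    /// Capacity the local side is willing to grant: starts at the initial
    /// window and is replenished when the application releases bytes.
    available: u32,
}

impl FlowControl {
    fn new(init: u32) -> Self {
        FlowControl { window_size: init as i32, available: init }
    }

    /// Bytes released by the application but not yet advertised back to the
    /// peer, i.e. the size of the next WINDOW_UPDATE to send.
    fn unclaimed_capacity(&self) -> u32 {
        let available = self.available as i32;
        if self.window_size >= available {
            0
        } else {
            (available - self.window_size) as u32
        }
    }

    /// Charge a received DATA frame against the window.
    fn recv_data(&mut self, sz: u32) -> Result<(), &'static str> {
        if (sz as i64) > self.window_size as i64 {
            // The peer sent more than it was allowed to: FLOW_CONTROL_ERROR.
            return Err("flow control error");
        }
        self.window_size -= sz as i32;
        self.available -= sz;
        Ok(())
    }

    /// The application consumed `sz` bytes; make them grantable again.
    fn release_capacity(&mut self, sz: u32) -> Result<(), &'static str> {
        let val = self.available.checked_add(sz).ok_or("window overflow")?;
        if val > MAX_WINDOW_SIZE {
            return Err("window overflow");
        }
        self.available = val;
        Ok(())
    }

    /// Called once the WINDOW_UPDATE carrying `incr` has been written.
    fn window_update_sent(&mut self, incr: u32) {
        self.window_size += incr as i32;
    }
}

fn main() -> Result<(), &'static str> {
    // A DATA frame must fit in both the connection and the stream window,
    // mirroring the double check in Recv::recv_data.
    let mut conn = FlowControl::new(65_535);
    let mut stream = FlowControl::new(65_535);

    let data_len = 16_384;
    conn.recv_data(data_len)?;
    stream.recv_data(data_len)?;

    // Nothing is advertised back until the consumer releases the bytes
    // (StreamRef::release_capacity in the new API).
    assert_eq!(conn.unclaimed_capacity(), 0);

    stream.release_capacity(data_len)?;
    conn.release_capacity(data_len)?;

    // Recv::poll_complete would now emit a connection-level and a
    // stream-level WINDOW_UPDATE of 16384 each, then grow the windows back.
    assert_eq!(conn.unclaimed_capacity(), 16_384);
    assert_eq!(stream.unclaimed_capacity(), 16_384);
    conn.window_update_sent(16_384);
    stream.window_update_sent(16_384);
    assert_eq!(conn.unclaimed_capacity(), 0);
    Ok(())
}

The patch itself additionally tracks in_flight_recv_data per stream so release_capacity can reject releasing more than was actually received, and it caps both the window and the assignable capacity at MAX_WINDOW_SIZE = 2^31 - 1 as RFC 7540 requires.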