diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 8f2f9ac..cef47af 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,7 +1,7 @@ use {ConnectionError, Frame, FrameSize}; use client::Client; use frame::{self, StreamId}; -use proto::{self, Peer, ReadySink, WindowSize}; +use proto::{self, Peer, ReadySink, FlowTransporter, WindowSize}; use server::Server; use tokio_io::{AsyncRead, AsyncWrite}; @@ -32,44 +32,22 @@ pub fn new(transport: proto::Transport) } } -impl Connection { +impl Connection + where T: FlowTransporter, + B: IntoBuf, +{ /// Polls for the amount of additional data that may be sent to a remote. /// /// Connection and stream updates are distinct. - pub fn poll_window_update(&mut self, _id: StreamId) -> Poll { - // let added = if id.is_zero() { - // self.remote_flow_controller.take_window_update() - // } else { - // self.streams.get_mut(&id).and_then(|s| s.take_send_window_update()) - // }; - // match added { - // Some(incr) => Ok(Async::Ready(incr)), - // None => { - // self.blocked_window_update = Some(task::current()); - // Ok(Async::NotReady) - // } - // } - unimplemented!() + pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + self.inner.poll_remote_window_update(id) } /// Increases the amount of data that the remote endpoint may send. /// /// Connection and stream updates are distinct. - pub fn increment_window_size(&mut self, _id: StreamId, _incr: WindowSize) { - // assert!(self.sending_window_update.is_none()); - // let added = if id.is_zero() { - // self.local_flow_controller.grow_window(incr); - // self.local_flow_controller.take_window_update() - // } else { - // self.streams.get_mut(&id).and_then(|s| { - // s.grow_recv_window(incr); - // s.take_recv_window_update() - // }) - // }; - // if let Some(added) = added { - // self.sending_window_update = Some(frame::WindowUpdate::new(id, added)); - // } - unimplemented!() + pub fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.grow_local_window(id, incr) } } @@ -150,9 +128,8 @@ impl Stream for Connection // is called here. try_ready!(self.inner.poll_complete()); - // If the sender sink is ready, we attempt to poll the underlying - // stream once more because it, may have been made ready by flushing - // the sink. + // If the write buffer is cleared, attempt to poll the underlying + // stream once more because it, may have been made ready. try_ready!(self.inner.poll()) } }; @@ -215,17 +192,12 @@ impl Sink for Connection match item { Frame::Headers { id, headers, end_of_stream } => { + // This is a one-way conversion. By checking `poll_ready` first (above), + // it's already been determined that the inner `Sink` can accept the item. + // If the item is rejected, then there is a bug. let frame = P::convert_send_message(id, headers, end_of_stream); - - // We already ensured that the upstream can handle the frame, so - // panic if it gets rejected. - let res = try!(self.inner.start_send(Headers(frame))); - - // This is a one-way conversion. By checking `poll_ready` first, - // it's already been determined that the inner `Sink` can accept - // the item. If the item is rejected, then there is a bug. + let res = self.inner.start_send(Headers(frame))?; assert!(res.is_ready()); - Ok(AsyncSink::Ready) } diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index d3c3b05..5e785d6 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -3,7 +3,7 @@ use error; use frame::{self, Frame}; use proto::*; -use futures::*; +use std::collections::VecDeque; #[derive(Debug)] pub struct FlowControl { @@ -18,11 +18,17 @@ pub struct FlowControl { /// Tracks the onnection-level flow control window for receiving data from the remote. remote_flow_controller: FlowController, - /// When `poll_window_update` is not ready, then the calling task is saved to be - /// notified later. Access to poll_window_update must not be shared across tasks. - blocked_window_update: Option, + /// Holds the list of streams on which local window updates may be sent. + // XXX It would be cool if this didn't exist. + pending_local_window_updates: VecDeque, - sending_window_update: Option, + /// If a window update can't be sent immediately, it may need to be saved to be sent later. + sending_local_window_update: Option, + + /// When `poll_remote_window_update` is not ready, then the calling task is saved to + /// be notified later. Access to poll_window_update must not be shared across tasks, + /// as we only track a single task (and *not* i.e. a task per stream id). + blocked_remote_window_update: Option, } impl FlowControl @@ -41,41 +47,38 @@ impl FlowControl initial_remote_window_size, local_flow_controller: FlowController::new(initial_local_window_size), remote_flow_controller: FlowController::new(initial_remote_window_size), - blocked_window_update: None, - sending_window_update: None, + blocked_remote_window_update: None, + sending_local_window_update: None, + pending_local_window_updates: VecDeque::new(), } } } impl FlowControl { fn claim_local_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { - if id.is_zero() { - return self.local_flow_controller.claim_window(len) - .map_err(|_| error::Reason::FlowControlError.into()); - } + let res = if id.is_zero() { + self.local_flow_controller.claim_window(len) + } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { + stream.claim_local_window(len) + } else { + // Ignore updates for non-existent streams. + Ok(()) + }; - if let Some(mut stream) = self.streams_mut().get_mut(&id) { - return stream.claim_local_window(len) - .map_err(|_| error::Reason::FlowControlError.into()); - } - - // Ignore updates for non-existent streams. - Ok(()) + res.map_err(|_| error::Reason::FlowControlError.into()) } fn claim_remote_window(&mut self, id: &StreamId, len: WindowSize) -> Result<(), ConnectionError> { - if id.is_zero() { - return self.local_flow_controller.claim_window(len) - .map_err(|_| error::Reason::FlowControlError.into()); - } + let res = if id.is_zero() { + self.local_flow_controller.claim_window(len) + } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { + stream.claim_remote_window(len) + } else { + // Ignore updates for non-existent streams. + Ok(()) + }; - if let Some(mut stream) = self.streams_mut().get_mut(&id) { - return stream.claim_remote_window(len) - .map_err(|_| error::Reason::FlowControlError.into()); - } - - // Ignore updates for non-existent streams. - Ok(()) + res.map_err(|_| error::Reason::FlowControlError.into()) } /// Handles a window update received from the remote, indicating that the local may @@ -87,35 +90,93 @@ impl FlowControl { if incr == 0 { return; } - let added = if id.is_zero() { + + if id.is_zero() { self.remote_flow_controller.grow_window(incr); - true - } else if let Some(mut s) = self.streams_mut().get_mut(&id) { + } else if let Some(mut s) = self.inner.streams_mut().get_mut(&id) { s.grow_remote_window(incr); - true } else { - false + // Ignore updates for non-existent streams. + return; }; - if added { - if let Some(task) = self.blocked_window_update.take() { - task.notify(); - } + + if let Some(task) = self.blocked_remote_window_update.take() { + task.notify(); } } } +impl FlowTransporter for FlowControl { + fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + if id.is_zero() { + if let Some(sz) = self.remote_flow_controller.take_window_update() { + return Ok(Async::Ready(sz)); + } + } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { + if let Some(sz) = stream.take_remote_window_update() { + return Ok(Async::Ready(sz)); + } + } else { + return Err(error::User::InvalidStreamId.into()); + } + + self.blocked_remote_window_update = Some(task::current()); + return Ok(Async::NotReady); + } + + fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + if id.is_zero() { + self.local_flow_controller.grow_window(incr); + self.pending_local_window_updates.push_back(id); + Ok(()) + } else if let Some(mut stream) = self.inner.streams_mut().get_mut(&id) { + stream.grow_local_window(incr); + self.pending_local_window_updates.push_back(id); + Ok(()) + } else { + Err(error::User::InvalidStreamId.into()) + } + } +} + +impl StreamTransporter for FlowControl { + #[inline] + fn streams(&self) -> &StreamMap { + self.inner.streams() + } + + #[inline] + fn streams_mut(&mut self) -> &mut StreamMap { + self.inner.streams_mut() + } +} + impl FlowControl where T: Sink, SinkError = ConnectionError>, + T: StreamTransporter, { - /// Attempts to send a window update to the remote, if one is pending. - fn poll_sending_window_update(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.sending_window_update.take() { + /// Returns ready when there are no pending window updates to send. + fn poll_send_local_window_updates(&mut self) -> Poll<(), ConnectionError> { + if let Some(f) = self.sending_local_window_update.take() { if self.inner.start_send(f.into())?.is_not_ready() { - self.sending_window_update = Some(f); + self.sending_local_window_update = Some(f); return Ok(Async::NotReady); } } + while let Some(id) = self.pending_local_window_updates.pop_front() { + let update = self.inner.streams_mut().get_mut(&id) + .and_then(|mut s| s.take_local_window_update()) + .map(|incr| frame::WindowUpdate::new(id, incr)); + + if let Some(f) = update { + if self.inner.start_send(f.into())?.is_not_ready() { + self.sending_local_window_update = Some(f); + return Ok(Async::NotReady); + } + } + } + Ok(Async::Ready(())) } } @@ -136,8 +197,8 @@ impl FlowControl /// > flow-control window and MUST NOT send new flow-controlled frames until it /// > receives WINDOW_UPDATE frames that cause the flow-control window to become /// > positive. -impl ConnectionTransporter for FlowControl - where T: ConnectionTransporter, +impl ApplySettings for FlowControl + where T: ApplySettings, T: StreamTransporter { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { @@ -149,15 +210,13 @@ impl ConnectionTransporter for FlowControl return Ok(()); } - { - let mut streams = self.streams_mut(); - if new_window_size < old_window_size { - let decr = old_window_size - new_window_size; - streams.shrink_all_local_windows(decr); - } else { - let incr = new_window_size - old_window_size; - streams.grow_all_local_windows(incr); - } + let mut streams = self.inner.streams_mut(); + if new_window_size < old_window_size { + let decr = old_window_size - new_window_size; + streams.shrink_all_local_windows(decr); + } else { + let incr = new_window_size - old_window_size; + streams.grow_all_local_windows(incr); } self.initial_local_window_size = new_window_size; @@ -173,15 +232,13 @@ impl ConnectionTransporter for FlowControl return Ok(()); } - { - let mut streams = self.streams_mut(); - if new_window_size < old_window_size { - let decr = old_window_size - new_window_size; - streams.shrink_all_remote_windows(decr); - } else { - let incr = new_window_size - old_window_size; - streams.grow_all_remote_windows(incr); - } + let mut streams = self.inner.streams_mut(); + if new_window_size < old_window_size { + let decr = old_window_size - new_window_size; + streams.shrink_all_remote_windows(decr); + } else { + let incr = new_window_size - old_window_size; + streams.grow_all_remote_windows(incr); } self.initial_remote_window_size = new_window_size; @@ -189,16 +246,6 @@ impl ConnectionTransporter for FlowControl } } -impl StreamTransporter for FlowControl { - fn streams(&self) -> &StreamMap { - self.inner.streams() - } - - fn streams_mut(&mut self) -> &mut StreamMap { - self.inner.streams_mut() - } -} - impl Stream for FlowControl where T: Stream, T: StreamTransporter, @@ -230,23 +277,40 @@ impl Stream for FlowControl impl Sink for FlowControl where T: Sink, SinkError = ConnectionError>, + T: ReadySink, T: StreamTransporter, { type SinkItem = T::SinkItem; type SinkError = T::SinkError; - fn start_send(&mut self, item: Frame) -> StartSend { + fn start_send(&mut self, frame: Frame) -> StartSend { use frame::Frame::*; - if let &Data(ref v) = &item { - self.claim_remote_window(&v.stream_id(), v.len())?; + if self.poll_send_local_window_updates()?.is_not_ready() { + return Ok(AsyncSink::NotReady(frame)); } - self.inner.start_send(item) + match frame { + Data(v) => { + // Before claiming space, ensure that the transport will accept the frame. + if self.inner.poll_ready()?.is_not_ready() { + return Ok(AsyncSink::NotReady(Data(v))); + } + + self.claim_remote_window(&v.stream_id(), v.len())?; + + let res = self.inner.start_send(Data(v))?; + assert!(res.is_ready()); + Ok(res) + } + + frame => self.inner.start_send(frame), + } } fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() + try_ready!(self.inner.poll_complete()); + self.poll_send_local_window_updates() } } @@ -257,6 +321,7 @@ impl ReadySink for FlowControl T: StreamTransporter, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() + try_ready!(self.inner.poll_ready()); + self.poll_send_local_window_updates() } } diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index 0c34ee4..c6b9615 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -1,7 +1,7 @@ use {hpack, ConnectionError}; use frame::{self, Frame, Kind}; use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; -use proto::{ConnectionTransporter, ReadySink}; +use proto::{ApplySettings, ReadySink}; use futures::*; @@ -103,7 +103,7 @@ impl FramedRead { } } -impl ConnectionTransporter for FramedRead { +impl ApplySettings for FramedRead { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.get_mut().apply_local_settings(set) } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 1a0def8..ace33ae 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,6 +1,6 @@ use {hpack, ConnectionError, FrameSize}; use frame::{self, Frame}; -use proto::{ConnectionTransporter, ReadySink}; +use proto::{ApplySettings, ReadySink}; use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; @@ -78,7 +78,7 @@ impl FramedWrite } } -impl ConnectionTransporter for FramedWrite { +impl ApplySettings for FramedWrite { fn apply_local_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> { Ok(()) } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 6834327..5a1fe79 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,3 +1,12 @@ +use {frame, ConnectionError, Peer, StreamId}; +use bytes::{Buf, IntoBuf}; +use fnv::FnvHasher; +use futures::*; +use ordermap::{Entry, OrderMap}; +use std::hash::BuildHasherDefault; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; + mod connection; mod flow_control; mod flow_controller; @@ -20,17 +29,6 @@ pub use self::settings::Settings; pub use self::stream_tracker::StreamTracker; use self::state::StreamState; -use {frame, ConnectionError, Peer, StreamId}; - -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; - -use bytes::{Buf, IntoBuf}; - -use ordermap::{Entry, OrderMap}; -use fnv::FnvHasher; -use std::hash::BuildHasherDefault; - /// Represents the internals of an HTTP2 connection. /// /// A transport consists of several layers (_transporters_) and is arranged from _top_ @@ -100,7 +98,7 @@ impl StreamMap { } /// Allows settings to be applied from the top of the stack to the lower levels.d -pub trait ConnectionTransporter { +pub trait ApplySettings { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError>; } @@ -110,6 +108,11 @@ pub trait StreamTransporter { fn streams_mut(&mut self) -> &mut StreamMap; } +pub trait FlowTransporter { + fn poll_remote_window_update(&mut self, id: StreamId) -> Poll; + fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; +} + /// Create a full H2 transport from an I/O handle. /// /// This is called as the final step of the client handshake future. diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index d527225..7405f7c 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,7 +1,7 @@ use ConnectionError; use frame::{Frame, Ping, SettingSet}; use futures::*; -use proto::{ConnectionTransporter, ReadySink}; +use proto::{ApplySettings, ReadySink}; /// Acknowledges ping requests from the remote. #[derive(Debug)] @@ -22,7 +22,7 @@ impl PingPong } } -impl ConnectionTransporter for PingPong { +impl ApplySettings for PingPong { fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { self.inner.apply_local_settings(set) } diff --git a/src/proto/settings.rs b/src/proto/settings.rs index f62dfd4..fd7efb8 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,6 +1,6 @@ -use ConnectionError; +use {StreamId, ConnectionError}; use frame::{self, Frame}; -use proto::{ConnectionTransporter, ReadySink, StreamMap, StreamTransporter}; +use proto::{ApplySettings, ReadySink, StreamMap, StreamTransporter, FlowTransporter, WindowSize}; use futures::*; use tokio_io::AsyncRead; @@ -8,6 +8,8 @@ use bytes::BufMut; use std::io; + +// TODO #[derive(Debug)] pub struct Settings { // Upstream transport @@ -23,7 +25,7 @@ pub struct Settings { remaining_acks: usize, // True when the local settings must be flushed to the remote - is_dirty: bool, + is_local_dirty: bool, // True when we have received a settings frame from the remote. received_remote: bool, @@ -38,7 +40,7 @@ impl Settings local: local, remote: frame::SettingSet::default(), remaining_acks: 0, - is_dirty: true, + is_local_dirty: true, received_remote: false, } } @@ -60,18 +62,18 @@ impl Settings local: self.local, remote: self.remote, remaining_acks: self.remaining_acks, - is_dirty: self.is_dirty, + is_local_dirty: self.is_local_dirty, received_remote: self.received_remote, } } fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { - trace!("try_send_pending; dirty={} acks={}", self.is_dirty, self.remaining_acks); - if self.is_dirty { + trace!("try_send_pending; dirty={} acks={}", self.is_local_dirty, self.remaining_acks); + if self.is_local_dirty { let frame = frame::Settings::new(self.local.clone()); try_ready!(self.try_send(frame)); - self.is_dirty = false; + self.is_local_dirty = false; } while self.remaining_acks > 0 { @@ -104,10 +106,20 @@ impl StreamTransporter for Settings { } } +impl FlowTransporter for Settings { + fn poll_remote_window_update(&mut self, id: StreamId) -> Poll { + self.inner.poll_remote_window_update(id) + } + + fn grow_local_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { + self.inner.grow_local_window(id, incr) + } +} + impl Stream for Settings where T: Stream, T: Sink, SinkError = ConnectionError>, - T: ConnectionTransporter, + T: ApplySettings, { type Item = Frame; type Error = ConnectionError; @@ -120,14 +132,13 @@ impl Stream for Settings debug!("received remote settings ack"); // TODO: Handle acks } else { - // Received new settings, queue an ACK - self.remaining_acks += 1; - - // Apply the settings before saving them. + // Apply the settings before saving them and sending + // acknowledgements. let settings = v.into_set(); self.inner.apply_remote_settings(&settings)?; self.remote = settings; + self.remaining_acks += 1; let _ = try!(self.try_send_pending()); } } diff --git a/src/proto/stream_tracker.rs b/src/proto/stream_tracker.rs index 2ffccbd..e36c8ff 100644 --- a/src/proto/stream_tracker.rs +++ b/src/proto/stream_tracker.rs @@ -4,8 +4,6 @@ use error::User::InvalidStreamId; use frame::{self, Frame}; use proto::*; -use futures::*; - use std::marker::PhantomData; #[derive(Debug)] @@ -44,10 +42,12 @@ impl StreamTracker } impl StreamTransporter for StreamTracker { + #[inline] fn streams(&self) -> &StreamMap { &self.streams } + #[inline] fn streams_mut(&mut self) -> &mut StreamMap { &mut self.streams } @@ -71,8 +71,8 @@ impl StreamTransporter for StreamTracker { /// > exceed the new value or allow streams to complete. /// /// This module does NOT close streams when the setting changes. -impl ConnectionTransporter for StreamTracker - where T: ConnectionTransporter +impl ApplySettings for StreamTracker + where T: ApplySettings { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.local_max_concurrency = set.max_concurrent_streams();