diff --git a/src/frame/mod.rs b/src/frame/mod.rs index c31866c..a481d61 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -32,6 +32,7 @@ mod reset; mod settings; mod stream_id; mod util; +mod window_update; pub use self::data::Data; pub use self::go_away::GoAway; @@ -41,6 +42,7 @@ pub use self::ping::Ping; pub use self::reset::Reset; pub use self::settings::{Settings, SettingSet}; pub use self::stream_id::StreamId; +pub use self::window_update::WindowUpdate; // Re-export some constants pub use self::settings::{ @@ -56,7 +58,8 @@ pub enum Frame { Headers(Headers), PushPromise(PushPromise), Settings(Settings), - Ping(Ping) + Ping(Ping), + WindowUpdate(WindowUpdate) } /// Errors that can occur during parsing an HTTP/2 frame. diff --git a/src/frame/window_update.rs b/src/frame/window_update.rs new file mode 100644 index 0000000..92ec263 --- /dev/null +++ b/src/frame/window_update.rs @@ -0,0 +1,48 @@ +use StreamId; +use byteorder::{ByteOrder, NetworkEndian}; +use bytes::{BufMut}; +use frame::{self, Head, Kind, Error}; + +const INCREMENT_MASK: u32 = 1 << 31; + +type Increment = u32; + +#[derive(Debug)] +pub struct WindowUpdate { + stream_id: StreamId, + increment: Increment, +} + +impl WindowUpdate { + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + + pub fn increment(&self) -> Increment { + self.increment + } + + /// Builds a `Ping` frame from a raw frame. + pub fn load(head: Head, bytes: &[u8]) -> Result { + debug_assert_eq!(head.kind(), ::frame::Kind::WindowUpdate); + Ok(WindowUpdate { + stream_id: head.stream_id(), + // Clear the most significant bit, as that is reserved and MUST be ignored when + // received. + increment: NetworkEndian::read_u32(bytes) & !INCREMENT_MASK, + }) + } + + pub fn encode(&self, dst: &mut B) { + trace!("encoding WINDOW_UPDATE; id={:?}", self.stream_id); + let head = Head::new(Kind::Ping, 0, self.stream_id); + head.encode(4, dst); + dst.put_u32::(self.increment); + } +} + +impl From for frame::Frame { + fn from(src: WindowUpdate) -> frame::Frame { + frame::Frame::WindowUpdate(src) + } +} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 0c4f888..e8e3c9d 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,7 +1,8 @@ -use {frame, Frame, ConnectionError, Peer, StreamId}; +use {Frame, ConnectionError, Peer, StreamId}; use client::Client; +use frame::{Frame as WireFrame}; use server::Server; -use proto::{self, ReadySink, State}; +use proto::{self, ReadySink, State, WindowUpdate}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -15,17 +16,60 @@ use fnv::FnvHasher; use std::marker::PhantomData; use std::hash::BuildHasherDefault; +pub struct FlowControlViolation; + +#[derive(Debug)] +struct FlowController { + window_size: u32, + underflow: u32, +} + +impl FlowController { + pub fn new(window_size: u32) -> FlowController { + FlowController { + window_size, + underflow: 0, + } + } + + pub fn shrink(&mut self, mut sz: u32) { + self.underflow += sz; + } + + pub fn consume(&mut self, mut sz: u32) -> Result<(), FlowControlViolation> { + if sz < self.window_size { + self.underflow -= sz; + return Err(FlowControlViolation); + } + + self.window_size -= sz; + Ok(()) + } + + pub fn increment(&mut self, mut sz: u32) { + if sz <= self.underflow { + self.underflow -= sz; + return; + } + + sz -= self.underflow; + self.window_size += sz; + } +} + /// An H2 connection #[derive(Debug)] pub struct Connection { inner: proto::Inner, streams: StreamMap, peer: PhantomData

, + local_flow_controller: FlowController, + remote_flow_controller: FlowController, } type StreamMap = OrderMap>; -pub fn new(transport: proto::Inner) -> Connection +pub fn new(transport: proto::Inner, initial_local_window_size: u32, initial_remote_window_size: u32) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, { @@ -33,15 +77,35 @@ pub fn new(transport: proto::Inner) -> Connection inner: transport, streams: StreamMap::default(), peer: PhantomData, + local_flow_controller: FlowController::new(initial_local_window_size), + remote_flow_controller: FlowController::new(initial_remote_window_size), } } impl Connection { - pub fn increment_local_window_size(&mut self, id: StreamId, increment: usize) { + /// Publishes stream window updates to the remote. + /// + /// Connection window updates (StreamId=0) and stream window updates are published + /// distinctly. + pub fn increment_local_window(&mut self, up: WindowUpdate) { + let incr = up.increment(); + let flow = match up { + WindowUpdate::Connection { .. } => Some(&self.local_flow_controller), + WindowUpdate::Stream { id, .. } => { + self.streams.get(&id).map(|s| s.local_flow_controller()) + } + }; + if let Some(flow) = flow { + flow.increment(incr); + } unimplemented!() } - pub fn poll_remote_window_size(&mut self, id: StreamId) -> Poll { + /// Advertises stream window updates from the remote. + /// + /// Connection window updates (StreamId=0) and stream window updates are advertised + /// distinctly. + pub fn poll_remote_window(&mut self) -> Poll { unimplemented!() } } @@ -88,8 +152,6 @@ impl Stream for Connection type Error = ConnectionError; fn poll(&mut self) -> Poll, ConnectionError> { - use frame::Frame::*; - trace!("Connection::poll"); let frame = match try!(self.inner.poll()) { @@ -105,7 +167,7 @@ impl Stream for Connection trace!("received; frame={:?}", frame); let frame = match frame { - Some(Headers(v)) => { + Some(WireFrame::Headers(v)) => { // TODO: Update stream state let stream_id = v.stream_id(); let end_of_stream = v.is_end_stream(); @@ -130,7 +192,7 @@ impl Stream for Connection end_of_stream: end_of_stream, } } - Some(Data(v)) => { + Some(WireFrame::Data(v)) => { // TODO: Validate frame let stream_id = v.stream_id(); @@ -189,7 +251,7 @@ impl Sink for Connection // We already ensured that the upstream can handle the frame, so // panic if it gets rejected. - let res = try!(self.inner.start_send(frame::Frame::Headers(frame))); + let res = try!(self.inner.start_send(WireFrame::Headers(frame))); // This is a one-way conversion. By checking `poll_ready` first, // it's already been determined that the inner `Sink` can accept diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index ee476d4..b4d0db6 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -96,7 +96,10 @@ impl FramedRead { debug!("decoded; frame={:?}", frame); unimplemented!(); } - Kind::WindowUpdate => unimplemented!(), + Kind::WindowUpdate => { + let frame = try!(frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..])); + frame.into() + } Kind::Continuation => { unimplemented!(); } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index 07bd934..8ece940 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -120,6 +120,10 @@ impl Sink for FramedWrite { v.encode(self.buf.get_mut()); trace!("encoded ping; rem={:?}", self.buf.remaining()); } + Frame::WindowUpdate(v) => { + v.encode(self.buf.get_mut()); + trace!("encoded window_update; rem={:?}", self.buf.remaining()); + } } Ok(AsyncSink::Ready) diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 923a284..66df641 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -5,6 +5,7 @@ mod ping_pong; mod ready; mod settings; mod state; +mod window_update; pub use self::connection::{Connection}; pub use self::framed_read::FramedRead; @@ -13,6 +14,7 @@ pub use self::ping_pong::PingPong; pub use self::ready::ReadySink; pub use self::settings::Settings; pub use self::state::State; +pub use self::window_update::WindowUpdate; use {frame, Peer}; @@ -83,5 +85,5 @@ pub fn from_server_handshaker(transport: Settings>) }); // Finally, return the constructed `Connection` - connection::new(settings) + connection::new(settings, 65_535, 65_535) } diff --git a/src/proto/window_update.rs b/src/proto/window_update.rs new file mode 100644 index 0000000..e6ea78a --- /dev/null +++ b/src/proto/window_update.rs @@ -0,0 +1,16 @@ +use StreamId; + +#[derive(Debug)] +pub enum WindowUpdate { + Connection { increment: usize }, + Stream { id: StreamId, increment: usize }, +} + +impl WindowUpdate { + pub fn increment(&self) -> usize { + match *self { + WindowUpdate::Connection { increment } | + WindowUpdate::Stream { increment, .. } => increment + } + } +}