This commit is contained in:
Oliver Gould
2017-07-09 06:01:42 +00:00
parent 7632a016df
commit d7b82cd50b
7 changed files with 151 additions and 13 deletions

View File

@@ -32,6 +32,7 @@ mod reset;
mod settings; mod settings;
mod stream_id; mod stream_id;
mod util; mod util;
mod window_update;
pub use self::data::Data; pub use self::data::Data;
pub use self::go_away::GoAway; pub use self::go_away::GoAway;
@@ -41,6 +42,7 @@ pub use self::ping::Ping;
pub use self::reset::Reset; pub use self::reset::Reset;
pub use self::settings::{Settings, SettingSet}; pub use self::settings::{Settings, SettingSet};
pub use self::stream_id::StreamId; pub use self::stream_id::StreamId;
pub use self::window_update::WindowUpdate;
// Re-export some constants // Re-export some constants
pub use self::settings::{ pub use self::settings::{
@@ -56,7 +58,8 @@ pub enum Frame {
Headers(Headers), Headers(Headers),
PushPromise(PushPromise), PushPromise(PushPromise),
Settings(Settings), Settings(Settings),
Ping(Ping) Ping(Ping),
WindowUpdate(WindowUpdate)
} }
/// Errors that can occur during parsing an HTTP/2 frame. /// Errors that can occur during parsing an HTTP/2 frame.

View File

@@ -0,0 +1,48 @@
use StreamId;
use byteorder::{ByteOrder, NetworkEndian};
use bytes::{BufMut};
use frame::{self, Head, Kind, Error};
const INCREMENT_MASK: u32 = 1 << 31;
type Increment = u32;
#[derive(Debug)]
pub struct WindowUpdate {
stream_id: StreamId,
increment: Increment,
}
impl WindowUpdate {
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
pub fn increment(&self) -> Increment {
self.increment
}
/// Builds a `Ping` frame from a raw frame.
pub fn load(head: Head, bytes: &[u8]) -> Result<WindowUpdate, Error> {
debug_assert_eq!(head.kind(), ::frame::Kind::WindowUpdate);
Ok(WindowUpdate {
stream_id: head.stream_id(),
// Clear the most significant bit, as that is reserved and MUST be ignored when
// received.
increment: NetworkEndian::read_u32(bytes) & !INCREMENT_MASK,
})
}
pub fn encode<B: BufMut>(&self, dst: &mut B) {
trace!("encoding WINDOW_UPDATE; id={:?}", self.stream_id);
let head = Head::new(Kind::Ping, 0, self.stream_id);
head.encode(4, dst);
dst.put_u32::<NetworkEndian>(self.increment);
}
}
impl From<WindowUpdate> for frame::Frame {
fn from(src: WindowUpdate) -> frame::Frame {
frame::Frame::WindowUpdate(src)
}
}

View File

@@ -1,7 +1,8 @@
use {frame, Frame, ConnectionError, Peer, StreamId}; use {Frame, ConnectionError, Peer, StreamId};
use client::Client; use client::Client;
use frame::{Frame as WireFrame};
use server::Server; use server::Server;
use proto::{self, ReadySink, State}; use proto::{self, ReadySink, State, WindowUpdate};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@@ -15,17 +16,60 @@ use fnv::FnvHasher;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::hash::BuildHasherDefault; use std::hash::BuildHasherDefault;
pub struct FlowControlViolation;
#[derive(Debug)]
struct FlowController {
window_size: u32,
underflow: u32,
}
impl FlowController {
pub fn new(window_size: u32) -> FlowController {
FlowController {
window_size,
underflow: 0,
}
}
pub fn shrink(&mut self, mut sz: u32) {
self.underflow += sz;
}
pub fn consume(&mut self, mut sz: u32) -> Result<(), FlowControlViolation> {
if sz < self.window_size {
self.underflow -= sz;
return Err(FlowControlViolation);
}
self.window_size -= sz;
Ok(())
}
pub fn increment(&mut self, mut sz: u32) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
sz -= self.underflow;
self.window_size += sz;
}
}
/// An H2 connection /// An H2 connection
#[derive(Debug)] #[derive(Debug)]
pub struct Connection<T, P> { pub struct Connection<T, P> {
inner: proto::Inner<T>, inner: proto::Inner<T>,
streams: StreamMap<State>, streams: StreamMap<State>,
peer: PhantomData<P>, peer: PhantomData<P>,
local_flow_controller: FlowController,
remote_flow_controller: FlowController,
} }
type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>; type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
pub fn new<T, P>(transport: proto::Inner<T>) -> Connection<T, P> pub fn new<T, P>(transport: proto::Inner<T>, initial_local_window_size: u32, initial_remote_window_size: u32) -> Connection<T, P>
where T: AsyncRead + AsyncWrite, where T: AsyncRead + AsyncWrite,
P: Peer, P: Peer,
{ {
@@ -33,15 +77,35 @@ pub fn new<T, P>(transport: proto::Inner<T>) -> Connection<T, P>
inner: transport, inner: transport,
streams: StreamMap::default(), streams: StreamMap::default(),
peer: PhantomData, peer: PhantomData,
local_flow_controller: FlowController::new(initial_local_window_size),
remote_flow_controller: FlowController::new(initial_remote_window_size),
} }
} }
impl<T, P> Connection<T, P> { impl<T, P> Connection<T, P> {
pub fn increment_local_window_size(&mut self, id: StreamId, increment: usize) { /// Publishes stream window updates to the remote.
///
/// Connection window updates (StreamId=0) and stream window updates are published
/// distinctly.
pub fn increment_local_window(&mut self, up: WindowUpdate) {
let incr = up.increment();
let flow = match up {
WindowUpdate::Connection { .. } => Some(&self.local_flow_controller),
WindowUpdate::Stream { id, .. } => {
self.streams.get(&id).map(|s| s.local_flow_controller())
}
};
if let Some(flow) = flow {
flow.increment(incr);
}
unimplemented!() unimplemented!()
} }
pub fn poll_remote_window_size(&mut self, id: StreamId) -> Poll<usize, ()> { /// Advertises stream window updates from the remote.
///
/// Connection window updates (StreamId=0) and stream window updates are advertised
/// distinctly.
pub fn poll_remote_window(&mut self) -> Poll<WindowUpdate, ()> {
unimplemented!() unimplemented!()
} }
} }
@@ -88,8 +152,6 @@ impl<T, P> Stream for Connection<T, P>
type Error = ConnectionError; type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> { fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
use frame::Frame::*;
trace!("Connection::poll"); trace!("Connection::poll");
let frame = match try!(self.inner.poll()) { let frame = match try!(self.inner.poll()) {
@@ -105,7 +167,7 @@ impl<T, P> Stream for Connection<T, P>
trace!("received; frame={:?}", frame); trace!("received; frame={:?}", frame);
let frame = match frame { let frame = match frame {
Some(Headers(v)) => { Some(WireFrame::Headers(v)) => {
// TODO: Update stream state // TODO: Update stream state
let stream_id = v.stream_id(); let stream_id = v.stream_id();
let end_of_stream = v.is_end_stream(); let end_of_stream = v.is_end_stream();
@@ -130,7 +192,7 @@ impl<T, P> Stream for Connection<T, P>
end_of_stream: end_of_stream, end_of_stream: end_of_stream,
} }
} }
Some(Data(v)) => { Some(WireFrame::Data(v)) => {
// TODO: Validate frame // TODO: Validate frame
let stream_id = v.stream_id(); let stream_id = v.stream_id();
@@ -189,7 +251,7 @@ impl<T, P> Sink for Connection<T, P>
// We already ensured that the upstream can handle the frame, so // We already ensured that the upstream can handle the frame, so
// panic if it gets rejected. // panic if it gets rejected.
let res = try!(self.inner.start_send(frame::Frame::Headers(frame))); let res = try!(self.inner.start_send(WireFrame::Headers(frame)));
// This is a one-way conversion. By checking `poll_ready` first, // This is a one-way conversion. By checking `poll_ready` first,
// it's already been determined that the inner `Sink` can accept // it's already been determined that the inner `Sink` can accept

View File

@@ -96,7 +96,10 @@ impl<T> FramedRead<T> {
debug!("decoded; frame={:?}", frame); debug!("decoded; frame={:?}", frame);
unimplemented!(); unimplemented!();
} }
Kind::WindowUpdate => unimplemented!(), Kind::WindowUpdate => {
let frame = try!(frame::WindowUpdate::load(head, &bytes[frame::HEADER_LEN..]));
frame.into()
}
Kind::Continuation => { Kind::Continuation => {
unimplemented!(); unimplemented!();
} }

View File

@@ -120,6 +120,10 @@ impl<T: AsyncWrite> Sink for FramedWrite<T> {
v.encode(self.buf.get_mut()); v.encode(self.buf.get_mut());
trace!("encoded ping; rem={:?}", self.buf.remaining()); trace!("encoded ping; rem={:?}", self.buf.remaining());
} }
Frame::WindowUpdate(v) => {
v.encode(self.buf.get_mut());
trace!("encoded window_update; rem={:?}", self.buf.remaining());
}
} }
Ok(AsyncSink::Ready) Ok(AsyncSink::Ready)

View File

@@ -5,6 +5,7 @@ mod ping_pong;
mod ready; mod ready;
mod settings; mod settings;
mod state; mod state;
mod window_update;
pub use self::connection::{Connection}; pub use self::connection::{Connection};
pub use self::framed_read::FramedRead; pub use self::framed_read::FramedRead;
@@ -13,6 +14,7 @@ pub use self::ping_pong::PingPong;
pub use self::ready::ReadySink; pub use self::ready::ReadySink;
pub use self::settings::Settings; pub use self::settings::Settings;
pub use self::state::State; pub use self::state::State;
pub use self::window_update::WindowUpdate;
use {frame, Peer}; use {frame, Peer};
@@ -83,5 +85,5 @@ pub fn from_server_handshaker<T, P>(transport: Settings<FramedWrite<T>>)
}); });
// Finally, return the constructed `Connection` // Finally, return the constructed `Connection`
connection::new(settings) connection::new(settings, 65_535, 65_535)
} }

View File

@@ -0,0 +1,16 @@
use StreamId;
#[derive(Debug)]
pub enum WindowUpdate {
Connection { increment: usize },
Stream { id: StreamId, increment: usize },
}
impl WindowUpdate {
pub fn increment(&self) -> usize {
match *self {
WindowUpdate::Connection { increment } |
WindowUpdate::Stream { increment, .. } => increment
}
}
}