Restructure proto

The existing code has been moved out and is being copied back piece by
piece while restructuring the code to (hopefully) be more manageable.
Carl Lerche
2017-08-02 09:42:10 -07:00
committed by GitHub
parent 13d6866ee8
commit 33bdc057d6
36 changed files with 1495 additions and 2964 deletions

View File

@@ -1,7 +1,7 @@
use {frame, proto, Peer, ConnectionError, StreamId};
use http;
use futures::{Future, Poll};
use futures::{Future, Poll, Sink, AsyncSink};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf};
@@ -29,21 +29,31 @@ pub fn handshake<T>(io: T) -> Handshake<T, Bytes>
///
/// Returns a future which resolves to the connection value once the H2
/// handshake has been completed.
pub fn handshake2<T, B: IntoBuf>(io: T) -> Handshake<T, B>
pub fn handshake2<T, B>(io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: IntoBuf + 'static,
{
use tokio_io::io;
debug!("binding client connection");
let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
.map(|(io, _)| {
.map_err(ConnectionError::from)
.and_then(|(io, _)| {
debug!("client connection bound");
// Use default local settings for now
proto::from_io(io, Default::default())
})
.map_err(ConnectionError::from);
let mut framed_write = proto::framed_write(io);
let settings = frame::Settings::default();
// Send initial settings frame
match framed_write.start_send(settings.into()) {
Ok(AsyncSink::Ready) => {
Ok(proto::from_framed_write(framed_write))
}
Ok(_) => unreachable!(),
Err(e) => Err(ConnectionError::from(e)),
}
});
Handshake { inner: Box::new(handshake) }
}
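
An aside on the magic bytes above: they are the fixed 24-octet client connection preface from RFC 7540 §3.5, which must be followed by a SETTINGS frame; those are exactly the two steps `handshake2` now performs in order. A std-only illustration (not part of this diff):

// The client connection preface (RFC 7540 §3.5), written before any frames.
const PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

fn main() {
    // 24 octets; once written, the client must send a SETTINGS frame,
    // which is what `framed_write.start_send(settings.into())` does above.
    assert_eq!(PREFACE.len(), 24);
}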

View File

@@ -49,9 +49,6 @@ pub struct PushPromise {
}
impl PushPromise {
pub fn stream_id(&self) -> StreamId {
self.stream_id
}
}
#[derive(Debug)]
@@ -177,6 +174,14 @@ impl Headers {
})
}
/// Returns `true` if the frame represents trailers
///
/// Trailers are header frames that contain no pseudo headers.
pub fn is_trailers(&self) -> bool {
self.pseudo.method.is_none() &&
self.pseudo.status.is_none()
}
pub fn stream_id(&self) -> StreamId {
self.stream_id
}

View File

@@ -3,6 +3,8 @@ use error::{ConnectionError, Reason};
use bytes::Bytes;
use std::fmt;
/// A helper macro that unpacks a sequence of 4 bytes found in the buffer with
/// the given identifier, starting at the given offset, into the given integer
/// type. Obviously, the integer type should be able to support at least 4
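
For reference, a sketch of what such an unpack expands to for a `u32`, reading four bytes in network (big-endian) order; the function name is illustrative, not this crate's actual macro:

fn unpack_octets_4(buf: &[u8], offset: usize) -> u32 {
    // Most significant byte first (network byte order).
    ((buf[offset] as u32) << 24)
        | ((buf[offset + 1] as u32) << 16)
        | ((buf[offset + 2] as u32) << 8)
        | (buf[offset + 3] as u32)
}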
@@ -54,7 +56,6 @@ pub use self::settings::{
pub const HEADER_LEN: usize = 9;
#[derive(Debug /*, Clone, PartialEq */)]
pub enum Frame<T = Bytes> {
Data(Data<T>),
Headers(Headers),
@@ -66,50 +67,20 @@ pub enum Frame<T = Bytes> {
}
impl<T> Frame<T> {
    pub fn is_connection_frame(&self) -> bool {
        use self::Frame::*;
        match self {
            &Headers(..) |
            &Data(..) |
            &PushPromise(..) |
            &Reset(..) => false,
            &WindowUpdate(ref v) => v.stream_id().is_zero(),
            &Ping(_) |
            &Settings(_) => true,
        }
    }
    pub fn stream_id(&self) -> StreamId {
        use self::Frame::*;
        match self {
            &Headers(ref v) => v.stream_id(),
            &Data(ref v) => v.stream_id(),
            &PushPromise(ref v) => v.stream_id(),
            &WindowUpdate(ref v) => v.stream_id(),
            &Reset(ref v) => v.stream_id(),
            &Ping(_) |
            &Settings(_) => StreamId::zero(),
        }
    }
    pub fn is_end_stream(&self) -> bool {
        use self::Frame::*;
        match self {
            &Headers(ref v) => v.is_end_stream(),
            &Data(ref v) => v.is_end_stream(),
            &PushPromise(_) |
            &WindowUpdate(_) |
            &Ping(_) |
            &Settings(_) => false,
            &Reset(_) => true,
        }
    }
}
impl<T> fmt::Debug for Frame<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        use self::Frame::*;
        match *self {
            Data(..) => write!(fmt, "Frame::Data(..)"),
            Headers(ref frame) => write!(fmt, "Frame::Headers({:?})", frame),
            PushPromise(ref frame) => write!(fmt, "Frame::PushPromise({:?})", frame),
            Settings(ref frame) => write!(fmt, "Frame::Settings({:?})", frame),
            Ping(ref frame) => write!(fmt, "Frame::Ping({:?})", frame),
            WindowUpdate(ref frame) => write!(fmt, "Frame::WindowUpdate({:?})", frame),
            Reset(ref frame) => write!(fmt, "Frame::Reset({:?})", frame),
        }
    }
}
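
Relatedly, `HEADER_LEN` above is the fixed 9-octet frame header of RFC 7540 §4.1: a 24-bit payload length, an 8-bit type, 8 bits of flags, one reserved bit, and a 31-bit stream identifier. A minimal decoding sketch with hypothetical names (the crate's `Head` type plays a similar role):

/// Decoded HTTP/2 frame header (sketch, per RFC 7540 §4.1).
struct FrameHead {
    length: u32,    // 24-bit payload length
    kind: u8,       // frame type
    flags: u8,
    stream_id: u32, // 31 bits; the high bit is reserved
}

fn parse_frame_head(hdr: &[u8; 9]) -> FrameHead {
    FrameHead {
        length: ((hdr[0] as u32) << 16) | ((hdr[1] as u32) << 8) | hdr[2] as u32,
        kind: hdr[3],
        flags: hdr[4],
        stream_id: (((hdr[5] as u32) << 24)
            | ((hdr[6] as u32) << 16)
            | ((hdr[7] as u32) << 8)
            | (hdr[8] as u32))
            & 0x7fff_ffff, // mask off the reserved bit
    }
}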

View File

@@ -12,10 +12,6 @@ pub struct Ping {
}
impl Ping {
pub fn ping(payload: Payload) -> Ping {
Ping { ack: false, payload }
}
pub fn pong(payload: Payload) -> Ping {
Ping { ack: true, payload }
}

View File

@@ -66,21 +66,10 @@ impl Settings {
}
}
pub fn new(values: SettingSet) -> Settings {
Settings {
flags: SettingsFlags::empty(),
values: values,
}
}
pub fn is_ack(&self) -> bool {
self.flags.is_ack()
}
pub fn into_set(self) -> SettingSet {
self.values
}
pub fn load(head: Head, payload: &[u8]) -> Result<Settings, Error> {
use self::Setting::*;

View File

@@ -61,7 +61,7 @@ pub enum Frame<T, B = Bytes> {
},
PushPromise {
id: StreamId,
promise: (),
promised_id: StreamId,
},
Reset {
id: StreamId,

View File

@@ -1,23 +0,0 @@
use ConnectionError;
use frame::SettingSet;
/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to
/// FramedWrite).
pub trait ApplySettings {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>;
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>;
}
macro_rules! proxy_apply_settings {
($struct:ident $(, $targs:ident)*) => (
impl<T: ApplySettings$(, $targs)*> ApplySettings for $struct<T$(, $targs)*> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
)
}

View File

@@ -1,33 +1,49 @@
use {ConnectionError, Frame};
use client::Client;
use error;
use {ConnectionError, Frame, Peer};
use frame::{self, StreamId};
use proto::*;
use client::Client;
use server::Server;
use bytes::{Bytes, IntoBuf};
use proto::*;
use http::{request, response};
use bytes::{Bytes, IntoBuf};
use tokio_io::{AsyncRead, AsyncWrite};
use std::marker::PhantomData;
/// An H2 connection
#[derive(Debug)]
pub struct Connection<T, P, B: IntoBuf = Bytes> {
inner: Transport<T, B::Buf>,
// Set to `true` as long as the connection is in a valid state.
active: bool,
_phantom: PhantomData<(P, B)>,
// Codec
codec: Codec<T, B::Buf>,
// TODO: Remove <B>
ping_pong: PingPong<B::Buf>,
settings: Settings,
streams: Streams<P>,
_phantom: PhantomData<P>,
}
pub fn new<T, P, B>(transport: Transport<T, B::Buf>)
pub fn new<T, P, B>(codec: Codec<T, B::Buf>)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
// TODO: Actually configure
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
Connection {
inner: transport,
active: true,
codec: codec,
ping_pong: PingPong::new(),
settings: Settings::new(),
streams: streams,
_phantom: PhantomData,
}
}
@@ -39,40 +55,44 @@ impl<T, P, B> Connection<T, P, B>
{
/// Polls for the next update to a remote flow control window.
pub fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
self.streams.poll_window_update()
}
/// Increases the capacity of a local flow control window.
pub fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
///
/// # Panics
///
/// This function panics if `incr` is not a valid window size.
pub fn expand_window(&mut self, id: StreamId, incr: usize)
-> Result<(), ConnectionError>
{
assert!(incr <= MAX_WINDOW_SIZE as usize);
self.streams.expand_window(id, incr as WindowSize)
}
pub fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.update_local_settings(local)
pub fn update_local_settings(&mut self, _local: frame::SettingSet) -> Result<(), ConnectionError> {
unimplemented!();
}
pub fn remote_initial_window_size(&self) -> u32 {
self.inner.remote_initial_window_size()
unimplemented!();
}
pub fn remote_max_concurrent_streams(&self) -> Option<u32> {
self.inner.remote_max_concurrent_streams()
pub fn remote_max_concurrent_streams(&self) -> Option<usize> {
unimplemented!();
}
pub fn remote_push_enabled(&self) -> Option<bool> {
self.inner.remote_push_enabled()
}
pub fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
pub fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
unimplemented!();
}
pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
try_ready!(self.poll_send_ready());
// TODO: Once there is write buffering, this shouldn't be needed
try_ready!(self.codec.poll_ready());
Ok(().into())
}
pub fn send_data(self,
@@ -81,13 +101,139 @@ impl<T, P, B> Connection<T, P, B>
end_of_stream: bool)
-> sink::Send<Self>
{
trace!("send_data: id={:?}", id);
self.send(Frame::Data {
id,
data,
end_of_stream,
})
}
pub fn start_ping(&mut self, _body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
unimplemented!();
}
// ===== Private =====
/// Returns `Ready` when the `Connection` is ready to receive a frame from
/// the socket.
fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> {
// Pong, settings ack, and stream refusals are high priority frames to
// send. If the write buffer is full, we stop reading any further frames
// until these high priority writes can be committed to the buffer.
try_ready!(self.ping_pong.send_pending_pong(&mut self.codec));
try_ready!(self.settings.send_pending_ack(&mut self.codec));
try_ready!(self.streams.send_pending_refusal(&mut self.codec));
Ok(().into())
}
/// Returns `Ready` when the `Connection` is ready to accept a frame from
/// the user
///
/// This function is currently used by poll_complete, but at some point it
/// will probably not be required.
fn poll_send_ready(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.poll_recv_ready());
// Ensure all window updates have been sent.
try_ready!(self.streams.send_pending_window_updates(&mut self.codec));
Ok(().into())
}
/// Try to receive the next frame
fn recv_frame(&mut self) -> Poll<Option<Frame<P::Poll>>, ConnectionError> {
use frame::Frame::*;
loop {
// First, ensure that the `Connection` is able to receive a frame
try_ready!(self.poll_recv_ready());
trace!("polling codec");
let frame = match try!(self.codec.poll()) {
Async::Ready(frame) => frame,
Async::NotReady => {
// Receiving new frames may depend on ensuring that the write buffer
// is clear (e.g. if window updates need to be sent), so `poll_complete`
// is called here.
let _ = try!(self.poll_complete());
return Ok(Async::NotReady);
}
};
match frame {
Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame);
// Update stream state while ensuring that the headers frame
// can be received
if let Some(frame) = try!(self.streams.recv_headers(frame)) {
let frame = Self::convert_poll_message(frame);
return Ok(Some(frame).into());
}
}
Some(Data(frame)) => {
trace!("recv DATA; frame={:?}", frame);
try!(self.streams.recv_data(&frame));
let frame = Frame::Data {
id: frame.stream_id(),
end_of_stream: frame.is_end_stream(),
data: frame.into_payload(),
};
return Ok(Some(frame).into());
}
Some(Reset(frame)) => {
trace!("recv RST_STREAM; frame={:?}", frame);
try!(self.streams.recv_reset(&frame));
let frame = Frame::Reset {
id: frame.stream_id(),
error: frame.reason(),
};
return Ok(Some(frame).into());
}
Some(PushPromise(frame)) => {
trace!("recv PUSH_PROMISE; frame={:?}", frame);
try!(self.streams.recv_push_promise(frame));
}
Some(Settings(frame)) => {
trace!("recv SETTINGS; frame={:?}", frame);
self.settings.recv_settings(frame);
// TODO: ACK must be sent THEN settings applied.
}
Some(Ping(frame)) => {
trace!("recv PING; frame={:?}", frame);
self.ping_pong.recv_ping(frame);
}
Some(WindowUpdate(frame)) => {
trace!("recv WINDOW_UPDATE; frame={:?}", frame);
try!(self.streams.recv_window_update(frame));
}
None => {
trace!("codec closed");
return Ok(Async::Ready(None));
}
}
}
}
fn convert_poll_message(frame: frame::Headers) -> Frame<P::Poll> {
if frame.is_trailers() {
// TODO: return trailers
unimplemented!();
} else {
Frame::Headers {
id: frame.stream_id(),
end_of_stream: frame.is_end_stream(),
headers: P::convert_poll_message(frame),
}
}
}
}
impl<T, B> Connection<T, Client, B>
@@ -124,6 +270,17 @@ impl<T, B> Connection<T, Server, B>
end_of_stream: end_of_stream,
})
}
pub fn send_push_promise(self,
id: StreamId,
promised_id: StreamId)
-> sink::Send<Self>
{
self.send(Frame::PushPromise {
id,
promised_id,
})
}
}
impl<T, P, B> Stream for Connection<T, P, B>
@@ -135,55 +292,8 @@ impl<T, P, B> Stream for Connection<T, P, B>
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Self::Item>, ConnectionError> {
use frame::Frame::*;
trace!("poll");
if !self.active {
return Err(error::User::Corrupt.into());
}
loop {
let frame = match try!(self.inner.poll()) {
Async::Ready(f) => f,
// XXX is this necessary?
Async::NotReady => {
// Receiving new frames may depend on ensuring that the write buffer
// is clear (e.g. if window updates need to be sent), so `poll_complete`
// is called here.
try_ready!(self.poll_complete());
// If the write buffer is cleared, attempt to poll the underlying
// stream once more because it may have been made ready.
try_ready!(self.inner.poll())
}
};
trace!("poll; frame={:?}", frame);
let frame = match frame {
Some(Headers(v)) => Frame::Headers {
id: v.stream_id(),
end_of_stream: v.is_end_stream(),
headers: P::convert_poll_message(v),
},
Some(Data(v)) => Frame::Data {
id: v.stream_id(),
end_of_stream: v.is_end_stream(),
data: v.into_payload(),
},
Some(Reset(v)) => Frame::Reset {
id: v.stream_id(),
error: v.reason(),
},
Some(frame) => panic!("unexpected frame; frame={:?}", frame),
None => return Ok(Async::Ready(None)),
};
return Ok(Async::Ready(Some(frame)));
}
// TODO: intercept errors and flag the connection
self.recv_frame()
}
}
@@ -199,14 +309,9 @@ impl<T, P, B> Sink for Connection<T, P, B>
fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, Self::SinkError>
{
trace!("start_send");
// TODO: Ensure connection is not corrupt
if !self.active {
return Err(error::User::Corrupt.into());
}
// Ensure the transport is ready to send a frame before we transform the external
// `Frame` into an internal `frame::Frame`.
// Ensure that the connection is ready to accept a new frame
if !try!(self.poll_ready()).is_ready() {
return Ok(AsyncSink::NotReady(item));
}
@@ -216,12 +321,21 @@ impl<T, P, B> Sink for Connection<T, P, B>
// This is a one-way conversion. By checking `poll_ready` first (above),
// it's already been determined that the inner `Sink` can accept the item.
// If the item is rejected, then there is a bug.
let f = P::convert_send_message(id, headers, end_of_stream);
frame::Frame::Headers(f)
let frame = P::convert_send_message(id, headers, end_of_stream);
// Update the stream state
self.streams.send_headers(&frame)?;
frame::Frame::Headers(frame)
}
Frame::Data { id, data, end_of_stream } => {
frame::Data::from_buf(id, data.into_buf(), end_of_stream).into()
let frame = frame::Data::from_buf(
id, data.into_buf(), end_of_stream);
self.streams.send_data(&frame)?;
frame.into()
}
Frame::Reset { id, error } => frame::Reset::new(id, error).into(),
@@ -240,13 +354,21 @@ impl<T, P, B> Sink for Connection<T, P, B>
_ => unimplemented!(),
};
let res = self.inner.start_send(frame)?;
// Write the frame to the socket
let res = self.codec.start_send(frame)?;
// Ensure that the write was accepted. This is always true due to the
// check at the top of the function
assert!(res.is_ready());
// Return success
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
trace!("poll_complete");
self.inner.poll_complete()
try_ready!(self.poll_send_ready());
try_ready!(self.codec.poll_complete());
Ok(().into())
}
}

View File

@@ -1,45 +0,0 @@
use ConnectionError;
use proto::*;
/// Exposes flow control states to "upper" layers of the transport (i.e. above
/// FlowControl).
pub trait ControlFlowSend {
/// Polls for the next window update from the remote.
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError>;
}
pub trait ControlFlowRecv {
/// Increases the local receive capacity of a stream.
///
/// This may cause a window update to be sent to the remote.
///
/// Fails if the given stream is not active.
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>;
}
macro_rules! proxy_control_flow_send {
($outer:ident) => (
impl<T: ControlFlowSend> ControlFlowSend for $outer<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
self.inner.poll_window_update()
}
}
)
}
macro_rules! proxy_control_flow_recv {
($outer:ident) => (
impl<T: ControlFlowRecv> ControlFlowRecv for $outer<T> {
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
self.inner.expand_window(id, incr)
}
}
)
}
macro_rules! proxy_control_flow {
($outer:ident) => (
proxy_control_flow_recv!($outer);
proxy_control_flow_send!($outer);
)
}

View File

@@ -1,21 +0,0 @@
use ConnectionError;
use proto::*;
pub trait ControlPing {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError>;
fn take_pong(&mut self) -> Option<PingPayload>;
}
macro_rules! proxy_control_ping {
($struct:ident $(, $targs:ident)*) => (
impl<T: ControlPing$(, $targs)*> ControlPing for $struct<T$(, $targs)*> {
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
self.inner.start_ping(body)
}
fn take_pong(&mut self) -> Option<PingPayload> {
self.inner.take_pong()
}
}
)
}

View File

@@ -1,13 +0,0 @@
use ConnectionError;
use frame::SettingSet;
use proto::*;
/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and
/// above---Connection).
pub trait ControlSettings {
fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>;
fn remote_push_enabled(&self) -> Option<bool>;
fn remote_max_concurrent_streams(&self) -> Option<u32>;
fn remote_initial_window_size(&self) -> WindowSize;
}

View File

@@ -1,23 +0,0 @@
use proto::*;
/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up
/// to Connection).
pub trait ControlStreams {
fn streams(&self) -> &Streams;
fn streams_mut(&mut self) -> &mut Streams;
}
macro_rules! proxy_control_streams {
($outer:ident) => (
impl<T: ControlStreams> ControlStreams for $outer<T> {
fn streams(&self) -> &Streams {
self.inner.streams()
}
fn streams_mut(&mut self) -> &mut Streams {
self.inner.streams_mut()
}
}
)
}

View File

@@ -0,0 +1,8 @@
#[derive(Debug)]
pub struct FlowControl;
impl FlowControl {
pub fn new() -> Self {
FlowControl
}
}

View File

@@ -1,223 +0,0 @@
use {error, ConnectionError, FrameSize};
use frame::{self, Frame};
use proto::*;
use std::collections::VecDeque;
/// Tracks local flow control windows.
#[derive(Debug)]
pub struct FlowControlRecv<T> {
inner: T,
initial_window_size: WindowSize,
/// Tracks the connection-level flow control window for receiving data from the
/// remote.
connection: FlowControlState,
/// Holds the list of streams on which local window updates may be sent.
// XXX It would be cool if this didn't exist.
pending_streams: VecDeque<StreamId>,
/// If a window update can't be sent immediately, it may need to be saved to be sent
/// later.
sending: Option<frame::WindowUpdate>,
}
impl<T, U> FlowControlRecv<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams
{
pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlRecv<T> {
FlowControlRecv {
inner,
initial_window_size,
connection: FlowControlState::with_initial_size(initial_window_size),
pending_streams: VecDeque::new(),
sending: None,
}
}
}
/// Exposes a public upward API for flow control.
impl<T: ControlStreams> ControlFlowRecv for FlowControlRecv<T> {
fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> {
let added = match self.streams_mut().recv_flow_controller(id) {
None => false,
Some(mut fc) => {
fc.expand_window(incr);
true
}
};
if added {
if !id.is_zero() {
self.pending_streams.push_back(id);
}
Ok(())
} else if let Some(rst) = self.streams().get_reset(id) {
Err(error::User::StreamReset(rst).into())
} else {
Err(error::User::InvalidStreamId.into())
}
}
}
impl<T, U> FlowControlRecv<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
/// Returns ready when there are no pending window updates to send.
fn poll_send_local(&mut self) -> Poll<(), ConnectionError> {
if let Some(f) = self.sending.take() {
try_ready!(self.try_send(f));
}
if let Some(incr) = self.connection.apply_window_update() {
try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr)));
}
while let Some(id) = self.pending_streams.pop_front() {
if self.streams().get_reset(id).is_none() {
let update = self.streams_mut().recv_flow_controller(id).and_then(|s| s.apply_window_update());
if let Some(incr) = update {
try_ready!(self.try_send(frame::WindowUpdate::new(id, incr)));
}
}
}
Ok(Async::Ready(()))
}
fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> {
if self.inner.start_send(f.into())?.is_not_ready() {
self.sending = Some(f);
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}
}
/// Ensures that the remote does not violate the local peer's flow controller.
impl<T> Stream for FlowControlRecv<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
trace!("poll");
loop {
match try_ready!(self.inner.poll()) {
Some(Frame::Data(v)) => {
let id = v.stream_id();
let sz = v.payload().len() as FrameSize;
// Ensure there's enough capacity on the connection before acting on
// the stream.
if !self.connection.check_window(sz) {
// TODO this should cause a GO_AWAY
return Err(error::Reason::FlowControlError.into());
}
let fc = self.inner.streams_mut().recv_flow_controller(id)
.expect("receiving data with no flow controller");
if fc.claim_window(sz).is_err() {
// TODO this should cause a GO_AWAY
return Err(error::Reason::FlowControlError.into());
}
self.connection.claim_window(sz)
.expect("local connection flow control error");
return Ok(Async::Ready(Some(Frame::Data(v))));
}
v => return Ok(Async::Ready(v)),
}
}
}
}
/// Sends pending window updates before operating on the underlying transport.
impl<T, U> Sink for FlowControlRecv<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
if self.poll_send_local()?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
try_ready!(self.poll_send_local());
self.inner.poll_complete()
}
}
/// Sends pending window updates before checking the underlying transport's readiness.
impl<T, U> ReadySink for FlowControlRecv<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.poll_send_local());
self.inner.poll_ready()
}
}
/// Applies an update to an endpoint's initial window size.
///
/// Per RFC 7540 §6.9.2:
///
/// > In addition to changing the flow-control window for streams that are not yet
/// > active, a SETTINGS frame can alter the initial flow-control window size for
/// > streams with active flow-control windows (that is, streams in the "open" or
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
/// > it maintains by the difference between the new value and the old value.
/// >
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
/// > flow-control window to become negative. A sender MUST track the negative
/// > flow-control window and MUST NOT send new flow-controlled frames until it
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive.
impl<T> ApplySettings for FlowControlRecv<T>
where T: ApplySettings,
T: ControlStreams
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)?;
if let Some(new_window_size) = set.initial_window_size() {
let old_window_size = self.initial_window_size;
if new_window_size == old_window_size {
return Ok(());
}
self.streams_mut().update_inital_recv_window_size(old_window_size, new_window_size);
self.initial_window_size = new_window_size;
}
Ok(())
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
proxy_control_flow_send!(FlowControlRecv);
proxy_control_ping!(FlowControlRecv);
proxy_control_streams!(FlowControlRecv);
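
The §6.9.2 rule quoted above reduces to signed arithmetic per stream: add the difference between the new and old initial window sizes and tolerate a negative result. A reduced sketch of that adjustment (illustrative; in this tree the bookkeeping lives behind `update_inital_recv_window_size`):

// Adjust one stream's window when SETTINGS_INITIAL_WINDOW_SIZE changes.
// The result may legitimately go negative; the sender must then wait for
// WINDOW_UPDATE frames before sending more flow-controlled data.
fn adjust_window(current: i64, old_initial: u32, new_initial: u32) -> i64 {
    current + (new_initial as i64 - old_initial as i64)
}

For example, a stream with 1_000 bytes of capacity when the initial window shrinks from 65_535 to 16_384 ends up at -48_151 and is blocked until window updates arrive.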

View File

@@ -1,209 +0,0 @@
use {error, ConnectionError, FrameSize};
use frame::{self, Frame};
use proto::*;
use std::collections::VecDeque;
/// Tracks remote flow control windows.
#[derive(Debug)]
pub struct FlowControlSend<T> {
inner: T,
initial_window_size: WindowSize,
/// Tracks the connection-level flow control window for sending data to the remote.
connection: FlowControlState,
/// Holds the list of streams on which local window updates may be sent.
// XXX It would be cool if this didn't exist.
pending_streams: VecDeque<StreamId>,
/// When `poll_window_update` is not ready, then the calling task is saved to
/// be notified later. Access to poll_window_update must not be shared across tasks,
/// as we only track a single task (and *not* i.e. a task per stream id).
blocked: Option<task::Task>,
}
impl<T, U> FlowControlSend<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams
{
pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlSend<T> {
FlowControlSend {
inner,
initial_window_size,
connection: FlowControlState::with_initial_size(initial_window_size),
pending_streams: VecDeque::new(),
blocked: None,
}
}
}
/// Exposes a public upward API for flow control.
impl<T: ControlStreams> ControlFlowSend for FlowControlSend<T> {
fn poll_window_update(&mut self) -> Poll<WindowUpdate, ConnectionError> {
// This biases connection window updates, which probably makes sense.
if let Some(incr) = self.connection.apply_window_update() {
return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr)));
}
// TODO this should probably account for stream priority?
while let Some(id) = self.pending_streams.pop_front() {
if let Some(mut flow) = self.streams_mut().send_flow_controller(id) {
if let Some(incr) = flow.apply_window_update() {
return Ok(Async::Ready(WindowUpdate::new(id, incr)));
}
}
}
self.blocked = Some(task::current());
return Ok(Async::NotReady);
}
}
/// Applies remote window updates as they are received.
impl<T> Stream for FlowControlSend<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
trace!("poll");
loop {
match try_ready!(self.inner.poll()) {
Some(Frame::WindowUpdate(v)) => {
let id = v.stream_id();
let sz = v.size_increment();
if id.is_zero() {
self.connection.expand_window(sz);
} else {
// The remote may send window updates for streams that the local
// now considers closed. It's okay.
if let Some(fc) = self.streams_mut().send_flow_controller(id) {
fc.expand_window(sz);
}
}
}
f => return Ok(Async::Ready(f)),
}
}
}
}
/// Tracks the flow control windows for sent data frames.
///
/// If sending a frame would violate the remote's window, start_send fails with
/// `FlowControlViolation`.
impl<T, U> Sink for FlowControlSend<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
U: Buf,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: Frame<U>) -> StartSend<T::SinkItem, T::SinkError> {
debug_assert!(self.streams().get_reset(frame.stream_id()).is_none());
// Ensures that the underlying transport will accept the frame. It's important
// that this be checked before claiming capacity from the flow controllers.
if self.poll_ready()?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
// Ensure that an outbound data frame does not violate the remote's flow control
// window.
if let &Frame::Data(ref v) = &frame {
let sz = v.payload().remaining() as FrameSize;
// Ensure there's enough capacity on the connection before acting on the
// stream.
if !self.connection.check_window(sz) {
return Err(error::User::FlowControlViolation.into());
}
// Ensure there's enough capacity on stream.
let mut fc = self.inner.streams_mut().send_flow_controller(v.stream_id())
.expect("no remote stream for data frame");
if fc.claim_window(sz).is_err() {
return Err(error::User::FlowControlViolation.into())
}
self.connection.claim_window(sz)
.expect("remote connection flow control error");
}
let res = self.inner.start_send(frame)?;
assert!(res.is_ready());
Ok(res)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
/// Proxy.
impl<T, U> ReadySink for FlowControlSend<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
U: Buf,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_ready()
}
}
/// Applies an update to the remote endpoint's initial window size.
///
/// Per RFC 7540 §6.9.2:
///
/// > In addition to changing the flow-control window for streams that are not yet
/// > active, a SETTINGS frame can alter the initial flow-control window size for
/// > streams with active flow-control windows (that is, streams in the "open" or
/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE
/// > changes, a receiver MUST adjust the size of all stream flow-control windows that
/// > it maintains by the difference between the new value and the old value.
/// >
/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a
/// > flow-control window to become negative. A sender MUST track the negative
/// > flow-control window and MUST NOT send new flow-controlled frames until it
/// > receives WINDOW_UPDATE frames that cause the flow-control window to become
/// > positive.
impl<T> ApplySettings for FlowControlSend<T>
where T: ApplySettings,
T: ControlStreams
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)?;
if let Some(new_window_size) = set.initial_window_size() {
let old_window_size = self.initial_window_size;
if new_window_size == old_window_size {
return Ok(());
}
self.streams_mut().update_inital_send_window_size(old_window_size, new_window_size);
self.initial_window_size = new_window_size;
}
Ok(())
}
}
proxy_control_flow_recv!(FlowControlSend);
proxy_control_ping!(FlowControlSend);
proxy_control_streams!(FlowControlSend);
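
Note the ordering in `start_send` above: the connection window is checked before the stream window is claimed, so a failed stream claim never leaves the connection window partially consumed. A reduced, std-only sketch of that double-claim invariant (illustrative types, not this crate's API):

struct Window {
    available: u32,
}

impl Window {
    fn check(&self, sz: u32) -> bool {
        sz <= self.available
    }

    fn claim(&mut self, sz: u32) -> Result<(), ()> {
        if !self.check(sz) {
            return Err(());
        }
        self.available -= sz;
        Ok(())
    }
}

// A DATA frame must fit within *both* the connection and stream windows.
fn claim_for_data(conn: &mut Window, stream: &mut Window, sz: u32) -> Result<(), ()> {
    if !conn.check(sz) {
        return Err(()); // connection checked first; nothing mutated yet
    }
    stream.claim(sz)?;                      // claim the stream window
    conn.claim(sz).expect("checked above"); // then the connection window
    Ok(())
}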

View File

@@ -1,181 +0,0 @@
use proto::WindowSize;
#[derive(Clone, Copy, Debug)]
pub struct WindowUnderflow;
pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535;
#[derive(Copy, Clone, Debug)]
pub struct FlowControlState {
/// Amount that may be claimed.
window_size: WindowSize,
/// Amount to be removed by future increments.
underflow: WindowSize,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
next_window_update: WindowSize,
}
impl Default for FlowControlState {
fn default() -> Self {
Self::with_initial_size(DEFAULT_INITIAL_WINDOW_SIZE)
}
}
impl FlowControlState {
pub fn with_initial_size(window_size: WindowSize) -> FlowControlState {
FlowControlState {
window_size,
underflow: 0,
next_window_update: 0,
}
}
// pub fn with_next_update(next_window_update: WindowSize) -> FlowControlState {
// FlowControlState {
// window_size: 0,
// underflow: 0,
// next_window_update,
// }
// }
/// Reduce future capacity of the window.
///
/// This accommodates updates to SETTINGS_INITIAL_WINDOW_SIZE.
pub fn shrink_window(&mut self, decr: WindowSize) {
if decr < self.next_window_update {
self.next_window_update -= decr
} else {
self.underflow += decr - self.next_window_update;
self.next_window_update = 0;
}
}
/// Returns true iff `claim_window(sz)` would succeed.
pub fn check_window(&mut self, sz: WindowSize) -> bool {
sz <= self.window_size
}
/// Claims the provided amount from the window, if there is enough space.
///
/// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than
/// have been previously claimed.
pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> {
if !self.check_window(sz) {
return Err(WindowUnderflow);
}
self.window_size -= sz;
Ok(())
}
/// Increase the _unadvertised_ window capacity.
pub fn expand_window(&mut self, sz: WindowSize) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
let added = sz - self.underflow;
self.next_window_update += added;
self.underflow = 0;
}
/// Obtains and applies an unadvertised window update.
pub fn apply_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 {
return None;
}
let incr = self.next_window_update;
self.next_window_update = 0;
self.window_size += incr;
Some(incr)
}
}
#[test]
fn test_with_initial_size() {
let mut fc = FlowControlState::with_initial_size(10);
fc.expand_window(8);
assert_eq!(fc.window_size, 10);
assert_eq!(fc.next_window_update, 8);
assert_eq!(fc.apply_window_update(), Some(8));
assert_eq!(fc.window_size, 18);
assert_eq!(fc.next_window_update, 0);
assert!(fc.claim_window(13).is_ok());
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 0);
assert!(fc.apply_window_update().is_none());
}
// #[test]
// fn test_with_next_update() {
// let mut fc = FlowControlState::with_next_update(10);
//
// fc.expand_window(8);
// assert_eq!(fc.window_size, 0);
// assert_eq!(fc.next_window_update, 18);
//
// assert_eq!(fc.apply_window_update(), Some(18));
// assert_eq!(fc.window_size, 18);
// assert_eq!(fc.next_window_update, 0);
// }
#[test]
fn test_grow_accumulates() {
let mut fc = FlowControlState::with_initial_size(5);
// Updates accumulate, though the window is not made immediately available. Trying to
// claim data not returned by apply_window_update results in an underflow.
fc.expand_window(2);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 2);
fc.expand_window(6);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 8);
assert!(fc.claim_window(13).is_err());
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 8);
assert_eq!(fc.apply_window_update(), Some(8));
assert_eq!(fc.window_size, 13);
assert_eq!(fc.next_window_update, 0);
assert!(fc.claim_window(13).is_ok());
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 0);
}
#[test]
fn test_shrink() {
let mut fc = FlowControlState::with_initial_size(5);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 0);
fc.expand_window(3);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 3);
assert_eq!(fc.underflow, 0);
fc.shrink_window(8);
assert_eq!(fc.window_size, 5);
assert_eq!(fc.next_window_update, 0);
assert_eq!(fc.underflow, 5);
assert!(fc.claim_window(5).is_ok());
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 0);
assert_eq!(fc.underflow, 5);
fc.expand_window(8);
assert_eq!(fc.window_size, 0);
assert_eq!(fc.next_window_update, 3);
assert_eq!(fc.underflow, 0);
}

View File

@@ -1,7 +1,7 @@
use {hpack, ConnectionError};
use frame::{self, Frame, Kind};
use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE;
use proto::{ApplySettings, ReadySink};
use proto::*;
use futures::*;
@@ -105,6 +105,7 @@ impl<T> FramedRead<T> {
}
}
/*
impl<T: ApplySettings> ApplySettings for FramedRead<T> {
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.get_mut().apply_local_settings(set)
@@ -114,6 +115,7 @@ impl<T: ApplySettings> ApplySettings for FramedRead<T> {
self.inner.get_mut().apply_remote_settings(set)
}
}
*/
impl<T> Stream for FramedRead<T>
where T: AsyncRead,
@@ -151,8 +153,8 @@ impl<T: Sink> Sink for FramedRead<T> {
}
}
impl<T: ReadySink> ReadySink for FramedRead<T> {
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
impl<T: AsyncWrite, B: Buf> FramedRead<FramedWrite<T, B>> {
pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
self.inner.get_mut().poll_ready()
}
}

View File

@@ -1,6 +1,5 @@
use {hpack, ConnectionError, FrameSize};
use frame::{self, Frame};
use proto::{ApplySettings, ReadySink};
use futures::*;
use tokio_io::{AsyncRead, AsyncWrite};
@@ -65,6 +64,19 @@ impl<T, B> FramedWrite<T, B>
}
}
pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if !self.has_capacity() {
// Try flushing
try!(self.poll_complete());
if !self.has_capacity() {
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
fn has_capacity(&self) -> bool {
self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY
}
@@ -78,16 +90,6 @@ impl<T, B> FramedWrite<T, B>
}
}
impl<T, B> ApplySettings for FramedWrite<T, B> {
fn apply_local_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> {
Ok(())
}
fn apply_remote_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> {
Ok(())
}
}
impl<T, B> Sink for FramedWrite<T, B>
where T: AsyncWrite,
B: Buf,
@@ -102,6 +104,8 @@ impl<T, B> Sink for FramedWrite<T, B>
return Ok(AsyncSink::NotReady(item));
}
trace!("send; frame={:?}", item);
match item {
Frame::Data(mut v) => {
if v.payload().remaining() >= CHAIN_THRESHOLD {
@@ -186,24 +190,6 @@ impl<T, B> Sink for FramedWrite<T, B>
}
}
impl<T, B> ReadySink for FramedWrite<T, B>
where T: AsyncWrite,
B: Buf,
{
fn poll_ready(&mut self) -> Poll<(), Self::SinkError> {
if !self.has_capacity() {
// Try flushing
try!(self.poll_complete());
if !self.has_capacity() {
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
}
impl<T: Stream, B> Stream for FramedWrite<T, B> {
type Item = T::Item;
type Error = T::Error;

View File

@@ -1,131 +0,0 @@
use {ConnectionError, Peer};
use frame::{self, Frame};
use proto::{self, Connection};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
use futures::{Future, Sink, Stream, Poll, Async, AsyncSink};
use std::marker::PhantomData;
/// Implements the settings component of the initial H2 handshake
pub struct Handshake<T, P> {
// Upstream transport
inner: Option<Inner<T>>,
// True when the local settings have been sent
settings_sent: bool,
// Peer
peer: PhantomData<P>,
}
struct Inner<T> {
// Upstream transport
framed: proto::Framed<T>,
// Our settings
local: frame::SettingSet,
}
impl<T, P> Handshake<T, P>
where T: AsyncRead + AsyncWrite,
{
/// Initiate an HTTP/2.0 handshake.
pub fn new(io: T, local: frame::SettingSet) -> Self {
// Delimit the frames
let framed_read = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(io);
// Map to `Frame` types
let framed_read = proto::FramedRead::new(framed_read);
// Frame encoder
let mut framed = proto::FramedWrite::new(framed_read);
Handshake {
inner: Some(Inner {
framed: framed,
local: local,
}),
settings_sent: false,
peer: PhantomData,
}
}
/// Returns a reference to the local settings.
///
/// # Panics
///
/// Panics if `HandshakeInner` has already been consumed.
fn local(&self) -> &frame::SettingSet {
&self.inner.as_ref().unwrap().local
}
/// Returns a mutable reference to `HandshakeInner`.
///
/// # Panics
///
/// Panics if `HandshakeInner` has already been consumed.
fn inner_mut(&mut self) -> &mut proto::Framed<T> {
&mut self.inner.as_mut().unwrap().framed
}
}
// Either a client or server. Satisfied when we have sent a SETTINGS frame and
// have sent an ACK for the remote's settings.
impl<T, P> Future for Handshake<T, P>
where T: AsyncRead + AsyncWrite,
P: Peer,
{
type Item = Connection<T, P>;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if !self.settings_sent {
let frame = frame::Settings::new(self.local().clone()).into();
if let AsyncSink::NotReady(_) = try!(self.inner_mut().start_send(frame)) {
// This shouldn't really happen, but if it does, try again
// later.
return Ok(Async::NotReady);
}
// Try flushing...
try!(self.inner_mut().poll_complete());
self.settings_sent = true;
}
match try_ready!(self.inner_mut().poll()) {
Some(Frame::Settings(v)) => {
if v.is_ack() {
// TODO: unexpected ACK, protocol error
unimplemented!();
} else {
let remote = v.into_set();
let inner = self.inner.take().unwrap();
// Add ping/pong handler
let ping_pong = proto::PingPong::new(inner.framed);
// Add settings handler
let settings = proto::Settings::new(
ping_pong, inner.local, remote);
// Finally, convert to the `Connection`
let connection = settings.into();
return Ok(Async::Ready(connection));
}
}
// TODO: handle handshake failure
_ => unimplemented!(),
}
}
}

View File

@@ -1,212 +1,27 @@
use {frame, Peer, StreamId};
use error::Reason;
use frame::Frame;
use bytes::{Buf, IntoBuf};
use futures::*;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
macro_rules! proxy_stream {
($struct:ident $(, $targs:ident)*) => (
impl<T: Stream$(, $targs)*> Stream for $struct<T$(, $targs)*> {
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
self.inner.poll()
}
}
)
}
macro_rules! proxy_sink {
($struct:ident $(, $targs:ident)*) => (
impl<T, U$(, $targs)*> Sink for $struct<T$(, $targs)*>
where T: Sink<SinkItem = frame::Frame<U>, SinkError = ConnectionError>
{
type SinkItem = frame::Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, it: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(it)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
)
}
macro_rules! proxy_ready_sink {
($struct:ident $(, $targs:ident)*$(; $constraint:ident)*) => (
impl<T, U$(, $targs)*> ReadySink for $struct<T$(, $targs)*>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink $(+ $constraint)*
{
fn poll_ready(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_ready()
}
}
)
}
// First, pull in the internal interfaces that support macros used throughout this module.
#[macro_use]
mod apply_settings;
#[macro_use]
mod control_flow;
#[macro_use]
mod control_ping;
mod control_settings;
#[macro_use]
mod control_streams;
use self::apply_settings::ApplySettings;
use self::control_flow::{ControlFlowRecv, ControlFlowSend};
use self::control_ping::ControlPing;
use self::control_settings::ControlSettings;
use self::control_streams::ControlStreams;
mod connection;
mod flow_control_recv;
mod flow_control_send;
mod flow_control_state;
mod framed_read;
mod framed_write;
mod ping_pong;
mod ready;
mod settings;
mod stream_recv_close;
mod stream_recv_open;
mod stream_send_close;
mod stream_send_open;
mod stream_state;
mod stream_states;
mod state;
mod streams;
pub use self::connection::Connection;
use self::flow_control_recv::FlowControlRecv;
use self::flow_control_send::FlowControlSend;
use self::flow_control_state::FlowControlState;
use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;
use self::ping_pong::PingPong;
use self::ready::ReadySink;
use self::settings::Settings;
use self::stream_recv_close::StreamRecvClose;
use self::stream_recv_open::StreamRecvOpen;
use self::stream_send_close::StreamSendClose;
use self::stream_send_open::StreamSendOpen;
use self::stream_states::{StreamStates, Streams};
use self::streams::Streams;
/// Represents the internals of an HTTP/2 connection.
///
/// A transport consists of several layers (_transporters_) and is arranged from _top_
/// (near the application) to _bottom_ (near the network). Each transporter implements a
/// Stream of frames received from the remote, and a ReadySink of frames sent to the
/// remote.
///
/// ## Transport Layers
///
/// ### `Settings`
///
/// - Receives remote settings frames and applies the settings downward through the
/// transport (via the ApplySettings trait) before responding with acknowledgements.
/// - Exposes ControlSettings up towards the application and transmits local settings to
/// the remote.
///
/// ### The stream transport
///
/// The states of all HTTP/2 streams are stored centrally in the `StreamStates` at the
/// bottom of the stream transport. Several modules above this access this state via the
/// `ControlStreams` API to drive changes to the stream state. In each direction (send
/// from local to remote, and recv from remote to local), there is a Stream\*Open module
/// responsible for initializing new streams and ensuring that frames do not violate
/// stream state. Then, there are modules that operate on streams (for instance,
/// FlowControl). Finally, a Stream\*Close module is responsible for acting on END_STREAM
/// frames to ensure that stream states are not closed before work is complete.
///
/// #### `StreamSendOpen`
///
/// - Initializes streams initiated by the local peer.
/// - Ensures that frames sent from the local peer are appropriate for the stream's state.
/// - Ensures that the remote's max stream concurrency is not violated.
///
/// #### `FlowControlSend`
///
/// - Tracks sent data frames against the remote stream and connection flow control
/// windows.
/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE.
/// - Exposes `ControlFlowSend` upwards.
/// - Tracks received window updates against the remote stream and connection flow
/// control windows so that upper layers may poll for updates.
///
/// #### `StreamSendClose`
///
/// - Updates the stream state for frames sent with END_STREAM.
///
/// #### `StreamRecvClose`
///
/// - Updates the stream state for frames received with END_STREAM.
///
/// #### `FlowControlRecv`
///
/// - Tracks received data frames against the local stream and connection flow control
/// windows.
/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE.
/// - Exposes `ControlFlowRecv` upwards.
/// - Sends window updates for the local stream and connection flow control windows as
/// instructed by upper layers.
///
/// #### `StreamRecvOpen`
///
/// - Initializes streams initiated by the remote peer.
/// - Ensures that frames received from the remote peer are appropriate for the stream's
/// state.
/// - Ensures that the local peer's max stream concurrency is not violated.
/// - Emits StreamRefused resets to the remote.
///
/// #### `StreamStates`
///
/// - Holds the state of all local & remote active streams.
/// - Holds the cause of all reset/closed streams.
/// - Exposes `ControlStreams` so that upper layers may share stream state.
///
/// ### `PingPong`
///
/// - Acknowledges PINGs from the remote.
/// - Exposes ControlPing that allows the application side to send ping requests to the
/// remote. Acknowledgements from the remote are queued to be consumed by the
/// application.
///
/// ### FramedRead
///
/// - Decodes frames from bytes.
///
/// ### FramedWrite
///
/// - Encodes frames to bytes.
///
type Transport<T, B> =
Settings<
Streams2<
PingPong<
Codec<T, B>,
B>>>;
use {StreamId, Peer};
use error::Reason;
use frame::Frame;
// TODO: rename
type Streams2<T> =
StreamSendOpen<
FlowControlSend<
StreamSendClose<
StreamRecvClose<
FlowControlRecv<
StreamRecvOpen<
StreamStates<T>>>>>>>;
type Codec<T, B> =
FramedRead<
FramedWrite<T, B>>;
use futures::*;
use bytes::{Buf, IntoBuf};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::length_delimited;
pub type PingPayload = [u8; 8];
@@ -215,12 +30,55 @@ pub type WindowSize = u32;
#[derive(Debug, Copy, Clone)]
pub struct WindowUpdate {
stream_id: StreamId,
increment: WindowSize
increment: WindowSize,
}
type Codec<T, B> =
FramedRead<
FramedWrite<T, B>>;
// Constants
pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535;
pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX;
/// Create a transport prepared to handle the server handshake.
///
/// When the server is performing the handshake, it is able to only send
/// `Settings` frames and is expected to receive the client preface as a byte
/// stream. To represent this, a bare `FramedWrite<T, B>` is returned.
pub fn framed_write<T, B>(io: T) -> FramedWrite<T, B>
where T: AsyncRead + AsyncWrite,
B: Buf,
{
FramedWrite::new(io)
}
/// Create a full H2 transport from the server handshaker
pub fn from_framed_write<T, P, B>(framed_write: FramedWrite<T, B::Buf>)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
// Delimit the frames.
let framed = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(framed_write);
let codec = FramedRead::new(framed);
connection::new(codec)
}
impl WindowUpdate {
pub fn new(stream_id: StreamId, increment: WindowSize) -> WindowUpdate {
WindowUpdate { stream_id, increment }
WindowUpdate {
stream_id,
increment
}
}
pub fn stream_id(&self) -> StreamId {
@@ -231,83 +89,3 @@ impl WindowUpdate {
self.increment
}
}
/// Create a full H2 transport from an I/O handle.
///
/// This is called as the final step of the client handshake future.
pub fn from_io<T, P, B>(io: T, local_settings: frame::SettingSet)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
let framed_write: FramedWrite<_, B::Buf> = FramedWrite::new(io);
// To avoid code duplication, we're going to go this route. It is a bit
// weird, but oh well...
//
// We first create a Settings directly around a framed writer
let transport = Settings::new(framed_write, local_settings.clone());
from_server_handshaker(transport, local_settings)
}
/// Create a transport prepared to handle the server handshake.
///
/// When the server is performing the handshake, it is able to only send
/// `Settings` frames and is expected to receive the client preface as a byte
/// stream. To represent this, `Settings<FramedWrite<T>>` is returned.
pub fn server_handshaker<T, B>(io: T, settings: frame::SettingSet)
-> Settings<FramedWrite<T, B>>
where T: AsyncRead + AsyncWrite,
B: Buf,
{
let framed_write = FramedWrite::new(io);
Settings::new(framed_write, settings)
}
/// Create a full H2 transport from the server handshaker
pub fn from_server_handshaker<T, P, B>(settings: Settings<FramedWrite<T, B::Buf>>,
local_settings: frame::SettingSet)
-> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
let initial_recv_window_size = local_settings.initial_window_size().unwrap_or(65_535);
let local_max_concurrency = local_settings.max_concurrent_streams();
let initial_send_window_size = settings.remote_initial_window_size();
let remote_max_concurrency = settings.remote_max_concurrent_streams();
// Replace Settings' writer with a full transport.
let transport = settings.swap_inner(|io| {
// Delimit the frames.
let framed = length_delimited::Builder::new()
.big_endian()
.length_field_length(3)
.length_adjustment(9)
.num_skip(0) // Don't skip the header
.new_read(io);
trace!("composing transport");
StreamSendOpen::new(
initial_send_window_size,
remote_max_concurrency,
FlowControlSend::new(
initial_send_window_size,
StreamSendClose::new(
StreamRecvClose::new(
FlowControlRecv::new(
initial_recv_window_size,
StreamRecvOpen::new(
initial_recv_window_size,
local_max_concurrency,
StreamStates::new::<P>(
PingPong::new(
FramedRead::new(framed)))))))))
});
connection::new(transport)
}

View File

@@ -1,362 +1,60 @@
use ConnectionError;
use frame::{Frame, Ping, SettingSet};
use frame::Ping;
use proto::*;
/// Acknowledges ping requests from the remote.
#[derive(Debug)]
pub struct PingPong<T, U> {
inner: T,
sending_pong: Option<Frame<U>>,
pub struct PingPong<B> {
// TODO: this doesn't need to save the entire frame
sending_pong: Option<Frame<B>>,
received_pong: Option<PingPayload>,
// TODO: factor this out
blocked_ping: Option<task::Task>,
expecting_pong: bool,
}
impl<T, U> PingPong<T, U>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
impl<B> PingPong<B>
where B: Buf,
{
pub fn new(inner: T) -> Self {
pub fn new() -> Self {
PingPong {
inner,
sending_pong: None,
received_pong: None,
expecting_pong: false,
blocked_ping: None,
}
}
}
impl<T, U> ControlPing for PingPong<T, U>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn start_ping(&mut self, body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
if self.inner.poll_ready()?.is_not_ready() {
return Ok(AsyncSink::NotReady(body));
}
/// Process a ping
pub fn recv_ping(&mut self, ping: Ping) {
// The caller should always check that `send_pending_pong` returns ready
// before calling `recv_ping`.
assert!(self.sending_pong.is_none());
// Only allow one in-flight ping.
if self.expecting_pong || self.received_pong.is_some() {
self.blocked_ping = Some(task::current());
return Ok(AsyncSink::NotReady(body))
}
if ping.is_ack() {
// Save acknowledgements to be returned from take_pong().
self.received_pong = Some(ping.into_payload());
match self.inner.start_send(Ping::ping(body).into())? {
AsyncSink::NotReady(_) => {
// By virtue of calling inner.poll_ready(), this must not happen.
unreachable!()
}
AsyncSink::Ready => {
self.expecting_pong = true;
Ok(AsyncSink::Ready)
if let Some(task) = self.blocked_ping.take() {
task.notify();
}
} else {
// Save the ping's payload to be sent as an acknowledgement.
let pong = Ping::pong(ping.into_payload());
self.sending_pong = Some(pong.into());
}
}
fn take_pong(&mut self) -> Option<PingPayload> {
match self.received_pong.take() {
None => None,
Some(p) => {
self.expecting_pong = false;
if let Some(task) = self.blocked_ping.take() {
task.notify();
}
Some(p)
}
}
}
}
impl<T, U> PingPong<T, U>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
fn try_send_pong(&mut self) -> Poll<(), ConnectionError> {
/// Send any pending pongs.
pub fn send_pending_pong<T>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), ConnectionError>
where T: AsyncWrite,
{
if let Some(pong) = self.sending_pong.take() {
if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? {
if let AsyncSink::NotReady(pong) = dst.start_send(pong)? {
// If the pong can't be sent, save it.
self.sending_pong = Some(pong);
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
}
}
/// > Receivers of a PING frame that does not include an ACK flag MUST send
/// > a PING frame with the ACK flag set in response, with an identical
/// > payload. PING responses SHOULD be given higher priority than any
/// > other frame.
impl<T, U> Stream for PingPong<T, U>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type Item = Frame;
type Error = ConnectionError;
/// Reads the next frame from the underlying socket, eliding ping requests.
///
/// If a PING is received without the ACK flag, the frame is sent to the remote with
/// its ACK flag set.
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
loop {
// Don't read any frames until `inner` accepts any pending pong.
try_ready!(self.try_send_pong());
match self.inner.poll()? {
Async::Ready(Some(Frame::Ping(ping))) => {
if ping.is_ack() {
// Save acknowledgements to be returned from take_pong().
self.received_pong = Some(ping.into_payload());
if let Some(task) = self.blocked_ping.take() {
task.notify();
}
} else {
// Save the ping's payload to be sent as an acknowledgement.
let pong = Ping::pong(ping.into_payload());
self.sending_pong = Some(pong.into());
}
}
// Everything other than ping gets passed through.
f => return Ok(f),
}
}
}
}
impl<T, U> Sink for PingPong<T, U>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, Self::SinkError>
{
// Pings _SHOULD_ have priority over other messages, so attempt to send pending
// ping frames before attempting to send `item`.
if self.try_send_pong()?.is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
self.inner.start_send(item)
}
/// Polls the underlying sink and tries to flush pending pong frames.
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.try_send_pong());
self.inner.poll_complete()
}
}
impl<T, U> ReadySink for PingPong<T, U>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.try_send_pong());
self.inner.poll_ready()
}
}
impl<T: ApplySettings, U> ApplySettings for PingPong<T, U> {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
#[cfg(test)]
mod test {
use super::*;
use proto::ControlPing;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;
#[test]
fn responds_to_ping_with_pong() {
let trans = Transport::default();
let mut ping_pong = PingPong::new(trans.clone());
{
let mut trans = trans.0.borrow_mut();
let ping = Ping::ping(*b"buoyant_");
trans.from_socket.push_back(ping.into());
}
match ping_pong.poll() {
Ok(Async::NotReady) => {} // cool
rsp => panic!("unexpected poll result: {:?}", rsp),
}
{
let mut trans = trans.0.borrow_mut();
assert_eq!(trans.to_socket.len(), 1);
match trans.to_socket.pop_front().unwrap() {
Frame::Ping(pong) => {
assert!(pong.is_ack());
assert_eq!(&pong.into_payload(), b"buoyant_");
}
f => panic!("unexpected frame: {:?}", f),
}
}
}
#[test]
fn responds_to_ping_even_when_blocked() {
let trans = Transport::default();
let mut ping_pong = PingPong::new(trans.clone());
// Configure the transport so that writes can't proceed.
{
let mut trans = trans.0.borrow_mut();
trans.start_send_blocked = true;
}
// The transport receives a ping but can't send it immediately.
{
let mut trans = trans.0.borrow_mut();
let ping = Ping::ping(*b"buoyant?");
trans.from_socket.push_back(ping.into());
}
assert!(ping_pong.poll().unwrap().is_not_ready());
// The transport receives another ping but can't send it immediately.
{
let mut trans = trans.0.borrow_mut();
let ping = Ping::ping(*b"buoyant!");
trans.from_socket.push_back(ping.into());
}
assert!(ping_pong.poll().unwrap().is_not_ready());
// At this point, ping_pong is holding two pongs that it cannot send.
{
let mut trans = trans.0.borrow_mut();
assert!(trans.to_socket.is_empty());
trans.start_send_blocked = false;
}
// Now that start_send_blocked is disabled, the next poll will successfully send
// the pongs on the transport.
assert!(ping_pong.poll().unwrap().is_not_ready());
{
let mut trans = trans.0.borrow_mut();
assert_eq!(trans.to_socket.len(), 2);
match trans.to_socket.pop_front().unwrap() {
Frame::Ping(pong) => {
assert!(pong.is_ack());
assert_eq!(&pong.into_payload(), b"buoyant?");
}
f => panic!("unexpected frame: {:?}", f),
}
match trans.to_socket.pop_front().unwrap() {
Frame::Ping(pong) => {
assert!(pong.is_ack());
assert_eq!(&pong.into_payload(), b"buoyant!");
}
f => panic!("unexpected frame: {:?}", f),
}
}
}
#[test]
fn pong_passes_through() {
let trans = Transport::default();
let mut ping_pong = PingPong::new(trans.clone());
{
let mut trans = trans.0.borrow_mut();
let pong = Ping::pong(*b"buoyant!");
trans.from_socket.push_back(pong.into());
}
assert!(ping_pong.poll().unwrap().is_not_ready());
match ping_pong.take_pong() {
Some(pong) => assert_eq!(&pong, b"buoyant!"),
None => panic!("no pong received"),
}
{
let trans = trans.0.borrow();
assert_eq!(trans.to_socket.len(), 0);
}
}
/// A stubbed transport for tests.
///
/// We probably want/have something generic for this?
#[derive(Clone, Default)]
struct Transport(Rc<RefCell<Inner>>);
#[derive(Default)]
struct Inner {
from_socket: VecDeque<Frame>,
to_socket: VecDeque<Frame>,
read_blocked: bool,
start_send_blocked: bool,
closing: bool,
}
impl Stream for Transport {
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
let mut trans = self.0.borrow_mut();
if trans.read_blocked || (!trans.closing && trans.from_socket.is_empty()) {
Ok(Async::NotReady)
} else {
Ok(trans.from_socket.pop_front().into())
}
}
}
impl Sink for Transport {
type SinkItem = Frame;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Frame) -> StartSend<Frame, ConnectionError> {
let mut trans = self.0.borrow_mut();
if trans.closing || trans.start_send_blocked {
Ok(AsyncSink::NotReady(item))
} else {
trans.to_socket.push_back(item);
Ok(AsyncSink::Ready)
}
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
let trans = self.0.borrow();
if !trans.to_socket.is_empty() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}
fn close(&mut self) -> Poll<(), ConnectionError> {
{
let mut trans = self.0.borrow_mut();
trans.closing = true;
}
self.poll_complete()
}
}
impl ReadySink for Transport {
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
let trans = self.0.borrow();
if trans.closing || trans.start_send_blocked {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
}
}
}
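For orientation, a minimal sketch (not part of the commit) of how a caller might drive the ping machinery, reusing the stubbed `Transport` from the tests above; `start_ping` and `take_pong` are the `ControlPing` methods as reconstructed earlier in this file.
fn ping_example(trans: Transport) -> Result<(), ConnectionError> {
    let mut ping_pong = PingPong::new(trans);
    // Issue a ping; a conforming peer echoes the payload back with ACK set.
    match ping_pong.start_ping(*b"12345678")? {
        AsyncSink::Ready => {}
        AsyncSink::NotReady(_payload) => {
            // The transport was busy; a real caller would retry later.
        }
    }
    // After polling the transport (which elides ping/pong frames), any
    // received acknowledgement can be claimed exactly once.
    if let Some(pong) = ping_pong.take_pong() {
        assert_eq!(&pong, b"12345678");
    }
    Ok(())
}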

View File

@@ -1,5 +0,0 @@
use futures::{Sink, Poll};
pub trait ReadySink: Sink {
fn poll_ready(&mut self) -> Poll<(), Self::SinkError>;
}
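The intended call pattern, sketched below (not part of the commit, assuming the usual futures 0.1 imports): check `poll_ready` before `start_send`, after which the sink is expected to accept the item.
fn send_when_ready<S: ReadySink>(sink: &mut S, item: S::SinkItem)
    -> Poll<(), S::SinkError>
{
    // Only attempt the send once the sink reports readiness.
    try_ready!(sink.poll_ready());
    match sink.start_send(item)? {
        AsyncSink::Ready => Ok(Async::Ready(())),
        // By contract, this should not happen after poll_ready succeeds.
        AsyncSink::NotReady(_) => panic!("sink not ready after poll_ready"),
    }
}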

View File

@@ -1,218 +1,47 @@
use {StreamId, ConnectionError};
use frame::{self, Frame, SettingSet};
use {frame, ConnectionError};
use proto::*;
use tokio_io::AsyncRead;
use bytes::BufMut;
use std::io;
#[derive(Debug)]
pub struct Settings<T> {
// Upstream transport
inner: T,
remote_push_enabled: Option<bool>,
remote_max_concurrent_streams: Option<u32>,
remote_initial_window_size: WindowSize,
// Number of acks remaining to send to the peer
remaining_acks: usize,
// Holds a new set of local values to be applied.
pending_local: Option<SettingSet>,
// True when we have received a settings frame from the remote.
    received_remote: bool,
}

/// The restructured settings state: only tracks whether an ack is owed.
pub struct Settings {
    pending_ack: bool,
}
impl Settings {
    pub fn new() -> Self {
        Settings {
            pending_ack: false,
        }
    }
}

impl<T, U> Settings<T>
    where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
    pub fn new(inner: T, local: SettingSet) -> Settings<T> {
        Settings {
            inner: inner,
            pending_local: Some(local),
            remote_push_enabled: None,
            remote_max_concurrent_streams: None,
            remote_initial_window_size: 65_535,
            remaining_acks: 0,
            received_remote: false,
        }
    }
/// Swap the inner transport while maintaining the current state.
pub fn swap_inner<T2, F: FnOnce(T) -> T2>(self, f: F) -> Settings<T2> {
let inner = f(self.inner);
Settings {
inner: inner,
remote_push_enabled: self.remote_push_enabled,
remote_max_concurrent_streams: self.remote_max_concurrent_streams,
remote_initial_window_size: self.remote_initial_window_size,
remaining_acks: self.remaining_acks,
pending_local: self.pending_local,
received_remote: self.received_remote,
}
}
fn try_send_pending(&mut self) -> Poll<(), ConnectionError> {
trace!("try_send_pending; dirty={} acks={}", self.pending_local.is_some(), self.remaining_acks);
if let Some(local) = self.pending_local.take() {
try_ready!(self.try_send_local(local));
}
while self.remaining_acks > 0 {
let frame = frame::Settings::ack().into();
try_ready!(self.try_send(frame));
self.remaining_acks -= 1;
}
Ok(Async::Ready(()))
}
    fn try_send_local(&mut self, local: SettingSet) -> Poll<(), ConnectionError> {
        let frame = frame::Settings::new(local.clone()).into();
        if self.try_send(frame)?.is_not_ready() {
            self.pending_local = Some(local);
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(()))
        }
    }

    // Replacement logic (part of the restructured `Settings` above): record
    // that a received settings frame requires an ack.
    pub fn recv_settings(&mut self, frame: frame::Settings) {
        if frame.is_ack() {
            debug!("received remote settings ack");
            // TODO: handle acks
        } else {
            assert!(!self.pending_ack);
            self.pending_ack = true;
        }
    }
fn try_send(&mut self, frame: frame::Settings) -> Poll<(), ConnectionError> {
trace!("try_send");
if self.inner.start_send(frame.into())?.is_ready() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
    // Replacement logic (part of the restructured `Settings` above): flush
    // the pending ack to the peer.
    pub fn send_pending_ack<T, B>(&mut self, dst: &mut Codec<T, B>)
        -> Poll<(), ConnectionError>
        where T: AsyncWrite,
              B: Buf,
    {
        if self.pending_ack {
            let frame = frame::Settings::ack();

            match dst.start_send(frame.into())? {
                AsyncSink::Ready => {
                    self.pending_ack = false;
                    return Ok(().into());
                }
                AsyncSink::NotReady(_) => {
                    return Ok(Async::NotReady);
                }
            }
        }

        Ok(().into())
    }
impl<T, U> ControlSettings for Settings<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
fn update_local_settings(&mut self, local: SettingSet) -> Result<(), ConnectionError> {
self.try_send_local(local)?;
Ok(())
}
fn remote_initial_window_size(&self) -> u32 {
self.remote_initial_window_size
}
fn remote_max_concurrent_streams(&self) -> Option<u32> {
self.remote_max_concurrent_streams
}
fn remote_push_enabled(&self) -> Option<bool> {
self.remote_push_enabled
}
}
impl<T, U> Stream for Settings<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ApplySettings,
{
type Item = Frame;
type Error = ConnectionError;
fn poll(&mut self) -> Poll<Option<Frame>, ConnectionError> {
loop {
match try_ready!(self.inner.poll()) {
Some(Frame::Settings(v)) => {
if v.is_ack() {
debug!("received remote settings ack");
// TODO: Handle acks
} else {
// Apply the settings before saving them and sending
// acknowledgements.
let settings = v.into_set();
self.inner.apply_remote_settings(&settings)?;
if let Some(sz) = settings.initial_window_size() {
self.remote_initial_window_size = sz;
}
if let Some(max) = settings.max_concurrent_streams() {
self.remote_max_concurrent_streams = Some(max);
}
if let Some(ok) = settings.enable_push() {
self.remote_push_enabled = Some(ok);
}
self.remaining_acks += 1;
let _ = try!(self.try_send_pending());
                    }
                }
                v => return Ok(Async::Ready(v)),
            }
        }
    }
}
impl<T, U> Sink for Settings<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, Self::SinkError>
{
// Settings frames take priority, so `item` cannot be sent if there are
// any pending acks OR the local settings have been changed w/o sending
// an associated frame.
if !try!(self.try_send_pending()).is_ready() {
return Ok(AsyncSink::NotReady(item));
}
self.inner.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
trace!("poll_complete");
try_ready!(self.try_send_pending());
self.inner.poll_complete()
}
fn close(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.try_send_pending());
self.inner.close()
}
}
impl<T, U> ReadySink for Settings<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
trace!("poll_ready");
try_ready!(self.try_send_pending());
self.inner.poll_ready()
}
}
impl<T: io::Read> io::Read for Settings<T> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.inner.read(dst)
}
}
impl<T: AsyncRead> AsyncRead for Settings<T> {
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error>
where Self: Sized,
{
self.inner.read_buf(buf)
}
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
}
proxy_control_flow!(Settings);
proxy_control_ping!(Settings);
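A sketch (not part of the commit) of how the restructured `Settings` type is meant to be driven from the connection task; `Codec` and its `Sink`-style `start_send` are assumed from their usage above.
fn settings_roundtrip<T, B>(settings: &mut Settings,
                            frame: frame::Settings,
                            dst: &mut Codec<T, B>)
    -> Poll<(), ConnectionError>
    where T: AsyncWrite,
          B: Buf,
{
    // A non-ack frame from the peer queues a pending ack...
    settings.recv_settings(frame);

    // ...which is flushed on the next write opportunity. NotReady leaves
    // `pending_ack` set, so the ack is retried on a later poll.
    settings.send_pending_ack(dst)
}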

338
src/proto/state.rs Normal file
View File

@@ -0,0 +1,338 @@
use ConnectionError;
use error::Reason;
use error::Reason::*;
use error::User::*;
use proto::*;
/// Represents the state of an H2 stream
///
/// ```not_rust
/// +--------+
/// send PP | | recv PP
/// ,--------| idle |--------.
/// / | | \
/// v +--------+ v
/// +----------+ | +----------+
/// | | | send H / | |
/// ,------| reserved | | recv H | reserved |------.
/// | | (local) | | | (remote) | |
/// | +----------+ v +----------+ |
/// | | +--------+ | |
/// | | recv ES | | send ES | |
/// | send H | ,-------| open |-------. | recv H |
/// | | / | | \ | |
/// | v v +--------+ v v |
/// | +----------+ | +----------+ |
/// | | half | | | half | |
/// | | closed | | send R / | closed | |
/// | | (remote) | | recv R | (local) | |
/// | +----------+ | +----------+ |
/// | | | | |
/// | | send ES / | recv ES / | |
/// | | send R / v send R / | |
/// | | recv R +--------+ recv R | |
/// | send R / `----------->| |<-----------' send R / |
/// | recv R | closed | recv R |
/// `----------------------->| |<----------------------'
/// +--------+
///
/// send: endpoint sends this frame
/// recv: endpoint receives this frame
///
/// H: HEADERS frame (with implied CONTINUATIONs)
/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
/// ES: END_STREAM flag
/// R: RST_STREAM frame
/// ```
#[derive(Debug, Copy, Clone)]
pub enum Stream {
Idle,
// TODO: these states shouldn't count against concurrency limits:
//ReservedLocal,
//ReservedRemote,
Open {
local: Peer,
remote: Peer,
},
HalfClosedLocal(Peer), // TODO: explicitly name this value
HalfClosedRemote(Peer),
// When reset, a reason is provided
Closed(Option<Reason>),
}
#[derive(Debug, Copy, Clone)]
pub enum Peer {
AwaitingHeaders,
/// Contains a FlowControl representing the _receiver_ of this data stream.
Streaming(FlowControl),
}
#[derive(Copy, Clone, Debug)]
pub struct FlowControl {
/// Amount that may be claimed.
window_size: WindowSize,
/// Amount to be removed by future increments.
underflow: WindowSize,
/// The amount that has been incremented but not yet advertised (to the application or
/// the remote).
next_window_update: WindowSize,
}
impl Stream {
/// Opens the send-half of a stream if it is not already open.
pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> {
use self::Stream::*;
use self::Peer::*;
let local = Peer::streaming(sz);
*self = match *self {
Idle => {
if eos {
HalfClosedLocal(AwaitingHeaders)
} else {
Open {
local,
remote: AwaitingHeaders,
}
}
}
Open { local: AwaitingHeaders, remote } => {
if eos {
HalfClosedLocal(remote)
} else {
Open {
local,
remote,
}
}
}
HalfClosedRemote(AwaitingHeaders) => {
if eos {
Closed(None)
} else {
HalfClosedRemote(local)
}
}
_ => {
// All other transitions result in a protocol error
return Err(UnexpectedFrameType.into());
}
};
return Ok(());
}
/// Opens the receive half of the stream; this action is taken when a HEADERS
/// frame is received.
pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> {
use self::Stream::*;
use self::Peer::*;
let remote = Peer::streaming(sz);
*self = match *self {
Idle => {
if eos {
HalfClosedRemote(AwaitingHeaders)
} else {
Open {
local: AwaitingHeaders,
remote,
}
}
}
Open { local, remote: AwaitingHeaders } => {
if eos {
HalfClosedRemote(local)
} else {
Open {
local,
remote,
}
}
}
HalfClosedLocal(AwaitingHeaders) => {
if eos {
Closed(None)
} else {
HalfClosedLocal(remote)
}
}
_ => {
// All other transitions result in a protocol error
return Err(ProtocolError.into());
}
};
return Ok(());
}
/// Indicates that the remote side will not send more data to the local.
pub fn recv_close(&mut self) -> Result<(), ConnectionError> {
use self::Stream::*;
match *self {
Open { local, .. } => {
// The remote side will continue to receive data.
trace!("recv_close: Open => HalfClosedRemote({:?})", local);
*self = HalfClosedRemote(local);
Ok(())
}
HalfClosedLocal(..) => {
trace!("recv_close: HalfClosedLocal => Closed");
*self = Closed(None);
Ok(())
}
_ => Err(ProtocolError.into()),
}
}
/// Indicates that the local side will not send more data to the remote.
pub fn send_close(&mut self) -> Result<(), ConnectionError> {
use self::Stream::*;
match *self {
Open { remote, .. } => {
// The local side will continue to receive data.
trace!("send_close: Open => HalfClosedLocal({:?})", remote);
*self = HalfClosedLocal(remote);
Ok(())
}
HalfClosedRemote(..) => {
trace!("send_close: HalfClosedRemote => Closed");
*self = Closed(None);
Ok(())
}
_ => Err(ProtocolError.into()),
}
}
pub fn is_closed(&self) -> bool {
use self::Stream::*;
match *self {
Closed(_) => true,
_ => false,
}
}
pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
use self::Stream::*;
match *self {
Open { ref mut remote, .. } |
HalfClosedLocal(ref mut remote) => remote.flow_control(),
_ => None,
}
}
pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> {
use self::Stream::*;
match *self {
Open { ref mut local, .. } |
HalfClosedRemote(ref mut local) => local.flow_control(),
_ => None,
}
}
}
impl Default for Stream {
fn default() -> Stream {
Stream::Idle
}
}
impl Default for Peer {
fn default() -> Self {
Peer::AwaitingHeaders
}
}
impl Peer {
fn streaming(sz: WindowSize) -> Peer {
Peer::Streaming(FlowControl::new(sz))
}
fn flow_control(&mut self) -> Option<&mut FlowControl> {
use self::Peer::*;
match *self {
Streaming(ref mut flow) => Some(flow),
_ => None,
}
}
}
impl FlowControl {
pub fn new(window_size: WindowSize) -> FlowControl {
FlowControl {
window_size,
underflow: 0,
next_window_update: 0,
}
}
/// Returns `Ok(())` iff `claim_window(sz)` would succeed.
pub fn ensure_window<T>(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError>
where T: Into<ConnectionError>,
{
if sz <= self.window_size {
Ok(())
} else {
Err(err.into())
}
}
/// Claims the provided amount from the window, if there is enough space.
///
/// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than
/// have been previously claimed.
pub fn claim_window<T>(&mut self, sz: WindowSize, err: T)
-> Result<(), ConnectionError>
where T: Into<ConnectionError>,
{
self.ensure_window(sz, err)?;
self.window_size -= sz;
Ok(())
}
/// Increase the _unadvertised_ window capacity.
pub fn expand_window(&mut self, sz: WindowSize) {
if sz <= self.underflow {
self.underflow -= sz;
return;
}
let added = sz - self.underflow;
self.next_window_update += added;
self.underflow = 0;
}
/// Obtains the unadvertised window update.
///
/// This does not apply the window update to `self`.
pub fn peek_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 {
None
} else {
Some(self.next_window_update)
}
}
/// Obtains and applies an unadvertised window update.
pub fn apply_window_update(&mut self) -> Option<WindowSize> {
if self.next_window_update == 0 {
return None;
}
let incr = self.next_window_update;
self.next_window_update = 0;
self.window_size += incr;
Some(incr)
}
}
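To make the state machine concrete, a minimal sketch (not part of the commit) walking a client request through the transitions above; `FlowControlError` is the `error::Reason` variant used for window violations elsewhere in this commit.
fn lifecycle_example() -> Result<(), ConnectionError> {
    let mut stream = Stream::default();      // Idle

    // Local endpoint sends HEADERS without END_STREAM: Idle -> Open.
    stream.send_open(65_535, false)?;

    // Remote answers with HEADERS + END_STREAM: the recv half opens and
    // immediately closes: Open -> HalfClosedRemote.
    stream.recv_open(65_535, true)?;

    // Local finishes sending: HalfClosedRemote -> Closed.
    stream.send_close()?;
    assert!(stream.is_closed());

    // Flow control bookkeeping: capacity is claimed as data moves, then
    // replenished and advertised via expand/apply_window_update.
    let mut flow = FlowControl::new(65_535);
    flow.claim_window(1_024, FlowControlError)?;
    flow.expand_window(1_024);
    assert_eq!(flow.apply_window_update(), Some(1_024));
    Ok(())
}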

View File

@@ -1,58 +0,0 @@
use ConnectionError;
use frame::{self, Frame};
use proto::*;
use proto::ready::ReadySink;
/// Tracks END_STREAM frames received from the remote peer.
#[derive(Debug)]
pub struct StreamRecvClose<T> {
inner: T,
}
impl<T, U> StreamRecvClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
pub fn new(inner: T) -> StreamRecvClose<T> {
StreamRecvClose { inner }
}
}
/// Tracks END_STREAM frames received from the remote peer.
impl<T> Stream for StreamRecvClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: ControlStreams,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
let frame = match try_ready!(self.inner.poll()) {
None => return Ok(Async::Ready(None)),
Some(f) => f,
};
let id = frame.stream_id();
if !id.is_zero() {
if frame.is_end_stream() {
trace!("poll: id={:?} eos", id);
if let &Frame::Reset(ref rst) = &frame {
self.streams_mut().reset_stream(id, rst.reason());
} else {
debug_assert!(self.streams().is_active(id));
self.streams_mut().close_recv_half(id)?;
}
}
}
Ok(Async::Ready(Some(frame)))
}
}
proxy_apply_settings!(StreamRecvClose);
proxy_control_flow!(StreamRecvClose);
proxy_control_streams!(StreamRecvClose);
proxy_control_ping!(StreamRecvClose);
proxy_sink!(StreamRecvClose);
proxy_ready_sink!(StreamRecvClose);

View File

@@ -1,217 +0,0 @@
use ConnectionError;
use error::Reason::{ProtocolError, RefusedStream};
use frame::{Frame, StreamId};
use proto::*;
/// Ensures that frames are received on open streams in the appropriate state.
#[derive(Debug)]
pub struct StreamRecvOpen<T> {
inner: T,
max_concurrency: Option<u32>,
initial_window_size: WindowSize,
pending_refuse: Option<StreamId>,
}
impl<T> StreamRecvOpen<T> {
pub fn new<U>(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
StreamRecvOpen {
inner,
max_concurrency,
initial_window_size,
pending_refuse: None,
}
}
}
impl<T, U> StreamRecvOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
fn send_refuse(&mut self, id: StreamId) -> Poll<(), ConnectionError> {
debug_assert!(self.pending_refuse.is_none());
let f = frame::Reset::new(id, RefusedStream);
match self.inner.start_send(f.into())? {
AsyncSink::Ready => {
self.streams_mut().reset_stream(id, RefusedStream);
Ok(Async::Ready(()))
}
AsyncSink::NotReady(_) => {
self.pending_refuse = Some(id);
Ok(Async::NotReady)
}
}
}
fn send_pending_refuse(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refuse(id));
}
Ok(Async::Ready(()))
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the local peer.
impl<T> ApplySettings for StreamRecvOpen<T>
where T: ApplySettings
{
fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
if let Some(sz) = set.initial_window_size() {
self.initial_window_size = sz;
}
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_remote_settings(set)
}
}
/// Helper.
impl<T: ControlStreams> StreamRecvOpen<T> {
fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> {
// Ensure that the stream hasn't been closed otherwise.
match self.streams().get_reset(id) {
Some(reason) => Err(reason.into()),
None => Ok(()),
}
}
}
/// Ensures that frames are received on open streams in the appropriate state.
impl<T, U> Stream for StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type Item = T::Item;
type Error = T::Error;
fn poll(&mut self) -> Poll<Option<T::Item>, T::Error> {
// Since there's only one slot for pending refused streams, it must be cleared
// before polling a frame from the transport.
try_ready!(self.send_pending_refuse());
trace!("poll");
loop {
let frame = match try_ready!(self.inner.poll()) {
None => return Ok(Async::Ready(None)),
Some(f) => f,
};
let id = frame.stream_id();
trace!("poll: id={:?}", id);
if id.is_zero() {
if !frame.is_connection_frame() {
return Err(ProtocolError.into())
}
// Nothing to do on connection frames.
return Ok(Async::Ready(Some(frame)));
}
match &frame {
&Frame::Reset(..) => {}
&Frame::Headers(..) => {
self.check_not_reset(id)?;
if self.streams().is_valid_remote_stream_id(id) {
if self.streams().is_remote_active(id) {
// Can't receive a HEADERS frame on a remote stream that's
// active, because we've already received headers. This will
// have to change to support PUSH_PROMISE.
return Err(ProtocolError.into());
}
if !self.streams().can_remote_open() {
return Err(ProtocolError.into());
}
if let Some(max) = self.max_concurrency {
if (max as usize) < self.streams().remote_active_len() {
debug!("refusing stream that would exceed max_concurrency={}", max);
self.send_refuse(id)?;
// There's no point in returning an error to the application.
continue;
}
}
self.inner.streams_mut().remote_open(id, self.initial_window_size)?;
} else {
// On locally-initiated streams, receiving a HEADERS frame opens the recv half (the response).
self.inner.streams_mut().local_open_recv_half(id, self.initial_window_size)?;
}
}
// All other stream frames are accepted only while the stream's recv half is open.
_ => {
self.check_not_reset(id)?;
if !self.streams().is_recv_open(id) {
return Err(ProtocolError.into());
}
}
}
// If the frame ends the stream, it will be handled in
// StreamRecvClose.
return Ok(Async::Ready(Some(frame)));
}
}
}
/// Sends pending resets before operating on the underlying transport.
impl<T, U> Sink for StreamRecvOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
// The local must complete refusing the remote stream before sending any other
// frames.
if self.send_pending_refuse()?.is_not_ready() {
return Ok(AsyncSink::NotReady(frame));
}
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
try_ready!(self.send_pending_refuse());
self.inner.poll_complete()
}
}
/// Sends pending resets before checking the underlying transport's readiness.
impl<T, U> ReadySink for StreamRecvOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink,
T: ControlStreams,
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
if let Some(id) = self.pending_refuse.take() {
try_ready!(self.send_refuse(id));
}
self.inner.poll_ready()
}
}
proxy_control_flow!(StreamRecvOpen);
proxy_control_streams!(StreamRecvOpen);
proxy_control_ping!(StreamRecvOpen);

View File

@@ -1,57 +0,0 @@
use ConnectionError;
use frame::{self, Frame};
use proto::*;
/// Tracks END_STREAM frames sent from the local peer.
#[derive(Debug)]
pub struct StreamSendClose<T> {
inner: T,
}
impl<T, U> StreamSendClose<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
pub fn new(inner: T) -> StreamSendClose<T> {
StreamSendClose { inner }
}
}
/// Tracks END_STREAM frames sent from the local peer.
impl<T, U> Sink for StreamSendClose<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = Frame<U>;
type SinkError = ConnectionError;
fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Frame<U>, ConnectionError> {
let id = frame.stream_id();
let eos = frame.is_end_stream();
trace!("start_send: id={:?} eos={}", id, eos);
if !id.is_zero() {
if eos {
if let &Frame::Reset(ref rst) = &frame {
self.streams_mut().reset_stream(id, rst.reason());
} else {
debug_assert!(self.streams().is_active(id));
self.streams_mut().close_send_half(id)?;
}
}
}
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), ConnectionError> {
self.inner.poll_complete()
}
}
proxy_apply_settings!(StreamSendClose);
proxy_control_flow!(StreamSendClose);
proxy_control_streams!(StreamSendClose);
proxy_control_ping!(StreamSendClose);
proxy_stream!(StreamSendClose);
proxy_ready_sink!(StreamSendClose; ControlStreams);

View File

@@ -1,137 +0,0 @@
use ConnectionError;
use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, UnexpectedFrameType};
use frame::{Frame, SettingSet};
use proto::*;
/// Ensures that frames are sent on open streams in the appropriate state.
#[derive(Debug)]
pub struct StreamSendOpen<T> {
inner: T,
max_concurrency: Option<u32>,
initial_window_size: WindowSize,
}
impl<T, U> StreamSendOpen<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
pub fn new(initial_window_size: WindowSize,
max_concurrency: Option<u32>,
inner: T)
-> StreamSendOpen<T>
{
StreamSendOpen {
inner,
max_concurrency,
initial_window_size,
}
}
}
/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the remote peer.
impl<T: ApplySettings> ApplySettings for StreamSendOpen<T> {
fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.inner.apply_local_settings(set)
}
fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> {
self.max_concurrency = set.max_concurrent_streams();
if let Some(sz) = set.initial_window_size() {
self.initial_window_size = sz;
}
self.inner.apply_remote_settings(set)
}
}
/// Helper.
impl<T: ControlStreams> StreamSendOpen<T> {
fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> {
// Ensure that the stream hasn't been closed otherwise.
match self.streams().get_reset(id) {
Some(reason) => Err(StreamReset(reason).into()),
None => Ok(()),
}
}
}
/// Ensures that frames are sent on open streams in the appropriate state.
impl<T, U> Sink for StreamSendOpen<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams,
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;
fn start_send(&mut self, frame: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
let id = frame.stream_id();
trace!("start_send: id={:?}", id);
// Forward connection frames immediately.
if id.is_zero() {
if !frame.is_connection_frame() {
return Err(InvalidStreamId.into());
}
return self.inner.start_send(frame);
}
match &frame {
&Frame::Reset(..) => {}
&Frame::Headers(..) => {
self.check_not_reset(id)?;
if self.streams().is_valid_local_stream_id(id) {
if self.streams().is_local_active(id) {
// Can't send a HEADERS frame on a local stream that's active,
// because we've already sent headers. This will have to change
// to support PUSH_PROMISE.
return Err(UnexpectedFrameType.into());
}
if !self.streams().can_local_open() {
// A server tried to start a stream with a HEADERS frame.
return Err(UnexpectedFrameType.into());
}
if let Some(max) = self.max_concurrency {
// Don't allow this stream to overflow the remote's max stream
// concurrency.
if (max as usize) < self.streams().local_active_len() {
return Err(Rejected.into());
}
}
self.inner.streams_mut().local_open(id, self.initial_window_size)?;
} else {
// On remotely-initiated streams, sending a HEADERS frame opens the send half (the response).
if self.inner.streams_mut().remote_open_send_half(id, self.initial_window_size).is_err() {
return Err(InvalidStreamId.into());
}
}
}
// This only handles other stream frames (data, window update, ...). Ensure
// the stream is open (i.e. has already sent headers).
_ => {
self.check_not_reset(id)?;
if !self.streams().is_send_open(id) {
return Err(InactiveStreamId.into());
}
}
}
self.inner.start_send(frame)
}
fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}
}
proxy_control_flow!(StreamSendOpen);
proxy_control_streams!(StreamSendOpen);
proxy_control_ping!(StreamSendOpen);
proxy_stream!(StreamSendOpen);
proxy_ready_sink!(StreamSendOpen; ControlStreams);

View File

@@ -1,289 +0,0 @@
use ConnectionError;
use error::Reason::*;
use proto::{FlowControlState, WindowSize};
/// Represents the state of an H2 stream
///
/// ```not_rust
/// +--------+
/// send PP | | recv PP
/// ,--------| idle |--------.
/// / | | \
/// v +--------+ v
/// +----------+ | +----------+
/// | | | send H / | |
/// ,------| reserved | | recv H | reserved |------.
/// | | (local) | | | (remote) | |
/// | +----------+ v +----------+ |
/// | | +--------+ | |
/// | | recv ES | | send ES | |
/// | send H | ,-------| open |-------. | recv H |
/// | | / | | \ | |
/// | v v +--------+ v v |
/// | +----------+ | +----------+ |
/// | | half | | | half | |
/// | | closed | | send R / | closed | |
/// | | (remote) | | recv R | (local) | |
/// | +----------+ | +----------+ |
/// | | | | |
/// | | send ES / | recv ES / | |
/// | | send R / v send R / | |
/// | | recv R +--------+ recv R | |
/// | send R / `----------->| |<-----------' send R / |
/// | recv R | closed | recv R |
/// `----------------------->| |<----------------------'
/// +--------+
///
/// send: endpoint sends this frame
/// recv: endpoint receives this frame
///
/// H: HEADERS frame (with implied CONTINUATIONs)
/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
/// ES: END_STREAM flag
/// R: RST_STREAM frame
/// ```
#[derive(Debug, Copy, Clone)]
pub enum StreamState {
Idle,
// TODO: these states shouldn't count against concurrency limits:
//ReservedLocal,
//ReservedRemote,
Open {
local: PeerState,
remote: PeerState,
},
HalfClosedLocal(PeerState),
HalfClosedRemote(PeerState),
Closed,
}
impl StreamState {
pub fn new_open_sending(sz: WindowSize) -> StreamState {
StreamState::Open {
local: PeerState::AwaitingHeaders,
remote: PeerState::streaming(sz),
}
}
pub fn new_open_recving(sz: WindowSize) -> StreamState {
StreamState::Open {
local: PeerState::streaming(sz),
remote: PeerState::AwaitingHeaders,
}
}
/// Opens the send-half of a stream if it is not already open.
///
/// Returns true iff the send half was not previously open.
pub fn open_send_half(&mut self, sz: WindowSize) -> Result<bool, ConnectionError> {
use self::StreamState::*;
use self::PeerState::*;
// Try to avoid copying `self` by first checking to see whether the stream needs
// to be updated.
match self {
&mut Idle |
&mut Closed |
&mut HalfClosedRemote(..) => {
return Err(ProtocolError.into());
}
&mut Open { remote: Streaming(..), .. } |
&mut HalfClosedLocal(Streaming(..)) => {
return Ok(false);
}
&mut Open { remote: AwaitingHeaders, .. } |
&mut HalfClosedLocal(AwaitingHeaders) => {}
}
match *self {
Open { local, remote: AwaitingHeaders } => {
*self = Open {
local,
remote: PeerState::streaming(sz),
};
}
HalfClosedLocal(AwaitingHeaders) => {
*self = HalfClosedLocal(PeerState::streaming(sz));
}
_ => unreachable!()
}
Ok(true)
}
pub fn open_recv_half(&mut self, sz: WindowSize) -> Result<bool, ConnectionError> {
use self::StreamState::*;
use self::PeerState::*;
// Try to avoid copying `self` by first checking to see whether the stream needs
// to be updated.
match self {
&mut Idle |
&mut Closed |
&mut HalfClosedLocal(..) => {
return Err(ProtocolError.into());
}
&mut Open { local: Streaming(..), .. } |
&mut HalfClosedRemote(Streaming(..)) => {
return Ok(false);
}
&mut Open { local: AwaitingHeaders, .. } |
&mut HalfClosedRemote(AwaitingHeaders) => {}
}
match *self {
Open { remote, local: AwaitingHeaders } => {
*self = Open {
local: PeerState::streaming(sz),
remote,
};
}
HalfClosedRemote(AwaitingHeaders) => {
*self = HalfClosedRemote(PeerState::streaming(sz));
}
_ => unreachable!()
}
Ok(true)
}
pub fn is_send_open(&self) -> bool {
use self::StreamState::*;
match self {
&Idle | &Closed | &HalfClosedRemote(..) => false,
&Open { ref remote, .. } |
&HalfClosedLocal(ref remote) => remote.is_streaming(),
}
}
pub fn is_recv_open(&self) -> bool {
use self::StreamState::*;
match self {
&Idle | &Closed | &HalfClosedLocal(..) => false,
&Open { ref local, .. } |
&HalfClosedRemote(ref local) => {
local.is_streaming()
}
}
}
/// Indicates that the local side will not send more data to the remote.
///
/// Returns true iff the stream is fully closed.
pub fn close_send_half(&mut self) -> Result<bool, ConnectionError> {
use self::StreamState::*;
match *self {
Open { local, .. } => {
// The local side will continue to receive data.
trace!("close_send_half: Open => HalfClosedRemote({:?})", local);
*self = HalfClosedRemote(local);
Ok(false)
}
HalfClosedLocal(..) => {
trace!("close_send_half: HalfClosedLocal => Closed");
*self = Closed;
Ok(true)
}
Idle | Closed | HalfClosedRemote(..) => {
Err(ProtocolError.into())
}
}
}
/// Indicates that the remote side will not send more data to the local.
///
/// Returns true iff the stream is fully closed.
pub fn close_recv_half(&mut self) -> Result<bool, ConnectionError> {
use self::StreamState::*;
match *self {
Open { remote, .. } => {
// The remote side will continue to receive data.
trace!("close_recv_half: Open => HalfClosedLocal({:?})", remote);
*self = HalfClosedLocal(remote);
Ok(false)
}
HalfClosedRemote(..) => {
trace!("close_recv_half: HalfClosedRemoteOpen => Closed");
*self = Closed;
Ok(true)
}
Idle | Closed | HalfClosedLocal(..) => {
Err(ProtocolError.into())
}
}
}
pub fn recv_flow_controller(&mut self) -> Option<&mut FlowControlState> {
use self::StreamState::*;
match self {
&mut Open { ref mut local, .. } |
&mut HalfClosedRemote(ref mut local) => local.flow_controller(),
_ => None,
}
}
pub fn send_flow_controller(&mut self) -> Option<&mut FlowControlState> {
use self::StreamState::*;
match self {
&mut Open { ref mut remote, .. } |
&mut HalfClosedLocal(ref mut remote) => remote.flow_controller(),
_ => None,
}
}
}
impl Default for StreamState {
fn default() -> StreamState {
StreamState::Idle
}
}
#[derive(Debug, Copy, Clone)]
pub enum PeerState {
AwaitingHeaders,
/// Contains a FlowControlState representing the _receiver_ of this data stream.
Streaming(FlowControlState),
}
impl Default for PeerState {
fn default() -> Self {
PeerState::AwaitingHeaders
}
}
impl PeerState {
fn streaming(sz: WindowSize) -> PeerState {
PeerState::Streaming(FlowControlState::with_initial_size(sz))
}
#[inline]
fn is_streaming(&self) -> bool {
use self::PeerState::*;
match self {
&Streaming(..) => true,
_ => false,
}
}
fn flow_controller(&mut self) -> Option<&mut FlowControlState> {
use self::PeerState::*;
match self {
&mut Streaming(ref mut fc) => Some(fc),
_ => None,
}
}
}
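A short sketch (not part of the commit) of the idempotent half-opening contract documented above: the first open of a half returns `true`, a repeat returns `false`, and opening a closed stream is a protocol error.
fn open_half_contract() -> Result<(), ConnectionError> {
    let mut state = StreamState::new_open_recving(65_535);

    // The recv half is already streaming, so this is a no-op.
    assert_eq!(state.open_recv_half(65_535)?, false);

    // The send half has not seen headers yet: the first open reports true,
    // and a second open is a no-op.
    assert_eq!(state.open_send_half(65_535)?, true);
    assert_eq!(state.open_send_half(65_535)?, false);
    Ok(())
}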

View File

@@ -1,325 +0,0 @@
use {ConnectionError, Peer, StreamId};
use error::Reason::{NoError, ProtocolError};
use proto::*;
use proto::stream_state::StreamState;
use fnv::FnvHasher;
use ordermap::OrderMap;
use std::hash::BuildHasherDefault;
/// Holds the underlying stream state to be accessed by upper layers.
// TODO track reserved streams
// TODO constrain the size of `reset`
#[derive(Debug)]
pub struct StreamStates<T> {
inner: T,
streams: Streams,
}
#[derive(Debug)]
pub struct Streams {
/// True when in the context of an H2 server.
is_server: bool,
/// Holds active streams initiated by the local endpoint.
local_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds active streams initiated by the remote endpoint.
remote_active: OrderMap<StreamId, StreamState, BuildHasherDefault<FnvHasher>>,
/// Holds the reset reason for streams that have been reset or closed.
reset: OrderMap<StreamId, Reason, BuildHasherDefault<FnvHasher>>,
}
impl<T, U> StreamStates<T>
where T: Stream<Item = Frame, Error = ConnectionError>,
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
{
pub fn new<P: Peer>(inner: T) -> StreamStates<T> {
StreamStates {
inner,
streams: Streams {
is_server: P::is_server(),
local_active: OrderMap::default(),
remote_active: OrderMap::default(),
reset: OrderMap::default(),
},
}
}
}
impl<T> ControlStreams for StreamStates<T> {
fn streams(&self) -> &Streams {
&self.streams
}
fn streams_mut(&mut self) -> &mut Streams {
&mut self.streams
}
}
impl Streams {
pub fn is_valid_local_stream_id(&self, id: StreamId) -> bool {
if self.is_server {
id.is_server_initiated()
} else {
id.is_client_initiated()
}
}
pub fn is_valid_remote_stream_id(&self, id: StreamId) -> bool {
if self.is_server {
id.is_client_initiated()
} else {
id.is_server_initiated()
}
}
pub fn get_active(&self, id: StreamId) -> Option<&StreamState> {
assert!(!id.is_zero());
if self.is_valid_local_stream_id(id) {
self.local_active.get(&id)
} else {
self.remote_active.get(&id)
}
}
pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> {
assert!(!id.is_zero());
if self.is_valid_local_stream_id(id) {
self.local_active.get_mut(&id)
} else {
self.remote_active.get_mut(&id)
}
}
pub fn remove_active(&mut self, id: StreamId) {
assert!(!id.is_zero());
if self.is_valid_local_stream_id(id) {
self.local_active.remove(&id);
} else {
self.remote_active.remove(&id);
}
}
pub fn can_local_open(&self) -> bool {
!self.is_server
}
pub fn can_remote_open(&self) -> bool {
!self.can_local_open()
}
pub fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !self.is_valid_local_stream_id(id) || !self.can_local_open() {
return Err(ProtocolError.into());
}
if self.local_active.contains_key(&id) {
return Err(ProtocolError.into());
}
self.local_active.insert(id, StreamState::new_open_sending(sz));
Ok(())
}
pub fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !self.is_valid_remote_stream_id(id) || !self.can_remote_open() {
return Err(ProtocolError.into());
}
if self.remote_active.contains_key(&id) {
return Err(ProtocolError.into());
}
self.remote_active.insert(id, StreamState::new_open_recving(sz));
Ok(())
}
pub fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !self.is_valid_local_stream_id(id) {
return Err(ProtocolError.into());
}
match self.local_active.get_mut(&id) {
Some(s) => s.open_recv_half(sz).map(|_| {}),
None => Err(ProtocolError.into()),
}
}
pub fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> {
if !self.is_valid_remote_stream_id(id) {
return Err(ProtocolError.into());
}
match self.remote_active.get_mut(&id) {
Some(s) => s.open_send_half(sz).map(|_| {}),
None => Err(ProtocolError.into()),
}
}
pub fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_send_half())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
pub fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> {
let fully_closed = self.get_active_mut(id)
.map(|s| s.close_recv_half())
.unwrap_or_else(|| Err(ProtocolError.into()))?;
if fully_closed {
self.remove_active(id);
self.reset.insert(id, NoError);
}
Ok(())
}
pub fn reset_stream(&mut self, id: StreamId, cause: Reason) {
self.remove_active(id);
self.reset.insert(id, cause);
}
pub fn get_reset(&self, id: StreamId) -> Option<Reason> {
self.reset.get(&id).map(|r| *r)
}
pub fn is_local_active(&self, id: StreamId) -> bool {
self.local_active.contains_key(&id)
}
pub fn is_remote_active(&self, id: StreamId) -> bool {
self.remote_active.contains_key(&id)
}
/// Returns true if the given stream was opened and is not yet closed.
pub fn is_active(&self, id: StreamId) -> bool {
if self.is_valid_local_stream_id(id) {
self.is_local_active(id)
} else {
self.is_remote_active(id)
}
}
pub fn is_send_open(&self, id: StreamId) -> bool {
match self.get_active(id) {
Some(s) => s.is_send_open(),
None => false,
}
}
pub fn is_recv_open(&self, id: StreamId) -> bool {
match self.get_active(id) {
Some(s) => s.is_recv_open(),
None => false,
}
}
pub fn local_active_len(&self) -> usize {
self.local_active.len()
}
pub fn remote_active_len(&self) -> usize {
self.remote_active.len()
}
pub fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.recv_flow_controller() {
fc.expand_window(incr);
}
}
}
}
pub fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) {
if new_sz < old_sz {
let decr = old_sz - new_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.shrink_window(decr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.shrink_window(decr);
}
}
} else {
let incr = new_sz - old_sz;
for s in self.local_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.expand_window(incr);
}
}
for s in self.remote_active.values_mut() {
if let Some(fc) = s.send_flow_controller() {
fc.expand_window(incr);
}
}
}
}
pub fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
// TODO: Abstract getting the state for a stream ID
if id.is_zero() {
None
} else if self.is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller())
}
}
pub fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> {
if id.is_zero() {
None
} else if self.is_valid_local_stream_id(id) {
self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller())
} else {
self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller())
}
}
}
proxy_apply_settings!(StreamStates);
proxy_control_ping!(StreamStates);
proxy_stream!(StreamStates);
proxy_sink!(StreamStates);
proxy_ready_sink!(StreamStates);

263
src/proto/streams/mod.rs Normal file
View File

@@ -0,0 +1,263 @@
mod recv;
mod send;
use self::recv::Recv;
use self::send::Send;
use {frame, Peer, StreamId, ConnectionError};
use proto::*;
use error::Reason::*;
use error::User::*;
use ordermap::{OrderMap, Entry};
// TODO: All the VecDeques should become linked lists using the state::Stream
// values.
#[derive(Debug)]
pub struct Streams<P> {
/// State related to managing the set of streams.
inner: Inner<P>,
/// Streams
streams: StreamMap,
}
type StreamMap = OrderMap<StreamId, state::Stream>;
/// Fields needed to manage state for the set of streams. This is mostly
/// split out to make ownership happy.
///
/// TODO: better name
#[derive(Debug)]
struct Inner<P> {
/// Manages state transitions initiated by receiving frames
recv: Recv<P>,
/// Manages state transitions initiated by sending frames
send: Send<P>,
}
#[derive(Debug)]
pub struct Config {
/// Maximum number of remote initiated streams
pub max_remote_initiated: Option<usize>,
/// Initial window size of remote initiated streams
pub init_remote_window_sz: WindowSize,
/// Maximum number of locally initiated streams
pub max_local_initiated: Option<usize>,
/// Initial window size of locally initiated streams
pub init_local_window_sz: WindowSize,
}
impl<P: Peer> Streams<P> {
pub fn new(config: Config) -> Self {
Streams {
inner: Inner {
recv: Recv::new(&config),
send: Send::new(&config),
},
streams: OrderMap::default(),
}
}
pub fn recv_headers(&mut self, frame: frame::Headers)
-> Result<Option<frame::Headers>, ConnectionError>
{
let id = frame.stream_id();
let state = match self.streams.entry(id) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
// Trailers cannot open a stream. Trailers are header frames
// that do not contain pseudo headers. Requests MUST contain a
// method and responses MUST contain a status. If they do not,
// they are considered to be malformed.
if frame.is_trailers() {
return Err(ProtocolError.into());
}
match try!(self.inner.recv.open(id)) {
Some(state) => e.insert(state),
None => return Ok(None),
}
}
};
if frame.is_trailers() {
try!(self.inner.recv.recv_trailers(state, frame.is_end_stream()));
} else {
try!(self.inner.recv.recv_headers(state, frame.is_end_stream()));
}
if state.is_closed() {
self.inner.dec_num_streams(id);
}
Ok(Some(frame))
}
pub fn recv_data(&mut self, frame: &frame::Data)
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
let state = match self.streams.get_mut(&id) {
Some(state) => state,
None => return Err(ProtocolError.into()),
};
// Ensure there's enough capacity on the connection before acting on the
// stream.
try!(self.inner.recv.recv_data(frame, state));
if state.is_closed() {
self.inner.dec_num_streams(id);
}
Ok(())
}
pub fn recv_reset(&mut self, _frame: &frame::Reset)
-> Result<(), ConnectionError>
{
unimplemented!();
}
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError> {
let id = frame.stream_id();
if id.is_zero() {
try!(self.inner.send.recv_connection_window_update(frame));
} else {
// The remote may send window updates for streams that the local now
// considers closed. It's ok...
if let Some(state) = self.streams.get_mut(&id) {
try!(self.inner.send.recv_stream_window_update(frame, state));
}
}
Ok(())
}
pub fn recv_push_promise(&mut self, _frame: frame::PushPromise)
-> Result<(), ConnectionError>
{
unimplemented!();
}
pub fn send_headers(&mut self, frame: &frame::Headers)
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
trace!("send_headers; id={:?}", id);
let state = match self.streams.entry(id) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => {
// Trailers cannot open a stream. Trailers are header frames
// that do not contain pseudo headers. Requests MUST contain a
// method and responses MUST contain a status. If they do not,
// they are considered to be malformed.
if frame.is_trailers() {
// TODO: Should this be a different error?
return Err(UnexpectedFrameType.into());
}
let state = try!(self.inner.send.open(id));
e.insert(state)
}
};
if frame.is_trailers() {
try!(self.inner.send.send_trailers(state, frame.is_end_stream()));
} else {
try!(self.inner.send.send_headers(state, frame.is_end_stream()));
}
if state.is_closed() {
self.inner.dec_num_streams(id);
}
Ok(())
}
pub fn send_data<B: Buf>(&mut self, frame: &frame::Data<B>)
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
let state = match self.streams.get_mut(&id) {
Some(state) => state,
None => return Err(UnexpectedFrameType.into()),
};
// Ensure there's enough capacity on the connection before acting on the
// stream.
try!(self.inner.send.send_data(frame, state));
if state.is_closed() {
self.inner.dec_num_streams(id);
}
Ok(())
}
pub fn poll_window_update(&mut self)
-> Poll<WindowUpdate, ConnectionError>
{
self.inner.send.poll_window_update(&mut self.streams)
}
pub fn expand_window(&mut self, id: StreamId, sz: WindowSize)
-> Result<(), ConnectionError>
{
if id.is_zero() {
try!(self.inner.recv.expand_connection_window(sz));
} else {
if let Some(state) = self.streams.get_mut(&id) {
try!(self.inner.recv.expand_stream_window(id, sz, state));
}
}
Ok(())
}
pub fn send_pending_refusal<T, B>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
B: Buf,
{
self.inner.recv.send_pending_refusal(dst)
}
pub fn send_pending_window_updates<T, B>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
B: Buf,
{
try_ready!(self.inner.recv.send_connection_window_update(dst));
try_ready!(self.inner.recv.send_stream_window_update(&mut self.streams, dst));
Ok(().into())
}
}
impl<P: Peer> Inner<P> {
fn dec_num_streams(&mut self, id: StreamId) {
if self.is_local_init(id) {
self.send.dec_num_streams();
} else {
self.recv.dec_num_streams();
}
}
fn is_local_init(&self, id: StreamId) -> bool {
assert!(!id.is_zero());
P::is_server() == id.is_server_initiated()
}
}
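A sketch (not part of the commit) of how a connection layer might instantiate this module; the window and concurrency values mirror the HTTP/2 defaults used elsewhere in this commit, and any real `Peer` implementation can stand in for `P`.
fn streams_setup<P: Peer>() -> Streams<P> {
    let config = Config {
        max_remote_initiated: Some(100),
        init_remote_window_sz: 65_535,
        max_local_initiated: Some(100),
        init_local_window_sz: 65_535,
    };
    Streams::new(config)
}
Received frames are then routed through `recv_headers`, `recv_data`, and `recv_window_update`, outgoing frames through the matching `send_*` methods, while `poll_window_update` drives outbound WINDOW_UPDATE frames.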

235
src/proto/streams/recv.rs Normal file
View File

@@ -0,0 +1,235 @@
use {frame, Peer, ConnectionError};
use proto::*;
use super::{Config, StreamMap};
use error::Reason::*;
use std::collections::VecDeque;
use std::marker::PhantomData;
#[derive(Debug)]
pub struct Recv<P> {
/// Maximum number of remote initiated streams
max_streams: Option<usize>,
/// Current number of remote initiated streams
num_streams: usize,
/// Initial window size of remote initiated streams
init_window_sz: WindowSize,
/// Connection level flow control governing received data
flow_control: state::FlowControl,
pending_window_updates: VecDeque<StreamId>,
/// Refused StreamId; this represents a RST_STREAM frame that must be sent out.
refused: Option<StreamId>,
_p: PhantomData<P>,
}
impl<P: Peer> Recv<P> {
pub fn new(config: &Config) -> Self {
Recv {
max_streams: config.max_remote_initiated,
num_streams: 0,
init_window_sz: config.init_remote_window_sz,
flow_control: state::FlowControl::new(config.init_remote_window_sz),
pending_window_updates: VecDeque::new(),
refused: None,
_p: PhantomData,
}
}
/// Update state reflecting a new, remotely opened stream
///
/// Returns the stream state if successful. `None` if refused
pub fn open(&mut self, id: StreamId) -> Result<Option<state::Stream>, ConnectionError> {
assert!(self.refused.is_none());
try!(self.ensure_can_open(id));
if let Some(max) = self.max_streams {
if max <= self.num_streams {
self.refused = Some(id);
return Ok(None);
}
}
// Increment the number of remote initiated streams
self.num_streams += 1;
Ok(Some(state::Stream::default()))
}
/// Transition the stream state based on receiving headers
pub fn recv_headers(&mut self, state: &mut state::Stream, eos: bool)
-> Result<(), ConnectionError>
{
state.recv_open(self.init_window_sz, eos)
}
pub fn recv_trailers(&mut self, _state: &mut state::Stream, _eos: bool)
-> Result<(), ConnectionError>
{
unimplemented!();
}
pub fn recv_data(&mut self,
frame: &frame::Data,
state: &mut state::Stream)
-> Result<(), ConnectionError>
{
let sz = frame.payload().len();
if sz > MAX_WINDOW_SIZE as usize {
unimplemented!();
}
let sz = sz as WindowSize;
match state.recv_flow_control() {
Some(flow) => {
// Ensure there's enough capacity on the connection before
// acting on the stream.
try!(self.flow_control.ensure_window(sz, FlowControlError));
// Claim the window on the stream
try!(flow.claim_window(sz, FlowControlError));
// Claim the window on the connection.
self.flow_control.claim_window(sz, FlowControlError)
.expect("local connection flow control error");
}
None => return Err(ProtocolError.into()),
}
if frame.is_end_stream() {
try!(state.recv_close());
}
Ok(())
}
pub fn dec_num_streams(&mut self) {
self.num_streams -= 1;
}
/// Ensures that the remote peer can initiate a stream with the given ID.
fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> {
if !P::is_server() {
// Remote is a server and cannot open streams. PushPromise is
// registered by reserving, so does not go through this path.
return Err(ProtocolError.into());
}
// Ensure that the ID is a valid client initiated ID
if !id.is_client_initiated() {
return Err(ProtocolError.into());
}
Ok(())
}
/// Send any pending refusals.
pub fn send_pending_refusal<T, B>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
B: Buf,
{
if let Some(stream_id) = self.refused.take() {
let frame = frame::Reset::new(stream_id, RefusedStream);
match dst.start_send(frame.into())? {
AsyncSink::Ready => {
self.reset(stream_id, RefusedStream);
return Ok(Async::Ready(()));
}
AsyncSink::NotReady(_) => {
self.refused = Some(stream_id);
return Ok(Async::NotReady);
}
}
}
Ok(Async::Ready(()))
}
pub fn expand_connection_window(&mut self, sz: WindowSize)
-> Result<(), ConnectionError>
{
// TODO: handle overflow
self.flow_control.expand_window(sz);
Ok(())
}
pub fn expand_stream_window(&mut self,
id: StreamId,
sz: WindowSize,
state: &mut state::Stream)
-> Result<(), ConnectionError>
{
// TODO: handle overflow
if let Some(flow) = state.recv_flow_control() {
flow.expand_window(sz);
self.pending_window_updates.push_back(id);
}
Ok(())
}
/// Send connection level window update
pub fn send_connection_window_update<T, B>(&mut self, dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
B: Buf,
{
if let Some(incr) = self.flow_control.peek_window_update() {
let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
if dst.start_send(frame.into())?.is_ready() {
assert_eq!(Some(incr), self.flow_control.apply_window_update());
} else {
return Ok(Async::NotReady);
}
}
Ok(().into())
}
/// Send stream level window update
pub fn send_stream_window_update<T, B>(&mut self,
streams: &mut StreamMap,
dst: &mut Codec<T, B>)
-> Poll<(), ConnectionError>
where T: AsyncWrite,
B: Buf,
{
while let Some(id) = self.pending_window_updates.pop_front() {
let flow = streams.get_mut(&id)
.and_then(|state| state.recv_flow_control());
if let Some(flow) = flow {
if let Some(incr) = flow.peek_window_update() {
let frame = frame::WindowUpdate::new(id, incr);
if dst.start_send(frame.into())?.is_ready() {
assert_eq!(Some(incr), flow.apply_window_update());
} else {
self.pending_window_updates.push_front(id);
return Ok(Async::NotReady);
}
}
}
}
Ok(().into())
}
fn reset(&mut self, _stream_id: StreamId, _reason: Reason) {
unimplemented!();
}
}

206
src/proto/streams/send.rs Normal file
View File

@@ -0,0 +1,206 @@
use {frame, Peer, ConnectionError};
use proto::*;
use super::{Config, StreamMap};
use error::User::*;
use bytes::Buf;
use std::collections::VecDeque;
use std::marker::PhantomData;
#[derive(Debug)]
pub struct Send<P> {
/// Maximum number of locally initiated streams
max_streams: Option<usize>,
/// Current number of locally initiated streams
num_streams: usize,
/// Initial window size of locally initiated streams
init_window_sz: WindowSize,
/// Connection level flow control governing sent data
flow_control: state::FlowControl,
/// Holds the list of streams on which local window updates may be sent.
// XXX It would be cool if this didn't exist.
pending_window_updates: VecDeque<StreamId>,
/// When `poll_window_update` is not ready, then the calling task is saved to
/// be notified later. Access to poll_window_update must not be shared across tasks,
/// as we only track a single task (not, e.g., a task per stream id).
blocked: Option<task::Task>,
_p: PhantomData<P>,
}
impl<P: Peer> Send<P> {
pub fn new(config: &Config) -> Self {
Send {
max_streams: config.max_local_initiated,
num_streams: 0,
init_window_sz: config.init_local_window_sz,
flow_control: state::FlowControl::new(config.init_local_window_sz),
pending_window_updates: VecDeque::new(),
blocked: None,
_p: PhantomData,
}
}
/// Update state reflecting a new, locally opened stream
///
/// Returns the stream state if successful; fails with `Rejected` when the
/// local concurrency limit has been reached.
pub fn open(&mut self, id: StreamId) -> Result<state::Stream, ConnectionError> {
try!(self.ensure_can_open(id));
if let Some(max) = self.max_streams {
if max <= self.num_streams {
return Err(Rejected.into());
}
}
// Increment the number of locally initiated streams
self.num_streams += 1;
Ok(state::Stream::default())
}
pub fn send_headers(&mut self, state: &mut state::Stream, eos: bool)
-> Result<(), ConnectionError>
{
state.send_open(self.init_window_sz, eos)
}
pub fn send_trailers(&mut self, _state: &mut state::Stream, _eos: bool)
-> Result<(), ConnectionError>
{
unimplemented!();
}
pub fn send_data<B: Buf>(&mut self,
frame: &frame::Data<B>,
state: &mut state::Stream)
-> Result<(), ConnectionError>
{
let sz = frame.payload().remaining();
if sz > MAX_WINDOW_SIZE as usize {
// TODO: handle overflow
unimplemented!();
}
let sz = sz as WindowSize;
// `loop` + `break` only exists to appease the borrow checker; the
// body runs at most once.
loop {
match state.send_flow_control() {
Some(flow) => {
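// First verify that the connection window can cover `sz`
// without mutating it; the actual claims happen below.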
try!(self.flow_control.ensure_window(sz, FlowControlViolation));
// Claim the window on the stream
try!(flow.claim_window(sz, FlowControlViolation));
// Claim the window on the connection
self.flow_control.claim_window(sz, FlowControlViolation)
.expect("local connection flow control error");
break;
}
None => {}
}
if state.is_closed() {
return Err(InactiveStreamId.into())
} else {
return Err(UnexpectedFrameType.into())
}
}
if frame.is_end_stream() {
try!(state.send_close());
}
Ok(())
}
/// Poll for a pending window update, registering the current task if none
/// is ready
pub fn poll_window_update(&mut self, streams: &mut StreamMap)
-> Poll<WindowUpdate, ConnectionError>
{
// Checking the connection window first biases connection-level updates
// over stream-level ones, which probably makes sense.
//
// TODO: We probably don't want to expose connection level updates
if let Some(incr) = self.flow_control.apply_window_update() {
return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr)));
}
// TODO: This should probably account for stream priority.
let update = self.pending_window_updates.pop_front()
.and_then(|id| {
streams.get_mut(&id)
.and_then(|state| state.send_flow_control())
.and_then(|flow| flow.apply_window_update())
.map(|incr| WindowUpdate::new(id, incr))
});
if let Some(update) = update {
return Ok(Async::Ready(update));
}
// Update the task.
//
// TODO: Extract this "gate" logic
self.blocked = Some(task::current());
return Ok(Async::NotReady);
}
pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), ConnectionError>
{
// TODO: Handle invalid increment
self.flow_control.expand_window(frame.size_increment());
if let Some(task) = self.blocked.take() {
task.notify();
}
Ok(())
}
pub fn recv_stream_window_update(&mut self,
frame: frame::WindowUpdate,
state: &mut state::Stream)
-> Result<(), ConnectionError>
{
if let Some(flow) = state.send_flow_control() {
// TODO: Handle invalid increment
flow.expand_window(frame.size_increment());
}
if let Some(task) = self.blocked.take() {
task.notify();
}
Ok(())
}
pub fn dec_num_streams(&mut self) {
self.num_streams -= 1;
}
/// Ensures that the local actor may initiate a stream with the given ID.
fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> {
if P::is_server() {
// Servers cannot open streams directly; a stream must first be
// reserved via PUSH_PROMISE.
return Err(UnexpectedFrameType.into());
}
if !id.is_client_initiated() {
return Err(InvalidStreamId.into());
}
Ok(())
}
}
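`poll_window_update` parks at most one task: when nothing is pending it
stashes `task::current()` in `blocked`, and both `recv_*_window_update`
paths take and notify it. A rough sketch of that single-waiter gate using
futures 0.1 primitives (the `Gate` type is invented for illustration and
must only be polled from within a task):

extern crate futures;

use futures::{task, Async, Poll};

struct Gate {
    value: Option<u32>,
    blocked: Option<task::Task>,
}

impl Gate {
    fn poll_value(&mut self) -> Poll<u32, ()> {
        if let Some(v) = self.value.take() {
            return Ok(Async::Ready(v));
        }
        // Track the *current* task only; polling from two tasks at once
        // would drop the first waiter, which is why access must not be
        // shared across tasks.
        self.blocked = Some(task::current());
        Ok(Async::NotReady)
    }

    fn set_value(&mut self, v: u32) {
        self.value = Some(v);
        if let Some(task) = self.blocked.take() {
            // Wake whoever was parked in `poll_value`.
            task.notify();
        }
    }
}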

39
src/proto/traits.rs Normal file

@@ -0,0 +1,39 @@
use ConnectionError;
use proto::*;
/// An alias for types that implement Stream + Sink over H2 frames.
pub trait FrameStream<B>: Stream<Item = Frame, Error = ConnectionError> +
Sink<SinkItem = Frame<B>, SinkError = ConnectionError>
{
fn poll_ready(&mut self) -> Poll<(), ConnectionError>;
}
pub trait Stage<B> {
fn poll<T>(&mut self, upstream: &mut T) -> Poll<Option<Frame>, ConnectionError>
where T: FrameStream<B>,
{
upstream.poll()
}
fn poll_ready<T>(&mut self, upstream: &mut T) -> Poll<(), ConnectionError>
where T: FrameStream<B>,
{
upstream.poll_ready()
}
fn start_send<T>(&mut self, item: Frame<B>, upstream: &mut T) -> StartSend<Frame<B>, ConnectionError>
where T: FrameStream<B>,
{
upstream.start_send(item)
}
fn poll_complete<T>(&mut self, upstream: &mut T) -> Poll<(), ConnectionError>
where T: FrameStream<B>,
{
upstream.poll_complete()
}
}
pub trait StreamStage {
}
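Every `Stage` method defaults to delegating straight to the upstream
transport, so a concrete stage only overrides the hooks it needs. As a
hypothetical example (not part of this commit), a pass-through stage that
counts received frames would override `poll` alone and inherit the rest:

struct CountFrames {
    received: usize,
}

impl<B> Stage<B> for CountFrames {
    fn poll<T>(&mut self, upstream: &mut T) -> Poll<Option<Frame>, ConnectionError>
        where T: FrameStream<B>,
    {
        // `try_ready!` propagates NotReady and errors from upstream.
        let frame = try_ready!(upstream.poll());
        if frame.is_some() {
            self.received += 1;
        }
        Ok(Async::Ready(frame))
    }
}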


@@ -1,7 +1,9 @@
#![allow(warnings)]
use {frame, proto, Peer, ConnectionError, StreamId};
use http;
use futures::{Future, Sink, Poll, Async};
use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf};
@@ -46,13 +48,24 @@ pub fn handshake2<T, B: IntoBuf>(io: T) -> Handshake<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: 'static, // TODO: Why is this required but not in client?
{
let local_settings = frame::SettingSet::default();
let transport = proto::server_handshaker(io, local_settings.clone());
let mut framed_write = proto::framed_write(io);
let settings = frame::Settings::default();
// Send initial settings frame
match framed_write.start_send(settings.into()) {
Ok(AsyncSink::Ready) => {}
Ok(_) => unreachable!(),
Err(e) => {
return Handshake {
inner: Box::new(Err(ConnectionError::from(e)).into_future()),
}
}
}
// Flush pending settings frame and then wait for the client preface
let handshake = Flush::new(transport)
let handshake = Flush::new(framed_write)
.and_then(ReadPreface::new)
.map(move |t| proto::from_server_handshaker(t, local_settings))
.map(move |framed_write| proto::from_framed_write(framed_write))
;
Handshake { inner: Box::new(handshake) }
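The server handshake mirrors the client's: it queues its SETTINGS frame
immediately, flushes it, and only then reads the 24-byte client preface
before handing the transport to `proto::from_framed_write`. For reference,
a synchronous sketch of the check that `ReadPreface` performs incrementally
as a future (this helper is illustrative, not the crate's API):

use std::io::{self, Read};

const PREFACE: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

fn read_preface<T: Read>(io: &mut T) -> io::Result<()> {
    // The preface is exactly 24 bytes and must match verbatim.
    let mut buf = [0u8; 24];
    io.read_exact(&mut buf)?;
    if &buf[..] != PREFACE {
        return Err(io::Error::new(io::ErrorKind::InvalidData,
                                  "invalid HTTP/2 client preface"));
    }
    Ok(())
}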


@@ -235,7 +235,7 @@ fn send_data_after_headers_eos() {
// Send the data
let err = h2.send_data(id, body.into(), true).wait().unwrap_err();
assert_user_err!(err, InactiveStreamId);
assert_user_err!(err, UnexpectedFrameType);
}
#[test]
@@ -250,7 +250,7 @@ fn send_data_without_headers() {
let b = Bytes::from_static(b"hello world");
let err = h2.send_data(1.into(), b, true).wait().unwrap_err();
assert_user_err!(err, InactiveStreamId);
assert_user_err!(err, UnexpectedFrameType);
}
#[test]