Use rustfmt to enforce consistent formatting

This change adds a .rustfmt.toml that includes ALL supported settings,
12 of which we have overridden to attempt to cater to our own
proclivities.

rustfmt formatting is checked in the rust-nightly CI job.
Author: Oliver Gould
Date: 2017-09-08 17:20:41 +00:00
Parent: 93925e6d1f
Commit: 897bf84163
60 changed files with 2087 additions and 1620 deletions
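
For context on the churn below, here is a minimal, self-contained sketch of the where-clause style rustfmt produces throughout this diff (the `where` keyword on its own line, one bound per indented line). The function name and bounds are invented for illustration and are not taken from this repository; only the formatting style is drawn from the hunks that follow.

use std::fmt::Debug;

// Hand-written style on the old side of the hunks below:
//
//     fn log_all<I>(items: I)
//         where I: IntoIterator,
//               I::Item: Debug,
//     { ... }

// Style produced by rustfmt with the new configuration:
fn log_all<I>(items: I)
where
    I: IntoIterator,
    I::Item: Debug,
{
    for item in items {
        println!("{:?}", item);
    }
}

fn main() {
    log_all(vec![1, 2, 3]);
}

The same rewrite shows up on impl blocks below (e.g. Connection<T, P, B>), and long function signatures are similarly broken onto one parameter per line.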


@@ -1,13 +1,13 @@
use {client, frame, server, proto};
use {client, frame, proto, server};
use codec::{RecvError, SendError};
use frame::Reason;
use codec::{SendError, RecvError};
use frame::DEFAULT_INITIAL_WINDOW_SIZE;
use proto::*;
use http::Request;
use futures::{Stream};
use bytes::{Bytes, IntoBuf};
use futures::Stream;
use http::Request;
use tokio_io::{AsyncRead, AsyncWrite};
use std::marker::PhantomData;
@@ -15,7 +15,8 @@ use std::marker::PhantomData;
/// An H2 connection
#[derive(Debug)]
pub(crate) struct Connection<T, P, B: IntoBuf = Bytes>
where P: Peer,
where
P: Peer,
{
/// Tracks the connection level state transitions.
state: State,
@@ -52,18 +53,19 @@ enum State {
}
impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
where
T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
pub fn new(codec: Codec<T, Prioritized<B::Buf>>) -> Connection<T, P, B> {
// TODO: Actually configure
let streams = Streams::new(streams::Config {
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
max_remote_initiated: None,
init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
max_local_initiated: None,
init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
});
Connection {
state: State::Open,
@@ -83,7 +85,8 @@ impl<T, P, B> Connection<T, P, B>
// The order of these calls don't really matter too much as only one
// should have pending work.
try_ready!(self.ping_pong.send_pending_pong(&mut self.codec));
try_ready!(self.settings.send_pending_ack(&mut self.codec, &mut self.streams));
try_ready!(self.settings
.send_pending_ack(&mut self.codec, &mut self.streams));
try_ready!(self.streams.send_pending_refusal(&mut self.codec));
Ok(().into())
@@ -128,7 +131,10 @@ impl<T, P, B> Connection<T, P, B>
// Attempting to read a frame resulted in a stream level error.
// This is handled by resetting the frame then trying to read
// another frame.
Err(Stream { id, reason }) => {
Err(Stream {
id,
reason,
}) => {
trace!("stream level error; id={:?}; reason={:?}", id, reason);
self.streams.send_reset(id, reason);
}
@@ -146,14 +152,16 @@ impl<T, P, B> Connection<T, P, B>
return Err(e);
}
}
},
}
State::GoAway(frame) => {
// Ensure the codec is ready to accept the frame
try_ready!(self.codec.poll_ready());
// Buffer the GO_AWAY frame
self.codec.buffer(frame.into())
.ok().expect("invalid GO_AWAY frame");
self.codec
.buffer(frame.into())
.ok()
.expect("invalid GO_AWAY frame");
// GO_AWAY sent, transition the connection to an errored state
self.state = State::Flush(frame.reason());
@@ -182,15 +190,15 @@ impl<T, P, B> Connection<T, P, B>
match try_ready!(self.codec.poll()) {
Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame);
try!(self.streams.recv_headers(frame));
self.streams.recv_headers(frame)?;
}
Some(Data(frame)) => {
trace!("recv DATA; frame={:?}", frame);
try!(self.streams.recv_data(frame));
self.streams.recv_data(frame)?;
}
Some(Reset(frame)) => {
trace!("recv RST_STREAM; frame={:?}", frame);
try!(self.streams.recv_reset(frame));
self.streams.recv_reset(frame)?;
}
Some(PushPromise(frame)) => {
trace!("recv PUSH_PROMISE; frame={:?}", frame);
@@ -229,8 +237,9 @@ impl<T, P, B> Connection<T, P, B>
}
impl<T, B> Connection<T, client::Peer, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
/// Returns `Ready` when new the connection is able to support a new request stream.
pub fn poll_send_request_ready(&mut self) -> Async<()> {
@@ -238,16 +247,19 @@ impl<T, B> Connection<T, client::Peer, B>
}
/// Initialize a new HTTP/2.0 stream and send the message.
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<B::Buf, client::Peer>, SendError>
{
pub fn send_request(
&mut self,
request: Request<()>,
end_of_stream: bool,
) -> Result<StreamRef<B::Buf, client::Peer>, SendError> {
self.streams.send_request(request, end_of_stream)
}
}
impl<T, B> Connection<T, server::Peer, B>
where T: AsyncRead + AsyncWrite,
B: IntoBuf,
where
T: AsyncRead + AsyncWrite,
B: IntoBuf,
{
pub fn next_incoming(&mut self) -> Option<StreamRef<B::Buf, server::Peer>> {
self.streams.next_incoming()
@@ -256,9 +268,10 @@ impl<T, B> Connection<T, server::Peer, B>
#[cfg(feature = "unstable")]
impl<T, P, B> Connection<T, P, B>
where T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
where
T: AsyncRead + AsyncWrite,
P: Peer,
B: IntoBuf,
{
pub fn num_active_streams(&self) -> usize {
self.streams.num_active_streams()


@@ -1,5 +1,5 @@
use frame::Reason;
use codec::RecvError;
use frame::Reason;
use std::io;


@@ -8,7 +8,7 @@ mod streams;
pub(crate) use self::connection::Connection;
pub(crate) use self::error::Error;
pub(crate) use self::peer::Peer;
pub(crate) use self::streams::{Streams, StreamRef};
pub(crate) use self::streams::{StreamRef, Streams};
use codec::Codec;
@@ -18,7 +18,7 @@ use self::streams::Prioritized;
use frame::{self, Frame};
use futures::{task, Poll, Async};
use futures::{task, Async, Poll};
use futures::task::Task;
use bytes::Buf;


@@ -1,5 +1,5 @@
use frame::{Headers, StreamId};
use codec::RecvError;
use frame::{Headers, StreamId};
use std::fmt;
@@ -13,10 +13,7 @@ pub trait Peer {
fn is_server() -> bool;
fn convert_send_message(
id: StreamId,
headers: Self::Send,
end_of_stream: bool) -> Headers;
fn convert_send_message(id: StreamId, headers: Self::Send, end_of_stream: bool) -> Headers;
fn convert_poll_message(headers: Headers) -> Result<Self::Poll, RecvError>;


@@ -14,7 +14,8 @@ pub struct PingPong<B> {
}
impl<B> PingPong<B>
where B: Buf,
where
B: Buf,
{
pub fn new() -> Self {
PingPong {
@@ -46,7 +47,8 @@ impl<B> PingPong<B>
/// Send any pending pongs.
pub fn send_pending_pong<T>(&mut self, dst: &mut Codec<T, B>) -> Poll<(), io::Error>
where T: AsyncWrite,
where
T: AsyncWrite,
{
if let Some(pong) = self.sending_pong.take() {
if !dst.poll_ready()?.is_ready() {


@@ -1,5 +1,5 @@
use frame;
use codec::RecvError;
use frame;
use proto::*;
#[derive(Debug)]
@@ -20,21 +20,23 @@ impl Settings {
pub fn recv_settings(&mut self, frame: frame::Settings) {
if frame.is_ack() {
debug!("received remote settings ack");
// TODO: handle acks
// TODO: handle acks
} else {
assert!(self.pending.is_none());
self.pending = Some(frame);
}
}
pub fn send_pending_ack<T, B, C, P>(&mut self,
dst: &mut Codec<T, B>,
streams: &mut Streams<C, P>)
-> Poll<(), RecvError>
where T: AsyncWrite,
B: Buf,
C: Buf,
P: Peer,
pub fn send_pending_ack<T, B, C, P>(
&mut self,
dst: &mut Codec<T, B>,
streams: &mut Streams<C, P>,
) -> Poll<(), RecvError>
where
T: AsyncWrite,
B: Buf,
C: Buf,
P: Peer,
{
trace!("send_pending_ack; pending={:?}", self.pending);
@@ -48,7 +50,9 @@ impl Settings {
let frame = frame::Settings::ack();
// Buffer the settings frame
dst.buffer(frame.into()).ok().expect("invalid settings frame");
dst.buffer(frame.into())
.ok()
.expect("invalid settings frame");
trace!("ACK sent; applying settings");


@@ -50,9 +50,9 @@ impl<T> Deque<T> {
pub fn push_back(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot {
value,
next: None,
});
value,
next: None,
});
match self.indices {
Some(ref mut idxs) => {
@@ -61,18 +61,18 @@ impl<T> Deque<T> {
}
None => {
self.indices = Some(Indices {
head: key,
tail: key,
});
head: key,
tail: key,
});
}
}
}
pub fn push_front(&mut self, buf: &mut Buffer<T>, value: T) {
let key = buf.slab.insert(Slot {
value,
next: None,
});
value,
next: None,
});
match self.indices {
Some(ref mut idxs) => {
@@ -81,9 +81,9 @@ impl<T> Deque<T> {
}
None => {
self.indices = Some(Indices {
head: key,
tail: key,
});
head: key,
tail: key,
});
}
}
}
@@ -109,9 +109,7 @@ impl<T> Deque<T> {
pub fn peek_front<'a>(&self, buf: &'a Buffer<T>) -> Option<&'a T> {
match self.indices {
Some(idxs) => {
Some(&buf.slab[idxs.head].value)
}
Some(idxs) => Some(&buf.slab[idxs.head].value),
None => None,
}
}


@@ -1,12 +1,13 @@
use client;
use super::*;
use client;
use std::usize;
use std::marker::PhantomData;
use std::usize;
#[derive(Debug)]
pub(super) struct Counts<P>
where P: Peer,
where
P: Peer,
{
/// Maximum number of locally initiated streams
max_send_streams: Option<usize>,
@@ -27,7 +28,8 @@ pub(super) struct Counts<P>
}
impl<P> Counts<P>
where P: Peer,
where
P: Peer,
{
/// Create a new `Counts` using the provided configuration values.
pub fn new(config: &Config) -> Self {
@@ -94,7 +96,8 @@ impl<P> Counts<P>
/// If the stream state transitions to closed, this function will perform
/// all necessary cleanup.
pub fn transition<F, B, U>(&mut self, mut stream: store::Ptr<B, P>, f: F) -> U
where F: FnOnce(&mut Self, &mut store::Ptr<B, P>) -> U
where
F: FnOnce(&mut Self, &mut store::Ptr<B, P>) -> U,
{
let is_counted = stream.state.is_counted();


@@ -1,5 +1,5 @@
use frame::Reason;
use proto::{MAX_WINDOW_SIZE, WindowSize};
use proto::{WindowSize, MAX_WINDOW_SIZE};
// We don't want to send WINDOW_UPDATE frames for tiny changes, but instead
// aggregate them when the changes are significant. Many implementations do
@@ -87,8 +87,7 @@ impl FlowControl {
}
let unclaimed = available - self.window_size;
let threshold = self.window_size / UNCLAIMED_DENOMINATOR
* UNCLAIMED_NUMERATOR;
let threshold = self.window_size / UNCLAIMED_DENOMINATOR * UNCLAIMED_NUMERATOR;
if unclaimed < threshold {
None
@@ -111,7 +110,10 @@ impl FlowControl {
return Err(Reason::FlowControlError);
}
trace!("inc_window; sz={}; old={}; new={}", sz, self.window_size, val);
trace!("inc_window; sz={}; old={}; new={}",
sz,
self.window_size,
val);
self.window_size = val;
Ok(())
@@ -130,7 +132,9 @@ impl FlowControl {
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
trace!("send_data; sz={}; window={}; available={}",
sz, self.window_size, self.available);
sz,
self.window_size,
self.available);
// Ensure that the argument is correct
assert!(sz <= self.window_size as WindowSize);


@@ -9,8 +9,8 @@ mod store;
mod stream;
mod streams;
pub(crate) use self::streams::{Streams, StreamRef};
pub(crate) use self::prioritize::Prioritized;
pub(crate) use self::streams::{StreamRef, Streams};
use self::buffer::Buffer;
use self::counts::Counts;
@@ -19,15 +19,15 @@ use self::prioritize::Prioritize;
use self::recv::Recv;
use self::send::Send;
use self::state::State;
use self::store::{Store, Entry};
use self::store::{Entry, Store};
use self::stream::Stream;
use error::Reason::*;
use frame::StreamId;
use proto::*;
use error::Reason::*;
use http::{Request, Response};
use bytes::Bytes;
use http::{Request, Response};
#[derive(Debug)]
pub struct Config {


@@ -8,12 +8,13 @@ use codec::UserError::*;
use bytes::buf::Take;
use std::{cmp, fmt};
use std::io;
use std::{fmt, cmp};
#[derive(Debug)]
pub(super) struct Prioritize<B, P>
where P: Peer,
where
P: Peer,
{
/// Queue of streams waiting for socket capacity to send a frame
pending_send: store::Queue<B, stream::NextSend, P>,
@@ -41,14 +42,16 @@ pub(crate) struct Prioritized<B> {
// ===== impl Prioritize =====
impl<B, P> Prioritize<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
pub fn new(config: &Config) -> Prioritize<B, P> {
let mut flow = FlowControl::new();
flow.inc_window(config.init_local_window_sz)
.ok().expect("invalid initial window size");
.ok()
.expect("invalid initial window size");
flow.assign_capacity(config.init_local_window_sz);
@@ -63,11 +66,12 @@ impl<B, P> Prioritize<B, P>
}
/// Queue a frame to be sent to the remote
pub fn queue_frame(&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
{
pub fn queue_frame(
&mut self,
frame: Frame<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) {
// Queue the frame in the buffer
stream.pending_send.push_back(&mut self.buffer, frame);
@@ -81,12 +85,12 @@ impl<B, P> Prioritize<B, P>
}
/// Send a data frame
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), UserError>
{
pub fn send_data(
&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
let sz = frame.payload().remaining();
if sz > MAX_WINDOW_SIZE as usize {
@@ -108,7 +112,9 @@ impl<B, P> Prioritize<B, P>
stream.buffered_send_data += sz;
trace!("send_data; sz={}; buffered={}; requested={}",
sz, stream.buffered_send_data, stream.requested_send_capacity);
sz,
stream.buffered_send_data,
stream.requested_send_capacity);
// Implicitly request more send capacity if not enough has been
// requested yet.
@@ -136,7 +142,9 @@ impl<B, P> Prioritize<B, P>
// The stream has no capacity to send the frame now, save it but
// don't notify the conneciton task. Once additional capacity
// becomes available, the frame will be flushed.
stream.pending_send.push_back(&mut self.buffer, frame.into());
stream
.pending_send
.push_back(&mut self.buffer, frame.into());
}
Ok(())
@@ -183,13 +191,16 @@ impl<B, P> Prioritize<B, P>
}
}
pub fn recv_stream_window_update(&mut self,
inc: WindowSize,
stream: &mut store::Ptr<B, P>)
-> Result<(), Reason>
{
pub fn recv_stream_window_update(
&mut self,
inc: WindowSize,
stream: &mut store::Ptr<B, P>,
) -> Result<(), Reason> {
trace!("recv_stream_window_update; stream={:?}; state={:?}; inc={}; flow={:?}",
stream.id, stream.state, inc, stream.send_flow);
stream.id,
stream.state,
inc,
stream.send_flow);
// Update the stream level flow control.
stream.send_flow.inc_window(inc)?;
@@ -201,11 +212,11 @@ impl<B, P> Prioritize<B, P>
Ok(())
}
pub fn recv_connection_window_update(&mut self,
inc: WindowSize,
store: &mut Store<B, P>)
-> Result<(), Reason>
{
pub fn recv_connection_window_update(
&mut self,
inc: WindowSize,
store: &mut Store<B, P>,
) -> Result<(), Reason> {
// Update the connection's window
self.flow.inc_window(inc)?;
@@ -213,10 +224,9 @@ impl<B, P> Prioritize<B, P>
Ok(())
}
pub fn assign_connection_capacity<R>(&mut self,
inc: WindowSize,
store: &mut R)
where R: Resolve<B, P>
pub fn assign_connection_capacity<R>(&mut self, inc: WindowSize, store: &mut R)
where
R: Resolve<B, P>,
{
self.flow.assign_capacity(inc);
@@ -244,10 +254,9 @@ impl<B, P> Prioritize<B, P>
// The amount of additional capacity that the stream requests.
// Don't assign more than the window has available!
let additional = cmp::min(
total_requested - stream.send_flow.available(),
// Can't assign more than what is available
stream.send_flow.window_size() - stream.send_flow.available());
let additional = cmp::min(total_requested - stream.send_flow.available(),
// Can't assign more than what is available
stream.send_flow.window_size() - stream.send_flow.available());
trace!("try_assign_capacity; requested={}; additional={}; buffered={}; window={}; conn={}",
total_requested,
@@ -265,7 +274,8 @@ impl<B, P> Prioritize<B, P>
// streaming state (more data could be sent) or there is buffered data
// waiting to be sent.
debug_assert!(stream.state.is_send_streaming() || stream.buffered_send_data > 0,
"state={:?}", stream.state);
"state={:?}",
stream.state);
// The amount of currently available capacity on the connection
let conn_available = self.flow.available();
@@ -286,7 +296,8 @@ impl<B, P> Prioritize<B, P>
self.flow.claim_capacity(assign);
}
trace!("try_assign_capacity; available={}; requested={}; buffered={}; has_unavailable={:?}",
trace!("try_assign_capacity; available={}; requested={}; buffered={}; \
has_unavailable={:?}",
stream.send_flow.available(),
stream.requested_send_capacity,
stream.buffered_send_data,
@@ -324,12 +335,14 @@ impl<B, P> Prioritize<B, P>
}
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B, P>,
counts: &mut Counts<P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn poll_complete<T>(
&mut self,
store: &mut Store<B, P>,
counts: &mut Counts<P>,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
// Ensure codec is ready
try_ready!(dst.poll_ready());
@@ -378,15 +391,18 @@ impl<B, P> Prioritize<B, P>
/// When a data frame is written to the codec, it may not be written in its
/// entirety (large chunks are split up into potentially many data frames).
/// In this case, the stream needs to be reprioritized.
fn reclaim_frame<T>(&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>) -> bool
{
fn reclaim_frame<T>(
&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>,
) -> bool {
trace!("try reclaim frame");
// First check if there are any data chunks to take back
if let Some(frame) = dst.take_last_data_frame() {
trace!(" -> reclaimed; frame={:?}; sz={}", frame, frame.payload().remaining());
trace!(" -> reclaimed; frame={:?}; sz={}",
frame,
frame.payload().remaining());
let mut eos = false;
let key = frame.payload().stream;
@@ -435,9 +451,12 @@ impl<B, P> Prioritize<B, P>
}
}
fn pop_frame(&mut self, store: &mut Store<B, P>, max_len: usize, counts: &mut Counts<P>)
-> Option<Frame<Prioritized<B>>>
{
fn pop_frame(
&mut self,
store: &mut Store<B, P>,
max_len: usize,
counts: &mut Counts<P>,
) -> Option<Frame<Prioritized<B>>> {
trace!("pop_frame");
loop {
@@ -455,7 +474,8 @@ impl<B, P> Prioritize<B, P>
let stream_capacity = stream.send_flow.available();
let sz = frame.payload().remaining();
trace!(" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; available={}; requested={}",
trace!(" --> data frame; stream={:?}; sz={}; eos={:?}; \
window={}; available={}; requested={}",
frame.stream_id(),
sz,
frame.is_end_stream(),
@@ -479,7 +499,9 @@ impl<B, P> Prioritize<B, P>
// happen if the remote reduced the stream
// window. In this case, we need to buffer the
// frame and wait for a window update...
stream.pending_send.push_front(&mut self.buffer, frame.into());
stream
.pending_send
.push_front(&mut self.buffer, frame.into());
continue;
}
@@ -556,7 +578,8 @@ impl<B, P> Prioritize<B, P>
// ===== impl Prioritized =====
impl<B> Buf for Prioritized<B>
where B: Buf,
where
B: Buf,
{
fn remaining(&self) -> usize {
self.inner.remaining()


@@ -1,8 +1,8 @@
use {client, server, frame, proto};
use frame::Reason;
use codec::{RecvError, UserError};
use proto::*;
use super::*;
use {client, frame, proto, server};
use codec::{RecvError, UserError};
use frame::Reason;
use proto::*;
use http::HeaderMap;
@@ -11,7 +11,8 @@ use std::marker::PhantomData;
#[derive(Debug)]
pub(super) struct Recv<B, P>
where P: Peer,
where
P: Peer,
{
/// Initial window size of remote initiated streams
init_window_sz: WindowSize,
@@ -54,20 +55,18 @@ struct Indices {
}
impl<B, P> Recv<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
pub fn new(config: &Config) -> Self {
let next_stream_id = if P::is_server() {
1
} else {
2
};
let next_stream_id = if P::is_server() { 1 } else { 2 };
let mut flow = FlowControl::new();
flow.inc_window(config.init_remote_window_sz)
.ok().expect("invalid initial remote window size");
.ok()
.expect("invalid initial remote window size");
flow.assign_capacity(config.init_remote_window_sz);
Recv {
@@ -96,12 +95,14 @@ impl<B, P> Recv<B, P>
/// Update state reflecting a new, remotely opened stream
///
/// Returns the stream state if successful. `None` if refused
pub fn open(&mut self, id: StreamId, counts: &mut Counts<P>)
-> Result<Option<StreamId>, RecvError>
{
pub fn open(
&mut self,
id: StreamId,
counts: &mut Counts<P>,
) -> Result<Option<StreamId>, RecvError> {
assert!(self.refused.is_none());
try!(self.ensure_can_open(id));
self.ensure_can_open(id)?;
if id < self.next_stream_id {
return Err(RecvError::Connection(ProtocolError));
@@ -121,12 +122,12 @@ impl<B, P> Recv<B, P>
/// Transition the stream state based on receiving headers
///
/// The caller ensures that the frame represents headers and not trailers.
pub fn recv_headers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
counts: &mut Counts<P>)
-> Result<(), RecvError>
{
pub fn recv_headers(
&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
counts: &mut Counts<P>,
) -> Result<(), RecvError> {
trace!("opening stream; init_window={}", self.init_window_sz);
let is_initial = stream.state.recv_open(frame.is_end_stream())?;
@@ -159,7 +160,9 @@ impl<B, P> Recv<B, P>
let message = P::convert_poll_message(frame)?;
// Push the frame onto the stream's recv buffer
stream.pending_recv.push_back(&mut self.buffer, Event::Headers(message));
stream
.pending_recv
.push_back(&mut self.buffer, Event::Headers(message));
stream.notify_recv();
// Only servers can receive a headers frame that initiates the stream.
@@ -172,37 +175,39 @@ impl<B, P> Recv<B, P>
}
/// Transition the stream based on receiving trailers
pub fn recv_trailers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>)
-> Result<(), RecvError>
{
pub fn recv_trailers(
&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
) -> Result<(), RecvError> {
// Transition the state
stream.state.recv_close()?;
if stream.ensure_content_length_zero().is_err() {
return Err(RecvError::Stream {
id: stream.id,
reason: ProtocolError,
});
id: stream.id,
reason: ProtocolError,
});
}
let trailers = frame.into_fields();
// Push the frame onto the stream's recv buffer
stream.pending_recv.push_back(&mut self.buffer, Event::Trailers(trailers));
stream
.pending_recv
.push_back(&mut self.buffer, Event::Trailers(trailers));
stream.notify_recv();
Ok(())
}
/// Releases capacity back to the connection
pub fn release_capacity(&mut self,
capacity: WindowSize,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), UserError>
{
pub fn release_capacity(
&mut self,
capacity: WindowSize,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
trace!("release_capacity; size={}", capacity);
if capacity > stream.in_flight_recv_data {
@@ -233,16 +238,18 @@ impl<B, P> Recv<B, P>
return false;
}
stream.pending_recv.peek_front(&self.buffer)
stream
.pending_recv
.peek_front(&self.buffer)
.map(|event| !event.is_data())
.unwrap_or(true)
}
pub fn recv_data(&mut self,
frame: frame::Data,
stream: &mut store::Ptr<B, P>)
-> Result<(), RecvError>
{
pub fn recv_data(
&mut self,
frame: frame::Data,
stream: &mut store::Ptr<B, P>,
) -> Result<(), RecvError> {
let sz = frame.payload().len();
if sz > MAX_WINDOW_SIZE as usize {
@@ -258,7 +265,9 @@ impl<B, P> Recv<B, P>
}
trace!("recv_data; size={}; connection={}; stream={}",
sz, self.flow.window_size(), stream.recv_flow.window_size());
sz,
self.flow.window_size(),
stream.recv_flow.window_size());
// Ensure that there is enough capacity on the connection before acting
// on the stream.
@@ -277,17 +286,17 @@ impl<B, P> Recv<B, P>
if stream.dec_content_length(frame.payload().len()).is_err() {
return Err(RecvError::Stream {
id: stream.id,
reason: ProtocolError,
});
id: stream.id,
reason: ProtocolError,
});
}
if frame.is_end_stream() {
if stream.ensure_content_length_zero().is_err() {
return Err(RecvError::Stream {
id: stream.id,
reason: ProtocolError,
});
id: stream.id,
reason: ProtocolError,
});
}
if stream.state.recv_close().is_err() {
@@ -304,18 +313,20 @@ impl<B, P> Recv<B, P>
Ok(())
}
pub fn recv_push_promise(&mut self,
frame: frame::PushPromise,
send: &Send<B, P>,
stream: store::Key,
store: &mut Store<B, P>)
-> Result<(), RecvError>
{
pub fn recv_push_promise(
&mut self,
frame: frame::PushPromise,
send: &Send<B, P>,
stream: store::Key,
store: &mut Store<B, P>,
) -> Result<(), RecvError> {
// First, make sure that the values are legit
self.ensure_can_reserve(frame.promised_id())?;
// Make sure that the stream state is valid
store[stream].state.ensure_recv_open()
store[stream]
.state
.ensure_recv_open()
.map_err(|e| e.into_connection_recv_error())?;
// TODO: Streams in the reserved states do not count towards the concurrency
@@ -332,10 +343,9 @@ impl<B, P> Recv<B, P>
// TODO: All earlier stream IDs should be implicitly closed.
// Now, create a new entry for the stream
let mut new_stream = Stream::new(
frame.promised_id(),
send.init_window_sz(),
self.init_window_sz);
let mut new_stream = Stream::new(frame.promised_id(),
send.init_window_sz(),
self.init_window_sz);
new_stream.state.reserve_remote()?;
@@ -343,8 +353,7 @@ impl<B, P> Recv<B, P>
{
// Store the stream
let mut new_stream = store
.insert(frame.promised_id(), new_stream);
let mut new_stream = store.insert(frame.promised_id(), new_stream);
ppp.push(&mut new_stream);
}
@@ -366,9 +375,11 @@ impl<B, P> Recv<B, P>
Ok(())
}
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream<B, P>)
-> Result<(), RecvError>
{
pub fn recv_reset(
&mut self,
frame: frame::Reset,
stream: &mut Stream<B, P>,
) -> Result<(), RecvError> {
let err = proto::Error::Proto(frame.reason());
// Notify the stream
@@ -387,9 +398,7 @@ impl<B, P> Recv<B, P>
}
/// Returns true if the remote peer can initiate a stream with the given ID.
fn ensure_can_open(&self, id: StreamId)
-> Result<(), RecvError>
{
fn ensure_can_open(&self, id: StreamId) -> Result<(), RecvError> {
if !P::is_server() {
// Remote is a server and cannot open streams. PushPromise is
// registered by reserving, so does not go through this path.
@@ -405,9 +414,7 @@ impl<B, P> Recv<B, P>
}
/// Returns true if the remote peer can reserve a stream with the given ID.
fn ensure_can_reserve(&self, promised_id: StreamId)
-> Result<(), RecvError>
{
fn ensure_can_reserve(&self, promised_id: StreamId) -> Result<(), RecvError> {
// TODO: Are there other rules?
if P::is_server() {
// The remote is a client and cannot reserve
@@ -422,9 +429,12 @@ impl<B, P> Recv<B, P>
}
/// Send any pending refusals.
pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn send_pending_refusal<T>(
&mut self,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
if let Some(stream_id) = self.refused {
try_ready!(dst.poll_ready());
@@ -433,7 +443,9 @@ impl<B, P> Recv<B, P>
let frame = frame::Reset::new(stream_id, RefusedStream);
// Buffer the frame
dst.buffer(frame.into()).ok().expect("invalid RST_STREAM frame");
dst.buffer(frame.into())
.ok()
.expect("invalid RST_STREAM frame");
}
self.refused = None;
@@ -441,11 +453,13 @@ impl<B, P> Recv<B, P>
Ok(Async::Ready(()))
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn poll_complete<T>(
&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
// Send any pending connection level window updates
try_ready!(self.send_connection_window_update(dst));
@@ -457,9 +471,12 @@ impl<B, P> Recv<B, P>
}
/// Send connection level window update
fn send_connection_window_update<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
fn send_connection_window_update<T>(
&mut self,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
if let Some(incr) = self.flow.unclaimed_capacity() {
let frame = frame::WindowUpdate::new(StreamId::zero(), incr);
@@ -468,10 +485,15 @@ impl<B, P> Recv<B, P>
try_ready!(dst.poll_ready());
// Buffer the WINDOW_UPDATE frame
dst.buffer(frame.into()).ok().expect("invalid WINDOW_UPDATE frame");
dst.buffer(frame.into())
.ok()
.expect("invalid WINDOW_UPDATE frame");
// Update flow control
self.flow.inc_window(incr).ok().expect("unexpected flow control state");
self.flow
.inc_window(incr)
.ok()
.expect("unexpected flow control state");
}
Ok(().into())
@@ -479,11 +501,13 @@ impl<B, P> Recv<B, P>
/// Send stream level window update
pub fn send_stream_window_updates<T>(&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn send_stream_window_updates<T>(
&mut self,
store: &mut Store<B, P>,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
loop {
// Ensure the codec has capacity
@@ -507,27 +531,28 @@ impl<B, P> Recv<B, P>
let frame = frame::WindowUpdate::new(stream.id, incr);
// Buffer it
dst.buffer(frame.into()).ok().expect("invalid WINDOW_UPDATE frame");
dst.buffer(frame.into())
.ok()
.expect("invalid WINDOW_UPDATE frame");
// Update flow control
stream.recv_flow.inc_window(incr).ok().expect("unexpected flow control state");
stream
.recv_flow
.inc_window(incr)
.ok()
.expect("unexpected flow control state");
}
}
}
pub fn next_incoming(&mut self, store: &mut Store<B, P>) -> Option<store::Key> {
self.pending_accept.pop(store)
.map(|ptr| ptr.key())
self.pending_accept.pop(store).map(|ptr| ptr.key())
}
pub fn poll_data(&mut self, stream: &mut Stream<B, P>)
-> Poll<Option<Bytes>, proto::Error>
{
pub fn poll_data(&mut self, stream: &mut Stream<B, P>) -> Poll<Option<Bytes>, proto::Error> {
// TODO: Return error when the stream is reset
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Data(payload)) => {
Ok(Some(payload).into())
}
Some(Event::Data(payload)) => Ok(Some(payload).into()),
Some(event) => {
// Frame is trailer
stream.pending_recv.push_front(&mut self.buffer, event);
@@ -548,13 +573,12 @@ impl<B, P> Recv<B, P>
}
}
pub fn poll_trailers(&mut self, stream: &mut Stream<B, P>)
-> Poll<Option<HeaderMap>, proto::Error>
{
pub fn poll_trailers(
&mut self,
stream: &mut Stream<B, P>,
) -> Poll<Option<HeaderMap>, proto::Error> {
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Trailers(trailers)) => {
Ok(Some(trailers).into())
}
Some(Event::Trailers(trailers)) => Ok(Some(trailers).into()),
Some(_) => {
// TODO: This is a user error. `poll_trailers` was called before
// the entire set of data frames have been consumed. What should
@@ -576,12 +600,11 @@ impl<B, P> Recv<B, P>
}
impl<B> Recv<B, server::Peer>
where B: Buf,
where
B: Buf,
{
/// TODO: Should this fn return `Result`?
pub fn take_request(&mut self, stream: &mut store::Ptr<B, server::Peer>)
-> Request<()>
{
pub fn take_request(&mut self, stream: &mut store::Ptr<B, server::Peer>) -> Request<()> {
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(request)) => request,
_ => panic!(),
@@ -590,16 +613,17 @@ impl<B> Recv<B, server::Peer>
}
impl<B> Recv<B, client::Peer>
where B: Buf,
where
B: Buf,
{
pub fn poll_response(&mut self, stream: &mut store::Ptr<B, client::Peer>)
-> Poll<Response<()>, proto::Error> {
pub fn poll_response(
&mut self,
stream: &mut store::Ptr<B, client::Peer>,
) -> Poll<Response<()>, proto::Error> {
// If the buffer is not empty, then the first frame must be a HEADERS
// frame or the user violated the contract.
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Headers(response)) => {
Ok(response.into())
}
Some(Event::Headers(response)) => Ok(response.into()),
Some(_) => unimplemented!(),
None => {
stream.state.ensure_recv_open()?;


@@ -1,8 +1,8 @@
use frame::{self, Reason};
use super::*;
use codec::{RecvError, UserError};
use codec::UserError::*;
use frame::{self, Reason};
use proto::*;
use super::*;
use bytes::Buf;
@@ -11,7 +11,8 @@ use std::io;
/// Manages state transitions related to outbound frames.
#[derive(Debug)]
pub(super) struct Send<B, P>
where P: Peer,
where
P: Peer,
{
/// Stream identifier to use for next initialized stream.
next_stream_id: StreamId,
@@ -24,8 +25,9 @@ pub(super) struct Send<B, P>
}
impl<B, P> Send<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
/// Create a new `Send`
pub fn new(config: &Config) -> Self {
@@ -46,10 +48,8 @@ where B: Buf,
/// Update state reflecting a new, locally opened stream
///
/// Returns the stream state if successful. `None` if refused
pub fn open(&mut self, counts: &mut Counts<P>)
-> Result<StreamId, UserError>
{
try!(self.ensure_can_open());
pub fn open(&mut self, counts: &mut Counts<P>) -> Result<StreamId, UserError> {
self.ensure_can_open()?;
if !counts.can_inc_num_send_streams() {
return Err(Rejected.into());
@@ -64,13 +64,15 @@ where B: Buf,
Ok(ret)
}
pub fn send_headers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), UserError>
{
trace!("send_headers; frame={:?}; init_window={:?}", frame, self.init_window_sz);
pub fn send_headers(
&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
trace!("send_headers; frame={:?}; init_window={:?}",
frame,
self.init_window_sz);
let end_stream = frame.is_end_stream();
@@ -83,11 +85,12 @@ where B: Buf,
Ok(())
}
pub fn send_reset(&mut self,
reason: Reason,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
{
pub fn send_reset(
&mut self,
reason: Reason,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) {
if stream.state.is_reset() {
// Don't double reset
return;
@@ -116,24 +119,25 @@ where B: Buf,
self.prioritize.queue_frame(frame.into(), stream, task);
// Re-assign all capacity to the connection
self.prioritize.assign_connection_capacity(available, stream);
self.prioritize
.assign_connection_capacity(available, stream);
}
pub fn send_data(&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), UserError>
{
pub fn send_data(
&mut self,
frame: frame::Data<B>,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
self.prioritize.send_data(frame, stream, task)
}
pub fn send_trailers(&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), UserError>
{
pub fn send_trailers(
&mut self,
frame: frame::Headers,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), UserError> {
// TODO: Should this logic be moved into state.rs?
if !stream.state.is_send_streaming() {
return Err(UnexpectedFrameType.into());
@@ -150,12 +154,14 @@ where B: Buf,
Ok(())
}
pub fn poll_complete<T>(&mut self,
store: &mut Store<B, P>,
counts: &mut Counts<P>,
dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn poll_complete<T>(
&mut self,
store: &mut Store<B, P>,
counts: &mut Counts<P>,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
self.prioritize.poll_complete(store, counts, dst)
}
@@ -165,9 +171,10 @@ where B: Buf,
self.prioritize.reserve_capacity(capacity, stream)
}
pub fn poll_capacity(&mut self, stream: &mut store::Ptr<B, P>)
-> Poll<Option<WindowSize>, UserError>
{
pub fn poll_capacity(
&mut self,
stream: &mut store::Ptr<B, P>,
) -> Poll<Option<WindowSize>, UserError> {
if !stream.state.is_send_streaming() {
return Ok(Async::Ready(None));
}
@@ -193,20 +200,21 @@ where B: Buf,
}
}
pub fn recv_connection_window_update(&mut self,
frame: frame::WindowUpdate,
store: &mut Store<B, P>)
-> Result<(), Reason>
{
self.prioritize.recv_connection_window_update(frame.size_increment(), store)
pub fn recv_connection_window_update(
&mut self,
frame: frame::WindowUpdate,
store: &mut Store<B, P>,
) -> Result<(), Reason> {
self.prioritize
.recv_connection_window_update(frame.size_increment(), store)
}
pub fn recv_stream_window_update(&mut self,
sz: WindowSize,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>)
-> Result<(), Reason>
{
pub fn recv_stream_window_update(
&mut self,
sz: WindowSize,
stream: &mut store::Ptr<B, P>,
task: &mut Option<Task>,
) -> Result<(), Reason> {
if let Err(e) = self.prioritize.recv_stream_window_update(sz, stream) {
debug!("recv_stream_window_update !!; err={:?}", e);
self.send_reset(FlowControlError.into(), stream, task);
@@ -217,12 +225,12 @@ where B: Buf,
Ok(())
}
pub fn apply_remote_settings(&mut self,
settings: &frame::Settings,
store: &mut Store<B, P>,
task: &mut Option<Task>)
-> Result<(), RecvError>
{
pub fn apply_remote_settings(
&mut self,
settings: &frame::Settings,
store: &mut Store<B, P>,
task: &mut Option<Task>,
) -> Result<(), RecvError> {
// Applies an update to the remote endpoint's initial window size.
//
// Per RFC 7540 §6.9.2:
@@ -254,7 +262,9 @@ where B: Buf,
stream.send_flow.dec_window(dec);
trace!("decremented stream window; id={:?}; decr={}; flow={:?}",
stream.id, dec, stream.send_flow);
stream.id,
dec,
stream.send_flow);
// TODO: Probably try to assign capacity?


@@ -1,7 +1,7 @@
use frame::Reason;
use frame::Reason::*;
use codec::{RecvError, UserError};
use codec::UserError::*;
use frame::Reason;
use frame::Reason::*;
use proto;
use self::Inner::*;
@@ -58,10 +58,7 @@ enum Inner {
// TODO: these states shouldn't count against concurrency limits:
//ReservedLocal,
ReservedRemote,
Open {
local: Peer,
remote: Peer,
},
Open { local: Peer, remote: Peer },
HalfClosedLocal(Peer), // TODO: explicitly name this value
HalfClosedRemote(Peer),
// When reset, a reason is provided
@@ -96,7 +93,10 @@ impl State {
}
}
}
Open { local: AwaitingHeaders, remote } => {
Open {
local: AwaitingHeaders,
remote,
} => {
if eos {
HalfClosedLocal(remote)
} else {
@@ -155,7 +155,10 @@ impl State {
}
}
}
Open { local, remote: AwaitingHeaders } => {
Open {
local,
remote: AwaitingHeaders,
} => {
if eos {
HalfClosedRemote(local)
} else {
@@ -195,7 +198,9 @@ impl State {
/// Indicates that the remote side will not send more data to the local.
pub fn recv_close(&mut self) -> Result<(), RecvError> {
match self.inner {
Open { local, .. } => {
Open {
local, ..
} => {
// The remote side will continue to receive data.
trace!("recv_close: Open => HalfClosedRemote({:?})", local);
self.inner = HalfClosedRemote(local);
@@ -218,9 +223,9 @@ impl State {
_ => {
trace!("recv_err; err={:?}", err);
self.inner = Closed(match *err {
Proto(reason) => Some(Cause::Proto(reason)),
Io(..) => Some(Cause::Io),
});
Proto(reason) => Some(Cause::Proto(reason)),
Io(..) => Some(Cause::Io),
});
}
}
}
@@ -228,7 +233,9 @@ impl State {
/// Indicates that the local side will not send more data to the local.
pub fn send_close(&mut self) {
match self.inner {
Open { remote, .. } => {
Open {
remote, ..
} => {
// The remote side will continue to receive data.
trace!("send_close: Open => HalfClosedLocal({:?})", remote);
self.inner = HalfClosedLocal(remote);
@@ -259,7 +266,9 @@ impl State {
/// concurrency limit.
pub fn is_counted(&self) -> bool {
match self.inner {
Open { .. } => true,
Open {
..
} => true,
HalfClosedLocal(..) => true,
HalfClosedRemote(..) => true,
_ => false,
@@ -268,7 +277,10 @@ impl State {
pub fn is_send_streaming(&self) -> bool {
match self.inner {
Open { local: Peer::Streaming, .. } => true,
Open {
local: Peer::Streaming,
..
} => true,
HalfClosedRemote(Peer::Streaming) => true,
_ => false,
}
@@ -278,7 +290,10 @@ impl State {
pub fn is_recv_headers(&self) -> bool {
match self.inner {
Idle => true,
Open { remote: AwaitingHeaders, .. } => true,
Open {
remote: AwaitingHeaders,
..
} => true,
HalfClosedLocal(AwaitingHeaders) => true,
_ => false,
}
@@ -286,7 +301,10 @@ impl State {
pub fn is_recv_streaming(&self) -> bool {
match self.inner {
Open { remote: Peer::Streaming, .. } => true,
Open {
remote: Peer::Streaming,
..
} => true,
HalfClosedLocal(Peer::Streaming) => true,
_ => false,
}
@@ -311,12 +329,8 @@ impl State {
// TODO: Is this correct?
match self.inner {
Closed(Some(Cause::Proto(reason))) => {
Err(proto::Error::Proto(reason))
}
Closed(Some(Cause::Io)) => {
Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into()))
}
Closed(Some(Cause::Proto(reason))) => Err(proto::Error::Proto(reason)),
Closed(Some(Cause::Io)) => Err(proto::Error::Io(io::ErrorKind::BrokenPipe.into())),
_ => Ok(()),
}
}
@@ -324,7 +338,9 @@ impl State {
impl Default for State {
fn default() -> State {
State { inner: Inner::Idle }
State {
inner: Inner::Idle,
}
}
}


@@ -4,13 +4,14 @@ use slab;
use ordermap::{self, OrderMap};
use std::ops;
use std::marker::PhantomData;
use std::ops;
/// Storage for streams
#[derive(Debug)]
pub(super) struct Store<B, P>
where P: Peer,
where
P: Peer,
{
slab: slab::Slab<Stream<B, P>>,
ids: OrderMap<StreamId, usize>,
@@ -18,7 +19,8 @@ pub(super) struct Store<B, P>
/// "Pointer" to an entry in the store
pub(super) struct Ptr<'a, B: 'a, P>
where P: Peer + 'a,
where
P: Peer + 'a,
{
key: Key,
store: &'a mut Store<B, P>,
@@ -30,7 +32,8 @@ pub(super) struct Key(usize);
#[derive(Debug)]
pub(super) struct Queue<B, N, P>
where P: Peer,
where
P: Peer,
{
indices: Option<store::Indices>,
_p: PhantomData<(B, N, P)>,
@@ -65,14 +68,16 @@ pub(super) struct OccupiedEntry<'a> {
}
pub(super) struct VacantEntry<'a, B: 'a, P>
where P: Peer + 'a,
where
P: Peer + 'a,
{
ids: ordermap::VacantEntry<'a, StreamId, usize>,
slab: &'a mut slab::Slab<Stream<B, P>>,
}
pub(super) trait Resolve<B, P>
where P: Peer,
where
P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P>;
}
@@ -80,7 +85,8 @@ pub(super) trait Resolve<B, P>
// ===== impl Store =====
impl<B, P> Store<B, P>
where P: Peer,
where
P: Peer,
{
pub fn new() -> Self {
Store {
@@ -100,9 +106,9 @@ impl<B, P> Store<B, P>
};
Some(Ptr {
key: Key(key),
store: self,
})
key: Key(key),
store: self,
})
}
pub fn insert(&mut self, id: StreamId, val: Stream<B, P>) -> Ptr<B, P> {
@@ -121,20 +127,21 @@ impl<B, P> Store<B, P>
match self.ids.entry(id) {
Occupied(e) => {
Entry::Occupied(OccupiedEntry {
ids: e,
})
ids: e,
})
}
Vacant(e) => {
Entry::Vacant(VacantEntry {
ids: e,
slab: &mut self.slab,
})
ids: e,
slab: &mut self.slab,
})
}
}
}
pub fn for_each<F, E>(&mut self, mut f: F) -> Result<(), E>
where F: FnMut(Ptr<B, P>) -> Result<(), E>,
where
F: FnMut(Ptr<B, P>) -> Result<(), E>,
{
let mut len = self.ids.len();
let mut i = 0;
@@ -144,9 +151,9 @@ impl<B, P> Store<B, P>
let key = *self.ids.get_index(i).unwrap().1;
f(Ptr {
key: Key(key),
store: self,
})?;
key: Key(key),
store: self,
})?;
// TODO: This logic probably could be better...
let new_len = self.ids.len();
@@ -164,7 +171,8 @@ impl<B, P> Store<B, P>
}
impl<B, P> Resolve<B, P> for Store<B, P>
where P: Peer,
where
P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P> {
Ptr {
@@ -175,7 +183,8 @@ impl<B, P> Resolve<B, P> for Store<B, P>
}
impl<B, P> ops::Index<Key> for Store<B, P>
where P: Peer,
where
P: Peer,
{
type Output = Stream<B, P>;
@@ -185,7 +194,8 @@ impl<B, P> ops::Index<Key> for Store<B, P>
}
impl<B, P> ops::IndexMut<Key> for Store<B, P>
where P: Peer,
where
P: Peer,
{
fn index_mut(&mut self, key: Key) -> &mut Self::Output {
self.slab.index_mut(key.0)
@@ -194,7 +204,8 @@ impl<B, P> ops::IndexMut<Key> for Store<B, P>
#[cfg(feature = "unstable")]
impl<B, P> Store<B, P>
where P: Peer,
where
P: Peer,
{
pub fn num_active_streams(&self) -> usize {
self.ids.len()
@@ -208,8 +219,9 @@ impl<B, P> Store<B, P>
// ===== impl Queue =====
impl<B, N, P> Queue<B, N, P>
where N: Next,
P: Peer,
where
N: Next,
P: Peer,
{
pub fn new() -> Self {
Queue {
@@ -260,9 +272,9 @@ impl<B, N, P> Queue<B, N, P>
None => {
trace!(" -> first entry");
self.indices = Some(store::Indices {
head: stream.key(),
tail: stream.key(),
});
head: stream.key(),
tail: stream.key(),
});
}
}
@@ -270,7 +282,8 @@ impl<B, N, P> Queue<B, N, P>
}
pub fn pop<'a, R>(&mut self, store: &'a mut R) -> Option<store::Ptr<'a, B, P>>
where R: Resolve<B, P>
where
R: Resolve<B, P>,
{
if let Some(mut idxs) = self.indices {
let mut stream = store.resolve(idxs.head);
@@ -296,7 +309,8 @@ impl<B, N, P> Queue<B, N, P>
// ===== impl Ptr =====
impl<'a, B: 'a, P> Ptr<'a, B, P>
where P: Peer,
where
P: Peer,
{
/// Returns the Key associated with the stream
pub fn key(&self) -> Key {
@@ -323,7 +337,8 @@ impl<'a, B: 'a, P> Ptr<'a, B, P>
}
impl<'a, B: 'a, P> Resolve<B, P> for Ptr<'a, B, P>
where P: Peer,
where
P: Peer,
{
fn resolve(&mut self, key: Key) -> Ptr<B, P> {
Ptr {
@@ -334,7 +349,8 @@ impl<'a, B: 'a, P> Resolve<B, P> for Ptr<'a, B, P>
}
impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P>
where P: Peer,
where
P: Peer,
{
type Target = Stream<B, P>;
@@ -344,7 +360,8 @@ impl<'a, B: 'a, P> ops::Deref for Ptr<'a, B, P>
}
impl<'a, B: 'a, P> ops::DerefMut for Ptr<'a, B, P>
where P: Peer,
where
P: Peer,
{
fn deref_mut(&mut self) -> &mut Stream<B, P> {
&mut self.store.slab[self.key.0]
@@ -362,7 +379,8 @@ impl<'a> OccupiedEntry<'a> {
// ===== impl VacantEntry =====
impl<'a, B, P> VacantEntry<'a, B, P>
where P: Peer,
where
P: Peer,
{
pub fn insert(self, value: Stream<B, P>) -> Key {
// Insert the value in the slab


@@ -15,7 +15,8 @@ use std::usize;
/// Thus, `ref_count` can be zero and the stream still has to be kept around.
#[derive(Debug)]
pub(super) struct Stream<B, P>
where P: Peer,
where
P: Peer,
{
/// The h2 stream identifier
pub id: StreamId,
@@ -27,7 +28,6 @@ pub(super) struct Stream<B, P>
pub ref_count: usize,
// ===== Fields related to sending =====
/// Next node in the accept linked list
pub next_pending_send: Option<store::Key>,
@@ -61,7 +61,6 @@ pub(super) struct Stream<B, P>
pub send_capacity_inc: bool,
// ===== Fields related to receiving =====
/// Next node in the accept linked list
pub next_pending_accept: Option<store::Key>,
@@ -90,7 +89,6 @@ pub(super) struct Stream<B, P>
/// Validate content-length headers
pub content_length: ContentLength,
}
/// State related to validating a stream's content-length
@@ -114,21 +112,27 @@ pub(super) struct NextSendCapacity;
pub(super) struct NextWindowUpdate;
impl<B, P> Stream<B, P>
where P: Peer,
where
P: Peer,
{
pub fn new(id: StreamId,
init_send_window: WindowSize,
init_recv_window: WindowSize) -> Stream<B, P>
{
pub fn new(
id: StreamId,
init_send_window: WindowSize,
init_recv_window: WindowSize,
) -> Stream<B, P> {
let mut send_flow = FlowControl::new();
let mut recv_flow = FlowControl::new();
recv_flow.inc_window(init_recv_window)
.ok().expect("invalid initial receive window");
recv_flow
.inc_window(init_recv_window)
.ok()
.expect("invalid initial receive window");
recv_flow.assign_capacity(init_recv_window);
send_flow.inc_window(init_send_window)
.ok().expect("invalid initial send window size");
send_flow
.inc_window(init_send_window)
.ok()
.expect("invalid initial send window size");
Stream {
id,
@@ -136,7 +140,6 @@ impl<B, P> Stream<B, P>
ref_count: 0,
// ===== Fields related to sending =====
next_pending_send: None,
is_pending_send: false,
send_flow: send_flow,
@@ -149,7 +152,6 @@ impl<B, P> Stream<B, P>
send_capacity_inc: false,
// ===== Fields related to receiving =====
next_pending_accept: None,
is_pending_accept: false,
recv_flow: recv_flow,
@@ -197,10 +199,8 @@ impl<B, P> Stream<B, P>
// There are no more outstanding references to the stream
self.ref_count == 0 &&
// The stream is not in any queue
!self.is_pending_send &&
!self.is_pending_send_capacity &&
!self.is_pending_accept &&
!self.is_pending_window_update
!self.is_pending_send && !self.is_pending_send_capacity &&
!self.is_pending_accept && !self.is_pending_window_update
}
pub fn assign_capacity(&mut self, capacity: WindowSize) {


@@ -1,9 +1,9 @@
use {client, server, proto};
use frame::Reason;
use codec::{SendError, RecvError, UserError};
use proto::*;
use super::*;
use super::store::Resolve;
use {client, proto, server};
use codec::{RecvError, SendError, UserError};
use frame::Reason;
use proto::*;
use http::HeaderMap;
@@ -12,7 +12,8 @@ use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub(crate) struct Streams<B, P>
where P: Peer,
where
P: Peer,
{
inner: Arc<Mutex<Inner<B, P>>>,
}
@@ -20,7 +21,8 @@ pub(crate) struct Streams<B, P>
/// Reference to the stream state
#[derive(Debug)]
pub(crate) struct StreamRef<B, P>
where P: Peer,
where
P: Peer,
{
inner: Arc<Mutex<Inner<B, P>>>,
key: store::Key,
@@ -32,7 +34,8 @@ pub(crate) struct StreamRef<B, P>
/// TODO: better name
#[derive(Debug)]
struct Inner<B, P>
where P: Peer,
where
P: Peer,
{
/// Tracks send & recv stream concurrency.
counts: Counts<P>,
@@ -42,7 +45,8 @@ struct Inner<B, P>
#[derive(Debug)]
struct Actions<B, P>
where P: Peer,
where
P: Peer,
{
/// Manages state transitions initiated by receiving frames
recv: Recv<B, P>,
@@ -55,27 +59,26 @@ struct Actions<B, P>
}
impl<B, P> Streams<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
pub fn new(config: Config) -> Self {
Streams {
inner: Arc::new(Mutex::new(Inner {
counts: Counts::new(&config),
actions: Actions {
recv: Recv::new(&config),
send: Send::new(&config),
task: None,
},
store: Store::new(),
})),
counts: Counts::new(&config),
actions: Actions {
recv: Recv::new(&config),
send: Send::new(&config),
task: None,
},
store: Store::new(),
})),
}
}
/// Process inbound headers
pub fn recv_headers(&mut self, frame: frame::Headers)
-> Result<(), RecvError>
{
pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), RecvError> {
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -83,12 +86,11 @@ impl<B, P> Streams<B, P>
let key = match me.store.find_entry(id) {
Entry::Occupied(e) => e.key(),
Entry::Vacant(e) => {
match try!(me.actions.recv.open(id, &mut me.counts)) {
match me.actions.recv.open(id, &mut me.counts)? {
Some(stream_id) => {
let stream = Stream::new(
stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
let stream = Stream::new(stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
e.insert(stream)
}
@@ -101,7 +103,9 @@ impl<B, P> Streams<B, P>
let actions = &mut me.actions;
me.counts.transition(stream, |counts, stream| {
trace!("recv_headers; stream={:?}; state={:?}", stream.id, stream.state);
trace!("recv_headers; stream={:?}; state={:?}",
stream.id,
stream.state);
let res = if stream.state.is_recv_headers() {
actions.recv.recv_headers(frame, stream, counts)
@@ -118,9 +122,7 @@ impl<B, P> Streams<B, P>
})
}
pub fn recv_data(&mut self, frame: frame::Data)
-> Result<(), RecvError>
{
pub fn recv_data(&mut self, frame: frame::Data) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -139,9 +141,7 @@ impl<B, P> Streams<B, P>
})
}
pub fn recv_reset(&mut self, frame: frame::Reset)
-> Result<(), RecvError>
{
pub fn recv_reset(&mut self, frame: frame::Reset) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -155,7 +155,8 @@ impl<B, P> Streams<B, P>
Some(stream) => stream,
None => {
// TODO: Are there other error cases?
me.actions.ensure_not_idle(id)
me.actions
.ensure_not_idle(id)
.map_err(RecvError::Connection)?;
return Ok(());
@@ -181,26 +182,27 @@ impl<B, P> Streams<B, P>
let last_processed_id = actions.recv.last_processed_id();
me.store.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(err, &mut *stream);
Ok::<_, ()>(())
me.store
.for_each(|stream| {
counts.transition(stream, |_, stream| {
actions.recv.recv_err(err, &mut *stream);
Ok::<_, ()>(())
})
})
}).unwrap();
.unwrap();
last_processed_id
}
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate)
-> Result<(), RecvError>
{
pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) -> Result<(), RecvError> {
let id = frame.stream_id();
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
if id.is_zero() {
me.actions.send.recv_connection_window_update(
frame, &mut me.store)
me.actions
.send
.recv_connection_window_update(frame, &mut me.store)
.map_err(RecvError::Connection)?;
} else {
// The remote may send window updates for streams that the local now
@@ -209,12 +211,15 @@ impl<B, P> Streams<B, P>
// This result is ignored as there is nothing to do when there
// is an error. The stream is reset by the function on error and
// the error is informational.
let _ = me.actions.send.recv_stream_window_update(
frame.size_increment(),
&mut stream,
&mut me.actions.task);
let _ = me.actions
.send
.recv_stream_window_update(frame.size_increment(),
&mut stream,
&mut me.actions.task);
} else {
me.actions.recv.ensure_not_idle(id)
me.actions
.recv
.ensure_not_idle(id)
.map_err(RecvError::Connection)?;
}
}
@@ -222,9 +227,7 @@ impl<B, P> Streams<B, P>
Ok(())
}
pub fn recv_push_promise(&mut self, frame: frame::PushPromise)
-> Result<(), RecvError>
{
pub fn recv_push_promise(&mut self, frame: frame::PushPromise) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -235,8 +238,9 @@ impl<B, P> Streams<B, P>
None => return Err(RecvError::Connection(ProtocolError)),
};
me.actions.recv.recv_push_promise(
frame, &me.actions.send, stream, &mut me.store)
me.actions
.recv
.recv_push_promise(frame, &me.actions.send, stream, &mut me.store)
}
pub fn next_incoming(&mut self) -> Option<StreamRef<B, P>> {
@@ -264,18 +268,21 @@ impl<B, P> Streams<B, P>
})
}
pub fn send_pending_refusal<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn send_pending_refusal<T>(
&mut self,
dst: &mut Codec<T, Prioritized<B>>,
) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.actions.recv.send_pending_refusal(dst)
}
pub fn poll_complete<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>)
-> Poll<(), io::Error>
where T: AsyncWrite,
pub fn poll_complete<T>(&mut self, dst: &mut Codec<T, Prioritized<B>>) -> Poll<(), io::Error>
where
T: AsyncWrite,
{
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -287,7 +294,9 @@ impl<B, P> Streams<B, P>
try_ready!(me.actions.recv.poll_complete(&mut me.store, dst));
// Send any other pending frames
try_ready!(me.actions.send.poll_complete(&mut me.store, &mut me.counts, dst));
try_ready!(me.actions
.send
.poll_complete(&mut me.store, &mut me.counts, dst));
// Nothing else to do, track the task
me.actions.task = Some(task::current());
@@ -295,21 +304,22 @@ impl<B, P> Streams<B, P>
Ok(().into())
}
pub fn apply_remote_settings(&mut self, frame: &frame::Settings)
-> Result<(), RecvError>
{
pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), RecvError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
me.counts.apply_remote_settings(frame);
me.actions.send.apply_remote_settings(
frame, &mut me.store, &mut me.actions.task)
me.actions
.send
.apply_remote_settings(frame, &mut me.store, &mut me.actions.task)
}
pub fn send_request(&mut self, request: Request<()>, end_of_stream: bool)
-> Result<StreamRef<B, P>, SendError>
{
pub fn send_request(
&mut self,
request: Request<()>,
end_of_stream: bool,
) -> Result<StreamRef<B, P>, SendError> {
use http::Method;
use super::stream::ContentLength;
@@ -325,23 +335,22 @@ impl<B, P> Streams<B, P>
// Initialize a new stream. This fails if the connection is at capacity.
let stream_id = me.actions.send.open(&mut me.counts)?;
let mut stream = Stream::new(
stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
let mut stream = Stream::new(stream_id,
me.actions.send.init_window_sz(),
me.actions.recv.init_window_sz());
if *request.method() == Method::HEAD {
stream.content_length = ContentLength::Head;
}
// Convert the message
let headers = client::Peer::convert_send_message(
stream_id, request, end_of_stream);
let headers = client::Peer::convert_send_message(stream_id, request, end_of_stream);
let mut stream = me.store.insert(stream.id, stream);
me.actions.send.send_headers(
headers, &mut stream, &mut me.actions.task)?;
me.actions
.send
.send_headers(headers, &mut stream, &mut me.actions.task)?;
// Given that the stream has been initialized, it should not be in the
// closed state.
@@ -354,9 +363,9 @@ impl<B, P> Streams<B, P>
};
Ok(StreamRef {
inner: self.inner.clone(),
key: key,
})
inner: self.inner.clone(),
key: key,
})
}
pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
@@ -368,8 +377,7 @@ impl<B, P> Streams<B, P>
Entry::Vacant(e) => {
match me.actions.recv.open(id, &mut me.counts) {
Ok(Some(stream_id)) => {
let stream = Stream::new(
stream_id, 0, 0);
let stream = Stream::new(stream_id, 0, 0);
e.insert(stream)
}
@@ -388,7 +396,8 @@ impl<B, P> Streams<B, P>
}
impl<B> Streams<B, client::Peer>
where B: Buf,
where
B: Buf,
{
pub fn poll_send_request_ready(&mut self) -> Async<()> {
let mut me = self.inner.lock().unwrap();
@@ -400,8 +409,9 @@ impl<B> Streams<B, client::Peer>
#[cfg(feature = "unstable")]
impl<B, P> Streams<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
pub fn num_active_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
@@ -417,12 +427,11 @@ impl<B, P> Streams<B, P>
// ===== impl StreamRef =====
impl<B, P> StreamRef<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
pub fn send_data(&mut self, data: B, end_stream: bool)
-> Result<(), UserError>
{
pub fn send_data(&mut self, data: B, end_stream: bool) -> Result<(), UserError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -439,9 +448,7 @@ impl<B, P> StreamRef<B, P>
})
}
pub fn send_trailers(&mut self, trailers: HeaderMap)
-> Result<(), UserError>
{
pub fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), UserError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -469,9 +476,11 @@ impl<B, P> StreamRef<B, P>
})
}
pub fn send_response(&mut self, response: Response<()>, end_of_stream: bool)
-> Result<(), UserError>
{
pub fn send_response(
&mut self,
response: Response<()>,
end_of_stream: bool,
) -> Result<(), UserError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
@@ -479,8 +488,7 @@ impl<B, P> StreamRef<B, P>
let actions = &mut me.actions;
me.counts.transition(stream, |_, stream| {
let frame = server::Peer::convert_send_message(
stream.id, response, end_of_stream);
let frame = server::Peer::convert_send_message(stream.id, response, end_of_stream);
actions.send.send_headers(frame, stream, &mut actions.task)
})
@@ -515,16 +523,15 @@ impl<B, P> StreamRef<B, P>
/// Releases recv capacity back to the peer. This may result in sending
/// WINDOW_UPDATE frames on both the stream and connection.
pub fn release_capacity(&mut self, capacity: WindowSize)
-> Result<(), UserError>
{
pub fn release_capacity(&mut self, capacity: WindowSize) -> Result<(), UserError> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;
let mut stream = me.store.resolve(self.key);
me.actions.recv.release_capacity(
capacity, &mut stream, &mut me.actions.task)
me.actions
.recv
.release_capacity(capacity, &mut stream, &mut me.actions.task)
}
/// Request capacity to send data
@@ -559,7 +566,8 @@ impl<B, P> StreamRef<B, P>
}
impl<B> StreamRef<B, server::Peer>
where B: Buf,
where
B: Buf,
{
/// Called by the server after the stream is accepted. Given that clients
/// initialize streams by sending HEADERS, the request will always be
@@ -578,7 +586,8 @@ impl<B> StreamRef<B, server::Peer>
}
impl<B> StreamRef<B, client::Peer>
where B: Buf,
where
B: Buf,
{
pub fn poll_response(&mut self) -> Poll<Response<()>, proto::Error> {
let mut me = self.inner.lock().unwrap();
@@ -591,13 +600,12 @@ impl<B> StreamRef<B, client::Peer>
}
impl<B, P> Clone for StreamRef<B, P>
where P: Peer,
where
P: Peer,
{
fn clone(&self) -> Self {
// Increment the ref count
self.inner.lock().unwrap()
.store.resolve(self.key)
.ref_inc();
self.inner.lock().unwrap().store.resolve(self.key).ref_inc();
StreamRef {
inner: self.inner.clone(),
@@ -607,7 +615,8 @@ impl<B, P> Clone for StreamRef<B, P>
}
impl<B, P> Drop for StreamRef<B, P>
where P: Peer,
where
P: Peer,
{
fn drop(&mut self) {
let mut me = self.inner.lock().unwrap();
@@ -632,16 +641,19 @@ impl<B, P> Drop for StreamRef<B, P>
// ===== impl Actions =====
impl<B, P> Actions<B, P>
where B: Buf,
P: Peer,
where
B: Buf,
P: Peer,
{
fn reset_on_recv_stream_err(&mut self,
stream: &mut store::Ptr<B, P>,
res: Result<(), RecvError>)
-> Result<(), RecvError>
{
if let Err(RecvError::Stream { reason, .. }) = res {
fn reset_on_recv_stream_err(
&mut self,
stream: &mut store::Ptr<B, P>,
res: Result<(), RecvError>,
) -> Result<(), RecvError> {
if let Err(RecvError::Stream {
reason, ..
}) = res
{
// Reset the stream.
self.send.send_reset(reason, stream, &mut self.task);
Ok(())
@@ -650,9 +662,7 @@ impl<B, P> Actions<B, P>
}
}
fn ensure_not_idle(&mut self, id: StreamId)
-> Result<(), Reason>
{
fn ensure_not_idle(&mut self, id: StreamId) -> Result<(), Reason> {
if P::is_local_init(id) {
self.send.ensure_not_idle(id)
} else {