ok, starting to look good
@@ -1,8 +1,8 @@
-use Frame;
+use {Frame, FrameSize};
 use client::Client;
 use error::{self, ConnectionError};
 use frame::{self, StreamId};
-use proto::{self, Peer, ReadySink, State, FlowController};
+use proto::{self, Peer, ReadySink, State, FlowController, WindowSize};
 use server::Server;

 use tokio_io::{AsyncRead, AsyncWrite};
@@ -26,12 +26,12 @@ pub struct Connection<T, P, B: IntoBuf = Bytes> {
     peer: PhantomData<P>,

     /// Tracks connection-level flow control.
-    local_flow_controller: FlowController,
-    remote_flow_controller: FlowController,
+    recv_flow_controller: FlowController,
+    send_flow_controller: FlowController,

-    pending_local_window_update: Option<frame::WindowUpdate>,
-    blocked_remote_window_update: Option<task::Task>,
+    pending_send_window_update: Option<frame::WindowUpdate>,
+    blocked_recv_window_update: Option<task::Task>,
 }

 type StreamMap<T> = OrderMap<StreamId, T, BuildHasherDefault<FnvHasher>>;
@@ -42,37 +42,47 @@ pub fn new<T, P, B>(transport: proto::Inner<T, B::Buf>)
     P: Peer,
     B: IntoBuf,
 {
-    let local_window_size = transport.local_settings().initial_window_size();
-    let remote_window_size = transport.remote_settings().initial_window_size();
+    let recv_window_size = transport.local_settings().initial_window_size();
+    let send_window_size = transport.remote_settings().initial_window_size();
     Connection {
         inner: transport,
         streams: StreamMap::default(),
         peer: PhantomData,

-        local_flow_controller: FlowController::new(local_window_size),
-        remote_flow_controller: FlowController::new(remote_window_size),
+        recv_flow_controller: FlowController::new(recv_window_size),
+        send_flow_controller: FlowController::new(send_window_size),

-        pending_local_window_update: None,
-        blocked_remote_window_update: None,
+        pending_send_window_update: None,
+        blocked_recv_window_update: None,
     }
 }

 impl<T, P, B: IntoBuf> Connection<T, P, B> {
-    pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<u32, ConnectionError> {
-        if id.is_zero() {
-            return match self.local_flow_controller.take_window_update() {
-                Some(incr) => Ok(Async::Ready(incr)),
-                None => {
-                    self.blocked_remote_window_update = Some(task::current());
-                    Ok(Async::NotReady)
-                }
-            };
-        }
+    #[inline]
+    fn claim_connection_recv_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
+        self.recv_flow_controller.claim_window(len)
+            .map_err(|_| error::Reason::FlowControlError.into())
+    }

-        match self.streams.get_mut(&id).and_then(|mut s| s.take_remote_window_update()) {
+    #[inline]
+    fn claim_connection_send_window(&mut self, len: WindowSize) -> Result<(), ConnectionError> {
+        self.send_flow_controller.claim_window(len)
+            .map_err(|_| error::Reason::FlowControlError.into())
+    }
+
+    // TODO check max frame size
+
+    pub fn poll_remote_window_update(&mut self, id: StreamId) -> Poll<WindowSize, ConnectionError> {
+        let added = if id.is_zero() {
+            self.send_flow_controller.take_window_update()
+        } else {
+            self.streams.get_mut(&id).and_then(|mut s| s.take_recv_window_update())
+        };
+
+        match added {
             Some(incr) => Ok(Async::Ready(incr)),
             None => {
-                self.blocked_remote_window_update = Some(task::current());
+                self.blocked_recv_window_update = Some(task::current());
                 Ok(Async::NotReady)
             }
         }
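The claim_connection_recv_window/claim_connection_send_window helpers and the reworked poll_remote_window_update above lean on three FlowController operations: claim_window, add_to_window, and take_window_update. The controller itself is not touched by this commit, so the following is only a rough sketch of the shape those calls imply (simplified error type, invented field names):

    // Rough sketch only: the real proto::FlowController is not shown in this commit.
    pub type WindowSize = u32;

    #[derive(Debug)]
    pub struct FlowController {
        /// Bytes currently available to send or receive.
        window_size: WindowSize,
        /// Window growth that has not yet been turned into a WINDOW_UPDATE.
        pending_increase: WindowSize,
    }

    impl FlowController {
        pub fn new(initial: WindowSize) -> FlowController {
            FlowController { window_size: initial, pending_increase: 0 }
        }

        /// Charge `len` bytes (e.g. a DATA payload) against the window.
        pub fn claim_window(&mut self, len: WindowSize) -> Result<(), ()> {
            if len > self.window_size {
                return Err(()); // mapped to Reason::FlowControlError in the diff above
            }
            self.window_size -= len;
            Ok(())
        }

        /// Record window growth without advertising it yet.
        pub fn add_to_window(&mut self, incr: WindowSize) {
            self.pending_increase += incr;
        }

        /// Apply and return pending growth, if any, so the caller can build a
        /// WINDOW_UPDATE frame or wake a blocked task.
        pub fn take_window_update(&mut self) -> Option<WindowSize> {
            if self.pending_increase == 0 {
                return None;
            }
            let incr = self.pending_increase;
            self.pending_increase = 0;
            self.window_size += incr;
            Some(incr)
        }
    }

In this reading, claim_window charges bytes as DATA frames move, while add_to_window accumulates growth that take_window_update later surfaces.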
@@ -83,18 +93,18 @@ impl<T, P, B: IntoBuf> Connection<T, P, B> {
     ///
     /// Connection window updates (StreamId=0) and stream window must be published
     /// distinctly.
-    pub fn init_send_window_update(&mut self, id: StreamId, incr: u32) {
-        assert!(self.pending_local_window_update.is_none());
+    pub fn init_send_window_update(&mut self, id: StreamId, incr: WindowSize) {
+        assert!(self.pending_send_window_update.is_none());

         let added = if id.is_zero() {
-            self.remote_flow_controller.add_to_window(incr);
-            self.remote_flow_controller.take_window_update()
+            self.send_flow_controller.add_to_window(incr);
+            self.send_flow_controller.take_window_update()
         } else {
             self.streams.get_mut(&id).and_then(|mut s| s.send_window_update(incr))
         };

         if let Some(added) = added {
-            self.pending_local_window_update = Some(frame::WindowUpdate::new(id, added));
+            self.pending_send_window_update = Some(frame::WindowUpdate::new(id, added));
         }
     }
@@ -102,9 +112,9 @@ impl<T, P, B: IntoBuf> Connection<T, P, B> {
     ///
     /// Connection window updates (id=0) and stream window updates are advertised
     /// distinctly.
-    fn recv_window_update(&mut self, id: StreamId, incr: u32) {
+    fn recv_window_update(&mut self, id: StreamId, incr: WindowSize) {
         if id.is_zero() {
-            return self.remote_flow_controller.add_to_window(incr);
+            return self.recv_flow_controller.add_to_window(incr);
         }

         if let Some(mut s) = self.streams.get_mut(&id) {
@@ -116,28 +126,34 @@ impl<T, P, B: IntoBuf> Connection<T, P, B> {
 impl<T, P, B> Connection<T, P, B>
     where T: AsyncRead + AsyncWrite,
           P: Peer,
-          B: IntoBuf,
+          B: IntoBuf
 {
     /// Attempts to send a window update to the remote.
     fn poll_send_window_update(&mut self) -> Poll<(), ConnectionError> {
-        if let Some(f) = self.pending_local_window_update.take() {
+        if let Some(f) = self.pending_send_window_update.take() {
             if self.inner.start_send(f.into())?.is_not_ready() {
-                self.pending_local_window_update = Some(f);
+                self.pending_send_window_update = Some(f);
                 return Ok(Async::NotReady);
             }
         }
         Ok(Async::Ready(()))
     }
 }

 // Note: this is bytes-specific for now so that we can know the payload's length.
 impl<T, P> Connection<T, P, Bytes>
     where T: AsyncRead + AsyncWrite,
           P: Peer,
 {
     pub fn send_data(self,
                      id: StreamId,
-                     data: B,
-                     data_len: usize,
+                     data: Bytes,
                      end_of_stream: bool)
         -> sink::Send<Self>
     {
         self.send(Frame::Data {
             id,
-            data_len,
+            data_len: data.len() as FrameSize,
             data,
             end_of_stream,
         })
@@ -196,10 +212,15 @@ impl<T, P, B> Stream for Connection<T, P, B>
         let frame = match try!(self.inner.poll()) {
             Async::Ready(f) => f,
             Async::NotReady => {
-                // Because receiving new frames may depend on ensuring that the
-                // write buffer is clear, `poll_complete` is called here.
-                let _ = try!(self.poll_complete());
-                return Ok(Async::NotReady);
+                // Receiving new frames may depend on ensuring that the write buffer
+                // is clear (e.g. if window updates need to be sent), so `poll_ready`
+                // is called here.
+                try_ready!(self.poll_ready());
+
+                // If the sender sink is ready, we attempt to poll the underlying
+                // stream once more, because it may have been made ready by flushing
+                // the sink.
+                try_ready!(self.inner.poll())
             }
         };
@@ -234,22 +255,30 @@ impl<T, P, B> Stream for Connection<T, P, B>
             }

             Some(Data(v)) => {
-                let stream_id = v.stream_id();
+                let id = v.stream_id();
                 let end_of_stream = v.is_end_stream();
-                match self.streams.get_mut(&stream_id) {
+
+                self.claim_connection_recv_window(v.len())?;
+                match self.streams.get_mut(&id) {
                     None => return Err(error::Reason::ProtocolError.into()),
-                    Some(state) => try!(state.recv_data(end_of_stream, v.len())),
+                    Some(state) => state.recv_data(end_of_stream, v.len())?,
                 }

                 Frame::Data {
-                    id: stream_id,
+                    id,
+                    end_of_stream,
                     data_len: v.len(),
                     data: v.into_payload(),
-                    end_of_stream,
                 }
             }

             Some(WindowUpdate(v)) => {
                 // When a window update is read from the remote, apply that update to
                 // the proper stream.
                 self.recv_window_update(v.stream_id(), v.size_increment());

                 // There's nothing to return yet, so continue attempting to read
                 // additional frames.
                 continue;
             }
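For context on the claim_connection_recv_window call added above: HTTP/2 accounts flow control at two levels, so a DATA payload is charged against the connection window (stream 0) as well as the receiving stream's own window. A hedged illustration of that receive-side pattern, reusing the sketched FlowController from earlier; the function name and error strings are hypothetical, not the crate's API:

    // Illustration only; names are hypothetical.
    fn charge_received_data(
        connection_window: &mut FlowController,
        stream_window: &mut FlowController,
        payload_len: WindowSize,
    ) -> Result<(), &'static str> {
        // The connection-level window is charged first...
        connection_window
            .claim_window(payload_len)
            .map_err(|_| "connection flow control violated")?;
        // ...then the receiving stream is charged for the same bytes.
        stream_window
            .claim_window(payload_len)
            .map_err(|_| "stream flow control violated")?;
        Ok(())
    }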
@@ -278,11 +307,11 @@ impl<T, P, B> Sink for Connection<T, P, B>

         // First ensure that the upstream can process a new item. This ensures, for
         // instance, that any pending local window updates have been sent to the remote
-        // before sending any other frames.
+        // before sending any other (i.e. DATA) frames.
         if try!(self.poll_ready()).is_not_ready() {
             return Ok(AsyncSink::NotReady(item));
         }
-        assert!(self.pending_local_window_update.is_none());
+        assert!(self.pending_send_window_update.is_none());

         match item {
             Frame::Headers { id, headers, end_of_stream } => {
@@ -324,6 +353,8 @@ impl<T, P, B> Sink for Connection<T, P, B>
             }

             Frame::Data { id, data, data_len, end_of_stream } => {
+                self.claim_connection_send_window(data_len)?;
+
                 // The stream must be initialized at this point
                 match self.streams.get_mut(&id) {
                     None => return Err(error::User::InactiveStreamId.into()),
@@ -331,14 +362,15 @@ impl<T, P, B> Sink for Connection<T, P, B>
                 }

                 let mut frame = frame::Data::from_buf(id, data.into_buf());

                 if end_of_stream {
                     frame.set_end_stream();
                 }

                 let res = try!(self.inner.start_send(frame.into()));

                 // poll_ready has already been called.
                 assert!(res.is_ready());

                 Ok(AsyncSink::Ready)
             }
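A side note on the send_data change earlier in this diff: data_len is now derived from the Bytes payload instead of being passed separately, so the two values can no longer disagree. A minimal sketch of that derivation, assuming FrameSize is a u32-style length type (it is only imported, not defined, here):

    // Sketch only: `FrameSize` stands in for the crate's frame-length type.
    type FrameSize = u32;

    fn data_frame_len(payload: &[u8]) -> FrameSize {
        // The commit's own TODO notes that enforcing the max frame size is still pending.
        payload.len() as FrameSize
    }

    fn main() {
        assert_eq!(data_frame_len(b"hello"), 5);
    }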