hooray! we don't have to track data_len on its own

This commit is contained in:
Oliver Gould
2017-07-17 04:27:44 +00:00
parent c1f9ff8fd8
commit 990e5c5aae
5 changed files with 22 additions and 24 deletions

View File

@@ -1,4 +1,3 @@
use FrameSize;
use frame::{util, Frame, Head, Error, StreamId, Kind}; use frame::{util, Frame, Head, Error, StreamId, Kind};
use bytes::{BufMut, Bytes, Buf}; use bytes::{BufMut, Bytes, Buf};
@@ -55,16 +54,16 @@ impl<T> Data<T> {
Head::new(Kind::Data, self.flags.into(), self.stream_id) Head::new(Kind::Data, self.flags.into(), self.stream_id)
} }
pub fn payload(&self) -> &T {
&self.data
}
pub fn into_payload(self) -> T { pub fn into_payload(self) -> T {
self.data self.data
} }
} }
impl<T: Buf> Data<T> { impl<T: Buf> Data<T> {
pub fn len(&self) -> usize {
self.data.remaining()
}
pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self { pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self {
let mut flags = DataFlag::default(); let mut flags = DataFlag::default();
if eos { if eos {
@@ -80,7 +79,7 @@ impl<T: Buf> Data<T> {
} }
pub fn encode_chunk<U: BufMut>(&mut self, dst: &mut U) { pub fn encode_chunk<U: BufMut>(&mut self, dst: &mut U) {
let len = self.len() as usize; let len = self.data.remaining() as usize;
if len > dst.remaining_mut() { if len > dst.remaining_mut() {
unimplemented!(); unimplemented!();
} }

View File

@@ -53,8 +53,6 @@ pub enum Frame<T, B = Bytes> {
Data { Data {
id: StreamId, id: StreamId,
data: B, data: B,
/// TODO figure out how to make this a requirement on `B`
//data_len: FrameSize,
end_of_stream: bool, end_of_stream: bool,
}, },
Trailers { Trailers {

View File

@@ -1,4 +1,4 @@
use {ConnectionError, Frame, FrameSize}; use {ConnectionError, Frame};
use client::Client; use client::Client;
use error; use error;
use frame::{self, SettingSet, StreamId}; use frame::{self, SettingSet, StreamId};
@@ -91,7 +91,6 @@ impl<T, P> Connection<T, P, Bytes>
{ {
self.send(Frame::Data { self.send(Frame::Data {
id, id,
data_len: data.len() as FrameSize,
data, data,
end_of_stream, end_of_stream,
}) })
@@ -176,7 +175,6 @@ impl<T, P, B> Stream for Connection<T, P, B>
Some(Data(v)) => Frame::Data { Some(Data(v)) => Frame::Data {
id: v.stream_id(), id: v.stream_id(),
end_of_stream: v.is_end_stream(), end_of_stream: v.is_end_stream(),
//data_len: v.len(),
data: v.into_payload(), data: v.into_payload(),
}, },

View File

@@ -1,5 +1,4 @@
use ConnectionError; use {error, ConnectionError, FrameSize};
use error;
use frame::{self, Frame}; use frame::{self, Frame};
use proto::*; use proto::*;
@@ -8,8 +7,8 @@ use std::collections::VecDeque;
#[derive(Debug)] #[derive(Debug)]
pub struct FlowControl<T> { pub struct FlowControl<T> {
inner: T, inner: T,
initial_local_window_size: u32, initial_local_window_size: WindowSize,
initial_remote_window_size: u32, initial_remote_window_size: WindowSize,
/// Tracks the connection-level flow control window for receiving data from the /// Tracks the connection-level flow control window for receiving data from the
/// remote. /// remote.
@@ -37,8 +36,8 @@ impl<T, U> FlowControl<T>
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ControlStreams T: ControlStreams
{ {
pub fn new(initial_local_window_size: u32, pub fn new(initial_local_window_size: WindowSize,
initial_remote_window_size: u32, initial_remote_window_size: WindowSize,
inner: T) inner: T)
-> FlowControl<T> -> FlowControl<T>
{ {
@@ -250,11 +249,12 @@ impl<T> Stream for FlowControl<T>
} }
Some(Data(v)) => { Some(Data(v)) => {
if self.connection_local_flow_controller.claim_window(v.len()).is_err() { let sz = v.payload().len() as FrameSize;
if self.connection_local_flow_controller.claim_window(sz).is_err() {
return Err(error::Reason::FlowControlError.into()) return Err(error::Reason::FlowControlError.into())
} }
if let Some(fc) = self.local_flow_controller(v.stream_id()) { if let Some(fc) = self.local_flow_controller(v.stream_id()) {
if fc.claim_window(v.len()).is_err() { if fc.claim_window(sz).is_err() {
return Err(error::Reason::FlowControlError.into()) return Err(error::Reason::FlowControlError.into())
} }
} }
@@ -271,6 +271,7 @@ impl<T, U> Sink for FlowControl<T>
where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, where T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink, T: ReadySink,
T: ControlStreams, T: ControlStreams,
U: Buf,
{ {
type SinkItem = T::SinkItem; type SinkItem = T::SinkItem;
type SinkError = T::SinkError; type SinkError = T::SinkError;
@@ -289,11 +290,12 @@ impl<T, U> Sink for FlowControl<T>
return Ok(AsyncSink::NotReady(Data(v))); return Ok(AsyncSink::NotReady(Data(v)));
} }
if self.connection_remote_flow_controller.claim_window(v.len()).is_err() { let sz = v.payload().remaining() as FrameSize;
if self.connection_remote_flow_controller.claim_window(sz).is_err() {
return Err(error::User::FlowControlViolation.into()); return Err(error::User::FlowControlViolation.into());
} }
if let Some(fc) = self.remote_flow_controller(v.stream_id()) { if let Some(fc) = self.remote_flow_controller(v.stream_id()) {
if fc.claim_window(v.len()).is_err() { if fc.claim_window(sz).is_err() {
return Err(error::User::FlowControlViolation.into()) return Err(error::User::FlowControlViolation.into())
} }
} }
@@ -318,6 +320,7 @@ impl<T, U> ReadySink for FlowControl<T>
T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>, T: Sink<SinkItem = Frame<U>, SinkError = ConnectionError>,
T: ReadySink, T: ReadySink,
T: ControlStreams, T: ControlStreams,
U: Buf,
{ {
fn poll_ready(&mut self) -> Poll<(), ConnectionError> { fn poll_ready(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.inner.poll_ready()); try_ready!(self.inner.poll_ready());

View File

@@ -74,7 +74,7 @@ impl<T, B> FramedWrite<T, B>
} }
fn frame_len(&self, data: &frame::Data<B>) -> usize { fn frame_len(&self, data: &frame::Data<B>) -> usize {
cmp::min(self.max_frame_size, data.len()) as usize cmp::min(self.max_frame_size as usize, data.payload().remaining())
} }
} }
@@ -104,7 +104,7 @@ impl<T, B> Sink for FramedWrite<T, B>
match item { match item {
Frame::Data(mut v) => { Frame::Data(mut v) => {
if v.len() >= (CHAIN_THRESHOLD as FrameSize) { if v.payload().remaining() >= CHAIN_THRESHOLD {
let head = v.head(); let head = v.head();
let len = self.frame_len(&v); let len = self.frame_len(&v);
@@ -121,7 +121,7 @@ impl<T, B> Sink for FramedWrite<T, B>
// The chunk has been fully encoded, so there is no need to // The chunk has been fully encoded, so there is no need to
// keep it around // keep it around
assert_eq!(v.len(), 0, "chunk not fully encoded"); assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded");
} }
} }
Frame::Headers(v) => { Frame::Headers(v) => {