diff --git a/src/frame/data.rs b/src/frame/data.rs index cfe5516..8945595 100644 --- a/src/frame/data.rs +++ b/src/frame/data.rs @@ -1,4 +1,3 @@ -use FrameSize; use frame::{util, Frame, Head, Error, StreamId, Kind}; use bytes::{BufMut, Bytes, Buf}; @@ -55,16 +54,16 @@ impl Data { Head::new(Kind::Data, self.flags.into(), self.stream_id) } + pub fn payload(&self) -> &T { + &self.data + } + pub fn into_payload(self) -> T { self.data } } impl Data { - pub fn len(&self) -> usize { - self.data.remaining() - } - pub fn from_buf(stream_id: StreamId, data: T, eos: bool) -> Self { let mut flags = DataFlag::default(); if eos { @@ -80,7 +79,7 @@ impl Data { } pub fn encode_chunk(&mut self, dst: &mut U) { - let len = self.len() as usize; + let len = self.data.remaining() as usize; if len > dst.remaining_mut() { unimplemented!(); } diff --git a/src/lib.rs b/src/lib.rs index 83352bf..00956cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,8 +53,6 @@ pub enum Frame { Data { id: StreamId, data: B, - /// TODO figure out how to make this a requirement on `B` - //data_len: FrameSize, end_of_stream: bool, }, Trailers { diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 8d9fef1..92f09ba 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,4 +1,4 @@ -use {ConnectionError, Frame, FrameSize}; +use {ConnectionError, Frame}; use client::Client; use error; use frame::{self, SettingSet, StreamId}; @@ -91,7 +91,6 @@ impl Connection { self.send(Frame::Data { id, - data_len: data.len() as FrameSize, data, end_of_stream, }) @@ -176,7 +175,6 @@ impl Stream for Connection Some(Data(v)) => Frame::Data { id: v.stream_id(), end_of_stream: v.is_end_stream(), - //data_len: v.len(), data: v.into_payload(), }, diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs index f8f89f1..cb4f20e 100644 --- a/src/proto/flow_control.rs +++ b/src/proto/flow_control.rs @@ -1,5 +1,4 @@ -use ConnectionError; -use error; +use {error, ConnectionError, FrameSize}; use frame::{self, Frame}; use proto::*; @@ -8,8 +7,8 @@ use std::collections::VecDeque; #[derive(Debug)] pub struct FlowControl { inner: T, - initial_local_window_size: u32, - initial_remote_window_size: u32, + initial_local_window_size: WindowSize, + initial_remote_window_size: WindowSize, /// Tracks the connection-level flow control window for receiving data from the /// remote. @@ -37,8 +36,8 @@ impl FlowControl T: Sink, SinkError = ConnectionError>, T: ControlStreams { - pub fn new(initial_local_window_size: u32, - initial_remote_window_size: u32, + pub fn new(initial_local_window_size: WindowSize, + initial_remote_window_size: WindowSize, inner: T) -> FlowControl { @@ -250,11 +249,12 @@ impl Stream for FlowControl } Some(Data(v)) => { - if self.connection_local_flow_controller.claim_window(v.len()).is_err() { + let sz = v.payload().len() as FrameSize; + if self.connection_local_flow_controller.claim_window(sz).is_err() { return Err(error::Reason::FlowControlError.into()) } if let Some(fc) = self.local_flow_controller(v.stream_id()) { - if fc.claim_window(v.len()).is_err() { + if fc.claim_window(sz).is_err() { return Err(error::Reason::FlowControlError.into()) } } @@ -271,6 +271,7 @@ impl Sink for FlowControl where T: Sink, SinkError = ConnectionError>, T: ReadySink, T: ControlStreams, + U: Buf, { type SinkItem = T::SinkItem; type SinkError = T::SinkError; @@ -289,11 +290,12 @@ impl Sink for FlowControl return Ok(AsyncSink::NotReady(Data(v))); } - if self.connection_remote_flow_controller.claim_window(v.len()).is_err() { + let sz = v.payload().remaining() as FrameSize; + if self.connection_remote_flow_controller.claim_window(sz).is_err() { return Err(error::User::FlowControlViolation.into()); } if let Some(fc) = self.remote_flow_controller(v.stream_id()) { - if fc.claim_window(v.len()).is_err() { + if fc.claim_window(sz).is_err() { return Err(error::User::FlowControlViolation.into()) } } @@ -318,6 +320,7 @@ impl ReadySink for FlowControl T: Sink, SinkError = ConnectionError>, T: ReadySink, T: ControlStreams, + U: Buf, { fn poll_ready(&mut self) -> Poll<(), ConnectionError> { try_ready!(self.inner.poll_ready()); diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index ace33ae..fa0bc4d 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -74,7 +74,7 @@ impl FramedWrite } fn frame_len(&self, data: &frame::Data) -> usize { - cmp::min(self.max_frame_size, data.len()) as usize + cmp::min(self.max_frame_size as usize, data.payload().remaining()) } } @@ -104,7 +104,7 @@ impl Sink for FramedWrite match item { Frame::Data(mut v) => { - if v.len() >= (CHAIN_THRESHOLD as FrameSize) { + if v.payload().remaining() >= CHAIN_THRESHOLD { let head = v.head(); let len = self.frame_len(&v); @@ -121,7 +121,7 @@ impl Sink for FramedWrite // The chunk has been fully encoded, so there is no need to // keep it around - assert_eq!(v.len(), 0, "chunk not fully encoded"); + assert_eq!(v.payload().remaining(), 0, "chunk not fully encoded"); } } Frame::Headers(v) => {