feat(body): change Payload::Data to be a Buf

Closes #1508

BREAKING CHANGE: Each payload chunk must implement `Buf`, instead of
  just `AsRef<[u8]>`.
This commit is contained in:
Sean McArthur
2018-05-07 10:06:28 -07:00
parent dfa7e17960
commit a3be110a55
6 changed files with 34 additions and 30 deletions

View File

@@ -17,7 +17,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt; use std::fmt;
use bytes::Bytes; use bytes::{Buf, Bytes};
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use h2; use h2;
@@ -34,7 +34,7 @@ type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
/// don't need to customize a send stream for your own application. /// don't need to customize a send stream for your own application.
pub trait Payload: Send + 'static { pub trait Payload: Send + 'static {
/// A buffer of bytes representing a single chunk of a body. /// A buffer of bytes representing a single chunk of a body.
type Data: AsRef<[u8]> + Send; type Data: Buf + Send;
/// The error type of this stream. /// The error type of this stream.
type Error: Into<Box<::std::error::Error + Send + Sync>>; type Error: Into<Box<::std::error::Error + Send + Sync>>;

View File

@@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use bytes::Bytes; use bytes::{Buf, Bytes};
use h2::ReleaseCapacity; use h2::ReleaseCapacity;
/// A piece of a message body. /// A piece of a message body.
@@ -53,6 +53,23 @@ impl Chunk {
} }
} }
impl Buf for Chunk {
#[inline]
fn remaining(&self) -> usize {
self.bytes.len()
}
#[inline]
fn bytes(&self) -> &[u8] {
&self.bytes
}
#[inline]
fn advance(&mut self, cnt: usize) {
self.bytes.advance(cnt);
}
}
impl From<Vec<u8>> for Chunk { impl From<Vec<u8>> for Chunk {
#[inline] #[inline]
fn from(v: Vec<u8>) -> Chunk { fn from(v: Vec<u8>) -> Chunk {

View File

@@ -626,7 +626,6 @@ impl<T: Send, B: Send> AssertSend for Connection<T, B>
where where
T: AsyncRead + AsyncWrite + Send + 'static, T: AsyncRead + AsyncWrite + Send + 'static,
B: Payload + 'static, B: Payload + 'static,
B::Data: Send + 'static,
{} {}
#[doc(hidden)] #[doc(hidden)]

View File

@@ -2,7 +2,7 @@ use std::fmt;
use std::io::{self}; use std::io::{self};
use std::marker::PhantomData; use std::marker::PhantomData;
use bytes::Bytes; use bytes::{Buf, Bytes};
use futures::{Async, AsyncSink, Poll, StartSend}; use futures::{Async, AsyncSink, Poll, StartSend};
use futures::task::Task; use futures::task::Task;
use http::{Method, Version}; use http::{Method, Version};
@@ -10,7 +10,7 @@ use tokio_io::{AsyncRead, AsyncWrite};
use ::Chunk; use ::Chunk;
use proto::{BodyLength, Decode, Http1Transaction, MessageHead}; use proto::{BodyLength, Decode, Http1Transaction, MessageHead};
use super::io::{Cursor, Buffered}; use super::io::{Buffered};
use super::{EncodedBuf, Encoder, Decoder}; use super::{EncodedBuf, Encoder, Decoder};
@@ -22,25 +22,14 @@ use super::{EncodedBuf, Encoder, Decoder};
/// determine if this connection can be kept alive after the message, /// determine if this connection can be kept alive after the message,
/// or if it is complete. /// or if it is complete.
pub(crate) struct Conn<I, B, T> { pub(crate) struct Conn<I, B, T> {
io: Buffered<I, EncodedBuf<Cursor<B>>>, io: Buffered<I, EncodedBuf<B>>,
state: State, state: State,
_marker: PhantomData<T> _marker: PhantomData<T>
} }
/*
impl<I, B> Conn<I, B, ClientTransaction>
where I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>,
{
pub fn new_client(io: I) -> Conn<I, B, ClientTransaction> {
Conn::new(io)
}
}
*/
impl<I, B, T> Conn<I, B, T> impl<I, B, T> Conn<I, B, T>
where I: AsyncRead + AsyncWrite, where I: AsyncRead + AsyncWrite,
B: AsRef<[u8]>, B: Buf,
T: Http1Transaction, T: Http1Transaction,
{ {
pub fn new(io: I) -> Conn<I, B, T> { pub fn new(io: I) -> Conn<I, B, T> {
@@ -488,7 +477,7 @@ where I: AsyncRead + AsyncWrite,
if !self.can_buffer_body() { if !self.can_buffer_body() {
if let Async::NotReady = self.flush()? { if let Async::NotReady = self.flush()? {
// if chunk is Some(&[]), aka empty, whatever, just skip it // if chunk is Some(&[]), aka empty, whatever, just skip it
if chunk.as_ref().map(|c| c.as_ref().is_empty()).unwrap_or(false) { if chunk.as_ref().map(|c| c.remaining() == 0).unwrap_or(false) {
return Ok(AsyncSink::Ready); return Ok(AsyncSink::Ready);
} else { } else {
return Ok(AsyncSink::NotReady(chunk)); return Ok(AsyncSink::NotReady(chunk));
@@ -499,11 +488,11 @@ where I: AsyncRead + AsyncWrite,
let state = match self.state.writing { let state = match self.state.writing {
Writing::Body(ref mut encoder) => { Writing::Body(ref mut encoder) => {
if let Some(chunk) = chunk { if let Some(chunk) = chunk {
if chunk.as_ref().is_empty() { if chunk.remaining() == 0 {
return Ok(AsyncSink::Ready); return Ok(AsyncSink::Ready);
} }
let encoded = encoder.encode(Cursor::new(chunk)); let encoded = encoder.encode(chunk);
self.io.buffer(encoded); self.io.buffer(encoded);
if encoder.is_eof() { if encoder.is_eof() {
@@ -612,7 +601,7 @@ where I: AsyncRead + AsyncWrite,
} }
} }
impl<I, B: AsRef<[u8]>, T> fmt::Debug for Conn<I, B, T> { impl<I, B: Buf, T> fmt::Debug for Conn<I, B, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Conn") f.debug_struct("Conn")
.field("state", &self.state) .field("state", &self.state)

View File

@@ -1,4 +1,4 @@
use bytes::Bytes; use bytes::{Buf, Bytes};
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use http::{Request, Response, StatusCode}; use http::{Request, Response, StatusCode};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@@ -241,7 +241,7 @@ where
if self.conn.can_write_body() { if self.conn.can_write_body() {
self.conn.write_body(Some(chunk)).map_err(::Error::new_body_write)?; self.conn.write_body(Some(chunk)).map_err(::Error::new_body_write)?;
// This allows when chunk is `None`, or `Some([])`. // This allows when chunk is `None`, or `Some([])`.
} else if chunk.as_ref().len() == 0 { } else if chunk.remaining() == 0 {
// ok // ok
} else { } else {
warn!("unexpected chunk when body cannot write"); warn!("unexpected chunk when body cannot write");

View File

@@ -5,7 +5,6 @@ use http::HeaderMap;
use http::header::{CONNECTION, TRANSFER_ENCODING}; use http::header::{CONNECTION, TRANSFER_ENCODING};
use ::body::Payload; use ::body::Payload;
use ::proto::h1::Cursor;
mod client; mod client;
mod server; mod server;
@@ -74,11 +73,11 @@ where
let is_eos = self.stream.is_end_stream(); let is_eos = self.stream.is_end_stream();
trace!( trace!(
"send body chunk: {} bytes, eos={}", "send body chunk: {} bytes, eos={}",
chunk.as_ref().len(), chunk.remaining(),
is_eos, is_eos,
); );
let buf = SendBuf(Some(Cursor::new(chunk))); let buf = SendBuf(Some(chunk));
self.body_tx.send_data(buf, is_eos) self.body_tx.send_data(buf, is_eos)
.map_err(::Error::new_body_write)?; .map_err(::Error::new_body_write)?;
@@ -104,9 +103,9 @@ where
} }
} }
struct SendBuf<B>(Option<Cursor<B>>); struct SendBuf<B>(Option<B>);
impl<B: AsRef<[u8]>> Buf for SendBuf<B> { impl<B: Buf> Buf for SendBuf<B> {
#[inline] #[inline]
fn remaining(&self) -> usize { fn remaining(&self) -> usize {
self.0 self.0