From a3be110a55571a1ee9a31b2335d7aec27c04e96a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 7 May 2018 10:06:28 -0700 Subject: [PATCH] feat(body): change `Payload::Data` to be a `Buf` Closes #1508 BREAKING CHANGE: Each payload chunk must implement `Buf`, instead of just `AsRef<[u8]>`. --- src/body.rs | 4 ++-- src/chunk.rs | 19 ++++++++++++++++++- src/client/conn.rs | 1 - src/proto/h1/conn.rs | 27 ++++++++------------------- src/proto/h1/dispatch.rs | 4 ++-- src/proto/h2/mod.rs | 9 ++++----- 6 files changed, 34 insertions(+), 30 deletions(-) diff --git a/src/body.rs b/src/body.rs index ebd53645..49e225cc 100644 --- a/src/body.rs +++ b/src/body.rs @@ -17,7 +17,7 @@ use std::borrow::Cow; use std::fmt; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::{Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; use h2; @@ -34,7 +34,7 @@ type BodySender = mpsc::Sender>; /// don't need to customize a send stream for your own application. pub trait Payload: Send + 'static { /// A buffer of bytes representing a single chunk of a body. - type Data: AsRef<[u8]> + Send; + type Data: Buf + Send; /// The error type of this stream. type Error: Into>; diff --git a/src/chunk.rs b/src/chunk.rs index 692a961d..92706030 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,6 +1,6 @@ use std::fmt; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use h2::ReleaseCapacity; /// A piece of a message body. @@ -53,6 +53,23 @@ impl Chunk { } } +impl Buf for Chunk { + #[inline] + fn remaining(&self) -> usize { + self.bytes.len() + } + + #[inline] + fn bytes(&self) -> &[u8] { + &self.bytes + } + + #[inline] + fn advance(&mut self, cnt: usize) { + self.bytes.advance(cnt); + } +} + impl From> for Chunk { #[inline] fn from(v: Vec) -> Chunk { diff --git a/src/client/conn.rs b/src/client/conn.rs index 97689bb5..d377a24d 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -626,7 +626,6 @@ impl AssertSend for Connection where T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, - B::Data: Send + 'static, {} #[doc(hidden)] diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index f46dbf21..bb11ae41 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -2,7 +2,7 @@ use std::fmt; use std::io::{self}; use std::marker::PhantomData; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::{Async, AsyncSink, Poll, StartSend}; use futures::task::Task; use http::{Method, Version}; @@ -10,7 +10,7 @@ use tokio_io::{AsyncRead, AsyncWrite}; use ::Chunk; use proto::{BodyLength, Decode, Http1Transaction, MessageHead}; -use super::io::{Cursor, Buffered}; +use super::io::{Buffered}; use super::{EncodedBuf, Encoder, Decoder}; @@ -22,25 +22,14 @@ use super::{EncodedBuf, Encoder, Decoder}; /// determine if this connection can be kept alive after the message, /// or if it is complete. pub(crate) struct Conn { - io: Buffered>>, + io: Buffered>, state: State, _marker: PhantomData } -/* -impl Conn -where I: AsyncRead + AsyncWrite, - B: AsRef<[u8]>, -{ - pub fn new_client(io: I) -> Conn { - Conn::new(io) - } -} -*/ - impl Conn where I: AsyncRead + AsyncWrite, - B: AsRef<[u8]>, + B: Buf, T: Http1Transaction, { pub fn new(io: I) -> Conn { @@ -488,7 +477,7 @@ where I: AsyncRead + AsyncWrite, if !self.can_buffer_body() { if let Async::NotReady = self.flush()? { // if chunk is Some(&[]), aka empty, whatever, just skip it - if chunk.as_ref().map(|c| c.as_ref().is_empty()).unwrap_or(false) { + if chunk.as_ref().map(|c| c.remaining() == 0).unwrap_or(false) { return Ok(AsyncSink::Ready); } else { return Ok(AsyncSink::NotReady(chunk)); @@ -499,11 +488,11 @@ where I: AsyncRead + AsyncWrite, let state = match self.state.writing { Writing::Body(ref mut encoder) => { if let Some(chunk) = chunk { - if chunk.as_ref().is_empty() { + if chunk.remaining() == 0 { return Ok(AsyncSink::Ready); } - let encoded = encoder.encode(Cursor::new(chunk)); + let encoded = encoder.encode(chunk); self.io.buffer(encoded); if encoder.is_eof() { @@ -612,7 +601,7 @@ where I: AsyncRead + AsyncWrite, } } -impl, T> fmt::Debug for Conn { +impl fmt::Debug for Conn { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Conn") .field("state", &self.state) diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 8cd7665d..c0e8a9f3 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,4 +1,4 @@ -use bytes::Bytes; +use bytes::{Buf, Bytes}; use futures::{Async, Future, Poll, Stream}; use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -241,7 +241,7 @@ where if self.conn.can_write_body() { self.conn.write_body(Some(chunk)).map_err(::Error::new_body_write)?; // This allows when chunk is `None`, or `Some([])`. - } else if chunk.as_ref().len() == 0 { + } else if chunk.remaining() == 0 { // ok } else { warn!("unexpected chunk when body cannot write"); diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index f30ad732..4baad8a4 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -5,7 +5,6 @@ use http::HeaderMap; use http::header::{CONNECTION, TRANSFER_ENCODING}; use ::body::Payload; -use ::proto::h1::Cursor; mod client; mod server; @@ -74,11 +73,11 @@ where let is_eos = self.stream.is_end_stream(); trace!( "send body chunk: {} bytes, eos={}", - chunk.as_ref().len(), + chunk.remaining(), is_eos, ); - let buf = SendBuf(Some(Cursor::new(chunk))); + let buf = SendBuf(Some(chunk)); self.body_tx.send_data(buf, is_eos) .map_err(::Error::new_body_write)?; @@ -104,9 +103,9 @@ where } } -struct SendBuf(Option>); +struct SendBuf(Option); -impl> Buf for SendBuf { +impl Buf for SendBuf { #[inline] fn remaining(&self) -> usize { self.0