Merge pull request #1123 from hyperium/body-concat

feat(chunk): implement Extend and IntoIterator for Chunk
This commit is contained in:
Sean McArthur
2017-04-10 09:53:40 -07:00
committed by GitHub
2 changed files with 70 additions and 1 deletions

View File

@@ -96,3 +96,18 @@ fn _assert_send_sync() {
_assert_send::<Chunk>();
_assert_sync::<Chunk>();
}
#[test]
fn test_body_stream_concat() {
use futures::{Sink, Stream, Future};
let (tx, body) = Body::pair();
::std::thread::spawn(move || {
let tx = tx.send(Ok("hello ".into())).wait().unwrap();
tx.send(Ok("world".into())).wait().unwrap();
});
let total = body.concat().wait().unwrap();
assert_eq!(total.as_ref(), b"hello world");
}

View File

@@ -1,12 +1,41 @@
use std::fmt;
use std::mem;
use bytes::Bytes;
use bytes::{Bytes, BytesMut, BufMut};
/// A piece of a message body.
pub struct Chunk(Inner);
enum Inner {
Mut(BytesMut),
Shared(Bytes),
Swapping,
}
impl Inner {
fn as_bytes_mut(&mut self, reserve: usize) -> &mut BytesMut {
match *self {
Inner::Mut(ref mut bytes) => return bytes,
_ => ()
}
let bytes = match mem::replace(self, Inner::Swapping) {
Inner::Shared(bytes) => bytes,
_ => unreachable!(),
};
let bytes_mut = bytes.try_mut().unwrap_or_else(|bytes| {
let mut bytes_mut = BytesMut::with_capacity(reserve + bytes.len());
bytes_mut.put_slice(bytes.as_ref());
bytes_mut
});
*self = Inner::Mut(bytes_mut);
match *self {
Inner::Mut(ref mut bytes) => bytes,
_ => unreachable!(),
}
}
}
impl From<Vec<u8>> for Chunk {
@@ -46,7 +75,9 @@ impl From<Bytes> for Chunk {
impl From<Chunk> for Bytes {
fn from(chunk: Chunk) -> Bytes {
match chunk.0 {
Inner::Mut(bytes_mut) => bytes_mut.freeze(),
Inner::Shared(bytes) => bytes,
Inner::Swapping => unreachable!(),
}
}
}
@@ -64,7 +95,9 @@ impl AsRef<[u8]> for Chunk {
#[inline]
fn as_ref(&self) -> &[u8] {
match self.0 {
Inner::Mut(ref slice) => slice,
Inner::Shared(ref slice) => slice,
Inner::Swapping => unreachable!(),
}
}
}
@@ -75,3 +108,24 @@ impl fmt::Debug for Chunk {
fmt::Debug::fmt(self.as_ref(), f)
}
}
impl IntoIterator for Chunk {
type Item = u8;
type IntoIter = <Bytes as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
match self.0 {
Inner::Mut(bytes) => bytes.freeze().into_iter(),
Inner::Shared(bytes) => bytes.into_iter(),
Inner::Swapping => unreachable!(),
}
}
}
impl Extend<u8> for Chunk {
fn extend<T>(&mut self, iter: T) where T: IntoIterator<Item=u8> {
let iter = iter.into_iter();
self.0.as_bytes_mut(iter.size_hint().0).extend(iter);
}
}