reduce size of Response, async::Response, and async::Decoder

This commit is contained in:
Sean McArthur
2018-09-19 15:23:01 -07:00
parent 2698148743
commit 0735e586e5
4 changed files with 85 additions and 48 deletions

View File

@@ -1,7 +1,7 @@
use std::fmt;
use futures::{Stream, Poll, Async};
use bytes::Bytes;
use bytes::{Buf, Bytes};
use hyper::body::Payload;
/// An asynchronous `Stream`.
@@ -88,6 +88,20 @@ pub struct Chunk {
inner: ::hyper::Chunk,
}
impl Buf for Chunk {
fn bytes(&self) -> &[u8] {
self.inner.bytes()
}
fn remaining(&self) -> usize {
self.inner.remaining()
}
fn advance(&mut self, n: usize) {
self.inner.advance(n);
}
}
impl AsRef<[u8]> for Chunk {
#[inline]
fn as_ref(&self) -> &[u8] {

View File

@@ -26,7 +26,7 @@ use std::mem;
use std::cmp;
use std::io::{self, Read};
use bytes::{BufMut, BytesMut};
use bytes::{Buf, BufMut, BytesMut};
use libflate::non_blocking::gzip;
use futures::{Async, Future, Poll, Stream};
use hyper::{HeaderMap};
@@ -53,17 +53,15 @@ enum Inner {
Pending(Pending)
}
enum Pending {
/// An unreachable internal state.
Empty,
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
Gzip(ReadableChunks<Body>)
/// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
struct Pending {
body: ReadableChunks<Body>,
}
/// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results
/// as a `Chunk`.
struct Gzip {
inner: gzip::Decoder<Peeked<ReadableChunks<Body>>>,
inner: Box<gzip::Decoder<Peeked<ReadableChunks<Body>>>>,
buf: BytesMut,
}
@@ -101,7 +99,7 @@ impl Decoder {
#[inline]
fn gzip(body: Body) -> Decoder {
Decoder {
inner: Inner::Pending(Pending::Gzip(ReadableChunks::new(body)))
inner: Inner::Pending(Pending { body: ReadableChunks::new(body) })
}
}
@@ -141,28 +139,19 @@ impl Future for Pending {
type Error = error::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let body_state = match *self {
Pending::Gzip(ref mut body) => {
match body.poll_stream() {
Ok(Async::Ready(state)) => state,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(e)
}
},
Pending::Empty => panic!("poll for a decoder after it's done")
let body_state = match self.body.poll_stream() {
Ok(Async::Ready(state)) => state,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(e)
};
match mem::replace(self, Pending::Empty) {
Pending::Gzip(body) => {
// libflate does a read_exact([0; 2]), so its impossible to tell
// if the stream was empty, or truly had an UnexpectedEof.
// Therefore, we need to check for EOF first.
match body_state {
StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))),
StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body))))
}
},
Pending::Empty => panic!("invalid internal state")
let body = mem::replace(&mut self.body, ReadableChunks::new(body::empty()));
// libflate does a read_exact([0; 2]), so its impossible to tell
// if the stream was empty, or truly had an UnexpectedEof.
// Therefore, we need to check for EOF first.
match body_state {
StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))),
StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body))))
}
}
}
@@ -171,7 +160,7 @@ impl Gzip {
fn new(stream: ReadableChunks<Body>) -> Self {
Gzip {
buf: BytesMut::with_capacity(INIT_BUFFER_SIZE),
inner: gzip::Decoder::new(Peeked::new(stream))
inner: Box::new(gzip::Decoder::new(Peeked::new(stream))),
}
}
}
@@ -221,7 +210,7 @@ pub struct ReadableChunks<S> {
enum ReadState {
/// A chunk is ready to be read from.
Ready(Chunk, usize),
Ready(Chunk),
/// The next chunk isn't ready yet.
NotReady,
/// The stream has finished.
@@ -330,21 +319,20 @@ impl<S> fmt::Debug for ReadableChunks<S> {
}
}
impl<S> Read for ReadableChunks<S>
where S: Stream<Item = Chunk, Error = error::Error>
impl<S> Read for ReadableChunks<S>
where
S: Stream<Item = Chunk, Error = error::Error>,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
let ret;
match self.state {
ReadState::Ready(ref mut chunk, ref mut pos) => {
let chunk_start = *pos;
let len = cmp::min(buf.len(), chunk.len() - chunk_start);
let chunk_end = chunk_start + len;
ReadState::Ready(ref mut chunk) => {
let len = cmp::min(buf.len(), chunk.remaining());
buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
*pos += len;
if *pos == chunk.len() {
buf[..len].copy_from_slice(&chunk[..len]);
chunk.advance(len);
if chunk.is_empty() {
ret = len;
} else {
return Ok(len);
@@ -382,7 +370,7 @@ impl<S> ReadableChunks<S>
fn poll_stream(&mut self) -> Poll<StreamState, error::Error> {
match self.stream.poll() {
Ok(Async::Ready(Some(chunk))) => {
self.state = ReadState::Ready(chunk, 0);
self.state = ReadState::Ready(chunk);
Ok(Async::Ready(StreamState::HasMore))
},
@@ -441,3 +429,8 @@ pub fn detect(headers: &mut HeaderMap, body: Body, check_gzip: bool) -> Decoder
Decoder::plain_text(body)
}
}
#[test]
fn mem_size_of() {
assert_eq!(::std::mem::size_of::<Decoder>(), 64);
}

View File

@@ -16,7 +16,9 @@ use super::{decoder, body, Decoder};
pub struct Response {
status: StatusCode,
headers: HeaderMap,
url: Url,
// Boxed to save space (11 words to 1 word), and it's not accessed
// frequently internally.
url: Box<Url>,
body: Decoder,
version: Version,
}
@@ -29,11 +31,11 @@ impl Response {
let decoder = decoder::detect(&mut headers, body::wrap(res.into_body()), gzip);
debug!("Response: '{}' for {}", status, url);
Response {
status: status,
headers: headers,
url: url,
status,
headers,
url: Box::new(url),
body: decoder,
version: version,
version,
}
}
@@ -123,9 +125,9 @@ impl Response {
#[inline]
pub fn error_for_status(self) -> ::Result<Self> {
if self.status.is_client_error() {
Err(::error::client_error(self.url, self.status))
Err(::error::client_error(*self.url, self.status))
} else if self.status.is_server_error() {
Err(::error::server_error(self.url, self.status))
Err(::error::server_error(*self.url, self.status))
} else {
Ok(self)
}
@@ -163,3 +165,4 @@ impl<T> fmt::Debug for Json<T> {
.finish()
}
}

View File

@@ -335,3 +335,30 @@ pub fn new(mut res: async_impl::Response, timeout: Option<Duration>, thread: Kee
_thread_handle: thread,
}
}
#[test]
fn mem_size_of() {
assert_eq!(::std::mem::size_of::<Response>(), 0);
}
#[test]
fn mem_size_of_readable_async_res() {
assert_eq!(::std::mem::size_of::<async_impl::Response>(), 0);
}
#[test]
fn mem_size_of_wait_body() {
assert_eq!(::std::mem::size_of::<WaitBody>(), 0);
}
#[test]
fn mem_size_of_url() {
assert_eq!(::std::mem::size_of::<::Url>(), 0);
}
#[test]
fn mem_size_of_readable_chunks_wait_body() {
assert_eq!(::std::mem::size_of::<async_impl::ReadableChunks<WaitBody>>(), 0);
}