From 0735e586e54c86077611014426359206ea0d9a70 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 19 Sep 2018 15:23:01 -0700 Subject: [PATCH] reduce size of Response, async::Response, and async::Decoder --- src/async_impl/body.rs | 16 ++++++++- src/async_impl/decoder.rs | 73 +++++++++++++++++--------------------- src/async_impl/response.rs | 17 +++++---- src/response.rs | 27 ++++++++++++++ 4 files changed, 85 insertions(+), 48 deletions(-) diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index 1ef76b3..a3495af 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -1,7 +1,7 @@ use std::fmt; use futures::{Stream, Poll, Async}; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use hyper::body::Payload; /// An asynchronous `Stream`. @@ -88,6 +88,20 @@ pub struct Chunk { inner: ::hyper::Chunk, } +impl Buf for Chunk { + fn bytes(&self) -> &[u8] { + self.inner.bytes() + } + + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn advance(&mut self, n: usize) { + self.inner.advance(n); + } +} + impl AsRef<[u8]> for Chunk { #[inline] fn as_ref(&self) -> &[u8] { diff --git a/src/async_impl/decoder.rs b/src/async_impl/decoder.rs index c1f8250..ea86b0b 100644 --- a/src/async_impl/decoder.rs +++ b/src/async_impl/decoder.rs @@ -26,7 +26,7 @@ use std::mem; use std::cmp; use std::io::{self, Read}; -use bytes::{BufMut, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use libflate::non_blocking::gzip; use futures::{Async, Future, Poll, Stream}; use hyper::{HeaderMap}; @@ -53,17 +53,15 @@ enum Inner { Pending(Pending) } -enum Pending { - /// An unreachable internal state. - Empty, - /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. - Gzip(ReadableChunks) +/// A future attempt to poll the response body for EOF so we know whether to use gzip or not. +struct Pending { + body: ReadableChunks, } /// A gzip decoder that reads from a `libflate::gzip::Decoder` into a `BytesMut` and emits the results /// as a `Chunk`. struct Gzip { - inner: gzip::Decoder>>, + inner: Box>>>, buf: BytesMut, } @@ -101,7 +99,7 @@ impl Decoder { #[inline] fn gzip(body: Body) -> Decoder { Decoder { - inner: Inner::Pending(Pending::Gzip(ReadableChunks::new(body))) + inner: Inner::Pending(Pending { body: ReadableChunks::new(body) }) } } @@ -141,28 +139,19 @@ impl Future for Pending { type Error = error::Error; fn poll(&mut self) -> Poll { - let body_state = match *self { - Pending::Gzip(ref mut body) => { - match body.poll_stream() { - Ok(Async::Ready(state)) => state, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(e) => return Err(e) - } - }, - Pending::Empty => panic!("poll for a decoder after it's done") + let body_state = match self.body.poll_stream() { + Ok(Async::Ready(state)) => state, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(e) }; - match mem::replace(self, Pending::Empty) { - Pending::Gzip(body) => { - // libflate does a read_exact([0; 2]), so its impossible to tell - // if the stream was empty, or truly had an UnexpectedEof. - // Therefore, we need to check for EOF first. - match body_state { - StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))), - StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body)))) - } - }, - Pending::Empty => panic!("invalid internal state") + let body = mem::replace(&mut self.body, ReadableChunks::new(body::empty())); + // libflate does a read_exact([0; 2]), so its impossible to tell + // if the stream was empty, or truly had an UnexpectedEof. + // Therefore, we need to check for EOF first. + match body_state { + StreamState::Eof => Ok(Async::Ready(Inner::PlainText(body::empty()))), + StreamState::HasMore => Ok(Async::Ready(Inner::Gzip(Gzip::new(body)))) } } } @@ -171,7 +160,7 @@ impl Gzip { fn new(stream: ReadableChunks) -> Self { Gzip { buf: BytesMut::with_capacity(INIT_BUFFER_SIZE), - inner: gzip::Decoder::new(Peeked::new(stream)) + inner: Box::new(gzip::Decoder::new(Peeked::new(stream))), } } } @@ -221,7 +210,7 @@ pub struct ReadableChunks { enum ReadState { /// A chunk is ready to be read from. - Ready(Chunk, usize), + Ready(Chunk), /// The next chunk isn't ready yet. NotReady, /// The stream has finished. @@ -330,21 +319,20 @@ impl fmt::Debug for ReadableChunks { } } -impl Read for ReadableChunks - where S: Stream +impl Read for ReadableChunks +where + S: Stream, { fn read(&mut self, buf: &mut [u8]) -> io::Result { loop { let ret; match self.state { - ReadState::Ready(ref mut chunk, ref mut pos) => { - let chunk_start = *pos; - let len = cmp::min(buf.len(), chunk.len() - chunk_start); - let chunk_end = chunk_start + len; + ReadState::Ready(ref mut chunk) => { + let len = cmp::min(buf.len(), chunk.remaining()); - buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]); - *pos += len; - if *pos == chunk.len() { + buf[..len].copy_from_slice(&chunk[..len]); + chunk.advance(len); + if chunk.is_empty() { ret = len; } else { return Ok(len); @@ -382,7 +370,7 @@ impl ReadableChunks fn poll_stream(&mut self) -> Poll { match self.stream.poll() { Ok(Async::Ready(Some(chunk))) => { - self.state = ReadState::Ready(chunk, 0); + self.state = ReadState::Ready(chunk); Ok(Async::Ready(StreamState::HasMore)) }, @@ -441,3 +429,8 @@ pub fn detect(headers: &mut HeaderMap, body: Body, check_gzip: bool) -> Decoder Decoder::plain_text(body) } } + +#[test] +fn mem_size_of() { + assert_eq!(::std::mem::size_of::(), 64); +} diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index 01d212a..942b263 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -16,7 +16,9 @@ use super::{decoder, body, Decoder}; pub struct Response { status: StatusCode, headers: HeaderMap, - url: Url, + // Boxed to save space (11 words to 1 word), and it's not accessed + // frequently internally. + url: Box, body: Decoder, version: Version, } @@ -29,11 +31,11 @@ impl Response { let decoder = decoder::detect(&mut headers, body::wrap(res.into_body()), gzip); debug!("Response: '{}' for {}", status, url); Response { - status: status, - headers: headers, - url: url, + status, + headers, + url: Box::new(url), body: decoder, - version: version, + version, } } @@ -123,9 +125,9 @@ impl Response { #[inline] pub fn error_for_status(self) -> ::Result { if self.status.is_client_error() { - Err(::error::client_error(self.url, self.status)) + Err(::error::client_error(*self.url, self.status)) } else if self.status.is_server_error() { - Err(::error::server_error(self.url, self.status)) + Err(::error::server_error(*self.url, self.status)) } else { Ok(self) } @@ -163,3 +165,4 @@ impl fmt::Debug for Json { .finish() } } + diff --git a/src/response.rs b/src/response.rs index 78cf160..a91c629 100644 --- a/src/response.rs +++ b/src/response.rs @@ -335,3 +335,30 @@ pub fn new(mut res: async_impl::Response, timeout: Option, thread: Kee _thread_handle: thread, } } + +#[test] +fn mem_size_of() { + assert_eq!(::std::mem::size_of::(), 0); +} + +#[test] +fn mem_size_of_readable_async_res() { + assert_eq!(::std::mem::size_of::(), 0); +} + +#[test] +fn mem_size_of_wait_body() { + assert_eq!(::std::mem::size_of::(), 0); +} + +#[test] +fn mem_size_of_url() { + assert_eq!(::std::mem::size_of::<::Url>(), 0); +} + +#[test] +fn mem_size_of_readable_chunks_wait_body() { + assert_eq!(::std::mem::size_of::>(), 0); +} + +