perf(body): reduce memory size of Body by a u64 (#2118)

Replaces the `Option<u64>` content-length with a `DecodedLength`, which
stores its unknown-ness as `u64::MAX`.
This commit is contained in:
Sean McArthur
2020-01-27 13:09:40 -08:00
committed by GitHub
parent 1881db6391
commit a354580e3f
6 changed files with 96 additions and 20 deletions

View File

@@ -12,6 +12,7 @@ use http::HeaderMap;
use http_body::{Body as HttpBody, SizeHint}; use http_body::{Body as HttpBody, SizeHint};
use crate::common::{task, Future, Never, Pin, Poll}; use crate::common::{task, Future, Never, Pin, Poll};
use crate::proto::DecodedLength;
use crate::upgrade::OnUpgrade; use crate::upgrade::OnUpgrade;
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
@@ -31,12 +32,12 @@ pub struct Body {
enum Kind { enum Kind {
Once(Option<Bytes>), Once(Option<Bytes>),
Chan { Chan {
content_length: Option<u64>, content_length: DecodedLength,
abort_rx: oneshot::Receiver<()>, abort_rx: oneshot::Receiver<()>,
rx: mpsc::Receiver<Result<Bytes, crate::Error>>, rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
}, },
H2 { H2 {
content_length: Option<u64>, content_length: DecodedLength,
recv: h2::RecvStream, recv: h2::RecvStream,
}, },
// NOTE: This requires `Sync` because of how easy it is to use `await` // NOTE: This requires `Sync` because of how easy it is to use `await`
@@ -105,10 +106,10 @@ impl Body {
/// Useful when wanting to stream chunks from another thread. /// Useful when wanting to stream chunks from another thread.
#[inline] #[inline]
pub fn channel() -> (Sender, Body) { pub fn channel() -> (Sender, Body) {
Self::new_channel(None) Self::new_channel(DecodedLength::CHUNKED)
} }
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) { pub(crate) fn new_channel(content_length: DecodedLength) -> (Sender, Body) {
let (tx, rx) = mpsc::channel(0); let (tx, rx) = mpsc::channel(0);
let (abort_tx, abort_rx) = oneshot::channel(); let (abort_tx, abort_rx) = oneshot::channel();
@@ -167,7 +168,7 @@ impl Body {
Body { kind, extra: None } Body { kind, extra: None }
} }
pub(crate) fn h2(recv: h2::RecvStream, content_length: Option<u64>) -> Self { pub(crate) fn h2(recv: h2::RecvStream, content_length: DecodedLength) -> Self {
Body::new(Kind::H2 { Body::new(Kind::H2 {
content_length, content_length,
recv, recv,
@@ -243,20 +244,19 @@ impl Body {
match ready!(Pin::new(rx).poll_next(cx)?) { match ready!(Pin::new(rx).poll_next(cx)?) {
Some(chunk) => { Some(chunk) => {
if let Some(ref mut len) = *len { len.sub_if(chunk.len() as u64);
debug_assert!(*len >= chunk.len() as u64);
*len -= chunk.len() as u64;
}
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
} }
None => Poll::Ready(None), None => Poll::Ready(None),
} }
} }
Kind::H2 { Kind::H2 {
recv: ref mut h2, .. recv: ref mut h2,
content_length: ref mut len,
} => match ready!(h2.poll_data(cx)) { } => match ready!(h2.poll_data(cx)) {
Some(Ok(bytes)) => { Some(Ok(bytes)) => {
let _ = h2.flow_control().release_capacity(bytes.len()); let _ = h2.flow_control().release_capacity(bytes.len());
len.sub_if(bytes.len() as u64);
Poll::Ready(Some(Ok(bytes))) Poll::Ready(Some(Ok(bytes)))
} }
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
@@ -317,7 +317,7 @@ impl HttpBody for Body {
fn is_end_stream(&self) -> bool { fn is_end_stream(&self) -> bool {
match self.kind { match self.kind {
Kind::Once(ref val) => val.is_none(), Kind::Once(ref val) => val.is_none(),
Kind::Chan { content_length, .. } => content_length == Some(0), Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
#[cfg(feature = "stream")] #[cfg(feature = "stream")]
Kind::Wrapped(..) => false, Kind::Wrapped(..) => false,
@@ -337,7 +337,7 @@ impl HttpBody for Body {
Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => { Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => {
let mut hint = SizeHint::default(); let mut hint = SizeHint::default();
if let Some(content_length) = content_length { if let Some(content_length) = content_length.into_opt() {
hint.set_exact(content_length as u64); hint.set_exact(content_length as u64);
} }
@@ -498,6 +498,7 @@ impl Sender {
/// Aborts the body in an abnormal fashion. /// Aborts the body in an abnormal fashion.
pub fn abort(self) { pub fn abort(self) {
// TODO(sean): this can just be `self.tx.clone().try_send()`
let _ = self.abort_tx.send(()); let _ = self.abort_tx.send(());
} }
@@ -505,3 +506,39 @@ impl Sender {
let _ = self.tx.try_send(Err(err)); let _ = self.tx.try_send(Err(err));
} }
} }
#[cfg(test)]
mod tests {
use std::mem;
use super::{Body, Sender};
#[test]
fn test_size_of() {
// These are mostly to help catch *accidentally* increasing
// the size by too much.
assert_eq!(
mem::size_of::<Body>(),
mem::size_of::<usize>() * 5 + mem::size_of::<u64>(),
"Body"
);
assert_eq!(
mem::size_of::<Body>(),
mem::size_of::<Option<Body>>(),
"Option<Body>"
);
assert_eq!(
mem::size_of::<Sender>(),
mem::size_of::<usize>() * 4,
"Sender"
);
assert_eq!(
mem::size_of::<Sender>(),
mem::size_of::<Option<Sender>>(),
"Option<Sender>"
);
}
}

View File

@@ -239,7 +239,7 @@ where
let mut body = match body_len { let mut body = match body_len {
DecodedLength::ZERO => Body::empty(), DecodedLength::ZERO => Body::empty(),
other => { other => {
let (tx, rx) = Body::new_channel(other.into_opt()); let (tx, rx) = Body::new_channel(other);
self.body_tx = Some(tx); self.body_tx = Some(tx);
rx rx
} }

View File

@@ -4,11 +4,10 @@ use futures_util::stream::StreamExt as _;
use h2::client::{Builder, SendRequest}; use h2::client::{Builder, SendRequest};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::{PipeToSendStream, SendBuf}; use super::{decode_content_length, PipeToSendStream, SendBuf};
use crate::body::Payload; use crate::body::Payload;
use crate::common::{task, Exec, Future, Never, Pin, Poll}; use crate::common::{task, Exec, Future, Never, Pin, Poll};
use crate::headers; use crate::headers;
use crate::headers::content_length_parse_all;
use crate::proto::Dispatched; use crate::proto::Dispatched;
use crate::{Body, Request, Response}; use crate::{Body, Request, Response};
@@ -159,7 +158,7 @@ where
let fut = fut.map(move |result| match result { let fut = fut.map(move |result| match result {
Ok(res) => { Ok(res) => {
let content_length = content_length_parse_all(res.headers()); let content_length = decode_content_length(res.headers());
let res = res.map(|stream| crate::Body::h2(stream, content_length)); let res = res.map(|stream| crate::Body::h2(stream, content_length));
Ok(res) Ok(res)
} }

View File

@@ -7,8 +7,10 @@ use http::header::{
use http::HeaderMap; use http::HeaderMap;
use pin_project::pin_project; use pin_project::pin_project;
use super::DecodedLength;
use crate::body::Payload; use crate::body::Payload;
use crate::common::{task, Future, Pin, Poll}; use crate::common::{task, Future, Pin, Poll};
use crate::headers::content_length_parse_all;
pub(crate) mod client; pub(crate) mod client;
pub(crate) mod server; pub(crate) mod server;
@@ -71,6 +73,15 @@ fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) {
} }
} }
fn decode_content_length(headers: &HeaderMap) -> DecodedLength {
if let Some(len) = content_length_parse_all(headers) {
// If the length is u64::MAX, oh well, just reported chunked.
DecodedLength::checked_new(len).unwrap_or_else(|_| DecodedLength::CHUNKED)
} else {
DecodedLength::CHUNKED
}
}
// body adapters used by both Client and Server // body adapters used by both Client and Server
#[pin_project] #[pin_project]

View File

@@ -6,12 +6,11 @@ use h2::Reason;
use pin_project::{pin_project, project}; use pin_project::{pin_project, project};
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use super::{PipeToSendStream, SendBuf}; use super::{decode_content_length, PipeToSendStream, SendBuf};
use crate::body::Payload; use crate::body::Payload;
use crate::common::exec::H2Exec; use crate::common::exec::H2Exec;
use crate::common::{task, Future, Pin, Poll}; use crate::common::{task, Future, Pin, Poll};
use crate::headers; use crate::headers;
use crate::headers::content_length_parse_all;
use crate::proto::Dispatched; use crate::proto::Dispatched;
use crate::service::HttpService; use crate::service::HttpService;
@@ -168,7 +167,7 @@ where
match ready!(self.conn.poll_accept(cx)) { match ready!(self.conn.poll_accept(cx)) {
Some(Ok((req, respond))) => { Some(Ok((req, respond))) => {
trace!("incoming request"); trace!("incoming request");
let content_length = content_length_parse_all(req.headers()); let content_length = decode_content_length(req.headers());
let req = req.map(|stream| crate::Body::h2(stream, content_length)); let req = req.map(|stream| crate::Body::h2(stream, content_length));
let fut = H2Stream::new(service.call(req), respond); let fut = H2Stream::new(service.call(req), respond);
exec.execute_h2stream(fut); exec.execute_h2stream(fut);

View File

@@ -1,7 +1,7 @@
//! Pieces pertaining to the HTTP message protocol. //! Pieces pertaining to the HTTP message protocol.
use http::{HeaderMap, Method, StatusCode, Uri, Version}; use http::{HeaderMap, Method, StatusCode, Uri, Version};
use self::body_length::DecodedLength; pub(crate) use self::body_length::DecodedLength;
pub(crate) use self::h1::{dispatch, Conn, ServerTransaction}; pub(crate) use self::h1::{dispatch, Conn, ServerTransaction};
pub(crate) mod h1; pub(crate) mod h1;
@@ -90,6 +90,15 @@ mod body_length {
Err(crate::error::Parse::TooLarge) Err(crate::error::Parse::TooLarge)
} }
} }
pub(crate) fn sub_if(&mut self, amt: u64) {
match *self {
DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => (),
DecodedLength(ref mut known) => {
*known -= amt;
}
}
}
} }
impl fmt::Debug for DecodedLength { impl fmt::Debug for DecodedLength {
@@ -112,4 +121,25 @@ mod body_length {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn sub_if_known() {
let mut len = DecodedLength::new(30);
len.sub_if(20);
assert_eq!(len.0, 10);
}
#[test]
fn sub_if_chunked() {
let mut len = DecodedLength::CHUNKED;
len.sub_if(20);
assert_eq!(len, DecodedLength::CHUNKED);
}
}
} }