fix(http1): reduce memory used with flatten write strategy

If the write buffer was filled with large bufs from the user, such that
it couldn't be fully written to the transport, the write buffer could
start to grow significantly as it moved its cursor without shifting over
the unwritten bytes.

This will now try to shift over the unwritten bytes if the next buf
wouldn't fit in the already allocated space.
This commit is contained in:
Sean McArthur
2021-05-26 16:47:36 -07:00
parent e61b494e3b
commit eb0c646395

View File

@@ -56,7 +56,12 @@ where
B: Buf,
{
pub(crate) fn new(io: T) -> Buffered<T, B> {
let write_buf = WriteBuf::new(&io);
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
WriteStrategy::Flatten
};
let write_buf = WriteBuf::new(strategy);
Buffered {
flush_pipeline: false,
io,
@@ -419,6 +424,24 @@ impl<T: AsRef<[u8]>> Cursor<T> {
}
impl Cursor<Vec<u8>> {
/// If we've advanced the position a bit in this cursor, and wish to
/// extend the underlying vector, we may wish to unshift the "read" bytes
/// off, and move everything else over.
fn maybe_unshift(&mut self, additional: usize) {
if self.pos == 0 {
// nothing to do
return;
}
if self.bytes.capacity() - self.bytes.len() >= additional {
// there's room!
return;
}
self.bytes.drain(0..self.pos);
self.pos = 0;
}
fn reset(&mut self) {
self.pos = 0;
self.bytes.clear();
@@ -463,12 +486,7 @@ pub(super) struct WriteBuf<B> {
}
impl<B: Buf> WriteBuf<B> {
fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
WriteStrategy::Flatten
};
fn new(strategy: WriteStrategy) -> WriteBuf<B> {
WriteBuf {
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
@@ -492,6 +510,8 @@ where
match self.strategy {
WriteStrategy::Flatten => {
let head = self.headers_mut();
head.maybe_unshift(buf.remaining());
//perf: This is a little faster than <Vec as BufMut>>::put,
//but accomplishes the same result.
loop {
@@ -804,7 +824,6 @@ mod tests {
let _ = pretty_env_logger::try_init();
let mock = Mock::new()
// Just a single write
.write(b"hello world, it's hyper!")
.build();
@@ -820,6 +839,41 @@ mod tests {
buffered.flush().await.expect("flush");
}
#[test]
fn write_buf_flatten_partially_flushed() {
let _ = pretty_env_logger::try_init();
let b = |s: &str| Cursor::new(s.as_bytes().to_vec());
let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
write_buf.buffer(b("hello "));
write_buf.buffer(b("world, "));
assert_eq!(write_buf.chunk(), b"hello world, ");
// advance most of the way, but not all
write_buf.advance(11);
assert_eq!(write_buf.chunk(), b", ");
assert_eq!(write_buf.headers.pos, 11);
assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);
// there's still room in the headers buffer, so just push on the end
write_buf.buffer(b("it's hyper!"));
assert_eq!(write_buf.chunk(), b", it's hyper!");
assert_eq!(write_buf.headers.pos, 11);
let rem1 = write_buf.remaining();
let cap = write_buf.headers.bytes.capacity();
// but when this would go over capacity, don't copy the old bytes
write_buf.buffer(Cursor::new(vec![b'X'; cap]));
assert_eq!(write_buf.remaining(), cap + rem1);
assert_eq!(write_buf.headers.pos, 0);
}
#[tokio::test]
async fn write_buf_queue_disable_auto() {
let _ = pretty_env_logger::try_init();