perf(http1): implement an adaptive read buffer strategy
The default read strategy for HTTP/1 connections is now adaptive. It increases or decreases the size of the read buffer depending on the number of bytes that are received in a `read` call. If a transport continuously fills the read buffer, it will continue to grow (up to the `max_buf_size`), allowing for reading faster. If the transport consistently only fills a portion of the read buffer, it will be shrunk. This doesn't provide much benefit to small requests/responses, but benchmarks show it to be a noticeable improvement to throughput when streaming larger bodies. Closes #1708
This commit is contained in:
		| @@ -1,4 +1,5 @@ | ||||
| use std::cell::Cell; | ||||
| use std::cmp; | ||||
| use std::collections::VecDeque; | ||||
| use std::fmt; | ||||
| use std::io; | ||||
| @@ -60,9 +61,7 @@ where | ||||
|             io: io, | ||||
|             read_blocked: false, | ||||
|             read_buf: BytesMut::with_capacity(0), | ||||
|             read_buf_strategy: ReadStrategy::Adaptive { | ||||
|                 max: DEFAULT_MAX_BUFFER_SIZE, | ||||
|             }, | ||||
|             read_buf_strategy: ReadStrategy::default(), | ||||
|             write_buf: WriteBuf::new(), | ||||
|         } | ||||
|     } | ||||
| @@ -81,9 +80,7 @@ where | ||||
|             "The max_buf_size cannot be smaller than {}.", | ||||
|             MINIMUM_MAX_BUFFER_SIZE, | ||||
|         ); | ||||
|         self.read_buf_strategy = ReadStrategy::Adaptive { | ||||
|             max, | ||||
|         }; | ||||
|         self.read_buf_strategy = ReadStrategy::with_max(max); | ||||
|         self.write_buf.max_buf_size = max; | ||||
|     } | ||||
|  | ||||
| @@ -149,18 +146,11 @@ where | ||||
|                     debug!("parsed {} headers", msg.head.headers.len()); | ||||
|                     return Ok(Async::Ready(msg)) | ||||
|                 }, | ||||
|                 None => match self.read_buf_strategy { | ||||
|                     ReadStrategy::Adaptive { max } => { | ||||
|                         if self.read_buf.len() >= max { | ||||
|                             debug!("max_buf_size ({}) reached, closing", max); | ||||
|                             return Err(::Error::new_too_large()); | ||||
|                         } | ||||
|                     }, | ||||
|                     ReadStrategy::Exact(exact) => { | ||||
|                         if self.read_buf.len() >= exact { | ||||
|                             debug!("exact buf size ({}) filled, closing", exact); | ||||
|                             return Err(::Error::new_too_large()); | ||||
|                         } | ||||
|                 None => { | ||||
|                     let max = self.read_buf_strategy.max(); | ||||
|                     if self.read_buf.len() >= max { | ||||
|                         debug!("max_buf_size ({}) reached, closing", max); | ||||
|                         return Err(::Error::new_too_large()); | ||||
|                     } | ||||
|                 }, | ||||
|             } | ||||
| @@ -177,22 +167,15 @@ where | ||||
|     pub fn read_from_io(&mut self) -> Poll<usize, io::Error> { | ||||
|         use bytes::BufMut; | ||||
|         self.read_blocked = false; | ||||
|         match self.read_buf_strategy { | ||||
|             ReadStrategy::Adaptive { .. } => { | ||||
|                 if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE { | ||||
|                     self.read_buf.reserve(INIT_BUFFER_SIZE); | ||||
|                 } | ||||
|             }, | ||||
|             ReadStrategy::Exact(exact) => { | ||||
|                 if self.read_buf.capacity() < exact { | ||||
|                     self.read_buf.reserve(exact); | ||||
|                 } | ||||
|             }, | ||||
|         let next = self.read_buf_strategy.next(); | ||||
|         if self.read_buf.remaining_mut() < next { | ||||
|             self.read_buf.reserve(next); | ||||
|         } | ||||
|         self.io.read_buf(&mut self.read_buf).map(|ok| { | ||||
|             match ok { | ||||
|                 Async::Ready(n) => { | ||||
|                     debug!("read {} bytes", n); | ||||
|                     self.read_buf_strategy.record(n); | ||||
|                     Async::Ready(n) | ||||
|                 }, | ||||
|                 Async::NotReady => { | ||||
| @@ -285,11 +268,82 @@ where | ||||
| #[derive(Clone, Copy, Debug)] | ||||
| enum ReadStrategy { | ||||
|     Adaptive { | ||||
|         decrease_now: bool, | ||||
|         next: usize, | ||||
|         max: usize | ||||
|     }, | ||||
|     Exact(usize), | ||||
| } | ||||
|  | ||||
| impl ReadStrategy { | ||||
|     fn with_max(max: usize) -> ReadStrategy { | ||||
|         ReadStrategy::Adaptive { | ||||
|             decrease_now: false, | ||||
|             next: INIT_BUFFER_SIZE, | ||||
|             max, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn next(&self) -> usize { | ||||
|         match *self { | ||||
|             ReadStrategy::Adaptive { next, .. } => next, | ||||
|             ReadStrategy::Exact(exact) => exact, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn max(&self) -> usize { | ||||
|         match *self { | ||||
|             ReadStrategy::Adaptive { max, .. } => max, | ||||
|             ReadStrategy::Exact(exact) => exact, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn record(&mut self, bytes_read: usize) { | ||||
|         match *self { | ||||
|             ReadStrategy::Adaptive { ref mut decrease_now, ref mut next, max, .. } => { | ||||
|                 if bytes_read >= *next { | ||||
|                     *next = cmp::min(incr_power_of_two(*next), max); | ||||
|                     *decrease_now = false; | ||||
|                 } else { | ||||
|                     let decr_to = prev_power_of_two(*next); | ||||
|                     if bytes_read < decr_to { | ||||
|                         if *decrease_now { | ||||
|                             *next = cmp::max(decr_to, INIT_BUFFER_SIZE); | ||||
|                             *decrease_now = false; | ||||
|                         } else { | ||||
|                             // Decreasing is a two "record" process. | ||||
|                             *decrease_now = true; | ||||
|                         } | ||||
|                     } else { | ||||
|                         // A read within the current range should cancel | ||||
|                         // a potential decrease, since we just saw proof | ||||
|                         // that we still need this size. | ||||
|                         *decrease_now = false; | ||||
|                     } | ||||
|                 } | ||||
|             }, | ||||
|             _ => (), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn incr_power_of_two(n: usize) -> usize { | ||||
|     n.saturating_mul(2) | ||||
| } | ||||
|  | ||||
| fn prev_power_of_two(n: usize) -> usize { | ||||
|     // Only way this shift can underflow is if n is less than 4. | ||||
|     // (Which would means `usize::MAX >> 64` and underflowed!) | ||||
|     debug_assert!(n >= 4); | ||||
|     (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1 | ||||
| } | ||||
|  | ||||
| impl Default for ReadStrategy { | ||||
|     fn default() -> ReadStrategy { | ||||
|         ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct Cursor<T> { | ||||
|     bytes: T, | ||||
| @@ -637,6 +691,97 @@ mod tests { | ||||
|         assert!(buffered.io.blocked()); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn read_strategy_adaptive_increments() { | ||||
|         let mut strategy = ReadStrategy::default(); | ||||
|         assert_eq!(strategy.next(), 8192); | ||||
|  | ||||
|         // Grows if record == next | ||||
|         strategy.record(8192); | ||||
|         assert_eq!(strategy.next(), 16384); | ||||
|  | ||||
|         strategy.record(16384); | ||||
|         assert_eq!(strategy.next(), 32768); | ||||
|  | ||||
|         // Enormous records still increment at same rate | ||||
|         strategy.record(::std::usize::MAX); | ||||
|         assert_eq!(strategy.next(), 65536); | ||||
|  | ||||
|         let max = strategy.max(); | ||||
|         while strategy.next() < max { | ||||
|             strategy.record(max); | ||||
|         } | ||||
|  | ||||
|         assert_eq!(strategy.next(), max, "never goes over max"); | ||||
|         strategy.record(max + 1); | ||||
|         assert_eq!(strategy.next(), max, "never goes over max"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn read_strategy_adaptive_decrements() { | ||||
|         let mut strategy = ReadStrategy::default(); | ||||
|         strategy.record(8192); | ||||
|         assert_eq!(strategy.next(), 16384); | ||||
|  | ||||
|         strategy.record(1); | ||||
|         assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet"); | ||||
|         strategy.record(8192); | ||||
|         assert_eq!(strategy.next(), 16384, "record was with range"); | ||||
|  | ||||
|         strategy.record(1); | ||||
|         assert_eq!(strategy.next(), 16384, "in-range record should make this the 'first' again"); | ||||
|  | ||||
|         strategy.record(1); | ||||
|         assert_eq!(strategy.next(), 8192, "second smaller record decrements"); | ||||
|  | ||||
|         strategy.record(1); | ||||
|         assert_eq!(strategy.next(), 8192, "first doesn't decrement"); | ||||
|         strategy.record(1); | ||||
|         assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn read_strategy_adaptive_stays_the_same() { | ||||
|         let mut strategy = ReadStrategy::default(); | ||||
|         strategy.record(8192); | ||||
|         assert_eq!(strategy.next(), 16384); | ||||
|  | ||||
|         strategy.record(8193); | ||||
|         assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet"); | ||||
|  | ||||
|         strategy.record(8193); | ||||
|         assert_eq!(strategy.next(), 16384, "with current step does not decrement"); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     fn read_strategy_adaptive_max_fuzz() { | ||||
|         fn fuzz(max: usize) { | ||||
|             let mut strategy = ReadStrategy::with_max(max); | ||||
|             while strategy.next() < max { | ||||
|                 strategy.record(::std::usize::MAX); | ||||
|             } | ||||
|             let mut next = strategy.next(); | ||||
|             while next > 8192 { | ||||
|                 strategy.record(1); | ||||
|                 strategy.record(1); | ||||
|                 next = strategy.next(); | ||||
|                 assert!( | ||||
|                     next.is_power_of_two(), | ||||
|                     "decrement should be powers of two: {} (max = {})", | ||||
|                     next, | ||||
|                     max, | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         let mut max = 8192; | ||||
|         while max < ::std::usize::MAX { | ||||
|             fuzz(max); | ||||
|             max = (max / 2).saturating_mul(3); | ||||
|         } | ||||
|         fuzz(::std::usize::MAX); | ||||
|     } | ||||
|  | ||||
|     #[test] | ||||
|     #[should_panic] | ||||
|     fn write_buf_requires_non_empty_bufs() { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user