935 lines
29 KiB
Rust
935 lines
29 KiB
Rust
use std::cmp;
|
|
use std::fmt;
|
|
use std::io::{self, IoSlice};
|
|
use std::marker::Unpin;
|
|
use std::mem::MaybeUninit;
|
|
|
|
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
|
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
|
|
|
use super::{Http1Transaction, ParseContext, ParsedMessage};
|
|
use crate::common::buf::BufList;
|
|
use crate::common::{task, Pin, Poll};
|
|
|
|
/// Size of the first read-buffer reservation, made before any bytes have
/// been read from the IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 8 * 1024;

/// Smallest value accepted as a maximum buffer size.
pub(crate) const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;

/// Default cap on the read buffer (8 KiB + 400 KiB). When the buffer has
/// grown to this size and a message is still incomplete, a `TooLarge`
/// error is produced.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8 * 1024 + 400 * 1024;

/// Upper bound on the number of distinct `Buf`s kept in the write queue
/// before a flush is required. Only relevant when buffers are queued
/// rather than flattened.
///
/// A flush may well happen before this many buffers pile up; this is only
/// the point at which one is forced.
const MAX_BUF_LIST_BUFFERS: usize = 16;
|
|
|
|
pub(crate) struct Buffered<T, B> {
|
|
flush_pipeline: bool,
|
|
io: T,
|
|
read_blocked: bool,
|
|
read_buf: BytesMut,
|
|
read_buf_strategy: ReadStrategy,
|
|
write_buf: WriteBuf<B>,
|
|
}
|
|
|
|
impl<T, B> fmt::Debug for Buffered<T, B>
|
|
where
|
|
B: Buf,
|
|
{
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.debug_struct("Buffered")
|
|
.field("read_buf", &self.read_buf)
|
|
.field("write_buf", &self.write_buf)
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl<T, B> Buffered<T, B>
|
|
where
|
|
T: AsyncRead + AsyncWrite + Unpin,
|
|
B: Buf,
|
|
{
|
|
pub(crate) fn new(io: T) -> Buffered<T, B> {
|
|
let strategy = if io.is_write_vectored() {
|
|
WriteStrategy::Queue
|
|
} else {
|
|
WriteStrategy::Flatten
|
|
};
|
|
let write_buf = WriteBuf::new(strategy);
|
|
Buffered {
|
|
flush_pipeline: false,
|
|
io,
|
|
read_blocked: false,
|
|
read_buf: BytesMut::with_capacity(0),
|
|
read_buf_strategy: ReadStrategy::default(),
|
|
write_buf,
|
|
}
|
|
}
|
|
|
|
#[cfg(feature = "server")]
|
|
pub(crate) fn set_flush_pipeline(&mut self, enabled: bool) {
|
|
debug_assert!(!self.write_buf.has_remaining());
|
|
self.flush_pipeline = enabled;
|
|
if enabled {
|
|
self.set_write_strategy_flatten();
|
|
}
|
|
}
|
|
|
|
pub(crate) fn set_max_buf_size(&mut self, max: usize) {
|
|
assert!(
|
|
max >= MINIMUM_MAX_BUFFER_SIZE,
|
|
"The max_buf_size cannot be smaller than {}.",
|
|
MINIMUM_MAX_BUFFER_SIZE,
|
|
);
|
|
self.read_buf_strategy = ReadStrategy::with_max(max);
|
|
self.write_buf.max_buf_size = max;
|
|
}
|
|
|
|
#[cfg(feature = "client")]
|
|
pub(crate) fn set_read_buf_exact_size(&mut self, sz: usize) {
|
|
self.read_buf_strategy = ReadStrategy::Exact(sz);
|
|
}
|
|
|
|
#[cfg(feature = "server")]
|
|
pub(crate) fn set_write_strategy_flatten(&mut self) {
|
|
// this should always be called only at construction time,
|
|
// so this assert is here to catch myself
|
|
debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
|
|
self.write_buf.set_strategy(WriteStrategy::Flatten);
|
|
}
|
|
|
|
pub(crate) fn read_buf(&self) -> &[u8] {
|
|
self.read_buf.as_ref()
|
|
}
|
|
|
|
#[cfg(test)]
|
|
#[cfg(feature = "nightly")]
|
|
pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
|
|
&mut self.read_buf
|
|
}
|
|
|
|
/// Return the "allocated" available space, not the potential space
|
|
/// that could be allocated in the future.
|
|
fn read_buf_remaining_mut(&self) -> usize {
|
|
self.read_buf.capacity() - self.read_buf.len()
|
|
}
|
|
|
|
pub(crate) fn headers_buf(&mut self) -> &mut Vec<u8> {
|
|
let buf = self.write_buf.headers_mut();
|
|
&mut buf.bytes
|
|
}
|
|
|
|
pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
|
|
&mut self.write_buf
|
|
}
|
|
|
|
pub(crate) fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
|
|
self.write_buf.buffer(buf)
|
|
}
|
|
|
|
pub(crate) fn can_buffer(&self) -> bool {
|
|
self.flush_pipeline || self.write_buf.can_buffer()
|
|
}
|
|
|
|
pub(crate) fn consume_leading_lines(&mut self) {
|
|
if !self.read_buf.is_empty() {
|
|
let mut i = 0;
|
|
while i < self.read_buf.len() {
|
|
match self.read_buf[i] {
|
|
b'\r' | b'\n' => i += 1,
|
|
_ => break,
|
|
}
|
|
}
|
|
self.read_buf.advance(i);
|
|
}
|
|
}
|
|
|
|
pub(super) fn parse<S>(
|
|
&mut self,
|
|
cx: &mut task::Context<'_>,
|
|
parse_ctx: ParseContext<'_>,
|
|
) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
|
|
where
|
|
S: Http1Transaction,
|
|
{
|
|
loop {
|
|
match super::role::parse_headers::<S>(
|
|
&mut self.read_buf,
|
|
ParseContext {
|
|
cached_headers: parse_ctx.cached_headers,
|
|
req_method: parse_ctx.req_method,
|
|
h1_parser_config: parse_ctx.h1_parser_config.clone(),
|
|
preserve_header_case: parse_ctx.preserve_header_case,
|
|
h09_responses: parse_ctx.h09_responses,
|
|
#[cfg(feature = "ffi")]
|
|
raw_headers: parse_ctx.raw_headers,
|
|
},
|
|
)? {
|
|
Some(msg) => {
|
|
debug!("parsed {} headers", msg.head.headers.len());
|
|
return Poll::Ready(Ok(msg));
|
|
}
|
|
None => {
|
|
let max = self.read_buf_strategy.max();
|
|
if self.read_buf.len() >= max {
|
|
debug!("max_buf_size ({}) reached, closing", max);
|
|
return Poll::Ready(Err(crate::Error::new_too_large()));
|
|
}
|
|
}
|
|
}
|
|
if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
|
|
trace!("parse eof");
|
|
return Poll::Ready(Err(crate::Error::new_incomplete()));
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) fn poll_read_from_io(
|
|
&mut self,
|
|
cx: &mut task::Context<'_>,
|
|
) -> Poll<io::Result<usize>> {
|
|
self.read_blocked = false;
|
|
let next = self.read_buf_strategy.next();
|
|
if self.read_buf_remaining_mut() < next {
|
|
self.read_buf.reserve(next);
|
|
}
|
|
|
|
let dst = self.read_buf.chunk_mut();
|
|
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
|
|
let mut buf = ReadBuf::uninit(dst);
|
|
match Pin::new(&mut self.io).poll_read(cx, &mut buf) {
|
|
Poll::Ready(Ok(_)) => {
|
|
let n = buf.filled().len();
|
|
unsafe {
|
|
// Safety: we just read that many bytes into the
|
|
// uninitialized part of the buffer, so this is okay.
|
|
// @tokio pls give me back `poll_read_buf` thanks
|
|
self.read_buf.advance_mut(n);
|
|
}
|
|
self.read_buf_strategy.record(n);
|
|
Poll::Ready(Ok(n))
|
|
}
|
|
Poll::Pending => {
|
|
self.read_blocked = true;
|
|
Poll::Pending
|
|
}
|
|
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
|
|
}
|
|
}
|
|
|
|
pub(crate) fn into_inner(self) -> (T, Bytes) {
|
|
(self.io, self.read_buf.freeze())
|
|
}
|
|
|
|
pub(crate) fn io_mut(&mut self) -> &mut T {
|
|
&mut self.io
|
|
}
|
|
|
|
pub(crate) fn is_read_blocked(&self) -> bool {
|
|
self.read_blocked
|
|
}
|
|
|
|
pub(crate) fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
|
if self.flush_pipeline && !self.read_buf.is_empty() {
|
|
Poll::Ready(Ok(()))
|
|
} else if self.write_buf.remaining() == 0 {
|
|
Pin::new(&mut self.io).poll_flush(cx)
|
|
} else {
|
|
if let WriteStrategy::Flatten = self.write_buf.strategy {
|
|
return self.poll_flush_flattened(cx);
|
|
}
|
|
|
|
const MAX_WRITEV_BUFS: usize = 64;
|
|
loop {
|
|
let n = {
|
|
let mut iovs = [IoSlice::new(&[]); MAX_WRITEV_BUFS];
|
|
let len = self.write_buf.chunks_vectored(&mut iovs);
|
|
ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
|
|
};
|
|
// TODO(eliza): we have to do this manually because
|
|
// `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
|
|
// `poll_write_buf` comes back, the manual advance will need to leave!
|
|
self.write_buf.advance(n);
|
|
debug!("flushed {} bytes", n);
|
|
if self.write_buf.remaining() == 0 {
|
|
break;
|
|
} else if n == 0 {
|
|
trace!(
|
|
"write returned zero, but {} bytes remaining",
|
|
self.write_buf.remaining()
|
|
);
|
|
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
|
|
}
|
|
}
|
|
Pin::new(&mut self.io).poll_flush(cx)
|
|
}
|
|
}
|
|
|
|
/// Specialized version of `flush` when strategy is Flatten.
|
|
///
|
|
/// Since all buffered bytes are flattened into the single headers buffer,
|
|
/// that skips some bookkeeping around using multiple buffers.
|
|
fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
|
|
loop {
|
|
let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.chunk()))?;
|
|
debug!("flushed {} bytes", n);
|
|
self.write_buf.headers.advance(n);
|
|
if self.write_buf.headers.remaining() == 0 {
|
|
self.write_buf.headers.reset();
|
|
break;
|
|
} else if n == 0 {
|
|
trace!(
|
|
"write returned zero, but {} bytes remaining",
|
|
self.write_buf.remaining()
|
|
);
|
|
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
|
|
}
|
|
}
|
|
Pin::new(&mut self.io).poll_flush(cx)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
|
|
futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
|
|
}
|
|
}
|
|
|
|
// The `B` is a `Buf`, we never project a pin to it
|
|
impl<T: Unpin, B> Unpin for Buffered<T, B> {}
|
|
|
|
// TODO: This trait is old... at least rename to PollBytes or something...
|
|
pub(crate) trait MemRead {
|
|
fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
|
|
}
|
|
|
|
impl<T, B> MemRead for Buffered<T, B>
|
|
where
|
|
T: AsyncRead + AsyncWrite + Unpin,
|
|
B: Buf,
|
|
{
|
|
fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
|
|
if !self.read_buf.is_empty() {
|
|
let n = std::cmp::min(len, self.read_buf.len());
|
|
Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
|
|
} else {
|
|
let n = ready!(self.poll_read_from_io(cx))?;
|
|
Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// How much space to reserve in the read buffer before each read.
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    /// Size that grows and shrinks with the amounts actually read.
    Adaptive {
        /// Set after one undersized read; a second one in a row shrinks `next`.
        decrease_now: bool,
        /// Size to reserve for the upcoming read.
        next: usize,
        /// Ceiling that growth of `next` is clamped to.
        max: usize,
    },
    /// Always the same fixed size.
    #[cfg(feature = "client")]
    Exact(usize),
}
|
|
|
|
impl ReadStrategy {
|
|
fn with_max(max: usize) -> ReadStrategy {
|
|
ReadStrategy::Adaptive {
|
|
decrease_now: false,
|
|
next: INIT_BUFFER_SIZE,
|
|
max,
|
|
}
|
|
}
|
|
|
|
fn next(&self) -> usize {
|
|
match *self {
|
|
ReadStrategy::Adaptive { next, .. } => next,
|
|
#[cfg(feature = "client")]
|
|
ReadStrategy::Exact(exact) => exact,
|
|
}
|
|
}
|
|
|
|
fn max(&self) -> usize {
|
|
match *self {
|
|
ReadStrategy::Adaptive { max, .. } => max,
|
|
#[cfg(feature = "client")]
|
|
ReadStrategy::Exact(exact) => exact,
|
|
}
|
|
}
|
|
|
|
fn record(&mut self, bytes_read: usize) {
|
|
match *self {
|
|
ReadStrategy::Adaptive {
|
|
ref mut decrease_now,
|
|
ref mut next,
|
|
max,
|
|
..
|
|
} => {
|
|
if bytes_read >= *next {
|
|
*next = cmp::min(incr_power_of_two(*next), max);
|
|
*decrease_now = false;
|
|
} else {
|
|
let decr_to = prev_power_of_two(*next);
|
|
if bytes_read < decr_to {
|
|
if *decrease_now {
|
|
*next = cmp::max(decr_to, INIT_BUFFER_SIZE);
|
|
*decrease_now = false;
|
|
} else {
|
|
// Decreasing is a two "record" process.
|
|
*decrease_now = true;
|
|
}
|
|
} else {
|
|
// A read within the current range should cancel
|
|
// a potential decrease, since we just saw proof
|
|
// that we still need this size.
|
|
*decrease_now = false;
|
|
}
|
|
}
|
|
}
|
|
#[cfg(feature = "client")]
|
|
ReadStrategy::Exact(_) => (),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Doubles `n`, pinning the result at `usize::MAX` instead of overflowing.
fn incr_power_of_two(n: usize) -> usize {
    n.checked_mul(2).unwrap_or(usize::MAX)
}
|
|
|
|
/// Returns half of the largest power of two that is not greater than `n`.
///
/// Debug builds assert `n >= 4`.
fn prev_power_of_two(n: usize) -> usize {
    // The shift amount below can only reach the full bit width when `n` is
    // less than 4 (that would mean `usize::MAX >> 64`, an overflowing shift).
    debug_assert!(n >= 4);
    let shift = n.leading_zeros() + 2;
    (usize::MAX >> shift) + 1
}
|
|
|
|
impl Default for ReadStrategy {
|
|
fn default() -> ReadStrategy {
|
|
ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
|
|
}
|
|
}
|
|
|
|
/// A byte container together with a read position into it.
#[derive(Clone)]
pub(crate) struct Cursor<T> {
    /// The underlying storage.
    bytes: T,
    /// Offset of the first byte that has not been consumed.
    pos: usize,
}
|
|
|
|
impl<T> Cursor<T>
where
    T: AsRef<[u8]>,
{
    /// Wraps `bytes` with the position at the very start.
    #[inline]
    pub(crate) fn new(bytes: T) -> Cursor<T> {
        Self { bytes, pos: 0 }
    }
}
|
|
|
|
impl Cursor<Vec<u8>> {
|
|
/// If we've advanced the position a bit in this cursor, and wish to
|
|
/// extend the underlying vector, we may wish to unshift the "read" bytes
|
|
/// off, and move everything else over.
|
|
fn maybe_unshift(&mut self, additional: usize) {
|
|
if self.pos == 0 {
|
|
// nothing to do
|
|
return;
|
|
}
|
|
|
|
if self.bytes.capacity() - self.bytes.len() >= additional {
|
|
// there's room!
|
|
return;
|
|
}
|
|
|
|
self.bytes.drain(0..self.pos);
|
|
self.pos = 0;
|
|
}
|
|
|
|
fn reset(&mut self) {
|
|
self.pos = 0;
|
|
self.bytes.clear();
|
|
}
|
|
}
|
|
|
|
impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
f.debug_struct("Cursor")
|
|
.field("pos", &self.pos)
|
|
.field("len", &self.bytes.as_ref().len())
|
|
.finish()
|
|
}
|
|
}
|
|
|
|
impl<T> Buf for Cursor<T>
where
    T: AsRef<[u8]>,
{
    /// Number of bytes from the position to the end.
    #[inline]
    fn remaining(&self) -> usize {
        let total = self.bytes.as_ref().len();
        total - self.pos
    }

    /// Everything from the position to the end, as one slice.
    #[inline]
    fn chunk(&self) -> &[u8] {
        let all = self.bytes.as_ref();
        &all[self.pos..]
    }

    /// Moves the position forward by `cnt`; debug builds assert that this
    /// stays within the bytes.
    #[inline]
    fn advance(&mut self, cnt: usize) {
        let target = self.pos + cnt;
        debug_assert!(target <= self.bytes.as_ref().len());
        self.pos = target;
    }
}
|
|
|
|
// an internal buffer to collect writes before flushes
|
|
pub(super) struct WriteBuf<B> {
|
|
/// Re-usable buffer that holds message headers
|
|
headers: Cursor<Vec<u8>>,
|
|
max_buf_size: usize,
|
|
/// Deque of user buffers if strategy is Queue
|
|
queue: BufList<B>,
|
|
strategy: WriteStrategy,
|
|
}
|
|
|
|
impl<B: Buf> WriteBuf<B> {
|
|
fn new(strategy: WriteStrategy) -> WriteBuf<B> {
|
|
WriteBuf {
|
|
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
|
|
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
|
|
queue: BufList::new(),
|
|
strategy,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<B> WriteBuf<B>
|
|
where
|
|
B: Buf,
|
|
{
|
|
#[cfg(feature = "server")]
|
|
fn set_strategy(&mut self, strategy: WriteStrategy) {
|
|
self.strategy = strategy;
|
|
}
|
|
|
|
pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
|
|
debug_assert!(buf.has_remaining());
|
|
match self.strategy {
|
|
WriteStrategy::Flatten => {
|
|
let head = self.headers_mut();
|
|
|
|
head.maybe_unshift(buf.remaining());
|
|
trace!(
|
|
self.len = head.remaining(),
|
|
buf.len = buf.remaining(),
|
|
"buffer.flatten"
|
|
);
|
|
//perf: This is a little faster than <Vec as BufMut>>::put,
|
|
//but accomplishes the same result.
|
|
loop {
|
|
let adv = {
|
|
let slice = buf.chunk();
|
|
if slice.is_empty() {
|
|
return;
|
|
}
|
|
head.bytes.extend_from_slice(slice);
|
|
slice.len()
|
|
};
|
|
buf.advance(adv);
|
|
}
|
|
}
|
|
WriteStrategy::Queue => {
|
|
trace!(
|
|
self.len = self.remaining(),
|
|
buf.len = buf.remaining(),
|
|
"buffer.queue"
|
|
);
|
|
self.queue.push(buf.into());
|
|
}
|
|
}
|
|
}
|
|
|
|
fn can_buffer(&self) -> bool {
|
|
match self.strategy {
|
|
WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
|
|
WriteStrategy::Queue => {
|
|
self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
|
|
}
|
|
}
|
|
}
|
|
|
|
fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
|
|
debug_assert!(!self.queue.has_remaining());
|
|
&mut self.headers
|
|
}
|
|
}
|
|
|
|
/// Debug output shows the pending byte count and the strategy.
impl<B> fmt::Debug for WriteBuf<B>
where
    B: Buf,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("WriteBuf");
        out.field("remaining", &self.remaining());
        out.field("strategy", &self.strategy);
        out.finish()
    }
}
|
|
|
|
impl<B: Buf> Buf for WriteBuf<B> {
|
|
#[inline]
|
|
fn remaining(&self) -> usize {
|
|
self.headers.remaining() + self.queue.remaining()
|
|
}
|
|
|
|
#[inline]
|
|
fn chunk(&self) -> &[u8] {
|
|
let headers = self.headers.chunk();
|
|
if !headers.is_empty() {
|
|
headers
|
|
} else {
|
|
self.queue.chunk()
|
|
}
|
|
}
|
|
|
|
#[inline]
|
|
fn advance(&mut self, cnt: usize) {
|
|
let hrem = self.headers.remaining();
|
|
|
|
match hrem.cmp(&cnt) {
|
|
cmp::Ordering::Equal => self.headers.reset(),
|
|
cmp::Ordering::Greater => self.headers.advance(cnt),
|
|
cmp::Ordering::Less => {
|
|
let qcnt = cnt - hrem;
|
|
self.headers.reset();
|
|
self.queue.advance(qcnt);
|
|
}
|
|
}
|
|
}
|
|
|
|
#[inline]
|
|
fn chunks_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
|
|
let n = self.headers.chunks_vectored(dst);
|
|
self.queue.chunks_vectored(&mut dst[n..]) + n
|
|
}
|
|
}
|
|
|
|
/// How buffers handed to the write buffer are stored until the next flush.
#[derive(Debug)]
enum WriteStrategy {
    /// Copy every buffer into the single headers buffer.
    Flatten,
    /// Keep each buffer as its own entry in a queue.
    Queue,
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use tokio_test::io::Builder as Mock;

    // #[cfg(feature = "nightly")]
    // use test::Bencher;

    /*
    impl<T: Read> MemRead for AsyncIo<T> {
        fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
            let mut v = vec![0; len];
            let n = try_nb!(self.read(v.as_mut_slice()));
            Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
        }
    }
    */

    /// Currently ignored and empty; the original body is kept below as a
    /// comment until vectored writes can be mocked again.
    #[tokio::test]
    #[ignore]
    async fn iobuf_write_empty_slice() {
        // TODO(eliza): can i have writev back pls T_T
        // // First, let's just check that the Mock would normally return an
        // // error on an unexpected write, even if the buffer is empty...
        // let mut mock = Mock::new().build();
        // futures_util::future::poll_fn(|cx| {
        //     Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))
        // })
        // .await
        // .expect_err("should be a broken pipe");

        // // underlying io will return the logic error upon write,
        // // so we are testing that the io_buf does not trigger a write
        // // when there is nothing to flush
        // let mock = Mock::new().build();
        // let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
        // io_buf.flush().await.expect("should short-circuit flush");
    }

    /// A single `parse` call keeps reading until the IO blocks, so both
    /// mocked reads end up in the read buffer.
    #[tokio::test]
    async fn parse_reads_until_blocked() {
        use crate::proto::h1::ClientTransaction;

        let _ = pretty_env_logger::try_init();
        let io = Mock::new()
            // Split over multiple reads will read all of it
            .read(b"HTTP/1.1 200 OK\r\n")
            .read(b"Server: hyper\r\n")
            // missing last line ending
            .wait(Duration::from_secs(1))
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(io);

        // `parse` is expected to be not ready, so it cannot be awaited
        // directly; the `poll_fn` wrapper inspects the `Poll` instead.
        futures_util::future::poll_fn(|cx| {
            let parse_ctx = ParseContext {
                cached_headers: &mut None,
                req_method: &mut None,
                h1_parser_config: Default::default(),
                preserve_header_case: false,
                h09_responses: false,
                #[cfg(feature = "ffi")]
                raw_headers: false,
            };
            let polled = buffered.parse::<ClientTransaction>(cx, parse_ctx);
            assert!(polled.is_pending());
            Poll::Ready(())
        })
        .await;

        assert_eq!(
            buffered.read_buf,
            b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
        );
    }

    #[test]
    fn read_strategy_adaptive_increments() {
        let mut strat = ReadStrategy::default();
        assert_eq!(strat.next(), 8192);

        // Grows if record == next
        strat.record(8192);
        assert_eq!(strat.next(), 16384);

        strat.record(16384);
        assert_eq!(strat.next(), 32768);

        // Enormous records still increment at same rate
        strat.record(usize::MAX);
        assert_eq!(strat.next(), 65536);

        let ceiling = strat.max();
        while strat.next() < ceiling {
            strat.record(ceiling);
        }

        assert_eq!(strat.next(), ceiling, "never goes over max");
        strat.record(ceiling + 1);
        assert_eq!(strat.next(), ceiling, "never goes over max");
    }

    #[test]
    fn read_strategy_adaptive_decrements() {
        let mut strat = ReadStrategy::default();
        strat.record(8192);
        assert_eq!(strat.next(), 16384);

        strat.record(1);
        assert_eq!(
            strat.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );
        strat.record(8192);
        assert_eq!(strat.next(), 16384, "record was with range");

        strat.record(1);
        assert_eq!(
            strat.next(),
            16384,
            "in-range record should make this the 'first' again"
        );

        strat.record(1);
        assert_eq!(strat.next(), 8192, "second smaller record decrements");

        strat.record(1);
        assert_eq!(strat.next(), 8192, "first doesn't decrement");
        strat.record(1);
        assert_eq!(strat.next(), 8192, "doesn't decrement under minimum");
    }

    #[test]
    fn read_strategy_adaptive_stays_the_same() {
        let mut strat = ReadStrategy::default();
        strat.record(8192);
        assert_eq!(strat.next(), 16384);

        strat.record(8193);
        assert_eq!(
            strat.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );

        strat.record(8193);
        assert_eq!(
            strat.next(),
            16384,
            "with current step does not decrement"
        );
    }

    #[test]
    fn read_strategy_adaptive_max_fuzz() {
        // Grow a strategy all the way to `max`, then shrink it back down
        // and check every step on the way is a power of two.
        fn fuzz(max: usize) {
            let mut strat = ReadStrategy::with_max(max);
            while strat.next() < max {
                strat.record(usize::MAX);
            }
            let mut next = strat.next();
            while next > 8192 {
                strat.record(1);
                strat.record(1);
                next = strat.next();
                assert!(
                    next.is_power_of_two(),
                    "decrement should be powers of two: {} (max = {})",
                    next,
                    max,
                );
            }
        }

        let mut max = 8192;
        while max < usize::MAX {
            fuzz(max);
            max = (max / 2).saturating_mul(3);
        }
        fuzz(usize::MAX);
    }

    #[test]
    #[should_panic]
    #[cfg(debug_assertions)] // needs to trigger a debug_assert
    fn write_buf_requires_non_empty_bufs() {
        let io = Mock::new().build();
        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(io);

        buffered.buffer(Cursor::new(Vec::new()));
    }

    /*
    TODO: needs tokio_test::io to allow configure write_buf calls
    #[test]
    fn write_buf_queue() {
        let _ = pretty_env_logger::try_init();

        let mock = AsyncIo::new_buf(vec![], 1024);
        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);


        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
        buffered.flush().unwrap();

        assert_eq!(buffered.io, b"hello world, it's hyper!");
        assert_eq!(buffered.io.num_writes(), 1);
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }
    */

    #[tokio::test]
    async fn write_buf_flatten() {
        let _ = pretty_env_logger::try_init();

        let io = Mock::new().write(b"hello world, it's hyper!").build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(io);
        buffered.write_buf.set_strategy(WriteStrategy::Flatten);

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);

        buffered.flush().await.expect("flush");
    }

    #[test]
    fn write_buf_flatten_partially_flushed() {
        let _ = pretty_env_logger::try_init();

        let chunk_of = |s: &str| Cursor::new(s.as_bytes().to_vec());

        let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);

        write_buf.buffer(chunk_of("hello "));
        write_buf.buffer(chunk_of("world, "));

        assert_eq!(write_buf.chunk(), b"hello world, ");

        // advance most of the way, but not all
        write_buf.advance(11);

        assert_eq!(write_buf.chunk(), b", ");
        assert_eq!(write_buf.headers.pos, 11);
        assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);

        // there's still room in the headers buffer, so just push on the end
        write_buf.buffer(chunk_of("it's hyper!"));

        assert_eq!(write_buf.chunk(), b", it's hyper!");
        assert_eq!(write_buf.headers.pos, 11);

        let remaining_before = write_buf.remaining();
        let cap = write_buf.headers.bytes.capacity();

        // but when this would go over capacity, don't copy the old bytes
        write_buf.buffer(Cursor::new(vec![b'X'; cap]));
        assert_eq!(write_buf.remaining(), cap + remaining_before);
        assert_eq!(write_buf.headers.pos, 0);
    }

    #[tokio::test]
    async fn write_buf_queue_disable_auto() {
        let _ = pretty_env_logger::try_init();

        let io = Mock::new()
            .write(b"hello ")
            .write(b"world, ")
            .write(b"it's ")
            .write(b"hyper!")
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(io);
        buffered.write_buf.set_strategy(WriteStrategy::Queue);

        // we have 4 buffers, and vec IO disabled, but explicitly said
        // don't try to auto detect (via setting strategy above)

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

        buffered.flush().await.expect("flush");

        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }

    // #[cfg(feature = "nightly")]
    // #[bench]
    // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
    //     let s = "Hello, World!";
    //     b.bytes = s.len() as u64;

    //     let mut write_buf = WriteBuf::<bytes::Bytes>::new();
    //     write_buf.set_strategy(WriteStrategy::Flatten);
    //     b.iter(|| {
    //         let chunk = bytes::Bytes::from(s);
    //         write_buf.buffer(chunk);
    //         ::test::black_box(&write_buf);
    //         write_buf.headers.bytes.clear();
    //     })
    // }
}
|