refactor(http2): store bdp sampler in Body H2 variant
This commit is contained in:
@@ -27,7 +27,7 @@ futures-util = { version = "0.3", default-features = false }
|
|||||||
http = "0.2"
|
http = "0.2"
|
||||||
http-body = "0.3.1"
|
http-body = "0.3.1"
|
||||||
httparse = "1.0"
|
httparse = "1.0"
|
||||||
h2 = "0.2.1"
|
h2 = "0.2.2"
|
||||||
itoa = "0.4.1"
|
itoa = "0.4.1"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
pin-project = "0.4"
|
pin-project = "0.4"
|
||||||
|
|||||||
@@ -38,6 +38,7 @@ enum Kind {
|
|||||||
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
|
rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
|
||||||
},
|
},
|
||||||
H2 {
|
H2 {
|
||||||
|
bdp: bdp::Sampler,
|
||||||
content_length: DecodedLength,
|
content_length: DecodedLength,
|
||||||
recv: h2::RecvStream,
|
recv: h2::RecvStream,
|
||||||
},
|
},
|
||||||
@@ -62,11 +63,6 @@ struct Extra {
|
|||||||
/// connection yet.
|
/// connection yet.
|
||||||
delayed_eof: Option<DelayEof>,
|
delayed_eof: Option<DelayEof>,
|
||||||
on_upgrade: OnUpgrade,
|
on_upgrade: OnUpgrade,
|
||||||
|
|
||||||
/// Records bytes read to compute the BDP.
|
|
||||||
///
|
|
||||||
/// Only used with `H2` variant.
|
|
||||||
h2_bdp: bdp::Sampler,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type DelayEofUntil = oneshot::Receiver<Never>;
|
type DelayEofUntil = oneshot::Receiver<Never>;
|
||||||
@@ -186,15 +182,12 @@ impl Body {
|
|||||||
content_length: DecodedLength,
|
content_length: DecodedLength,
|
||||||
bdp: bdp::Sampler,
|
bdp: bdp::Sampler,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let mut body = Body::new(Kind::H2 {
|
let body = Body::new(Kind::H2 {
|
||||||
|
bdp,
|
||||||
content_length,
|
content_length,
|
||||||
recv,
|
recv,
|
||||||
});
|
});
|
||||||
|
|
||||||
if bdp.is_enabled() {
|
|
||||||
body.extra_mut().h2_bdp = bdp;
|
|
||||||
}
|
|
||||||
|
|
||||||
body
|
body
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,7 +213,6 @@ impl Body {
|
|||||||
Box::new(Extra {
|
Box::new(Extra {
|
||||||
delayed_eof: None,
|
delayed_eof: None,
|
||||||
on_upgrade: OnUpgrade::none(),
|
on_upgrade: OnUpgrade::none(),
|
||||||
h2_bdp: bdp::disabled(),
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -273,15 +265,14 @@ impl Body {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Kind::H2 {
|
Kind::H2 {
|
||||||
|
ref bdp,
|
||||||
recv: ref mut h2,
|
recv: ref mut h2,
|
||||||
content_length: ref mut len,
|
content_length: ref mut len,
|
||||||
} => match ready!(h2.poll_data(cx)) {
|
} => match ready!(h2.poll_data(cx)) {
|
||||||
Some(Ok(bytes)) => {
|
Some(Ok(bytes)) => {
|
||||||
let _ = h2.flow_control().release_capacity(bytes.len());
|
let _ = h2.flow_control().release_capacity(bytes.len());
|
||||||
len.sub_if(bytes.len() as u64);
|
len.sub_if(bytes.len() as u64);
|
||||||
if let Some(ref extra) = self.extra {
|
bdp.sample(bytes.len());
|
||||||
extra.h2_bdp.sample(bytes.len());
|
|
||||||
}
|
|
||||||
Poll::Ready(Some(Ok(bytes)))
|
Poll::Ready(Some(Ok(bytes)))
|
||||||
}
|
}
|
||||||
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
|
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ type WindowSize = u32;
|
|||||||
/// Any higher than this likely will be hitting the TCP flow control.
|
/// Any higher than this likely will be hitting the TCP flow control.
|
||||||
const BDP_LIMIT: usize = 1024 * 1024 * 16;
|
const BDP_LIMIT: usize = 1024 * 1024 * 16;
|
||||||
|
|
||||||
pub(crate) fn disabled() -> Sampler {
|
pub(super) fn disabled() -> Sampler {
|
||||||
Sampler {
|
Sampler {
|
||||||
shared: Weak::new(),
|
shared: Weak::new(),
|
||||||
}
|
}
|
||||||
@@ -105,10 +105,6 @@ impl Sampler {
|
|||||||
|
|
||||||
inner.bytes += bytes;
|
inner.bytes += bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn is_enabled(&self) -> bool {
|
|
||||||
self.shared.strong_count() > 0
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Estimator {
|
impl Estimator {
|
||||||
|
|||||||
Reference in New Issue
Block a user