perf(http2): slow adaptive window pings as the BDP stabilizes (#2550)
This introduces a delay to sending a ping to calculate the BDP that becomes shorter as the BDP is changing, to improve throughput quickly, but then also becomes longer as the BDP stabilizes, to reduce the amount of pings sent. This improved the performance of the adaptive window end_to_end benchmark. It should also reduce the amount of pings the remote has to deal with, hopefully preventing hyper from triggering ENHANCE_YOUR_CALM errors.
This commit is contained in:
@@ -51,9 +51,15 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
|
||||
bdp: wnd,
|
||||
max_bandwidth: 0.0,
|
||||
rtt: 0.0,
|
||||
ping_delay: Duration::from_millis(100),
|
||||
stable_count: 0,
|
||||
});
|
||||
|
||||
let bytes = bdp.as_ref().map(|_| 0);
|
||||
let (bytes, next_bdp_at) = if bdp.is_some() {
|
||||
(Some(0), Some(Instant::now()))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
let keep_alive = config.keep_alive_interval.map(|interval| KeepAlive {
|
||||
@@ -75,6 +81,7 @@ pub(super) fn channel(ping_pong: PingPong, config: Config) -> (Recorder, Ponger)
|
||||
is_keep_alive_timed_out: false,
|
||||
ping_pong,
|
||||
ping_sent_at: None,
|
||||
next_bdp_at,
|
||||
}));
|
||||
|
||||
(
|
||||
@@ -125,6 +132,9 @@ struct Shared {
|
||||
/// If `Some`, bdp is enabled, and this tracks how many bytes have been
|
||||
/// read during the current sample.
|
||||
bytes: Option<usize>,
|
||||
/// We delay a variable amount of time between BDP pings. This allows us
|
||||
/// to send less pings as the bandwidth stabilizes.
|
||||
next_bdp_at: Option<Instant>,
|
||||
|
||||
// keep-alive
|
||||
/// If `Some`, keep-alive is enabled, and the Instant is how long ago
|
||||
@@ -143,6 +153,12 @@ struct Bdp {
|
||||
max_bandwidth: f64,
|
||||
/// Round trip time in seconds
|
||||
rtt: f64,
|
||||
/// Delay the next ping by this amount.
|
||||
///
|
||||
/// This will change depending on how stable the current bandwidth is.
|
||||
ping_delay: Duration,
|
||||
/// The count of ping round trips where BDP has stayed the same.
|
||||
stable_count: u32,
|
||||
}
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
@@ -207,6 +223,17 @@ impl Recorder {
|
||||
#[cfg(feature = "runtime")]
|
||||
locked.update_last_read_at();
|
||||
|
||||
// are we ready to send another bdp ping?
|
||||
// if not, we don't need to record bytes either
|
||||
|
||||
if let Some(ref next_bdp_at) = locked.next_bdp_at {
|
||||
if Instant::now() < *next_bdp_at {
|
||||
return;
|
||||
} else {
|
||||
locked.next_bdp_at = None;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut bytes) = locked.bytes {
|
||||
*bytes += len;
|
||||
} else {
|
||||
@@ -265,6 +292,7 @@ impl Recorder {
|
||||
|
||||
impl Ponger {
|
||||
pub(super) fn poll(&mut self, cx: &mut task::Context<'_>) -> Poll<Ponged> {
|
||||
let now = Instant::now();
|
||||
let mut locked = self.shared.lock().unwrap();
|
||||
#[cfg(feature = "runtime")]
|
||||
let is_idle = self.is_idle();
|
||||
@@ -282,13 +310,13 @@ impl Ponger {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
let (bytes, rtt) = match locked.ping_pong.poll_pong(cx) {
|
||||
match locked.ping_pong.poll_pong(cx) {
|
||||
Poll::Ready(Ok(_pong)) => {
|
||||
let rtt = locked
|
||||
let start = locked
|
||||
.ping_sent_at
|
||||
.expect("pong received implies ping_sent_at")
|
||||
.elapsed();
|
||||
.expect("pong received implies ping_sent_at");
|
||||
locked.ping_sent_at = None;
|
||||
let rtt = now - start;
|
||||
trace!("recv pong");
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
@@ -299,19 +327,20 @@ impl Ponger {
|
||||
}
|
||||
}
|
||||
|
||||
if self.bdp.is_some() {
|
||||
if let Some(ref mut bdp) = self.bdp {
|
||||
let bytes = locked.bytes.expect("bdp enabled implies bytes");
|
||||
locked.bytes = Some(0); // reset
|
||||
trace!("received BDP ack; bytes = {}, rtt = {:?}", bytes, rtt);
|
||||
(bytes, rtt)
|
||||
} else {
|
||||
// no bdp, done!
|
||||
return Poll::Pending;
|
||||
|
||||
let update = bdp.calculate(bytes, rtt);
|
||||
locked.next_bdp_at = Some(now + bdp.ping_delay);
|
||||
if let Some(update) = update {
|
||||
return Poll::Ready(Ponged::SizeUpdate(update))
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
debug!("pong error: {}", e);
|
||||
return Poll::Pending;
|
||||
}
|
||||
Poll::Pending => {
|
||||
#[cfg(feature = "runtime")]
|
||||
@@ -324,19 +353,11 @@ impl Ponger {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Poll::Pending;
|
||||
}
|
||||
};
|
||||
|
||||
drop(locked);
|
||||
|
||||
if let Some(bdp) = self.bdp.as_mut().and_then(|bdp| bdp.calculate(bytes, rtt)) {
|
||||
Poll::Ready(Ponged::SizeUpdate(bdp))
|
||||
} else {
|
||||
// XXX: this doesn't register a waker...?
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
// XXX: this doesn't register a waker...?
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
@@ -386,6 +407,7 @@ impl Bdp {
|
||||
fn calculate(&mut self, bytes: usize, rtt: Duration) -> Option<WindowSize> {
|
||||
// No need to do any math if we're at the limit.
|
||||
if self.bdp as usize == BDP_LIMIT {
|
||||
self.stabilize_delay();
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -405,6 +427,7 @@ impl Bdp {
|
||||
|
||||
if bw < self.max_bandwidth {
|
||||
// not a faster bandwidth, so don't update
|
||||
self.stabilize_delay();
|
||||
return None;
|
||||
} else {
|
||||
self.max_bandwidth = bw;
|
||||
@@ -415,11 +438,26 @@ impl Bdp {
|
||||
if bytes >= self.bdp as usize * 2 / 3 {
|
||||
self.bdp = (bytes * 2).min(BDP_LIMIT) as WindowSize;
|
||||
trace!("BDP increased to {}", self.bdp);
|
||||
|
||||
self.stable_count = 0;
|
||||
self.ping_delay /= 2;
|
||||
Some(self.bdp)
|
||||
} else {
|
||||
self.stabilize_delay();
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn stabilize_delay(&mut self) {
|
||||
if self.ping_delay < Duration::from_secs(10) {
|
||||
self.stable_count += 1;
|
||||
|
||||
if self.stable_count >= 2 {
|
||||
self.ping_delay *= 4;
|
||||
self.stable_count = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn seconds(dur: Duration) -> f64 {
|
||||
|
||||
Reference in New Issue
Block a user