Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 234 additions & 8 deletions crates/types/src/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,16 @@ impl RetryPolicy {
last_retry: None,
}
}

/// Build a `RetryIter` as if `attempts_made` attempts already happened.
/// The next delay returned will correspond to attempt `attempts_made + 1`,
/// honoring the configured policy (including per-step `max_interval` clamping
/// for exponential policies).
pub fn iter_after(&self, attempts_made: usize) -> RetryIter<'_> {
let mut iter = self.iter();
iter.fast_forward(attempts_made);
iter
}
}

impl IntoIterator for RetryPolicy {
Expand Down Expand Up @@ -345,7 +355,7 @@ impl RetryIter<'_> {
}

pub fn remaining_attempts(&self) -> usize {
self.max_attempts() - self.attempts()
self.max_attempts().saturating_sub(self.attempts())
}

pub fn is_infinite(&self) -> bool {
Expand Down Expand Up @@ -426,6 +436,46 @@ impl RetryIter<'_> {
self.last_retry
}

/// Advance internal state by `n` attempts without producing delays.
/// For exponential policies this replays the per-step `max_interval` clamp
/// so subsequent delays match the configured progression exactly.
pub fn fast_forward(&mut self, n: usize) {
if n == 0 {
return;
}
let steps_to_replay = n.min(self.remaining_attempts());
if let RetryPolicy::Exponential {
initial_interval,
factor,
max_interval,
..
} = self.policy.as_ref()
&& steps_to_replay > 0
{
let cap = max_interval.unwrap_or(Duration::MAX);
// Replay: a fresh iter's first `next()` sets last_retry = initial_interval;
// each subsequent `next()` multiplies by `factor` and clamps to `cap`.
let (mut d, mult_steps) = match self.last_retry {
None => (*initial_interval, steps_to_replay - 1),
Some(last) => (last, steps_to_replay),
};
for _ in 0..mult_steps {
d = cmp::min(saturating_mul_f32(d, *factor), cap);
if d >= cap {
// Further steps would all stay at cap; stop early.
break;
}
}
Comment thread
AhmedSoliman marked this conversation as resolved.
self.last_retry = Some(d);
}
self.attempts = self.attempts.saturating_add(n);
Comment thread
AhmedSoliman marked this conversation as resolved.
// Clamp to max_attempts for bounded policies so subsequent
// `peek_next`/`next` arithmetic (`self.attempts + 1`) cannot overflow.
if let Some(limit) = self.policy.max_attempts() {
self.attempts = self.attempts.min(limit.into());
}
}

/// peeks the next delay without adding jitter
pub fn peek_next(&self) -> Option<Duration> {
match self.policy.as_ref() {
Expand All @@ -450,7 +500,7 @@ impl RetryIter<'_> {
None
} else if self.last_retry.is_some() {
let new_retry = cmp::min(
self.last_retry.unwrap().mul_f32(*factor),
saturating_mul_f32(self.last_retry.unwrap(), *factor),
max_interval.map(Into::into).unwrap_or(Duration::MAX),
);
Some(new_retry)
Expand All @@ -467,7 +517,7 @@ impl Iterator for RetryIter<'_> {

/// adds up to 1/3 target duration as jitter
fn next(&mut self) -> Option<Self::Item> {
self.attempts += 1;
self.attempts = self.attempts.saturating_add(1);
match self.policy.as_ref() {
RetryPolicy::None => None,
RetryPolicy::FixedDelay {
Expand All @@ -490,7 +540,7 @@ impl Iterator for RetryIter<'_> {
None
} else if self.last_retry.is_some() {
let new_retry = cmp::min(
self.last_retry.unwrap().mul_f32(*factor),
saturating_mul_f32(self.last_retry.unwrap(), *factor),
max_interval.map(Into::into).unwrap_or(Duration::MAX),
);
self.last_retry = Some(new_retry);
Expand All @@ -510,13 +560,17 @@ impl Iterator for RetryIter<'_> {
RetryPolicy::Exponential { max_attempts, .. } => max_attempts,
};
let max_attempts: usize = max_attempts.unwrap_or(NonZeroUsize::MAX).into();
(
max_attempts - self.attempts,
Some(max_attempts - self.attempts),
)
let remaining_attempts = max_attempts.saturating_sub(self.attempts);
(remaining_attempts, Some(remaining_attempts))
}
}

/// Multiplies a [`Duration`] by an `f32` factor, saturating at [`Duration::MAX`]
/// instead of panicking on overflow, non-finite, or negative results.
fn saturating_mul_f32(d: Duration, factor: f32) -> Duration {
Duration::try_from_secs_f32(d.as_secs_f32() * factor).unwrap_or(Duration::MAX)
}

// Jitter is a random duration added to the desired target, it ranges from 3ms to
// (max_multiplier * duration) of the original requested delay. The minimum of +3ms
// is to avoid falling into common zero-ending values (0, 10, 100, etc.) which are
Expand Down Expand Up @@ -717,4 +771,176 @@ mod tests {
// no more left
assert_eq!(iter.remaining_cumulative_duration(), WaitDuration::None);
}

#[test]
fn fast_forward_fixed_delay() {
let policy = RetryPolicy::fixed_delay(Duration::from_millis(100), Some(10));

// After 5 attempts, the 6th delay should still be the configured interval.
let mut iter = policy.iter_after(5);
assert_eq!(iter.attempts(), 5);
assert_eq!(iter.remaining_attempts(), 5);
assert_eq!(iter.peek_next(), Some(Duration::from_millis(100)));
let next = iter.next().expect("delay available");
assert!(within_jitter(
Duration::from_millis(100),
next,
DEFAULT_JITTER_MULTIPLIER
));
assert_eq!(iter.attempts(), 6);

// Drains until max_attempts reached.
let remaining: Vec<_> = iter.collect();
assert_eq!(remaining.len(), 4);
}

#[test]
fn fast_forward_exponential_matches_step_by_step() {
let make = || RetryPolicy::exponential(Duration::from_millis(100), 2.0, Some(10), None);

// Step-by-step reference path.
let mut reference = make().into_iter();
for _ in 0..5 {
reference.next();
}
let reference_last = reference.last_retry();

// Fast-forwarded path.
let policy = make();
let mut iter = policy.iter_after(5);
assert_eq!(iter.attempts(), 5);
assert_eq!(iter.last_retry(), reference_last);

// 6th delay (un-jittered) = 100ms * 2^5 = 3200ms (allow f32 rounding slack)
let peeked = iter.peek_next().expect("delay available");
assert!(within_rounding_error(Duration::from_millis(3200), peeked));
let actual = iter.next().expect("delay");
assert!(within_jitter(peeked, actual, DEFAULT_JITTER_MULTIPLIER));
}

#[test]
fn fast_forward_exponential_respects_max_interval_clamp() {
// initial=100ms, factor=2.0, cap=500ms. Per-step clamp: 100,200,400,500,500,500,...
let policy = RetryPolicy::exponential(
Duration::from_millis(100),
2.0,
Some(10),
Some(Duration::from_millis(500)),
);

// After 4 attempts the per-step clamp has settled at 500ms.
let iter = policy.iter_after(4);
assert_eq!(iter.last_retry(), Some(Duration::from_millis(500)));
assert_eq!(iter.peek_next(), Some(Duration::from_millis(500)));

// After 7 attempts, still clamped at 500ms (closed-form 100*2^6=6400 would be wrong).
let iter = policy.iter_after(7);
assert_eq!(iter.last_retry(), Some(Duration::from_millis(500)));
assert_eq!(iter.peek_next(), Some(Duration::from_millis(500)));
}

#[test]
fn fast_forward_beyond_max_attempts_exhausts_iter() {
let policy = RetryPolicy::fixed_delay(Duration::from_millis(100), Some(3));
let mut iter = policy.iter_after(3);
assert_eq!(iter.peek_next(), None);
assert_eq!(iter.next(), None);

// Overshoot is allowed; iterator stays exhausted.
let mut iter = policy.iter_after(99);
assert_eq!(iter.next(), None);

let policy = RetryPolicy::exponential(Duration::from_secs(1), 2.0, Some(3), None);
let mut iter = policy.iter_after(99);
// Bounded policy: overshoot is clamped to max_attempts so that subsequent
// arithmetic on `self.attempts + 1` cannot overflow.
assert_eq!(iter.attempts(), 3);
assert_eq!(iter.remaining_attempts(), 0);
assert_eq!(iter.peek_next(), None);
assert_eq!(iter.next(), None);
assert_eq!(iter.size_hint(), (0, Some(0)));
}

#[test]
fn fast_forward_usize_max_on_bounded_policy_does_not_overflow() {
// `iter_after(usize::MAX)` on a bounded policy must not cause
// `self.attempts + 1` to overflow inside `peek_next`/`next`.
let policy = RetryPolicy::fixed_delay(Duration::from_millis(100), Some(5));
let mut iter = policy.iter_after(usize::MAX);
assert_eq!(iter.attempts(), 5);
assert_eq!(iter.remaining_attempts(), 0);
assert_eq!(iter.peek_next(), None);
assert_eq!(iter.next(), None);

let policy = RetryPolicy::exponential(
Duration::from_millis(100),
2.0,
Some(5),
Some(Duration::from_millis(500)),
);
let mut iter = policy.iter_after(usize::MAX);
assert_eq!(iter.attempts(), 5);
assert_eq!(iter.peek_next(), None);
assert_eq!(iter.next(), None);
}

#[test]
fn fast_forward_unbounded_does_not_panic_on_mul_f32_overflow() {
// factor=2.0 with no cap and unbounded max_attempts would overflow
// `Duration::mul_f32` after ~64 steps. Replay must saturate, not panic.
let policy = RetryPolicy::exponential(Duration::from_millis(100), 2.0, None, None);
let iter = policy.iter_after(1_000);
// Reached Duration::MAX saturation rather than panicking.
assert_eq!(iter.last_retry(), Some(Duration::MAX));
assert_eq!(iter.peek_next(), Some(Duration::MAX));
}

#[test]
fn pathological_factor_does_not_panic() {
// NaN / negative factor must saturate to Duration::MAX instead of panicking
// inside `mul_f32`.
for factor in [f32::NAN, -1.0, f32::INFINITY] {
let policy = RetryPolicy::exponential(
Duration::from_millis(100),
factor,
Some(5),
Some(Duration::from_secs(10)),
);
// Both replay path and iterator path must complete without panic.
let _ = policy.clone().iter_after(3);
let _ = policy.into_iter().collect::<Vec<_>>();
}
}

#[test]
fn fast_forward_zero_is_noop() {
let policy = RetryPolicy::exponential(Duration::from_millis(100), 2.0, Some(5), None);
let mut iter = policy.iter();
iter.fast_forward(0);
assert_eq!(iter.attempts(), 0);
assert_eq!(iter.last_retry(), None);
// First delay equals the initial interval.
assert_eq!(iter.peek_next(), Some(Duration::from_millis(100)));
}

#[test]
fn fast_forward_is_composable() {
// Calling fast_forward repeatedly should be equivalent to one big jump.
let policy = RetryPolicy::exponential(
Duration::from_millis(100),
2.0,
Some(20),
Some(Duration::from_millis(1000)),
);

let a = policy.iter_after(7);

let mut b = policy.iter();
b.fast_forward(3);
b.fast_forward(4);

assert_eq!(a.attempts(), b.attempts());
assert_eq!(a.last_retry(), b.last_retry());
assert_eq!(a.peek_next(), b.peek_next());
}
}
Loading