diff --git a/crates/types/src/retries.rs b/crates/types/src/retries.rs index 7dba572780..634b2c0f4c 100644 --- a/crates/types/src/retries.rs +++ b/crates/types/src/retries.rs @@ -242,6 +242,16 @@ impl RetryPolicy { last_retry: None, } } + + /// Build a `RetryIter` as if `attempts_made` attempts already happened. + /// The next delay returned will correspond to attempt `attempts_made + 1`, + /// honoring the configured policy (including per-step `max_interval` clamping + /// for exponential policies). + pub fn iter_after(&self, attempts_made: usize) -> RetryIter<'_> { + let mut iter = self.iter(); + iter.fast_forward(attempts_made); + iter + } } impl IntoIterator for RetryPolicy { @@ -345,7 +355,7 @@ impl RetryIter<'_> { } pub fn remaining_attempts(&self) -> usize { - self.max_attempts() - self.attempts() + self.max_attempts().saturating_sub(self.attempts()) } pub fn is_infinite(&self) -> bool { @@ -426,6 +436,46 @@ impl RetryIter<'_> { self.last_retry } + /// Advance internal state by `n` attempts without producing delays. + /// For exponential policies this replays the per-step `max_interval` clamp + /// so subsequent delays match the configured progression exactly. + pub fn fast_forward(&mut self, n: usize) { + if n == 0 { + return; + } + let steps_to_replay = n.min(self.remaining_attempts()); + if let RetryPolicy::Exponential { + initial_interval, + factor, + max_interval, + .. + } = self.policy.as_ref() + && steps_to_replay > 0 + { + let cap = max_interval.unwrap_or(Duration::MAX); + // Replay: a fresh iter's first `next()` sets last_retry = initial_interval; + // each subsequent `next()` multiplies by `factor` and clamps to `cap`. + let (mut d, mult_steps) = match self.last_retry { + None => (*initial_interval, steps_to_replay - 1), + Some(last) => (last, steps_to_replay), + }; + for _ in 0..mult_steps { + d = cmp::min(saturating_mul_f32(d, *factor), cap); + if d >= cap { + // Further steps would all stay at cap; stop early. + break; + } + } + self.last_retry = Some(d); + } + self.attempts = self.attempts.saturating_add(n); + // Clamp to max_attempts for bounded policies so subsequent + // `peek_next`/`next` arithmetic (`self.attempts + 1`) cannot overflow. + if let Some(limit) = self.policy.max_attempts() { + self.attempts = self.attempts.min(limit.into()); + } + } + /// peeks the next delay without adding jitter pub fn peek_next(&self) -> Option { match self.policy.as_ref() { @@ -450,7 +500,7 @@ impl RetryIter<'_> { None } else if self.last_retry.is_some() { let new_retry = cmp::min( - self.last_retry.unwrap().mul_f32(*factor), + saturating_mul_f32(self.last_retry.unwrap(), *factor), max_interval.map(Into::into).unwrap_or(Duration::MAX), ); Some(new_retry) @@ -467,7 +517,7 @@ impl Iterator for RetryIter<'_> { /// adds up to 1/3 target duration as jitter fn next(&mut self) -> Option { - self.attempts += 1; + self.attempts = self.attempts.saturating_add(1); match self.policy.as_ref() { RetryPolicy::None => None, RetryPolicy::FixedDelay { @@ -490,7 +540,7 @@ impl Iterator for RetryIter<'_> { None } else if self.last_retry.is_some() { let new_retry = cmp::min( - self.last_retry.unwrap().mul_f32(*factor), + saturating_mul_f32(self.last_retry.unwrap(), *factor), max_interval.map(Into::into).unwrap_or(Duration::MAX), ); self.last_retry = Some(new_retry); @@ -510,13 +560,17 @@ impl Iterator for RetryIter<'_> { RetryPolicy::Exponential { max_attempts, .. } => max_attempts, }; let max_attempts: usize = max_attempts.unwrap_or(NonZeroUsize::MAX).into(); - ( - max_attempts - self.attempts, - Some(max_attempts - self.attempts), - ) + let remaining_attempts = max_attempts.saturating_sub(self.attempts); + (remaining_attempts, Some(remaining_attempts)) } } +/// Multiplies a [`Duration`] by an `f32` factor, saturating at [`Duration::MAX`] +/// instead of panicking on overflow, non-finite, or negative results. +fn saturating_mul_f32(d: Duration, factor: f32) -> Duration { + Duration::try_from_secs_f32(d.as_secs_f32() * factor).unwrap_or(Duration::MAX) +} + // Jitter is a random duration added to the desired target, it ranges from 3ms to // (max_multiplier * duration) of the original requested delay. The minimum of +3ms // is to avoid falling into common zero-ending values (0, 10, 100, etc.) which are @@ -717,4 +771,176 @@ mod tests { // no more left assert_eq!(iter.remaining_cumulative_duration(), WaitDuration::None); } + + #[test] + fn fast_forward_fixed_delay() { + let policy = RetryPolicy::fixed_delay(Duration::from_millis(100), Some(10)); + + // After 5 attempts, the 6th delay should still be the configured interval. + let mut iter = policy.iter_after(5); + assert_eq!(iter.attempts(), 5); + assert_eq!(iter.remaining_attempts(), 5); + assert_eq!(iter.peek_next(), Some(Duration::from_millis(100))); + let next = iter.next().expect("delay available"); + assert!(within_jitter( + Duration::from_millis(100), + next, + DEFAULT_JITTER_MULTIPLIER + )); + assert_eq!(iter.attempts(), 6); + + // Drains until max_attempts reached. + let remaining: Vec<_> = iter.collect(); + assert_eq!(remaining.len(), 4); + } + + #[test] + fn fast_forward_exponential_matches_step_by_step() { + let make = || RetryPolicy::exponential(Duration::from_millis(100), 2.0, Some(10), None); + + // Step-by-step reference path. + let mut reference = make().into_iter(); + for _ in 0..5 { + reference.next(); + } + let reference_last = reference.last_retry(); + + // Fast-forwarded path. + let policy = make(); + let mut iter = policy.iter_after(5); + assert_eq!(iter.attempts(), 5); + assert_eq!(iter.last_retry(), reference_last); + + // 6th delay (un-jittered) = 100ms * 2^5 = 3200ms (allow f32 rounding slack) + let peeked = iter.peek_next().expect("delay available"); + assert!(within_rounding_error(Duration::from_millis(3200), peeked)); + let actual = iter.next().expect("delay"); + assert!(within_jitter(peeked, actual, DEFAULT_JITTER_MULTIPLIER)); + } + + #[test] + fn fast_forward_exponential_respects_max_interval_clamp() { + // initial=100ms, factor=2.0, cap=500ms. Per-step clamp: 100,200,400,500,500,500,... + let policy = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + Some(10), + Some(Duration::from_millis(500)), + ); + + // After 4 attempts the per-step clamp has settled at 500ms. + let iter = policy.iter_after(4); + assert_eq!(iter.last_retry(), Some(Duration::from_millis(500))); + assert_eq!(iter.peek_next(), Some(Duration::from_millis(500))); + + // After 7 attempts, still clamped at 500ms (closed-form 100*2^6=6400 would be wrong). + let iter = policy.iter_after(7); + assert_eq!(iter.last_retry(), Some(Duration::from_millis(500))); + assert_eq!(iter.peek_next(), Some(Duration::from_millis(500))); + } + + #[test] + fn fast_forward_beyond_max_attempts_exhausts_iter() { + let policy = RetryPolicy::fixed_delay(Duration::from_millis(100), Some(3)); + let mut iter = policy.iter_after(3); + assert_eq!(iter.peek_next(), None); + assert_eq!(iter.next(), None); + + // Overshoot is allowed; iterator stays exhausted. + let mut iter = policy.iter_after(99); + assert_eq!(iter.next(), None); + + let policy = RetryPolicy::exponential(Duration::from_secs(1), 2.0, Some(3), None); + let mut iter = policy.iter_after(99); + // Bounded policy: overshoot is clamped to max_attempts so that subsequent + // arithmetic on `self.attempts + 1` cannot overflow. + assert_eq!(iter.attempts(), 3); + assert_eq!(iter.remaining_attempts(), 0); + assert_eq!(iter.peek_next(), None); + assert_eq!(iter.next(), None); + assert_eq!(iter.size_hint(), (0, Some(0))); + } + + #[test] + fn fast_forward_usize_max_on_bounded_policy_does_not_overflow() { + // `iter_after(usize::MAX)` on a bounded policy must not cause + // `self.attempts + 1` to overflow inside `peek_next`/`next`. + let policy = RetryPolicy::fixed_delay(Duration::from_millis(100), Some(5)); + let mut iter = policy.iter_after(usize::MAX); + assert_eq!(iter.attempts(), 5); + assert_eq!(iter.remaining_attempts(), 0); + assert_eq!(iter.peek_next(), None); + assert_eq!(iter.next(), None); + + let policy = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + Some(5), + Some(Duration::from_millis(500)), + ); + let mut iter = policy.iter_after(usize::MAX); + assert_eq!(iter.attempts(), 5); + assert_eq!(iter.peek_next(), None); + assert_eq!(iter.next(), None); + } + + #[test] + fn fast_forward_unbounded_does_not_panic_on_mul_f32_overflow() { + // factor=2.0 with no cap and unbounded max_attempts would overflow + // `Duration::mul_f32` after ~64 steps. Replay must saturate, not panic. + let policy = RetryPolicy::exponential(Duration::from_millis(100), 2.0, None, None); + let iter = policy.iter_after(1_000); + // Reached Duration::MAX saturation rather than panicking. + assert_eq!(iter.last_retry(), Some(Duration::MAX)); + assert_eq!(iter.peek_next(), Some(Duration::MAX)); + } + + #[test] + fn pathological_factor_does_not_panic() { + // NaN / negative factor must saturate to Duration::MAX instead of panicking + // inside `mul_f32`. + for factor in [f32::NAN, -1.0, f32::INFINITY] { + let policy = RetryPolicy::exponential( + Duration::from_millis(100), + factor, + Some(5), + Some(Duration::from_secs(10)), + ); + // Both replay path and iterator path must complete without panic. + let _ = policy.clone().iter_after(3); + let _ = policy.into_iter().collect::>(); + } + } + + #[test] + fn fast_forward_zero_is_noop() { + let policy = RetryPolicy::exponential(Duration::from_millis(100), 2.0, Some(5), None); + let mut iter = policy.iter(); + iter.fast_forward(0); + assert_eq!(iter.attempts(), 0); + assert_eq!(iter.last_retry(), None); + // First delay equals the initial interval. + assert_eq!(iter.peek_next(), Some(Duration::from_millis(100))); + } + + #[test] + fn fast_forward_is_composable() { + // Calling fast_forward repeatedly should be equivalent to one big jump. + let policy = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + Some(20), + Some(Duration::from_millis(1000)), + ); + + let a = policy.iter_after(7); + + let mut b = policy.iter(); + b.fast_forward(3); + b.fast_forward(4); + + assert_eq!(a.attempts(), b.attempts()); + assert_eq!(a.last_retry(), b.last_retry()); + assert_eq!(a.peek_next(), b.peek_next()); + } }