Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/invoker-impl/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ impl InvokerError {
}
}

pub(crate) fn should_pause(&self) -> bool {
match self {
InvokerError::SdkV2(SdkInvocationErrorV2 { should_pause, .. }) => *should_pause,
_ => false,
}
}

pub(crate) fn into_invocation_error(self) -> InvocationError {
match self {
InvokerError::Sdk(sdk_error) => *sdk_error.error,
Expand Down Expand Up @@ -491,6 +498,7 @@ pub(crate) struct SdkInvocationErrorV2 {
pub(crate) related_command: Option<InvocationErrorRelatedCommandV2>,
pub(crate) next_retry_interval_override: Option<Duration>,
pub(crate) error: Box<InvocationError>,
pub(crate) should_pause: bool,
}

impl SdkInvocationErrorV2 {
Expand All @@ -499,6 +507,7 @@ impl SdkInvocationErrorV2 {
related_command: None,
next_retry_interval_override: None,
error: Default::default(),
should_pause: false,
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions crates/invoker-impl/src/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
&mut self,
error_is_transient: bool,
next_retry_interval_override: Option<Duration>,
should_pause: bool,
should_bump_start_message_retry_count_since_last_stored_command: bool,
register_timer: impl FnOnce(Duration) -> K,
) -> OnTaskError {
Expand All @@ -501,7 +502,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
}
};

if self.requested_pause {
if self.requested_pause || should_pause {
// Shortcircuit to pause, as this is what the user asked for
return OnTaskError::Pause;
}
Expand Down Expand Up @@ -646,7 +647,7 @@ mod tests {

let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 0)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 0)
);
check!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state);

Expand All @@ -655,7 +656,7 @@ mod tests {
// We stay in `WaitingForRetry`
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 1)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 1)
);
check!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state);
}
Expand Down Expand Up @@ -700,7 +701,7 @@ mod tests {
// Notify error
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 0)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 0)
);
assert_eq!(
invocation_state_machine.start_message_retry_count_since_last_stored_command,
Expand All @@ -716,7 +717,7 @@ mod tests {
// Get error again
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 1)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 1)
);
assert_eq!(
invocation_state_machine.start_message_retry_count_since_last_stored_command,
Expand Down Expand Up @@ -754,7 +755,7 @@ mod tests {
invocation_state_machine.notify_new_command(1, false);
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 0)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 0)
);

// PP sends ack for command 1
Expand All @@ -781,7 +782,7 @@ mod tests {
invocation_state_machine.notify_new_notification_proposal(NotificationId::CompletionId(1));
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 0)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 0)
);

// Waiting notifications acks and retry timer fired
Expand Down Expand Up @@ -819,7 +820,7 @@ mod tests {
// Put the ISM in WaitingRetry state with timer key 0
let_assert!(
OnTaskError::Retrying(_) =
invocation_state_machine.handle_task_error(true, None, true, |_| 0)
invocation_state_machine.handle_task_error(true, None, false, true, |_| 0)
);
check!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,7 @@ where
.is_some_and(|entry_idx| entry_idx < self.command_index),
}),
next_retry_interval_override: error.next_retry_delay.map(Duration::from_millis),
should_pause: error.should_pause,
error: InvocationError::from(error).into(),
}))
}
Expand Down
67 changes: 67 additions & 0 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,7 @@ where
let result = ism.handle_task_error(
error.is_transient(),
error.next_retry_interval_override(),
error.should_pause(),
error.should_bump_start_message_retry_count_since_last_stored_entry(),
|duration| self.retry_timers.insert(invocation_id, duration),
);
Expand Down Expand Up @@ -2482,6 +2483,7 @@ mod tests {
let error_a = InvokerError::SdkV2(SdkInvocationErrorV2 {
related_command: None,
next_retry_interval_override: Some(Duration::from_millis(1)),
should_pause: false,
error: InvocationError::new(codes::INTERNAL, "boom").into(),
});
service_inner
Expand All @@ -2506,6 +2508,7 @@ mod tests {
let error_a_same = InvokerError::SdkV2(SdkInvocationErrorV2 {
related_command: None,
next_retry_interval_override: Some(Duration::from_millis(1)),
should_pause: false,
error: InvocationError::new(codes::INTERNAL, "boom").into(),
});
service_inner
Expand All @@ -2523,6 +2526,7 @@ mod tests {
let error_b = InvokerError::SdkV2(SdkInvocationErrorV2 {
related_command: None,
next_retry_interval_override: Some(Duration::from_millis(1)),
should_pause: false,
error: InvocationError::new(codes::INTERNAL, "boom-2").into(),
});
service_inner
Expand All @@ -2541,6 +2545,69 @@ mod tests {
);
}

#[test(restate_core::test(start_paused = true))]
async fn error_message_should_pause() {
// Enable proposing events and keep timers short for the test
let invoker_options = InvokerOptionsBuilder::default()
.inactivity_timeout(FriendlyDuration::ZERO)
.abort_timeout(FriendlyDuration::ZERO)
.disable_eager_state(false)
.build()
.unwrap();

let invocation_id = InvocationId::mock_random();

// Mock service
let (_, _status_tx, mut effects_rx, mut service_inner) = ServiceInner::mock(
(),
MockSchemas(
// fixed amount of retries so that an invocation eventually completes with a failure
Some(RetryPolicy::fixed_delay(Duration::ZERO, Some(3))),
Some(OnMaxAttempts::Kill),
),
None,
EmptyStorageReader,
);

// Start invocation epoch 0
let budget = service_inner.test_budget();
service_inner.handle_invoke(
&invoker_options,
invocation_id,
InvocationTarget::mock_virtual_object(),
budget,
);

// Select protocol V4 to allow proposing events
service_inner.handle_pinned_deployment(
invocation_id,
PinnedDeployment::new(DeploymentId::new(), ServiceProtocolVersion::V4),
false, // has_changed = false -> directly selects protocol without emitting effect
);

// Transient error with should_pause
let error_a = InvokerError::SdkV2(SdkInvocationErrorV2 {
related_command: None,
next_retry_interval_override: Some(Duration::from_millis(1)),
should_pause: true,
error: InvocationError::new(codes::INTERNAL, "boom").into(),
});
service_inner
.handle_invocation_task_failed(invocation_id, error_a, service_inner.test_budget())
.await;
assert_that!(
*effects_rx
.try_recv()
.expect("expected a proposed transient error event"),
pat!(Effect {
invocation_id: eq(invocation_id),
kind: pat!(EffectKind::Paused {
paused_event: predicate(|e: &RawEvent| e.ty() == EventType::Paused)
})
})
);
}

#[test(restate_core::test)]
async fn abort_error_counts_towards_retry_policy() {
// Enable proposing events and keep timers short for the test
Expand Down
5 changes: 5 additions & 0 deletions service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ enum ServiceProtocolVersion {
// * WorkflowTarget.scope
// * IdempotentRequestTarget.scope
// * StartMessage.scope, StartMessage.limit_key and StartMessage.idempotency_key
// * ErrorMessage.should_pause
V7 = 7;
}

Expand Down Expand Up @@ -176,6 +177,10 @@ message ErrorMessage {
// Delay before executing the next retry, specified as duration in milliseconds.
// If provided, it will override the default retry policy used by Restate's invoker ONLY for the next retry attempt.
optional uint64 next_retry_delay = 8;

// If true, Restate will pause instead of retrying.
// This field supersedes next_retry_delay.
bool should_pause = 9;
}

// Type: 0x0000 + 3
Expand Down
Loading