diff --git a/crates/invoker-impl/src/error.rs b/crates/invoker-impl/src/error.rs index 833c5cdba4..021f272818 100644 --- a/crates/invoker-impl/src/error.rs +++ b/crates/invoker-impl/src/error.rs @@ -308,6 +308,13 @@ impl InvokerError { } } + pub(crate) fn should_pause(&self) -> bool { + match self { + InvokerError::SdkV2(SdkInvocationErrorV2 { should_pause, .. }) => *should_pause, + _ => false, + } + } + pub(crate) fn into_invocation_error(self) -> InvocationError { match self { InvokerError::Sdk(sdk_error) => *sdk_error.error, @@ -491,6 +498,7 @@ pub(crate) struct SdkInvocationErrorV2 { pub(crate) related_command: Option, pub(crate) next_retry_interval_override: Option, pub(crate) error: Box, + pub(crate) should_pause: bool, } impl SdkInvocationErrorV2 { @@ -499,6 +507,7 @@ impl SdkInvocationErrorV2 { related_command: None, next_retry_interval_override: None, error: Default::default(), + should_pause: false, } } } diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index dcefa23756..30da3b0575 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -477,6 +477,7 @@ impl InvocationStateMachine { &mut self, error_is_transient: bool, next_retry_interval_override: Option, + should_pause: bool, should_bump_start_message_retry_count_since_last_stored_command: bool, register_timer: impl FnOnce(Duration) -> K, ) -> OnTaskError { @@ -501,7 +502,7 @@ impl InvocationStateMachine { } }; - if self.requested_pause { + if self.requested_pause || should_pause { // Shortcircuit to pause, as this is what the user asked for return OnTaskError::Pause; } @@ -646,7 +647,7 @@ mod tests { let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 0) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) ); check!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state); @@ -655,7 +656,7 @@ mod tests { // We stay in `WaitingForRetry` let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 1) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 1) ); check!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state); } @@ -700,7 +701,7 @@ mod tests { // Notify error let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 0) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) ); assert_eq!( invocation_state_machine.start_message_retry_count_since_last_stored_command, @@ -716,7 +717,7 @@ mod tests { // Get error again let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 1) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 1) ); assert_eq!( invocation_state_machine.start_message_retry_count_since_last_stored_command, @@ -754,7 +755,7 @@ mod tests { invocation_state_machine.notify_new_command(1, false); let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 0) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) ); // PP sends ack for command 1 @@ -781,7 +782,7 @@ mod tests { invocation_state_machine.notify_new_notification_proposal(NotificationId::CompletionId(1)); let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 0) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) ); // Waiting notifications acks and retry timer fired @@ -819,7 +820,7 @@ mod tests { // Put the ISM in WaitingRetry state with timer key 0 let_assert!( OnTaskError::Retrying(_) = - invocation_state_machine.handle_task_error(true, None, true, |_| 0) + invocation_state_machine.handle_task_error(true, None, false, true, |_| 0) ); check!(let AttemptState::WaitingRetry { .. } = invocation_state_machine.invocation_state); diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index 7aa391b33a..3fd1ed024f 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -1440,6 +1440,7 @@ where .is_some_and(|entry_idx| entry_idx < self.command_index), }), next_retry_interval_override: error.next_retry_delay.map(Duration::from_millis), + should_pause: error.should_pause, error: InvocationError::from(error).into(), })) } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 7052e7ea31..a77caf0f37 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -1474,6 +1474,7 @@ where let result = ism.handle_task_error( error.is_transient(), error.next_retry_interval_override(), + error.should_pause(), error.should_bump_start_message_retry_count_since_last_stored_entry(), |duration| self.retry_timers.insert(invocation_id, duration), ); @@ -2482,6 +2483,7 @@ mod tests { let error_a = InvokerError::SdkV2(SdkInvocationErrorV2 { related_command: None, next_retry_interval_override: Some(Duration::from_millis(1)), + should_pause: false, error: InvocationError::new(codes::INTERNAL, "boom").into(), }); service_inner @@ -2506,6 +2508,7 @@ mod tests { let error_a_same = InvokerError::SdkV2(SdkInvocationErrorV2 { related_command: None, next_retry_interval_override: Some(Duration::from_millis(1)), + should_pause: false, error: InvocationError::new(codes::INTERNAL, "boom").into(), }); service_inner @@ -2523,6 +2526,7 @@ mod tests { let error_b = InvokerError::SdkV2(SdkInvocationErrorV2 { related_command: None, next_retry_interval_override: Some(Duration::from_millis(1)), + should_pause: false, error: InvocationError::new(codes::INTERNAL, "boom-2").into(), }); service_inner @@ -2541,6 +2545,69 @@ mod tests { ); } + #[test(restate_core::test(start_paused = true))] + async fn error_message_should_pause() { + // Enable proposing events and keep timers short for the test + let invoker_options = InvokerOptionsBuilder::default() + .inactivity_timeout(FriendlyDuration::ZERO) + .abort_timeout(FriendlyDuration::ZERO) + .disable_eager_state(false) + .build() + .unwrap(); + + let invocation_id = InvocationId::mock_random(); + + // Mock service + let (_, _status_tx, mut effects_rx, mut service_inner) = ServiceInner::mock( + (), + MockSchemas( + // fixed amount of retries so that an invocation eventually completes with a failure + Some(RetryPolicy::fixed_delay(Duration::ZERO, Some(3))), + Some(OnMaxAttempts::Kill), + ), + None, + EmptyStorageReader, + ); + + // Start invocation epoch 0 + let budget = service_inner.test_budget(); + service_inner.handle_invoke( + &invoker_options, + invocation_id, + InvocationTarget::mock_virtual_object(), + budget, + ); + + // Select protocol V4 to allow proposing events + service_inner.handle_pinned_deployment( + invocation_id, + PinnedDeployment::new(DeploymentId::new(), ServiceProtocolVersion::V4), + false, // has_changed = false -> directly selects protocol without emitting effect + ); + + // Transient error with should_pause + let error_a = InvokerError::SdkV2(SdkInvocationErrorV2 { + related_command: None, + next_retry_interval_override: Some(Duration::from_millis(1)), + should_pause: true, + error: InvocationError::new(codes::INTERNAL, "boom").into(), + }); + service_inner + .handle_invocation_task_failed(invocation_id, error_a, service_inner.test_budget()) + .await; + assert_that!( + *effects_rx + .try_recv() + .expect("expected a proposed transient error event"), + pat!(Effect { + invocation_id: eq(invocation_id), + kind: pat!(EffectKind::Paused { + paused_event: predicate(|e: &RawEvent| e.ty() == EventType::Paused) + }) + }) + ); + } + #[test(restate_core::test)] async fn abort_error_counts_towards_retry_policy() { // Enable proposing events and keep timers short for the test diff --git a/service-protocol/dev/restate/service/protocol.proto b/service-protocol/dev/restate/service/protocol.proto index 02a3b55ebf..8851b4af6d 100644 --- a/service-protocol/dev/restate/service/protocol.proto +++ b/service-protocol/dev/restate/service/protocol.proto @@ -44,6 +44,7 @@ enum ServiceProtocolVersion { // * WorkflowTarget.scope // * IdempotentRequestTarget.scope // * StartMessage.scope, StartMessage.limit_key and StartMessage.idempotency_key + // * ErrorMessage.should_pause V7 = 7; } @@ -176,6 +177,10 @@ message ErrorMessage { // Delay before executing the next retry, specified as duration in milliseconds. // If provided, it will override the default retry policy used by Restate's invoker ONLY for the next retry attempt. optional uint64 next_retry_delay = 8; + + // If true, Restate will pause instead of retrying. + // This field supersedes next_retry_delay. + bool should_pause = 9; } // Type: 0x0000 + 3