Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 57 additions & 10 deletions crates/invoker-impl/src/invocation_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ impl<K: TimerKey> InvocationStateMachine<K> {
on_max_attempts: OnMaxAttempts,
concurrency_slot: ConcurrencySlot,
) -> InvocationStateMachine<K> {
let start_message_retry_count_since_last_stored_command =
permit.metadata.retry_count_since_last_stored_command;

Self {
qid,
_permit: permit,
Expand All @@ -228,7 +231,7 @@ impl<K: TimerKey> InvocationStateMachine<K> {
retry_iter,
on_max_attempts,
},
start_message_retry_count_since_last_stored_command: 0,
start_message_retry_count_since_last_stored_command,
requested_pause: false,
_concurrency_slot: concurrency_slot,
budget: None,
Expand All @@ -255,6 +258,11 @@ impl<K: TimerKey> InvocationStateMachine<K> {
};
}

/// Returns the attempt count reported by the retry iterator currently held in
/// this state machine's retry policy state.
pub(super) fn retry_attempts(&self) -> usize {
    let retry_iter = &self.retry_policy_state.retry_iter;
    retry_iter.attempts()
}

pub(super) fn abort(&mut self) {
if let AttemptState::InFlight { abort_handle, .. } = &mut self.invocation_state {
abort_handle.abort();
Expand All @@ -275,11 +283,14 @@ impl<K: TimerKey> InvocationStateMachine<K> {
return;
}

let (retry_iter, on_max_attempts) = target_resolver.resolve_invocation_retry_policy(
let (mut retry_iter, on_max_attempts) = target_resolver.resolve_invocation_retry_policy(
Some(&selected_deployment_id),
self.invocation_target.service_name(),
self.invocation_target.handler_name(),
);
// We advance the retry iterator to continue the same retry journey as previous
// incarnations.
retry_iter.fast_forward(self.retry_policy_state.retry_iter.attempts());
self.retry_policy_state = RetryPolicyState {
selected_from_deployment_id: Some(selected_deployment_id),
retry_iter,
Expand Down Expand Up @@ -481,24 +492,30 @@ impl<K: TimerKey> InvocationStateMachine<K> {
should_bump_start_message_retry_count_since_last_stored_command: bool,
register_timer: impl FnOnce(Duration) -> K,
) -> OnTaskError {
let journal_tracker = match &self.invocation_state {
if self.requested_pause {
// Short-circuit to pause, as this is what the user asked for
return OnTaskError::Pause;
}

let journal_tracker = match self.invocation_state {
AttemptState::InFlight {
journal_tracker, ..
} => journal_tracker.clone(),
AttemptState::New => JournalTracker::default(),
ref journal_tracker,
..
} => Some(journal_tracker),
AttemptState::New => None,
AttemptState::WaitingRetry {
journal_tracker,
ref journal_tracker,
timer_fired,
..
} => {
// TODO: https://github.com/restatedev/restate/issues/538
assert!(
*timer_fired,
timer_fired,
"Restate does not support multiple retry timers yet. This would require \
deduplicating timers by some mean (e.g. fencing them off, overwriting \
old timers, not registering a new timer if an old timer has not fired yet, etc.)"
);
journal_tracker.clone()
Some(journal_tracker)
}
};

Expand All @@ -514,10 +531,30 @@ impl<K: TimerKey> InvocationStateMachine<K> {
if should_bump_start_message_retry_count_since_last_stored_command {
self.start_message_retry_count_since_last_stored_command += 1;
}

// If qid is present, vqueues are in use, so we switch to retrying via the scheduler
// when the retry interval is at least the configured invocation yield threshold.
if let Some(ref qid) = self.qid
&& next_timer
>= Configuration::pinned()
.invocation
.invocation_yield_threshold()
{
trace!(
vqueue = %qid,
"Invocation is using vqueues, switching to retrying via scheduler");
return OnTaskError::RetryViaScheduler {
retry_after: next_timer,
retry_attempts: u32::try_from(self.retry_policy_state.retry_iter.attempts())
.unwrap_or(u32::MAX),
retry_count_since_last_stored_command: self
.start_message_retry_count_since_last_stored_command,
};
};
let retry_timer_key = register_timer(next_timer);
self.invocation_state = AttemptState::WaitingRetry {
timer_fired: false,
journal_tracker,
journal_tracker: journal_tracker.cloned().unwrap_or_default(),
retry_timer_key,
};
OnTaskError::Retrying(next_timer)
Expand Down Expand Up @@ -598,6 +635,16 @@ impl<K: TimerKey> InvocationStateMachine<K> {

#[derive(Debug)]
pub(super) enum OnTaskError {
RetryViaScheduler {
retry_after: Duration,
/// For service-level retry configuration. This is the total number of retries
/// we have performed throughout.
retry_attempts: u32,
/// For sdk-controlled retries. This defines the retry-count value that will be
/// sent downstream to the SDK to be used for its ctx.run() retries on the next
/// start message.
retry_count_since_last_stored_command: u32,
},
Retrying(Duration),
Pause,
Kill,
Expand Down
110 changes: 98 additions & 12 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use restate_errors::warn_it;
use restate_memory::{ByteCount, LocalMemoryPool, MemoryLease, MemoryPool, OutOfMemoryKind};
use restate_queue::SegmentQueue;
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_types::clock::RoughTimestamp;
use restate_types::config::{Configuration, InvokerOptions, ServiceClientOptions};
use restate_types::deployment::PinnedDeployment;
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -634,13 +635,21 @@ where
)
)]
fn handle_vqueue_invoke(&mut self, options: &InvokerOptions, mut command: VQueueInvokeCommand) {
let (retry_iter, on_max_attempts) =
let (mut retry_iter, on_max_attempts) =
self.schemas.live_load().resolve_invocation_retry_policy(
None,
command.invocation_target.service_name(),
command.invocation_target.handler_name(),
);

// Adjust the retry_iter so it picks up the next retry period given the number of retry
// attempts we have already done prior to the creation of this task.
retry_iter.fast_forward(command.permit.metadata.retry_attempts as usize);
trace!(
"Invoking with retry policy {retry_iter:?}, next: {:?}",
retry_iter.peek_next()
);

let storage_reader = self.invocation_state_machine_manager.storage_reader();
let concurrency_slot = self.quota.acquire_slot();

Expand Down Expand Up @@ -1301,10 +1310,11 @@ where
// Global pool exhausted — yielding may help because freeing
// the execution slot lets other invocations finish and return
// their memory.
if Configuration::pinned()
.common
.experimental
.is_invoker_yield_enabled()
if ism.qid.is_some()
|| Configuration::pinned()
.common
.experimental
.is_invoker_yield_enabled()
{
debug!(
restate.invocation.target = %ism.invocation_target,
Expand All @@ -1317,9 +1327,13 @@ where
let _ = sender
.send(Box::new(Effect {
invocation_id,
kind: EffectKind::Yield(YieldReason::ExhaustedMemoryBudget {
needed_memory: oom.needed,
}),
kind: EffectKind::Yield {
reason: YieldReason::ExhaustedMemoryBudget {
needed_memory: oom.needed,
},
error_event: None,
resume_at: None,
},
}))
.await;
} else {
Expand Down Expand Up @@ -1566,6 +1580,78 @@ where
self.invocation_state_machine_manager
.register_invocation(invocation_id, ism);
}
OnTaskError::RetryViaScheduler {
retry_after,
retry_attempts,
retry_count_since_last_stored_command,
} => {
counter!(INVOKER_INVOCATION_TASKS,
"status" => TASK_OP_FAILED,
"transient" => "true",
"partition_id" => self.invoker_id_label.clone()
)
.increment(1);
warn_it!(
error,
restate.invocation.id = %invocation_id,
restate.invocation.target = %ism.invocation_target,
restate.deployment.id = %attempt_deployment_id,
"[{}] Invocation error, retrying via the scheduler after at least {}.",
ism.retry_attempts(),
retry_after.friendly());
trace!("Invocation state: {:?}.", ism.invocation_state_debug());

let journal_v2_related_command_type =
if let InvokerError::SdkV2(SdkInvocationErrorV2 {
related_command: Some(ref related_entry),
..
}) = error
{
related_entry
.related_entry_type
.and_then(|e| e.try_as_command_ref().copied())
} else {
None
};
let invocation_error_report = error.into_invocation_error_report();
let error_event = TransientErrorEvent {
error_code: invocation_error_report.err.code(),
error_message: invocation_error_report.err.message().to_owned(),
error_stacktrace: invocation_error_report
.err
.stacktrace()
.map(|s| s.to_owned()),
restate_doc_error_code: invocation_error_report
.doc_error_code
.map(|c| c.code().to_owned()),
related_command_index: invocation_error_report.related_entry_index,
related_command_name: invocation_error_report.related_entry_name.clone(),
related_command_type: journal_v2_related_command_type,
};

// Some trivial deduplication here: if we already sent this transient error in the previous retry, don't send it again
let error_event = ism
.should_emit_transient_error_event(&error_event)
.then(|| RawEvent::from(Event::TransientError(error_event)));

self.status_store.on_end(&invocation_id);

let _ = self
.invocation_state_machine_manager
.partition_sender()
.send(Box::new(Effect {
invocation_id,
kind: EffectKind::Yield {
reason: YieldReason::TransientError {
retry_attempts,
retry_count_since_last_stored_command,
},
error_event,
resume_at: Some(RoughTimestamp::now() + retry_after),
},
}))
.await;
}
OnTaskError::Pause => {
counter!(INVOKER_INVOCATION_TASKS,
"status" => TASK_OP_FAILED,
Expand Down Expand Up @@ -3183,11 +3269,11 @@ mod tests {
*effect,
pat!(Effect {
invocation_id: eq(invocation_id),
kind: pat!(EffectKind::Yield(pat!(
YieldReason::ExhaustedMemoryBudget {
kind: pat!(EffectKind::Yield {
reason: pat!(YieldReason::ExhaustedMemoryBudget {
needed_memory: eq(NonZeroByteCount::new(NonZeroUsize::new(32768).unwrap())),
}
)))
})
})
})
);

Expand Down
1 change: 1 addition & 0 deletions crates/storage-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ restate-memory = { workspace = true }
restate-sharding = { workspace = true }
restate-types = { workspace = true }
restate-util-string = { workspace = true, features = ["bilrost", "bytes"] }
restate-util-bytecount = { workspace = true, features = ["bilrost"] }

ahash = { workspace = true }
anyhow = { workspace = true }
Expand Down
21 changes: 21 additions & 0 deletions crates/storage-api/src/vqueue_table/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use std::num::NonZeroU16;

use restate_clock::RoughTimestamp;
use restate_memory::NonZeroByteCount;
use restate_types::vqueues::{EntryId, EntryKind, Seq};

use super::Status;
Expand Down Expand Up @@ -190,13 +191,24 @@ pub struct EntryMetadataRef<'a> {
// or maybe service revision.
#[bilrost(tag(1))]
deployment: Option<&'a str>,
// If set, this is the amount of memory the invocation seems to require to
// run on the invoker side.
#[bilrost(tag(2))]
pub needed_memory: Option<NonZeroByteCount>,
#[bilrost(tag(3))]
pub retry_attempts: u32,
#[bilrost(tag(4))]
pub retry_count_since_last_stored_command: u32,
}

impl<'a> From<&'a EntryMetadata> for EntryMetadataRef<'a> {
#[inline]
fn from(value: &'a EntryMetadata) -> Self {
Self {
deployment: value.deployment.as_deref(),
needed_memory: value.needed_memory,
retry_attempts: value.retry_attempts,
retry_count_since_last_stored_command: value.retry_count_since_last_stored_command,
}
}
}
Expand All @@ -206,6 +218,15 @@ pub struct EntryMetadata {
// todo: This is temporary placeholder, type and name _will_ change.
#[bilrost(tag(1))]
pub deployment: Option<String>,

// If set, this is the amount of memory the invocation seems to require to
// run on the invoker side.
#[bilrost(tag(2))]
pub needed_memory: Option<NonZeroByteCount>,
#[bilrost(tag(3))]
pub retry_attempts: u32,
#[bilrost(tag(4))]
pub retry_count_since_last_stored_command: u32,
}

#[cfg(test)]
Expand Down
Loading
Loading