diff --git a/Cargo.lock b/Cargo.lock index a17e14e405..a852e8d8ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8356,6 +8356,7 @@ dependencies = [ "restate-memory", "restate-sharding", "restate-types", + "restate-util-bytecount", "restate-util-string", "restate-workspace-hack", "serde", @@ -8583,6 +8584,7 @@ dependencies = [ name = "restate-util-bytecount" version = "1.7.0-dev" dependencies = [ + "bilrost", "bytesize", "restate-util-bytecount", "restate-util-string", diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index 30da3b0575..85a2d75de7 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -215,6 +215,9 @@ impl InvocationStateMachine { on_max_attempts: OnMaxAttempts, concurrency_slot: ConcurrencySlot, ) -> InvocationStateMachine { + let start_message_retry_count_since_last_stored_command = + permit.metadata.retry_count_since_last_stored_command; + Self { qid, _permit: permit, @@ -228,7 +231,7 @@ impl InvocationStateMachine { retry_iter, on_max_attempts, }, - start_message_retry_count_since_last_stored_command: 0, + start_message_retry_count_since_last_stored_command, requested_pause: false, _concurrency_slot: concurrency_slot, budget: None, @@ -255,6 +258,11 @@ impl InvocationStateMachine { }; } + /// The cumulative number of retry attempts this invocation has made so far + pub(super) fn retry_attempts(&self) -> usize { + self.retry_policy_state.retry_iter.attempts() + } + pub(super) fn abort(&mut self) { if let AttemptState::InFlight { abort_handle, .. } = &mut self.invocation_state { abort_handle.abort(); @@ -275,11 +283,14 @@ impl InvocationStateMachine { return; } - let (retry_iter, on_max_attempts) = target_resolver.resolve_invocation_retry_policy( + let (mut retry_iter, on_max_attempts) = target_resolver.resolve_invocation_retry_policy( Some(&selected_deployment_id), self.invocation_target.service_name(), self.invocation_target.handler_name(), ); + // We advance the retry iterator to continue the same retry journey as previous + // incarinations. + retry_iter.fast_forward(self.retry_policy_state.retry_iter.attempts()); self.retry_policy_state = RetryPolicyState { selected_from_deployment_id: Some(selected_deployment_id), retry_iter, @@ -481,24 +492,30 @@ impl InvocationStateMachine { should_bump_start_message_retry_count_since_last_stored_command: bool, register_timer: impl FnOnce(Duration) -> K, ) -> OnTaskError { - let journal_tracker = match &self.invocation_state { + if self.requested_pause { + // Shortcircuit to pause, as this is what the user asked for + return OnTaskError::Pause; + } + + let journal_tracker = match self.invocation_state { AttemptState::InFlight { - journal_tracker, .. - } => journal_tracker.clone(), - AttemptState::New => JournalTracker::default(), + ref journal_tracker, + .. + } => Some(journal_tracker), + AttemptState::New => None, AttemptState::WaitingRetry { - journal_tracker, + ref journal_tracker, timer_fired, .. } => { // TODO: https://github.com/restatedev/restate/issues/538 assert!( - *timer_fired, + timer_fired, "Restate does not support multiple retry timers yet. This would require \ deduplicating timers by some mean (e.g. fencing them off, overwriting \ old timers, not registering a new timer if an old timer has not fired yet, etc.)" ); - journal_tracker.clone() + Some(journal_tracker) } }; @@ -514,10 +531,30 @@ impl InvocationStateMachine { if should_bump_start_message_retry_count_since_last_stored_command { self.start_message_retry_count_since_last_stored_command += 1; } + + // if Qid is present, vqueues are used so we switch into retrying via the scheduler + // when the retry interval is greater > (threshold) second. + if let Some(ref qid) = self.qid + && next_timer + >= Configuration::pinned() + .invocation + .invocation_yield_threshold() + { + trace!( + vqueue = %qid, + "Invocation is using vqueues, switching to retrying via scheduler"); + return OnTaskError::RetryViaScheduler { + retry_after: next_timer, + retry_attempts: u32::try_from(self.retry_policy_state.retry_iter.attempts()) + .unwrap_or(u32::MAX), + retry_count_since_last_stored_command: self + .start_message_retry_count_since_last_stored_command, + }; + }; let retry_timer_key = register_timer(next_timer); self.invocation_state = AttemptState::WaitingRetry { timer_fired: false, - journal_tracker, + journal_tracker: journal_tracker.cloned().unwrap_or_default(), retry_timer_key, }; OnTaskError::Retrying(next_timer) @@ -598,6 +635,16 @@ impl InvocationStateMachine { #[derive(Debug)] pub(super) enum OnTaskError { + RetryViaScheduler { + retry_after: Duration, + /// For service-level retry configuration. This is the total number of retries + /// we have performed throughout. + retry_attempts: u32, + /// For sdk-controlled retries. This defines the retry-count value that will be + /// sent downstream to the SDK to be used for its ctx.run() retries on the next + /// start message. + retry_count_since_last_stored_command: u32, + }, Retrying(Duration), Pause, Kill, diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index a77caf0f37..7d6d19ef79 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -42,6 +42,7 @@ use restate_errors::warn_it; use restate_memory::{ByteCount, LocalMemoryPool, MemoryLease, MemoryPool, OutOfMemoryKind}; use restate_queue::SegmentQueue; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; +use restate_types::clock::RoughTimestamp; use restate_types::config::{Configuration, InvokerOptions, ServiceClientOptions}; use restate_types::deployment::PinnedDeployment; use restate_types::identifiers::PartitionId; @@ -634,13 +635,21 @@ where ) )] fn handle_vqueue_invoke(&mut self, options: &InvokerOptions, mut command: VQueueInvokeCommand) { - let (retry_iter, on_max_attempts) = + let (mut retry_iter, on_max_attempts) = self.schemas.live_load().resolve_invocation_retry_policy( None, command.invocation_target.service_name(), command.invocation_target.handler_name(), ); + // Adjust the retry_iter so it picks up the next retry period given the number of retry + // attempts we have already done prior to the creation of this task. + retry_iter.fast_forward(command.permit.metadata.retry_attempts as usize); + trace!( + "Invoking with retry policy {retry_iter:?}, next: {:?}", + retry_iter.peek_next() + ); + let storage_reader = self.invocation_state_machine_manager.storage_reader(); let concurrency_slot = self.quota.acquire_slot(); @@ -1301,10 +1310,11 @@ where // Global pool exhausted — yielding may help because freeing // the execution slot lets other invocations finish and return // their memory. - if Configuration::pinned() - .common - .experimental - .is_invoker_yield_enabled() + if ism.qid.is_some() + || Configuration::pinned() + .common + .experimental + .is_invoker_yield_enabled() { debug!( restate.invocation.target = %ism.invocation_target, @@ -1317,9 +1327,13 @@ where let _ = sender .send(Box::new(Effect { invocation_id, - kind: EffectKind::Yield(YieldReason::ExhaustedMemoryBudget { - needed_memory: oom.needed, - }), + kind: EffectKind::Yield { + reason: YieldReason::ExhaustedMemoryBudget { + needed_memory: oom.needed, + }, + error_event: None, + resume_at: None, + }, })) .await; } else { @@ -1566,6 +1580,78 @@ where self.invocation_state_machine_manager .register_invocation(invocation_id, ism); } + OnTaskError::RetryViaScheduler { + retry_after, + retry_attempts, + retry_count_since_last_stored_command, + } => { + counter!(INVOKER_INVOCATION_TASKS, + "status" => TASK_OP_FAILED, + "transient" => "true", + "partition_id" => self.invoker_id_label.clone() + ) + .increment(1); + warn_it!( + error, + restate.invocation.id = %invocation_id, + restate.invocation.target = %ism.invocation_target, + restate.deployment.id = %attempt_deployment_id, + "[{}] Invocation error, retrying via the scheduler after at least {}.", + ism.retry_attempts(), + retry_after.friendly()); + trace!("Invocation state: {:?}.", ism.invocation_state_debug()); + + let journal_v2_related_command_type = + if let InvokerError::SdkV2(SdkInvocationErrorV2 { + related_command: Some(ref related_entry), + .. + }) = error + { + related_entry + .related_entry_type + .and_then(|e| e.try_as_command_ref().copied()) + } else { + None + }; + let invocation_error_report = error.into_invocation_error_report(); + let error_event = TransientErrorEvent { + error_code: invocation_error_report.err.code(), + error_message: invocation_error_report.err.message().to_owned(), + error_stacktrace: invocation_error_report + .err + .stacktrace() + .map(|s| s.to_owned()), + restate_doc_error_code: invocation_error_report + .doc_error_code + .map(|c| c.code().to_owned()), + related_command_index: invocation_error_report.related_entry_index, + related_command_name: invocation_error_report.related_entry_name.clone(), + related_command_type: journal_v2_related_command_type, + }; + + // Some trivial deduplication here: if we already sent this transient error in the previous retry, don't send it again + let error_event = ism + .should_emit_transient_error_event(&error_event) + .then(|| RawEvent::from(Event::TransientError(error_event))); + + self.status_store.on_end(&invocation_id); + + let _ = self + .invocation_state_machine_manager + .partition_sender() + .send(Box::new(Effect { + invocation_id, + kind: EffectKind::Yield { + reason: YieldReason::TransientError { + retry_attempts, + retry_count_since_last_stored_command, + }, + error_event, + resume_at: Some(RoughTimestamp::now() + retry_after), + }, + })) + .await; + } OnTaskError::Pause => { counter!(INVOKER_INVOCATION_TASKS, "status" => TASK_OP_FAILED, @@ -3183,11 +3269,11 @@ mod tests { *effect, pat!(Effect { invocation_id: eq(invocation_id), - kind: pat!(EffectKind::Yield(pat!( - YieldReason::ExhaustedMemoryBudget { + kind: pat!(EffectKind::Yield { + reason: pat!(YieldReason::ExhaustedMemoryBudget { needed_memory: eq(NonZeroByteCount::new(NonZeroUsize::new(32768).unwrap())), - } - ))) + }) + }) }) ); diff --git a/crates/storage-api/Cargo.toml b/crates/storage-api/Cargo.toml index 74bf5d7c79..0670510c7b 100644 --- a/crates/storage-api/Cargo.toml +++ b/crates/storage-api/Cargo.toml @@ -19,6 +19,7 @@ restate-memory = { workspace = true } restate-sharding = { workspace = true } restate-types = { workspace = true } restate-util-string = { workspace = true, features = ["bilrost", "bytes"] } +restate-util-bytecount = { workspace = true, features = ["bilrost"] } ahash = { workspace = true } anyhow = { workspace = true } diff --git a/crates/storage-api/src/vqueue_table/entry.rs b/crates/storage-api/src/vqueue_table/entry.rs index e2b40475fb..d12265e4b8 100644 --- a/crates/storage-api/src/vqueue_table/entry.rs +++ b/crates/storage-api/src/vqueue_table/entry.rs @@ -11,6 +11,7 @@ use std::num::NonZeroU16; use restate_clock::RoughTimestamp; +use restate_memory::NonZeroByteCount; use restate_types::vqueues::{EntryId, EntryKind, Seq}; use super::Status; @@ -190,6 +191,14 @@ pub struct EntryMetadataRef<'a> { // or maybe service revision. #[bilrost(tag(1))] deployment: Option<&'a str>, + // If set, this is the amount of memory the invocation seems to require to + // run on the invoker side. + #[bilrost(tag(2))] + pub needed_memory: Option, + #[bilrost(tag(3))] + pub retry_attempts: u32, + #[bilrost(tag(4))] + pub retry_count_since_last_stored_command: u32, } impl<'a> From<&'a EntryMetadata> for EntryMetadataRef<'a> { @@ -197,6 +206,9 @@ impl<'a> From<&'a EntryMetadata> for EntryMetadataRef<'a> { fn from(value: &'a EntryMetadata) -> Self { Self { deployment: value.deployment.as_deref(), + needed_memory: value.needed_memory, + retry_attempts: value.retry_attempts, + retry_count_since_last_stored_command: value.retry_count_since_last_stored_command, } } } @@ -206,6 +218,15 @@ pub struct EntryMetadata { // todo: This is temporary placeholder, type and name _will_ change. #[bilrost(tag(1))] pub deployment: Option, + + // If set, this is the amount of memory the invocation seems to require to + // run on the invoker side. + #[bilrost(tag(2))] + pub needed_memory: Option, + #[bilrost(tag(3))] + pub retry_attempts: u32, + #[bilrost(tag(4))] + pub retry_count_since_last_stored_command: u32, } #[cfg(test)] diff --git a/crates/storage-api/src/vqueue_table/scheduler.rs b/crates/storage-api/src/vqueue_table/scheduler.rs index 6e087a41aa..c5accc5ce4 100644 --- a/crates/storage-api/src/vqueue_table/scheduler.rs +++ b/crates/storage-api/src/vqueue_table/scheduler.rs @@ -11,22 +11,75 @@ use bytes::{Buf, BufMut, Bytes}; use restate_clock::RoughTimestamp; +use restate_memory::NonZeroByteCount; use restate_types::bilrost_storage_encode_decode; use restate_types::vqueues::VQueueId; use super::EntryKey; use super::stats::WaitStats; -#[derive(Debug)] +/// Why the invoker yielded the invocation back to the scheduler. +/// +/// New reasons can be added without a version barrier — nodes that don't +/// recognize a reason will deserialize it as [`Unknown`](Self::Unknown) and +/// apply the default re-scheduling strategy (immediate re-invoke). +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + serde::Serialize, + serde::Deserialize, + bilrost::Message, + bilrost::Oneof, +)] +#[serde(tag = "reason")] pub enum YieldReason { - /// The entry is in "run" queue and needs to be placed back onto inbox - PartitionLeaderChanged, - // Only used if we'd like to update the next run time - // Entry is in "inbox" and needs to be re-placed back onto inbox but at different - // run_at time. - RetryLater { - run_at: RoughTimestamp, + /// The entry is in "run" queue and needs to be placed back onto inbox because + /// the partition leader has changed. + #[bilrost(tag(1), message)] + PartitionLeaderChange, + /// The invocation exhausted its outbound memory budget. + #[bilrost(tag(2), message)] + ExhaustedMemoryBudget { needed_memory: NonZeroByteCount }, + /// The invocation has been yielded due to an error during execution. + #[bilrost(tag(3), message)] + TransientError { + /// Controls the service retry policy related retries. This is the the + /// value that should be used to initialize the retry policy after resuming. + retry_attempts: u32, + /// For sdk-controlled retries. This defines the retry-count value that will be + /// sent downstream to the SDK to be used for its ctx.run() retries on the next + /// start message. + retry_count_since_last_stored_command: u32, }, + /// The invocation has been yielded due to an error or cooperatively yielded + /// due to capacity constraints. + #[bilrost(tag(4), message)] + InvokerLoadShedding, + /// A yield reason not recognized by this node version or a yield that triggered + /// through a path that doesn't expect yields (harmless). + #[serde(other)] + #[bilrost(empty)] + Unknown, +} + +impl std::fmt::Display for YieldReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + YieldReason::ExhaustedMemoryBudget { needed_memory } => { + write!( + f, + "exhausted invocation memory budget ({needed_memory} needed)", + ) + } + YieldReason::TransientError { .. } => write!(f, "transient error"), + YieldReason::InvokerLoadShedding => write!(f, "invoker load shedding"), + YieldReason::PartitionLeaderChange => write!(f, "partition leader change"), + YieldReason::Unknown => write!(f, "unknown"), + } + } } #[derive(Debug, Clone, bilrost::Oneof, bilrost::Message, derive_more::From)] @@ -55,6 +108,8 @@ pub struct YieldAction { pub key: EntryKey, #[bilrost(tag(2))] pub next_run_at: Option, + #[bilrost(tag(3))] + pub reason: YieldReason, } #[derive(Debug, Clone, bilrost::Message)] diff --git a/crates/storage-query-datafusion/src/vqueues/tests.rs b/crates/storage-query-datafusion/src/vqueues/tests.rs index 4192937860..07390d8f09 100644 --- a/crates/storage-query-datafusion/src/vqueues/tests.rs +++ b/crates/storage-query-datafusion/src/vqueues/tests.rs @@ -60,6 +60,7 @@ async fn get_vqueue_entry_value_fields() { status: Status::BackingOff, metadata: EntryMetadata { deployment: Some("dp_123".to_string()), + ..Default::default() }, stats, }; diff --git a/crates/types/service-protocol-v3/dev/restate/service/protocol.proto b/crates/types/service-protocol-v3/dev/restate/service/protocol.proto index e2e06e1ab3..ea6b133878 100644 --- a/crates/types/service-protocol-v3/dev/restate/service/protocol.proto +++ b/crates/types/service-protocol-v3/dev/restate/service/protocol.proto @@ -75,8 +75,8 @@ message StartMessage { // Retry count since the last stored entry. // - // Please note that this count might not be accurate, as it's not durably stored, - // thus it might get reset in case Restate crashes/changes leader. + // Please note that this count might not be accurate, as it's not durably stored on every update, + // thus it might get revert to an older value in case Restate crashes/changes leader. uint32 retry_count_since_last_stored_entry = 7; // Duration since the last stored entry, in milliseconds. diff --git a/crates/types/src/config/invocation.rs b/crates/types/src/config/invocation.rs index e090b66f2d..7bede99a13 100644 --- a/crates/types/src/config/invocation.rs +++ b/crates/types/src/config/invocation.rs @@ -14,6 +14,8 @@ use serde::{Deserialize, Serialize}; use restate_util_time::{FriendlyDuration, NonZeroFriendlyDuration}; +const DEFAULT_INVOCATION_YIELD_THRESHOLD: FriendlyDuration = FriendlyDuration::from_secs(2); + #[derive(Debug, Clone, Serialize, Deserialize, derive_builder::Builder, PartialEq)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr(feature = "schemars", schemars(rename = "InvocationOptions", default))] @@ -56,6 +58,26 @@ pub struct InvocationOptions { /// `None` means no limit, that is infinite retries is enabled. #[serde(default, skip_serializing_if = "Option::is_none")] pub max_retry_policy_max_attempts: Option, + + /// # Yield from invoker threshold + /// + /// If an invocation needs retrying, it'll hold its invoker concurrency slot and + /// while backing-off. This threshold allows the invoker to yield the invocation + /// back to the scheduler when the next back-off duration exceeds this value. + /// + /// When an invocation yields, its invoker concurrency slot is released and the + /// scheduler is given a chance to scheduler other invocations that may make progress + /// instead. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[cfg_attr(feature = "schemars", schemars(skip))] + invocation_yield_threshold: Option, +} + +impl InvocationOptions { + pub fn invocation_yield_threshold(&self) -> FriendlyDuration { + self.invocation_yield_threshold + .unwrap_or(DEFAULT_INVOCATION_YIELD_THRESHOLD) + } } impl Default for InvocationOptions { @@ -65,6 +87,7 @@ impl Default for InvocationOptions { max_journal_retention: None, default_retry_policy: InvocationRetryPolicyOptions::default(), max_retry_policy_max_attempts: None, + invocation_yield_threshold: None, } } } diff --git a/crates/vqueues/src/lib.rs b/crates/vqueues/src/lib.rs index fe5aa92976..f7bf0f0381 100644 --- a/crates/vqueues/src/lib.rs +++ b/crates/vqueues/src/lib.rs @@ -42,6 +42,7 @@ use restate_types::vqueues::{EntryId, Seq, VQueueId}; #[cfg(test)] use restate_types::{LockName, Scope}; use restate_util_string::ReString; +use restate_worker_api::invoker::YieldReason; // Token bucket used for throttling over all vqueues type GlobalTokenBucket = @@ -307,10 +308,10 @@ where }; debug!( + entry = %entry_id.display(meta.vqueue_id().partition_key()), qid = %meta.vqueue_id(), - key = ?key, - "[enqueue] entry: {}, next_stage: 'inbox', status: {status}", - entry_id.display(meta.vqueue_id().partition_key()), + "->{}, status: {status}, run_at: {run_at}", + Stage::Inbox, ); self.storage @@ -400,21 +401,21 @@ where *header.entry_key() }; - debug!( - header = ?header, - modified_key = ?modified_key, - "[run] entry: {}, next_stage: '{}', prev_status: {}", - header.display_entry_id(), - Stage::Running, - header.status(), - ); - let new_status = if !header.has_started() { Status::Started } else { header.status() }; + debug!( + entry = %header.display_entry_id(), + qid = %header.vqueue_id(), + "{}->{}, status: {}->{new_status}", + header.stage(), + Stage::Running, + header.status(), + ); + self.storage.put_vqueue_inbox( vqueue_id, Stage::Running, @@ -501,10 +502,10 @@ where let maybe_new_metadata = updated_metadata.unwrap_or_else(|| header.metadata().clone()); debug!( - header = ?header, - modified_key = ?modified_key, - "[wake-up] entry: {}, next_stage: '{}', last_status: {}", - header.display_entry_id(), + entry = %header.display_entry_id(), + qid = %header.vqueue_id(), + "{}->{}, status: {}", + header.stage(), Stage::Inbox, header.status(), ); @@ -570,9 +571,12 @@ where assert!(matches!(next_stage, Stage::Paused | Stage::Suspended)); debug!( - header = ?header, - "[park] entry: {}, next_stage: '{next_stage}', status remains the same", - header.display_entry_id() + entry = %header.display_entry_id(), + qid = %header.vqueue_id(), + "{}->{}, status: {}", + header.stage(), + next_stage, + header.status(), ); self.storage @@ -642,20 +646,13 @@ where &mut self, at: UniqueTimestamp, header: &impl EntryStatusHeader, - run_at: Option, - updated_metadata: Option, - new_status: Status, + run_at: Option, + reason: YieldReason, ) { let vqueue_id = header.vqueue_id(); let meta = self.cache.get_mut(self.handle).unwrap(); assert_eq!(vqueue_id, meta.vqueue_id()); - debug!( - header = ?header, - "[yield] entry: {}, next_stage: '{}', new_status: {new_status}", - Stage::Inbox, - header.display_entry_id() - ); // Remove from running and move to waiting self.storage .delete_vqueue_inbox(vqueue_id, header.stage(), header.entry_key()); @@ -679,37 +676,67 @@ where // We can be asked to wake up but not run immediately (or get a lower run_at for priority // boosting). If that's the case, we mutate the entry key to reflect that. - let modified_key = { - let run_at = match run_at { - // Future: ceil to the next whole second so we never fire early. - Some(t) if t > at.to_unix_millis() => { - Some(RoughTimestamp::from_unix_millis_ceil(t)) - } - // Already past: floor is safe — it's eligible immediately regardless. - Some(t) => Some(RoughTimestamp::from_unix_millis_clamped(t)), - // Leave as is - None => None, - }; - header.entry_key().set_run_at(run_at) - }; + let modified_key = header.entry_key().set_run_at(run_at); let stats = Self::mark_yield(at, header.stats()); - let maybe_new_metadata = updated_metadata.unwrap_or_else(|| header.metadata().clone()); + let (status, metadata) = match reason { + YieldReason::Unknown + | YieldReason::InvokerLoadShedding + | YieldReason::PartitionLeaderChange => { + // The reason could be coming from a future version of restate. + // It's okay to have an unknown reason, we'll continue to yield as usual. + (Status::Yielded, header.metadata().clone()) + } + YieldReason::ExhaustedMemoryBudget { needed_memory } => { + let current_metadata = header.metadata().clone(); + ( + Status::Yielded, + EntryMetadata { + needed_memory: Some(needed_memory), + ..current_metadata + }, + ) + } + YieldReason::TransientError { + retry_attempts, + retry_count_since_last_stored_command, + } => { + let current_metadata = header.metadata().clone(); + ( + Status::BackingOff, + EntryMetadata { + retry_attempts, + retry_count_since_last_stored_command, + ..current_metadata + }, + ) + } + }; + + debug!( + entry = %header.display_entry_id(), + qid = %header.vqueue_id(), + "{}->{} due to '{}', status: {}->{status}", + header.stage(), + Stage::Inbox, + reason, + header.status(), + ); self.storage.put_vqueue_entry_status( vqueue_id, Stage::Inbox, &modified_key, - &maybe_new_metadata, + &metadata, stats.clone(), - new_status, + status, ); let value = EntryValue { stats, - status: new_status, - metadata: maybe_new_metadata, + status, + metadata, }; // We add the entry back into the inbox stage @@ -754,11 +781,12 @@ where .delete_vqueue_inbox(vqueue_id, header.stage(), header.entry_key()); debug!( - header = ?header, - key = ?header.entry_key(), - "[end] entry: {}, next_stage: '{}', new_status: {new_status}", - header.display_entry_id(), + entry = %header.display_entry_id(), + qid = %header.vqueue_id(), + "{}->{}, status: {}->{new_status}", + header.stage(), Stage::Finished, + header.status(), ); let update = metadata::Update::new( @@ -982,7 +1010,7 @@ where return; } - debug!("Pausing vqueue {}", meta.vqueue_id()); + debug!(qid = %meta.vqueue_id(), "Pausing vqueue"); let update = metadata::Update::new(at, metadata::Action::PauseVQueue {}); // Update cache @@ -1010,7 +1038,7 @@ where // queue is not paused return; } - debug!("Resuming vqueue {}", meta.vqueue_id()); + debug!(qid = %meta.vqueue_id(), "Resuming vqueue"); let update = metadata::Update::new(at, metadata::Action::ResumeVQueue {}); // Update cache diff --git a/crates/vqueues/src/scheduler.rs b/crates/vqueues/src/scheduler.rs index 9839cef801..966a50aec7 100644 --- a/crates/vqueues/src/scheduler.rs +++ b/crates/vqueues/src/scheduler.rs @@ -18,7 +18,7 @@ use std::task::Poll; use restate_limiter::RuleUpdate; use restate_storage_api::StorageError; use restate_storage_api::vqueue_table::scheduler::{RunAction, SchedulerAction, YieldAction}; -use restate_storage_api::vqueue_table::{EntryKey, ScanVQueueTable, VQueueStore}; +use restate_storage_api::vqueue_table::{EntryKey, EntryMetadata, ScanVQueueTable, VQueueStore}; use restate_types::identifiers::PartitionKey; use restate_types::vqueues::VQueueId; use restate_worker_api::resources::ReservedResources; @@ -43,7 +43,7 @@ mod vqueue_state; // Re-exports pub use resource_manager::ResourceManager; -type UnconfirmedAssignments = hashbrown::HashMap; +type UnconfirmedAssignments = hashbrown::HashMap; fn status_from_detailed_eligibility(value: DetailedEligibility) -> SchedulingStatus { match value { diff --git a/crates/vqueues/src/scheduler/drr.rs b/crates/vqueues/src/scheduler/drr.rs index 41ec739fa9..9ec37fced9 100644 --- a/crates/vqueues/src/scheduler/drr.rs +++ b/crates/vqueues/src/scheduler/drr.rs @@ -220,7 +220,7 @@ impl DRRScheduler { let qstate = self.q.get_mut(handle)?; // I've the resources. Let's run it. - let permit = qstate.remove_from_unconfirmed_assignments(key)?; + let (permit, metadata) = qstate.remove_from_unconfirmed_assignments(key)?; counter!(VQUEUE_RUN_CONFIRMED).increment(1); if qstate.is_dormant(slot.meta()) { @@ -231,7 +231,7 @@ impl DRRScheduler { self.q.remove(handle); self.eligible.remove(handle); } - Some(permit.build(&self.resource_manager)) + Some(permit.build(metadata, &self.resource_manager)) } /// Forward a batch of rule-book updates to the embedded resource manager. @@ -301,7 +301,7 @@ impl DRRScheduler { // 3. None of the above, removing only changes the vqueue metadata. // // If we have been holding a concurrency permit for this item, we release it. - if let Some(permit) = qstate.remove_from_unconfirmed_assignments(key) { + if let Some((permit, _)) = qstate.remove_from_unconfirmed_assignments(key) { // Case 1: // This item is _not_ going to run, so we revert its built up permit self.resource_manager diff --git a/crates/vqueues/src/scheduler/resource_manager/permit.rs b/crates/vqueues/src/scheduler/resource_manager/permit.rs index d06c8f3ace..f0dcd84fc7 100644 --- a/crates/vqueues/src/scheduler/resource_manager/permit.rs +++ b/crates/vqueues/src/scheduler/resource_manager/permit.rs @@ -10,13 +10,13 @@ use smallvec::SmallVec; +use restate_futures_util::concurrency::Permit; +use restate_storage_api::vqueue_table::EntryMetadata; +use restate_types::{LockName, Scope}; use restate_worker_api::resources::{ ReservedResources, SystemPermit, ThrottlingToken, UserPermitKind, }; -use restate_futures_util::concurrency::Permit; -use restate_types::{LockName, Scope}; - use super::ResourceManager; // Holds incrementally secured resources @@ -58,8 +58,13 @@ impl PermitBuilder { } } - pub(crate) fn build(self, resource_manager: &ResourceManager) -> ReservedResources { + pub(crate) fn build( + self, + metadata: EntryMetadata, + resource_manager: &ResourceManager, + ) -> ReservedResources { ReservedResources::new( + metadata, self.user_permit.expect("user permit must be set").resources, self.system_permit, resource_manager.tx.clone(), diff --git a/crates/vqueues/src/scheduler/vqueue_state.rs b/crates/vqueues/src/scheduler/vqueue_state.rs index bcb3406224..d34ca8721f 100644 --- a/crates/vqueues/src/scheduler/vqueue_state.rs +++ b/crates/vqueues/src/scheduler/vqueue_state.rs @@ -13,11 +13,13 @@ use std::time::Duration; use enum_map::{Enum, EnumMap}; use metrics::counter; -use restate_storage_api::StorageError; -use restate_storage_api::vqueue_table::metadata::VQueueMeta; use tokio::time::Instant; use restate_clock::RoughTimestamp; +use restate_storage_api::StorageError; +use restate_storage_api::vqueue_table::EntryMetadata; +use restate_storage_api::vqueue_table::metadata::VQueueMeta; +use restate_storage_api::vqueue_table::scheduler::YieldReason; use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueStore, stats::WaitStats}; use restate_types::vqueues::{EntryId, VQueueId}; use restate_worker_api::ResourceKind; @@ -274,6 +276,7 @@ impl VQueueState { let action = YieldAction { key: *inbox_head_key, next_run_at: None, + reason: YieldReason::PartitionLeaderChange, }; self.queue.try_advance()?; return Ok(Pop::Yield(action)); @@ -288,8 +291,10 @@ impl VQueueState { &mut self.current_permit, ) { AcquireOutcome::Acquired(resources) => { - self.unconfirmed_assignments - .insert(*inbox_head_key, resources); + self.unconfirmed_assignments.insert( + *inbox_head_key, + (resources, inbox_head_value.metadata.clone()), + ); let action = RunAction { key: *inbox_head_key, @@ -364,7 +369,10 @@ impl VQueueState { } } - pub fn remove_from_unconfirmed_assignments(&mut self, key: &EntryKey) -> Option { + pub fn remove_from_unconfirmed_assignments( + &mut self, + key: &EntryKey, + ) -> Option<(PermitBuilder, EntryMetadata)> { self.unconfirmed_assignments.remove(key) } diff --git a/crates/worker-api/src/invoker/effects.rs b/crates/worker-api/src/invoker/effects.rs index 7a0389990f..c15e17ede2 100644 --- a/crates/worker-api/src/invoker/effects.rs +++ b/crates/worker-api/src/invoker/effects.rs @@ -10,7 +10,8 @@ use std::collections::HashSet; -use restate_memory::NonZeroByteCount; +use restate_clock::RoughTimestamp; +use restate_storage_api::vqueue_table::scheduler::YieldReason; use restate_types::deployment::PinnedDeployment; use restate_types::errors::InvocationError; use restate_types::identifiers::InvocationId; @@ -79,7 +80,12 @@ pub enum EffectKind { /// The invoker yielded the invocation back to the scheduler. The partition /// processor should re-schedule the invocation (via [`YieldReason`] the /// scheduler can apply reason-specific strategies in the future). - Yield(YieldReason), + Yield { + reason: YieldReason, + #[serde(skip_serializing_if = "Option::is_none")] + error_event: Option, + resume_at: Option, + }, /// This is sent always after [`Self::JournalEntry`] with `OutputStreamEntry`(s). End, /// This is sent when the invoker exhausted all its attempts to make progress on the specific invocation. @@ -113,20 +119,3 @@ impl EffectKind { // } } } - -/// Why the invoker yielded the invocation back to the scheduler. -/// -/// New reasons can be added without a version barrier — nodes that don't -/// recognize a reason will deserialize it as [`Unknown`](Self::Unknown) and -/// apply the default re-scheduling strategy (immediate re-invoke). -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -#[cfg_attr(feature = "serde", serde(tag = "reason"))] -pub enum YieldReason { - /// The invocation exhausted its outbound memory budget. - ExhaustedMemoryBudget { needed_memory: NonZeroByteCount }, - /// A yield reason not recognized by this node version. The partition - /// processor applies the default strategy (re-schedule immediately). - #[cfg_attr(feature = "serde", serde(other))] - Unknown, -} diff --git a/crates/worker-api/src/invoker/mod.rs b/crates/worker-api/src/invoker/mod.rs index 25a7ac4881..70356985b5 100644 --- a/crates/worker-api/src/invoker/mod.rs +++ b/crates/worker-api/src/invoker/mod.rs @@ -19,4 +19,5 @@ pub use effects::*; pub use entry_enricher::EntryEnricher; pub use handle::*; pub use invocation_reader::{InvocationReaderError, JournalKind, JournalMetadata}; +pub use restate_storage_api::vqueue_table::scheduler::YieldReason; pub use status_handle::{InvocationErrorReport, InvocationStatusReport, StatusHandle}; diff --git a/crates/worker-api/src/resources.rs b/crates/worker-api/src/resources.rs index 0912a267d3..f2b69a6c42 100644 --- a/crates/worker-api/src/resources.rs +++ b/crates/worker-api/src/resources.rs @@ -14,6 +14,7 @@ use tokio::sync::mpsc; use restate_futures_util::concurrency::Permit; use restate_limiter::LimitKey; use restate_memory::MemoryLease; +use restate_storage_api::vqueue_table::EntryMetadata; use restate_types::Scope; use restate_util_string::ReString; @@ -72,6 +73,7 @@ impl SystemPermit { #[must_use] #[clippy::has_significant_drop] pub struct ReservedResources { + pub metadata: EntryMetadata, resources: SmallVec<[UserPermitKind; 1]>, system_permit: SystemPermit, manager_tx: Option>, @@ -80,6 +82,7 @@ pub struct ReservedResources { impl ReservedResources { pub fn new_empty() -> Self { Self { + metadata: EntryMetadata::default(), resources: SmallVec::new(), system_permit: SystemPermit::default(), manager_tx: None, @@ -87,11 +90,13 @@ impl ReservedResources { } pub const fn new( + metadata: EntryMetadata, resources: SmallVec<[UserPermitKind; 1]>, system_permit: SystemPermit, manager_tx: mpsc::UnboundedSender, ) -> Self { Self { + metadata, resources, system_permit, manager_tx: Some(manager_tx), diff --git a/crates/worker/src/partition/state_machine/lifecycle/mod.rs b/crates/worker/src/partition/state_machine/lifecycle/mod.rs index e97ef6b34d..7cfa923198 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/mod.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/mod.rs @@ -24,6 +24,7 @@ mod restart_as_new; mod resume; mod suspend; mod version_barrier; +mod yield_invocation; pub(super) use cancel::OnCancelCommand; pub(super) use event::ApplyEventCommand; @@ -41,3 +42,4 @@ pub(super) use restart_as_new::OnRestartAsNewInvocationCommand; pub(super) use resume::ResumeInvocationCommand; pub(super) use suspend::OnSuspendCommand; pub(super) use version_barrier::OnVersionBarrierCommand; +pub(super) use yield_invocation::YieldInvocationCommand; diff --git a/crates/worker/src/partition/state_machine/lifecycle/pinned_deployment.rs b/crates/worker/src/partition/state_machine/lifecycle/pinned_deployment.rs index 003d44446a..14d5c5c6f6 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/pinned_deployment.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/pinned_deployment.rs @@ -89,6 +89,9 @@ where in_flight_invocation_metadata .set_pinned_deployment(self.pinned_deployment, ctx.record_created_at); + // todo: set the pinned deployment in the vqueue entry metadata so that the scheduler can know + // about it. + // We recreate the InvocationStatus in Invoked state as the invoker can notify the // chosen deployment_id only when the invocation is in-flight. ctx.storage diff --git a/crates/worker/src/partition/state_machine/lifecycle/yield_invocation.rs b/crates/worker/src/partition/state_machine/lifecycle/yield_invocation.rs new file mode 100644 index 0000000000..e06f8ebf7f --- /dev/null +++ b/crates/worker/src/partition/state_machine/lifecycle/yield_invocation.rs @@ -0,0 +1,83 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use tracing::{debug, info}; + +use restate_clock::{RoughTimestamp, UniqueTimestamp}; +use restate_storage_api::lock_table::WriteLockTable; +use restate_storage_api::vqueue_table::scheduler::YieldReason; +use restate_storage_api::vqueue_table::{ + EntryStatusHeader, ReadVQueueTable, Stage, WriteVQueueTable, +}; +use restate_types::identifiers::InvocationId; +use restate_types::sharding::WithPartitionKey; +use restate_types::vqueues::EntryId; +use restate_vqueues::VQueue; + +use crate::debug_if_leader; +use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext}; + +/// Yields an invocation from the running stage back to the inbox stage the vqueue it belongs to. +/// +/// NOTE: this requires vqueues to be enabled or that the invocation was driven by the vqueue +/// scheduler +pub struct YieldInvocationCommand<'e> { + pub invocation_id: &'e InvocationId, + pub yield_reason: YieldReason, + pub resume_at: Option, +} + +impl<'e, 'ctx: 'e, 's: 'ctx, S> CommandHandler<&'ctx mut StateMachineApplyContext<'s, S>> + for YieldInvocationCommand<'e> +where + S: WriteVQueueTable + ReadVQueueTable + WriteLockTable + ReadVQueueTable, +{ + async fn apply(self, ctx: &'ctx mut StateMachineApplyContext<'s, S>) -> Result<(), Error> { + debug_if_leader!(ctx.is_leader, "Effect: Yield"); + + // From scheduler's yield + let at = UniqueTimestamp::from_unix_millis_unchecked(ctx.record_created_at); + let entry_id = EntryId::from(self.invocation_id); + + let Some(header) = ctx + .storage + .get_vqueue_entry_status(self.invocation_id.partition_key(), &entry_id) + .await? + else { + // It could be that the invocation was killed and purged before the invoker yielded it. + info!( + "Not yielding {} because it has no vqueue state", + self.invocation_id, + ); + return Ok(()); + }; + + if !matches!(header.stage(), Stage::Running) { + debug!( + "Not yielding {} because it is not in running stage. Current stage is {}", + self.invocation_id, + header.stage() + ); + return Ok(()); + } + + VQueue::get( + header.vqueue_id(), + ctx.storage, + ctx.vqueues_cache, + ctx.is_leader.then_some(ctx.action_collector), + ) + .await? + .expect("yielding in a non-existent vqueue") + .yield_entry(at, &header, self.resume_at, self.yield_reason); + + Ok(()) + } +} diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 0d45958479..666e8a3e17 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -59,7 +59,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::state_table::{ReadStateTable, WriteStateTable}; use restate_storage_api::timer_table::TimerKey; use restate_storage_api::timer_table::{Timer, WriteTimerTable}; -use restate_storage_api::vqueue_table::scheduler; +use restate_storage_api::vqueue_table::scheduler::{self, YieldReason}; use restate_storage_api::vqueue_table::{self, EntryKey, Stage}; use restate_storage_api::vqueue_table::{EntryStatusHeader, ReadVQueueTable, WriteVQueueTable}; use restate_storage_api::{Result as StorageResult, journal_table}; @@ -512,45 +512,16 @@ impl StateMachineApplyContext<'_, S> { .await?; } scheduler::SchedulerAction::Yield(yield_action) => { - let at = UniqueTimestamp::from_unix_millis_unchecked( - self.record_created_at, - ); - let Some(header) = self - .storage - .get_vqueue_entry_status( - qid.partition_key(), - yield_action.key.entry_id(), - ) - .await? - else { - info!( - vqueue = %qid, - "Not yielding {} because it has no vqueue state", - yield_action.key.entry_id().display(qid.partition_key()) - ); - continue; - }; - // move the entry from inbox and notify the scheduler that it has started - // also, ship to invoker. - let mut vqueue = VQueue::get( - qid, - self.storage, - self.vqueues_cache, - self.is_leader.then_some(self.action_collector), - ) - .await? - .expect("yielding in a non-existent vqueue"); - vqueue.yield_entry( - at, - &header, - None, - // todo: this would be a good place to pick up if the invocation - // metadata should be updated (i.e. a deployment was pinned) - None, - // We are assuming it's an invocation because state mutations - // can never be observed in the run stage. - vqueue_table::Status::Yielded, - ); + let entry_id = yield_action.key.entry_id(); + lifecycle::YieldInvocationCommand { + invocation_id: + &entry_id.to_invocation_id(qid.partition_key()) + .expect("This version does not support yielding vqueues entries other than invocations"), + resume_at: yield_action.next_run_at, + yield_reason: yield_action.reason, + } + .apply(self) + .await?; } } } @@ -2826,14 +2797,53 @@ impl StateMachineApplyContext<'_, S> { ) .await?; } - InvokerEffectKind::Yield(ref reason) => { - let invocation_metadata = invocation_status - .into_invocation_metadata() - .expect("Must be present if status is invoked"); - debug_if_leader!(self.is_leader, ?reason, "Effect: Yield invocation"); - // todo pass memory requirements from the reason to the vqueue scheduler and invoker - self.do_resume_service(effect.invocation_id, invocation_metadata) + InvokerEffectKind::Yield { + reason, + error_event, + resume_at, + } => { + if let Some(event) = error_event { + // Submit the journal event if we have one + lifecycle::ApplyEventCommand { + invocation_id: &effect.invocation_id, + invocation_status: &invocation_status, + event, + } + .apply(self) .await?; + } + + // Special casing for memory-budget yields when vqueues are disabled. + // todo: remove when vqueues are always enabled + if self.is_leader + && let YieldReason::ExhaustedMemoryBudget { .. } = reason + && !Configuration::pinned() + .common + .experimental + .is_vqueues_enabled() + { + let Some(invocation_target) = invocation_status.invocation_target().cloned() + else { + return Ok(()); + }; + + debug_if_leader!(self.is_leader, "Effect: Yield"); + + self.action_collector.push(Action::Invoke { + invocation_id: effect.invocation_id, + invocation_target, + }); + return Ok(()); + } + + // Submit the journal event if we have one + lifecycle::YieldInvocationCommand { + invocation_id: &effect.invocation_id, + yield_reason: reason, + resume_at, + } + .apply(self) + .await?; } } @@ -3244,7 +3254,6 @@ impl StateMachineApplyContext<'_, S> { if header.has_started() { // We fallthrough if the invocation was never started so we can initialize the journal. debug_if_leader!(self.is_leader, "Invoke"); - info!("Resuming invocation {invocation_id}, scheduler stats: {wait_stats:?}"); let status = self.get_invocation_status(&invocation_id).await?; let mut vqueue = VQueue::get( @@ -5373,8 +5382,6 @@ impl StateMachineApplyContext<'_, S> { let now = UniqueTimestamp::from_unix_millis_unchecked(self.record_created_at); - // todo: this would be a good place to pick up if the invocation - // metadata should be updated (i.e. a deployment was pinned) match header.stage() { Stage::Suspended => { vqueue.wake_up(now, &header, None, None); @@ -5383,7 +5390,7 @@ impl StateMachineApplyContext<'_, S> { vqueue.wake_up(now, &header, None, None); } Stage::Running => { - vqueue.yield_entry(now, &header, None, None, header.status()); + vqueue.yield_entry(now, &header, None, YieldReason::Unknown); } Stage::Inbox => { // nothing to do if we are already in the inbox diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index fc72c2f8bc..2fd3fb1c99 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -1286,11 +1286,15 @@ async fn yield_effect_resumes_invocation() { let actions = test_env .apply(commands::InvokerEffectCommand::test_envelope(Effect { invocation_id, - kind: EffectKind::Yield(YieldReason::ExhaustedMemoryBudget { - needed_memory: restate_memory::NonZeroByteCount::new( - NonZeroUsize::new(32768).unwrap(), - ), - }), + kind: EffectKind::Yield { + error_event: None, + resume_at: None, + reason: YieldReason::ExhaustedMemoryBudget { + needed_memory: restate_memory::NonZeroByteCount::new( + NonZeroUsize::new(32768).unwrap(), + ), + }, + }, })) .await; diff --git a/util/bytecount/Cargo.toml b/util/bytecount/Cargo.toml index 67fa62e9a4..c7541b8968 100644 --- a/util/bytecount/Cargo.toml +++ b/util/bytecount/Cargo.toml @@ -11,12 +11,14 @@ publish = false default = [] serde = ["dep:serde_core"] schema = ["dep:schemars"] +bilrost = ["dep:bilrost"] [dependencies] restate-workspace-hack = { workspace = true } restate-util-string = { workspace = true } +bilrost = { workspace = true, optional = true } bytesize = { workspace = true } schemars = { workspace = true, optional = true } serde_core = { workspace = true, optional = true } diff --git a/util/bytecount/src/bilrost_encoding.rs b/util/bytecount/src/bilrost_encoding.rs new file mode 100644 index 0000000000..7821c2d893 --- /dev/null +++ b/util/bytecount/src/bilrost_encoding.rs @@ -0,0 +1,85 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bilrost::Canonicity::Canonical; +use bilrost::encoding::{DistinguishedProxiable, ForOverwrite, Proxiable}; +use bilrost::{Canonicity, DecodeErrorKind}; + +use crate::ByteCount; + +impl Proxiable for ByteCount { + type Proxy = u64; + + fn encode_proxy(&self) -> Self::Proxy { + self.as_u64() + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = ByteCount(proxy); + Ok(()) + } +} + +impl DistinguishedProxiable for ByteCount { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonical) + } +} + +bilrost::empty_state_via_default!(ByteCount); + +bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Varint) + to encode proxied type (ByteCount) + with general encodings including distinguished +); + +impl Proxiable for ByteCount { + type Proxy = u64; + + fn encode_proxy(&self) -> Self::Proxy { + self.as_u64() + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = Self::try_from(proxy).map_err(|_| DecodeErrorKind::InvalidValue)?; + Ok(()) + } +} + +impl DistinguishedProxiable for ByteCount { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonical) + } +} + +impl ForOverwrite<(), ByteCount> for () { + fn for_overwrite() -> ByteCount { + // default to MIN (1 in case of non-zero). This value is never used + // it's simply an initialization value until decoding is done. + ByteCount(1) + } +} + +bilrost::empty_state_via_for_overwrite!(ByteCount); + +bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Varint) + to encode proxied type (ByteCount) + with general encodings including distinguished +); diff --git a/util/bytecount/src/lib.rs b/util/bytecount/src/lib.rs index c79da4d2be..68a2680a78 100644 --- a/util/bytecount/src/lib.rs +++ b/util/bytecount/src/lib.rs @@ -14,6 +14,9 @@ mod serde; #[cfg(feature = "schema")] mod schema; +#[cfg(feature = "bilrost")] +mod bilrost_encoding; + use std::cmp::Ordering; use std::fmt::{self, Display}; use std::num::{NonZeroU64, NonZeroUsize};