diff --git a/Cargo.lock b/Cargo.lock index 262869c1f7..68681be089 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8360,7 +8360,6 @@ dependencies = [ "restate-util-string", "restate-workspace-hack", "serde", - "smallvec", "static_assertions", "strum", "thiserror 2.0.18", diff --git a/crates/clock/src/rough_ts.rs b/crates/clock/src/rough_ts.rs index 40f9a9d9d5..09c0e52abb 100644 --- a/crates/clock/src/rough_ts.rs +++ b/crates/clock/src/rough_ts.rs @@ -378,6 +378,13 @@ mod bilrost_encoding { to encode proxied type (RoughTimestamp) with general encodings including distinguished ); + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (RoughTimestamp) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); } #[cfg(test)] @@ -526,4 +533,23 @@ mod tests { ); } } + + #[test] + fn bilrost_fixed_round_trip() { + use bilrost::Message; + + #[derive(bilrost::Message, PartialEq, Debug)] + struct TestMessage { + #[bilrost(tag(1), encoding(fixed))] + timestamp: RoughTimestamp, + } + + let msg = TestMessage { + timestamp: RoughTimestamp::new(123), + }; + + let encoded = msg.encode_to_vec(); + assert_eq!(encoded.len(), 5); + assert_eq!(TestMessage::decode(encoded.as_slice()), Ok(msg)); + } } diff --git a/crates/clock/src/time.rs b/crates/clock/src/time.rs index 0b9458dc29..b7dffea52b 100644 --- a/crates/clock/src/time.rs +++ b/crates/clock/src/time.rs @@ -347,6 +347,13 @@ mod bilrost_encoding { to encode proxied type (MillisSinceEpoch) with general encodings including distinguished ); + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (MillisSinceEpoch) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); } /// Nanos since the unix epoch. Used internally to get rough latency measurements across nodes. diff --git a/crates/clock/src/unique_timestamp.rs b/crates/clock/src/unique_timestamp.rs index 9014da2f59..862d13c371 100644 --- a/crates/clock/src/unique_timestamp.rs +++ b/crates/clock/src/unique_timestamp.rs @@ -300,6 +300,13 @@ mod bilrost_encoding { to encode proxied type (UniqueTimestamp) with general encodings including distinguished ); + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (UniqueTimestamp) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); } #[cfg(test)] diff --git a/crates/encoding/src/common.rs b/crates/encoding/src/common.rs index 422216aafc..00b43b9a42 100644 --- a/crates/encoding/src/common.rs +++ b/crates/encoding/src/common.rs @@ -9,10 +9,9 @@ // by the Apache License, Version 2.0. use bilrost::{ - DecodeErrorKind, - encoding::{EmptyState, ForOverwrite, Proxiable}, + Canonicity, DecodeErrorKind, + encoding::{DistinguishedProxiable, EmptyState, ForOverwrite, Proxiable}, }; -use restate_encoding_derive::BilrostNewType; use restate_platform::network::NetSerde; use crate::bilrost_encodings::RestateEncoding; @@ -60,25 +59,114 @@ bilrost::delegate_proxied_encoding!( ); /// A Bilrost compatible U128 type. -#[derive(Debug, Clone, Copy, PartialEq, Eq, BilrostNewType)] -pub struct U128((u64, u64)); +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct U128([u8; core::mem::size_of::()]); impl From for U128 { fn from(value: u128) -> Self { - Self(((value >> 64) as u64, value as u64)) + Self(value.to_le_bytes()) } } impl From for u128 { fn from(value: U128) -> Self { - (value.0.0 as u128) << 64 | value.0.1 as u128 + Self::from_le_bytes(value.0) } } impl NetSerde for U128 {} +struct GeneralU128Tag; + +impl Proxiable for U128 { + type Proxy = (u64, u64); + + fn encode_proxy(&self) -> Self::Proxy { + let value = u128::from(*self); + ((value >> 64) as u64, value as u64) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = U128::from((u128::from(proxy.0) << 64) | u128::from(proxy.1)); + Ok(()) + } +} + +impl DistinguishedProxiable for U128 { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + >::decode_proxy(self, proxy)?; + Ok(Canonicity::Canonical) + } +} + +impl ForOverwrite<(), U128> for () { + fn for_overwrite() -> U128 { + U128([0; core::mem::size_of::()]) + } +} + +impl EmptyState<(), U128> for () { + fn empty() -> U128 { + U128([0; core::mem::size_of::()]) + } + + fn is_empty(val: &U128) -> bool { + val.0 == [0; core::mem::size_of::()] + } + + fn clear(val: &mut U128) { + val.0 = [0; core::mem::size_of::()]; + } +} + +bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::General) + to encode proxied type (U128) using proxy tag (GeneralU128Tag) + with general encodings including distinguished +); + +struct FixedU128Tag; + +// Keep the general `U128` encoding as the historical `(u64, u64)` tuple above +// for existing wire compatibility. The fixed encoding is opt-in via +// `#[bilrost(encoding(fixed))]` and uses a length-delimited 16-byte little-endian +// payload because bilrost has no fixed128 scalar wire type. +impl Proxiable for U128 { + type Proxy = [u8; core::mem::size_of::()]; + + fn encode_proxy(&self) -> Self::Proxy { + self.0 + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + self.0 = proxy; + Ok(()) + } +} + +impl DistinguishedProxiable for U128 { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + >::decode_proxy(self, proxy)?; + Ok(Canonicity::Canonical) + } +} + +bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::PlainBytes) + to encode proxied type (U128) using proxy tag (FixedU128Tag) + with encoding (bilrost::encoding::Fixed) + including distinguished +); + #[cfg(test)] mod test { + use bilrost::{Message, OwnedMessage}; use rand::random; use super::U128; @@ -92,4 +180,52 @@ mod test { assert_eq!(num, u128::from(value)); }); } + + #[test] + fn u128_fixed_bilrost_round_trip() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedU128 { + #[bilrost(tag(1), encoding(fixed))] + value: U128, + } + + let raw = 0x1234_5678_90ab_cdef_0123_4567_89ab_cdef; + let value = EncodedU128 { + value: U128::from(raw), + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.len(), 18); + assert_eq!(encoded[0], 0x05); + assert_eq!(encoded[1], 16); + assert_eq!(&encoded[2..], &raw.to_le_bytes()); + assert_eq!(EncodedU128::decode(encoded).unwrap(), value); + } + + #[test] + fn u128_general_bilrost_keeps_high_low_tuple_encoding() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedU128 { + #[bilrost(tag(1))] + value: U128, + } + + #[derive(Debug, PartialEq, bilrost::Message)] + struct HistoricalEncodedU128 { + #[bilrost(tag(1))] + value: (u64, u64), + } + + let raw = 0x1234_5678_90ab_cdef_0123_4567_89ab_cdef; + let value = EncodedU128 { + value: U128::from(raw), + }; + let historical = HistoricalEncodedU128 { + value: ((raw >> 64) as u64, raw as u64), + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded, historical.encode_to_bytes()); + assert_eq!(EncodedU128::decode(encoded).unwrap(), value); + } } diff --git a/crates/partition-store/Cargo.toml b/crates/partition-store/Cargo.toml index fcbdcdf312..56ea6a5e1a 100644 --- a/crates/partition-store/Cargo.toml +++ b/crates/partition-store/Cargo.toml @@ -80,5 +80,9 @@ tracing-subscriber = { workspace = true } name = "basic_benchmark" harness = false +[[bench]] +name = "vqueue_meta_merge" +harness = false + [lints] workspace = true diff --git a/crates/partition-store/benches/vqueue_meta_merge.rs b/crates/partition-store/benches/vqueue_meta_merge.rs new file mode 100644 index 0000000000..32f6a7e520 --- /dev/null +++ b/crates/partition-store/benches/vqueue_meta_merge.rs @@ -0,0 +1,152 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bilrost::Message; +use criterion::{Criterion, Throughput, black_box, criterion_group, criterion_main}; + +use restate_clock::UniqueTimestamp; +use restate_clock::time::MillisSinceEpoch; +use restate_limiter::LimitKey; +use restate_partition_store::vqueue_table::{MetaKey, vqueue_meta_merge}; +use restate_storage_api::vqueue_table::Stage; +use restate_storage_api::vqueue_table::metadata::{ + Action, MoveMetrics, Update, VQueueLink, VQueueMeta, +}; +use restate_types::vqueues::VQueueId; + +const BASE_TS_MS: u64 = 1_744_000_000_000; +const MERGE_OPERANDS: usize = 10_000; + +fn ts(offset_ms: u64) -> UniqueTimestamp { + UniqueTimestamp::from_unix_millis_unchecked(MillisSinceEpoch::new(BASE_TS_MS + offset_ms)) +} + +fn move_metrics( + last_transition_at: UniqueTimestamp, + first_runnable_at: MillisSinceEpoch, + has_started: bool, + blocked_on_concurrency_rules_ms: u32, + blocked_on_invoker_throttling_ms: u32, +) -> MoveMetrics { + MoveMetrics { + last_transition_at, + has_started, + first_runnable_at, + blocked_on_concurrency_rules_ms, + blocked_on_invoker_throttling_ms, + } +} + +fn update_operands() -> Vec> { + let mut operands = Vec::with_capacity(MERGE_OPERANDS); + + for cycle in 0..(MERGE_OPERANDS / 4) { + let base_offset = (cycle as u64) * 4; + let enqueued_at = ts(base_offset); + let started_at = ts(base_offset + 1); + let finished_at = ts(base_offset + 2); + let first_runnable_at = enqueued_at.to_unix_millis(); + + operands.push( + Update::new( + enqueued_at, + Action::Move { + prev_stage: None, + next_stage: Stage::Inbox, + metrics: move_metrics(enqueued_at, first_runnable_at, false, 0, 0), + }, + ) + .encode_contiguous() + .into_vec(), + ); + + operands.push( + Update::new( + started_at, + Action::Move { + prev_stage: Some(Stage::Inbox), + next_stage: Stage::Running, + metrics: move_metrics( + enqueued_at, + first_runnable_at, + false, + (cycle % 1_000) as u32, + (cycle % 100) as u32, + ), + }, + ) + .encode_contiguous() + .into_vec(), + ); + + operands.push( + Update::new( + finished_at, + Action::Move { + prev_stage: Some(Stage::Running), + next_stage: Stage::Finished, + metrics: move_metrics(started_at, first_runnable_at, true, 0, 0), + }, + ) + .encode_contiguous() + .into_vec(), + ); + + operands.push( + Update::new( + ts(base_offset + 3), + Action::RemoveEntry { + stage: Stage::Finished, + }, + ) + .encode_contiguous() + .into_vec(), + ); + } + + operands +} + +fn initial_vqueue_meta() -> Vec { + VQueueMeta::new(ts(0), None, LimitKey::None, VQueueLink::None) + .encode_contiguous() + .into_vec() +} + +fn vqueue_meta_key() -> Vec { + let qid = VQueueId::custom(1, "vqueue-meta-merge-benchmark"); + MetaKey::from(&qid).to_bytes().to_vec() +} + +fn vqueue_meta_merge_benchmark(c: &mut Criterion) { + let key = vqueue_meta_key(); + let existing_value = initial_vqueue_meta(); + let operands = update_operands(); + let operand_slices = operands.iter().map(Vec::as_slice).collect::>(); + + let mut group = c.benchmark_group("vqueue_meta_merge"); + group.throughput(Throughput::Elements(MERGE_OPERANDS as u64)); + group.bench_function("full_merge_10000_operands", |bencher| { + bencher.iter(|| { + let operands = black_box(operand_slices.as_slice()).iter().copied(); + let merged = vqueue_meta_merge::full_merge_slices( + black_box(key.as_slice()), + Some(black_box(existing_value.as_slice())), + operands, + ) + .expect("vqueue metadata merge succeeds"); + black_box(merged); + }); + }); + group.finish(); +} + +criterion_group!(benches, vqueue_meta_merge_benchmark); +criterion_main!(benches); diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index b9c6f3f563..f70f45ba41 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -232,25 +232,15 @@ impl KeyKind { } // Rocksdb merge operator function (partial merge) + #[inline] pub fn partial_merge( - key: &[u8], + _key: &[u8], _unused: Option<&[u8]>, - operands: &MergeOperands, + _operands: &MergeOperands, ) -> Option> { - let mut kind_buf = key; - let kind = match KeyKind::deserialize(&mut kind_buf) { - Ok(kind) => kind, - Err(e) => { - error!("Cannot apply merge operator; {e}"); - return None; - } - }; - trace!(?kind, "partial merge"); - - match kind { - KeyKind::VQueueMeta => vqueue_meta_merge::partial_merge(key, operands), - _ => None, - } + // Currently, we have no partial merge operator for any key. Change this + // if/when this is needed. + None } } diff --git a/crates/partition-store/src/partition_db.rs b/crates/partition-store/src/partition_db.rs index acfcf7de62..f3f5c0df1f 100644 --- a/crates/partition-store/src/partition_db.rs +++ b/crates/partition-store/src/partition_db.rs @@ -555,7 +555,7 @@ impl CfConfigurator for RocksConfigurator { KeyKind::full_merge, KeyKind::partial_merge, ); - cf_options.set_max_successive_merges(100); + cf_options.set_max_successive_merges(5000); cf_options.set_disable_auto_compactions(config.rocksdb.rocksdb_disable_auto_compactions()); if let Some(compaction_period) = config.rocksdb.rocksdb_periodic_compaction_seconds() { diff --git a/crates/partition-store/src/vqueue_table/metadata.rs b/crates/partition-store/src/vqueue_table/metadata.rs index ea35d297dc..fd9fff4c1a 100644 --- a/crates/partition-store/src/vqueue_table/metadata.rs +++ b/crates/partition-store/src/vqueue_table/metadata.rs @@ -57,7 +57,6 @@ impl MetaKey { } } -// todo: check if this is still needed impl From<&VQueueId> for MetaKey { #[inline] fn from(qid: &VQueueId) -> Self { @@ -80,21 +79,30 @@ impl From for MetaKey { } // Rocksdb merge operator for the vqueue keys -pub(crate) mod vqueue_meta_merge { +pub mod vqueue_meta_merge { use bilrost::{Message, OwnedMessage}; use rocksdb::MergeOperands; use tracing::error; - use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates}; + use restate_storage_api::vqueue_table; + use restate_storage_api::vqueue_table::metadata::VQueueMeta; use crate::keys::DecodeTableKey; use super::MetaKey; pub fn full_merge( - mut key: &[u8], + key: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, + ) -> Option> { + full_merge_slices(key, existing_val, operands) + } + + pub fn full_merge_slices<'a>( + mut key: &[u8], + existing_val: Option<&[u8]>, + operands: impl IntoIterator, ) -> Option> { let Some(mut existing_val) = existing_val else { let key = MetaKey::deserialize_from(&mut key); @@ -117,43 +125,21 @@ pub(crate) mod vqueue_meta_merge { } }; + let mut update = ::empty(); for op in operands { - let batch = match VQueueMetaUpdates::decode(op) { - Err(err) => { - let key = MetaKey::deserialize_from(&mut key); - error!( - ?err, - ?key, - "[full merge] Failed to decode vqueue meta batched updates ({} bytes)", - op.len(), - ); - return None; - } - Ok(batch) => batch, - }; - for update in batch.updates.iter() { - vqueue_meta.apply_update(update); + if let Err(err) = update.replace_from_slice(op) { + let key = MetaKey::deserialize_from(&mut key); + error!( + ?err, + ?key, + "[full merge] Failed to decode vqueue meta update ({} bytes)", + op.len(), + ); + return None; } + vqueue_meta.apply_update(&update); } - Some(vqueue_meta.encode_to_vec()) - } - pub fn partial_merge(mut _key: &[u8], operands: &MergeOperands) -> Option> { - let mut updates = - VQueueMetaUpdates::with_capacity(operands.len() * VQueueMetaUpdates::INLINED_UPDATES); - for op in operands { - let partial_updates = match VQueueMetaUpdates::decode(op) { - Err(err) => { - error!( - ?err, - "[partial merge] Failed to decode vqueue meta batched updates" - ); - return None; - } - Ok(u) => u, - }; - updates.extend(partial_updates); - } - Some(updates.encode_to_vec()) + Some(vqueue_meta.encode_contiguous().into_vec()) } } diff --git a/crates/partition-store/src/vqueue_table/mod.rs b/crates/partition-store/src/vqueue_table/mod.rs index f513a719df..1486634207 100644 --- a/crates/partition-store/src/vqueue_table/mod.rs +++ b/crates/partition-store/src/vqueue_table/mod.rs @@ -34,7 +34,7 @@ use tracing::error; use restate_rocksdb::Priority; use restate_storage_api::StorageError; -use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef, VQueueMetaUpdates}; +use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef}; use restate_storage_api::vqueue_table::{ EntryKey, EntryMetadata, EntryStatusHeader, EntryValue, LazyEntryStatus, ReadVQueueTable, ScanVQueueTable, Stage, Status, WriteVQueueTable, stats::EntryStatistics, @@ -180,15 +180,11 @@ impl WriteVQueueTable for PartitionStoreTransaction<'_> { update: &restate_storage_api::vqueue_table::metadata::Update, ) { let key_buffer = MetaKey::from(qid).to_bytes(); - let updates = VQueueMetaUpdates::new(update.clone()); - let value_buf = { - let value_buf = self.cleared_value_buffer_mut(updates.encoded_len()); - // unwrap is safe because we know the buffer is big enough. - updates.encode(value_buf).unwrap(); - value_buf.split() - }; - - self.raw_merge_cf(KeyKind::VQueueMeta, key_buffer, value_buf); + self.raw_merge_cf( + KeyKind::VQueueMeta, + key_buffer, + update.encode_contiguous().into_vec(), + ); } fn put_vqueue_inbox( diff --git a/crates/platform/src/storage.rs b/crates/platform/src/storage.rs index bd827ac8eb..7d6fb59854 100644 --- a/crates/platform/src/storage.rs +++ b/crates/platform/src/storage.rs @@ -90,6 +90,47 @@ pub enum StorageCodecKind { Custom = 7, } +#[cfg(feature = "bilrost")] +mod bilrost_encoding { + use bilrost::encoding::{DistinguishedProxiable, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind, Enumeration}; + + use super::StorageCodecKind; + + struct FixedStorageCodecKindTag; + + impl Proxiable for StorageCodecKind { + type Proxy = u32; + + fn encode_proxy(&self) -> Self::Proxy { + ::to_number(self) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = ::try_from_number(proxy) + .map_err(|_| DecodeErrorKind::OutOfDomainValue)?; + Ok(()) + } + } + + impl DistinguishedProxiable for StorageCodecKind { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonicity::Canonical) + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (StorageCodecKind) using proxy tag (FixedStorageCodecKindTag) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + impl From for u8 { #[inline] fn from(value: StorageCodecKind) -> Self { @@ -207,3 +248,44 @@ impl StorageEncode for bytes::Bytes { Ok(()) } } + +#[cfg(all(test, feature = "bilrost"))] +mod tests { + use bilrost::{Message, OwnedMessage}; + + use super::*; + + #[test] + fn fixed_encoding_round_trips_storage_codec_kind() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedStorageCodecKind { + #[bilrost(tag(1), encoding(fixed))] + value: Option, + } + + let value = EncodedStorageCodecKind { + value: Some(StorageCodecKind::Custom), + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x06, 7, 0, 0, 0]); + assert_eq!(EncodedStorageCodecKind::decode(encoded).unwrap(), value); + } + + #[test] + fn general_encoding_keeps_storage_codec_kind_varint() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedStorageCodecKind { + #[bilrost(tag(1))] + value: Option, + } + + let value = EncodedStorageCodecKind { + value: Some(StorageCodecKind::Custom), + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x04, 7]); + assert_eq!(EncodedStorageCodecKind::decode(encoded).unwrap(), value); + } +} diff --git a/crates/sharding/src/partition_id.rs b/crates/sharding/src/partition_id.rs index df4b7c50b8..c5b1b05b92 100644 --- a/crates/sharding/src/partition_id.rs +++ b/crates/sharding/src/partition_id.rs @@ -67,8 +67,8 @@ impl PartitionId { mod bilrost_impl { use super::PartitionId; - use bilrost::DecodeErrorKind; - use bilrost::encoding::{EmptyState, ForOverwrite, Proxiable}; + use bilrost::encoding::{DistinguishedProxiable, EmptyState, ForOverwrite, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind}; impl Proxiable for PartitionId { type Proxy = u16; @@ -83,6 +83,33 @@ mod bilrost_impl { } } + struct FixedPartitionIdTag; + + impl Proxiable for PartitionId { + type Proxy = u32; + + fn encode_proxy(&self) -> Self::Proxy { + u32::from(self.0) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + self.0 = proxy + .try_into() + .map_err(|_| DecodeErrorKind::OutOfDomainValue)?; + Ok(()) + } + } + + impl DistinguishedProxiable for PartitionId { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + >::decode_proxy(self, proxy)?; + Ok(Canonicity::Canonical) + } + } + impl ForOverwrite<(), PartitionId> for () { fn for_overwrite() -> PartitionId { PartitionId(0) @@ -108,4 +135,52 @@ mod bilrost_impl { to encode proxied type (PartitionId) with general encodings ); + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (PartitionId) using proxy tag (FixedPartitionIdTag) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + +#[cfg(all(test, feature = "bilrost"))] +mod tests { + use bilrost::{Message, OwnedMessage}; + + use super::*; + + #[test] + fn fixed_encoding_round_trips_partition_id() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedPartitionId { + #[bilrost(tag(1), encoding(fixed))] + value: PartitionId, + } + + let value = EncodedPartitionId { + value: PartitionId::MAX, + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.len(), 5); + assert_eq!(EncodedPartitionId::decode(encoded).unwrap(), value); + } + + #[test] + fn general_encoding_keeps_partition_id_varint() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedPartitionId { + #[bilrost(tag(1))] + value: PartitionId, + } + + let value = EncodedPartitionId { + value: PartitionId::MAX, + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x04, 0xff, 0xfe, 0x02]); + assert_eq!(EncodedPartitionId::decode(encoded).unwrap(), value); + } } diff --git a/crates/storage-api/Cargo.toml b/crates/storage-api/Cargo.toml index 0670510c7b..17efa298e7 100644 --- a/crates/storage-api/Cargo.toml +++ b/crates/storage-api/Cargo.toml @@ -23,7 +23,7 @@ restate-util-bytecount = { workspace = true, features = ["bilrost"] } ahash = { workspace = true } anyhow = { workspace = true } -bilrost = { workspace = true, features = ["smallvec"] } +bilrost = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } derive_more = { workspace = true, features = ["from", "into"] } @@ -31,7 +31,6 @@ futures = { workspace = true } opentelemetry = { workspace = true } prost = { workspace = true } serde = { workspace = true } -smallvec = { workspace = true, features = ["const_new"] } static_assertions = { workspace = true } strum = { workspace = true } thiserror = { workspace = true } diff --git a/crates/storage-api/src/vqueue_table/entry.rs b/crates/storage-api/src/vqueue_table/entry.rs index d12265e4b8..10d9c73770 100644 --- a/crates/storage-api/src/vqueue_table/entry.rs +++ b/crates/storage-api/src/vqueue_table/entry.rs @@ -34,9 +34,9 @@ use super::stats::EntryStatistics; pub struct EntryKey { #[bilrost(tag(1))] has_lock: bool, - #[bilrost(tag(2))] + #[bilrost(tag(2), encoding(fixed))] run_at: RoughTimestamp, - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] seq: Seq, #[bilrost(tag(4))] entry_id: EntryId, @@ -160,7 +160,7 @@ impl EntryKey { pub struct EntryValue { /// Status is copied over from the entry status table when the last transition /// happened. - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] pub status: Status, #[bilrost(tag(2))] pub metadata: EntryMetadata, @@ -193,11 +193,11 @@ pub struct EntryMetadataRef<'a> { deployment: Option<&'a str>, // If set, this is the amount of memory the invocation seems to require to // run on the invoker side. - #[bilrost(tag(2))] + #[bilrost(tag(2), encoding(fixed))] pub needed_memory: Option, - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] pub retry_attempts: u32, - #[bilrost(tag(4))] + #[bilrost(tag(4), encoding(fixed))] pub retry_count_since_last_stored_command: u32, } @@ -221,11 +221,11 @@ pub struct EntryMetadata { // If set, this is the amount of memory the invocation seems to require to // run on the invoker side. - #[bilrost(tag(2))] + #[bilrost(tag(2), encoding(fixed))] pub needed_memory: Option, - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] pub retry_attempts: u32, - #[bilrost(tag(4))] + #[bilrost(tag(4), encoding(fixed))] pub retry_count_since_last_stored_command: u32, } diff --git a/crates/storage-api/src/vqueue_table/entry_status.rs b/crates/storage-api/src/vqueue_table/entry_status.rs index 5beb64493c..2800b0d61c 100644 --- a/crates/storage-api/src/vqueue_table/entry_status.rs +++ b/crates/storage-api/src/vqueue_table/entry_status.rs @@ -49,6 +49,43 @@ pub enum Status { Succeeded, } +mod bilrost_encoding { + use bilrost::encoding::{DistinguishedProxiable, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind, Enumeration}; + + use super::Status; + + impl Proxiable for Status { + type Proxy = u32; + + fn encode_proxy(&self) -> Self::Proxy { + ::to_number(self) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = ::try_from_number(proxy).unwrap_or(Status::Unknown); + Ok(()) + } + } + + impl DistinguishedProxiable for Status { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonicity::Canonical) + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (Status) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + pub trait EntryStatusHeader: std::fmt::Debug { fn vqueue_id(&self) -> &VQueueId; fn status(&self) -> Status; @@ -92,3 +129,27 @@ pub trait LazyEntryStatus: EntryStatusHeader { /// A marker trait for types that can be used as entry extra state values. pub trait EntryStatusExtra {} + +#[cfg(test)] +mod tests { + use bilrost::{Message, OwnedMessage}; + + use super::*; + + #[test] + fn fixed_encoding_round_trips_status() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedStatus { + #[bilrost(tag(1), encoding(fixed))] + value: Status, + } + + let value = EncodedStatus { + value: Status::Succeeded, + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x06, 9, 0, 0, 0]); + assert_eq!(EncodedStatus::decode(encoded).unwrap(), value); + } +} diff --git a/crates/storage-api/src/vqueue_table/metadata.rs b/crates/storage-api/src/vqueue_table/metadata.rs index cb5b8ac3ef..f7d3eb6976 100644 --- a/crates/storage-api/src/vqueue_table/metadata.rs +++ b/crates/storage-api/src/vqueue_table/metadata.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use smallvec::SmallVec; - use restate_clock::time::MillisSinceEpoch; use restate_limiter::LimitKey; use restate_types::clock::UniqueTimestamp; @@ -21,7 +19,7 @@ use super::Stage; #[derive(Debug, Clone, bilrost::Message)] pub struct VQueueStatistics { /// Creation time of this vqueue metadata record. - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] pub(crate) created_at: UniqueTimestamp, /// Exponential moving average (EMA) of first-attempt wait time. /// @@ -35,37 +33,37 @@ pub struct VQueueStatistics { /// Last timestamp an entry was moved into `Inbox`. /// /// This covers items enqueued for the first time only. - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] pub(crate) last_enqueued_at: Option, /// Last timestamp an entry had its first transition to `Run`. /// /// This marks when a new entry starts for the first time. - #[bilrost(tag(4))] + #[bilrost(tag(4), encoding(fixed))] pub(crate) last_start_at: Option, /// Last timestamp an entry completed (transitioned into `Finished`) - #[bilrost(tag(5))] + #[bilrost(tag(5), encoding(fixed))] pub(crate) last_finish_at: Option, /// Last timestamp an entry transitioned to `Run`. /// /// This includes both first starts and retries/resumes. - #[bilrost(tag(6))] + #[bilrost(tag(6), encoding(fixed))] pub(crate) last_attempt_at: Option, /// Number of entries currently in `inbox` stage. - #[bilrost(tag(7))] + #[bilrost(tag(7), encoding(fixed))] pub(crate) num_inbox: u64, /// Number of entries currently in `suspended` stage. - #[bilrost(tag(8))] + #[bilrost(tag(8), encoding(fixed))] pub(crate) num_suspended: u64, /// Number of entries currently in `paused` stage. - #[bilrost(tag(9))] + #[bilrost(tag(9), encoding(fixed))] pub(crate) num_paused: u64, /// Number of entries currently in `running` stage. - #[bilrost(tag(10))] + #[bilrost(tag(10), encoding(fixed))] pub(crate) num_running: u64, /// How many entries are in the `Finish` stage. When deleting entries from /// the `Finished` stage, we should decrement this counter. The vqueue becomes /// obsolete when it's completely empty (all counters are zero). - #[bilrost(tag(11))] + #[bilrost(tag(11), encoding(fixed))] pub(crate) num_finished: u64, /// Exponential moving average (EMA) of how long entries stay in `Inbox` before transitioning out of it. #[bilrost(tag(12))] @@ -519,75 +517,29 @@ impl VQueueMeta { } } -/// A collection of differential updates to the vqueue meta data structure. -/// -/// Those updates can be applied to the storage layer via a merge operator and at the same -/// time they can be accepted by the vqueue's cache to keep them in sync. -#[derive(Clone, Default, Debug, bilrost::Message)] -pub struct VQueueMetaUpdates { - #[bilrost(1)] - pub updates: SmallVec<[Update; VQueueMetaUpdates::INLINED_UPDATES]>, -} - #[derive(Debug, Clone, bilrost::Message)] pub struct MoveMetrics { /// Timestamp of the entry's previous stage transition. - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] pub last_transition_at: UniqueTimestamp, /// Whether the entry has started at least once before this transition. #[bilrost(tag(2))] pub has_started: bool, /// Earliest timestamp at which the entry can realistically start. - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] pub first_runnable_at: MillisSinceEpoch, /// Milliseconds the head item spent blocked on user-defined concurrency /// rules during this run attempt. Only populated on Inbox → Running moves; /// zero for every other transition. Feeds `avg_blocked_on_concurrency_rules_ms`. - #[bilrost(tag(4))] + #[bilrost(tag(4), encoding(fixed))] pub blocked_on_concurrency_rules_ms: u32, /// Milliseconds the head item spent in node-level invoker throttling /// during this run attempt. Only populated on Inbox → Running moves; zero /// for every other transition. Feeds `avg_blocked_on_invoker_throttling_ms`. - #[bilrost(tag(5))] + #[bilrost(tag(5), encoding(fixed))] pub blocked_on_invoker_throttling_ms: u32, } -impl VQueueMetaUpdates { - pub const INLINED_UPDATES: usize = 1; - - pub fn new(update: Update) -> Self { - let updates = smallvec::smallvec_inline![update]; - Self { updates } - } - - pub fn with_capacity(capacity: usize) -> Self { - Self { - updates: SmallVec::with_capacity(capacity), - } - } - - #[inline(always)] - pub fn push(&mut self, ts: UniqueTimestamp, action: Action) { - self.updates.push(Update { ts, action }); - } - - pub fn len(&self) -> usize { - self.updates.len() - } - - pub fn iter(&self) -> impl Iterator { - self.updates.iter() - } - - pub fn is_empty(&self) -> bool { - self.updates.is_empty() - } - - pub fn extend(&mut self, other: Self) { - self.updates.extend(other.updates); - } -} - #[derive(Debug, Clone, Default, bilrost::Oneof, bilrost::Message)] pub enum Action { #[default] @@ -599,7 +551,9 @@ pub enum Action { /// if new_stage is Finished, the item has completed. #[bilrost(tag(2), message)] Move { + #[bilrost(encoding(fixed))] prev_stage: Option, + #[bilrost(encoding(fixed))] next_stage: Stage, metrics: MoveMetrics, }, @@ -607,14 +561,17 @@ pub enum Action { PauseVQueue {}, #[bilrost(tag(4), message)] ResumeVQueue {}, - #[bilrost(tag(5), message)] + #[bilrost(tag(5))] /// An item or have been removed from the (stage) - RemoveEntry { stage: Stage }, + RemoveEntry { + #[bilrost(encoding(fixed))] + stage: Stage, + }, } #[derive(Debug, Clone, bilrost::Message)] pub struct Update { - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] pub(super) ts: UniqueTimestamp, #[bilrost(oneof(2, 3, 4, 5))] pub(super) action: Action, diff --git a/crates/storage-api/src/vqueue_table/scheduler.rs b/crates/storage-api/src/vqueue_table/scheduler.rs index c5accc5ce4..8de90c586d 100644 --- a/crates/storage-api/src/vqueue_table/scheduler.rs +++ b/crates/storage-api/src/vqueue_table/scheduler.rs @@ -42,16 +42,21 @@ pub enum YieldReason { PartitionLeaderChange, /// The invocation exhausted its outbound memory budget. #[bilrost(tag(2), message)] - ExhaustedMemoryBudget { needed_memory: NonZeroByteCount }, + ExhaustedMemoryBudget { + #[bilrost(encoding(fixed))] + needed_memory: NonZeroByteCount, + }, /// The invocation has been yielded due to an error during execution. #[bilrost(tag(3), message)] TransientError { /// Controls the service retry policy related retries. This is the the /// value that should be used to initialize the retry policy after resuming. + #[bilrost(encoding(fixed))] retry_attempts: u32, /// For sdk-controlled retries. This defines the retry-count value that will be /// sent downstream to the SDK to be used for its ctx.run() retries on the next /// start message. + #[bilrost(encoding(fixed))] retry_count_since_last_stored_command: u32, }, /// The invocation has been yielded due to an error or cooperatively yielded diff --git a/crates/storage-api/src/vqueue_table/stats.rs b/crates/storage-api/src/vqueue_table/stats.rs index d46353f6ca..315495f136 100644 --- a/crates/storage-api/src/vqueue_table/stats.rs +++ b/crates/storage-api/src/vqueue_table/stats.rs @@ -42,28 +42,28 @@ pub struct WaitStats { #[derive(Debug, Clone, bilrost::Message)] pub struct EntryStatistics { /// Creation timestamp of the entry. - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] pub created_at: UniqueTimestamp, /// Timestamp of the last stage transition. /// /// This is always initialized to `created_at` and updated on every stage move. - #[bilrost(tag(2))] + #[bilrost(tag(2), encoding(fixed))] pub transitioned_at: UniqueTimestamp, /// How many times did we move this entry to the run queue? /// '0` means that it's never been started. - #[bilrost(tag(3))] + #[bilrost(tag(3), encoding(fixed))] pub num_attempts: u32, - #[bilrost(tag(4))] + #[bilrost(tag(4), encoding(fixed))] pub num_paused: u32, - #[bilrost(tag(5))] + #[bilrost(tag(5), encoding(fixed))] pub num_suspensions: u32, - #[bilrost(tag(6))] + #[bilrost(tag(6), encoding(fixed))] pub num_yields: u32, /// Timestamp of the first attempt to run this entry - #[bilrost(tag(7))] + #[bilrost(tag(7), encoding(fixed))] pub first_attempt_at: Option, /// Timestamp of the last attempt to run this entry - #[bilrost(tag(8))] + #[bilrost(tag(8), encoding(fixed))] pub latest_attempt_at: Option, /// Earliest timestamp at which the first run can realistically start. /// @@ -72,7 +72,7 @@ pub struct EntryStatistics { /// /// We clamp to `created_at` when `original_run_at` is in the past to avoid /// inflating the first-attempt wait time. - #[bilrost(tag(9))] + #[bilrost(tag(9), encoding(fixed))] pub first_runnable_at: MillisSinceEpoch, } diff --git a/crates/storage-api/src/vqueue_table/tables.rs b/crates/storage-api/src/vqueue_table/tables.rs index 4360839192..ea23102cc9 100644 --- a/crates/storage-api/src/vqueue_table/tables.rs +++ b/crates/storage-api/src/vqueue_table/tables.rs @@ -63,6 +63,43 @@ impl Stage { } } +mod bilrost_encoding { + use bilrost::encoding::{DistinguishedProxiable, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind, Enumeration}; + + use super::Stage; + + impl Proxiable for Stage { + type Proxy = u32; + + fn encode_proxy(&self) -> Self::Proxy { + ::to_number(self) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = ::try_from_number(proxy).unwrap_or(Stage::Unknown); + Ok(()) + } + } + + impl DistinguishedProxiable for Stage { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonicity::Canonical) + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (Stage) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + pub trait WriteVQueueTable { /// Initializes a new vqueue fn create_vqueue(&mut self, qid: &VQueueId, meta: &VQueueMeta); diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index ce9ce60618..708ab85140 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -83,6 +83,45 @@ impl Default for LeaderEpoch { } } +mod bilrost_encoding { + use bilrost::encoding::{DistinguishedProxiable, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind}; + + use super::LeaderEpoch; + + struct FixedLeaderEpochTag; + + impl Proxiable for LeaderEpoch { + type Proxy = u64; + + fn encode_proxy(&self) -> Self::Proxy { + self.0 + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + self.0 = proxy; + Ok(()) + } + } + + impl DistinguishedProxiable for LeaderEpoch { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonicity::Canonical) + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (LeaderEpoch) using proxy tag (FixedLeaderEpochTag) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + impl From for LeaderEpoch { fn from(epoch: crate::protobuf::common::LeaderEpoch) -> Self { Self::from(epoch.value) @@ -1600,6 +1639,44 @@ mod tests { assert_eq!(a.to_string(), b.to_string()); } + #[test] + fn fixed_encoding_round_trips_leader_epoch() { + use bilrost::{Message, OwnedMessage}; + + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedLeaderEpoch { + #[bilrost(tag(1), encoding(fixed))] + value: LeaderEpoch, + } + + let value = EncodedLeaderEpoch { + value: LeaderEpoch::from(42), + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.len(), 9); + assert_eq!(EncodedLeaderEpoch::decode(encoded).unwrap(), value); + } + + #[test] + fn general_encoding_keeps_leader_epoch_varint() { + use bilrost::{Message, OwnedMessage}; + + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedLeaderEpoch { + #[bilrost(tag(1))] + value: LeaderEpoch, + } + + let value = EncodedLeaderEpoch { + value: LeaderEpoch::from(42), + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x04, 42]); + assert_eq!(EncodedLeaderEpoch::decode(encoded).unwrap(), value); + } + #[test] fn deployment_id_from_str() { let deployment_id = "dp_11nGQpCRmau6ypL82KH2TnP"; diff --git a/crates/types/src/vqueues/entry_id.rs b/crates/types/src/vqueues/entry_id.rs index e54226c049..e6b3232384 100644 --- a/crates/types/src/vqueues/entry_id.rs +++ b/crates/types/src/vqueues/entry_id.rs @@ -36,11 +36,49 @@ impl EntryKind { } } +mod bilrost_encoding { + use bilrost::encoding::{DistinguishedProxiable, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind, Enumeration}; + + use super::EntryKind; + + impl Proxiable for EntryKind { + type Proxy = u32; + + fn encode_proxy(&self) -> Self::Proxy { + ::to_number(self) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = + ::try_from_number(proxy).unwrap_or(EntryKind::Unknown); + Ok(()) + } + } + + impl DistinguishedProxiable for EntryKind { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonicity::Canonical) + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (EntryKind) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + #[derive( derive_more::Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash, bilrost::Message, )] pub struct EntryId { - #[bilrost(tag(1))] + #[bilrost(tag(1), encoding(fixed))] kind: EntryKind, // The remainder of the original resource identifier but without the partition-key prefix. // to reconstruct the original resource, you'll need to supply the partition_key. @@ -200,3 +238,36 @@ impl std::fmt::Display for EntryIdDisplay<'_> { } } } + +#[cfg(test)] +mod tests { + use bilrost::{Message, OwnedMessage}; + + use super::*; + + #[test] + fn fixed_encoding_round_trips_entry_kind() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedEntryKind { + #[bilrost(tag(1), encoding(fixed))] + kind: EntryKind, + } + + let value = EncodedEntryKind { + kind: EntryKind::Invocation, + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x06, 1, 0, 0, 0]); + assert_eq!(EncodedEntryKind::decode(encoded).unwrap(), value); + } + + #[test] + fn entry_id_bilrost_round_trips_with_fixed_kind() { + let value = EntryId::new(EntryKind::Invocation, [1; EntryId::REMAINDER_LEN]); + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.len(), 23); + assert_eq!(EntryId::decode(encoded).unwrap(), value); + } +} diff --git a/crates/types/src/vqueues/seq.rs b/crates/types/src/vqueues/seq.rs index d7e6679f66..f57be28d9e 100644 --- a/crates/types/src/vqueues/seq.rs +++ b/crates/types/src/vqueues/seq.rs @@ -123,6 +123,13 @@ mod bilrost_encoding { to encode proxied type (Seq) with general encodings including distinguished ); + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (Seq) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); } #[cfg(test)] @@ -135,4 +142,21 @@ mod tests { assert_eq!(Seq::new(1u64 << 56).as_u64(), 0); assert_eq!(Seq::new((1u64 << 56) + 7).as_u64(), 7); } + + #[test] + fn fixed_encoding_round_trips() { + use bilrost::{Message, OwnedMessage}; + + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedSeq { + #[bilrost(tag(1), encoding(fixed))] + value: Seq, + } + + let value = EncodedSeq { value: Seq::MAX }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.len(), 9); + assert_eq!(EncodedSeq::decode(encoded).unwrap(), value); + } } diff --git a/crates/wal-protocol/src/v2.rs b/crates/wal-protocol/src/v2.rs index ae8c749814..039df5a211 100644 --- a/crates/wal-protocol/src/v2.rs +++ b/crates/wal-protocol/src/v2.rs @@ -37,13 +37,13 @@ mod sealed { /// and serialization details required to interpret the payload. #[derive(Debug, Clone, bilrost::Message)] pub struct Header { - #[bilrost(1)] + #[bilrost(tag(1))] dedup: Dedup, /// Payload record kind - #[bilrost(2)] + #[bilrost(tag(2), encoding(fixed))] kind: CommandKind, /// Payload codec - #[bilrost(3)] + #[bilrost(tag(3), encoding(fixed))] codec: Option, } @@ -311,6 +311,46 @@ pub enum CommandKind { VQueuesResume = 24, } +mod bilrost_encoding { + use bilrost::encoding::{DistinguishedProxiable, Proxiable}; + use bilrost::{Canonicity, DecodeErrorKind, Enumeration}; + + use super::CommandKind; + + struct FixedCommandKindTag; + + impl Proxiable for CommandKind { + type Proxy = u32; + + fn encode_proxy(&self) -> Self::Proxy { + ::to_number(self) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = ::try_from_number(proxy) + .unwrap_or(CommandKind::Unknown); + Ok(()) + } + } + + impl DistinguishedProxiable for CommandKind { + fn decode_proxy_distinguished( + &mut self, + proxy: Self::Proxy, + ) -> Result { + self.decode_proxy(proxy)?; + Ok(Canonicity::Canonical) + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (CommandKind) using proxy tag (FixedCommandKindTag) + with encoding (bilrost::encoding::Fixed) + including distinguished + ); +} + /// Specifies the deduplication strategy that allows receivers to discard /// duplicate WAL records safely. #[derive(Debug, Clone, PartialEq, Eq, Default, bilrost::Oneof, bilrost::Message)] @@ -322,17 +362,17 @@ pub enum Dedup { /// predecessor). #[bilrost(tag(1), message)] SelfProposal { - #[bilrost(0)] + #[bilrost(tag(0), encoding(fixed))] leader_epoch: LeaderEpoch, - #[bilrost(1)] + #[bilrost(tag(1), encoding(fixed))] seq: u64, }, /// Sequence number to deduplicate messages from a foreign partition. #[bilrost(tag(2), message)] ForeignPartition { - #[bilrost(0)] + #[bilrost(tag(0), encoding(fixed))] partition: PartitionId, - #[bilrost(1)] + #[bilrost(tag(1), encoding(fixed))] seq: u64, }, /// Sequence number to deduplicate messages from an arbitrary string prefix. @@ -340,11 +380,11 @@ pub enum Dedup { Arbitrary { // For backward compatibility with ProducerID::Other variant // Drop in Restate v1.8 - #[bilrost(0)] + #[bilrost(tag(0))] prefix: Option, - #[bilrost(1)] + #[bilrost(tag(1), encoding(fixed))] producer_id: U128, - #[bilrost(2)] + #[bilrost(tag(2), encoding(fixed))] seq: u64, }, } @@ -413,22 +453,86 @@ where #[cfg(test)] mod test { + use bilrost::{Message, OwnedMessage}; use bytes::BytesMut; + use restate_encoding::U128; use restate_types::{ GenerationalNodeId, logs::{BodyWithKeys, Keys}, sharding::KeyRange, - storage::StorageCodec, + storage::{StorageCodec, StorageCodecKind}, time::MillisSinceEpoch, }; use super::Dedup; use crate::{ control::{AnnounceLeaderCommand, UpdatePartitionDurabilityCommand}, - v2::{Command, CommandKind, CommandWithKeys, Envelope, Raw}, + v2::{Command, CommandKind, CommandWithKeys, Envelope, Header, Raw}, }; + #[test] + fn header_fixed_fields_round_trip() { + let header = Header { + dedup: Dedup::None, + kind: CommandKind::AnnounceLeader, + codec: Some(StorageCodecKind::Custom), + }; + let encoded = header.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x0a, 1, 0, 0, 0, 0x06, 7, 0, 0, 0]); + + let decoded = Header::decode(encoded).unwrap(); + assert_eq!(decoded.dedup, header.dedup); + assert_eq!(decoded.kind, header.kind); + assert_eq!(decoded.codec, header.codec); + } + + #[test] + fn dedup_fixed_fields_round_trip() { + let self_proposal = Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }; + let encoded = self_proposal.encode_to_bytes(); + assert_eq!(encoded.len(), 20); + assert_eq!(Dedup::decode(encoded).unwrap(), self_proposal); + + let foreign_partition = Dedup::ForeignPartition { + partition: 10.into(), + seq: 120, + }; + let encoded = foreign_partition.encode_to_bytes(); + assert_eq!(encoded.len(), 16); + assert_eq!(Dedup::decode(encoded).unwrap(), foreign_partition); + + let arbitrary = Dedup::Arbitrary { + prefix: None, + producer_id: U128::from(0x1234_5678_90ab_cdef_0123_4567_89ab_cdef), + seq: 120, + }; + let encoded = arbitrary.encode_to_bytes(); + assert_eq!(encoded.len(), 29); + assert_eq!(Dedup::decode(encoded).unwrap(), arbitrary); + } + + #[test] + fn general_encoding_keeps_command_kind_varint() { + #[derive(Debug, PartialEq, bilrost::Message)] + struct EncodedCommandKind { + #[bilrost(tag(1))] + kind: CommandKind, + } + + let value = EncodedCommandKind { + kind: CommandKind::Invoke, + }; + let encoded = value.encode_to_bytes(); + + assert_eq!(encoded.as_ref(), &[0x04, 8]); + assert_eq!(EncodedCommandKind::decode(encoded).unwrap(), value); + } + #[test] fn envelope_encode_decode() { let payload = AnnounceLeaderCommand { diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 666e8a3e17..0e9a33b42a 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -3287,40 +3287,6 @@ impl StateMachineApplyContext<'_, S> { match status { InvocationStatus::Scheduled(ScheduledInvocation { metadata, .. }) | InvocationStatus::Inboxed(InboxedInvocation { metadata, .. }) => { - // Validate that if VO, that it's not locked already. - let invocation_target = &metadata.invocation_target; - if matches!( - invocation_target.invocation_target_ty(), - InvocationTargetType::VirtualObject(VirtualObjectHandlerType::Exclusive) - ) { - // todo(asoli): Remove this once we have confidence in the new locking mechanism - let keyed_service_id = invocation_target.as_keyed_service_id().expect( - "When the handler type is Exclusive, the invocation target must have a key", - ); - match self - .storage - .get_virtual_object_status(&keyed_service_id) - .await? - { - VirtualObjectStatus::Locked(iid) => { - panic!( - "invariant violated trying to run an invocation {invocation_id} on a VO while another invocation {iid} is holding the lock" - ); - } - VirtualObjectStatus::Unlocked => { - // Lock the service - // Obsolete: Remove in lieu of using Locks. - // maintained for compatibility until full migration to locks. - self.storage - .put_virtual_object_status( - &keyed_service_id, - &VirtualObjectStatus::Locked(invocation_id), - ) - .map_err(Error::Storage)?; - } - } - } - let (metadata, invocation_input) = InFlightInvocationMetadata::from_pre_flight_invocation_metadata( metadata, diff --git a/util/bytecount/src/bilrost_encoding.rs b/util/bytecount/src/bilrost_encoding.rs index 7821c2d893..cb500160c3 100644 --- a/util/bytecount/src/bilrost_encoding.rs +++ b/util/bytecount/src/bilrost_encoding.rs @@ -45,6 +45,13 @@ bilrost::delegate_proxied_encoding!( with general encodings including distinguished ); +bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (ByteCount) + with encoding (bilrost::encoding::Fixed) + including distinguished +); + impl Proxiable for ByteCount { type Proxy = u64; @@ -83,3 +90,42 @@ bilrost::delegate_proxied_encoding!( to encode proxied type (ByteCount) with general encodings including distinguished ); + +bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::Fixed) + to encode proxied type (ByteCount) + with encoding (bilrost::encoding::Fixed) + including distinguished +); + +#[cfg(test)] +mod tests { + use std::num::NonZeroU64; + + use bilrost::{Message, OwnedMessage}; + + use crate::{ByteCount, NonZeroByteCount}; + + #[derive(Debug, PartialEq, bilrost::Message)] + struct FixedByteCountMessage { + #[bilrost(tag(1), encoding(fixed))] + bytes: ByteCount, + #[bilrost(tag(2), encoding(fixed))] + non_zero_bytes: Option, + } + + #[test] + fn fixed_encoding_round_trips_byte_counts() { + let message = FixedByteCountMessage { + bytes: ByteCount::from(42_u64), + non_zero_bytes: Some(NonZeroByteCount::from(NonZeroU64::new(128).unwrap())), + }; + + let encoded = message.encode_to_vec(); + assert_eq!(encoded.len(), 18); + assert_eq!( + FixedByteCountMessage::decode(encoded.as_slice()), + Ok(message) + ); + } +}