From ebd49dc46bcfb76af1ad3373f5f324c08aca2971 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 22 Apr 2026 13:16:34 +0100 Subject: [PATCH 1/2] Introduce restate-platform foundation crate Part of the effort to decompose the restate-types monolith into focused, composable utility crates. Add `restate-platform` as a low-dependency foundation crate that provides key type aliases, traits, and essential types the rest of the project can reliably depend upon. It sits at the bottom of the dependency graph with minimal transitive dependencies. Moves the following into restate-platform: - Error traits (GenericError, CodedError) from restate-types - Storage traits (StorageEncode, StorageDecode) from restate-types - Network marker trait (NetSerde) from restate-encoding - Memory estimation trait (EstimatedMemorySize) from restate-memory - Hash re-exports (HashMap, HashSet) for consistent hashing - Sync utilities (OwnedSemaphorePermit) --- Cargo.lock | 29 ++- Cargo.toml | 1 + crates/clock/Cargo.toml | 3 +- crates/clock/src/rough_ts.rs | 4 +- crates/clock/src/time.rs | 6 +- crates/core/Cargo.toml | 1 + crates/core/src/network/incoming.rs | 2 +- crates/core/src/network/message_router.rs | 3 +- crates/encoding/Cargo.toml | 5 +- crates/encoding/derive/src/net.rs | 4 +- crates/encoding/src/common.rs | 3 +- crates/encoding/src/lib.rs | 132 ----------- crates/encoding/tests/net_serde.rs | 10 +- crates/log-server/Cargo.toml | 1 + crates/log-server/src/loglet_worker.rs | 2 +- crates/memory/Cargo.toml | 3 +- crates/memory/src/controller.rs | 4 +- crates/memory/src/lib.rs | 5 - .../src/invocation_status_table/mod.rs | 7 +- crates/platform/Cargo.toml | 22 ++ crates/platform/README.md | 26 +++ crates/platform/src/errors.rs | 84 +++++++ crates/platform/src/hash.rs | 12 + crates/platform/src/lib.rs | 26 +++ .../footprint.rs => platform/src/memory.rs} | 0 crates/platform/src/network.rs | 98 ++++++++ crates/platform/src/storage.rs | 209 ++++++++++++++++++ crates/platform/src/sync.rs | 12 + crates/sharding/Cargo.toml | 2 + crates/sharding/src/key_range.rs | 4 + crates/sharding/src/partition_id.rs | 4 + crates/types/Cargo.toml | 3 +- crates/types/src/errors.rs | 80 +------ crates/types/src/logs/record.rs | 2 +- crates/types/src/net/log_server.rs | 2 +- crates/types/src/storage.rs | 201 +---------------- 36 files changed, 573 insertions(+), 439 deletions(-) create mode 100644 crates/platform/Cargo.toml create mode 100644 crates/platform/README.md create mode 100644 crates/platform/src/errors.rs create mode 100644 crates/platform/src/hash.rs create mode 100644 crates/platform/src/lib.rs rename crates/{memory/src/footprint.rs => platform/src/memory.rs} (100%) create mode 100644 crates/platform/src/network.rs create mode 100644 crates/platform/src/storage.rs create mode 100644 crates/platform/src/sync.rs diff --git a/Cargo.lock b/Cargo.lock index f136631bbe..cde7355a40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7179,7 +7179,7 @@ dependencies = [ "prost-types", "rand 0.9.4", "restate-clock", - "restate-encoding", + "restate-platform", "restate-workspace-hack", "schemars 1.2.1", "serde", @@ -7240,6 +7240,7 @@ dependencies = [ "restate-futures-util", "restate-memory", "restate-metadata-store", + "restate-platform", "restate-test-util", "restate-time-util", "restate-types", @@ -7316,12 +7317,10 @@ version = "1.6.3-dev" dependencies = [ "bilrost", "bytes", - "bytestring", "rand 0.9.4", "restate-encoding-derive", - "restate-sharding", + "restate-platform", "restate-workspace-hack", - "static_assertions", ] [[package]] @@ -7645,6 +7644,7 @@ dependencies = [ "restate-futures-util", "restate-memory", "restate-metadata-store", + "restate-platform", "restate-rocksdb", "restate-serde-util", "restate-time-util", @@ -7682,10 +7682,9 @@ version = "1.6.3-dev" dependencies = [ "bytes", "futures", - "hashbrown 0.16.1", "metrics", - "parking_lot", "pin-project-lite", + "restate-platform", "restate-serde-util", "restate-workspace-hack", "thiserror 2.0.18", @@ -7951,6 +7950,22 @@ dependencies = [ "url", ] +[[package]] +name = "restate-platform" +version = "1.6.3-dev" +dependencies = [ + "bilrost", + "bytes", + "bytestring", + "derive_more", + "downcast-rs", + "hashbrown 0.16.1", + "parking_lot", + "restate-util-string", + "restate-workspace-hack", + "thiserror 2.0.18", +] + [[package]] name = "restate-queue" version = "1.6.3-dev" @@ -8174,6 +8189,7 @@ dependencies = [ "bilrost", "derive_more", "flexbuffers", + "restate-platform", "restate-sharding", "restate-workspace-hack", "serde", @@ -8391,6 +8407,7 @@ dependencies = [ "restate-encoding", "restate-errors", "restate-memory", + "restate-platform", "restate-serde-util", "restate-sharding", "restate-test-util", diff --git a/Cargo.toml b/Cargo.toml index c81f1d0835..0cb4ec11b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ restate-metadata-store = { path = "crates/metadata-store" } restate-node = { path = "crates/node" } restate-object-store-util = { path = "crates/object-store-util" } restate-partition-store = { path = "crates/partition-store" } +restate-platform = { path = "crates/platform" } restate-queue = { path = "crates/queue" } restate-rocksdb = { path = "crates/rocksdb" } restate-serde-util = { path = "crates/serde-util" } diff --git a/crates/clock/Cargo.toml b/crates/clock/Cargo.toml index 38a93f9c64..90e852d84e 100644 --- a/crates/clock/Cargo.toml +++ b/crates/clock/Cargo.toml @@ -14,7 +14,8 @@ test-util = [] [dependencies] restate-workspace-hack = { workspace = true } -restate-encoding = { workspace = true } + +restate-platform = { workspace = true } bilrost = { workspace = true } jiff = { workspace = true, optional = true } diff --git a/crates/clock/src/rough_ts.rs b/crates/clock/src/rough_ts.rs index 881c9f9bea..40f9a9d9d5 100644 --- a/crates/clock/src/rough_ts.rs +++ b/crates/clock/src/rough_ts.rs @@ -14,6 +14,8 @@ use std::num::NonZeroU32; use std::ops::{Add, Sub}; use std::time::Duration; +use restate_platform::network::NetSerde; + use crate::WallClock; use crate::time::MillisSinceEpoch; use crate::unique_timestamp::UniqueTimestamp; @@ -45,7 +47,7 @@ impl fmt::Debug for RoughTimestamp { } } -impl restate_encoding::NetSerde for RoughTimestamp {} +impl NetSerde for RoughTimestamp {} const _: () = { assert!( diff --git a/crates/clock/src/time.rs b/crates/clock/src/time.rs index c133ce754f..0b9458dc29 100644 --- a/crates/clock/src/time.rs +++ b/crates/clock/src/time.rs @@ -17,6 +17,8 @@ use std::time::{Duration, SystemTime}; // Note: BilrostNewType and NetSerde derives are not used since both MillisSinceEpoch // and NanosSinceEpoch have custom implementations for niche optimization. +use restate_platform::network::NetSerde; + use crate::WallClock; /// Milliseconds since the unix epoch. @@ -46,7 +48,7 @@ impl fmt::Debug for MillisSinceEpoch { } } -impl restate_encoding::NetSerde for MillisSinceEpoch {} +impl NetSerde for MillisSinceEpoch {} // Static assertions to ensure that MillisSinceEpoch is the same size as u64 // and that niche optimization works. @@ -376,7 +378,7 @@ impl fmt::Debug for NanosSinceEpoch { } } -impl restate_encoding::NetSerde for NanosSinceEpoch {} +impl NetSerde for NanosSinceEpoch {} // Static assertions to ensure that NanosSinceEpoch is the same size as u64 // and that niche optimization works. diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ec387e1e58..cfdb76c46c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -29,6 +29,7 @@ restate-core-derive = { workspace = true, optional = true } restate-futures-util = { workspace = true } restate-memory = { workspace = true } restate-metadata-store = { workspace = true } +restate-platform = { workspace = true } restate-time-util = { workspace = true } restate-types = { workspace = true } diff --git a/crates/core/src/network/incoming.rs b/crates/core/src/network/incoming.rs index 3915c7eebb..594c3e436f 100644 --- a/crates/core/src/network/incoming.rs +++ b/crates/core/src/network/incoming.rs @@ -13,7 +13,7 @@ use std::marker::PhantomData; use bytes::Bytes; use tokio::sync::{oneshot, watch}; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; pub use restate_memory::MemoryLease; use restate_types::GenerationalNodeId; diff --git a/crates/core/src/network/message_router.rs b/crates/core/src/network/message_router.rs index 68d195bf59..17cfc892fd 100644 --- a/crates/core/src/network/message_router.rs +++ b/crates/core/src/network/message_router.rs @@ -30,7 +30,8 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::StreamExt; use tracing::{debug, instrument, trace, warn}; -use restate_memory::{EstimatedMemorySize, MemoryLease, MemoryPool}; +use restate_memory::{MemoryLease, MemoryPool}; +use restate_platform::memory::EstimatedMemorySize; use restate_types::SharedString; use restate_types::net::{Service, ServiceTag}; diff --git a/crates/encoding/Cargo.toml b/crates/encoding/Cargo.toml index 2000b3a479..fd63c303ad 100644 --- a/crates/encoding/Cargo.toml +++ b/crates/encoding/Cargo.toml @@ -11,13 +11,10 @@ publish = false restate-workspace-hack = { workspace = true } restate-encoding-derive = { version = "0.1.0", path = "derive" } -restate-sharding = { workspace = true } +restate-platform = { workspace = true } bilrost = { workspace = true } bytes = { workspace = true } -bytestring = { workspace = true } [dev-dependencies] rand = { workspace = true } -restate-sharding = { workspace = true, features = ["bilrost"] } -static_assertions = { workspace = true } \ No newline at end of file diff --git a/crates/encoding/derive/src/net.rs b/crates/encoding/derive/src/net.rs index 91bf6ebb94..e2ea742fd1 100644 --- a/crates/encoding/derive/src/net.rs +++ b/crates/encoding/derive/src/net.rs @@ -44,12 +44,12 @@ pub fn net_serde_inner(input: DeriveInput) -> Result { let where_clauses = field_types.iter().map(|ty| { quote! { - #ty: NetSerde + #ty: ::restate_platform::network::NetSerde } }); let expanded = quote! { - impl ::restate_encoding::NetSerde for #name where #(#where_clauses),* {} + impl ::restate_platform::network::NetSerde for #name where #(#where_clauses),* {} }; Ok(TokenStream::from(expanded)) diff --git a/crates/encoding/src/common.rs b/crates/encoding/src/common.rs index e75535f715..bccd2f9e4e 100644 --- a/crates/encoding/src/common.rs +++ b/crates/encoding/src/common.rs @@ -13,8 +13,9 @@ use bilrost::{ encoding::{EmptyState, ForOverwrite, Proxiable}, }; use restate_encoding_derive::BilrostNewType; +use restate_platform::network::NetSerde; -use crate::{NetSerde, bilrost_encodings::RestateEncoding}; +use crate::bilrost_encodings::RestateEncoding; struct U128Tag; diff --git a/crates/encoding/src/lib.rs b/crates/encoding/src/lib.rs index 988db1e63c..ccbc99f083 100644 --- a/crates/encoding/src/lib.rs +++ b/crates/encoding/src/lib.rs @@ -12,97 +12,10 @@ mod bilrost_as; pub mod bilrost_encodings; mod common; -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; -use std::ops::RangeInclusive; -use std::sync::Arc; - pub use bilrost_as::BilrostAsAdaptor; pub use bilrost_encodings::{Arced, ArcedSlice, RestateEncoding}; pub use common::U128; pub use restate_encoding_derive::{BilrostAs, BilrostNewType, NetSerde}; -/// A marker trait for types that can be serialized and sent over the network. -/// -/// Types implementing this trait are considered eligible for wire transmission, -/// typically via serialization. It is intended to be implemented automatically -/// using the `#[derive(NetSerde)]` macro. -/// -/// # Example -/// ```ignore -/// #[derive(NetSerde)] -/// struct MyMessage { -/// a: u64, -/// b: String, -/// } -/// ``` -pub trait NetSerde {} - -macro_rules! impl_net_serde { - ($t:ty) => { - impl NetSerde for $t {} - }; - ($($t:ty),+) => { - $(impl_net_serde!($t);)+ - } -} - -impl_net_serde!( - bool, - usize, - u8, - u16, - u32, - u64, - u128, - isize, - i8, - i16, - i32, - i64, - i128, - String, - bytes::Bytes, - bytestring::ByteString, - std::time::Duration -); - -macro_rules! impl_net_serde_tuple { - ($($t:ident),+) => { - impl<$($t),+> NetSerde for ($($t),+) where $($t: NetSerde),+ {} - }; -} - -impl_net_serde_tuple!(T0, T1); -impl_net_serde_tuple!(T0, T1, T2); -impl_net_serde_tuple!(T0, T1, T2, T3); -impl_net_serde_tuple!(T0, T1, T2, T3, T4); -impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5); -impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5, T6); - -impl NetSerde for Vec where T: NetSerde {} -impl NetSerde for Option where T: NetSerde {} -impl NetSerde for HashMap -where - K: NetSerde, - V: NetSerde, -{ -} - -impl NetSerde for BTreeMap -where - K: NetSerde, - V: NetSerde, -{ -} - -impl NetSerde for HashSet where V: NetSerde {} -impl NetSerde for RangeInclusive where Idx: NetSerde {} -impl NetSerde for restate_sharding::PartitionId {} -impl NetSerde for Arc where T: NetSerde {} -impl NetSerde for Arc<[T]> where T: NetSerde {} -impl NetSerde for Box where T: NetSerde {} -impl NetSerde for [T; N] where T: NetSerde {} #[cfg(test)] mod test { @@ -132,49 +45,4 @@ mod test { assert_eq!(x.id.0, y.id); } - - /// Validates that `KeyRange`'s general bilrost encoding produces the same - /// wire format as `RangeInclusive` with `RestateEncoding`. This ensures - /// that `KeyRange` fields can use `#[bilrost(N)]` and remain wire-compatible - /// with the old `#[bilrost(tag(N), encoding(RestateEncoding))] RangeInclusive`. - #[test] - fn key_range_wire_compat_with_range_inclusive() { - use restate_sharding::KeyRange; - - use super::RestateEncoding; - - #[derive(Debug, PartialEq, bilrost::Message)] - struct WithKeyRange { - #[bilrost(1)] - range: KeyRange, - } - - #[derive(Debug, PartialEq, bilrost::Message)] - struct WithRangeInclusive { - #[bilrost(tag(1), encoding(RestateEncoding))] - range: std::ops::RangeInclusive, - } - - for (start, end) in [(0u64, 0u64), (1, 100), (0, u64::MAX), (42, 42)] { - let kr = KeyRange::new(start, end); - let ri = start..=end; - - let kr_bytes = WithKeyRange { range: kr }.encode_to_vec(); - let ri_bytes = WithRangeInclusive { range: ri.clone() }.encode_to_vec(); - - assert_eq!( - kr_bytes, ri_bytes, - "wire format mismatch for range ({start}, {end})" - ); - - // Cross-decode - let decoded: WithRangeInclusive = - WithRangeInclusive::decode(&*kr_bytes).expect("cross-decode KeyRange→RI"); - assert_eq!(decoded.range, ri); - - let decoded: WithKeyRange = - WithKeyRange::decode(&*ri_bytes).expect("cross-decode RI→KeyRange"); - assert_eq!(decoded.range, kr); - } - } } diff --git a/crates/encoding/tests/net_serde.rs b/crates/encoding/tests/net_serde.rs index b9eca8494a..271672fce0 100644 --- a/crates/encoding/tests/net_serde.rs +++ b/crates/encoding/tests/net_serde.rs @@ -8,10 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::HashMap; - use restate_encoding::NetSerde; -use static_assertions::assert_impl_all; +use restate_platform::hash::HashMap; +use restate_platform::network::NetSerde; #[allow(dead_code)] #[derive(NetSerde)] @@ -30,4 +29,7 @@ struct NotSendable; #[derive(NetSerde)] struct Inner(HashMap); -assert_impl_all!(SomeMessage: NetSerde); +const _: fn() = || { + fn assert_impl_all() {} + assert_impl_all::(); +}; diff --git a/crates/log-server/Cargo.toml b/crates/log-server/Cargo.toml index b962518b74..7cef4107a9 100644 --- a/crates/log-server/Cargo.toml +++ b/crates/log-server/Cargo.toml @@ -24,6 +24,7 @@ restate-core = { workspace = true } restate-futures-util = { workspace = true } restate-memory = { workspace = true } restate-metadata-store = { workspace = true } +restate-platform = { workspace = true } restate-rocksdb = { workspace = true } restate-serde-util = { workspace = true } restate-time-util = { workspace = true } diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index 386c876569..b048041f5b 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -24,7 +24,7 @@ use restate_core::network::{ use restate_core::task_center::TaskGuard; use restate_core::{ShutdownError, TaskCenter, TaskKind, cancellation_token}; use restate_futures_util::waiter_queue::WaiterQueue; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; use restate_serde_util::ByteCount; use restate_types::GenerationalNodeId; use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; diff --git a/crates/memory/Cargo.toml b/crates/memory/Cargo.toml index c5f3f0a0c2..324778869e 100644 --- a/crates/memory/Cargo.toml +++ b/crates/memory/Cargo.toml @@ -13,13 +13,12 @@ default = [] [dependencies] restate-workspace-hack = { workspace = true } +restate-platform = { workspace = true } restate-serde-util = { workspace = true } bytes = { workspace = true } futures = { workspace = true } -hashbrown = { workspace = true } metrics = { workspace = true } -parking_lot = { workspace = true } pin-project-lite = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync"] } diff --git a/crates/memory/src/controller.rs b/crates/memory/src/controller.rs index 529b6e8bdc..52aac96bc3 100644 --- a/crates/memory/src/controller.rs +++ b/crates/memory/src/controller.rs @@ -8,11 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use hashbrown::HashMap; use metrics::{Gauge, gauge}; -use parking_lot::RwLock; use tracing::warn; +use restate_platform::hash::HashMap; +use restate_platform::sync::RwLock; use restate_serde_util::NonZeroByteCount; use crate::MemoryPool; diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs index e7bd0e1222..df19d7ddc2 100644 --- a/crates/memory/src/lib.rs +++ b/crates/memory/src/lib.rs @@ -13,11 +13,8 @@ //! This crate provides: //! - [`MemoryPool`]: A named memory budget for bounding memory usage //! - [`MemoryLease`]: RAII guard for memory leases that can be passed through channels -//! - [`EstimatedMemorySize`]: Trait for types that can estimate their memory -//! footprint mod controller; -mod footprint; pub mod local_pool; mod metric_definitions; mod pinned_memory_stream; @@ -30,5 +27,3 @@ pub use local_pool::{ pub use pinned_memory_stream::{IgnorePinnableMemoryStream, PinnableMapErr, PinnableMemoryStream}; pub use pool::{MemoryLease, MemoryPool, PollMemoryPool}; pub use restate_serde_util::{ByteCount, NonZeroByteCount}; - -pub use footprint::*; diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index 58f6d4c09e..d4189d3860 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -11,7 +11,6 @@ use std::ops::ControlFlow; use futures::Stream; -use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy; use restate_rocksdb::{Priority, RocksDbPerfGuard}; use restate_storage_api::invocation_status_table::{ @@ -20,8 +19,10 @@ use restate_storage_api::invocation_status_table::{ WriteInvocationStatusTable, }; use restate_storage_api::protobuf_types::PartitionStoreProtobufValue; +use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; +use restate_util_string::format_restring; use crate::TableScan::FullScanPartitionKeyRange; use crate::keys::{DecodeTableKey, KeyKind, define_table_key}; @@ -168,7 +169,7 @@ impl ScanInvocationStatusTable for PartitionStore { break_on_err(InvocationStatusKey::deserialize_from(&mut key))?; if value.len() < std::mem::size_of::() { - return ControlFlow::Break(Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format!( + return ControlFlow::Break(Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format_restring!( "remaining bytes in buf '{}' < version bytes '{}'", value.len(), std::mem::size_of::() @@ -228,7 +229,7 @@ impl ScanInvocationStatusTable for PartitionStore { let status_key = InvocationStatusKey::deserialize_from(&mut key)?; if value.len() < std::mem::size_of::() { - return Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format!( + return Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format_restring!( "remaining bytes in buf '{}' < version bytes '{}'", value.len(), std::mem::size_of::() diff --git a/crates/platform/Cargo.toml b/crates/platform/Cargo.toml new file mode 100644 index 0000000000..e77d573ac4 --- /dev/null +++ b/crates/platform/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "restate-platform" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[dependencies] +restate-workspace-hack = { workspace = true } + +restate-util-string = { workspace = true } + +bilrost = { workspace = true, optional = true } +bytes = { workspace = true } +bytestring = { workspace = true, optional = true } +derive_more = { workspace = true, features = ["deref", "from", "display"] } +downcast-rs = { workspace = true } +hashbrown = { workspace = true } +parking_lot = { workspace = true } +thiserror = { workspace = true } diff --git a/crates/platform/README.md b/crates/platform/README.md new file mode 100644 index 0000000000..47e036cb97 --- /dev/null +++ b/crates/platform/README.md @@ -0,0 +1,26 @@ +# restate-platform + +Foundation crate for the Restate project. Provides key type aliases, traits, and +essential types that the rest of the codebase can safely and reliably depend upon. + +This crate sits at the bottom of the dependency graph and has minimal +dependencies of its own, making it suitable for use by any crate in the +workspace without pulling in heavy transitive dependencies. + +## What belongs here + +- Common traits (error handling, networking, memory estimation) +- Lightweight type aliases and marker types +- Re-exports that unify scattered definitions under a single import path + +## Prelude + +`restate_platform::prelude::*` provides a curated set of the most commonly used +types and traits, intended as a single convenient import for crates across the +project. + +## What does NOT belong here + +- Concrete implementations or business logic +- Types with heavy dependencies (serialization frameworks, async runtimes, storage engines) +- Anything that would cause circular dependencies diff --git a/crates/platform/src/errors.rs b/crates/platform/src/errors.rs new file mode 100644 index 0000000000..e8c7d683c1 --- /dev/null +++ b/crates/platform/src/errors.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +/// Error type which abstracts away the actual [`std::error::Error`] type. Use this type +/// if you don't know the actual error type or if it is not important. +pub type GenericError = Box; + +pub type BoxedMaybeRetryableError = Box; + +/// Tells whether an error should be retried by upper layers or not. +pub trait MaybeRetryableError: std::error::Error + 'static { + /// Signal upper layers whether this error should be retried or not. + fn retryable(&self) -> bool { + false + } +} + +pub trait IntoMaybeRetryable: Sized { + /// Marks the error marked as retryable + fn into_retryable(self) -> RetryableError { + RetryableError(self) + } + + /// Marks the error marked as non-retryable + fn into_terminal(self) -> TerminalError { + TerminalError(self) + } +} + +impl IntoMaybeRetryable for T where + T: std::fmt::Debug + std::fmt::Display + Send + Sync + std::error::Error + 'static +{ +} + +/// Wraps any source error and marks it as retryable +#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] +pub struct RetryableError(#[source] T); + +/// Wraps any source error and marks it as non-retryable +#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] +pub struct TerminalError(#[source] T); + +impl MaybeRetryableError for RetryableError +where + T: std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + true + } +} + +impl MaybeRetryableError for TerminalError +where + T: std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + false + } +} + +impl std::fmt::Display for RetryableError +where + T: std::fmt::Debug + std::fmt::Display + std::error::Error, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[retryable] {}", self.0) + } +} + +impl std::fmt::Display for TerminalError +where + T: std::fmt::Debug + std::fmt::Display + std::error::Error + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[terminal] {}", self.0) + } +} diff --git a/crates/platform/src/hash.rs b/crates/platform/src/hash.rs new file mode 100644 index 0000000000..b1290f8db2 --- /dev/null +++ b/crates/platform/src/hash.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub type HashMap = hashbrown::HashMap; +pub type HashSet = hashbrown::HashSet; diff --git a/crates/platform/src/lib.rs b/crates/platform/src/lib.rs new file mode 100644 index 0000000000..6d935bc9ab --- /dev/null +++ b/crates/platform/src/lib.rs @@ -0,0 +1,26 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod errors; +pub mod hash; +pub mod memory; +pub mod network; +pub mod storage; +pub mod sync; + +// prelude exports; +pub mod prelude { + pub use crate::errors::*; + pub use crate::hash::{HashMap, HashSet}; + pub use crate::memory::EstimatedMemorySize; + pub use crate::network::NetSerde; + pub use crate::sync::{Mutex, RwLock}; + pub use restate_util_string::{ReString, ToReString, format_restring}; +} diff --git a/crates/memory/src/footprint.rs b/crates/platform/src/memory.rs similarity index 100% rename from crates/memory/src/footprint.rs rename to crates/platform/src/memory.rs diff --git a/crates/platform/src/network.rs b/crates/platform/src/network.rs new file mode 100644 index 0000000000..baae7b89df --- /dev/null +++ b/crates/platform/src/network.rs @@ -0,0 +1,98 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use crate::hash::HashMap; + +/// A marker trait for types that can be serialized and sent over the network. +/// +/// Types implementing this trait are considered eligible for wire transmission, +/// typically via serialization. It is intended to be implemented automatically +/// using the `#[derive(NetSerde)]` macro. +/// +/// # Example +/// ```ignore +/// #[derive(NetSerde)] +/// struct MyMessage { +/// a: u64, +/// b: String, +/// } +/// ``` +pub trait NetSerde {} + +macro_rules! impl_net_serde { + ($t:ty) => { + impl NetSerde for $t {} + }; + ($($t:ty),+) => { + $(impl_net_serde!($t);)+ + } +} + +impl_net_serde!( + bool, + usize, + u8, + u16, + u32, + u64, + u128, + isize, + i8, + i16, + i32, + i64, + i128, + String, + bytes::Bytes, + std::time::Duration +); + +#[cfg(feature = "bytestring")] +impl_net_serde!(bytestring::ByteString); + +macro_rules! impl_net_serde_tuple { + ($($t:ident),+) => { + impl<$($t),+> NetSerde for ($($t),+) where $($t: NetSerde),+ {} + }; +} + +impl_net_serde_tuple!(T0, T1); +impl_net_serde_tuple!(T0, T1, T2); +impl_net_serde_tuple!(T0, T1, T2, T3); +impl_net_serde_tuple!(T0, T1, T2, T3, T4); +impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5); +impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5, T6); + +impl NetSerde for Vec where T: NetSerde {} +impl NetSerde for Option where T: NetSerde {} +impl NetSerde for HashMap +where + K: NetSerde, + V: NetSerde, +{ +} + +impl NetSerde for BTreeMap +where + K: NetSerde, + V: NetSerde, +{ +} + +impl NetSerde for crate::hash::HashSet where V: NetSerde {} +impl NetSerde for core::range::RangeInclusive where Idx: NetSerde {} +impl NetSerde for core::ops::RangeInclusive where Idx: NetSerde {} +impl NetSerde for Arc where T: NetSerde {} +impl NetSerde for Arc<[T]> where T: NetSerde {} +impl NetSerde for Box where T: NetSerde {} +impl NetSerde for [T; N] where T: NetSerde {} diff --git a/crates/platform/src/storage.rs b/crates/platform/src/storage.rs new file mode 100644 index 0000000000..d269948759 --- /dev/null +++ b/crates/platform/src/storage.rs @@ -0,0 +1,209 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bytes::{Buf, BufMut, BytesMut}; +use downcast_rs::{DowncastSync, impl_downcast}; + +use restate_util_string::{ReString, format_restring}; + +use crate::errors::GenericError; + +#[derive(Debug, thiserror::Error)] +pub enum StorageEncodeError { + #[error("encoding failed: {0}")] + EncodeValue(GenericError), + #[error("only support serializing types of size <= 4GB, got: {0} bytes")] + SizeOverflow(usize), +} + +#[derive(Debug, thiserror::Error)] +pub enum StorageDecodeError { + #[error("failed reading codec: {0}")] + ReadingCodec(ReString), + #[error("decoding failed: {0}")] + DecodeValue(GenericError), + #[error("unsupported codec kind: {0}")] + UnsupportedCodecKind(StorageCodecKind), +} + +/// Trait to encode a value using the specified [`Self::default_codec`]. The trait is used by the +/// [`StorageCodec`] to first write the codec byte and then the serialized value via +/// [`Self::encode`]. +/// +/// # Important +/// The [`Self::encode`] implementation should use the codec specified by [`Self::default_codec`]. +pub trait StorageEncode: DowncastSync { + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError>; + + /// Codec which is used when encode new values. + fn default_codec(&self) -> StorageCodecKind; +} + +impl_downcast!(sync StorageEncode); + +/// Trait to decode a value given the [`StorageCodecKind`]. This trait is used by the +/// [`StorageCodec`] to decode a value after reading the used storage codec. +/// +/// # Important +/// To support codec evolution, this trait implementation needs to be able to decode values encoded +/// with any previously used codec. +pub trait StorageDecode { + fn decode(buf: &mut B, kind: StorageCodecKind) -> Result + where + Self: Sized; +} + +#[derive(Debug, Copy, Clone, derive_more::Display, PartialEq, Eq)] +#[cfg_attr(feature = "bilrost", derive(bilrost::Enumeration))] +#[repr(u8)] +pub enum StorageCodecKind { + /// plain old protobuf + Protobuf = 1, + /// flexbuffers + serde (length-prefixed) + FlexbuffersSerde = 2, + /// length-prefixed raw-bytes. length is u32 + LengthPrefixedRawBytes = 3, + // Note: Discriminant 4 was previously `BincodeSerde`, which was added to prepare for future + // use but was never actually used for persistent storage. It has been removed because bincode + // is no longer maintained. The discriminant is intentionally skipped to prevent accidental + // reuse. + /// Json (no length prefix) + Json = 5, + /// Bilrost (no length-prefixed) + Bilrost = 6, + /// A custom encoding that does not rely on any of the standard encoding formats + /// supported by the [`encode`] and [`decode`] modules. + /// + /// When using this variant, the encoding and decoding logic is entirely defined + /// by the implementation of the [`StorageEncode`] and [`StorageDecode`] traits. + /// + /// While you may still use utility functions from the [`encode`] and [`decode`] modules, + /// it is up to your implementation to decide how (or if) to use them, and how the final + /// byte representation is constructed. + Custom = 7, +} + +impl From for u8 { + #[inline] + fn from(value: StorageCodecKind) -> Self { + value as u8 + } +} + +impl TryFrom for StorageCodecKind { + type Error = StorageDecodeError; + + #[inline] + fn try_from(value: u8) -> Result { + match value { + 1 => Ok(Self::Protobuf), + 2 => Ok(Self::FlexbuffersSerde), + 3 => Ok(Self::LengthPrefixedRawBytes), + 5 => Ok(Self::Json), + 6 => Ok(Self::Bilrost), + 7 => Ok(Self::Custom), + value => Err(StorageDecodeError::ReadingCodec(format_restring!( + "unknown discriminant '{value}'" + ))), + } + } +} + +/// Enable simple serialization of String types as length-prefixed byte slice +impl StorageEncode for String { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::LengthPrefixedRawBytes + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + let my_bytes = self.as_bytes(); + buf.put_u32_le( + u32::try_from(my_bytes.len()) + .map_err(|_| StorageEncodeError::SizeOverflow(my_bytes.len()))?, + ); + if buf.remaining_mut() < my_bytes.len() { + return Err(StorageEncodeError::EncodeValue( + format!( + "not enough buffer space to serialize value;\ + required {} bytes but free capacity was {}", + my_bytes.len(), + buf.remaining_mut() + ) + .into(), + )); + } + buf.put_slice(my_bytes); + Ok(()) + } +} +impl StorageDecode for String { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + match kind { + StorageCodecKind::LengthPrefixedRawBytes => { + if buf.remaining() < std::mem::size_of::() { + return Err(StorageDecodeError::DecodeValue( + format!( + "insufficient data: expecting {} bytes for length", + std::mem::size_of::() + ) + .into(), + )); + } + let length = usize::try_from(buf.get_u32_le()).expect("u32 to fit into usize"); + + if buf.remaining() < length { + return Err(StorageDecodeError::DecodeValue( + format!( + "insufficient data: expecting {} bytes for flexbuffers", + length + ) + .into(), + )); + } + + let bytes = buf.take(length); + Ok(String::from_utf8_lossy(bytes.chunk()).to_string()) + } + codec => Err(StorageDecodeError::UnsupportedCodecKind(codec)), + } + } +} + +// Enable simple serialization of Bytes types as length-prefixed byte slice +impl StorageEncode for bytes::Bytes { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::LengthPrefixedRawBytes + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + buf.put_u32_le( + u32::try_from(self.len()).map_err(|_| StorageEncodeError::SizeOverflow(self.len()))?, + ); + if buf.remaining_mut() < self.len() { + return Err(StorageEncodeError::EncodeValue( + format!( + "not enough buffer space to serialize value;\ + required {} bytes but free capacity was {}", + self.len(), + buf.remaining_mut() + ) + .into(), + )); + } + buf.put_slice(&self[..]); + Ok(()) + } +} diff --git a/crates/platform/src/sync.rs b/crates/platform/src/sync.rs new file mode 100644 index 0000000000..e7a0d6401c --- /dev/null +++ b/crates/platform/src/sync.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub type RwLock = parking_lot::RwLock; +pub type Mutex = parking_lot::Mutex; diff --git a/crates/sharding/Cargo.toml b/crates/sharding/Cargo.toml index 37834bc639..07513b472f 100644 --- a/crates/sharding/Cargo.toml +++ b/crates/sharding/Cargo.toml @@ -15,6 +15,8 @@ bilrost = ["dep:bilrost"] [dependencies] restate-workspace-hack = { workspace = true } +restate-platform = { workspace = true } + bilrost = { workspace = true, optional = true } derive_more = { workspace = true, features = ["deref", "from", "into", "add", "display", "debug", "from_str"] } serde = { workspace = true, optional = true } diff --git a/crates/sharding/src/key_range.rs b/crates/sharding/src/key_range.rs index 336654b513..2ebfacd827 100644 --- a/crates/sharding/src/key_range.rs +++ b/crates/sharding/src/key_range.rs @@ -12,6 +12,8 @@ use std::fmt; use std::ops::{Bound, RangeBounds}; use std::range::RangeInclusiveIter; +use restate_platform::network::NetSerde; + /// An inclusive range of partition keys `[start, end]`. /// /// This is a compact, `Copy` representation of an inclusive key range backed by @@ -22,6 +24,8 @@ use std::range::RangeInclusiveIter; #[repr(transparent)] pub struct KeyRange(std::range::RangeInclusive); +impl NetSerde for KeyRange {} + impl KeyRange { /// The full key space `[0, u64::MAX]`. pub const FULL: Self = Self::new(u64::MIN, u64::MAX); diff --git a/crates/sharding/src/partition_id.rs b/crates/sharding/src/partition_id.rs index 3687e2cf19..df4b7c50b8 100644 --- a/crates/sharding/src/partition_id.rs +++ b/crates/sharding/src/partition_id.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use restate_platform::network::NetSerde; + /// Identifying the partition #[derive( Copy, @@ -43,6 +45,8 @@ impl From for u64 { } } +impl NetSerde for PartitionId {} + impl PartitionId { /// It's your responsibility to ensure the value is within the valid range. pub const fn new_unchecked(v: u16) -> Self { diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 3496bfe0ae..3f37cadb78 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -27,10 +27,11 @@ restate-clock = { workspace = true, features = ["prost-types", "jiff", "hlc"] } restate-encoding = { workspace = true } restate-errors = { workspace = true } restate-memory = { workspace = true } +restate-platform = { workspace = true } restate-serde-util = { workspace = true } +restate-sharding = { workspace = true, features = ["serde", "bilrost"] } restate-test-util = { workspace = true, optional = true } restate-time-util = { workspace = true, features = ["serde", "serde_with"] } -restate-sharding = { workspace = true, features = ["serde", "bilrost"] } restate-util-string = { workspace = true, features = ["serde", "bilrost"] } restate-utoipa = { workspace = true } diff --git a/crates/types/src/errors.rs b/crates/types/src/errors.rs index b20e364561..ddb3d034f2 100644 --- a/crates/types/src/errors.rs +++ b/crates/types/src/errors.rs @@ -14,84 +14,10 @@ use std::fmt; use tonic; use tracing::Level; -use restate_encoding::BilrostNewType; - -/// Error type which abstracts away the actual [`std::error::Error`] type. Use this type -/// if you don't know the actual error type or if it is not important. -pub type GenericError = Box; - -pub type BoxedMaybeRetryableError = Box; - -/// Tells whether an error should be retried by upper layers or not. -pub trait MaybeRetryableError: std::error::Error + 'static { - /// Signal upper layers whether this error should be retried or not. - fn retryable(&self) -> bool { - false - } -} - -static_assertions::assert_obj_safe!(MaybeRetryableError); - -pub trait IntoMaybeRetryable: Sized { - /// Marks the error marked as retryable - fn into_retryable(self) -> RetryableError { - RetryableError(self) - } - - /// Marks the error marked as non-retryable - fn into_terminal(self) -> TerminalError { - TerminalError(self) - } -} - -impl IntoMaybeRetryable for T where - T: fmt::Debug + fmt::Display + Send + Sync + std::error::Error + 'static -{ -} - -/// Wraps any source error and marks it as retryable -#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] -pub struct RetryableError(#[source] T); +// Re-exports the error traits from restate platform to avoid mass-migration of existing code. +pub use restate_platform::errors::*; -/// Wraps any source error and marks it as non-retryable -#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] -pub struct TerminalError(#[source] T); - -impl MaybeRetryableError for RetryableError -where - T: std::error::Error + 'static, -{ - fn retryable(&self) -> bool { - true - } -} - -impl MaybeRetryableError for TerminalError -where - T: std::error::Error + 'static, -{ - fn retryable(&self) -> bool { - false - } -} - -impl fmt::Display for RetryableError -where - T: fmt::Debug + fmt::Display + std::error::Error, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[retryable] {}", self.0) - } -} - -impl fmt::Display for TerminalError -where - T: fmt::Debug + fmt::Display + std::error::Error + 'static, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[terminal] {}", self.0) - } -} +use restate_encoding::BilrostNewType; #[derive(Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, BilrostNewType)] #[serde(transparent)] diff --git a/crates/types/src/logs/record.rs b/crates/types/src/logs/record.rs index 9a8d5267ab..067e883920 100644 --- a/crates/types/src/logs/record.rs +++ b/crates/types/src/logs/record.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use bytes::BytesMut; use restate_encoding::NetSerde; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; use crate::storage::{PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode}; use crate::time::NanosSinceEpoch; diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs index ea48741611..d732151f8f 100644 --- a/crates/types/src/net/log_server.rs +++ b/crates/types/src/net/log_server.rs @@ -15,7 +15,7 @@ use bitflags::bitflags; use prost_dto::{FromProst, IntoProst}; use restate_encoding::{ArcedSlice, BilrostNewType, NetSerde}; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; use super::{RpcResponse, ServiceTag}; use crate::GenerationalNodeId; diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index 8e439543fa..6c4fe16f85 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -16,80 +16,22 @@ use std::sync::Arc; use bytes::{Buf, BufMut, Bytes, BytesMut}; use chrono::Utc; -use downcast_rs::{DowncastSync, impl_downcast}; -use restate_encoding::{BilrostAs, NetSerde}; -use restate_memory::EstimatedMemorySize; +use restate_platform::network::NetSerde; +// Re-export the traits from restate-platform +pub use restate_platform::storage::{ + StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, +}; + +use restate_encoding::BilrostAs; +use restate_platform::memory::EstimatedMemorySize; use restate_serde_util::ByteCount; +use restate_util_string::format_restring; -use crate::errors::GenericError; use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry}; use crate::journal_v2::{Decoder, EntryMetadata, EntryType}; use crate::time::MillisSinceEpoch; -#[derive(Debug, thiserror::Error)] -pub enum StorageEncodeError { - #[error("encoding failed: {0}")] - EncodeValue(GenericError), -} - -#[derive(Debug, thiserror::Error)] -pub enum StorageDecodeError { - #[error("failed reading codec: {0}")] - ReadingCodec(String), - #[error("decoding failed: {0}")] - DecodeValue(GenericError), - #[error("unsupported codec kind: {0}")] - UnsupportedCodecKind(StorageCodecKind), -} - -#[derive( - Debug, Copy, Clone, strum::FromRepr, derive_more::Display, PartialEq, Eq, bilrost::Enumeration, -)] -#[repr(u8)] -pub enum StorageCodecKind { - /// plain old protobuf - Protobuf = 1, - /// flexbuffers + serde (length-prefixed) - FlexbuffersSerde = 2, - /// length-prefixed raw-bytes. length is u32 - LengthPrefixedRawBytes = 3, - // Note: Discriminant 4 was previously `BincodeSerde`, which was added to prepare for future - // use but was never actually used for persistent storage. It has been removed because bincode - // is no longer maintained. The discriminant is intentionally skipped to prevent accidental - // reuse. - /// Json (no length prefix) - Json = 5, - /// Bilrost (no length-prefixed) - Bilrost = 6, - /// A custom encoding that does not rely on any of the standard encoding formats - /// supported by the [`encode`] and [`decode`] modules. - /// - /// When using this variant, the encoding and decoding logic is entirely defined - /// by the implementation of the [`StorageEncode`] and [`StorageDecode`] traits. - /// - /// While you may still use utility functions from the [`encode`] and [`decode`] modules, - /// it is up to your implementation to decide how (or if) to use them, and how the final - /// byte representation is constructed. - Custom = 7, -} - -impl From for u8 { - fn from(value: StorageCodecKind) -> Self { - value as u8 - } -} - -impl TryFrom for StorageCodecKind { - type Error = StorageDecodeError; - - fn try_from(value: u8) -> Result { - StorageCodecKind::from_repr(value).ok_or_else(|| { - StorageDecodeError::ReadingCodec(format!("unknown discriminant '{value}'")) - }) - } -} - /// Codec which encodes [`StorageEncode`] implementations by first writing the /// [`StorageEncode::default_codec`] byte and then encoding the value part via /// [`StorageEncode::encode`]. @@ -119,7 +61,7 @@ impl StorageCodec { pub fn decode(buf: &mut B) -> Result { if buf.remaining() < mem::size_of::() { - return Err(StorageDecodeError::ReadingCodec(format!( + return Err(StorageDecodeError::ReadingCodec(format_restring!( "remaining bytes in buf '{}' < version bytes '{}'", buf.remaining(), mem::size_of::() @@ -134,34 +76,6 @@ impl StorageCodec { } } -/// Trait to encode a value using the specified [`Self::default_codec`]. The trait is used by the -/// [`StorageCodec`] to first write the codec byte and then the serialized value via -/// [`Self::encode`]. -/// -/// # Important -/// The [`Self::encode`] implementation should use the codec specified by [`Self::default_codec`]. -pub trait StorageEncode: DowncastSync { - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError>; - - /// Codec which is used when encode new values. - fn default_codec(&self) -> StorageCodecKind; -} -impl_downcast!(sync StorageEncode); - -static_assertions::assert_obj_safe!(StorageEncode); - -/// Trait to decode a value given the [`StorageCodecKind`]. This trait is used by the -/// [`StorageCodec`] to decode a value after reading the used storage codec. -/// -/// # Important -/// To support codec evolution, this trait implementation needs to be able to decode values encoded -/// with any previously used codec. -pub trait StorageDecode { - fn decode(buf: &mut B, kind: StorageCodecKind) -> Result - where - Self: Sized; -} - /// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing /// type using [`flexbuffers`] and [`serde`]. #[macro_export] @@ -280,101 +194,6 @@ impl StorageEncode for PolyBytes { static_assertions::assert_impl_all!(PolyBytes: Send, Sync); -/// Enable simple serialization of String types as length-prefixed byte slice -impl StorageEncode for String { - fn default_codec(&self) -> StorageCodecKind { - StorageCodecKind::LengthPrefixedRawBytes - } - - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { - let my_bytes = self.as_bytes(); - buf.put_u32_le(u32::try_from(my_bytes.len()).map_err(|_| { - StorageEncodeError::EncodeValue( - anyhow::anyhow!("only support serializing types of size <= 4GB").into(), - ) - })?); - if buf.remaining_mut() < my_bytes.len() { - return Err(StorageEncodeError::EncodeValue( - anyhow::anyhow!(format!( - "not enough buffer space to serialize value;\ - required {} bytes but free capacity was {}", - my_bytes.len(), - buf.remaining_mut() - )) - .into(), - )); - } - buf.put_slice(my_bytes); - Ok(()) - } -} -impl StorageDecode for String { - fn decode( - buf: &mut B, - kind: StorageCodecKind, - ) -> Result - where - Self: Sized, - { - match kind { - StorageCodecKind::LengthPrefixedRawBytes => { - if buf.remaining() < mem::size_of::() { - return Err(StorageDecodeError::DecodeValue( - anyhow::anyhow!( - "insufficient data: expecting {} bytes for length", - mem::size_of::() - ) - .into(), - )); - } - let length = usize::try_from(buf.get_u32_le()).expect("u32 to fit into usize"); - - if buf.remaining() < length { - return Err(StorageDecodeError::DecodeValue( - anyhow::anyhow!( - "insufficient data: expecting {} bytes for flexbuffers", - length - ) - .into(), - )); - } - - let bytes = buf.take(length); - Ok(String::from_utf8_lossy(bytes.chunk()).to_string()) - } - codec => Err(StorageDecodeError::UnsupportedCodecKind(codec)), - } - } -} - -// Enable simple serialization of Bytes types as length-prefixed byte slice -impl StorageEncode for bytes::Bytes { - fn default_codec(&self) -> StorageCodecKind { - StorageCodecKind::LengthPrefixedRawBytes - } - - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { - buf.put_u32_le(u32::try_from(self.len()).map_err(|_| { - StorageEncodeError::EncodeValue( - anyhow::anyhow!("only support serializing types of size <= 4GB").into(), - ) - })?); - if buf.remaining_mut() < self.len() { - return Err(StorageEncodeError::EncodeValue( - anyhow::anyhow!(format!( - "not enough buffer space to serialize value;\ - required {} bytes but free capacity was {}", - self.len(), - buf.remaining_mut() - )) - .into(), - )); - } - buf.put_slice(&self[..]); - Ok(()) - } -} - /// A marker stored in the storage /// /// The marker is used to sanity-check if the storage is correctly initialized and whether From 36fec4d3b21b7bf61a4ba546392bd0c6cbe4a1a6 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Wed, 22 Apr 2026 14:21:41 +0100 Subject: [PATCH 2/2] [WIP] Clippy to disallow std types --- AGENTS.md | 1 + Cargo.toml | 4 ++++ clippy.toml | 9 +++++++++ crates/platform/src/lib.rs | 3 +++ 4 files changed, 17 insertions(+) diff --git a/AGENTS.md b/AGENTS.md index d8f52fb2de..ea2db28f79 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -53,6 +53,7 @@ If these rules conflict with normal behavior, always follow the rules above. 1. New or deprecated config options must have `/// Since vX.Y.Z` in their doc comment. 1. Use `ByteCount::from(value)` (from `restate_memory`) when displaying byte sizes in errors/logs. 1. Use `KeyRange` (from `restate_sharding`, re-exported via `restate_types::sharding::KeyRange`) instead of `std::ops::RangeInclusive` for partition key ranges. `KeyRange` is `Copy`, 16 bytes (vs 24), and has wire-compatible serde/bilrost encoding. +1. Use types from `restate_platform` instead of their std equivalents: `restate_platform::hash::{HashMap, HashSet}` instead of `std::collections::{HashMap, HashSet}`, and `restate_platform::sync::{Mutex, RwLock}` instead of `std::sync::{Mutex, RwLock}`. The `restate_platform::prelude::*` re-exports the most common types and traits. See `clippy.toml` for the enforced `disallowed_types` list. # Validation Before Committing Changes diff --git a/Cargo.toml b/Cargo.toml index 0cb4ec11b9..5240b40136 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -333,6 +333,10 @@ opt-level = 2 [profile.dist] inherits = "release" +[workspace.lints.clippy] +# Prefer restate-platform's re-exports over direct std usage. See clippy.toml for the list. +disallowed_types = "warn" + [workspace.lints.rust] # Temporarily allow unused assignments until https://github.com/rust-lang/rust/issues/147648 has been fixed. # The problem is that error fields used by thiserror::error attribute generate false positives for unused_assignments. diff --git a/clippy.toml b/clippy.toml index 1420ac21db..87638891ce 100644 --- a/clippy.toml +++ b/clippy.toml @@ -2,3 +2,12 @@ ignore-interior-mutability = ["bytes::Bytes", "bytestring::ByteString", "http::header::HeaderName", "http::header::HeaderValue"] # Sometimes we want to keep the mod.rs file clean allow-private-module-inception = true + +# Prefer restate-platform's re-exports over direct std usage. +# Activated by the clippy::disallowed_types lint in [workspace.lints.clippy]. +disallowed-types = [ + { path = "std::sync::Mutex", reason = "use `restate_platform::sync::Mutex` instead" }, + { path = "std::sync::RwLock", reason = "use `restate_platform::sync::RwLock` instead" }, + { path = "std::collections::HashMap", reason = "use `restate_platform::hash::HashMap` instead" }, + { path = "std::collections::HashSet", reason = "use `restate_platform::hash::HashSet` instead" }, +] diff --git a/crates/platform/src/lib.rs b/crates/platform/src/lib.rs index 6d935bc9ab..a59d295a89 100644 --- a/crates/platform/src/lib.rs +++ b/crates/platform/src/lib.rs @@ -8,6 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +// This crate re-exports the disallowed types — allow them here. +#![allow(clippy::disallowed_types)] + pub mod errors; pub mod hash; pub mod memory;