diff --git a/Cargo.lock b/Cargo.lock index ee48207986..c315d96a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7179,7 +7179,7 @@ dependencies = [ "prost-types", "rand 0.9.4", "restate-clock", - "restate-encoding", + "restate-platform", "restate-workspace-hack", "schemars 1.2.1", "serde", @@ -7240,6 +7240,7 @@ dependencies = [ "restate-futures-util", "restate-memory", "restate-metadata-store", + "restate-platform", "restate-test-util", "restate-time-util", "restate-types", @@ -7316,12 +7317,10 @@ version = "1.6.3-dev" dependencies = [ "bilrost", "bytes", - "bytestring", "rand 0.9.4", "restate-encoding-derive", - "restate-sharding", + "restate-platform", "restate-workspace-hack", - "static_assertions", ] [[package]] @@ -7645,6 +7644,7 @@ dependencies = [ "restate-futures-util", "restate-memory", "restate-metadata-store", + "restate-platform", "restate-rocksdb", "restate-serde-util", "restate-time-util", @@ -7682,10 +7682,9 @@ version = "1.6.3-dev" dependencies = [ "bytes", "futures", - "hashbrown 0.16.1", "metrics", - "parking_lot", "pin-project-lite", + "restate-platform", "restate-serde-util", "restate-workspace-hack", "thiserror 2.0.18", @@ -7862,6 +7861,7 @@ dependencies = [ "restate-types", "restate-wal-protocol", "restate-worker", + "restate-worker-api", "restate-workspace-hack", "rust-rocksdb", "serde", @@ -7951,6 +7951,22 @@ dependencies = [ "url", ] +[[package]] +name = "restate-platform" +version = "1.6.3-dev" +dependencies = [ + "bilrost", + "bytes", + "bytestring", + "derive_more", + "downcast-rs", + "hashbrown 0.16.1", + "parking_lot", + "restate-util-string", + "restate-workspace-hack", + "thiserror 2.0.18", +] + [[package]] name = "restate-queue" version = "1.6.3-dev" @@ -8174,6 +8190,7 @@ dependencies = [ "bilrost", "derive_more", "flexbuffers", + "restate-platform", "restate-sharding", "restate-workspace-hack", "serde", @@ -8391,6 +8408,7 @@ dependencies = [ "restate-encoding", "restate-errors", "restate-memory", + "restate-platform", "restate-serde-util", "restate-sharding", "restate-test-util", @@ -8562,6 +8580,7 @@ dependencies = [ "restate-util-string", "restate-vqueues", "restate-wal-protocol", + "restate-worker-api", "restate-workspace-hack", "rstest", "serde", @@ -8577,6 +8596,18 @@ dependencies = [ "ulid", ] +[[package]] +name = "restate-worker-api" +version = "1.6.3-dev" +dependencies = [ + "restate-core", + "restate-types", + "restate-workspace-hack", + "thiserror 2.0.18", + "tokio", + "tracing", +] + [[package]] name = "restate-workspace-hack" version = "0.0.1" diff --git a/Cargo.toml b/Cargo.toml index 850c14656c..a7bc2b7d4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ restate-metadata-store = { path = "crates/metadata-store" } restate-node = { path = "crates/node" } restate-object-store-util = { path = "crates/object-store-util" } restate-partition-store = { path = "crates/partition-store" } +restate-platform = { path = "crates/platform" } restate-queue = { path = "crates/queue" } restate-rocksdb = { path = "crates/rocksdb" } restate-serde-util = { path = "crates/serde-util" } @@ -95,6 +96,7 @@ restate-utoipa = { path = "crates/utoipa" } restate-vqueues = { path = "crates/vqueues" } restate-wal-protocol = { path = "crates/wal-protocol" } restate-worker = { path = "crates/worker" } +restate-worker-api = { path = "crates/worker-api" } restate-ingestion-client = { path = "crates/ingestion-client" } restate-limiter = { path = "crates/limiter" } diff --git a/crates/clock/Cargo.toml b/crates/clock/Cargo.toml index 38a93f9c64..90e852d84e 100644 --- a/crates/clock/Cargo.toml +++ b/crates/clock/Cargo.toml @@ -14,7 +14,8 @@ test-util = [] [dependencies] restate-workspace-hack = { workspace = true } -restate-encoding = { workspace = true } + +restate-platform = { workspace = true } bilrost = { workspace = true } jiff = { workspace = true, optional = true } diff --git a/crates/clock/src/rough_ts.rs b/crates/clock/src/rough_ts.rs index 881c9f9bea..40f9a9d9d5 100644 --- a/crates/clock/src/rough_ts.rs +++ b/crates/clock/src/rough_ts.rs @@ -14,6 +14,8 @@ use std::num::NonZeroU32; use std::ops::{Add, Sub}; use std::time::Duration; +use restate_platform::network::NetSerde; + use crate::WallClock; use crate::time::MillisSinceEpoch; use crate::unique_timestamp::UniqueTimestamp; @@ -45,7 +47,7 @@ impl fmt::Debug for RoughTimestamp { } } -impl restate_encoding::NetSerde for RoughTimestamp {} +impl NetSerde for RoughTimestamp {} const _: () = { assert!( diff --git a/crates/clock/src/time.rs b/crates/clock/src/time.rs index c133ce754f..0b9458dc29 100644 --- a/crates/clock/src/time.rs +++ b/crates/clock/src/time.rs @@ -17,6 +17,8 @@ use std::time::{Duration, SystemTime}; // Note: BilrostNewType and NetSerde derives are not used since both MillisSinceEpoch // and NanosSinceEpoch have custom implementations for niche optimization. +use restate_platform::network::NetSerde; + use crate::WallClock; /// Milliseconds since the unix epoch. @@ -46,7 +48,7 @@ impl fmt::Debug for MillisSinceEpoch { } } -impl restate_encoding::NetSerde for MillisSinceEpoch {} +impl NetSerde for MillisSinceEpoch {} // Static assertions to ensure that MillisSinceEpoch is the same size as u64 // and that niche optimization works. @@ -376,7 +378,7 @@ impl fmt::Debug for NanosSinceEpoch { } } -impl restate_encoding::NetSerde for NanosSinceEpoch {} +impl NetSerde for NanosSinceEpoch {} // Static assertions to ensure that NanosSinceEpoch is the same size as u64 // and that niche optimization works. diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ec387e1e58..cfdb76c46c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -29,6 +29,7 @@ restate-core-derive = { workspace = true, optional = true } restate-futures-util = { workspace = true } restate-memory = { workspace = true } restate-metadata-store = { workspace = true } +restate-platform = { workspace = true } restate-time-util = { workspace = true } restate-types = { workspace = true } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 7f15552f32..c376cf3e45 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -24,7 +24,6 @@ pub mod network; pub mod partitions; pub mod protobuf; pub mod task_center; -pub mod worker_api; pub use error::*; /// Run tests within task-center diff --git a/crates/core/src/network/incoming.rs b/crates/core/src/network/incoming.rs index 3915c7eebb..594c3e436f 100644 --- a/crates/core/src/network/incoming.rs +++ b/crates/core/src/network/incoming.rs @@ -13,7 +13,7 @@ use std::marker::PhantomData; use bytes::Bytes; use tokio::sync::{oneshot, watch}; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; pub use restate_memory::MemoryLease; use restate_types::GenerationalNodeId; diff --git a/crates/core/src/network/message_router.rs b/crates/core/src/network/message_router.rs index 68d195bf59..17cfc892fd 100644 --- a/crates/core/src/network/message_router.rs +++ b/crates/core/src/network/message_router.rs @@ -30,7 +30,8 @@ use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::StreamExt; use tracing::{debug, instrument, trace, warn}; -use restate_memory::{EstimatedMemorySize, MemoryLease, MemoryPool}; +use restate_memory::{MemoryLease, MemoryPool}; +use restate_platform::memory::EstimatedMemorySize; use restate_types::SharedString; use restate_types::net::{Service, ServiceTag}; diff --git a/crates/encoding/Cargo.toml b/crates/encoding/Cargo.toml index 2000b3a479..fd63c303ad 100644 --- a/crates/encoding/Cargo.toml +++ b/crates/encoding/Cargo.toml @@ -11,13 +11,10 @@ publish = false restate-workspace-hack = { workspace = true } restate-encoding-derive = { version = "0.1.0", path = "derive" } -restate-sharding = { workspace = true } +restate-platform = { workspace = true } bilrost = { workspace = true } bytes = { workspace = true } -bytestring = { workspace = true } [dev-dependencies] rand = { workspace = true } -restate-sharding = { workspace = true, features = ["bilrost"] } -static_assertions = { workspace = true } \ No newline at end of file diff --git a/crates/encoding/derive/src/net.rs b/crates/encoding/derive/src/net.rs index 91bf6ebb94..e2ea742fd1 100644 --- a/crates/encoding/derive/src/net.rs +++ b/crates/encoding/derive/src/net.rs @@ -44,12 +44,12 @@ pub fn net_serde_inner(input: DeriveInput) -> Result { let where_clauses = field_types.iter().map(|ty| { quote! { - #ty: NetSerde + #ty: ::restate_platform::network::NetSerde } }); let expanded = quote! { - impl ::restate_encoding::NetSerde for #name where #(#where_clauses),* {} + impl ::restate_platform::network::NetSerde for #name where #(#where_clauses),* {} }; Ok(TokenStream::from(expanded)) diff --git a/crates/encoding/src/common.rs b/crates/encoding/src/common.rs index e75535f715..bccd2f9e4e 100644 --- a/crates/encoding/src/common.rs +++ b/crates/encoding/src/common.rs @@ -13,8 +13,9 @@ use bilrost::{ encoding::{EmptyState, ForOverwrite, Proxiable}, }; use restate_encoding_derive::BilrostNewType; +use restate_platform::network::NetSerde; -use crate::{NetSerde, bilrost_encodings::RestateEncoding}; +use crate::bilrost_encodings::RestateEncoding; struct U128Tag; diff --git a/crates/encoding/src/lib.rs b/crates/encoding/src/lib.rs index 988db1e63c..ccbc99f083 100644 --- a/crates/encoding/src/lib.rs +++ b/crates/encoding/src/lib.rs @@ -12,97 +12,10 @@ mod bilrost_as; pub mod bilrost_encodings; mod common; -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::collections::HashSet; -use std::ops::RangeInclusive; -use std::sync::Arc; - pub use bilrost_as::BilrostAsAdaptor; pub use bilrost_encodings::{Arced, ArcedSlice, RestateEncoding}; pub use common::U128; pub use restate_encoding_derive::{BilrostAs, BilrostNewType, NetSerde}; -/// A marker trait for types that can be serialized and sent over the network. -/// -/// Types implementing this trait are considered eligible for wire transmission, -/// typically via serialization. It is intended to be implemented automatically -/// using the `#[derive(NetSerde)]` macro. -/// -/// # Example -/// ```ignore -/// #[derive(NetSerde)] -/// struct MyMessage { -/// a: u64, -/// b: String, -/// } -/// ``` -pub trait NetSerde {} - -macro_rules! impl_net_serde { - ($t:ty) => { - impl NetSerde for $t {} - }; - ($($t:ty),+) => { - $(impl_net_serde!($t);)+ - } -} - -impl_net_serde!( - bool, - usize, - u8, - u16, - u32, - u64, - u128, - isize, - i8, - i16, - i32, - i64, - i128, - String, - bytes::Bytes, - bytestring::ByteString, - std::time::Duration -); - -macro_rules! impl_net_serde_tuple { - ($($t:ident),+) => { - impl<$($t),+> NetSerde for ($($t),+) where $($t: NetSerde),+ {} - }; -} - -impl_net_serde_tuple!(T0, T1); -impl_net_serde_tuple!(T0, T1, T2); -impl_net_serde_tuple!(T0, T1, T2, T3); -impl_net_serde_tuple!(T0, T1, T2, T3, T4); -impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5); -impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5, T6); - -impl NetSerde for Vec where T: NetSerde {} -impl NetSerde for Option where T: NetSerde {} -impl NetSerde for HashMap -where - K: NetSerde, - V: NetSerde, -{ -} - -impl NetSerde for BTreeMap -where - K: NetSerde, - V: NetSerde, -{ -} - -impl NetSerde for HashSet where V: NetSerde {} -impl NetSerde for RangeInclusive where Idx: NetSerde {} -impl NetSerde for restate_sharding::PartitionId {} -impl NetSerde for Arc where T: NetSerde {} -impl NetSerde for Arc<[T]> where T: NetSerde {} -impl NetSerde for Box where T: NetSerde {} -impl NetSerde for [T; N] where T: NetSerde {} #[cfg(test)] mod test { @@ -132,49 +45,4 @@ mod test { assert_eq!(x.id.0, y.id); } - - /// Validates that `KeyRange`'s general bilrost encoding produces the same - /// wire format as `RangeInclusive` with `RestateEncoding`. This ensures - /// that `KeyRange` fields can use `#[bilrost(N)]` and remain wire-compatible - /// with the old `#[bilrost(tag(N), encoding(RestateEncoding))] RangeInclusive`. - #[test] - fn key_range_wire_compat_with_range_inclusive() { - use restate_sharding::KeyRange; - - use super::RestateEncoding; - - #[derive(Debug, PartialEq, bilrost::Message)] - struct WithKeyRange { - #[bilrost(1)] - range: KeyRange, - } - - #[derive(Debug, PartialEq, bilrost::Message)] - struct WithRangeInclusive { - #[bilrost(tag(1), encoding(RestateEncoding))] - range: std::ops::RangeInclusive, - } - - for (start, end) in [(0u64, 0u64), (1, 100), (0, u64::MAX), (42, 42)] { - let kr = KeyRange::new(start, end); - let ri = start..=end; - - let kr_bytes = WithKeyRange { range: kr }.encode_to_vec(); - let ri_bytes = WithRangeInclusive { range: ri.clone() }.encode_to_vec(); - - assert_eq!( - kr_bytes, ri_bytes, - "wire format mismatch for range ({start}, {end})" - ); - - // Cross-decode - let decoded: WithRangeInclusive = - WithRangeInclusive::decode(&*kr_bytes).expect("cross-decode KeyRange→RI"); - assert_eq!(decoded.range, ri); - - let decoded: WithKeyRange = - WithKeyRange::decode(&*ri_bytes).expect("cross-decode RI→KeyRange"); - assert_eq!(decoded.range, kr); - } - } } diff --git a/crates/encoding/tests/net_serde.rs b/crates/encoding/tests/net_serde.rs index b9eca8494a..271672fce0 100644 --- a/crates/encoding/tests/net_serde.rs +++ b/crates/encoding/tests/net_serde.rs @@ -8,10 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::HashMap; - use restate_encoding::NetSerde; -use static_assertions::assert_impl_all; +use restate_platform::hash::HashMap; +use restate_platform::network::NetSerde; #[allow(dead_code)] #[derive(NetSerde)] @@ -30,4 +29,7 @@ struct NotSendable; #[derive(NetSerde)] struct Inner(HashMap); -assert_impl_all!(SomeMessage: NetSerde); +const _: fn() = || { + fn assert_impl_all() {} + assert_impl_all::(); +}; diff --git a/crates/log-server/Cargo.toml b/crates/log-server/Cargo.toml index b962518b74..7cef4107a9 100644 --- a/crates/log-server/Cargo.toml +++ b/crates/log-server/Cargo.toml @@ -24,6 +24,7 @@ restate-core = { workspace = true } restate-futures-util = { workspace = true } restate-memory = { workspace = true } restate-metadata-store = { workspace = true } +restate-platform = { workspace = true } restate-rocksdb = { workspace = true } restate-serde-util = { workspace = true } restate-time-util = { workspace = true } diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index 386c876569..b048041f5b 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -24,7 +24,7 @@ use restate_core::network::{ use restate_core::task_center::TaskGuard; use restate_core::{ShutdownError, TaskCenter, TaskKind, cancellation_token}; use restate_futures_util::waiter_queue::WaiterQueue; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; use restate_serde_util::ByteCount; use restate_types::GenerationalNodeId; use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; diff --git a/crates/memory/Cargo.toml b/crates/memory/Cargo.toml index c5f3f0a0c2..324778869e 100644 --- a/crates/memory/Cargo.toml +++ b/crates/memory/Cargo.toml @@ -13,13 +13,12 @@ default = [] [dependencies] restate-workspace-hack = { workspace = true } +restate-platform = { workspace = true } restate-serde-util = { workspace = true } bytes = { workspace = true } futures = { workspace = true } -hashbrown = { workspace = true } metrics = { workspace = true } -parking_lot = { workspace = true } pin-project-lite = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync"] } diff --git a/crates/memory/src/controller.rs b/crates/memory/src/controller.rs index 529b6e8bdc..52aac96bc3 100644 --- a/crates/memory/src/controller.rs +++ b/crates/memory/src/controller.rs @@ -8,11 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use hashbrown::HashMap; use metrics::{Gauge, gauge}; -use parking_lot::RwLock; use tracing::warn; +use restate_platform::hash::HashMap; +use restate_platform::sync::RwLock; use restate_serde_util::NonZeroByteCount; use crate::MemoryPool; diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs index e7bd0e1222..df19d7ddc2 100644 --- a/crates/memory/src/lib.rs +++ b/crates/memory/src/lib.rs @@ -13,11 +13,8 @@ //! This crate provides: //! - [`MemoryPool`]: A named memory budget for bounding memory usage //! - [`MemoryLease`]: RAII guard for memory leases that can be passed through channels -//! - [`EstimatedMemorySize`]: Trait for types that can estimate their memory -//! footprint mod controller; -mod footprint; pub mod local_pool; mod metric_definitions; mod pinned_memory_stream; @@ -30,5 +27,3 @@ pub use local_pool::{ pub use pinned_memory_stream::{IgnorePinnableMemoryStream, PinnableMapErr, PinnableMemoryStream}; pub use pool::{MemoryLease, MemoryPool, PollMemoryPool}; pub use restate_serde_util::{ByteCount, NonZeroByteCount}; - -pub use footprint::*; diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index abccb1491d..901d9535fb 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -51,6 +51,7 @@ restate-tracing-instrumentation = { workspace = true, features = ["prometheus"] restate-types = { workspace = true, features = ["clap"] } restate-wal-protocol = { workspace = true } restate-worker = { workspace = true } +restate-worker-api = { workspace = true } ahash = { workspace = true } anyhow = { workspace = true } diff --git a/crates/node/src/failure_detector.rs b/crates/node/src/failure_detector.rs index e6566c79fe..6d30a94f52 100644 --- a/crates/node/src/failure_detector.rs +++ b/crates/node/src/failure_detector.rs @@ -29,7 +29,6 @@ use restate_core::{ ServiceReceiver, Verdict, }, task_center::TaskCenterMonitoring, - worker_api::ProcessorsManagerHandle, }; use restate_memory::NonZeroByteCount; use restate_types::health::NodeStatus; @@ -45,6 +44,7 @@ use restate_types::{ config::GossipOptions, net::node::{GetNodeState, GossipService, NodeStateResponse}, }; +use restate_worker_api::ProcessorsManagerHandle; use crate::metric_definitions::GOSSIP_SENT; diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 94c913d6b9..f7548b9445 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -21,7 +21,6 @@ use restate_core::network::NetworkServerBuilder; use restate_core::network::Networking; use restate_core::network::TransportConnect; use restate_core::partitions::PartitionRouting; -use restate_core::worker_api::PartitionProcessorInvocationClient; use restate_core::{Metadata, MetadataWriter, TaskCenter, TaskKind}; use restate_ingestion_client::IngestionClient; use restate_partition_store::PartitionStoreManager; @@ -44,6 +43,7 @@ use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::protobuf::common::AdminStatus; use restate_types::retries::RetryPolicy; use restate_wal_protocol::Envelope; +use restate_worker_api::PartitionProcessorInvocationClient; #[derive(Debug, thiserror::Error, CodedError)] pub enum AdminRoleBuildError { diff --git a/crates/node/src/roles/ingress.rs b/crates/node/src/roles/ingress.rs index d24b932929..085c048333 100644 --- a/crates/node/src/roles/ingress.rs +++ b/crates/node/src/roles/ingress.rs @@ -10,7 +10,6 @@ use restate_core::network::{Networking, TransportConnect}; use restate_core::partitions::PartitionRouting; -use restate_core::worker_api::PartitionProcessorInvocationClient; use restate_core::{TaskCenter, TaskKind}; use restate_ingress_http::{HyperServerIngress, InvocationClientRequestDispatcher}; use restate_types::config::IngressOptions; @@ -20,6 +19,7 @@ use restate_types::net::listener::AddressBook; use restate_types::partition_table::PartitionTable; use restate_types::protobuf::common::IngressStatus; use restate_types::schema::Schema; +use restate_worker_api::PartitionProcessorInvocationClient; type IngressHttp = HyperServerIngress< Schema, diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 219eb5588e..f57c321bc5 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -16,7 +16,6 @@ use restate_bifrost::Bifrost; use restate_core::network::MessageRouterBuilder; use restate_core::network::Networking; use restate_core::network::TransportConnect; -use restate_core::worker_api::ProcessorsManagerHandle; use restate_core::{MetadataWriter, TaskCenter, TaskKind}; use restate_ingestion_client::IngestionClient; use restate_partition_store::PartitionStoreManager; @@ -27,6 +26,7 @@ use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::protobuf::common::WorkerStatus; use restate_wal_protocol::Envelope; use restate_worker::Worker; +use restate_worker_api::ProcessorsManagerHandle; #[derive(Debug, thiserror::Error, CodedError)] pub enum WorkerRoleBuildError { diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index 58f6d4c09e..d4189d3860 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -11,7 +11,6 @@ use std::ops::ControlFlow; use futures::Stream; -use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy; use restate_rocksdb::{Priority, RocksDbPerfGuard}; use restate_storage_api::invocation_status_table::{ @@ -20,8 +19,10 @@ use restate_storage_api::invocation_status_table::{ WriteInvocationStatusTable, }; use restate_storage_api::protobuf_types::PartitionStoreProtobufValue; +use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; +use restate_util_string::format_restring; use crate::TableScan::FullScanPartitionKeyRange; use crate::keys::{DecodeTableKey, KeyKind, define_table_key}; @@ -168,7 +169,7 @@ impl ScanInvocationStatusTable for PartitionStore { break_on_err(InvocationStatusKey::deserialize_from(&mut key))?; if value.len() < std::mem::size_of::() { - return ControlFlow::Break(Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format!( + return ControlFlow::Break(Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format_restring!( "remaining bytes in buf '{}' < version bytes '{}'", value.len(), std::mem::size_of::() @@ -228,7 +229,7 @@ impl ScanInvocationStatusTable for PartitionStore { let status_key = InvocationStatusKey::deserialize_from(&mut key)?; if value.len() < std::mem::size_of::() { - return Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format!( + return Err(StorageError::Conversion(restate_types::storage::StorageDecodeError::ReadingCodec(format_restring!( "remaining bytes in buf '{}' < version bytes '{}'", value.len(), std::mem::size_of::() diff --git a/crates/platform/Cargo.toml b/crates/platform/Cargo.toml new file mode 100644 index 0000000000..e77d573ac4 --- /dev/null +++ b/crates/platform/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "restate-platform" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[dependencies] +restate-workspace-hack = { workspace = true } + +restate-util-string = { workspace = true } + +bilrost = { workspace = true, optional = true } +bytes = { workspace = true } +bytestring = { workspace = true, optional = true } +derive_more = { workspace = true, features = ["deref", "from", "display"] } +downcast-rs = { workspace = true } +hashbrown = { workspace = true } +parking_lot = { workspace = true } +thiserror = { workspace = true } diff --git a/crates/platform/README.md b/crates/platform/README.md new file mode 100644 index 0000000000..47e036cb97 --- /dev/null +++ b/crates/platform/README.md @@ -0,0 +1,26 @@ +# restate-platform + +Foundation crate for the Restate project. Provides key type aliases, traits, and +essential types that the rest of the codebase can safely and reliably depend upon. + +This crate sits at the bottom of the dependency graph and has minimal +dependencies of its own, making it suitable for use by any crate in the +workspace without pulling in heavy transitive dependencies. + +## What belongs here + +- Common traits (error handling, networking, memory estimation) +- Lightweight type aliases and marker types +- Re-exports that unify scattered definitions under a single import path + +## Prelude + +`restate_platform::prelude::*` provides a curated set of the most commonly used +types and traits, intended as a single convenient import for crates across the +project. + +## What does NOT belong here + +- Concrete implementations or business logic +- Types with heavy dependencies (serialization frameworks, async runtimes, storage engines) +- Anything that would cause circular dependencies diff --git a/crates/platform/src/errors.rs b/crates/platform/src/errors.rs new file mode 100644 index 0000000000..e8c7d683c1 --- /dev/null +++ b/crates/platform/src/errors.rs @@ -0,0 +1,84 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +/// Error type which abstracts away the actual [`std::error::Error`] type. Use this type +/// if you don't know the actual error type or if it is not important. +pub type GenericError = Box; + +pub type BoxedMaybeRetryableError = Box; + +/// Tells whether an error should be retried by upper layers or not. +pub trait MaybeRetryableError: std::error::Error + 'static { + /// Signal upper layers whether this error should be retried or not. + fn retryable(&self) -> bool { + false + } +} + +pub trait IntoMaybeRetryable: Sized { + /// Marks the error marked as retryable + fn into_retryable(self) -> RetryableError { + RetryableError(self) + } + + /// Marks the error marked as non-retryable + fn into_terminal(self) -> TerminalError { + TerminalError(self) + } +} + +impl IntoMaybeRetryable for T where + T: std::fmt::Debug + std::fmt::Display + Send + Sync + std::error::Error + 'static +{ +} + +/// Wraps any source error and marks it as retryable +#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] +pub struct RetryableError(#[source] T); + +/// Wraps any source error and marks it as non-retryable +#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] +pub struct TerminalError(#[source] T); + +impl MaybeRetryableError for RetryableError +where + T: std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + true + } +} + +impl MaybeRetryableError for TerminalError +where + T: std::error::Error + 'static, +{ + fn retryable(&self) -> bool { + false + } +} + +impl std::fmt::Display for RetryableError +where + T: std::fmt::Debug + std::fmt::Display + std::error::Error, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[retryable] {}", self.0) + } +} + +impl std::fmt::Display for TerminalError +where + T: std::fmt::Debug + std::fmt::Display + std::error::Error + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[terminal] {}", self.0) + } +} diff --git a/crates/platform/src/hash.rs b/crates/platform/src/hash.rs new file mode 100644 index 0000000000..b1290f8db2 --- /dev/null +++ b/crates/platform/src/hash.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub type HashMap = hashbrown::HashMap; +pub type HashSet = hashbrown::HashSet; diff --git a/crates/platform/src/lib.rs b/crates/platform/src/lib.rs new file mode 100644 index 0000000000..6d935bc9ab --- /dev/null +++ b/crates/platform/src/lib.rs @@ -0,0 +1,26 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod errors; +pub mod hash; +pub mod memory; +pub mod network; +pub mod storage; +pub mod sync; + +// prelude exports; +pub mod prelude { + pub use crate::errors::*; + pub use crate::hash::{HashMap, HashSet}; + pub use crate::memory::EstimatedMemorySize; + pub use crate::network::NetSerde; + pub use crate::sync::{Mutex, RwLock}; + pub use restate_util_string::{ReString, ToReString, format_restring}; +} diff --git a/crates/memory/src/footprint.rs b/crates/platform/src/memory.rs similarity index 100% rename from crates/memory/src/footprint.rs rename to crates/platform/src/memory.rs diff --git a/crates/platform/src/network.rs b/crates/platform/src/network.rs new file mode 100644 index 0000000000..baae7b89df --- /dev/null +++ b/crates/platform/src/network.rs @@ -0,0 +1,98 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use crate::hash::HashMap; + +/// A marker trait for types that can be serialized and sent over the network. +/// +/// Types implementing this trait are considered eligible for wire transmission, +/// typically via serialization. It is intended to be implemented automatically +/// using the `#[derive(NetSerde)]` macro. +/// +/// # Example +/// ```ignore +/// #[derive(NetSerde)] +/// struct MyMessage { +/// a: u64, +/// b: String, +/// } +/// ``` +pub trait NetSerde {} + +macro_rules! impl_net_serde { + ($t:ty) => { + impl NetSerde for $t {} + }; + ($($t:ty),+) => { + $(impl_net_serde!($t);)+ + } +} + +impl_net_serde!( + bool, + usize, + u8, + u16, + u32, + u64, + u128, + isize, + i8, + i16, + i32, + i64, + i128, + String, + bytes::Bytes, + std::time::Duration +); + +#[cfg(feature = "bytestring")] +impl_net_serde!(bytestring::ByteString); + +macro_rules! impl_net_serde_tuple { + ($($t:ident),+) => { + impl<$($t),+> NetSerde for ($($t),+) where $($t: NetSerde),+ {} + }; +} + +impl_net_serde_tuple!(T0, T1); +impl_net_serde_tuple!(T0, T1, T2); +impl_net_serde_tuple!(T0, T1, T2, T3); +impl_net_serde_tuple!(T0, T1, T2, T3, T4); +impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5); +impl_net_serde_tuple!(T0, T1, T2, T3, T4, T5, T6); + +impl NetSerde for Vec where T: NetSerde {} +impl NetSerde for Option where T: NetSerde {} +impl NetSerde for HashMap +where + K: NetSerde, + V: NetSerde, +{ +} + +impl NetSerde for BTreeMap +where + K: NetSerde, + V: NetSerde, +{ +} + +impl NetSerde for crate::hash::HashSet where V: NetSerde {} +impl NetSerde for core::range::RangeInclusive where Idx: NetSerde {} +impl NetSerde for core::ops::RangeInclusive where Idx: NetSerde {} +impl NetSerde for Arc where T: NetSerde {} +impl NetSerde for Arc<[T]> where T: NetSerde {} +impl NetSerde for Box where T: NetSerde {} +impl NetSerde for [T; N] where T: NetSerde {} diff --git a/crates/platform/src/storage.rs b/crates/platform/src/storage.rs new file mode 100644 index 0000000000..d269948759 --- /dev/null +++ b/crates/platform/src/storage.rs @@ -0,0 +1,209 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use bytes::{Buf, BufMut, BytesMut}; +use downcast_rs::{DowncastSync, impl_downcast}; + +use restate_util_string::{ReString, format_restring}; + +use crate::errors::GenericError; + +#[derive(Debug, thiserror::Error)] +pub enum StorageEncodeError { + #[error("encoding failed: {0}")] + EncodeValue(GenericError), + #[error("only support serializing types of size <= 4GB, got: {0} bytes")] + SizeOverflow(usize), +} + +#[derive(Debug, thiserror::Error)] +pub enum StorageDecodeError { + #[error("failed reading codec: {0}")] + ReadingCodec(ReString), + #[error("decoding failed: {0}")] + DecodeValue(GenericError), + #[error("unsupported codec kind: {0}")] + UnsupportedCodecKind(StorageCodecKind), +} + +/// Trait to encode a value using the specified [`Self::default_codec`]. The trait is used by the +/// [`StorageCodec`] to first write the codec byte and then the serialized value via +/// [`Self::encode`]. +/// +/// # Important +/// The [`Self::encode`] implementation should use the codec specified by [`Self::default_codec`]. +pub trait StorageEncode: DowncastSync { + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError>; + + /// Codec which is used when encode new values. + fn default_codec(&self) -> StorageCodecKind; +} + +impl_downcast!(sync StorageEncode); + +/// Trait to decode a value given the [`StorageCodecKind`]. This trait is used by the +/// [`StorageCodec`] to decode a value after reading the used storage codec. +/// +/// # Important +/// To support codec evolution, this trait implementation needs to be able to decode values encoded +/// with any previously used codec. +pub trait StorageDecode { + fn decode(buf: &mut B, kind: StorageCodecKind) -> Result + where + Self: Sized; +} + +#[derive(Debug, Copy, Clone, derive_more::Display, PartialEq, Eq)] +#[cfg_attr(feature = "bilrost", derive(bilrost::Enumeration))] +#[repr(u8)] +pub enum StorageCodecKind { + /// plain old protobuf + Protobuf = 1, + /// flexbuffers + serde (length-prefixed) + FlexbuffersSerde = 2, + /// length-prefixed raw-bytes. length is u32 + LengthPrefixedRawBytes = 3, + // Note: Discriminant 4 was previously `BincodeSerde`, which was added to prepare for future + // use but was never actually used for persistent storage. It has been removed because bincode + // is no longer maintained. The discriminant is intentionally skipped to prevent accidental + // reuse. + /// Json (no length prefix) + Json = 5, + /// Bilrost (no length-prefixed) + Bilrost = 6, + /// A custom encoding that does not rely on any of the standard encoding formats + /// supported by the [`encode`] and [`decode`] modules. + /// + /// When using this variant, the encoding and decoding logic is entirely defined + /// by the implementation of the [`StorageEncode`] and [`StorageDecode`] traits. + /// + /// While you may still use utility functions from the [`encode`] and [`decode`] modules, + /// it is up to your implementation to decide how (or if) to use them, and how the final + /// byte representation is constructed. + Custom = 7, +} + +impl From for u8 { + #[inline] + fn from(value: StorageCodecKind) -> Self { + value as u8 + } +} + +impl TryFrom for StorageCodecKind { + type Error = StorageDecodeError; + + #[inline] + fn try_from(value: u8) -> Result { + match value { + 1 => Ok(Self::Protobuf), + 2 => Ok(Self::FlexbuffersSerde), + 3 => Ok(Self::LengthPrefixedRawBytes), + 5 => Ok(Self::Json), + 6 => Ok(Self::Bilrost), + 7 => Ok(Self::Custom), + value => Err(StorageDecodeError::ReadingCodec(format_restring!( + "unknown discriminant '{value}'" + ))), + } + } +} + +/// Enable simple serialization of String types as length-prefixed byte slice +impl StorageEncode for String { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::LengthPrefixedRawBytes + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + let my_bytes = self.as_bytes(); + buf.put_u32_le( + u32::try_from(my_bytes.len()) + .map_err(|_| StorageEncodeError::SizeOverflow(my_bytes.len()))?, + ); + if buf.remaining_mut() < my_bytes.len() { + return Err(StorageEncodeError::EncodeValue( + format!( + "not enough buffer space to serialize value;\ + required {} bytes but free capacity was {}", + my_bytes.len(), + buf.remaining_mut() + ) + .into(), + )); + } + buf.put_slice(my_bytes); + Ok(()) + } +} +impl StorageDecode for String { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + match kind { + StorageCodecKind::LengthPrefixedRawBytes => { + if buf.remaining() < std::mem::size_of::() { + return Err(StorageDecodeError::DecodeValue( + format!( + "insufficient data: expecting {} bytes for length", + std::mem::size_of::() + ) + .into(), + )); + } + let length = usize::try_from(buf.get_u32_le()).expect("u32 to fit into usize"); + + if buf.remaining() < length { + return Err(StorageDecodeError::DecodeValue( + format!( + "insufficient data: expecting {} bytes for flexbuffers", + length + ) + .into(), + )); + } + + let bytes = buf.take(length); + Ok(String::from_utf8_lossy(bytes.chunk()).to_string()) + } + codec => Err(StorageDecodeError::UnsupportedCodecKind(codec)), + } + } +} + +// Enable simple serialization of Bytes types as length-prefixed byte slice +impl StorageEncode for bytes::Bytes { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::LengthPrefixedRawBytes + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + buf.put_u32_le( + u32::try_from(self.len()).map_err(|_| StorageEncodeError::SizeOverflow(self.len()))?, + ); + if buf.remaining_mut() < self.len() { + return Err(StorageEncodeError::EncodeValue( + format!( + "not enough buffer space to serialize value;\ + required {} bytes but free capacity was {}", + self.len(), + buf.remaining_mut() + ) + .into(), + )); + } + buf.put_slice(&self[..]); + Ok(()) + } +} diff --git a/crates/platform/src/sync.rs b/crates/platform/src/sync.rs new file mode 100644 index 0000000000..e7a0d6401c --- /dev/null +++ b/crates/platform/src/sync.rs @@ -0,0 +1,12 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub type RwLock = parking_lot::RwLock; +pub type Mutex = parking_lot::Mutex; diff --git a/crates/sharding/Cargo.toml b/crates/sharding/Cargo.toml index 37834bc639..07513b472f 100644 --- a/crates/sharding/Cargo.toml +++ b/crates/sharding/Cargo.toml @@ -15,6 +15,8 @@ bilrost = ["dep:bilrost"] [dependencies] restate-workspace-hack = { workspace = true } +restate-platform = { workspace = true } + bilrost = { workspace = true, optional = true } derive_more = { workspace = true, features = ["deref", "from", "into", "add", "display", "debug", "from_str"] } serde = { workspace = true, optional = true } diff --git a/crates/sharding/src/key_range.rs b/crates/sharding/src/key_range.rs index 336654b513..2ebfacd827 100644 --- a/crates/sharding/src/key_range.rs +++ b/crates/sharding/src/key_range.rs @@ -12,6 +12,8 @@ use std::fmt; use std::ops::{Bound, RangeBounds}; use std::range::RangeInclusiveIter; +use restate_platform::network::NetSerde; + /// An inclusive range of partition keys `[start, end]`. /// /// This is a compact, `Copy` representation of an inclusive key range backed by @@ -22,6 +24,8 @@ use std::range::RangeInclusiveIter; #[repr(transparent)] pub struct KeyRange(std::range::RangeInclusive); +impl NetSerde for KeyRange {} + impl KeyRange { /// The full key space `[0, u64::MAX]`. pub const FULL: Self = Self::new(u64::MIN, u64::MAX); diff --git a/crates/sharding/src/partition_id.rs b/crates/sharding/src/partition_id.rs index 3687e2cf19..df4b7c50b8 100644 --- a/crates/sharding/src/partition_id.rs +++ b/crates/sharding/src/partition_id.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use restate_platform::network::NetSerde; + /// Identifying the partition #[derive( Copy, @@ -43,6 +45,8 @@ impl From for u64 { } } +impl NetSerde for PartitionId {} + impl PartitionId { /// It's your responsibility to ensure the value is within the valid range. pub const fn new_unchecked(v: u16) -> Self { diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 3496bfe0ae..3f37cadb78 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -27,10 +27,11 @@ restate-clock = { workspace = true, features = ["prost-types", "jiff", "hlc"] } restate-encoding = { workspace = true } restate-errors = { workspace = true } restate-memory = { workspace = true } +restate-platform = { workspace = true } restate-serde-util = { workspace = true } +restate-sharding = { workspace = true, features = ["serde", "bilrost"] } restate-test-util = { workspace = true, optional = true } restate-time-util = { workspace = true, features = ["serde", "serde_with"] } -restate-sharding = { workspace = true, features = ["serde", "bilrost"] } restate-util-string = { workspace = true, features = ["serde", "bilrost"] } restate-utoipa = { workspace = true } diff --git a/crates/types/src/errors.rs b/crates/types/src/errors.rs index 09ffb19813..cce39ddbae 100644 --- a/crates/types/src/errors.rs +++ b/crates/types/src/errors.rs @@ -14,84 +14,10 @@ use std::fmt; use tonic; use tracing::Level; -use restate_encoding::BilrostNewType; - -/// Error type which abstracts away the actual [`std::error::Error`] type. Use this type -/// if you don't know the actual error type or if it is not important. -pub type GenericError = Box; - -pub type BoxedMaybeRetryableError = Box; - -/// Tells whether an error should be retried by upper layers or not. -pub trait MaybeRetryableError: std::error::Error + 'static { - /// Signal upper layers whether this error should be retried or not. - fn retryable(&self) -> bool { - false - } -} - -static_assertions::assert_obj_safe!(MaybeRetryableError); - -pub trait IntoMaybeRetryable: Sized { - /// Marks the error marked as retryable - fn into_retryable(self) -> RetryableError { - RetryableError(self) - } - - /// Marks the error marked as non-retryable - fn into_terminal(self) -> TerminalError { - TerminalError(self) - } -} - -impl IntoMaybeRetryable for T where - T: fmt::Debug + fmt::Display + Send + Sync + std::error::Error + 'static -{ -} - -/// Wraps any source error and marks it as retryable -#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] -pub struct RetryableError(#[source] T); +// Re-exports the error traits from restate platform to avoid mass-migration of existing code. +pub use restate_platform::errors::*; -/// Wraps any source error and marks it as non-retryable -#[derive(Debug, thiserror::Error, derive_more::Deref, derive_more::From)] -pub struct TerminalError(#[source] T); - -impl MaybeRetryableError for RetryableError -where - T: std::error::Error + 'static, -{ - fn retryable(&self) -> bool { - true - } -} - -impl MaybeRetryableError for TerminalError -where - T: std::error::Error + 'static, -{ - fn retryable(&self) -> bool { - false - } -} - -impl fmt::Display for RetryableError -where - T: fmt::Debug + fmt::Display + std::error::Error, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[retryable] {}", self.0) - } -} - -impl fmt::Display for TerminalError -where - T: fmt::Debug + fmt::Display + std::error::Error + 'static, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[terminal] {}", self.0) - } -} +use restate_encoding::BilrostNewType; #[derive(Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, BilrostNewType)] #[serde(transparent)] diff --git a/crates/types/src/logs/record.rs b/crates/types/src/logs/record.rs index 9a8d5267ab..067e883920 100644 --- a/crates/types/src/logs/record.rs +++ b/crates/types/src/logs/record.rs @@ -13,7 +13,7 @@ use std::sync::Arc; use bytes::BytesMut; use restate_encoding::NetSerde; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; use crate::storage::{PolyBytes, StorageCodec, StorageDecode, StorageDecodeError, StorageEncode}; use crate::time::NanosSinceEpoch; diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs index ea48741611..d732151f8f 100644 --- a/crates/types/src/net/log_server.rs +++ b/crates/types/src/net/log_server.rs @@ -15,7 +15,7 @@ use bitflags::bitflags; use prost_dto::{FromProst, IntoProst}; use restate_encoding::{ArcedSlice, BilrostNewType, NetSerde}; -use restate_memory::EstimatedMemorySize; +use restate_platform::memory::EstimatedMemorySize; use super::{RpcResponse, ServiceTag}; use crate::GenerationalNodeId; diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index 1b82c646b0..6c4fe16f85 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -16,86 +16,22 @@ use std::sync::Arc; use bytes::{Buf, BufMut, Bytes, BytesMut}; use chrono::Utc; -use downcast_rs::{DowncastSync, impl_downcast}; -use restate_encoding::{BilrostAs, NetSerde}; -use restate_memory::EstimatedMemorySize; +use restate_platform::network::NetSerde; +// Re-export the traits from restate-platform +pub use restate_platform::storage::{ + StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, +}; + +use restate_encoding::BilrostAs; +use restate_platform::memory::EstimatedMemorySize; use restate_serde_util::ByteCount; +use restate_util_string::format_restring; -use crate::errors::{ConversionError, GenericError}; use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry}; use crate::journal_v2::{Decoder, EntryMetadata, EntryType}; use crate::time::MillisSinceEpoch; -#[derive(Debug, thiserror::Error)] -pub enum StorageEncodeError { - #[error("encoding failed: {0}")] - EncodeValue(GenericError), -} - -#[derive(Debug, thiserror::Error)] -pub enum StorageDecodeError { - #[error("failed reading codec: {0}")] - ReadingCodec(String), - #[error("decoding failed: {0}")] - DecodeValue(GenericError), - #[error("unsupported codec kind: {0}")] - UnsupportedCodecKind(StorageCodecKind), -} - -impl From for StorageDecodeError { - fn from(value: ConversionError) -> Self { - StorageDecodeError::DecodeValue(value.into()) - } -} - -#[derive( - Debug, Copy, Clone, strum::FromRepr, derive_more::Display, PartialEq, Eq, bilrost::Enumeration, -)] -#[repr(u8)] -pub enum StorageCodecKind { - /// plain old protobuf - Protobuf = 1, - /// flexbuffers + serde (length-prefixed) - FlexbuffersSerde = 2, - /// length-prefixed raw-bytes. length is u32 - LengthPrefixedRawBytes = 3, - // Note: Discriminant 4 was previously `BincodeSerde`, which was added to prepare for future - // use but was never actually used for persistent storage. It has been removed because bincode - // is no longer maintained. The discriminant is intentionally skipped to prevent accidental - // reuse. - /// Json (no length prefix) - Json = 5, - /// Bilrost (no length-prefixed) - Bilrost = 6, - /// A custom encoding that does not rely on any of the standard encoding formats - /// supported by the [`encode`] and [`decode`] modules. - /// - /// When using this variant, the encoding and decoding logic is entirely defined - /// by the implementation of the [`StorageEncode`] and [`StorageDecode`] traits. - /// - /// While you may still use utility functions from the [`encode`] and [`decode`] modules, - /// it is up to your implementation to decide how (or if) to use them, and how the final - /// byte representation is constructed. - Custom = 7, -} - -impl From for u8 { - fn from(value: StorageCodecKind) -> Self { - value as u8 - } -} - -impl TryFrom for StorageCodecKind { - type Error = StorageDecodeError; - - fn try_from(value: u8) -> Result { - StorageCodecKind::from_repr(value).ok_or_else(|| { - StorageDecodeError::ReadingCodec(format!("unknown discriminant '{value}'")) - }) - } -} - /// Codec which encodes [`StorageEncode`] implementations by first writing the /// [`StorageEncode::default_codec`] byte and then encoding the value part via /// [`StorageEncode::encode`]. @@ -125,7 +61,7 @@ impl StorageCodec { pub fn decode(buf: &mut B) -> Result { if buf.remaining() < mem::size_of::() { - return Err(StorageDecodeError::ReadingCodec(format!( + return Err(StorageDecodeError::ReadingCodec(format_restring!( "remaining bytes in buf '{}' < version bytes '{}'", buf.remaining(), mem::size_of::() @@ -140,34 +76,6 @@ impl StorageCodec { } } -/// Trait to encode a value using the specified [`Self::default_codec`]. The trait is used by the -/// [`StorageCodec`] to first write the codec byte and then the serialized value via -/// [`Self::encode`]. -/// -/// # Important -/// The [`Self::encode`] implementation should use the codec specified by [`Self::default_codec`]. -pub trait StorageEncode: DowncastSync { - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError>; - - /// Codec which is used when encode new values. - fn default_codec(&self) -> StorageCodecKind; -} -impl_downcast!(sync StorageEncode); - -static_assertions::assert_obj_safe!(StorageEncode); - -/// Trait to decode a value given the [`StorageCodecKind`]. This trait is used by the -/// [`StorageCodec`] to decode a value after reading the used storage codec. -/// -/// # Important -/// To support codec evolution, this trait implementation needs to be able to decode values encoded -/// with any previously used codec. -pub trait StorageDecode { - fn decode(buf: &mut B, kind: StorageCodecKind) -> Result - where - Self: Sized; -} - /// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing /// type using [`flexbuffers`] and [`serde`]. #[macro_export] @@ -286,101 +194,6 @@ impl StorageEncode for PolyBytes { static_assertions::assert_impl_all!(PolyBytes: Send, Sync); -/// Enable simple serialization of String types as length-prefixed byte slice -impl StorageEncode for String { - fn default_codec(&self) -> StorageCodecKind { - StorageCodecKind::LengthPrefixedRawBytes - } - - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { - let my_bytes = self.as_bytes(); - buf.put_u32_le(u32::try_from(my_bytes.len()).map_err(|_| { - StorageEncodeError::EncodeValue( - anyhow::anyhow!("only support serializing types of size <= 4GB").into(), - ) - })?); - if buf.remaining_mut() < my_bytes.len() { - return Err(StorageEncodeError::EncodeValue( - anyhow::anyhow!(format!( - "not enough buffer space to serialize value;\ - required {} bytes but free capacity was {}", - my_bytes.len(), - buf.remaining_mut() - )) - .into(), - )); - } - buf.put_slice(my_bytes); - Ok(()) - } -} -impl StorageDecode for String { - fn decode( - buf: &mut B, - kind: StorageCodecKind, - ) -> Result - where - Self: Sized, - { - match kind { - StorageCodecKind::LengthPrefixedRawBytes => { - if buf.remaining() < mem::size_of::() { - return Err(StorageDecodeError::DecodeValue( - anyhow::anyhow!( - "insufficient data: expecting {} bytes for length", - mem::size_of::() - ) - .into(), - )); - } - let length = usize::try_from(buf.get_u32_le()).expect("u32 to fit into usize"); - - if buf.remaining() < length { - return Err(StorageDecodeError::DecodeValue( - anyhow::anyhow!( - "insufficient data: expecting {} bytes for flexbuffers", - length - ) - .into(), - )); - } - - let bytes = buf.take(length); - Ok(String::from_utf8_lossy(bytes.chunk()).to_string()) - } - codec => Err(StorageDecodeError::UnsupportedCodecKind(codec)), - } - } -} - -// Enable simple serialization of Bytes types as length-prefixed byte slice -impl StorageEncode for bytes::Bytes { - fn default_codec(&self) -> StorageCodecKind { - StorageCodecKind::LengthPrefixedRawBytes - } - - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { - buf.put_u32_le(u32::try_from(self.len()).map_err(|_| { - StorageEncodeError::EncodeValue( - anyhow::anyhow!("only support serializing types of size <= 4GB").into(), - ) - })?); - if buf.remaining_mut() < self.len() { - return Err(StorageEncodeError::EncodeValue( - anyhow::anyhow!(format!( - "not enough buffer space to serialize value;\ - required {} bytes but free capacity was {}", - self.len(), - buf.remaining_mut() - )) - .into(), - )); - } - buf.put_slice(&self[..]); - Ok(()) - } -} - /// A marker stored in the storage /// /// The marker is used to sanity-check if the storage is correctly initialized and whether diff --git a/crates/worker-api/Cargo.toml b/crates/worker-api/Cargo.toml new file mode 100644 index 0000000000..30154d1a25 --- /dev/null +++ b/crates/worker-api/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "restate-worker-api" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[dependencies] +restate-workspace-hack = { workspace = true } + +restate-core = { workspace = true } +restate-types = { workspace = true } + +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } diff --git a/crates/core/src/worker_api/mod.rs b/crates/worker-api/src/lib.rs similarity index 100% rename from crates/core/src/worker_api/mod.rs rename to crates/worker-api/src/lib.rs diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/worker-api/src/partition_processor_manager.rs similarity index 97% rename from crates/core/src/worker_api/partition_processor_manager.rs rename to crates/worker-api/src/partition_processor_manager.rs index 13d3542fb6..48985f61c3 100644 --- a/crates/core/src/worker_api/partition_processor_manager.rs +++ b/crates/worker-api/src/partition_processor_manager.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, oneshot}; use restate_types::{cluster::cluster_state::PartitionProcessorStatus, identifiers::PartitionId}; -use crate::ShutdownError; +use restate_core::ShutdownError; #[derive(Debug)] pub enum ProcessorsManagerCommand { diff --git a/crates/core/src/worker_api/partition_processor_rpc_client.rs b/crates/worker-api/src/partition_processor_rpc_client.rs similarity index 95% rename from crates/core/src/worker_api/partition_processor_rpc_client.rs rename to crates/worker-api/src/partition_processor_rpc_client.rs index 7dc888a974..e61793e317 100644 --- a/crates/core/src/worker_api/partition_processor_rpc_client.rs +++ b/crates/worker-api/src/partition_processor_rpc_client.rs @@ -8,12 +8,15 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use crate::ShutdownError; -use crate::network::ConnectError; -use crate::network::{NetworkSender, RpcReplyError, Swimlane}; -use crate::network::{Networking, TransportConnect}; -use crate::partitions::PartitionRouting; -use assert2::let_assert; +use std::sync::Arc; + +use tracing::trace; + +use restate_core::ShutdownError; +use restate_core::network::ConnectError; +use restate_core::network::{NetworkSender, RpcReplyError, Swimlane}; +use restate_core::network::{Networking, TransportConnect}; +use restate_core::partitions::PartitionRouting; use restate_types::NodeId; use restate_types::errors::GenericError; use restate_types::identifiers::{ @@ -34,8 +37,6 @@ use restate_types::net::partition_processor::{ PartitionProcessorRpcRequest, PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, }; use restate_types::partition_table::{FindPartition, PartitionTable, PartitionTableError}; -use std::sync::Arc; -use tracing::trace; #[derive(Debug, thiserror::Error)] pub enum PartitionProcessorInvocationClientError { @@ -265,10 +266,9 @@ where ) .await?; - let_assert!( - PartitionProcessorRpcResponse::Submitted(submit_notification) = response, - "Expecting PartitionProcessorRpcResponse::Submitted" - ); + let PartitionProcessorRpcResponse::Submitted(submit_notification) = response else { + panic!("Expecting PartitionProcessorRpcResponse::Submitted"); + }; debug_assert_eq!( request_id, submit_notification.request_id, "Conflicting submit notification received" @@ -292,10 +292,9 @@ where ) .await?; - let_assert!( - PartitionProcessorRpcResponse::Output(invocation_output) = response, - "Expecting PartitionProcessorRpcResponse::Output" - ); + let PartitionProcessorRpcResponse::Output(invocation_output) = response else { + panic!("Expecting PartitionProcessorRpcResponse::Output"); + }; debug_assert_eq!( request_id, invocation_output.request_id, "Conflicting invocation output received" @@ -374,10 +373,9 @@ where ) .await?; - let_assert!( - PartitionProcessorRpcResponse::Appended = response, - "Expecting PartitionProcessorRpcResponse::Appended" - ); + let PartitionProcessorRpcResponse::Appended = response else { + panic!("Expecting PartitionProcessorRpcResponse::Appended"); + }; Ok(()) } @@ -394,10 +392,9 @@ where ) .await?; - let_assert!( - PartitionProcessorRpcResponse::Appended = response, - "Expecting PartitionProcessorRpcResponse::Appended" - ); + let PartitionProcessorRpcResponse::Appended = response else { + panic!("Expecting PartitionProcessorRpcResponse::Appended"); + }; Ok(()) } diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 32d9b771d9..4adf56a379 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -49,6 +49,7 @@ restate-types = { workspace = true } restate-util-string = { workspace = true } restate-vqueues = { workspace = true } restate-wal-protocol = { workspace = true } +restate-worker-api = { workspace = true } ahash = { workspace = true } anyhow = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 46fbbb6c9f..eb9b832ea5 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -34,7 +34,6 @@ use restate_core::network::MessageRouterBuilder; use restate_core::network::Networking; use restate_core::network::TransportConnect; use restate_core::partitions::PartitionRouting; -use restate_core::worker_api::ProcessorsManagerHandle; use restate_core::{Metadata, TaskKind}; use restate_core::{MetadataWriter, TaskCenter}; use restate_ingestion_client::IngestionClient; @@ -53,6 +52,7 @@ use restate_types::protobuf::common::WorkerStatus; use restate_types::schema::Redaction; use restate_types::schema::kafka::KafkaClusterResolver; use restate_types::schema::subscriptions::SubscriptionResolver; +use restate_worker_api::ProcessorsManagerHandle; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition_processor_manager::PartitionProcessorManager; diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 545da86658..75289effea 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -38,7 +38,6 @@ use restate_core::network::{ ServiceReceiver, ShardControlMessage, ShardRegistrationDecision, Sharded, TransportConnect, Verdict, }; -use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle}; use restate_core::{ Metadata, MetadataWriter, TaskCenterFutureExt, TaskHandle, TaskKind, cancellation_watcher, my_node_id, @@ -48,6 +47,7 @@ use restate_ingestion_client::IngestionClient; use restate_invoker_api::StatusHandle; use restate_invoker_api::capacity::InvokerCapacity; use restate_invoker_impl::ChannelStatusReader; +use restate_worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle}; use restate_metadata_server::{MetadataStoreClient, ReadModifyWriteError}; use restate_metadata_store::{ReadWriteError, RetryError, retry_on_retryable_error};