From 8e62bb781c05d4795c912f29782529b84bee287b Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Fri, 8 May 2026 18:36:40 +0200 Subject: [PATCH] wal-protocol: introduce v2 Envelope with bilrost encoding and lazy payloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Add a new v2 envelope under `wal-protocol/src/v2/` that replaces the flexbuffers-encoded v1 layout. Key differences vs v1: - Header now carries (Dedup, RecordKind, payload codec) directly; the v1 Destination is gone. Partition routing moves to the log record's `Keys` instead of being embedded in the payload. - Encoded with bilrost (length-delimited header + payload) instead of flexbuffers, so readers can decode the header alone and skip the payload bytes when not needed. - `Envelope<R>` is generic over a `Record` marker type, with a `Raw` variant for not-yet-decoded payloads. `into_typed::<M>()` re-tags a raw envelope as typed (panicking on a record-kind mismatch); the payload itself is decoded lazily by `split()`/`into_inner()`. - A `compatibility` module provides `TryFrom<v1::Envelope> for v2::Envelope<Raw>`, mapping every v1 Command variant to its v2 record kind so existing on-disk records keep replaying. Supporting changes: - Bilrost `Message` impls for AnnounceLeader, VersionBarrier, PartitionDurability, UpsertSchema (and bilrost proxy for NodeSet). - New `bilrost_storage_encode_decode!` macro in `restate_types`. - `PartitionDurability` gains `partition_key_range` so it can supply HasRecordKeys without help from the envelope header. - `wal-protocol` gets a `test-util` feature gating `Record::test_envelope`. Nothing in the worker writes v2 yet; this commit only adds the encoding layer and the v1→v2 bridge. 
--- Cargo.lock | 4 + .../src/deduplication_table/mod.rs | 1 + .../storage-api/src/vqueue_table/scheduler.rs | 8 +- crates/types/Cargo.toml | 2 +- crates/types/src/logs/mod.rs | 12 + crates/types/src/replication/nodeset.rs | 52 ++ crates/types/src/storage.rs | 33 ++ crates/wal-protocol/Cargo.toml | 7 + crates/wal-protocol/src/control.rs | 69 ++- crates/wal-protocol/src/lib.rs | 1 + crates/wal-protocol/src/v2.rs | 540 ++++++++++++++++++ crates/wal-protocol/src/v2/compatibility.rs | 193 +++++++ crates/wal-protocol/src/v2/records.rs | 475 +++++++++++++++ .../leadership/durability_tracker.rs | 24 +- crates/worker/src/partition/leadership/mod.rs | 2 +- 15 files changed, 1398 insertions(+), 25 deletions(-) create mode 100644 crates/wal-protocol/src/v2.rs create mode 100644 crates/wal-protocol/src/v2/compatibility.rs create mode 100644 crates/wal-protocol/src/v2/records.rs diff --git a/Cargo.lock b/Cargo.lock index 54a9d8e2a9..a70a8c14f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8655,11 +8655,15 @@ dependencies = [ name = "restate-wal-protocol" version = "1.7.0-dev" dependencies = [ + "anyhow", "bilrost", "bytes", "derive_more", + "restate-encoding", + "restate-limiter", "restate-storage-api", "restate-types", + "restate-util-string", "restate-worker-api", "restate-workspace-hack", "serde", diff --git a/crates/storage-api/src/deduplication_table/mod.rs b/crates/storage-api/src/deduplication_table/mod.rs index a670592c6e..3ebf7d18ff 100644 --- a/crates/storage-api/src/deduplication_table/mod.rs +++ b/crates/storage-api/src/deduplication_table/mod.rs @@ -40,6 +40,7 @@ impl DedupInformation { } } + #[deprecated] pub fn ingress(producer_id: impl Into, sequence_number: MessageIndex) -> Self { DedupInformation { producer_id: ProducerId::Other(producer_id.into()), diff --git a/crates/storage-api/src/vqueue_table/scheduler.rs b/crates/storage-api/src/vqueue_table/scheduler.rs index 8e155ca576..7e2a026517 100644 --- a/crates/storage-api/src/vqueue_table/scheduler.rs +++ 
b/crates/storage-api/src/vqueue_table/scheduler.rs @@ -28,7 +28,7 @@ pub enum YieldReason { }, } -#[derive(Debug, bilrost::Oneof, bilrost::Message, derive_more::From)] +#[derive(Debug, Clone, bilrost::Oneof, bilrost::Message, derive_more::From)] pub enum SchedulerAction { #[bilrost(empty)] Unknown, @@ -40,7 +40,7 @@ pub enum SchedulerAction { Yield(YieldAction), } -#[derive(Debug, bilrost::Message)] +#[derive(Debug, Clone, bilrost::Message)] pub struct RunAction { #[bilrost(tag(1))] pub key: EntryKey, @@ -48,7 +48,7 @@ pub struct RunAction { pub wait_stats: WaitStats, } -#[derive(Debug, bilrost::Message)] +#[derive(Debug, Clone, bilrost::Message)] pub struct YieldAction { #[bilrost(tag(1))] pub key: EntryKey, @@ -56,7 +56,7 @@ pub struct YieldAction { pub next_run_at: Option, } -#[derive(Debug, bilrost::Message)] +#[derive(Debug, Clone, bilrost::Message)] pub struct SchedulerDecisions { #[bilrost(tag(1))] pub qids: Vec<(VQueueId, Vec)>, diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 475bf015b0..f72ecee40c 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -27,7 +27,7 @@ restate-clock = { workspace = true, features = ["prost-types", "jiff", "hlc"] } restate-encoding = { workspace = true } restate-errors = { workspace = true } restate-memory = { workspace = true } -restate-platform = { workspace = true } +restate-platform = { workspace = true, features = ["bilrost"] } restate-serde-util = { workspace = true } restate-sharding = { workspace = true, features = ["serde", "bilrost"] } restate-test-util = { workspace = true, optional = true } diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index 98cf32b49a..e50684b63c 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -327,6 +327,18 @@ impl BodyWithKeys { pub fn into_inner(self) -> T { self.inner } + + pub fn keys(&self) -> &Keys { + &self.keys + } + + pub fn inner(&self) -> &T { + &self.inner + } + + pub fn split(self) -> 
(Keys, T) { + (self.keys, self.inner) + } } impl HasRecordKeys for BodyWithKeys diff --git a/crates/types/src/replication/nodeset.rs b/crates/types/src/replication/nodeset.rs index c75f50cc58..15ba72bdf0 100644 --- a/crates/types/src/replication/nodeset.rs +++ b/crates/types/src/replication/nodeset.rs @@ -406,6 +406,58 @@ fn write_nodes<'a>( write!(f, "]") } +mod bilrost_encoding { + use bilrost::encoding::{EmptyState, ForOverwrite, Proxiable}; + + use crate::PlainNodeId; + + use super::NodeSet; + + impl Proxiable for NodeSet { + type Proxy = Vec; + + fn encode_proxy(&self) -> Self::Proxy { + Vec::from_iter(self.0.iter().cloned()) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), bilrost::DecodeErrorKind> { + self.0.extend(proxy); + Ok(()) + } + } + + impl EmptyState<(), NodeSet> for () { + fn clear(val: &mut NodeSet) { + val.0.clear(); + } + fn empty() -> NodeSet + where + NodeSet: Sized, + { + NodeSet::new() + } + + fn is_empty(val: &NodeSet) -> bool { + val.is_empty() + } + } + + impl ForOverwrite<(), NodeSet> for () { + fn for_overwrite() -> NodeSet + where + NodeSet: Sized, + { + NodeSet::new() + } + } + + bilrost::delegate_proxied_encoding!( + use encoding (bilrost::encoding::GeneralPacked) + to encode proxied type (NodeSet) + with general encodings + ); +} + #[cfg(test)] mod test { use ahash::HashMap; diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index 6c4fe16f85..06495fa128 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -112,6 +112,39 @@ macro_rules! flexbuffers_storage_encode_decode { }; } +/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing +/// type using [`bilrost`]. +#[macro_export] +macro_rules! 
bilrost_storage_encode_decode { + ($name:tt) => { + impl $crate::storage::StorageEncode for $name { + fn default_codec(&self) -> $crate::storage::StorageCodecKind { + $crate::storage::StorageCodecKind::Bilrost + } + + fn encode( + &self, + buf: &mut ::bytes::BytesMut, + ) -> Result<(), $crate::storage::StorageEncodeError> { + $crate::storage::encode::encode_bilrost(self, buf) + } + } + + impl $crate::storage::StorageDecode for $name { + fn decode( + buf: &mut B, + kind: $crate::storage::StorageCodecKind, + ) -> Result + where + Self: Sized, + { + debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost); + $crate::storage::decode::decode_bilrost(buf) + } + } + }; +} + /// A polymorphic container of a buffer or a cached storage-encodeable object #[derive(Clone, derive_more::Debug, BilrostAs)] #[bilrost_as(dto::PolyBytes)] diff --git a/crates/wal-protocol/Cargo.toml b/crates/wal-protocol/Cargo.toml index cd44ab32de..0811996f57 100644 --- a/crates/wal-protocol/Cargo.toml +++ b/crates/wal-protocol/Cargo.toml @@ -7,13 +7,20 @@ rust-version.workspace = true license.workspace = true publish = false +[features] +test-util = [] + [dependencies] restate-workspace-hack = { workspace = true } +restate-encoding = { workspace = true } +restate-limiter = { workspace = true } restate-storage-api = { workspace = true } restate-types = { workspace = true } +restate-util-string = { workspace = true } restate-worker-api = { workspace = true, features = ["serde"] } +anyhow = { workspace = true } bilrost = { workspace = true } bytes = { workspace = true } derive_more = { workspace = true, features = ["debug"] } diff --git a/crates/wal-protocol/src/control.rs b/crates/wal-protocol/src/control.rs index a360a58da8..537a0c9f38 100644 --- a/crates/wal-protocol/src/control.rs +++ b/crates/wal-protocol/src/control.rs @@ -9,28 +9,33 @@ // by the Apache License, Version 2.0. 
use bytes::Bytes; - use restate_storage_api::fsm_table::{CurrentReplicaSetState, NextReplicaSetState}; use restate_types::identifiers::{LeaderEpoch, PartitionId}; -use restate_types::logs::{Keys, Lsn, SequenceNumber}; +use restate_types::logs::{HasRecordKeys, Keys, Lsn, SequenceNumber}; use restate_types::partitions::PartitionConfiguration; use restate_types::partitions::state::{MemberState, ReplicaSetState}; use restate_types::replication::{NodeSet, ReplicationProperty}; use restate_types::schema::Schema; use restate_types::sharding::KeyRange; use restate_types::time::MillisSinceEpoch; -use restate_types::{GenerationalNodeId, SemanticRestateVersion, Version, Versioned}; +use restate_types::{ + GenerationalNodeId, SemanticRestateVersion, Version, Versioned, bilrost_storage_encode_decode, + flexbuffers_storage_encode_decode, +}; /// Announcing a new leader. This message can be written by any component to make the specified /// partition processor the leader. -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)] pub struct AnnounceLeader { /// Sender of the announce leader message. /// /// This became non-optional in v1.5. Noting that it has always been set in previous versions, /// it's safe to assume that it's always set. + #[bilrost(tag(1))] pub node_id: GenerationalNodeId, + #[bilrost(tag(2))] pub leader_epoch: LeaderEpoch, + #[bilrost(tag(3))] pub partition_key_range: KeyRange, /// Associated epoch metadata version @@ -40,25 +45,40 @@ pub struct AnnounceLeader { /// /// *Since v1.6* #[serde(default, skip_serializing_if = "Option::is_none")] + #[bilrost(tag(4))] pub epoch_version: Option, /// Current replica set configuration at the time of the announcement. /// This field is optional for backward compatibility with older versions. 
/// *Since v1.6* #[serde(default, skip_serializing_if = "Option::is_none")] + #[bilrost(tag(5))] pub current_config: Option, /// Next replica set configuration. /// *Since v1.6* #[serde(default, skip_serializing_if = "Option::is_none")] + #[bilrost(tag(6))] pub next_config: Option, } +bilrost_storage_encode_decode!(AnnounceLeader); + +impl HasRecordKeys for AnnounceLeader { + fn record_keys(&self) -> Keys { + Keys::RangeInclusive(self.partition_key_range.start()..=self.partition_key_range.end()) + } +} + #[serde_with::serde_as] -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)] pub struct CurrentReplicaSetConfiguration { + #[bilrost(tag = 1)] pub version: Version, + #[bilrost(tag = 2)] pub replica_set: NodeSet, + #[bilrost(tag = 3)] pub modified_at: MillisSinceEpoch, #[serde_as(as = "serde_with::DisplayFromStr")] + #[bilrost(tag = 4)] pub replication: ReplicationProperty, } @@ -87,9 +107,11 @@ impl CurrentReplicaSetConfiguration { } } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)] pub struct NextReplicaSetConfiguration { + #[bilrost(tag(1))] pub version: Version, + #[bilrost(tag(2))] pub replica_set: NodeSet, } @@ -138,7 +160,7 @@ fn new_replica_set_state(version: Version, node_set: &NodeSet) -> ReplicaSetStat /// Readers before v1.4.0 will crash when reading this command. For v1.4.0+, the barrier defines the /// minimum version of restate server that can progress after this command. It also updates the FSM /// in case command has been trimmed. -#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, bilrost::Message, serde::Serialize, serde::Deserialize)] pub struct VersionBarrier { /// The minimum version required (inclusive) to progress after this barrier. 
pub version: SemanticRestateVersion, @@ -147,6 +169,14 @@ pub struct VersionBarrier { pub partition_key_range: Keys, } +bilrost_storage_encode_decode!(VersionBarrier); + +impl HasRecordKeys for VersionBarrier { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } +} + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. Note that durability /// only applies to partitions with the same `partition_id`. At replay time, the partition will /// ignore updates that are not targeted to its own ID. @@ -154,14 +184,29 @@ pub struct VersionBarrier { /// NOTE: The durability point is monotonically increasing. /// /// Since v1.4.2. -#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, bilrost::Message, serde::Serialize, serde::Deserialize)] pub struct PartitionDurability { + #[bilrost(tag(1))] pub partition_id: PartitionId, /// The partition has applied this LSN durably to the replica-set and/or has been /// persisted in a snapshot in the snapshot repository. + #[bilrost(tag(2))] pub durable_point: Lsn, /// Timestamp which the durability point was updated + #[bilrost(tag(3))] pub modification_time: MillisSinceEpoch, + /// partition key range + #[bilrost(tag(4))] + #[serde(default)] + pub partition_key_range: Keys, +} + +bilrost_storage_encode_decode!(PartitionDurability); + +impl HasRecordKeys for PartitionDurability { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } } /// Consistently store schema across partition replicas. @@ -173,6 +218,14 @@ pub struct UpsertSchema { pub schema: Schema, } +flexbuffers_storage_encode_decode!(UpsertSchema); + +impl HasRecordKeys for UpsertSchema { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } +} + /// Consistently distribute the cluster-global rule book across partition /// replicas. 
Each partition's leader observes a node-level cache of the /// rule book stored in the metadata store and proposes this command when diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index e9bdb22a7e..015d0e6d24 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -11,5 +11,6 @@ pub mod control; pub mod timer; pub mod v1; +pub mod v2; pub use v1::{Command, Destination, Envelope, Header, Source}; diff --git a/crates/wal-protocol/src/v2.rs b/crates/wal-protocol/src/v2.rs new file mode 100644 index 0000000000..5e6d1df785 --- /dev/null +++ b/crates/wal-protocol/src/v2.rs @@ -0,0 +1,540 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::marker::PhantomData; +use std::sync::Arc; + +use bilrost::encoding::encoded_len_varint; +use bilrost::{Message, OwnedMessage}; +use bytes::{BufMut, BytesMut}; + +use restate_encoding::U128; +use restate_types::identifiers::{LeaderEpoch, PartitionId}; +use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys}; +use restate_types::storage::{ + PolyBytes, StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, + StorageEncodeError, +}; +use restate_util_string::ReString; + +use crate::v1; + +mod compatibility; +pub mod records; + +mod sealed { + pub trait Sealed {} +} + +/// Metadata that accompanies every WAL record and carries routing, deduplication, +/// and serialization details required to interpret the payload. 
+#[derive(Debug, Clone, bilrost::Message)] +pub struct Header { + #[bilrost(1)] + dedup: Dedup, + /// Payload record kind + #[bilrost(2)] + kind: RecordKind, + /// Payload codec + #[bilrost(3)] + codec: Option, +} + +impl Header { + pub fn dedup(&self) -> &Dedup { + &self.dedup + } + + pub fn kind(&self) -> RecordKind { + self.kind + } +} + +/// Outgoing envelope used when you are sending out records +/// over bifrost. +#[derive(Clone, derive_more::Deref)] +pub struct Envelope { + #[deref] + header: Header, + payload: PolyBytes, + _p: PhantomData, +} + +impl Envelope { + pub fn header(&self) -> &Header { + &self.header + } +} + +impl Envelope { + pub fn new(dedup: Dedup, payload: impl Into>) -> Self { + let payload = payload.into(); + Self { + header: Header { + dedup, + kind: R::KIND, + codec: Some(payload.default_codec()), + }, + payload: PolyBytes::Typed(payload), + _p: PhantomData, + } + } +} + +impl StorageEncode for Envelope { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::Custom + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + let len = self.header.encoded_len(); + // todo(azmy): Followup! Also reserve enough space for the payload in one go + buf.reserve(encoded_len_varint(len as u64) + len); + + self.header + .encode_length_delimited(buf) + .map_err(|err| StorageEncodeError::EncodeValue(err.into()))?; + + match &self.payload { + PolyBytes::Bytes(bytes) => buf.put_slice(bytes), + PolyBytes::Typed(payload) => payload.encode(buf)?, + PolyBytes::Both(_, bytes) => buf.put_slice(bytes), + } + + Ok(()) + } +} + +/// Marker type used with [`IncomingEnvelope`] to signal that the payload has not been +/// decoded into a typed record yet. 
+#[derive(Clone, Copy)] +pub struct Raw; + +impl StorageDecode for Envelope { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + match kind { + StorageCodecKind::FlexbuffersSerde => { + let envelope = v1::Envelope::decode(buf, kind)?; + Self::try_from(envelope).map_err(|err| StorageDecodeError::DecodeValue(err.into())) + } + StorageCodecKind::Custom => { + let header = Header::decode_length_delimited(&mut *buf) + .map_err(|err| StorageDecodeError::DecodeValue(err.into()))?; + + Ok(Self { + header, + payload: PolyBytes::Bytes(buf.copy_to_bytes(buf.remaining())), + _p: PhantomData, + }) + } + _ => { + panic!("unsupported encoding"); + } + } + } +} + +impl Envelope { + /// Converts Raw Envelope into a Typed envelope. Panics + /// if the record kind does not match the M::KIND + pub fn into_typed(self) -> Envelope { + assert_eq!(self.header.kind, M::KIND); + + let Self { + header, payload, .. + } = self; + + Envelope { + header, + payload, + _p: PhantomData, + } + } +} + +impl Envelope +where + M::Payload: Clone, +{ + /// return the envelope payload + pub fn split(self) -> Result<(Header, M::Payload), StorageDecodeError> { + let payload = match self.payload { + PolyBytes::Bytes(mut bytes) => { + M::Payload::decode(&mut bytes, self.header.codec.expect("has codec kind"))? + } + PolyBytes::Both(typed, _) | PolyBytes::Typed(typed) => { + let typed = typed.downcast_arc::().map_err(|_| { + StorageDecodeError::DecodeValue("Type mismatch. 
Original value in PolyBytes::Typed does not match requested type".into()) + })?; + + match Arc::try_unwrap(typed) { + Ok(payload) => payload, + Err(arc) => arc.as_ref().clone(), + } + } + }; + + Ok((self.header, payload)) + } + + pub fn into_inner(self) -> Result { + self.split().map(|v| v.1) + } +} + +impl Envelope { + pub fn into_raw(self) -> Envelope { + Envelope { + header: self.header, + payload: self.payload, + _p: PhantomData, + } + } +} + +impl From> for Envelope { + fn from(value: Envelope) -> Self { + value.into_raw() + } +} + +/// Enumerates the logical categories of WAL records that the partition +/// processor understands. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, bilrost::Enumeration, strum::Display, strum::IntoStaticStr, +)] +pub enum RecordKind { + Unknown = 0, + + AnnounceLeader = 1, + /// A version barrier to fence off state machine changes that require a certain minimum + /// version of restate server. + /// *Since v1.4.0* + VersionBarrier = 2, + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. + /// See [`PartitionDurability`] for more details. + /// + /// *Since v1.4.2* + UpdatePartitionDurability = 3, + + // -- Partition processor commands + /// Manual patching of storage state + PatchState = 4, + /// Terminate an ongoing invocation + TerminateInvocation = 5, + /// Purge a completed invocation + PurgeInvocation = 6, + /// Purge a completed invocation journal + PurgeJournal = 7, + /// Start an invocation on this partition + Invoke = 8, + /// Truncate the message outbox up to, and including, the specified index. + TruncateOutbox = 9, + /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. 
+ ProxyThrough = 10, + /// Attach to an existing invocation + AttachInvocation = 11, + /// Resume an invocation + ResumeInvocation = 12, + /// Restart as new invocation from prefix + RestartAsNewInvocation = 13, + // -- Partition processor events for PP + /// Invoker is reporting effect(s) from an ongoing invocation. + InvokerEffect = 14, + /// Timer has fired + Timer = 15, + /// Schedule timer + ScheduleTimer = 16, + /// Another partition processor is reporting a response of an invocation we requested. + /// + /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. + /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. + /// + /// For more details see `OnNotifyInvocationResponse`. + InvocationResponse = 17, + + // -- New PP <-> PP commands using Journal V2 + /// Notify Get invocation output + NotifyGetInvocationOutputResponse = 18, + /// Notify a signal. + NotifySignal = 19, + + /// UpsertSchema record type + UpsertSchema = 20, + + /// VQueues scheduler decisions record type. + VQSchedulerDecisions = 21, + + /// Upsert rule book + UpsertRuleBook = 22, +} + +/// Specifies the deduplication strategy that allows receivers to discard +/// duplicate WAL records safely. +#[derive(Debug, Clone, PartialEq, Eq, Default, bilrost::Oneof, bilrost::Message)] +pub enum Dedup { + #[default] + None, + /// Sequence number to deduplicate messages sent by the same partition or a successor + /// of a previous partition (a successor partition will inherit the leader epoch of its + /// predecessor). + #[bilrost(tag(1), message)] + SelfProposal { + #[bilrost(0)] + leader_epoch: LeaderEpoch, + #[bilrost(1)] + seq: u64, + }, + /// Sequence number to deduplicate messages from a foreign partition. 
+ #[bilrost(tag(2), message)] + ForeignPartition { + #[bilrost(0)] + partition: PartitionId, + #[bilrost(1)] + seq: u64, + }, + /// Sequence number to deduplicate messages from an arbitrary string prefix. + #[bilrost(tag(3), message)] + Arbitrary { + // For backward compatibility with ProducerID::Other variant + // Drop in Restate v1.8 + #[bilrost(0)] + prefix: Option, + #[bilrost(1)] + producer_id: U128, + #[bilrost(2)] + seq: u64, + }, +} + +/// A partial type-erased envelope mainly used for writing records. +/// It carries the payload part with Keys. +pub struct PartialRecord { + kind: RecordKind, + keys: Keys, + payload: Arc, +} + +impl PartialRecord { + pub fn kind(&self) -> RecordKind { + self.kind + } + + pub fn keys(&self) -> &Keys { + &self.keys + } + + /// Builds an [`Envelope`] with keys from the [`PartialEnvelope`] + pub fn build(self, dedup: Dedup) -> BodyWithKeys> { + let inner = Envelope { + header: Header { + dedup, + kind: self.kind, + codec: Some(self.payload.default_codec()), + }, + payload: PolyBytes::Typed(self.payload), + _p: PhantomData, + }; + + BodyWithKeys::new(inner, self.keys) + } + + /// Extract the typed payload back from the [`PartialRecord`] + #[cfg(any(test, feature = "test-util"))] + pub fn unwrap(self) -> R::Payload { + assert_eq!(R::KIND, self.kind, "Record kind mismatch"); + let typed = self + .payload + .downcast_arc::() + .map_err(|_| ()) + .expect("record kind to match"); + + Arc::into_inner(typed).expect("sole owner of the payload") + } +} + +/// Marker trait implemented by strongly-typed representations of WAL record +/// payloads. +pub trait Record: sealed::Sealed + Sized { + const KIND: RecordKind; + type Payload: StorageEncode + StorageDecode + 'static; + + /// Create an envelope with `this` record kind + /// given the header, keys and payload + fn envelope(dedup: Dedup, payload: impl Into) -> Envelope { + Envelope::new(dedup, payload.into()) + } + + /// Creates a new test envelope. 
Shortcut for new(Source::Ingress, Dedup::None, payload) + #[cfg(any(test, feature = "test-util"))] + fn test_envelope(payload: impl Into) -> Envelope { + let record = Self::envelope(Dedup::None, payload); + record.into_raw() + } +} + +pub trait RecordWithKeys: Record { + fn partial(payload: impl Into) -> PartialRecord; + fn partial_arc(payload: impl Into>) -> PartialRecord; +} + +impl RecordWithKeys for T +where + T: Record, + T::Payload: HasRecordKeys, +{ + fn partial(payload: impl Into) -> PartialRecord { + let payload = payload.into(); + PartialRecord { + kind: T::KIND, + keys: payload.record_keys(), + payload: Arc::new(payload), + } + } + + fn partial_arc(payload: impl Into>) -> PartialRecord { + let payload = payload.into(); + PartialRecord { + kind: T::KIND, + keys: payload.record_keys(), + payload, + } + } +} + +#[cfg(test)] +mod test { + + use bytes::BytesMut; + + use restate_types::{ + GenerationalNodeId, logs::Keys, sharding::KeyRange, storage::StorageCodec, + }; + + use super::{Dedup, records}; + use crate::{ + control::AnnounceLeader, + v2::{Envelope, Raw, Record, RecordKind, RecordWithKeys}, + }; + + #[test] + fn envelope_encode_decode() { + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: KeyRange::new(0, u64::MAX), + epoch_version: None, + current_config: None, + next_config: None, + }; + + let envelope = records::AnnounceLeader::envelope( + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + payload.clone(), + ); + + let mut buf = BytesMut::new(); + StorageCodec::encode(&envelope, &mut buf).expect("to encode"); + + let envelope: Envelope = StorageCodec::decode(&mut buf).expect("to decode"); + + assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); + let typed = envelope.into_typed::(); + + let (_, loaded_payload) = typed.split().expect("to decode"); + + assert_announce_leader_eq(&payload, &loaded_payload); + } + + #[test] + fn envelope_skip_encode() { + let 
payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: KeyRange::new(0, u64::MAX), + epoch_version: None, + current_config: None, + next_config: None, + }; + + let envelope = records::AnnounceLeader::envelope( + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + payload.clone(), + ); + + // assert_eq!(envelope.record_keys(), Keys::RangeInclusive(0..=u64::MAX)); + + let envelope = envelope.into_raw(); + + assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); + let typed = envelope.into_typed::(); + + let (_, loaded_payload) = typed.split().expect("to decode"); + + assert_announce_leader_eq(&payload, &loaded_payload); + } + + #[test] + fn partial_envelope_with_keys() { + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: KeyRange::new(0, u64::MAX), + epoch_version: None, + current_config: None, + next_config: None, + }; + + let envelope = records::AnnounceLeader::partial(payload.clone()); + + let keyed = envelope.build(Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }); + + assert_eq!( + keyed.keys(), + &Keys::RangeInclusive(payload.partition_key_range.into()) + ); + + let envelope = keyed.into_inner(); + assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); + let envelope = envelope.into_typed::(); + + let (_, loaded_payload) = envelope.split().expect("to decode"); + + assert_announce_leader_eq(&payload, &loaded_payload); + } + + #[track_caller] + fn assert_announce_leader_eq(expected: &AnnounceLeader, actual: &AnnounceLeader) { + assert_eq!(expected.node_id, actual.node_id); + assert_eq!(expected.leader_epoch, actual.leader_epoch); + assert_eq!(expected.partition_key_range, actual.partition_key_range); + assert_eq!(expected.epoch_version, actual.epoch_version); + assert_eq!( + expected.current_config.is_some(), + actual.current_config.is_some(), + ); + assert_eq!(expected.next_config.is_some(), 
actual.next_config.is_some(),);
+    }
+}
diff --git a/crates/wal-protocol/src/v2/compatibility.rs b/crates/wal-protocol/src/v2/compatibility.rs
new file mode 100644
index 0000000000..84cad85544
--- /dev/null
+++ b/crates/wal-protocol/src/v2/compatibility.rs
@@ -0,0 +1,193 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use bilrost::OwnedMessage;
+
+use restate_encoding::U128;
+use restate_limiter::RuleBook;
+use restate_storage_api::{
+    deduplication_table::{DedupInformation, DedupSequenceNumber, EpochSequenceNumber, ProducerId},
+    vqueue_table::scheduler::SchedulerDecisions,
+};
+use restate_types::logs::Keys;
+use restate_util_string::ReString;
+
+use super::{Raw, records};
+use crate::{
+    v1,
+    v2::{
+        self, Record,
+        records::{ProxyThroughPayload, TruncateOutboxPayload, UpsertRuleBookPayload},
+    },
+};
+/// Upgrades a decoded v1 envelope into a raw v2 envelope (every arm below ends in `into_raw()`).
+impl TryFrom for v2::Envelope {
+    type Error = anyhow::Error;
+    /// Fails on an unsupported producer/sequence-number pairing or when an embedded payload fails to decode.
+    fn try_from(value: v1::Envelope) -> Result {
+        let v1::Destination::Processor {
+            dedup,
+            partition_key,
+        } = value.header.dest;
+        // Translate the v1 dedup info; pairings not matched explicitly below are rejected.
+        let dedup = match dedup {
+            None => v2::Dedup::None,
+            Some(info) => match (info.producer_id, info.sequence_number) {
+                (ProducerId::Partition(id), DedupSequenceNumber::Sn(seq)) => {
+                    v2::Dedup::ForeignPartition { partition: id, seq }
+                }
+                (ProducerId::Partition(_), _) => anyhow::bail!("invalid deduplication information"),
+                (ProducerId::Other(_), DedupSequenceNumber::Esn(sn)) => v2::Dedup::SelfProposal {
+                    leader_epoch: sn.leader_epoch,
+                    seq: sn.sequence_number,
+                },
+                (ProducerId::Other(prefix), DedupSequenceNumber::Sn(seq)) => v2::Dedup::Arbitrary {
+                    prefix: Some(ReString::from(&prefix[..])),
+                    producer_id: 0.into(), // placeholder; the reverse `From` impl below ignores it when a prefix is set
+                    seq,
+                },
+                (ProducerId::Producer(producer_id), DedupSequenceNumber::Sn(seq)) => {
+                    v2::Dedup::Arbitrary {
+                        prefix: None,
+                        producer_id: U128::from(u128::from(producer_id)),
+                        seq,
+                    }
+                }
+                (ProducerId::Producer(_), _) => {
+                    anyhow::bail!("invalid deduplication information")
+                }
+            },
+        };
+        // Wrap each v1 command in its v2 record; `partition_key` is carried over only by ProxyThrough and TruncateOutbox.
+        let envelope = match value.command {
+            v1::Command::AnnounceLeader(payload) => {
+                records::AnnounceLeader::envelope(dedup, *payload).into_raw()
+            }
+            v1::Command::AttachInvocation(payload) => {
+                records::AttachInvocation::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::InvocationResponse(payload) => {
+                records::InvocationResponse::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::Invoke(payload) => records::Invoke::envelope(dedup, payload).into_raw(),
+            v1::Command::InvokerEffect(payload) => {
+                records::InvokerEffect::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::NotifyGetInvocationOutputResponse(payload) => {
+                records::NotifyGetInvocationOutputResponse::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::NotifySignal(payload) => {
+                records::NotifySignal::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::PatchState(payload) => {
+                records::PatchState::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::ProxyThrough(payload) => records::ProxyThrough::envelope(
+                dedup,
+                ProxyThroughPayload {
+                    invocation: payload.into(),
+                    proxy_partition: Keys::Single(partition_key),
+                },
+            )
+            .into_raw(),
+            v1::Command::PurgeInvocation(payload) => {
+                records::PurgeInvocation::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::PurgeJournal(payload) => {
+                records::PurgeJournal::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::RestartAsNewInvocation(payload) => {
+                records::RestartAsNewInvocation::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::ResumeInvocation(payload) => {
+                records::ResumeInvocation::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::ScheduleTimer(payload) => {
+                records::ScheduleTimer::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::TerminateInvocation(payload) => {
+                records::TerminateInvocation::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::Timer(payload) => records::Timer::envelope(dedup, payload).into_raw(),
+            v1::Command::TruncateOutbox(payload) => records::TruncateOutbox::envelope(
+                dedup,
+                TruncateOutboxPayload {
+                    index: payload,
+                    // this should actually be a key range, but v1 unfortunately
+                    // only holds the "start" of the range.
+                    // will be fixed in v2
+                    partition_key_range: Keys::Single(partition_key),
+                },
+            )
+            .into_raw(),
+            v1::Command::UpdatePartitionDurability(payload) => {
+                records::UpdatePartitionDurability::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::UpsertSchema(payload) => {
+                records::UpsertSchema::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::VersionBarrier(payload) => {
+                records::VersionBarrier::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::VQSchedulerDecisions(payload) => {
+                // the v1 payload bytes are bilrost-encoded `SchedulerDecisions`; decode them eagerly.
+                let payload = SchedulerDecisions::bilrost_decode(payload)?;
+                records::VQSchedulerDecisions::envelope(dedup, payload).into_raw()
+            }
+            v1::Command::UpsertRuleBook(payload) => {
+                let rule_book = RuleBook::decode(payload.rule_book)?; // decode failure aborts the conversion
+                records::UpsertRuleBook::envelope(
+                    dedup,
+                    UpsertRuleBookPayload {
+                        partition_key_range: payload.partition_key_range,
+                        rule_book: Arc::new(rule_book),
+                    },
+                )
+                .into_raw()
+            }
+        };
+
+        Ok(envelope)
+    }
+}
+/// Converts the v2 dedup header back into the optional `DedupInformation` form.
+impl From for Option {
+    fn from(value: v2::Dedup) -> Self {
+        match value {
+            v2::Dedup::None => None,
+            v2::Dedup::SelfProposal { leader_epoch, seq } => {
+                Some(DedupInformation::self_proposal(EpochSequenceNumber {
+                    leader_epoch,
+                    sequence_number: seq,
+                }))
+            }
+            v2::Dedup::ForeignPartition { partition, seq } => {
+                Some(DedupInformation::cross_partition(partition, seq))
+            }
+            v2::Dedup::Arbitrary {
+                prefix: Some(prefix),
+                producer_id: _, // with a prefix present only the prefix and seq are carried over
+                seq,
+            } => {
+                // TODO(azmy): remove prefix from dedup info
+                // and drop `DedupInformation::ingress()`
+                #[allow(deprecated)]
+                Some(DedupInformation::ingress(prefix.to_string(), seq))
+            }
+            v2::Dedup::Arbitrary {
+                prefix: None,
+                producer_id,
+                seq,
+            } => Some(DedupInformation::producer(producer_id.into(), seq)),
+        }
+    }
+}
diff --git a/crates/wal-protocol/src/v2/records.rs b/crates/wal-protocol/src/v2/records.rs
new file mode 100644
index 0000000000..65b0479361
--- /dev/null
+++ b/crates/wal-protocol/src/v2/records.rs
@@ -0,0 +1,475 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+
+use restate_encoding::Arced;
+use restate_limiter::RuleBook;
+use restate_storage_api::vqueue_table::scheduler::{self};
+use restate_types::{
+    bilrost_storage_encode_decode, flexbuffers_storage_encode_decode,
+    identifiers::{WithInvocationId, WithPartitionKey},
+    invocation,
+    logs::{HasRecordKeys, Keys},
+    message::MessageIndex,
+    sharding::KeyRange,
+    state_mut,
+};
+
+use super::sealed::Sealed;
+use super::{Record, RecordKind};
+use crate::timer::{self};
+
+// Newtype wrappers that attach storage encode/decode (bilrost or flexbuffers)
+// and `HasRecordKeys` to payload types declared in other modules.
+#[derive(
+    Debug,
+    Clone,
+    Eq,
+    PartialEq,
+    bilrost::Message,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct ExternalStateMutationPayload(state_mut::ExternalStateMutation);
+bilrost_storage_encode_decode!(ExternalStateMutationPayload);
+// Keyed by the partition key of the mutation's `service_id`.
+impl HasRecordKeys for ExternalStateMutationPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.0.service_id.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct InvocationTerminationPayload(invocation::InvocationTermination);
+flexbuffers_storage_encode_decode!(InvocationTerminationPayload);
+// Keyed by the partition key of the wrapped `invocation_id` (the field is reached through `Deref`).
+impl HasRecordKeys for InvocationTerminationPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.invocation_id.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct PurgeInvocationRequestPayload(invocation::PurgeInvocationRequest);
+flexbuffers_storage_encode_decode!(PurgeInvocationRequestPayload);
+// Keyed by the partition key of the wrapped request's `invocation_id`.
+impl HasRecordKeys for PurgeInvocationRequestPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.invocation_id.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct ServiceInvocationPayload(Box);
+
+flexbuffers_storage_encode_decode!(ServiceInvocationPayload);
+// Keyed by the partition key of the boxed invocation's `invocation_id`.
+impl HasRecordKeys for ServiceInvocationPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.invocation_id.partition_key())
+    }
+}
+
+#[derive(Debug, Clone, bilrost::Message)]
+pub struct TruncateOutboxPayload {
+    #[bilrost(1)]
+    pub index: MessageIndex,
+
+    #[bilrost(2)]
+    pub partition_key_range: Keys,
+}
+// The keys are stored in the payload itself and handed back as a clone.
+impl HasRecordKeys for TruncateOutboxPayload {
+    fn record_keys(&self) -> Keys {
+        self.partition_key_range.clone()
+    }
+}
+
+bilrost_storage_encode_decode!(TruncateOutboxPayload);
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct AttachInvocationRequestPayload(invocation::AttachInvocationRequest);
+
+flexbuffers_storage_encode_decode!(AttachInvocationRequestPayload);
+// NOTE(review): `partition_key()` presumably resolves through `Deref` to the wrapped request; confirm.
+impl HasRecordKeys for AttachInvocationRequestPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct ResumeInvocationRequestPayload(invocation::ResumeInvocationRequest);
+
+flexbuffers_storage_encode_decode!(ResumeInvocationRequestPayload);
+// Keyed by the partition key of the wrapped request's `invocation_id`.
+impl HasRecordKeys for ResumeInvocationRequestPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.0.invocation_id.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct RestartAsNewInvocationRequestPayload(invocation::RestartAsNewInvocationRequest);
+
+flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequestPayload);
+// Keyed by the partition key of the wrapped request's `invocation_id`.
+impl HasRecordKeys for RestartAsNewInvocationRequestPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.0.invocation_id.partition_key())
+    }
+}
+
+#[derive(
+    Clone, Serialize, Deserialize, derive_more::Deref, derive_more::Into, derive_more::From,
+)]
+pub struct EffectPayload(Box);
+
+flexbuffers_storage_encode_decode!(EffectPayload);
+// Keyed by the partition key of the boxed effect's `invocation_id`.
+impl HasRecordKeys for EffectPayload {
+    fn record_keys(&self) -> restate_types::logs::Keys {
+        Keys::Single(self.invocation_id.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct TimerKeyValuePayload(timer::TimerKeyValue);
+
+flexbuffers_storage_encode_decode!(TimerKeyValuePayload);
+// Keyed by the partition key of whatever `invocation_id()` returns for the wrapped timer entry.
+impl HasRecordKeys for TimerKeyValuePayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.invocation_id().partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct InvocationResponsePayload(invocation::InvocationResponse);
+
+flexbuffers_storage_encode_decode!(InvocationResponsePayload);
+// NOTE(review): `partition_key()` presumably resolves through `Deref` to the wrapped response; confirm.
+impl HasRecordKeys for InvocationResponsePayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct GetInvocationOutputResponsePayload(invocation::GetInvocationOutputResponse);
+
+flexbuffers_storage_encode_decode!(GetInvocationOutputResponsePayload);
+// Keyed by the partition key of the response `target`'s invocation id.
+impl HasRecordKeys for GetInvocationOutputResponsePayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.0.target.invocation_id().partition_key())
+    }
+}
+
+#[derive(
+    Clone,
+    Eq,
+    PartialEq,
+    Serialize,
+    Deserialize,
+    derive_more::Deref,
+    derive_more::Into,
+    derive_more::From,
+)]
+pub struct NotifySignalRequestPayload(invocation::NotifySignalRequest);
+
+flexbuffers_storage_encode_decode!(NotifySignalRequestPayload);
+// NOTE(review): `partition_key()` presumably resolves through `Deref` to the wrapped request; confirm.
+impl HasRecordKeys for NotifySignalRequestPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::Single(self.partition_key())
+    }
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct ProxyThroughPayload {
+    pub invocation: ServiceInvocationPayload,
+
+    pub proxy_partition: Keys,
+}
+
+flexbuffers_storage_encode_decode!(ProxyThroughPayload);
+// Keyed by `proxy_partition`, not by the wrapped invocation's own key.
+impl HasRecordKeys for ProxyThroughPayload {
+    fn record_keys(&self) -> Keys {
+        self.proxy_partition.clone()
+    }
+}
+
+#[derive(Clone, derive_more::Deref, derive_more::Into, derive_more::From, bilrost::Message)]
+pub struct SchedulerDecisionsPayload(scheduler::SchedulerDecisions);
+
+bilrost_storage_encode_decode!(SchedulerDecisionsPayload);
+
+impl HasRecordKeys for SchedulerDecisionsPayload {
+    fn record_keys(&self) -> Keys {
+        // All records in a decision are for a single partition key, so the first qid is used; no qids => `Keys::None`.
+        if self.0.qids.is_empty() {
+            Keys::None
+        } else {
+            Keys::Single(self.0.qids[0].0.partition_key())
+        }
+    }
+}
+
+#[derive(Debug, Clone, bilrost::Message)]
+pub struct UpsertRuleBookPayload {
+    #[bilrost(tag(1))]
+    pub partition_key_range: KeyRange,
+    #[bilrost(tag(2), encoding(Arced))]
+    pub rule_book: Arc,
+}
+
+bilrost_storage_encode_decode!(UpsertRuleBookPayload);
+// Keyed by the whole inclusive key range stored in the payload.
+impl HasRecordKeys for UpsertRuleBookPayload {
+    fn record_keys(&self) -> Keys {
+        Keys::RangeInclusive(self.partition_key_range.into())
+    }
+}
+
+// end of payload types
+
+// Record marker types: each one binds a `RecordKind` to the payload type stored for it.
+// `record!` emits a unit struct plus its `Sealed` and `Record` impls (`KIND` and `Payload`).
+macro_rules! record {
+    {@name=$name:ident, @kind=$type:expr, @payload=$payload:path} => {
+        #[allow(dead_code)]
+        #[derive(Clone, Copy)]
+        pub struct $name;
+        impl Sealed for $name{}
+        impl Record for $name {
+            const KIND: RecordKind = $type;
+            type Payload = $payload;
+        }
+    };
+}
+// The four records with `crate::control` payloads use those types directly; the rest use the wrappers above.
+record! {
+    @name=AnnounceLeader,
+    @kind=RecordKind::AnnounceLeader,
+    @payload=crate::control::AnnounceLeader
+}
+
+record! {
+    @name=VersionBarrier,
+    @kind=RecordKind::VersionBarrier,
+    @payload=crate::control::VersionBarrier
+}
+
+record! {
+    @name=UpdatePartitionDurability,
+    @kind=RecordKind::UpdatePartitionDurability,
+    @payload=crate::control::PartitionDurability
+}
+
+record! {
+    @name=PatchState,
+    @kind=RecordKind::PatchState,
+    @payload=ExternalStateMutationPayload
+}
+
+record! {
+    @name=TerminateInvocation,
+    @kind=RecordKind::TerminateInvocation,
+    @payload=InvocationTerminationPayload
+}
+
+record! {
+    @name=PurgeInvocation,
+    @kind=RecordKind::PurgeInvocation,
+    @payload=PurgeInvocationRequestPayload
+}
+
+record! {
+    @name=PurgeJournal,
+    @kind=RecordKind::PurgeJournal,
+    @payload=PurgeInvocationRequestPayload // same payload type as `PurgeInvocation`
+}
+
+record! {
+    @name=Invoke,
+    @kind=RecordKind::Invoke,
+    @payload=ServiceInvocationPayload
+}
+
+record! {
+    @name=TruncateOutbox,
+    @kind=RecordKind::TruncateOutbox,
+    @payload=TruncateOutboxPayload
+}
+
+record! {
+    @name=ProxyThrough,
+    @kind=RecordKind::ProxyThrough,
+    @payload=ProxyThroughPayload
+}
+
+record! {
+    @name=AttachInvocation,
+    @kind=RecordKind::AttachInvocation,
+    @payload=AttachInvocationRequestPayload
+}
+
+record! {
+    @name=ResumeInvocation,
+    @kind=RecordKind::ResumeInvocation,
+    @payload=ResumeInvocationRequestPayload
+}
+
+record! {
+    @name=RestartAsNewInvocation,
+    @kind=RecordKind::RestartAsNewInvocation,
+    @payload=RestartAsNewInvocationRequestPayload
+}
+
+record! {
+    @name=InvokerEffect,
+    @kind=RecordKind::InvokerEffect,
+    @payload=EffectPayload
+}
+
+record! {
+    @name=Timer,
+    @kind=RecordKind::Timer,
+    @payload=TimerKeyValuePayload
+}
+
+record! {
+    @name=ScheduleTimer,
+    @kind=RecordKind::ScheduleTimer,
+    @payload=TimerKeyValuePayload // same payload type as `Timer`
+}
+
+record! {
+    @name=InvocationResponse,
+    @kind=RecordKind::InvocationResponse,
+    @payload=InvocationResponsePayload
+}
+
+record! {
+    @name=NotifyGetInvocationOutputResponse,
+    @kind=RecordKind::NotifyGetInvocationOutputResponse,
+    @payload=GetInvocationOutputResponsePayload
+}
+
+record! {
+    @name=NotifySignal,
+    @kind=RecordKind::NotifySignal,
+    @payload=NotifySignalRequestPayload
+}
+
+record! {
+    @name=UpsertSchema,
+    @kind=RecordKind::UpsertSchema,
+    @payload=crate::control::UpsertSchema
+}
+
+record! {
+    @name=VQSchedulerDecisions,
+    @kind=RecordKind::VQSchedulerDecisions,
+    @payload=SchedulerDecisionsPayload
+}
+
+record! {
+    @name=UpsertRuleBook,
+    @kind=RecordKind::UpsertRuleBook,
+    @payload=UpsertRuleBookPayload
+}
diff --git a/crates/worker/src/partition/leadership/durability_tracker.rs b/crates/worker/src/partition/leadership/durability_tracker.rs
index 34b3427c45..9554090981 100644
--- a/crates/worker/src/partition/leadership/durability_tracker.rs
+++ b/crates/worker/src/partition/leadership/durability_tracker.rs
@@ -9,22 +9,23 @@
 // by the Apache License, Version 2.0.
 
 use std::pin::Pin;
+use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::task::Poll;
 use std::time::Duration;
 
 use futures::{Stream, StreamExt};
-use restate_clock::WallClock;
 use tokio::sync::watch;
 use tokio::time::{Instant, MissedTickBehavior};
 use tokio_stream::wrappers::{IntervalStream, WatchStream};
 use tracing::{debug, warn};
 
+use restate_clock::WallClock;
 use restate_core::Metadata;
 use restate_types::config::{Configuration, DurabilityMode};
-use restate_types::identifiers::PartitionId;
-use restate_types::logs::{Lsn, SequenceNumber};
+use restate_types::logs::{Keys, Lsn, SequenceNumber};
 use restate_types::nodes_config::Role;
+use restate_types::partitions::Partition;
 use restate_types::partitions::state::PartitionReplicaSetStates;
 use restate_types::time::MillisSinceEpoch;
 use restate_wal_protocol::control::PartitionDurability;
@@ -40,7 +41,7 @@ static LAST_SNAPSHOT_WARNING_AT: AtomicU64 = AtomicU64::new(0);
 /// changed in the replica-set, but it'll react immediately to changes in the archived Lsn
 /// watch.
pub struct DurabilityTracker { - partition_id: PartitionId, + partition: Arc, last_reported_durable_lsn: Lsn, replica_set_states: PartitionReplicaSetStates, archived_lsn_watch: WatchStream>, @@ -52,7 +53,7 @@ pub struct DurabilityTracker { impl DurabilityTracker { pub fn new( - partition_id: PartitionId, + partition: Arc, last_reported_durable_lsn: Option, replica_set_states: PartitionReplicaSetStates, archived_lsn_watch: watch::Receiver>, @@ -64,7 +65,7 @@ impl DurabilityTracker { let check_timer = IntervalStream::new(check_timer); Self { - partition_id, + partition, last_reported_durable_lsn: last_reported_durable_lsn.unwrap_or(Lsn::INVALID), replica_set_states, archived_lsn_watch: WatchStream::new(archived_lsn_watch), @@ -183,11 +184,11 @@ impl Stream for DurabilityTracker { } DurabilityMode::ReplicaSetOnly => self .replica_set_states - .get_min_durable_lsn(self.partition_id), + .get_min_durable_lsn(self.partition.partition_id), DurabilityMode::SnapshotAndReplicaSet => { let min_durable_lsn = self .replica_set_states - .get_min_durable_lsn(self.partition_id); + .get_min_durable_lsn(self.partition.partition_id); self.last_archived.min(min_durable_lsn) } // disabled until ad-hoc snapshot sharing is supported @@ -201,7 +202,7 @@ impl Stream for DurabilityTracker { DurabilityMode::Balanced => { let max_durable_lsn = self .replica_set_states - .get_max_durable_lsn(self.partition_id); + .get_max_durable_lsn(self.partition.partition_id); self.last_archived.min(max_durable_lsn) } }; @@ -212,15 +213,16 @@ impl Stream for DurabilityTracker { } let partition_durability = PartitionDurability { - partition_id: self.partition_id, + partition_id: self.partition.partition_id, durable_point: suggested, modification_time: MillisSinceEpoch::now(), + partition_key_range: Keys::RangeInclusive(self.partition.key_range.into()), }; // We don't want to keep reporting the same durable Lsn over and over. 
self.last_reported_durable_lsn = suggested; debug!( - partition_id = %self.partition_id, + partition_id = %self.partition.partition_id, durability_mode = %durability_mode, "Reporting {suggested:?} as a durable point for partition" ); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index fe9821dbbf..66ded4401e 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -582,7 +582,7 @@ where .map(|d| d.durable_point); let durability_tracker = DurabilityTracker::new( - self.partition.partition_id, + self.partition.clone(), last_reported_durable_lsn, replica_set_states, partition_store.partition_db().watch_archived_lsn(),