diff --git a/crates/wal-protocol/src/control.rs b/crates/wal-protocol/src/control.rs index 537a0c9f38..0fcc794268 100644 --- a/crates/wal-protocol/src/control.rs +++ b/crates/wal-protocol/src/control.rs @@ -26,7 +26,7 @@ use restate_types::{ /// Announcing a new leader. This message can be written by any component to make the specified /// partition processor the leader. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)] -pub struct AnnounceLeader { +pub struct AnnounceLeaderCommand { /// Sender of the announce leader message. /// /// This became non-optional in v1.5. Noting that it has always been set in previous versions, @@ -60,9 +60,9 @@ pub struct AnnounceLeader { pub next_config: Option, } -bilrost_storage_encode_decode!(AnnounceLeader); +bilrost_storage_encode_decode!(AnnounceLeaderCommand); -impl HasRecordKeys for AnnounceLeader { +impl HasRecordKeys for AnnounceLeaderCommand { fn record_keys(&self) -> Keys { Keys::RangeInclusive(self.partition_key_range.start()..=self.partition_key_range.end()) } @@ -161,7 +161,7 @@ fn new_replica_set_state(version: Version, node_set: &NodeSet) -> ReplicaSetStat /// minimum version of restate server that can progress after this command. It also updates the FSM /// in case command has been trimmed. #[derive(Debug, Clone, bilrost::Message, serde::Serialize, serde::Deserialize)] -pub struct VersionBarrier { +pub struct VersionBarrierCommand { /// The minimum version required (inclusive) to progress after this barrier. pub version: SemanticRestateVersion, /// A human-readable reason for why this barrier exists. @@ -169,9 +169,9 @@ pub struct VersionBarrier { pub partition_key_range: Keys, } -bilrost_storage_encode_decode!(VersionBarrier); +bilrost_storage_encode_decode!(VersionBarrierCommand); -impl HasRecordKeys for VersionBarrier { +impl HasRecordKeys for VersionBarrierCommand { fn record_keys(&self) -> Keys { self.partition_key_range.clone() } @@ -185,7 +185,7 @@ impl HasRecordKeys for VersionBarrier { /// /// Since v1.4.2. #[derive(Debug, Clone, bilrost::Message, serde::Serialize, serde::Deserialize)] -pub struct PartitionDurability { +pub struct UpdatePartitionDurabilityCommand { #[bilrost(tag(1))] pub partition_id: PartitionId, /// The partition has applied this LSN durably to the replica-set and/or has been @@ -201,9 +201,9 @@ pub struct PartitionDurability { pub partition_key_range: Keys, } -bilrost_storage_encode_decode!(PartitionDurability); +bilrost_storage_encode_decode!(UpdatePartitionDurabilityCommand); -impl HasRecordKeys for PartitionDurability { +impl HasRecordKeys for UpdatePartitionDurabilityCommand { fn record_keys(&self) -> Keys { self.partition_key_range.clone() } @@ -213,14 +213,14 @@ impl HasRecordKeys for PartitionDurability { /// /// Since v1.6.0. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct UpsertSchema { +pub struct UpsertSchemaCommand { pub partition_key_range: Keys, pub schema: Schema, } -flexbuffers_storage_encode_decode!(UpsertSchema); +flexbuffers_storage_encode_decode!(UpsertSchemaCommand); -impl HasRecordKeys for UpsertSchema { +impl HasRecordKeys for UpsertSchemaCommand { fn record_keys(&self) -> Keys { self.partition_key_range.clone() } @@ -242,7 +242,7 @@ impl HasRecordKeys for UpsertSchema { /// /// Since v1.7.0. #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct UpsertRuleBook { +pub struct UpsertRuleBookCommand { pub partition_key_range: KeyRange, pub rule_book: Bytes, } diff --git a/crates/wal-protocol/src/v1.rs b/crates/wal-protocol/src/v1.rs index e1c634cff9..cca0069dad 100644 --- a/crates/wal-protocol/src/v1.rs +++ b/crates/wal-protocol/src/v1.rs @@ -22,7 +22,8 @@ use restate_types::message::MessageIndex; use restate_types::state_mut::ExternalStateMutation; use crate::control::{ - AnnounceLeader, PartitionDurability, UpsertRuleBook, UpsertSchema, VersionBarrier, + AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, UpsertRuleBookCommand, + UpsertSchemaCommand, VersionBarrierCommand, }; use crate::timer::TimerKeyValue; @@ -135,13 +136,13 @@ pub enum Command { /// See [`PartitionDurability`] for more details. /// /// *Since v1.4.2* - UpdatePartitionDurability(PartitionDurability), + UpdatePartitionDurability(UpdatePartitionDurabilityCommand), /// A version barrier to fence off state machine changes that require a certain minimum /// version of restate server. /// *Since v1.4.0* - VersionBarrier(VersionBarrier), + VersionBarrier(VersionBarrierCommand), // -- Control-plane related events - AnnounceLeader(Box), + AnnounceLeader(Box), // -- Partition processor commands /// Manual patching of storage state @@ -188,12 +189,12 @@ pub enum Command { /// Upsert schema for consistent schema across replicas /// *Since v1.6.0 - UpsertSchema(UpsertSchema), + UpsertSchema(UpsertSchemaCommand), /// Upsert the cluster-global rule book for consistent rules across /// replicas; the apply path persists it to the partition store and /// notifies the leader's `UserLimiter` of the diff. /// *Since v1.7.0 - UpsertRuleBook(UpsertRuleBook), + UpsertRuleBook(UpsertRuleBookCommand), // # Commands for VQueues management // ---------------------------------- /// A command to attempt a run an entry in the vqueue (invocation, or otherwise) diff --git a/crates/wal-protocol/src/v2.rs b/crates/wal-protocol/src/v2.rs index 5e6d1df785..1900683192 100644 --- a/crates/wal-protocol/src/v2.rs +++ b/crates/wal-protocol/src/v2.rs @@ -26,8 +26,8 @@ use restate_util_string::ReString; use crate::v1; +pub mod commands; mod compatibility; -pub mod records; mod sealed { pub trait Sealed {} @@ -41,7 +41,7 @@ pub struct Header { dedup: Dedup, /// Payload record kind #[bilrost(2)] - kind: RecordKind, + kind: CommandKind, /// Payload codec #[bilrost(3)] codec: Option, @@ -52,7 +52,7 @@ impl Header { &self.dedup } - pub fn kind(&self) -> RecordKind { + pub fn kind(&self) -> CommandKind { self.kind } } @@ -67,19 +67,19 @@ pub struct Envelope { _p: PhantomData, } -impl Envelope { +impl Envelope { pub fn header(&self) -> &Header { &self.header } } -impl Envelope { - pub fn new(dedup: Dedup, payload: impl Into>) -> Self { +impl Envelope { + pub fn new(dedup: Dedup, payload: impl Into>) -> Self { let payload = payload.into(); Self { header: Header { dedup, - kind: R::KIND, + kind: C::KIND, codec: Some(payload.default_codec()), }, payload: PolyBytes::Typed(payload), @@ -88,7 +88,7 @@ impl Envelope { } } -impl StorageEncode for Envelope { +impl StorageEncode for Envelope { fn default_codec(&self) -> StorageCodecKind { StorageCodecKind::Custom } @@ -150,7 +150,7 @@ impl StorageDecode for Envelope { impl Envelope { /// Converts Raw Envelope into a Typed envelope. Panics /// if the record kind does not match the M::KIND - pub fn into_typed(self) -> Envelope { + pub fn into_typed(self) -> Envelope { assert_eq!(self.header.kind, M::KIND); let Self { @@ -165,18 +165,18 @@ impl Envelope { } } -impl Envelope +impl Envelope where - M::Payload: Clone, + C: Clone, { /// return the envelope payload - pub fn split(self) -> Result<(Header, M::Payload), StorageDecodeError> { + pub fn split(self) -> Result<(Header, C), StorageDecodeError> { let payload = match self.payload { PolyBytes::Bytes(mut bytes) => { - M::Payload::decode(&mut bytes, self.header.codec.expect("has codec kind"))? + C::decode(&mut bytes, self.header.codec.expect("has codec kind"))? } PolyBytes::Both(typed, _) | PolyBytes::Typed(typed) => { - let typed = typed.downcast_arc::().map_err(|_| { + let typed = typed.downcast_arc::().map_err(|_| { StorageDecodeError::DecodeValue("Type mismatch. Original value in PolyBytes::Typed does not match requested type".into()) })?; @@ -190,12 +190,12 @@ where Ok((self.header, payload)) } - pub fn into_inner(self) -> Result { + pub fn into_inner(self) -> Result { self.split().map(|v| v.1) } } -impl Envelope { +impl Envelope { pub fn into_raw(self) -> Envelope { Envelope { header: self.header, @@ -205,8 +205,8 @@ impl Envelope { } } -impl From> for Envelope { - fn from(value: Envelope) -> Self { +impl From> for Envelope { + fn from(value: Envelope) -> Self { value.into_raw() } } @@ -216,7 +216,7 @@ impl From> for Envelope { #[derive( Debug, Clone, Copy, PartialEq, Eq, bilrost::Enumeration, strum::Display, strum::IntoStaticStr, )] -pub enum RecordKind { +pub enum CommandKind { Unknown = 0, AnnounceLeader = 1, @@ -322,14 +322,14 @@ pub enum Dedup { /// A partial type-erased envelope mainly used for writing records. /// It carries the payload part with Keys. -pub struct PartialRecord { - kind: RecordKind, +pub struct PartialEnvelope { + kind: CommandKind, keys: Keys, payload: Arc, } -impl PartialRecord { - pub fn kind(&self) -> RecordKind { +impl PartialEnvelope { + pub fn kind(&self) -> CommandKind { self.kind } @@ -351,66 +351,35 @@ impl PartialRecord { BodyWithKeys::new(inner, self.keys) } - - /// Extract the typed payload back from the [`PartialRecord`] - #[cfg(any(test, feature = "test-util"))] - pub fn unwrap(self) -> R::Payload { - assert_eq!(R::KIND, self.kind, "Record kind mismatch"); - let typed = self - .payload - .downcast_arc::() - .map_err(|_| ()) - .expect("record kind to match"); - - Arc::into_inner(typed).expect("sole owner of the payload") - } } /// Marker trait implemented by strongly-typed representations of WAL record /// payloads. -pub trait Record: sealed::Sealed + Sized { - const KIND: RecordKind; - type Payload: StorageEncode + StorageDecode + 'static; - - /// Create an envelope with `this` record kind - /// given the header, keys and payload - fn envelope(dedup: Dedup, payload: impl Into) -> Envelope { - Envelope::new(dedup, payload.into()) +pub trait Command: sealed::Sealed + StorageEncode + StorageDecode + Sized { + const KIND: CommandKind; + + fn envelope(self, dedup: Dedup) -> Envelope { + Envelope::new(dedup, self) } /// Creates a new test envelope. Shortcut for new(Source::Ingress, Dedup::None, payload) #[cfg(any(test, feature = "test-util"))] - fn test_envelope(payload: impl Into) -> Envelope { - let record = Self::envelope(Dedup::None, payload); + fn test_envelope(payload: impl Into) -> Envelope { + let record = Envelope::new(Dedup::None, payload.into()); record.into_raw() } } -pub trait RecordWithKeys: Record { - fn partial(payload: impl Into) -> PartialRecord; - fn partial_arc(payload: impl Into>) -> PartialRecord; -} - -impl RecordWithKeys for T +impl From for PartialEnvelope where - T: Record, - T::Payload: HasRecordKeys, + C: Command + HasRecordKeys, { - fn partial(payload: impl Into) -> PartialRecord { - let payload = payload.into(); - PartialRecord { - kind: T::KIND, - keys: payload.record_keys(), - payload: Arc::new(payload), - } - } - - fn partial_arc(payload: impl Into>) -> PartialRecord { - let payload = payload.into(); - PartialRecord { - kind: T::KIND, - keys: payload.record_keys(), - payload, + fn from(command: C) -> PartialEnvelope { + // let payload = payload.into(); + PartialEnvelope { + kind: C::KIND, + keys: command.record_keys(), + payload: Arc::new(command), } } } @@ -424,15 +393,15 @@ mod test { GenerationalNodeId, logs::Keys, sharding::KeyRange, storage::StorageCodec, }; - use super::{Dedup, records}; + use super::Dedup; use crate::{ - control::AnnounceLeader, - v2::{Envelope, Raw, Record, RecordKind, RecordWithKeys}, + control::AnnounceLeaderCommand, + v2::{CommandKind, Envelope, PartialEnvelope, Raw}, }; #[test] fn envelope_encode_decode() { - let payload = AnnounceLeader { + let payload = AnnounceLeaderCommand { leader_epoch: 11.into(), node_id: GenerationalNodeId::new(1, 3), partition_key_range: KeyRange::new(0, u64::MAX), @@ -441,7 +410,7 @@ mod test { next_config: None, }; - let envelope = records::AnnounceLeader::envelope( + let envelope = Envelope::new( Dedup::SelfProposal { leader_epoch: 10.into(), seq: 120, @@ -454,8 +423,8 @@ mod test { let envelope: Envelope = StorageCodec::decode(&mut buf).expect("to decode"); - assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); - let typed = envelope.into_typed::(); + assert_eq!(envelope.kind(), CommandKind::AnnounceLeader); + let typed = envelope.into_typed::(); let (_, loaded_payload) = typed.split().expect("to decode"); @@ -464,7 +433,7 @@ mod test { #[test] fn envelope_skip_encode() { - let payload = AnnounceLeader { + let payload = AnnounceLeaderCommand { leader_epoch: 11.into(), node_id: GenerationalNodeId::new(1, 3), partition_key_range: KeyRange::new(0, u64::MAX), @@ -473,7 +442,7 @@ mod test { next_config: None, }; - let envelope = records::AnnounceLeader::envelope( + let envelope = Envelope::new( Dedup::SelfProposal { leader_epoch: 10.into(), seq: 120, @@ -485,8 +454,8 @@ mod test { let envelope = envelope.into_raw(); - assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); - let typed = envelope.into_typed::(); + assert_eq!(envelope.kind(), CommandKind::AnnounceLeader); + let typed = envelope.into_typed::(); let (_, loaded_payload) = typed.split().expect("to decode"); @@ -495,7 +464,7 @@ mod test { #[test] fn partial_envelope_with_keys() { - let payload = AnnounceLeader { + let payload = AnnounceLeaderCommand { leader_epoch: 11.into(), node_id: GenerationalNodeId::new(1, 3), partition_key_range: KeyRange::new(0, u64::MAX), @@ -504,7 +473,7 @@ mod test { next_config: None, }; - let envelope = records::AnnounceLeader::partial(payload.clone()); + let envelope: PartialEnvelope = payload.clone().into(); let keyed = envelope.build(Dedup::SelfProposal { leader_epoch: 10.into(), @@ -517,8 +486,8 @@ mod test { ); let envelope = keyed.into_inner(); - assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); - let envelope = envelope.into_typed::(); + assert_eq!(envelope.kind(), CommandKind::AnnounceLeader); + let envelope = envelope.into_typed::(); let (_, loaded_payload) = envelope.split().expect("to decode"); @@ -526,7 +495,7 @@ mod test { } #[track_caller] - fn assert_announce_leader_eq(expected: &AnnounceLeader, actual: &AnnounceLeader) { + fn assert_announce_leader_eq(expected: &AnnounceLeaderCommand, actual: &AnnounceLeaderCommand) { assert_eq!(expected.node_id, actual.node_id); assert_eq!(expected.leader_epoch, actual.leader_epoch); assert_eq!(expected.partition_key_range, actual.partition_key_range); diff --git a/crates/wal-protocol/src/v2/commands.rs b/crates/wal-protocol/src/v2/commands.rs new file mode 100644 index 0000000000..1db7a46f61 --- /dev/null +++ b/crates/wal-protocol/src/v2/commands.rs @@ -0,0 +1,493 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use restate_encoding::Arced; +use restate_limiter::RuleBook; +use restate_storage_api::vqueue_table::scheduler::{self}; +use restate_types::{ + bilrost_storage_encode_decode, flexbuffers_storage_encode_decode, + identifiers::{WithInvocationId, WithPartitionKey}, + invocation, + logs::{HasRecordKeys, Keys}, + message::MessageIndex, + sharding::KeyRange, + state_mut, +}; + +use super::sealed::Sealed; +use super::{Command, CommandKind}; +use crate::timer::{self}; + +pub use crate::control::{ + AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, UpsertSchemaCommand, + VersionBarrierCommand, +}; + +// Create type wrappers to implement storage encode/decode +// and HasRecordKeys +#[derive( + Debug, + Clone, + Eq, + PartialEq, + bilrost::Message, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct PatchStateCommand(state_mut::ExternalStateMutation); +bilrost_storage_encode_decode!(PatchStateCommand); + +impl HasRecordKeys for PatchStateCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.0.service_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct TerminateInvocationCommand(invocation::InvocationTermination); +flexbuffers_storage_encode_decode!(TerminateInvocationCommand); + +impl HasRecordKeys for TerminateInvocationCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct PurgeInvocationCommand(invocation::PurgeInvocationRequest); +flexbuffers_storage_encode_decode!(PurgeInvocationCommand); + +impl HasRecordKeys for PurgeInvocationCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct PurgeJournalCommand(invocation::PurgeInvocationRequest); +flexbuffers_storage_encode_decode!(PurgeJournalCommand); + +impl HasRecordKeys for PurgeJournalCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct InvokeCommand(invocation::ServiceInvocation); + +flexbuffers_storage_encode_decode!(InvokeCommand); + +impl HasRecordKeys for InvokeCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive(Debug, Clone, bilrost::Message)] +pub struct TruncateOutboxCommand { + #[bilrost(1)] + pub index: MessageIndex, + + #[bilrost(2)] + pub partition_key_range: Keys, +} + +impl HasRecordKeys for TruncateOutboxCommand { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } +} + +bilrost_storage_encode_decode!(TruncateOutboxCommand); + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct AttachInvocationCommand(invocation::AttachInvocationRequest); + +flexbuffers_storage_encode_decode!(AttachInvocationCommand); + +impl HasRecordKeys for AttachInvocationCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct ResumeInvocationCommand(invocation::ResumeInvocationRequest); + +flexbuffers_storage_encode_decode!(ResumeInvocationCommand); + +impl HasRecordKeys for ResumeInvocationCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.0.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct RestartAsNewInvocationCommand(invocation::RestartAsNewInvocationRequest); + +flexbuffers_storage_encode_decode!(RestartAsNewInvocationCommand); + +impl HasRecordKeys for RestartAsNewInvocationCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.0.invocation_id.partition_key()) + } +} + +#[derive( + Clone, Serialize, Deserialize, derive_more::Deref, derive_more::Into, derive_more::From, +)] +pub struct InvokerEffectCommand(restate_worker_api::invoker::Effect); + +flexbuffers_storage_encode_decode!(InvokerEffectCommand); + +impl HasRecordKeys for InvokerEffectCommand { + fn record_keys(&self) -> restate_types::logs::Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct TimerCommand(timer::TimerKeyValue); + +flexbuffers_storage_encode_decode!(TimerCommand); + +impl HasRecordKeys for TimerCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id().partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct ScheduleTimerCommand(timer::TimerKeyValue); + +flexbuffers_storage_encode_decode!(ScheduleTimerCommand); + +impl HasRecordKeys for ScheduleTimerCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id().partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct InvocationResponseCommand(invocation::InvocationResponse); + +flexbuffers_storage_encode_decode!(InvocationResponseCommand); + +impl HasRecordKeys for InvocationResponseCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct NotifyGetInvocationOutputResponseCommand(invocation::GetInvocationOutputResponse); + +flexbuffers_storage_encode_decode!(NotifyGetInvocationOutputResponseCommand); + +impl HasRecordKeys for NotifyGetInvocationOutputResponseCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.0.target.invocation_id().partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct NotifySignalCommand(invocation::NotifySignalRequest); + +flexbuffers_storage_encode_decode!(NotifySignalCommand); + +impl HasRecordKeys for NotifySignalCommand { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct ProxyThroughCommand { + pub invocation: InvokeCommand, + + pub proxy_partition: Keys, +} + +flexbuffers_storage_encode_decode!(ProxyThroughCommand); + +impl HasRecordKeys for ProxyThroughCommand { + fn record_keys(&self) -> Keys { + self.proxy_partition.clone() + } +} + +#[derive(Clone, derive_more::Deref, derive_more::Into, derive_more::From, bilrost::Message)] +pub struct VQSchedulerDecisionsCommand(scheduler::SchedulerDecisions); + +bilrost_storage_encode_decode!(VQSchedulerDecisionsCommand); + +impl HasRecordKeys for VQSchedulerDecisionsCommand { + fn record_keys(&self) -> Keys { + // All records in a decision are for a single partition key + if self.0.qids.is_empty() { + Keys::None + } else { + Keys::Single(self.0.qids[0].0.partition_key()) + } + } +} + +#[derive(Debug, Clone, bilrost::Message)] +pub struct UpsertRuleBookCommand { + #[bilrost(tag(1))] + pub partition_key_range: KeyRange, + #[bilrost(tag(2), encoding(Arced))] + pub rule_book: Arc, +} + +bilrost_storage_encode_decode!(UpsertRuleBookCommand); + +impl HasRecordKeys for UpsertRuleBookCommand { + fn record_keys(&self) -> Keys { + Keys::RangeInclusive(self.partition_key_range.into()) + } +} + +// end types + +// define record types + +macro_rules! command { + {@kind=$type:expr, @command=$command:path} => { + impl Sealed for $command{} + impl Command for $command { + const KIND: CommandKind = $type; + } + }; +} + +command! { + @kind=CommandKind::AnnounceLeader, + @command=AnnounceLeaderCommand +} + +command! { + @kind=CommandKind::VersionBarrier, + @command=VersionBarrierCommand +} + +command! { + @kind=CommandKind::UpdatePartitionDurability, + @command=UpdatePartitionDurabilityCommand +} + +command! { + @kind=CommandKind::PatchState, + @command=PatchStateCommand +} + +command! { + @kind=CommandKind::TerminateInvocation, + @command=TerminateInvocationCommand +} + +command! { + @kind=CommandKind::PurgeInvocation, + @command=PurgeInvocationCommand +} + +command! { + @kind=CommandKind::PurgeJournal, + @command=PurgeJournalCommand +} + +command! { + @kind=CommandKind::Invoke, + @command=InvokeCommand +} + +command! { + @kind=CommandKind::TruncateOutbox, + @command=TruncateOutboxCommand +} + +command! { + @kind=CommandKind::ProxyThrough, + @command=ProxyThroughCommand +} + +command! { + @kind=CommandKind::AttachInvocation, + @command=AttachInvocationCommand +} + +command! { + @kind=CommandKind::ResumeInvocation, + @command=ResumeInvocationCommand +} + +command! { + @kind=CommandKind::RestartAsNewInvocation, + @command=RestartAsNewInvocationCommand +} + +command! { + @kind=CommandKind::InvokerEffect, + @command=InvokerEffectCommand +} + +command! { + @kind=CommandKind::Timer, + @command=TimerCommand +} + +command! { + @kind=CommandKind::ScheduleTimer, + @command=ScheduleTimerCommand +} + +command! { + @kind=CommandKind::InvocationResponse, + @command=InvocationResponseCommand +} + +command! { + @kind=CommandKind::NotifyGetInvocationOutputResponse, + @command=NotifyGetInvocationOutputResponseCommand +} + +command! { + @kind=CommandKind::NotifySignal, + @command=NotifySignalCommand +} + +command! { + @kind=CommandKind::UpsertSchema, + @command=UpsertSchemaCommand +} + +command! { + @kind=CommandKind::VQSchedulerDecisions, + @command=VQSchedulerDecisionsCommand +} + +command! { + @kind=CommandKind::UpsertRuleBook, + @command=UpsertRuleBookCommand +} diff --git a/crates/wal-protocol/src/v2/compatibility.rs b/crates/wal-protocol/src/v2/compatibility.rs index 84cad85544..bf36cb880a 100644 --- a/crates/wal-protocol/src/v2/compatibility.rs +++ b/crates/wal-protocol/src/v2/compatibility.rs @@ -21,12 +21,12 @@ use restate_storage_api::{ use restate_types::logs::Keys; use restate_util_string::ReString; -use super::{Raw, records}; +use super::{Raw, commands}; use crate::{ v1, v2::{ - self, Record, - records::{ProxyThroughPayload, TruncateOutboxPayload, UpsertRuleBookPayload}, + self, Envelope, + commands::{TruncateOutboxCommand, UpsertRuleBookCommand}, }, }; @@ -69,58 +69,64 @@ impl TryFrom for v2::Envelope { }; let envelope = match value.command { - v1::Command::AnnounceLeader(payload) => { - records::AnnounceLeader::envelope(dedup, *payload).into_raw() - } + v1::Command::AnnounceLeader(payload) => Envelope::new(dedup, *payload).into_raw(), v1::Command::AttachInvocation(payload) => { - records::AttachInvocation::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::AttachInvocationCommand::from(payload)).into_raw() } v1::Command::InvocationResponse(payload) => { - records::InvocationResponse::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::InvocationResponseCommand::from(payload)).into_raw() } - v1::Command::Invoke(payload) => records::Invoke::envelope(dedup, payload).into_raw(), - v1::Command::InvokerEffect(payload) => { - records::InvokerEffect::envelope(dedup, payload).into_raw() + v1::Command::Invoke(payload) => { + Envelope::new(dedup, commands::InvokeCommand::from(*payload)).into_raw() } - v1::Command::NotifyGetInvocationOutputResponse(payload) => { - records::NotifyGetInvocationOutputResponse::envelope(dedup, payload).into_raw() + v1::Command::InvokerEffect(payload) => { + Envelope::new(dedup, commands::InvokerEffectCommand::from(*payload)).into_raw() } + v1::Command::NotifyGetInvocationOutputResponse(payload) => Envelope::new( + dedup, + commands::NotifyGetInvocationOutputResponseCommand::from(payload), + ) + .into_raw(), v1::Command::NotifySignal(payload) => { - records::NotifySignal::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::NotifySignalCommand::from(payload)).into_raw() } v1::Command::PatchState(payload) => { - records::PatchState::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::PatchStateCommand::from(payload)).into_raw() } - v1::Command::ProxyThrough(payload) => records::ProxyThrough::envelope( + v1::Command::ProxyThrough(payload) => Envelope::new( dedup, - ProxyThroughPayload { - invocation: payload.into(), + commands::ProxyThroughCommand { + invocation: (*payload).into(), proxy_partition: Keys::Single(partition_key), }, ) .into_raw(), v1::Command::PurgeInvocation(payload) => { - records::PurgeInvocation::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::PurgeInvocationCommand::from(payload)).into_raw() } v1::Command::PurgeJournal(payload) => { - records::PurgeJournal::envelope(dedup, payload).into_raw() - } - v1::Command::RestartAsNewInvocation(payload) => { - records::RestartAsNewInvocation::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::PurgeJournalCommand::from(payload)).into_raw() } + v1::Command::RestartAsNewInvocation(payload) => Envelope::new( + dedup, + commands::RestartAsNewInvocationCommand::from(payload), + ) + .into_raw(), v1::Command::ResumeInvocation(payload) => { - records::ResumeInvocation::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::ResumeInvocationCommand::from(payload)).into_raw() } v1::Command::ScheduleTimer(payload) => { - records::ScheduleTimer::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::ScheduleTimerCommand::from(payload)).into_raw() } v1::Command::TerminateInvocation(payload) => { - records::TerminateInvocation::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::TerminateInvocationCommand::from(payload)).into_raw() } - v1::Command::Timer(payload) => records::Timer::envelope(dedup, payload).into_raw(), - v1::Command::TruncateOutbox(payload) => records::TruncateOutbox::envelope( + v1::Command::Timer(payload) => { + Envelope::new(dedup, commands::TimerCommand::from(payload)).into_raw() + } + v1::Command::TruncateOutbox(payload) => Envelope::new( dedup, - TruncateOutboxPayload { + TruncateOutboxCommand { index: payload, // this actually should be a key-range but v1 unfortunately // only hold the "start" of the range. @@ -130,24 +136,21 @@ impl TryFrom for v2::Envelope { ) .into_raw(), v1::Command::UpdatePartitionDurability(payload) => { - records::UpdatePartitionDurability::envelope(dedup, payload).into_raw() - } - v1::Command::UpsertSchema(payload) => { - records::UpsertSchema::envelope(dedup, payload).into_raw() - } - v1::Command::VersionBarrier(payload) => { - records::VersionBarrier::envelope(dedup, payload).into_raw() + Envelope::new(dedup, payload).into_raw() } + v1::Command::UpsertSchema(payload) => Envelope::new(dedup, payload).into_raw(), + v1::Command::VersionBarrier(payload) => Envelope::new(dedup, payload).into_raw(), v1::Command::VQSchedulerDecisions(payload) => { // bytes are bilrost encoded SchedulerDecision. let payload = SchedulerDecisions::bilrost_decode(payload)?; - records::VQSchedulerDecisions::envelope(dedup, payload).into_raw() + Envelope::new(dedup, commands::VQSchedulerDecisionsCommand::from(payload)) + .into_raw() } v1::Command::UpsertRuleBook(payload) => { let rule_book = RuleBook::decode(payload.rule_book)?; - records::UpsertRuleBook::envelope( + Envelope::new( dedup, - UpsertRuleBookPayload { + UpsertRuleBookCommand { partition_key_range: payload.partition_key_range, rule_book: Arc::new(rule_book), }, diff --git a/crates/wal-protocol/src/v2/records.rs b/crates/wal-protocol/src/v2/records.rs deleted file mode 100644 index 65b0479361..0000000000 --- a/crates/wal-protocol/src/v2/records.rs +++ /dev/null @@ -1,475 +0,0 @@ -// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::sync::Arc; - -use serde::{Deserialize, Serialize}; - -use restate_encoding::Arced; -use restate_limiter::RuleBook; -use restate_storage_api::vqueue_table::scheduler::{self}; -use restate_types::{ - bilrost_storage_encode_decode, flexbuffers_storage_encode_decode, - identifiers::{WithInvocationId, WithPartitionKey}, - invocation, - logs::{HasRecordKeys, Keys}, - message::MessageIndex, - sharding::KeyRange, - state_mut, -}; - -use super::sealed::Sealed; -use super::{Record, RecordKind}; -use crate::timer::{self}; - -// Create type wrappers to implement storage encode/decode -// and HasRecordKeys -#[derive( - Debug, - Clone, - Eq, - PartialEq, - bilrost::Message, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct ExternalStateMutationPayload(state_mut::ExternalStateMutation); -bilrost_storage_encode_decode!(ExternalStateMutationPayload); - -impl HasRecordKeys for ExternalStateMutationPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.0.service_id.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct InvocationTerminationPayload(invocation::InvocationTermination); -flexbuffers_storage_encode_decode!(InvocationTerminationPayload); - -impl HasRecordKeys for InvocationTerminationPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.invocation_id.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct PurgeInvocationRequestPayload(invocation::PurgeInvocationRequest); -flexbuffers_storage_encode_decode!(PurgeInvocationRequestPayload); - -impl HasRecordKeys for PurgeInvocationRequestPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.invocation_id.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct ServiceInvocationPayload(Box); - -flexbuffers_storage_encode_decode!(ServiceInvocationPayload); - -impl HasRecordKeys for ServiceInvocationPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.invocation_id.partition_key()) - } -} - -#[derive(Debug, Clone, bilrost::Message)] -pub struct TruncateOutboxPayload { - #[bilrost(1)] - pub index: MessageIndex, - - #[bilrost(2)] - pub partition_key_range: Keys, -} - -impl HasRecordKeys for TruncateOutboxPayload { - fn record_keys(&self) -> Keys { - self.partition_key_range.clone() - } -} - -bilrost_storage_encode_decode!(TruncateOutboxPayload); - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct AttachInvocationRequestPayload(invocation::AttachInvocationRequest); - -flexbuffers_storage_encode_decode!(AttachInvocationRequestPayload); - -impl HasRecordKeys for AttachInvocationRequestPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct ResumeInvocationRequestPayload(invocation::ResumeInvocationRequest); - -flexbuffers_storage_encode_decode!(ResumeInvocationRequestPayload); - -impl HasRecordKeys for ResumeInvocationRequestPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.0.invocation_id.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct RestartAsNewInvocationRequestPayload(invocation::RestartAsNewInvocationRequest); - -flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequestPayload); - -impl HasRecordKeys for RestartAsNewInvocationRequestPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.0.invocation_id.partition_key()) - } -} - -#[derive( - Clone, Serialize, Deserialize, derive_more::Deref, derive_more::Into, derive_more::From, -)] -pub struct EffectPayload(Box); - -flexbuffers_storage_encode_decode!(EffectPayload); - -impl HasRecordKeys for EffectPayload { - fn record_keys(&self) -> restate_types::logs::Keys { - Keys::Single(self.invocation_id.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct TimerKeyValuePayload(timer::TimerKeyValue); - -flexbuffers_storage_encode_decode!(TimerKeyValuePayload); - -impl HasRecordKeys for TimerKeyValuePayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.invocation_id().partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct InvocationResponsePayload(invocation::InvocationResponse); - -flexbuffers_storage_encode_decode!(InvocationResponsePayload); - -impl HasRecordKeys for InvocationResponsePayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct GetInvocationOutputResponsePayload(invocation::GetInvocationOutputResponse); - -flexbuffers_storage_encode_decode!(GetInvocationOutputResponsePayload); - -impl HasRecordKeys for GetInvocationOutputResponsePayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.0.target.invocation_id().partition_key()) - } -} - -#[derive( - Clone, - Eq, - PartialEq, - Serialize, - Deserialize, - derive_more::Deref, - derive_more::Into, - derive_more::From, -)] -pub struct NotifySignalRequestPayload(invocation::NotifySignalRequest); - -flexbuffers_storage_encode_decode!(NotifySignalRequestPayload); - -impl HasRecordKeys for NotifySignalRequestPayload { - fn record_keys(&self) -> Keys { - Keys::Single(self.partition_key()) - } -} - -#[derive(Clone, Serialize, Deserialize)] -pub struct ProxyThroughPayload { - pub invocation: ServiceInvocationPayload, - - pub proxy_partition: Keys, -} - -flexbuffers_storage_encode_decode!(ProxyThroughPayload); - -impl HasRecordKeys for ProxyThroughPayload { - fn record_keys(&self) -> Keys { - self.proxy_partition.clone() - } -} - -#[derive(Clone, derive_more::Deref, derive_more::Into, derive_more::From, bilrost::Message)] -pub struct SchedulerDecisionsPayload(scheduler::SchedulerDecisions); - -bilrost_storage_encode_decode!(SchedulerDecisionsPayload); - -impl HasRecordKeys for SchedulerDecisionsPayload { - fn record_keys(&self) -> Keys { - // All records in a decision are for a single partition key - if self.0.qids.is_empty() { - Keys::None - } else { - Keys::Single(self.0.qids[0].0.partition_key()) - } - } -} - -#[derive(Debug, Clone, bilrost::Message)] -pub struct UpsertRuleBookPayload { - #[bilrost(tag(1))] - pub partition_key_range: KeyRange, - #[bilrost(tag(2), encoding(Arced))] - pub rule_book: Arc, -} - -bilrost_storage_encode_decode!(UpsertRuleBookPayload); - -impl HasRecordKeys for UpsertRuleBookPayload { - fn record_keys(&self) -> Keys { - Keys::RangeInclusive(self.partition_key_range.into()) - } -} - -// end types - -// define record types - -macro_rules! record { - {@name=$name:ident, @kind=$type:expr, @payload=$payload:path} => { - #[allow(dead_code)] - #[derive(Clone, Copy)] - pub struct $name; - impl Sealed for $name{} - impl Record for $name { - const KIND: RecordKind = $type; - type Payload = $payload; - } - }; -} - -record! { - @name=AnnounceLeader, - @kind=RecordKind::AnnounceLeader, - @payload=crate::control::AnnounceLeader -} - -record! { - @name=VersionBarrier, - @kind=RecordKind::VersionBarrier, - @payload=crate::control::VersionBarrier -} - -record! { - @name=UpdatePartitionDurability, - @kind=RecordKind::UpdatePartitionDurability, - @payload=crate::control::PartitionDurability -} - -record! { - @name=PatchState, - @kind=RecordKind::PatchState, - @payload=ExternalStateMutationPayload -} - -record! { - @name=TerminateInvocation, - @kind=RecordKind::TerminateInvocation, - @payload=InvocationTerminationPayload -} - -record! { - @name=PurgeInvocation, - @kind=RecordKind::PurgeInvocation, - @payload=PurgeInvocationRequestPayload -} - -record! { - @name=PurgeJournal, - @kind=RecordKind::PurgeJournal, - @payload=PurgeInvocationRequestPayload -} - -record! { - @name=Invoke, - @kind=RecordKind::Invoke, - @payload=ServiceInvocationPayload -} - -record! { - @name=TruncateOutbox, - @kind=RecordKind::TruncateOutbox, - @payload=TruncateOutboxPayload -} - -record! { - @name=ProxyThrough, - @kind=RecordKind::ProxyThrough, - @payload=ProxyThroughPayload -} - -record! { - @name=AttachInvocation, - @kind=RecordKind::AttachInvocation, - @payload=AttachInvocationRequestPayload -} - -record! { - @name=ResumeInvocation, - @kind=RecordKind::ResumeInvocation, - @payload=ResumeInvocationRequestPayload -} - -record! { - @name=RestartAsNewInvocation, - @kind=RecordKind::RestartAsNewInvocation, - @payload=RestartAsNewInvocationRequestPayload -} - -record! { - @name=InvokerEffect, - @kind=RecordKind::InvokerEffect, - @payload=EffectPayload -} - -record! { - @name=Timer, - @kind=RecordKind::Timer, - @payload=TimerKeyValuePayload -} - -record! { - @name=ScheduleTimer, - @kind=RecordKind::ScheduleTimer, - @payload=TimerKeyValuePayload -} - -record! { - @name=InvocationResponse, - @kind=RecordKind::InvocationResponse, - @payload=InvocationResponsePayload -} - -record! { - @name=NotifyGetInvocationOutputResponse, - @kind=RecordKind::NotifyGetInvocationOutputResponse, - @payload=GetInvocationOutputResponsePayload -} - -record! { - @name=NotifySignal, - @kind=RecordKind::NotifySignal, - @payload=NotifySignalRequestPayload -} - -record! { - @name=UpsertSchema, - @kind=RecordKind::UpsertSchema, - @payload=crate::control::UpsertSchema -} - -record! { - @name=VQSchedulerDecisions, - @kind=RecordKind::VQSchedulerDecisions, - @payload=SchedulerDecisionsPayload -} - -record! { - @name=UpsertRuleBook, - @kind=RecordKind::UpsertRuleBook, - @payload=UpsertRuleBookPayload -} diff --git a/crates/worker/src/partition/leadership/durability_tracker.rs b/crates/worker/src/partition/leadership/durability_tracker.rs index 9554090981..6a092f3a93 100644 --- a/crates/worker/src/partition/leadership/durability_tracker.rs +++ b/crates/worker/src/partition/leadership/durability_tracker.rs @@ -28,7 +28,7 @@ use restate_types::nodes_config::Role; use restate_types::partitions::Partition; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::time::MillisSinceEpoch; -use restate_wal_protocol::control::PartitionDurability; +use restate_wal_protocol::control::UpdatePartitionDurabilityCommand; const WARN_PERIOD: Duration = Duration::from_secs(60); @@ -148,7 +148,7 @@ impl DurabilityTracker { } impl Stream for DurabilityTracker { - type Item = PartitionDurability; + type Item = UpdatePartitionDurabilityCommand; fn poll_next( mut self: Pin<&mut Self>, @@ -212,7 +212,7 @@ impl Stream for DurabilityTracker { return Poll::Pending; } - let partition_durability = PartitionDurability { + let partition_durability = UpdatePartitionDurabilityCommand { partition_id: self.partition.partition_id, durable_point: suggested, modification_time: MillisSinceEpoch::now(), diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index e96f9ca68f..cd1a6d36ab 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -49,7 +49,7 @@ use restate_vqueues::VQueueEvent; use restate_vqueues::scheduler::Decisions; use restate_vqueues::{SchedulerService, VQueuesMeta}; use restate_wal_protocol::Command; -use restate_wal_protocol::control::UpsertSchema; +use restate_wal_protocol::control::UpsertSchemaCommand; use restate_worker_api::invoker::InvokerHandle; use restate_worker_api::resources::ReservedResources; use restate_worker_api::{SchedulerStatusEntry, UserLimitCounterEntry}; @@ -444,7 +444,7 @@ impl LeaderState { self.self_proposer .self_propose( self.partition_key_range.start(), - Command::UpsertSchema(UpsertSchema { + Command::UpsertSchema(UpsertSchemaCommand { partition_key_range: Keys::RangeInclusive( self.partition_key_range.into(), ), @@ -466,7 +466,7 @@ impl LeaderState { .self_propose( self.partition_key_range.start(), Command::UpsertRuleBook( - restate_wal_protocol::control::UpsertRuleBook { + restate_wal_protocol::control::UpsertRuleBookCommand { partition_key_range: self.partition_key_range, rule_book: book.bilrost_encode_to_bytes(), }, diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 4f7ff03d62..07f3ed8e3a 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -66,7 +66,9 @@ use restate_types::{ }; use restate_vqueues::scheduler::{self}; use restate_vqueues::{ResourceManager, SchedulerService, VQueuesMeta, VQueuesMetaCache}; -use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability, VersionBarrier}; +use restate_wal_protocol::control::{ + AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, VersionBarrierCommand, +}; use restate_wal_protocol::timer::TimerKeyValue; use restate_wal_protocol::{Command, Envelope}; use restate_worker_api::invoker::capacity::InvokerCapacity; @@ -149,7 +151,7 @@ pub(crate) enum ActionEffect { Shuffle(shuffle::OutboxTruncation), Timer(TimerKeyValue), Cleaner(cleaner::CleanerEffect), - PartitionMaintenance(PartitionDurability), + PartitionMaintenance(UpdatePartitionDurabilityCommand), UpsertSchema(Schema), UpsertRuleBook(Arc), AwaitingRpcSelfProposeDone, @@ -265,7 +267,7 @@ where ) -> Result<(), Error> { let leader_epoch = leadership_info.leader_epoch; - let announce_leader = Command::AnnounceLeader(Box::new(AnnounceLeader { + let announce_leader = Command::AnnounceLeader(Box::new(AnnounceLeaderCommand { node_id: my_node_id(), leader_epoch, epoch_version: Some(leadership_info.version), @@ -334,7 +336,7 @@ where #[instrument(level = "debug", skip_all, fields(leader_epoch = %announce_leader.leader_epoch))] pub async fn on_announce_leader( &mut self, - announce_leader: &AnnounceLeader, + announce_leader: &AnnounceLeaderCommand, partition_store: &mut PartitionStore, replica_set_states: &PartitionReplicaSetStates, config: &Configuration, @@ -545,7 +547,7 @@ where self_proposer .self_propose( self.partition.key_range.start(), - Command::VersionBarrier(VersionBarrier { + Command::VersionBarrier(VersionBarrierCommand { version: forced_min_restate_version.clone(), partition_key_range: Keys::RangeInclusive( self.partition.key_range.into(), @@ -565,7 +567,7 @@ where self_proposer .self_propose( self.partition.key_range.start(), - Command::VersionBarrier(VersionBarrier { + Command::VersionBarrier(VersionBarrierCommand { version: RESTATE_VERSION_1_6_0.clone(), partition_key_range: Keys::RangeInclusive( self.partition.key_range.into(), diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index a4f0614cc2..2ddfcdbf33 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -83,7 +83,7 @@ use restate_types::{GenerationalNodeId, SemanticRestateVersion, Version}; use restate_util_string::{ReString, ToReString}; use restate_vqueues::{VQueuesMeta, VQueuesMetaCache}; use restate_wal_protocol::control::{ - AnnounceLeader, CurrentReplicaSetConfiguration, NextReplicaSetConfiguration, + AnnounceLeaderCommand, CurrentReplicaSetConfiguration, NextReplicaSetConfiguration, }; use restate_wal_protocol::{Command, Destination, Envelope, Header}; use restate_worker_api::invoker::capacity::InvokerCapacity; @@ -995,7 +995,7 @@ where transaction: &mut PartitionStoreTransaction<'_>, action_collector: &mut ActionCollector, vqueues_cache: &mut VQueuesMetaCache, - ) -> Result>, state_machine::Error> { + ) -> Result>, state_machine::Error> { trace!(lsn = %record.lsn, "Processing bifrost record for '{}': {:?}", record.envelope.command.name(), record.envelope.header); if let Some(dedup_information) = self.is_targeted_to_me(&record.envelope.header) { diff --git a/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs b/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs index c9c82d613e..439310e326 100644 --- a/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs +++ b/crates/worker/src/partition/state_machine/lifecycle/version_barrier.rs @@ -9,13 +9,13 @@ // by the Apache License, Version 2.0. use restate_storage_api::fsm_table::WriteFsmTable; -use restate_wal_protocol::control::VersionBarrier; +use restate_wal_protocol::control::VersionBarrierCommand; use crate::debug_if_leader; use crate::partition::state_machine::{CommandHandler, Error, StateMachineApplyContext}; pub struct OnVersionBarrierCommand { - pub barrier: VersionBarrier, + pub barrier: VersionBarrierCommand, } impl<'ctx, 's: 'ctx, S> CommandHandler<&'ctx mut StateMachineApplyContext<'s, S>> @@ -53,7 +53,7 @@ mod tests { use restate_types::logs::Keys; use restate_types::sharding::KeyRange; use restate_wal_protocol::Command; - use restate_wal_protocol::control::VersionBarrier; + use restate_wal_protocol::control::VersionBarrierCommand; use crate::partition::state_machine::StateMachine; use crate::partition::state_machine::tests::TestEnv; @@ -83,7 +83,7 @@ mod tests { ); let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrier { + .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { version: SemanticRestateVersion::parse("99.0.0").unwrap(), human_reason: Some("testing".to_string()), partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), @@ -120,7 +120,7 @@ mod tests { let mut test_env = TestEnv::create_with_state_machine(state_machine).await; let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrier { + .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { version: SemanticRestateVersion::current().clone(), human_reason: Some("testing".to_string()), partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), @@ -135,7 +135,7 @@ mod tests { } // re-apply the same version, no-op let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrier { + .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { version: SemanticRestateVersion::current().clone(), human_reason: Some("testing".to_string()), partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX), @@ -150,7 +150,7 @@ mod tests { // apply an older version, success but without effect. let result = test_env - .apply_fallible(Command::VersionBarrier(VersionBarrier { + .apply_fallible(Command::VersionBarrier(VersionBarrierCommand { version: SemanticRestateVersion::parse("0.1.0").unwrap(), human_reason: Some("testing".to_string()), partition_key_range: Keys::RangeInclusive(PartitionKey::MIN..=PartitionKey::MAX),