Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions crates/wal-protocol/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use bytes::Bytes;
use std::sync::Arc;

use bytes::{Buf, BufMut, Bytes};

use restate_encoding::Arced;
use restate_limiter::RuleBook;
use restate_storage_api::fsm_table::{CurrentReplicaSetState, NextReplicaSetState};
use restate_types::identifiers::{LeaderEpoch, PartitionId};
use restate_types::logs::{HasRecordKeys, Keys, Lsn, SequenceNumber};
Expand Down Expand Up @@ -241,8 +246,28 @@ impl HasRecordKeys for UpsertSchemaCommand {
/// The state machine decodes once on apply.
///
/// Since v1.7.0.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, bilrost::Message)]
pub struct UpsertRuleBookCommand {
pub partition_key_range: KeyRange,
pub rule_book: Bytes,
#[bilrost(tag(1), encoding(Arced))]
pub rule_book: Arc<RuleBook>,
}

bilrost_storage_encode_decode!(UpsertRuleBookCommand);

impl UpsertRuleBookCommand {
pub fn bilrost_encode<B: BufMut>(&self, b: &mut B) -> Result<(), bilrost::EncodeError> {
bilrost::Message::encode(self, b)
}

pub fn encoded_len(&self) -> usize {
bilrost::Message::encoded_len(self)
}

pub fn bilrost_encode_to_bytes(&self) -> Bytes {
bilrost::Message::encode_to_bytes(self)
}

pub fn bilrost_decode<B: Buf>(buf: B) -> Result<Self, bilrost::DecodeError> {
bilrost::OwnedMessage::decode(buf)
}
}
17 changes: 14 additions & 3 deletions crates/wal-protocol/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use restate_types::invocation::{
};
use restate_types::logs::{self, HasRecordKeys, Keys, MatchKeyQuery};
use restate_types::message::MessageIndex;
use restate_types::sharding::KeyRange;
use restate_types::state_mut::ExternalStateMutation;

use crate::control::{
AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, UpsertRuleBookCommand,
UpsertSchemaCommand, VersionBarrierCommand,
AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, UpsertSchemaCommand,
VersionBarrierCommand,
};
use crate::timer::TimerKeyValue;

Expand Down Expand Up @@ -193,8 +194,9 @@ pub enum Command {
/// Upsert the cluster-global rule book for consistent rules across
/// replicas; the apply path persists it to the partition store and
/// notifies the leader's `UserLimiter` of the diff.
///
/// *Since v1.7.0
UpsertRuleBook(UpsertRuleBookCommand),
UpsertRuleBook(UpsertRuleBookCommandWrapper),
// # Commands for VQueues management
// ----------------------------------
/// A command to attempt a run an entry in the vqueue (invocation, or otherwise)
Expand Down Expand Up @@ -275,3 +277,12 @@ impl MatchKeyQuery for Envelope {
self.record_keys().matches_key_query(query)
}
}

/// A temporary wrapper for [`UpsertRuleBookCommand`]
/// only used in v1 to only supply the partition_key_range
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct UpsertRuleBookCommandWrapper {
pub partition_key_range: KeyRange,
/// Bytes are bilrost encoded [`UpsertRuleBookCommand`]
pub command: Bytes,
}
31 changes: 5 additions & 26 deletions crates/wal-protocol/src/v2/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,24 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// Re-epxort vqueues commands
pub use crate::vqueues::{VQueuesPauseCommand, VQueuesResumeCommand};
pub use restate_storage_api::vqueue_table::scheduler::SchedulerDecisionsCommand;

use std::sync::Arc;

use serde::{Deserialize, Serialize};

use restate_encoding::Arced;
use restate_limiter::RuleBook;
pub use restate_storage_api::vqueue_table::scheduler::SchedulerDecisionsCommand;
use restate_types::{
bilrost_storage_encode_decode, flexbuffers_storage_encode_decode,
identifiers::{WithInvocationId, WithPartitionKey},
invocation,
logs::{HasRecordKeys, Keys},
message::MessageIndex,
sharding::KeyRange,
state_mut,
};

use super::sealed::Sealed;
use super::{Command, CommandKind};
use crate::timer::{self};
pub use crate::control::UpsertRuleBookCommand;
use crate::timer;
// Re-epxort vqueues commands
pub use crate::vqueues::{VQueuesPauseCommand, VQueuesResumeCommand};

pub use crate::control::{
AnnounceLeaderCommand, UpdatePartitionDurabilityCommand, UpsertSchemaCommand,
Expand Down Expand Up @@ -340,22 +335,6 @@ impl HasRecordKeys for ProxyThroughCommand {
}
}

#[derive(Debug, Clone, bilrost::Message)]
pub struct UpsertRuleBookCommand {
#[bilrost(tag(1))]
pub partition_key_range: KeyRange,
#[bilrost(tag(2), encoding(Arced))]
pub rule_book: Arc<RuleBook>,
}

bilrost_storage_encode_decode!(UpsertRuleBookCommand);

impl HasRecordKeys for UpsertRuleBookCommand {
fn record_keys(&self) -> Keys {
Keys::RangeInclusive(self.partition_key_range.into())
}
}

// end types

// define record types
Expand Down
27 changes: 7 additions & 20 deletions crates/wal-protocol/src/v2/compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use bilrost::OwnedMessage;

use restate_encoding::U128;
use restate_limiter::RuleBook;
use restate_storage_api::deduplication_table::{
DedupInformation, DedupSequenceNumber, EpochSequenceNumber, ProducerId,
};
Expand All @@ -23,10 +18,7 @@ use restate_util_string::ReString;
use super::{Raw, commands};
use crate::{
v1,
v2::{
self, Envelope,
commands::{TruncateOutboxCommand, UpsertRuleBookCommand},
},
v2::{self, Envelope, commands::TruncateOutboxCommand},
};

impl TryFrom<v1::Envelope> for v2::Envelope<Raw> {
Expand Down Expand Up @@ -139,17 +131,12 @@ impl TryFrom<v1::Envelope> for v2::Envelope<Raw> {
}
v1::Command::UpsertSchema(payload) => Envelope::new(dedup, payload).into_raw(),
v1::Command::VersionBarrier(payload) => Envelope::new(dedup, payload).into_raw(),
v1::Command::UpsertRuleBook(payload) => {
let rule_book = RuleBook::decode(payload.rule_book)?;
Envelope::new(
dedup,
UpsertRuleBookCommand {
partition_key_range: payload.partition_key_range,
rule_book: Arc::new(rule_book),
},
)
.into_raw()
}
v1::Command::UpsertRuleBook(wrapper) => Envelope::from_bytes_unchecked(
v2::CommandKind::UpsertRuleBook,
StorageCodecKind::Bilrost,
dedup,
wrapper.command,
),
v1::Command::VQSchedulerDecisions(payload) => Envelope::from_bytes_unchecked(
v2::CommandKind::VQSchedulerDecisions,
StorageCodecKind::Bilrost,
Expand Down
20 changes: 13 additions & 7 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use restate_vqueues::scheduler::Decisions;
use restate_vqueues::{SchedulerService, VQueuesMeta};
use restate_wal_protocol::Command;
use restate_wal_protocol::control::UpsertSchemaCommand;
use restate_wal_protocol::v1::UpsertRuleBookCommandWrapper;
use restate_worker_api::invoker::InvokerHandle;
use restate_worker_api::resources::ReservedResources;
use restate_worker_api::{SchedulerStatusEntry, UserLimitCounterEntry};
Expand Down Expand Up @@ -454,23 +455,28 @@ impl LeaderState {
.await?;
}
}
ActionEffect::UpsertRuleBook(book) => {
ActionEffect::UpsertRuleBook(rule_book) => {
// todo(tillrohrmann) also enable the feature once the partition has been migrated
// to use vqueues and then rolling back to v1.7
if Configuration::pinned()
.common
.experimental
.is_vqueues_enabled()
{
let cmd =
restate_wal_protocol::control::UpsertRuleBookCommand { rule_book };

arena.reserve(cmd.encoded_len());
// safe to unwrap because we reserved enough space
cmd.bilrost_encode(&mut arena).unwrap();

self.self_proposer
.self_propose(
self.partition_key_range.start(),
Command::UpsertRuleBook(
restate_wal_protocol::control::UpsertRuleBookCommand {
partition_key_range: self.partition_key_range,
rule_book: book.bilrost_encode_to_bytes(),
},
),
Command::UpsertRuleBook(UpsertRuleBookCommandWrapper {
partition_key_range: self.partition_key_range,
command: arena.split().freeze(),
}),
)
.await?;
}
Expand Down
Loading