Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/storage-api/src/deduplication_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl DedupInformation {
}
}

#[deprecated]
pub fn ingress(producer_id: impl Into<ByteString>, sequence_number: MessageIndex) -> Self {
DedupInformation {
producer_id: ProducerId::Other(producer_id.into()),
Expand Down
8 changes: 4 additions & 4 deletions crates/storage-api/src/vqueue_table/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub enum YieldReason {
},
}

#[derive(Debug, bilrost::Oneof, bilrost::Message, derive_more::From)]
#[derive(Debug, Clone, bilrost::Oneof, bilrost::Message, derive_more::From)]
pub enum SchedulerAction {
#[bilrost(empty)]
Unknown,
Expand All @@ -40,23 +40,23 @@ pub enum SchedulerAction {
Yield(YieldAction),
}

#[derive(Debug, bilrost::Message)]
#[derive(Debug, Clone, bilrost::Message)]
pub struct RunAction {
#[bilrost(tag(1))]
pub key: EntryKey,
#[bilrost(tag(2))]
pub wait_stats: WaitStats,
}

#[derive(Debug, bilrost::Message)]
#[derive(Debug, Clone, bilrost::Message)]
pub struct YieldAction {
#[bilrost(tag(1))]
pub key: EntryKey,
#[bilrost(tag(2))]
pub next_run_at: Option<RoughTimestamp>,
}

#[derive(Debug, bilrost::Message)]
#[derive(Debug, Clone, bilrost::Message)]
pub struct SchedulerDecisions {
#[bilrost(tag(1))]
pub qids: Vec<(VQueueId, Vec<SchedulerAction>)>,
Expand Down
2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ restate-clock = { workspace = true, features = ["prost-types", "jiff", "hlc"] }
restate-encoding = { workspace = true }
restate-errors = { workspace = true }
restate-memory = { workspace = true }
restate-platform = { workspace = true }
restate-platform = { workspace = true, features = ["bilrost"] }
restate-serde-util = { workspace = true }
restate-sharding = { workspace = true, features = ["serde", "bilrost"] }
restate-test-util = { workspace = true, optional = true }
Expand Down
12 changes: 12 additions & 0 deletions crates/types/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ impl<T: StorageEncode> BodyWithKeys<T> {
pub fn into_inner(self) -> T {
self.inner
}

pub fn keys(&self) -> &Keys {
&self.keys
}

pub fn inner(&self) -> &T {
&self.inner
}

pub fn split(self) -> (Keys, T) {
(self.keys, self.inner)
}
}

impl<T> HasRecordKeys for BodyWithKeys<T>
Expand Down
52 changes: 52 additions & 0 deletions crates/types/src/replication/nodeset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,58 @@ fn write_nodes<'a>(
write!(f, "]")
}

mod bilrost_encoding {
use bilrost::encoding::{EmptyState, ForOverwrite, Proxiable};

use crate::PlainNodeId;

use super::NodeSet;

impl Proxiable for NodeSet {
type Proxy = Vec<PlainNodeId>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to avoid the extra allocation and serialize/deserialize directly into the indexmap?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look into this one


fn encode_proxy(&self) -> Self::Proxy {
Vec::from_iter(self.0.iter().cloned())
}

fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), bilrost::DecodeErrorKind> {
self.0.extend(proxy);
Ok(())
}
}

impl EmptyState<(), NodeSet> for () {
fn clear(val: &mut NodeSet) {
val.0.clear();
}
fn empty() -> NodeSet
where
NodeSet: Sized,
{
NodeSet::new()
}

fn is_empty(val: &NodeSet) -> bool {
val.is_empty()
}
}

impl ForOverwrite<(), NodeSet> for () {
fn for_overwrite() -> NodeSet
where
NodeSet: Sized,
{
NodeSet::new()
}
}

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::GeneralPacked)
to encode proxied type (NodeSet)
with general encodings
);
}

#[cfg(test)]
mod test {
use ahash::HashMap;
Expand Down
33 changes: 33 additions & 0 deletions crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,39 @@ macro_rules! flexbuffers_storage_encode_decode {
};
}

/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing
/// type using [`bilrost`].
#[macro_export]
macro_rules! bilrost_storage_encode_decode {
($name:tt) => {
impl $crate::storage::StorageEncode for $name {
fn default_codec(&self) -> $crate::storage::StorageCodecKind {
$crate::storage::StorageCodecKind::Bilrost
}

fn encode(
&self,
buf: &mut ::bytes::BytesMut,
) -> Result<(), $crate::storage::StorageEncodeError> {
$crate::storage::encode::encode_bilrost(self, buf)
}
}

impl $crate::storage::StorageDecode for $name {
fn decode<B: ::bytes::Buf>(
buf: &mut B,
kind: $crate::storage::StorageCodecKind,
) -> Result<Self, $crate::storage::StorageDecodeError>
where
Self: Sized,
{
debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost);
$crate::storage::decode::decode_bilrost(buf)
}
}
};
}

/// A polymorphic container of a buffer or a cached storage-encodeable object
#[derive(Clone, derive_more::Debug, BilrostAs)]
#[bilrost_as(dto::PolyBytes)]
Expand Down
7 changes: 7 additions & 0 deletions crates/wal-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@ rust-version.workspace = true
license.workspace = true
publish = false

[features]
test-util = []

[dependencies]
restate-workspace-hack = { workspace = true }

restate-encoding = { workspace = true }
restate-limiter = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }
restate-util-string = { workspace = true }
restate-worker-api = { workspace = true, features = ["serde"] }

anyhow = { workspace = true }
bilrost = { workspace = true }
bytes = { workspace = true }
derive_more = { workspace = true, features = ["debug"] }
Expand Down
69 changes: 61 additions & 8 deletions crates/wal-protocol/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,33 @@
// by the Apache License, Version 2.0.

use bytes::Bytes;

use restate_storage_api::fsm_table::{CurrentReplicaSetState, NextReplicaSetState};
use restate_types::identifiers::{LeaderEpoch, PartitionId};
use restate_types::logs::{Keys, Lsn, SequenceNumber};
use restate_types::logs::{HasRecordKeys, Keys, Lsn, SequenceNumber};
use restate_types::partitions::PartitionConfiguration;
use restate_types::partitions::state::{MemberState, ReplicaSetState};
use restate_types::replication::{NodeSet, ReplicationProperty};
use restate_types::schema::Schema;
use restate_types::sharding::KeyRange;
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, SemanticRestateVersion, Version, Versioned};
use restate_types::{
GenerationalNodeId, SemanticRestateVersion, Version, Versioned, bilrost_storage_encode_decode,
flexbuffers_storage_encode_decode,
};

/// Announcing a new leader. This message can be written by any component to make the specified
/// partition processor the leader.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)]
pub struct AnnounceLeader {
/// Sender of the announce leader message.
///
/// This became non-optional in v1.5. Noting that it has always been set in previous versions,
/// it's safe to assume that it's always set.
#[bilrost(tag(1))]
pub node_id: GenerationalNodeId,
#[bilrost(tag(2))]
pub leader_epoch: LeaderEpoch,
#[bilrost(tag(3))]
pub partition_key_range: KeyRange,

/// Associated epoch metadata version
Expand All @@ -40,25 +45,40 @@ pub struct AnnounceLeader {
///
/// *Since v1.6*
#[serde(default, skip_serializing_if = "Option::is_none")]
#[bilrost(tag(4))]
pub epoch_version: Option<Version>,
/// Current replica set configuration at the time of the announcement.
/// This field is optional for backward compatibility with older versions.
/// *Since v1.6*
#[serde(default, skip_serializing_if = "Option::is_none")]
#[bilrost(tag(5))]
pub current_config: Option<CurrentReplicaSetConfiguration>,
/// Next replica set configuration.
/// *Since v1.6*
#[serde(default, skip_serializing_if = "Option::is_none")]
#[bilrost(tag(6))]
pub next_config: Option<NextReplicaSetConfiguration>,
}

bilrost_storage_encode_decode!(AnnounceLeader);

impl HasRecordKeys for AnnounceLeader {
fn record_keys(&self) -> Keys {
Keys::RangeInclusive(self.partition_key_range.start()..=self.partition_key_range.end())
}
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)]
pub struct CurrentReplicaSetConfiguration {
#[bilrost(tag = 1)]
pub version: Version,
#[bilrost(tag = 2)]
pub replica_set: NodeSet,
#[bilrost(tag = 3)]
pub modified_at: MillisSinceEpoch,
#[serde_as(as = "serde_with::DisplayFromStr")]
#[bilrost(tag = 4)]
pub replication: ReplicationProperty,
}

Expand Down Expand Up @@ -87,9 +107,11 @@ impl CurrentReplicaSetConfiguration {
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, bilrost::Message)]
pub struct NextReplicaSetConfiguration {
#[bilrost(tag(1))]
pub version: Version,
#[bilrost(tag(2))]
pub replica_set: NodeSet,
}

Expand Down Expand Up @@ -138,7 +160,7 @@ fn new_replica_set_state(version: Version, node_set: &NodeSet) -> ReplicaSetStat
/// Readers before v1.4.0 will crash when reading this command. For v1.4.0+, the barrier defines the
/// minimum version of restate server that can progress after this command. It also updates the FSM
/// in case command has been trimmed.
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, bilrost::Message, serde::Serialize, serde::Deserialize)]
pub struct VersionBarrier {
/// The minimum version required (inclusive) to progress after this barrier.
pub version: SemanticRestateVersion,
Expand All @@ -147,21 +169,44 @@ pub struct VersionBarrier {
pub partition_key_range: Keys,
}

bilrost_storage_encode_decode!(VersionBarrier);

impl HasRecordKeys for VersionBarrier {
fn record_keys(&self) -> Keys {
self.partition_key_range.clone()
}
}

/// Updates the `PARTITION_DURABILITY` FSM variable to the given value. Note that durability
/// only applies to partitions with the same `partition_id`. At replay time, the partition will
/// ignore updates that are not targeted to its own ID.
///
/// NOTE: The durability point is monotonically increasing.
///
/// Since v1.4.2.
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, bilrost::Message, serde::Serialize, serde::Deserialize)]
pub struct PartitionDurability {
#[bilrost(tag(1))]
pub partition_id: PartitionId,
/// The partition has applied this LSN durably to the replica-set and/or has been
/// persisted in a snapshot in the snapshot repository.
#[bilrost(tag(2))]
pub durable_point: Lsn,
/// Timestamp which the durability point was updated
#[bilrost(tag(3))]
pub modification_time: MillisSinceEpoch,
/// partition key range
#[bilrost(tag(4))]
#[serde(default)]
pub partition_key_range: Keys,
Comment on lines +198 to +201
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve decoding of old durability records

In clusters upgrading with existing v1 WAL entries, v1::Envelope is still flexbuffers/serde decoded before the v1→v2 bridge runs (crates/wal-protocol/src/v1.rs:27, crates/wal-protocol/src/v1.rs:33), and Command::UpdatePartitionDurability embeds this PartitionDurability type (crates/wal-protocol/src/v1.rs:135). Adding a required serde field here means any pre-upgrade durability record that lacks partition_key_range will fail deserialization instead of replaying; please make this field serde-defaultable or keep the v1 payload shape separate and synthesize the key range during compatibility conversion.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safe to serde(default) the partition_key_range since it will default to None. PartitionDurability records that are read by a partition are still filtered out by the partition_id, even if a None partition_key_range is still accepted by the range key_filter.

}

bilrost_storage_encode_decode!(PartitionDurability);

impl HasRecordKeys for PartitionDurability {
fn record_keys(&self) -> Keys {
self.partition_key_range.clone()
}
}

/// Consistently store schema across partition replicas.
Expand All @@ -173,6 +218,14 @@ pub struct UpsertSchema {
pub schema: Schema,
}

flexbuffers_storage_encode_decode!(UpsertSchema);

impl HasRecordKeys for UpsertSchema {
fn record_keys(&self) -> Keys {
self.partition_key_range.clone()
}
}

/// Consistently distribute the cluster-global rule book across partition
/// replicas. Each partition's leader observes a node-level cache of the
/// rule book stored in the metadata store and proposes this command when
Expand Down
1 change: 1 addition & 0 deletions crates/wal-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
pub mod control;
pub mod timer;
pub mod v1;
pub mod v2;

pub use v1::{Command, Destination, Envelope, Header, Source};
Loading
Loading