Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions crates/clock/src/rough_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ mod bilrost_encoding {
to encode proxied type (RoughTimestamp)
with general encodings including distinguished
);

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::Fixed)
to encode proxied type (RoughTimestamp)
with encoding (bilrost::encoding::Fixed)
including distinguished
);
}

#[cfg(test)]
Expand Down Expand Up @@ -526,4 +533,23 @@ mod tests {
);
}
}

#[test]
fn bilrost_fixed_round_trip() {
use bilrost::Message;

#[derive(bilrost::Message, PartialEq, Debug)]
struct TestMessage {
#[bilrost(tag(1), encoding(fixed))]
timestamp: RoughTimestamp,
}

let msg = TestMessage {
timestamp: RoughTimestamp::new(123),
};

let encoded = msg.encode_to_vec();
assert_eq!(encoded.len(), 5);
assert_eq!(TestMessage::decode(encoded.as_slice()), Ok(msg));
}
}
7 changes: 7 additions & 0 deletions crates/clock/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ mod bilrost_encoding {
to encode proxied type (MillisSinceEpoch)
with general encodings including distinguished
);

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::Fixed)
to encode proxied type (MillisSinceEpoch)
with encoding (bilrost::encoding::Fixed)
including distinguished
);
}

/// Nanos since the unix epoch. Used internally to get rough latency measurements across nodes.
Expand Down
7 changes: 7 additions & 0 deletions crates/clock/src/unique_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ mod bilrost_encoding {
to encode proxied type (UniqueTimestamp)
with general encodings including distinguished
);

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::Fixed)
to encode proxied type (UniqueTimestamp)
with encoding (bilrost::encoding::Fixed)
including distinguished
);
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions crates/partition-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,9 @@ tracing-subscriber = { workspace = true }
name = "basic_benchmark"
harness = false

[[bench]]
name = "vqueue_meta_merge"
harness = false

[lints]
workspace = true
152 changes: 152 additions & 0 deletions crates/partition-store/benches/vqueue_meta_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use bilrost::Message;
use criterion::{Criterion, Throughput, black_box, criterion_group, criterion_main};

use restate_clock::UniqueTimestamp;
use restate_clock::time::MillisSinceEpoch;
use restate_limiter::LimitKey;
use restate_partition_store::vqueue_table::{MetaKey, vqueue_meta_merge};
use restate_storage_api::vqueue_table::Stage;
use restate_storage_api::vqueue_table::metadata::{
Action, MoveMetrics, Update, VQueueLink, VQueueMeta,
};
use restate_types::vqueues::VQueueId;

const BASE_TS_MS: u64 = 1_744_000_000_000;
const MERGE_OPERANDS: usize = 10_000;

fn ts(offset_ms: u64) -> UniqueTimestamp {
UniqueTimestamp::from_unix_millis_unchecked(MillisSinceEpoch::new(BASE_TS_MS + offset_ms))
}

fn move_metrics(
last_transition_at: UniqueTimestamp,
first_runnable_at: MillisSinceEpoch,
has_started: bool,
blocked_on_concurrency_rules_ms: u32,
blocked_on_invoker_throttling_ms: u32,
) -> MoveMetrics {
MoveMetrics {
last_transition_at,
has_started,
first_runnable_at,
blocked_on_concurrency_rules_ms,
blocked_on_invoker_throttling_ms,
}
}

fn update_operands() -> Vec<Vec<u8>> {
let mut operands = Vec::with_capacity(MERGE_OPERANDS);

for cycle in 0..(MERGE_OPERANDS / 4) {
let base_offset = (cycle as u64) * 4;
let enqueued_at = ts(base_offset);
let started_at = ts(base_offset + 1);
let finished_at = ts(base_offset + 2);
let first_runnable_at = enqueued_at.to_unix_millis();

operands.push(
Update::new(
enqueued_at,
Action::Move {
prev_stage: None,
next_stage: Stage::Inbox,
metrics: move_metrics(enqueued_at, first_runnable_at, false, 0, 0),
},
)
.encode_contiguous()
.into_vec(),
);

operands.push(
Update::new(
started_at,
Action::Move {
prev_stage: Some(Stage::Inbox),
next_stage: Stage::Running,
metrics: move_metrics(
enqueued_at,
first_runnable_at,
false,
(cycle % 1_000) as u32,
(cycle % 100) as u32,
),
},
)
.encode_contiguous()
.into_vec(),
);

operands.push(
Update::new(
finished_at,
Action::Move {
prev_stage: Some(Stage::Running),
next_stage: Stage::Finished,
metrics: move_metrics(started_at, first_runnable_at, true, 0, 0),
},
)
.encode_contiguous()
.into_vec(),
);

operands.push(
Update::new(
ts(base_offset + 3),
Action::RemoveEntry {
stage: Stage::Finished,
},
)
.encode_contiguous()
.into_vec(),
);
}

operands
}

fn initial_vqueue_meta() -> Vec<u8> {
VQueueMeta::new(ts(0), None, LimitKey::None, VQueueLink::None)
.encode_contiguous()
.into_vec()
}

fn vqueue_meta_key() -> Vec<u8> {
let qid = VQueueId::custom(1, "vqueue-meta-merge-benchmark");
MetaKey::from(&qid).to_bytes().to_vec()
}

fn vqueue_meta_merge_benchmark(c: &mut Criterion) {
let key = vqueue_meta_key();
let existing_value = initial_vqueue_meta();
let operands = update_operands();
let operand_slices = operands.iter().map(Vec::as_slice).collect::<Vec<_>>();

let mut group = c.benchmark_group("vqueue_meta_merge");
group.throughput(Throughput::Elements(MERGE_OPERANDS as u64));
group.bench_function("full_merge_10000_operands", |bencher| {
bencher.iter(|| {
let operands = black_box(operand_slices.as_slice()).iter().copied();
let merged = vqueue_meta_merge::full_merge_slices(
black_box(key.as_slice()),
Some(black_box(existing_value.as_slice())),
operands,
)
.expect("vqueue metadata merge succeeds");
black_box(merged);
});
});
group.finish();
}

criterion_group!(benches, vqueue_meta_merge_benchmark);
criterion_main!(benches);
22 changes: 6 additions & 16 deletions crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,25 +232,15 @@ impl KeyKind {
}

// Rocksdb merge operator function (partial merge)
#[inline]
pub fn partial_merge(
key: &[u8],
_key: &[u8],
_unused: Option<&[u8]>,
operands: &MergeOperands,
_operands: &MergeOperands,
) -> Option<Vec<u8>> {
let mut kind_buf = key;
let kind = match KeyKind::deserialize(&mut kind_buf) {
Ok(kind) => kind,
Err(e) => {
error!("Cannot apply merge operator; {e}");
return None;
}
};
trace!(?kind, "partial merge");

match kind {
KeyKind::VQueueMeta => vqueue_meta_merge::partial_merge(key, operands),
_ => None,
}
// Currently, we have no partial merge operator for any key. Change this
// if/when this is needed.
None
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/partition-store/src/partition_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ impl CfConfigurator for RocksConfigurator<AllDataCf> {
KeyKind::full_merge,
KeyKind::partial_merge,
);
cf_options.set_max_successive_merges(100);
cf_options.set_max_successive_merges(5000);

cf_options.set_disable_auto_compactions(config.rocksdb.rocksdb_disable_auto_compactions());
if let Some(compaction_period) = config.rocksdb.rocksdb_periodic_compaction_seconds() {
Expand Down
62 changes: 24 additions & 38 deletions crates/partition-store/src/vqueue_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ impl MetaKey {
}
}

// todo: check if this is still needed
impl From<&VQueueId> for MetaKey {
#[inline]
fn from(qid: &VQueueId) -> Self {
Expand All @@ -80,21 +79,30 @@ impl From<ActiveKey> for MetaKey {
}

// Rocksdb merge operator for the vqueue keys
pub(crate) mod vqueue_meta_merge {
pub mod vqueue_meta_merge {
use bilrost::{Message, OwnedMessage};
use rocksdb::MergeOperands;
use tracing::error;

use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates};
use restate_storage_api::vqueue_table;
use restate_storage_api::vqueue_table::metadata::VQueueMeta;

use crate::keys::DecodeTableKey;

use super::MetaKey;

pub fn full_merge(
mut key: &[u8],
key: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
full_merge_slices(key, existing_val, operands)
}

pub fn full_merge_slices<'a>(
mut key: &[u8],
existing_val: Option<&[u8]>,
operands: impl IntoIterator<Item = &'a [u8]>,
) -> Option<Vec<u8>> {
let Some(mut existing_val) = existing_val else {
let key = MetaKey::deserialize_from(&mut key);
Expand All @@ -117,43 +125,21 @@ pub(crate) mod vqueue_meta_merge {
}
};

let mut update = <vqueue_table::metadata::Update as bilrost::encoding::RawMessage>::empty();
for op in operands {
let batch = match VQueueMetaUpdates::decode(op) {
Err(err) => {
let key = MetaKey::deserialize_from(&mut key);
error!(
?err,
?key,
"[full merge] Failed to decode vqueue meta batched updates ({} bytes)",
op.len(),
);
return None;
}
Ok(batch) => batch,
};
for update in batch.updates.iter() {
vqueue_meta.apply_update(update);
if let Err(err) = update.replace_from_slice(op) {
let key = MetaKey::deserialize_from(&mut key);
Comment on lines +128 to +131
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Handle legacy vqueue meta operands during full merge

full_merge_slices now decodes every merge operand as a single Update, but pre-upgrade data was written as VQueueMetaUpdates batches. During a rolling upgrade (or after restart with existing SST/memtable merge records), those legacy operands will not be interpreted correctly by this path, so queued metadata updates can be dropped or the merge can fail, leaving vqueue counters/stats inconsistent. Please add a compatibility decode path for the old operand format before applying updates.

Useful? React with 👍 / 👎.

error!(
?err,
?key,
"[full merge] Failed to decode vqueue meta update ({} bytes)",
op.len(),
);
return None;
}
vqueue_meta.apply_update(&update);
}
Some(vqueue_meta.encode_to_vec())
}

pub fn partial_merge(mut _key: &[u8], operands: &MergeOperands) -> Option<Vec<u8>> {
let mut updates =
VQueueMetaUpdates::with_capacity(operands.len() * VQueueMetaUpdates::INLINED_UPDATES);
for op in operands {
let partial_updates = match VQueueMetaUpdates::decode(op) {
Err(err) => {
error!(
?err,
"[partial merge] Failed to decode vqueue meta batched updates"
);
return None;
}
Ok(u) => u,
};
updates.extend(partial_updates);
}
Some(updates.encode_to_vec())
Some(vqueue_meta.encode_contiguous().into_vec())
}
}
Loading
Loading