Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions crates/clock/src/rough_ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@ mod bilrost_encoding {
to encode proxied type (RoughTimestamp)
with general encodings including distinguished
);

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::Fixed)
to encode proxied type (RoughTimestamp)
with encoding (bilrost::encoding::Fixed)
including distinguished
);
}

#[cfg(test)]
Expand Down Expand Up @@ -526,4 +533,23 @@ mod tests {
);
}
}

#[test]
fn bilrost_fixed_round_trip() {
use bilrost::Message;

#[derive(bilrost::Message, PartialEq, Debug)]
struct TestMessage {
#[bilrost(tag(1), encoding(fixed))]
timestamp: RoughTimestamp,
}

let msg = TestMessage {
timestamp: RoughTimestamp::new(123),
};

let encoded = msg.encode_to_vec();
assert_eq!(encoded.len(), 5);
assert_eq!(TestMessage::decode(encoded.as_slice()), Ok(msg));
}
}
7 changes: 7 additions & 0 deletions crates/clock/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,13 @@ mod bilrost_encoding {
to encode proxied type (MillisSinceEpoch)
with general encodings including distinguished
);

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::Fixed)
to encode proxied type (MillisSinceEpoch)
with encoding (bilrost::encoding::Fixed)
including distinguished
);
}

/// Nanos since the unix epoch. Used internally to get rough latency measurements across nodes.
Expand Down
7 changes: 7 additions & 0 deletions crates/clock/src/unique_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ mod bilrost_encoding {
to encode proxied type (UniqueTimestamp)
with general encodings including distinguished
);

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::Fixed)
to encode proxied type (UniqueTimestamp)
with encoding (bilrost::encoding::Fixed)
including distinguished
);
}

#[cfg(test)]
Expand Down
150 changes: 143 additions & 7 deletions crates/encoding/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
// by the Apache License, Version 2.0.

use bilrost::{
DecodeErrorKind,
encoding::{EmptyState, ForOverwrite, Proxiable},
Canonicity, DecodeErrorKind,
encoding::{DistinguishedProxiable, EmptyState, ForOverwrite, Proxiable},
};
use restate_encoding_derive::BilrostNewType;
use restate_platform::network::NetSerde;

use crate::bilrost_encodings::RestateEncoding;
Expand Down Expand Up @@ -60,25 +59,114 @@ bilrost::delegate_proxied_encoding!(
);

/// A Bilrost compatible U128 type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, BilrostNewType)]
pub struct U128((u64, u64));
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct U128([u8; core::mem::size_of::<u128>()]);

impl From<u128> for U128 {
fn from(value: u128) -> Self {
Self(((value >> 64) as u64, value as u64))
Self(value.to_le_bytes())
}
}

impl From<U128> for u128 {
fn from(value: U128) -> Self {
(value.0.0 as u128) << 64 | value.0.1 as u128
Self::from_le_bytes(value.0)
}
}

impl NetSerde for U128 {}

struct GeneralU128Tag;

impl Proxiable<GeneralU128Tag> for U128 {
type Proxy = (u64, u64);

fn encode_proxy(&self) -> Self::Proxy {
let value = u128::from(*self);
((value >> 64) as u64, value as u64)
}

fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> {
*self = U128::from((u128::from(proxy.0) << 64) | u128::from(proxy.1));
Ok(())
}
}

impl DistinguishedProxiable<GeneralU128Tag> for U128 {
fn decode_proxy_distinguished(
&mut self,
proxy: Self::Proxy,
) -> Result<Canonicity, DecodeErrorKind> {
<U128 as Proxiable<GeneralU128Tag>>::decode_proxy(self, proxy)?;
Ok(Canonicity::Canonical)
}
}

impl ForOverwrite<(), U128> for () {
fn for_overwrite() -> U128 {
U128([0; core::mem::size_of::<u128>()])
}
}

impl EmptyState<(), U128> for () {
fn empty() -> U128 {
U128([0; core::mem::size_of::<u128>()])
}

fn is_empty(val: &U128) -> bool {
val.0 == [0; core::mem::size_of::<u128>()]
}

fn clear(val: &mut U128) {
val.0 = [0; core::mem::size_of::<u128>()];
}
}

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::General)
to encode proxied type (U128) using proxy tag (GeneralU128Tag)
with general encodings including distinguished
);

struct FixedU128Tag;

// Keep the general `U128` encoding as the historical `(u64, u64)` tuple above
// for existing wire compatibility. The fixed encoding is opt-in via
// `#[bilrost(encoding(fixed))]` and uses a length-delimited 16-byte little-endian
// payload because bilrost has no fixed128 scalar wire type.
impl Proxiable<FixedU128Tag> for U128 {
type Proxy = [u8; core::mem::size_of::<u128>()];

fn encode_proxy(&self) -> Self::Proxy {
self.0
}

fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> {
self.0 = proxy;
Ok(())
}
}

impl DistinguishedProxiable<FixedU128Tag> for U128 {
fn decode_proxy_distinguished(
&mut self,
proxy: Self::Proxy,
) -> Result<Canonicity, DecodeErrorKind> {
<U128 as Proxiable<FixedU128Tag>>::decode_proxy(self, proxy)?;
Ok(Canonicity::Canonical)
}
}

bilrost::delegate_proxied_encoding!(
use encoding (bilrost::encoding::PlainBytes)
to encode proxied type (U128) using proxy tag (FixedU128Tag)
with encoding (bilrost::encoding::Fixed)
including distinguished
);

#[cfg(test)]
mod test {
use bilrost::{Message, OwnedMessage};
use rand::random;

use super::U128;
Expand All @@ -92,4 +180,52 @@ mod test {
assert_eq!(num, u128::from(value));
});
}

#[test]
fn u128_fixed_bilrost_round_trip() {
#[derive(Debug, PartialEq, bilrost::Message)]
struct EncodedU128 {
#[bilrost(tag(1), encoding(fixed))]
value: U128,
}

let raw = 0x1234_5678_90ab_cdef_0123_4567_89ab_cdef;
let value = EncodedU128 {
value: U128::from(raw),
};
let encoded = value.encode_to_bytes();

assert_eq!(encoded.len(), 18);
assert_eq!(encoded[0], 0x05);
assert_eq!(encoded[1], 16);
assert_eq!(&encoded[2..], &raw.to_le_bytes());
assert_eq!(EncodedU128::decode(encoded).unwrap(), value);
}

#[test]
fn u128_general_bilrost_keeps_high_low_tuple_encoding() {
#[derive(Debug, PartialEq, bilrost::Message)]
struct EncodedU128 {
#[bilrost(tag(1))]
value: U128,
}

#[derive(Debug, PartialEq, bilrost::Message)]
struct HistoricalEncodedU128 {
#[bilrost(tag(1))]
value: (u64, u64),
}

let raw = 0x1234_5678_90ab_cdef_0123_4567_89ab_cdef;
let value = EncodedU128 {
value: U128::from(raw),
};
let historical = HistoricalEncodedU128 {
value: ((raw >> 64) as u64, raw as u64),
};
let encoded = value.encode_to_bytes();

assert_eq!(encoded, historical.encode_to_bytes());
assert_eq!(EncodedU128::decode(encoded).unwrap(), value);
}
}
4 changes: 4 additions & 0 deletions crates/partition-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,9 @@ tracing-subscriber = { workspace = true }
name = "basic_benchmark"
harness = false

[[bench]]
name = "vqueue_meta_merge"
harness = false

[lints]
workspace = true
Loading
Loading