wal-protocol: introduce v2 Envelope with bilrost encoding and lazy payloads#4695
Conversation
a6b4a7f to
d861565
Compare
a2210f5 to
5f85984
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5f85984ffd
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| /// partition key range | ||
| #[bilrost(tag(4))] | ||
| pub partition_key_range: Keys, |
There was a problem hiding this comment.
Preserve decoding of old durability records
In clusters upgrading with existing v1 WAL entries, v1::Envelope is still flexbuffers/serde decoded before the v1→v2 bridge runs (crates/wal-protocol/src/v1.rs:27, crates/wal-protocol/src/v1.rs:33), and Command::UpdatePartitionDurability embeds this PartitionDurability type (crates/wal-protocol/src/v1.rs:135). Adding a required serde field here means any pre-upgrade durability record that lacks partition_key_range will fail deserialization instead of replaying; please make this field serde-defaultable or keep the v1 payload shape separate and synthesize the key range during compatibility conversion.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
It's safe to serde(default) the partition_key_range since it will default to None. PartitionDurability records that are read by a partition are still filtered out by the partition_id, even if a None partition_key_range is still accepted by the range key_filter.
49520b0 to
ef120a1
Compare
| use super::NodeSet; | ||
|
|
||
| impl Proxiable for NodeSet { | ||
| type Proxy = Vec<PlainNodeId>; |
There was a problem hiding this comment.
Is it possible to avoid the extra allocation and serialize/deserialize directly into the indexmap?
There was a problem hiding this comment.
I will look into this one
| } | ||
|
|
||
| impl<R: Record> Envelope<R> { | ||
| pub fn new(source: Source, dedup: Dedup, payload: R::Payload) -> Self { |
There was a problem hiding this comment.
if payload is impl Into<Arc<R::Payload>> this makes it possible to pass an existing Arc down.
| // For backward compatibility with ProducerID::Other variant | ||
| // Drop in Restate v1.8 | ||
| #[bilrost(0)] | ||
| prefix: Option<String>, |
There was a problem hiding this comment.
Can we use ReString instead?
| } | ||
| } | ||
|
|
||
| impl<R: Send + Sync + 'static> StorageEncode for Envelope<R> { |
There was a problem hiding this comment.
Why do we need 'static?
There was a problem hiding this comment.
Because of PhantomData, I had to add Send + Sync otherwise the compiler compain with "R" cannot be shared between threads safely. Once I add both. Once I add both StorageEncode then start complaining that the parameter type "R" may not live long enough
There was a problem hiding this comment.
Ah, I see. It's because we require DowncastSync
|
|
||
| fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { | ||
| let len = self.header.encoded_len(); | ||
| buf.reserve(encoded_len_varint(len as u64) + len); |
There was a problem hiding this comment.
It'd be ideal if we can reserve enough space to account for the payload as well. This will need a small modification in StorageEncode trait to expose a method that calculates/estimates the length. I understand that this is beyond the scope of this particular change but can we leave a comment here so we can follow up on it?
| T: Record, | ||
| T::Payload: HasRecordKeys, | ||
| { | ||
| fn partial(payload: impl Into<Self::Payload>) -> PartialRecord { |
There was a problem hiding this comment.
Perhaps we should make it possible to construct it cheaply if we already have an Arc of the payload?
There was a problem hiding this comment.
Most payload types here are actually thin wrapper types around the inner message. For example, in crates/wal-protocol/src/v2/records.rs, the payload type for the ExternalStateMutation record is ExternalStateMutationPayload, which simply wraps state_mut::ExternalStateMutation while implementing the storage encode/decode traits and HasRecordKeys.
In practice, callers will almost always invoke partial() with the underlying state_mut::ExternalStateMutation directly.
Changing this to impl Into<Arc<..>> would no longer allow that usage pattern. However, one option would be to introduce a separate partial_arc method that can be used in those cases instead.
| impl StorageDecode for Header { | ||
| fn decode<B: bytes::Buf>( | ||
| buf: &mut B, | ||
| kind: StorageCodecKind, | ||
| ) -> Result<Self, StorageDecodeError> | ||
| where | ||
| Self: Sized, | ||
| { | ||
| // we use custom encoding because it's the length delimited version | ||
| // of bilrost | ||
| debug_assert_eq!(kind, StorageCodecKind::Custom); | ||
|
|
||
| Self::decode_length_delimited(buf) | ||
| .map_err(|err| StorageDecodeError::DecodeValue(err.into())) | ||
| } | ||
| } |
There was a problem hiding this comment.
I don't think the StorageDecode trait is the right place for this. If the envelope's codec is Custom, we know that the header is bilrost encoded.
18a5036 to
d84a9eb
Compare
AhmedSoliman
left a comment
There was a problem hiding this comment.
Approving to unblock, and as agreed. We can address some of the follow-ups in future PRs to reduce the merge conflict hell.
| write!(f, "]") | ||
| } | ||
|
|
||
| mod bilrost_impl { |
There was a problem hiding this comment.
nit: convention is bilrost_encoding
| // this actually should be a key-range but v1 unfortunately | ||
| // only hold the "start" of the range. | ||
| // will be fixed in v2 |
| // impl From<(Keys, v2::Header)> for v1::Header { | ||
| // fn from((keys, value): (Keys, v2::Header)) -> Self { | ||
| // let source = match value.source { | ||
| // v2::Source::Ingress => v1::Source::Ingress {}, | ||
| // v2::Source::ControlPlane => v1::Source::ControlPlane {}, | ||
| // v2::Source::Processor { leader_epoch } => v1::Source::Processor { | ||
| // partition_id: None, | ||
| // partition_key: None, | ||
| // leader_epoch, | ||
| // }, | ||
| // }; |
| pub index: MessageIndex, | ||
|
|
||
| #[bilrost(2)] | ||
| pub partition_key_range: Keys, |
There was a problem hiding this comment.
Perhaps KeyRange (from restate_sharding)? Keys is a bifrost type, it's trivial to convert between the two but separating them make it clear(er).
| )] | ||
| pub struct RestartAsNewInvocationRequestPayload(invocation::RestartAsNewInvocationRequest); | ||
|
|
||
| flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequestPayload); |
There was a problem hiding this comment.
Do you have a plan to switch/redesign those types in the future?
There was a problem hiding this comment.
Yes. This should be fairly easy with the new v2.
|
|
||
| #[serde_with::serde_as] | ||
| #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | ||
| #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, bilrost::Message)] |
There was a problem hiding this comment.
The PartialEq, and Eq are used in tests in assert statements. A quick way to compare 2 payloads.
There was a problem hiding this comment.
Can you ask claude to refactor the tests to avoid it please?
| } | ||
| } | ||
|
|
||
| impl<R: Send + Sync + 'static> StorageEncode for Envelope<R> { |
There was a problem hiding this comment.
Ah, I see. It's because we require DowncastSync
|
|
||
| /// Marker trait implemented by strongly-typed representations of WAL record | ||
| /// payloads. | ||
| pub trait Record: sealed::Sealed + Send + Sync + Clone + Copy + Sized { |
There was a problem hiding this comment.
Not sure if the trait needs to have all those constraints.
There was a problem hiding this comment.
Indeed. I will clean it up.
| #[derive( | ||
| Clone, Serialize, Deserialize, derive_more::Deref, derive_more::Into, derive_more::From, | ||
| )] | ||
| pub struct EffectPayload(Box<restate_worker_api::invoker::Effect>); |
There was a problem hiding this comment.
I guess it doesn't need to be boxed here but I understand that it originated as a box.
| seq: sn.sequence_number, | ||
| }, | ||
| (ProducerId::Other(prefix), DedupSequenceNumber::Sn(seq)) => v2::Dedup::Arbitrary { | ||
| prefix: Some(ReString::from(prefix.to_string())), |
There was a problem hiding this comment.
Let's use From on the string slice
…yloads ## Summary Add a new v2 envelope under `wal-protocol/src/v2/` that replaces the flexbuffers-encoded v1 layout. Key differences vs v1: - Header now carries (Source, Dedup, RecordKind) directly; the v1 Destination is gone. Partition routing moves to the log record's `Keys` instead of being embedded in the payload. - Encoded with bilrost (length-delimited header + payload) instead of flexbuffers, so readers can decode the header alone and skip the payload bytes when not needed. - `Envelope<R>` is generic over a `Record` marker type, with a `Raw` variant for not-yet-decoded payloads. `into_typed::<R>()` performs the typed decode lazily. - A `compatibility` module provides `TryFrom<v1::Envelope> for v2::Envelope<Raw>`, mapping every v1 Command variant to its v2 record kind so existing on-disk records keep replaying. Supporting changes: - Bilrost `Message` impls for AnnounceLeader, VersionBarrier, PartitionDurability, UpsertSchema (and bilrost proxy for NodeSet). - New `bilrost_storage_encode_decode!` macro in `restate_types`. - `PartitionDurability` gains `partition_key_range` so it can supply HasRecordKeys without help from the envelope header. - `wal-protocol` gets a `test-util` feature gating `Record::new_test`. Nothing in the worker writes v2 yet; this commit only adds the encoding layer and the v1→v2 bridge.
wal-protocol: introduce v2 Envelope with bilrost encoding and lazy payloads
Summary
Add a new v2 envelope under
wal-protocol/src/v2/that replaces theflexbuffers-encoded v1 layout. Key differences vs v1:
Destination is gone. Partition routing moves to the log record's
Keysinstead of being embedded in the payload.flexbuffers, so readers can decode the header alone and skip the
payload bytes when not needed.
Envelope<R>is generic over aRecordmarker type, with aRawvariant for not-yet-decoded payloads.
into_typed::<R>()performsthe typed decode lazily.
compatibilitymodule providesTryFrom<v1::Envelope> for v2::Envelope<Raw>, mapping every v1 Command variant to its v2record kind so existing on-disk records keep replaying.
Supporting changes:
Messageimpls for AnnounceLeader, VersionBarrier,PartitionDurability, UpsertSchema (and bilrost proxy for NodeSet).
bilrost_storage_encode_decode!macro inrestate_types.PartitionDurabilitygainspartition_key_rangeso it can supplyHasRecordKeys without help from the envelope header.
wal-protocolgets atest-utilfeature gatingRecord::new_test.Nothing in the worker writes v2 yet; this commit only adds the
encoding layer and the v1→v2 bridge.
Stack created with Sapling. Best reviewed with ReviewStack.