diff --git a/packages/rs-sdk/src/platform/address_sync/mod.rs b/packages/rs-sdk/src/platform/address_sync/mod.rs index b95fa877443..cd5ec73dc58 100644 --- a/packages/rs-sdk/src/platform/address_sync/mod.rs +++ b/packages/rs-sdk/src/platform/address_sync/mod.rs @@ -60,7 +60,8 @@ use dapi_grpc::platform::v0::{ get_recent_compacted_address_balance_changes_request, GetAddressesBranchStateRequest, GetRecentAddressBalanceChangesRequest, GetRecentCompactedAddressBalanceChangesRequest, Proof, }; -use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation}; +use dpp::address_funds::PlatformAddress; +use dpp::balances::credits::{BlockAwareCreditOperation, CreditOperation, Credits}; use dpp::prelude::AddressNonce; use dpp::version::PlatformVersion; use drive::drive::{Drive, RootTree}; @@ -457,9 +458,11 @@ async fn incremental_catch_up( result: &mut AddressSyncResult, settings: RequestSettings, ) -> Result<(), Error> { - // `key_to_tag` is already keyed by raw GroveDB bytes with - // `(tag, address)` values, so it can serve as the lookup directly. - let address_lookup = key_to_tag; + // Use the borrowed `key_to_tag` directly through the pass — only + // unknown-address replay (rare, end-of-pass) materializes any extra + // allocation. Buffered misses are bounded by the count of foreign / + // post-snapshot addresses in the response. + let mut pending_unknown: Vec = Vec::new(); let mut current_height = start_height; let mut observed_tip_height = start_height; @@ -614,39 +617,18 @@ async fn incremental_catch_up( result.metrics.compacted_entries_returned += entry_count; for entry in &entries { - for (platform_addr, credit_op) in &entry.changes { - let addr_bytes = platform_addr.to_bytes(); - if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - - let new_balance = match credit_op { - BlockAwareCreditOperation::SetCredits(credits) => *credits, - BlockAwareCreditOperation::AddToCreditsOperations(operations) => { - let total_to_add: u64 = operations - .iter() - .filter(|(height, _)| **height >= current_height) - .map(|(_, credits)| *credits) - .fold(0u64, |acc, c| acc.saturating_add(c)); - current_balance.saturating_add(total_to_add) - } - }; - - if new_balance != current_balance { - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, - }; - result.found.insert(result_key, funds); - provider.on_address_found(tag, &address, funds).await; - } - } - } + apply_block_changes( + key_to_tag, + entry + .changes + .iter() + .map(|(a, op)| (a, AddressBalanceChange::Compacted(op))), + current_height, + provider, + result, + &mut pending_unknown, + ) + .await; if entry.end_block_height.saturating_add(1) > current_height { current_height = entry.end_block_height.saturating_add(1); @@ -677,34 +659,18 @@ async fn incremental_catch_up( highest_recent_block = entry.block_height; } - for (platform_addr, credit_op) in &entry.changes { - let addr_bytes = platform_addr.to_bytes(); - if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { - let result_key = (tag, address); - let current_balance = result - .found - .get(&result_key) - .map(|f| f.balance) - .unwrap_or(0); - - let new_balance = match credit_op { - CreditOperation::SetCredits(credits) => *credits, - CreditOperation::AddToCredits(credits) => { - current_balance.saturating_add(*credits) - } - }; - - if new_balance != current_balance { - let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); - let funds = AddressFunds { - nonce, - balance: new_balance, - }; - result.found.insert(result_key, funds); - provider.on_address_found(tag, &address, funds).await; - } - } - } + apply_block_changes( + key_to_tag, + entry + .changes + .iter() + .map(|(a, op)| (a, AddressBalanceChange::Recent(op))), + current_height, + provider, + result, + &mut pending_unknown, + ) + .await; if entry.block_height.saturating_add(1) > current_height { current_height = entry.block_height.saturating_add(1); @@ -712,6 +678,11 @@ async fn incremental_catch_up( } } + // Single end-of-pass recovery: foreign-wallet addresses fall out at + // the extras-intersection check, so no per-block refresh and no log + // flood on multi-wallet chains. + refresh_and_replay_unknown(key_to_tag, pending_unknown, provider, result).await; + result.new_sync_height = current_height.max(observed_tip_height); // Store the highest block from the recent entries so the next sync can // use RangeAfter(this_height) for compaction detection. @@ -723,6 +694,238 @@ async fn incremental_catch_up( Ok(()) } +// ── Address-balance change application ──────────────────────────────── + +/// A single address balance change, abstracting the recent (`CreditOperation`) +/// and compacted (`BlockAwareCreditOperation`) shapes so one pure function can +/// apply both phases identically. +#[derive(Clone, Copy)] +pub(crate) enum AddressBalanceChange<'a> { + /// A recent (per-block) credit operation. + Recent(&'a CreditOperation), + /// A compacted (block-range) credit operation. + Compacted(&'a BlockAwareCreditOperation), +} + +impl AddressBalanceChange<'_> { + /// Resolve the post-change balance given the address's current balance and + /// the catch-up cursor height. Mirrors the two original inline loops + /// exactly (compacted height-filtered sum vs. recent flat add). + fn new_balance(&self, current_balance: Credits, current_height: u64) -> Credits { + match self { + AddressBalanceChange::Recent(op) => match op { + CreditOperation::SetCredits(credits) => *credits, + CreditOperation::AddToCredits(credits) => current_balance.saturating_add(*credits), + }, + AddressBalanceChange::Compacted(op) => match op { + BlockAwareCreditOperation::SetCredits(credits) => *credits, + BlockAwareCreditOperation::AddToCreditsOperations(operations) => { + let total_to_add: u64 = operations + .iter() + .filter(|(height, _)| **height >= current_height) + .map(|(_, credits)| *credits) + .fold(0u64, |acc, c| acc.saturating_add(c)); + current_balance.saturating_add(total_to_add) + } + }, + } + } + + /// Owned snapshot of the change for end-of-pass replay. Cheap for + /// `Recent` (the inner op is `Copy`); clones the operations vector for + /// `Compacted`. Only called for unknown addresses. + fn into_owned(self) -> OwnedAddressBalanceChange { + match self { + AddressBalanceChange::Recent(op) => OwnedAddressBalanceChange::Recent(*op), + AddressBalanceChange::Compacted(op) => OwnedAddressBalanceChange::Compacted(op.clone()), + } + } +} + +/// Owned counterpart of [`AddressBalanceChange`] so unknown-address changes +/// can outlive the per-block iterator and be replayed at end-of-pass. +#[derive(Clone)] +pub(crate) enum OwnedAddressBalanceChange { + Recent(CreditOperation), + Compacted(BlockAwareCreditOperation), +} + +impl OwnedAddressBalanceChange { + fn as_borrowed(&self) -> AddressBalanceChange<'_> { + match self { + OwnedAddressBalanceChange::Recent(op) => AddressBalanceChange::Recent(op), + OwnedAddressBalanceChange::Compacted(op) => AddressBalanceChange::Compacted(op), + } + } +} + +/// A single change for an address that wasn't in the entry-time snapshot. +/// Buffered across the catch-up pass and replayed once at the end after a +/// single `pending_addresses()` refresh. +pub(crate) struct PendingUnknownChange { + /// Raw GroveDB key bytes — joined against the refreshed lookup. + key: Vec, + /// Owned change so the underlying response entries can be dropped. + change: OwnedAddressBalanceChange, + /// Catch-up cursor at the time of the original block — feeds the + /// compacted height filter on replay. Ignored by `Recent`. + current_height: u64, +} + +/// Apply one block's changes against the borrowed entry-time lookup, drive +/// `on_address_found` for every known address whose balance moved, and +/// append unknown-address changes to `pending_unknown` for a single +/// end-of-pass refresh + replay. The refresh is deliberately deferred so +/// foreign-wallet addresses on a shared chain do not trigger a per-block +/// provider poll. +async fn apply_block_changes<'a, P, I>( + address_lookup: &HashMap, (P::Tag, P::Address)>, + changes: I, + current_height: u64, + provider: &mut P, + result: &mut AddressSyncResult, + pending_unknown: &mut Vec, +) where + P: AddressProvider, + I: IntoIterator)>, +{ + let mut local_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + + for (platform_addr, change) in changes { + let addr_bytes = platform_addr.to_bytes(); + if let Some(&(tag, address)) = address_lookup.get(&addr_bytes) { + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + + let new_balance = change.new_balance(current_balance, current_height); + + if new_balance != current_balance { + // TODO: incremental RPCs carry only balance deltas, never + // nonces — addresses first seen here get nonce=0. Clients + // recover via `AddressInvalidNonceError.expected_nonce`; + // a proper fix would fetch authoritative `AddressFunds` + // or model `nonce` as `Option`. + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.absent.remove(&result_key); + result.found.insert(result_key, funds); + local_applied.push((tag, address, funds)); + } + } else { + pending_unknown.push(PendingUnknownChange { + key: addr_bytes, + change: change.into_owned(), + current_height, + }); + } + } + + for (tag, address, funds) in &local_applied { + provider.on_address_found(*tag, address, *funds).await; + } +} + +/// End-of-pass recovery for addresses missing from the entry-time +/// snapshot. Re-polls `pending_addresses()` exactly once, builds a small +/// `extras` map of newly-derived addresses, and replays only the buffered +/// changes that match an `extras` entry. Foreign (other-wallet) addresses +/// fall out at the intersection check — no provider refresh storm, no +/// log flood. +async fn refresh_and_replay_unknown( + key_to_tag: &HashMap, (P::Tag, P::Address)>, + pending_unknown: Vec, + provider: &mut P, + result: &mut AddressSyncResult, +) { + if pending_unknown.is_empty() { + return; + } + + // Build the set of unknown keys for a fast intersection probe. + let unknown_keys: std::collections::HashSet<&[u8]> = + pending_unknown.iter().map(|p| p.key.as_slice()).collect(); + + // Only addresses the provider can now produce AND that match a + // buffered miss are interesting — everything else is some other + // wallet's address and stays out of the lookup entirely. + let mut extras: HashMap, (P::Tag, P::Address)> = HashMap::new(); + for (tag, address) in provider.pending_addresses() { + let bytes = address.to_bytes(); + if unknown_keys.contains(bytes.as_slice()) && !key_to_tag.contains_key(&bytes) { + extras.insert(bytes, (tag, address)); + } + } + + if extras.is_empty() { + // Common case on a populated multi-wallet chain: every buffered + // unknown belongs to another wallet. + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet; ignoring", + pending_unknown.len() + ); + return; + } + + // Replay only the entries whose key actually resolves in `extras`. + // Order is preserved (compacted first, then recent — same as the + // forward pass), so `AddToCredits` deltas accumulate correctly. The + // catch-up cursor per change is preserved so the compacted height + // filter still sees the same `current_height` it would have seen on + // the forward pass. + let mut replay_applied: Vec<(P::Tag, P::Address, AddressFunds)> = Vec::new(); + let mut still_unknown: usize = 0; + for pending in &pending_unknown { + let Some(&(tag, address)) = extras.get(pending.key.as_slice()) else { + still_unknown += 1; + continue; + }; + let result_key = (tag, address); + let current_balance = result + .found + .get(&result_key) + .map(|f| f.balance) + .unwrap_or(0); + let new_balance = pending + .change + .as_borrowed() + .new_balance(current_balance, pending.current_height); + + if new_balance != current_balance { + // TODO: same synthesized nonce=0 gap as the forward pass. + let nonce = result.found.get(&result_key).map(|f| f.nonce).unwrap_or(0); + let funds = AddressFunds { + nonce, + balance: new_balance, + }; + result.absent.remove(&result_key); + result.found.insert(result_key, funds); + replay_applied.push((tag, address, funds)); + } + } + + for (tag, address, funds) in &replay_applied { + provider.on_address_found(*tag, address, *funds).await; + } + + if still_unknown > 0 { + debug!( + "Address sync: {} platform-reported balance change(s) reference \ + address(es) not tracked by this wallet (refresh recovered {} \ + other(s)); ignoring the untracked entries", + still_unknown, + replay_applied.len() + ); + } +} + /// Extract the highest block height from the recent tree boundaries in the proof. /// /// Returns: @@ -1381,4 +1584,420 @@ mod tests { "expected balance conversion error, got: {err:?}" ); } + + // ── End-of-pass refresh + replay regression guards ───────────────── + + use dpp::address_funds::PlatformAddress; + use dpp::balances::credits::BlockAwareCreditOperation; + + fn p2pkh(byte: u8) -> PlatformAddress { + PlatformAddress::P2pkh([byte; 20]) + } + + /// A provider that derives a fresh address mid-pass — so the + /// entry-time lookup misses it — gets the balance applied AND + /// `on_address_found` fired after the end-of-pass refresh. + #[tokio::test] + async fn apply_block_changes_recovers_post_snapshot_address() { + use async_trait::async_trait; + + struct GrowingProvider { + late: PlatformAddress, + found: Vec<(u32, PlatformAddress, AddressFunds)>, + } + + #[async_trait] + impl AddressProvider for GrowingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::once((7u32, self.late)) + } + + async fn on_address_found( + &mut self, + tag: Self::Tag, + address: &Self::Address, + funds: AddressFunds, + ) { + self.found.push((tag, *address, funds)); + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let late = p2pkh(0xCD); + + let lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + + let mut provider = GrowingProvider { + late, + found: Vec::new(), + }; + let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut pending_unknown: Vec = Vec::new(); + + let op = BlockAwareCreditOperation::SetCredits(42_000); + let changes = [(&late, AddressBalanceChange::Compacted(&op))]; + + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + + // Per-block apply must NOT touch the provider for unknowns — + // the refresh is deferred to end-of-pass. + assert!( + provider.found.is_empty(), + "no on_address_found before end-of-pass refresh" + ); + assert_eq!(pending_unknown.len(), 1, "miss is buffered for replay"); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + assert_eq!( + result.found.get(&(7u32, late)).map(|f| f.balance), + Some(42_000), + "post-snapshot address balance must be applied after refresh" + ); + assert!( + provider + .found + .iter() + .any(|(t, a, f)| *t == 7 && *a == late && f.balance == 42_000), + "on_address_found must fire for the recovered post-snapshot address" + ); + } + + /// A known address proven absent by the tree scan but re-discovered + /// by an incremental change is moved into `found` and pruned from + /// `absent`, keeping the two sets disjoint. + #[tokio::test] + async fn apply_block_changes_keeps_found_and_absent_disjoint_on_catch_up() { + use async_trait::async_trait; + + struct NoopProvider; + + #[async_trait] + impl AddressProvider for NoopProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::empty() + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let tag: u32 = 5; + let addr = p2pkh(0x99); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(addr.to_bytes(), (tag, addr)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.absent.insert((tag, addr)); + + let op = BlockAwareCreditOperation::SetCredits(7_777); + let changes = [(&addr, AddressBalanceChange::Compacted(&op))]; + + let mut pending_unknown: Vec = Vec::new(); + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut NoopProvider, + &mut result, + &mut pending_unknown, + ) + .await; + + assert_eq!( + result.found.get(&(tag, addr)).map(|f| f.balance), + Some(7_777), + ); + assert!( + !result.absent.contains(&(tag, addr)), + "apply_block_changes must keep found/absent disjoint" + ); + assert!( + pending_unknown.is_empty(), + "no unknowns expected for a known address" + ); + } + + /// The end-of-pass refresh must not double-count a known address's + /// `AddToCredits` delta when it replays the unknown subset in the + /// same block (the replay must exclude already-applied addresses). + #[tokio::test] + async fn refresh_does_not_double_count_known_address_delta() { + use async_trait::async_trait; + + let known = p2pkh(0x11); + let late = p2pkh(0x22); + + struct GrowingProvider { + late: PlatformAddress, + } + + #[async_trait] + impl AddressProvider for GrowingProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + std::iter::once((9u32, self.late)) + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + // `known` is in the snapshot with a starting balance; `late` is + // not (post-snapshot) and forces the refresh + replay path. + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(known.to_bytes(), (3u32, known)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + result.found.insert( + (3u32, known), + AddressFunds { + nonce: 0, + balance: 1_000, + }, + ); + + let mut provider = GrowingProvider { late }; + + let known_op = BlockAwareCreditOperation::AddToCreditsOperations( + std::iter::once((0u64, 500u64)).collect(), + ); + let late_op = BlockAwareCreditOperation::SetCredits(7_000); + let changes = [ + (&known, AddressBalanceChange::Compacted(&known_op)), + (&late, AddressBalanceChange::Compacted(&late_op)), + ]; + + let mut pending_unknown: Vec = Vec::new(); + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + // Known address was applied immediately (no longer waits on the + // end-of-pass refresh). `late` is buffered for replay. + assert_eq!(pending_unknown.len(), 1); + + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + + // Known delta applied exactly once: 1000 + 500 (NOT 1000 + 500 + + // 500). The replay must skip the already-applied known address — + // here that is guaranteed structurally because the replay only + // walks the buffered misses, not the full change set. + assert_eq!( + result.found.get(&(3u32, known)).map(|f| f.balance), + Some(1_500), + "known AddToCredits delta must apply exactly once across refresh" + ); + assert_eq!( + result.found.get(&(9u32, late)).map(|f| f.balance), + Some(7_000), + "post-snapshot address still recovered after refresh" + ); + } + + /// A foreign address (not in the lookup, never produced by the + /// provider) is silently ignored — no `on_address_found`, no + /// `result.found` insert, no `result.absent` mutation, and exactly + /// one provider refresh for the whole pass. + #[tokio::test] + async fn apply_block_changes_ignores_foreign_address_without_refresh_storm() { + use async_trait::async_trait; + + struct CountingNoopProvider { + pending_polls: std::sync::atomic::AtomicUsize, + found_calls: usize, + } + + #[async_trait] + impl AddressProvider for CountingNoopProvider { + type Tag = u32; + type Address = PlatformAddress; + + fn gap_limit(&self) -> AddressIndex { + 0 + } + + fn pending_addresses(&self) -> impl Iterator + '_ { + self.pending_polls + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + std::iter::empty() + } + + async fn on_address_found( + &mut self, + _tag: Self::Tag, + _address: &Self::Address, + _funds: AddressFunds, + ) { + self.found_calls += 1; + } + + async fn on_address_absent(&mut self, _tag: Self::Tag, _address: &Self::Address) {} + + fn current_balances( + &self, + ) -> impl Iterator + '_ { + std::iter::empty() + } + } + + let mine = p2pkh(0x01); + let foreign_1 = p2pkh(0xF1); + let foreign_2 = p2pkh(0xF2); + let foreign_3 = p2pkh(0xF3); + + let mut lookup: HashMap, (u32, PlatformAddress)> = HashMap::new(); + lookup.insert(mine.to_bytes(), (1u32, mine)); + + let mut result: AddressSyncResult = AddressSyncResult::new(); + let mut provider = CountingNoopProvider { + pending_polls: std::sync::atomic::AtomicUsize::new(0), + found_calls: 0, + }; + let mut pending_unknown: Vec = Vec::new(); + + // Three separate "blocks" (representing the per-entry calls + // inside `incremental_catch_up`), every change but the first + // belongs to another wallet. + for (addr, credits) in [ + (&mine, 1_000), + (&foreign_1, 5_000), + (&foreign_2, 5_000), + (&foreign_3, 5_000), + ] { + let op = BlockAwareCreditOperation::SetCredits(credits); + let changes = [(addr, AddressBalanceChange::Compacted(&op))]; + apply_block_changes( + &lookup, + changes.iter().map(|(a, c)| (*a, *c)), + 0, + &mut provider, + &mut result, + &mut pending_unknown, + ) + .await; + } + + // Per-block apply must NEVER refresh the provider — the refresh + // runs once, at end of pass. + assert_eq!( + provider + .pending_polls + .load(std::sync::atomic::Ordering::Relaxed), + 0, + "no per-block pending_addresses() polls — refresh is end-of-pass only" + ); + + // The end-of-pass refresh runs exactly once. + refresh_and_replay_unknown(&lookup, pending_unknown, &mut provider, &mut result).await; + assert_eq!( + provider + .pending_polls + .load(std::sync::atomic::Ordering::Relaxed), + 1, + "end-of-pass refresh must poll the provider exactly once" + ); + + // Foreign addresses must not surface as `found` or fire callbacks. + assert_eq!( + result.found.len(), + 1, + "only the known address is in `found` (foreign addresses ignored)" + ); + assert_eq!( + result.found.get(&(1u32, mine)).map(|f| f.balance), + Some(1_000), + "known address applied" + ); + assert!( + !result + .found + .keys() + .any(|(_, a)| *a == foreign_1 || *a == foreign_2 || *a == foreign_3), + "no foreign address may be inserted into `result.found`" + ); + assert!( + result.absent.is_empty(), + "foreign addresses must not be marked `absent` either" + ); + assert_eq!( + provider.found_calls, 1, + "on_address_found fires only for the known address" + ); + + // `found` and `absent` stay disjoint. + for key in result.found.keys() { + assert!( + !result.absent.contains(key), + "found ∩ absent must be empty: {key:?} in both" + ); + } + } }