From 069241652d0c25b972fca0799f0842b6c9b9506a Mon Sep 17 00:00:00 2001 From: Advaita Saha Date: Thu, 23 Apr 2026 14:06:18 +0530 Subject: [PATCH] =?UTF-8?q?=20=20-=20updateBase=20is=20now=20async,=20yiel?= =?UTF-8?q?ding=20once=20after=20db.persist()=20and=20once=20every=2016=20?= =?UTF-8?q?ancestors=20during=20the=20cleanup=20loop=20=E2=80=94=20so=20a?= =?UTF-8?q?=20full=20256-block=20batch=20gets=20=20=20~17=20yield=20points?= =?UTF-8?q?=20instead=20of=200.=20=20=20-=20Reordered=20so=20c.base=20/=20?= =?UTF-8?q?c.baseTxFrame=20pointer=20updates=20happen=20before=20the=20cle?= =?UTF-8?q?anup=20burst,=20with=20oldFrontier=20=3D=20base.parent=20captur?= =?UTF-8?q?ed=20first.=20If=20a=20shutdown=20=20=20=20cancels=20us=20mid-c?= =?UTF-8?q?leanup,=20ForkedChain=20invariants=20stay=20coherent.=20=20=20-?= =?UTF-8?q?=20Split=20wall-clock=20into=20persistMs=20+=20cleanupMs=20aggr?= =?UTF-8?q?egated=20over=20the=20batch=20and=20surfaced=20in=20the=20exist?= =?UTF-8?q?ing=20"Finalized=20blocks=20persisted"=20log=20=E2=80=94=20the?= =?UTF-8?q?=20=20=20cheapest=20possible=20instrumentation=20to=20tell=20yo?= =?UTF-8?q?u=20at=20a=20glance=20whether=20future=20stalls=20are=20disk=20?= =?UTF-8?q?or=20cleanup.=20=20=20-=20processUpdateBase=20now=20awaits=20th?= =?UTF-8?q?e=20async=20updateBase.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- execution_chain/core/chain/forked_chain.nim | 62 ++++++++++++++----- .../core/chain/forked_chain/chain_desc.nim | 8 +++ 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/execution_chain/core/chain/forked_chain.nim b/execution_chain/core/chain/forked_chain.nim index c21ea54646..7373ab8f99 100644 --- a/execution_chain/core/chain/forked_chain.nim +++ b/execution_chain/core/chain/forked_chain.nim @@ -295,7 +295,8 @@ func updateFinalized(c: ForkedChainRef, finalized: BlockRef, fcuHead: BlockRef) doAssert(candidate.isNil.not) c.latest = candidate -proc updateBase(c: ForkedChainRef, base: BlockRef): uint = +proc updateBase(c: ForkedChainRef, base: BlockRef): Future[uint] + {.async: (raises: [CancelledError]).} = ## ## A1 - A2 - A3 D5 - D6 ## / / @@ -338,20 +339,45 @@ with --debug-eager-state-root.""" # and prevent other modules accessing expired baseTxFrame. c.baseTxFrame = base.txFrame - # Cleanup in-memory blocks starting from base backward - # e.g. B2 backward. - var count = 0'u + let postPersistTime = Moment.now() + + # The disk-flush burst is done. Commit the new base pointer now, *before* + # the in-memory cleanup burst, so ForkedChain invariants (c.base, + # c.baseTxFrame) stay coherent even if cleanup is interrupted by shutdown + # cancellation. Capture `oldFrontier` first because `c.base.parent = nil` + # mutates `base.parent`, which the cleanup iterator walks. + let oldFrontier = base.parent + c.base = base + c.base.parent = nil + c.base.finalize() + + # Hand the chronos loop a chance to service pending RPC / networking before + # we enter the cleanup burst, which with a full persistBatchSize=256 can + # iterate hundreds of blocks scanning txRecords. + await sleepAsync(0.milliseconds) - for it in ancestors(base.parent): + # Cleanup in-memory blocks starting from the previous base backward + # e.g. B2 backward. Yield every `cleanupYieldChunk` ancestors so a single + # updateBase can't hog the event loop for the full cleanup duration. + const cleanupYieldChunk = 16 + var + count = 0'u + sinceYield = 0 + + for it in ancestors(oldFrontier): c.removeBlockFromCache(it) inc count + inc sinceYield + if sinceYield >= cleanupYieldChunk: + sinceYield = 0 + await sleepAsync(0.milliseconds) - # Update base branch - c.base = base - c.base.parent = nil + let finishTime = Moment.now() - # Base block always have finalized marker - c.base.finalize() + # Aggregate split timings for the "Finalized blocks persisted" log so we + # can see at a glance which phase is eating the budget. + c.persistMs += (postPersistTime - startTime).milliseconds + c.cleanupMs += (finishTime - postPersistTime).milliseconds if c.dynamicBatchSize: # Dynamicly adjust the persistBatchSize based on the recorded run time. @@ -367,9 +393,7 @@ with --debug-eager-state-root.""" batchSizeLowerBound = 4 batchSizeUpperBound = 256 - let - finishTime = Moment.now() - runTime = (finishTime - startTime).milliseconds + let runTime = (finishTime - startTime).milliseconds if runTime < targetTimeLowerBound and c.persistBatchSize < batchSizeUpperBound: c.persistBatchSize = min(c.persistBatchSize + 4, batchSizeUpperBound) @@ -385,7 +409,7 @@ with --debug-eager-state-root.""" proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = if c.baseQueue.len > 0: let base = c.baseQueue.popFirst() - c.persistedCount += c.updateBase(base) + c.persistedCount += await c.updateBase(base) const minLogInterval = 5 @@ -406,7 +430,9 @@ proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async: pendingFCU = c.pendingFCU.short, resolvedFinNum = c.latestFinalized.number, resolvedFinHash = c.latestFinalized.hash.short, - dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len() + dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len(), + persistMs = c.persistMs, + cleanupMs = c.cleanupMs else: debug "Finalized blocks persisted", nBlocks = c.persistedCount, @@ -416,9 +442,13 @@ proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async: pendingFCU = c.pendingFCU.short, resolvedFinNum = c.latestFinalized.number, resolvedFinHash = c.latestFinalized.hash.short, - dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len() + dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len(), + persistMs = c.persistMs, + cleanupMs = c.cleanupMs c.lastBaseLogTime = time c.persistedCount = 0 + c.persistMs = 0 + c.cleanupMs = 0 return ok() if c.queue.isNil: diff --git a/execution_chain/core/chain/forked_chain/chain_desc.nim b/execution_chain/core/chain/forked_chain/chain_desc.nim index f50b3d4526..446dc763c0 100644 --- a/execution_chain/core/chain/forked_chain/chain_desc.nim +++ b/execution_chain/core/chain/forked_chain/chain_desc.nim @@ -49,6 +49,14 @@ type # Count how many blocks persisted when `baseQueue` # consumed. + persistMs*: int64 + cleanupMs*: int64 + # Aggregated split timing across the same batch as `persistedCount`. + # `persistMs` covers state-root check + checkpoint + db.persist; `cleanupMs` + # covers the in-memory ancestors/removeBlockFromCache loop. Used to surface + # where the `updateBase` budget is going in the "Finalized blocks persisted" + # log so event-loop stalls can be diagnosed without a new metrics pipeline. + latest* : BlockRef # Every time a new block added, # that block automatically become the latest block.