Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 46 additions & 16 deletions execution_chain/core/chain/forked_chain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ func updateFinalized(c: ForkedChainRef, finalized: BlockRef, fcuHead: BlockRef)
doAssert(candidate.isNil.not)
c.latest = candidate

proc updateBase(c: ForkedChainRef, base: BlockRef): uint =
proc updateBase(c: ForkedChainRef, base: BlockRef): Future[uint]
{.async: (raises: [CancelledError]).} =
##
## A1 - A2 - A3 D5 - D6
## / /
Expand Down Expand Up @@ -338,20 +339,45 @@ with --debug-eager-state-root."""
# and prevent other modules accessing expired baseTxFrame.
c.baseTxFrame = base.txFrame

# Cleanup in-memory blocks starting from base backward
# e.g. B2 backward.
var count = 0'u
let postPersistTime = Moment.now()

# The disk-flush burst is done. Commit the new base pointer now, *before*
# the in-memory cleanup burst, so ForkedChain invariants (c.base,
# c.baseTxFrame) stay coherent even if cleanup is interrupted by shutdown
# cancellation. Capture `oldFrontier` first because `c.base.parent = nil`
# mutates `base.parent`, which the cleanup iterator walks.
let oldFrontier = base.parent
c.base = base
c.base.parent = nil
c.base.finalize()

# Hand the chronos loop a chance to service pending RPC / networking before
# we enter the cleanup burst, which with a full persistBatchSize=256 can
# iterate hundreds of blocks scanning txRecords.
await sleepAsync(0.milliseconds)

for it in ancestors(base.parent):
# Cleanup in-memory blocks starting from the previous base backward
# e.g. B2 backward. Yield every `cleanupYieldChunk` ancestors so a single
# updateBase can't hog the event loop for the full cleanup duration.
const cleanupYieldChunk = 16
var
count = 0'u
sinceYield = 0

for it in ancestors(oldFrontier):
c.removeBlockFromCache(it)
inc count
inc sinceYield
if sinceYield >= cleanupYieldChunk:
sinceYield = 0
await sleepAsync(0.milliseconds)
Copy link
Copy Markdown
Contributor

@bhartnett bhartnett Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be implemented in a simpler way. Why not just add an i index to the iterator and then if i mod 16 == 0 then run the sleep.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agreed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe the removeBlockFromCache function could become async so you can simply avoid the sleep altogether.


# Update base branch
c.base = base
c.base.parent = nil
let finishTime = Moment.now()

# Base block always have finalized marker
c.base.finalize()
# Aggregate split timings for the "Finalized blocks persisted" log so we
# can see at a glance which phase is eating the budget.
c.persistMs += (postPersistTime - startTime).milliseconds
c.cleanupMs += (finishTime - postPersistTime).milliseconds

if c.dynamicBatchSize:
# Dynamicly adjust the persistBatchSize based on the recorded run time.
Expand All @@ -367,9 +393,7 @@ with --debug-eager-state-root."""
batchSizeLowerBound = 4
batchSizeUpperBound = 256

let
finishTime = Moment.now()
runTime = (finishTime - startTime).milliseconds
let runTime = (finishTime - startTime).milliseconds
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would remove cleanupMs since that part is no longer blocking the event loop after this change.

Then runTime used here should just equal persistMs


if runTime < targetTimeLowerBound and c.persistBatchSize < batchSizeUpperBound:
c.persistBatchSize = min(c.persistBatchSize + 4, batchSizeUpperBound)
Expand All @@ -385,7 +409,7 @@ with --debug-eager-state-root."""
proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
if c.baseQueue.len > 0:
let base = c.baseQueue.popFirst()
c.persistedCount += c.updateBase(base)
c.persistedCount += await c.updateBase(base)

const
minLogInterval = 5
Expand All @@ -406,7 +430,9 @@ proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async:
pendingFCU = c.pendingFCU.short,
resolvedFinNum = c.latestFinalized.number,
resolvedFinHash = c.latestFinalized.hash.short,
dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len()
dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len(),
persistMs = c.persistMs,
cleanupMs = c.cleanupMs
else:
debug "Finalized blocks persisted",
nBlocks = c.persistedCount,
Expand All @@ -416,9 +442,13 @@ proc processUpdateBase(c: ForkedChainRef): Future[Result[void, string]] {.async:
pendingFCU = c.pendingFCU.short,
resolvedFinNum = c.latestFinalized.number,
resolvedFinHash = c.latestFinalized.hash.short,
dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len()
dbSnapshotsCount = c.baseTxFrame.aTx.db.snapshots.len(),
persistMs = c.persistMs,
cleanupMs = c.cleanupMs
c.lastBaseLogTime = time
c.persistedCount = 0
c.persistMs = 0
c.cleanupMs = 0
return ok()

if c.queue.isNil:
Expand Down
8 changes: 8 additions & 0 deletions execution_chain/core/chain/forked_chain/chain_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ type
# Count how many blocks persisted when `baseQueue`
# consumed.

persistMs*: int64
cleanupMs*: int64
# Aggregated split timing across the same batch as `persistedCount`.
# `persistMs` covers state-root check + checkpoint + db.persist; `cleanupMs`
# covers the in-memory ancestors/removeBlockFromCache loop. Used to surface
# where the `updateBase` budget is going in the "Finalized blocks persisted"
# log so event-loop stalls can be diagnosed without a new metrics pipeline.

latest* : BlockRef
# Every time a new block added,
# that block automatically become the latest block.
Expand Down
Loading