Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4571d88
Remove resetPayload from concurrent lru and use the new cache in the …
bhartnett May 13, 2026
17dfd32
Add -Wno-psabi flag.
bhartnett May 13, 2026
6b7dbc3
Implement initial version of optimistic state pre-fetching.
bhartnett May 14, 2026
cd7b0ce
Fixes.
bhartnett May 14, 2026
5dc400d
More fixes.
bhartnett May 14, 2026
38af9a6
Clean up database between eest tests.
bhartnett May 15, 2026
9cdf2ba
Use parent txFrame.
bhartnett May 15, 2026
378b23b
Merge branch 'master' into optimistic-state-prefetch
bhartnett May 15, 2026
f0d7566
Fix memory leaks.
bhartnett May 15, 2026
fa4a7ad
Test optimisticStatePrefetch enabled in CI.
bhartnett May 17, 2026
a272f28
Merge branch 'master' into optimistic-state-prefetch
bhartnett May 18, 2026
f8afbb3
Updates.
bhartnett May 18, 2026
1a5a161
Fix copyright.
bhartnett May 18, 2026
cfe84f2
Refactor common code out of processTransaction and prefetchTransaction.
bhartnett May 18, 2026
2135acb
Add concurrency helpers in concurrency/utils.nim.
bhartnett May 18, 2026
7bc9bcd
Cleanup.
bhartnett May 19, 2026
af05238
Refactor.
bhartnett May 19, 2026
17ec865
Cleanup commented code.
bhartnett May 19, 2026
31e900e
Remove gas costs export.
bhartnett May 19, 2026
1a267c7
Merge branch 'master' into optimistic-state-prefetch
bhartnett May 19, 2026
b4db54c
Combine recoverTask and prefetchTask.
bhartnett May 19, 2026
35d0a64
Merge branch 'master' into optimistic-state-prefetch
bhartnett May 20, 2026
c81f1a4
Update parallelStateRootComputation defaults.
bhartnett May 20, 2026
a45fdbd
More performance improvements.
bhartnett May 20, 2026
358931c
Enable taskpool and optimistic state prefetch in tests.
bhartnett May 20, 2026
3a5bef6
Preload trusted setup to avoid race condition during block execution …
bhartnett May 20, 2026
e04aab0
Fixes.
bhartnett May 20, 2026
e1e979e
Make in memory db caches configurable. Enable in eest tests.
bhartnett May 20, 2026
ef6d05a
Merge branch 'master' into optimistic-state-prefetch
bhartnett May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions execution_chain/common/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type
statelessWitnessValidation*: bool
## Enable full validation of execution witnesses.

optimisticStatePrefetch*: bool
## Optimistically pre-execute the transactions of a block on background
## threads to warm database caches before the main thread executes them.

# ------------------------------------------------------------------------------
# Private helper functions
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -176,7 +180,8 @@ proc init(com : CommonRef,
genesis : Genesis,
initializeDb: bool,
statelessProviderEnabled: bool,
statelessWitnessValidation: bool) =
statelessWitnessValidation: bool,
optimisticStatePrefetch: bool) =


config.daoCheck()
Expand Down Expand Up @@ -215,6 +220,7 @@ proc init(com : CommonRef,

com.statelessProviderEnabled = statelessProviderEnabled
com.statelessWitnessValidation = statelessWitnessValidation
com.optimisticStatePrefetch = optimisticStatePrefetch

proc isBlockAfterTtd(com: CommonRef, header: Header, txFrame: CoreDbTxRef): bool =
if com.config.terminalTotalDifficulty.isNone:
Expand All @@ -239,6 +245,7 @@ proc new*(
initializeDb = true;
statelessProviderEnabled = false;
statelessWitnessValidation = false;
optimisticStatePrefetch = false;
): CommonRef =

## If genesis data is present, the forkIds will be initialized
Expand All @@ -251,7 +258,8 @@ proc new*(
params.genesis,
initializeDb,
statelessProviderEnabled,
statelessWitnessValidation)
statelessWitnessValidation,
optimisticStatePrefetch)

proc new*(
_: type CommonRef;
Expand All @@ -260,7 +268,8 @@ proc new*(
networkId: NetworkId = MainNet;
initializeDb = true;
statelessProviderEnabled = false;
statelessWitnessValidation = false
statelessWitnessValidation = false;
optimisticStatePrefetch = false;
): CommonRef =

## There is no genesis data present
Expand All @@ -273,7 +282,8 @@ proc new*(
nil,
initializeDb,
statelessProviderEnabled,
statelessWitnessValidation)
statelessWitnessValidation,
optimisticStatePrefetch)

func clone*(com: CommonRef, db: CoreDbRef): CommonRef =
## clone but replace the db
Expand All @@ -287,7 +297,8 @@ func clone*(com: CommonRef, db: CoreDbRef): CommonRef =
genesisHeader: com.genesisHeader,
networkId : com.networkId,
statelessProviderEnabled: com.statelessProviderEnabled,
statelessWitnessValidation: com.statelessWitnessValidation
statelessWitnessValidation: com.statelessWitnessValidation,
optimisticStatePrefetch: com.optimisticStatePrefetch
)

func clone*(com: CommonRef): CommonRef =
Expand Down
28 changes: 28 additions & 0 deletions execution_chain/concurrency/utils.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Nimbus
# Copyright (c) 2026 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

# These borrow functions are a workaround to avoid updating the ref count
# of a ref type when assigning it into another ref type or heap object.
# This is needed because refc doesn't support atomic reference counts
# and when passing in a ref type from the main thread as a parameter
# of a new ref instance in a child thread/task the ref count update can cause
# memory corruption and crashes due to a race with the ref counts being updated
# in both the main thread and the child thread concurrently.
#
# The unborrowRef function is needed to cleanup the borrowed ref after usage
# and before the GC runs on the ref type containing the borrowed ref.

template borrowRef*[T](dest, src: ref T) =
# Copies the ref type without updating the ref count.
copyMem(addr dest, addr src, sizeof(pointer))

template unborrowRef*[T](dest: ref T) =
# Sets the ref type back to nil without updating the ref count.
var p: pointer = nil
copyMem(addr dest, addr p, sizeof(pointer))
7 changes: 7 additions & 0 deletions execution_chain/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,13 @@ type
desc: "Compute state root in parallel using multiple threads"
name: "debug-parallel-state-root".}: bool

optimisticStatePrefetch* {.
hidden
defaultValue: true
desc: "Optimistically pre-execute block transactions on background " &
"threads to warm DB caches"
name: "debug-optimistic-state-prefetch".}: bool

eagerStateRootCheck* {.
hidden
desc: "Eagerly check state roots when syncing finalized blocks"
Expand Down
159 changes: 124 additions & 35 deletions execution_chain/core/executor/process_block.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,51 +18,140 @@ import
../../transaction,
../../evm/state,
../../evm/types,
../../evm/interpreter/gas_costs,
../../block_access_list/block_access_list_validation,
../../concurrency/utils,
../dao,
../eip6110,
./calculate_reward,
./executor_helpers,
./process_transaction,
eth/common/[keys, transaction_utils],
chronicles,
results
results,
stew/assign2

when compileOption("threads"):
import taskpools

template withSenderParallel(txs: openArray[Transaction], body: untyped, taskpool: Taskpool) =
type Entry = (Signature, Hash32, Flowvar[Address])

proc recoverTask(e: ptr Entry): Address {.nimcall.} =
let pk = recover(e[][0], SkMessage(e[][1].data))
if pk.isOk():
pk[].to(Address)
else:
default(Address)

var entries = newSeq[Entry](txs.len)

# Prepare signature recovery tasks for each transaction - for simplicity,
# we use `default(Address)` to signal sig check failure
import std/atomics, taskpools

type
Entry = object
sig: Signature
hash: Hash32
sender: Address
senderReady: Atomic[bool]
fut: Flowvar[bool]

PrefetchCtx = object
cancel: Atomic[bool]
parent: Header
blockCtx: BlockContext
com: CommonRef
txFrame: CoreDbTxRef

proc recoverAndPrefetchTask(
e: ptr Entry, ctx: ptr PrefetchCtx, tx: ptr Transaction): bool {.nimcall.} =

# Recover the sender from the signature. `default(Address)` signals sig
# check failure.
let
pk = recover(e[].sig, SkMessage(e[].hash.data))
sender =
if pk.isOk(): pk[].to(Address)
else: default(Address)
e[].sender = sender
e[].senderReady.store(true, moRelease)

# When ctx is non-nil, optimistic state prefetch is enabled
if ctx.isNil() or sender == default(Address) or ctx[].cancel.load(moAcquire):
return true

# Create the ledger without triggering a ref count increment on the txFrame
# which is owned by the main/parent thread.
let ledger = LedgerRef()
ledger.txFrame.borrowRef(ctx[].txFrame)
defer:
ledger.txFrame.unborrowRef()
discard ledger.beginSavePoint()

# Create the vmState without triggering a ref count increment on the common object
# which is owned by the main/parent thread.
let vmState = BaseVMState()
vmState.com.borrowRef(ctx[].com)
defer:
vmState.com.unborrowRef()
vmState.ledger = ledger
assign(vmState.parent, ctx[].parent)
assign(vmState.blockCtx, ctx[].blockCtx)
const txCtx = default(TxContext)
assign(vmState.txCtx, txCtx)
vmState.hardFork = vmState.determineFork
vmState.fork = ToEVMFork[vmState.hardFork]
vmState.gasCosts = vmState.fork.forkToSchedule
vmState.tracer = nil
vmState.receipts.setLen(0)
vmState.cumulativeGasUsed = 0
vmState.blockRegularGasUsed = 0
vmState.blockStateGasUsed = 0
vmState.blobGasUsed = 0'u64
vmState.allLogs.setLen(0)
vmState.gasRefunded = 0
vmState.balTracker = nil

# Execute the transaction discarding the results in order to fill the in memory caches.
vmState.prefetchTransaction(tx[], sender)

true

template withSenderParallel(
vmState: BaseVMState, txs: openArray[Transaction], body: untyped) =
doAssert not vmState.com.taskpool.isNil()

var
entries = newSeq[Entry](txs.len)
ctx: PrefetchCtx
ctxPtr: ptr PrefetchCtx = nil

if vmState.com.optimisticStatePrefetch and vmState.com.taskpool.numThreads > 1:
ctx.parent = vmState.parent
ctx.blockCtx = vmState.blockCtx
ctx.com = vmState.com
# Run the prefetch on the parent frame because the current frame will
# be writen to during block execution and this way we avoid having to
# use locking on the frame data structures.
ctx.txFrame = vmState.ledger.txFrame.parent()
ctx.cancel.store(false, moRelease)
ctxPtr = ctx.addr

# Spawn one task per transaction that recovers the sender and, when ctxPtr
# is non-nil, also performs an optimistic state prefetch. Spawning here
# allows the task to start early, while we still haven't hashed subsequent txs.
for i, e in entries.mpairs():
e[0] = txs[i].signature().valueOr(default(Signature))
e[1] = txs[i].rlpHashForSigning(txs[i].isEip155)
let a = addr e
# Spawning the task here allows it to start early, while we still haven't
# hashed subsequent txs
e[2] = taskpool.spawn recoverTask(a)

for txIndex {.inject.}, e in entries.mpairs():
template tx(): untyped =
txs[txIndex]

# Sync blocks until the sender is available from the task pool - as soon
# as we have it, we can process this transaction while the senders of the
# other transactions are being computed
let sender {.inject.} = sync(e[2])

body
e.sig = txs[i].signature().valueOr(default(Signature))
e.hash = txs[i].rlpHashForSigning(txs[i].isEip155)
let entryPtr = e.addr
e.fut = vmState.com.taskpool.spawn recoverAndPrefetchTask(
entryPtr, ctxPtr, txs[i].addr)

try:
for txIndex {.inject.}, e in entries.mpairs():
template tx(): untyped =
txs[txIndex]

# Wait until the worker has published the sender.
while not e.senderReady.load(moAcquire):
cpuRelax()
let sender {.inject.} = e.sender

body
finally:
if not ctxPtr.isNil():
# Cancel any in-flight prefetch tasks so that they bail out quickly.
ctxPtr[].cancel.store(true, moRelease)
# Wait for all tasks to complete before returning so that no task
# outlives the local data it references.
for e in entries.mitems():
discard sync(e.fut)

template withSenderSerial(txs: openArray[Transaction], body: untyped) =
for txIndex {.inject.}, tx {.inject.} in txs:
Expand All @@ -76,7 +165,7 @@ template withSender(vmState: BaseVMState, txs: openArray[Transaction], body: unt
if vmState.com.taskpool == nil:
withSenderSerial(txs, body)
else:
withSenderParallel(txs, body, vmState.com.taskpool)
withSenderParallel(vmState, txs, body)
else:
withSenderSerial(txs, body)

Expand Down
Loading
Loading