Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/ethlambda/src/checkpoint_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub enum CheckpointSyncError {
Http(#[from] reqwest::Error),
#[error("SSZ deserialization failed: {0:?}")]
SszDecode(DecodeError),
#[error("Storage error: {0}")]
Storage(#[from] ethlambda_storage::Error),
#[error("checkpoint state slot cannot be 0")]
SlotIsZero,
#[error("checkpoint state has no validators")]
Expand Down
13 changes: 8 additions & 5 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ async fn main() -> eyre::Result<()> {
// and the API server (which exposes GET/POST admin endpoints).
let aggregator = AggregatorController::new(options.is_aggregator);

let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone());
let blockchain = BlockChain::spawn(store.clone(), validator_keys, aggregator.clone())
.expect("failed to spawn blockchain actor");

// Note: SwarmConfig.is_aggregator is intentionally a plain bool, not the
// AggregatorController — subnet subscriptions are decided once here and
Expand Down Expand Up @@ -307,8 +308,9 @@ async fn main() -> eyre::Result<()> {
async fn run_test_driver(rpc_config: RpcConfig) -> eyre::Result<()> {
use tokio::sync::RwLock;

let driver: ethlambda_rpc::test_driver::DriverState =
Arc::new(RwLock::new(ethlambda_rpc::test_driver::empty_driver_store()));
let driver: ethlambda_rpc::test_driver::DriverState = Arc::new(RwLock::new(
ethlambda_rpc::test_driver::empty_driver_store().map_err(|e| eyre::eyre!(e.to_string()))?,
));

let shutdown_token = CancellationToken::new();
let rpc_shutdown = shutdown_token.clone();
Expand Down Expand Up @@ -581,7 +583,8 @@ async fn fetch_initial_state(
let Some(checkpoint_url) = checkpoint_url else {
info!("No checkpoint sync URL provided, initializing from genesis state");
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
return Ok(Store::from_anchor_state(backend, genesis_state));
return Store::from_anchor_state(backend, genesis_state)
.map_err(checkpoint_sync::CheckpointSyncError::from);
};

// Checkpoint sync path
Expand All @@ -599,7 +602,7 @@ async fn fetch_initial_state(
);

// Store the anchor state and header, without body
Ok(Store::from_anchor_state(backend, state))
Store::from_anchor_state(backend, state).map_err(checkpoint_sync::CheckpointSyncError::from)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion crates/blockchain/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub fn snapshot_aggregation_inputs(store: &Store) -> Option<AggregationSnapshot>
return None;
}

let head_state = store.head_state();
let head_state = store.head_state().ok()?;
Comment thread
d4m014 marked this conversation as resolved.
Outdated
let validators = &head_state.validators;

let gossip_roots: HashSet<H256> = gossip_groups
Expand Down
118 changes: 77 additions & 41 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant, SystemTime};

type Error = ethlambda_storage::Error;

use ethlambda_network_api::{BlockChainToP2PRef, InitP2P};
use ethlambda_state_transition::is_proposer;
use ethlambda_storage::{ALL_TABLES, Store};
Expand Down Expand Up @@ -60,10 +62,10 @@ impl BlockChain {
store: Store,
validator_keys: HashMap<u64, ValidatorKeyPair>,
aggregator: AggregatorController,
) -> BlockChain {
) -> Result<BlockChain, Error> {
metrics::set_is_aggregator(aggregator.is_enabled());
metrics::set_node_sync_status(metrics::SyncStatus::Idle);
let genesis_time = store.config().genesis_time;
let genesis_time = store.config()?.genesis_time;
let key_manager = key_manager::KeyManager::new(validator_keys);
let handle = BlockChainServer {
store,
Expand All @@ -84,7 +86,7 @@ impl BlockChain {
handle.context(),
block_chain_protocol::Tick,
);
BlockChain { handle }
Ok(BlockChain { handle })
}

pub fn actor_ref(&self) -> &ActorRef<BlockChainServer> {
Expand Down Expand Up @@ -130,14 +132,14 @@ pub struct BlockChainServer {
}

impl BlockChainServer {
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) {
async fn on_tick(&mut self, timestamp_ms: u64, ctx: &Context<Self>) -> Result<(), Error> {
// Observe tick interval duration before any processing
if let Some(prev_instant) = self.last_tick_instant {
metrics::observe_tick_interval_duration(prev_instant.elapsed());
}
self.last_tick_instant = Some(Instant::now());

let genesis_time_ms = self.store.config().genesis_time * 1000;
let genesis_time_ms = self.store.config()?.genesis_time * 1000;

// Calculate current slot and interval from milliseconds
let time_since_genesis_ms = timestamp_ms.saturating_sub(genesis_time_ms);
Expand All @@ -146,9 +148,9 @@ impl BlockChainServer {

// Fail fast: a state with zero validators is invalid and would cause
// panics in proposer selection and attestation processing.
if self.store.head_state().validators.is_empty() {
if self.store.head_state()?.validators.is_empty() {
error!("Head state has no validators, skipping tick");
return;
return Ok(());
}

// Update current slot metric
Expand All @@ -164,9 +166,11 @@ impl BlockChainServer {
// At interval 0, check if we will propose (but don't build the block yet).
// Tick forkchoice first to accept attestations, then build the block
// using the freshly-accepted attestations.
let proposer_validator_id = (interval == 0 && slot > 0)
.then(|| self.get_our_proposer(slot))
.flatten();
let proposer_validator_id = if interval == 0 && slot > 0 {
self.get_our_proposer(slot)?
} else {
None
};

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
store::on_tick(
Expand All @@ -192,9 +196,10 @@ impl BlockChainServer {
}

// Update safe target slot metric (updated by store.on_tick at interval 3)
metrics::update_safe_target_slot(self.store.safe_target_slot());
metrics::update_safe_target_slot(self.store.safe_target_slot()?);
// Update head slot metric (head may change when attestations are promoted at intervals 0/4)
metrics::update_head_slot(self.store.head_slot());
metrics::update_head_slot(self.store.head_slot()?);
Ok(())
}

/// Kick off a committee-signature aggregation session:
Expand Down Expand Up @@ -254,21 +259,26 @@ impl BlockChainServer {
}

Comment thread
d4m014 marked this conversation as resolved.
/// Returns the validator ID if any of our validators is the proposer for this slot.
fn get_our_proposer(&self, slot: u64) -> Option<u64> {
let head_state = self.store.head_state();
fn get_our_proposer(&self, slot: u64) -> Result<Option<u64>, Error> {
let head_state = self.store.head_state()?;
let num_validators = head_state.validators.len() as u64;

self.key_manager
Ok(self
.key_manager
.validator_ids()
.into_iter()
.find(|&vid| is_proposer(vid, slot, num_validators))
.find(|&vid| is_proposer(vid, slot, num_validators)))
}

fn produce_attestations(&mut self, slot: u64, is_aggregator: bool) {
let _timing = metrics::time_attestations_production();

// Produce attestation data once for all validators
let attestation_data = store::produce_attestation_data(&self.store, slot);
let Ok(attestation_data) = store::produce_attestation_data(&self.store, slot)
.inspect_err(|err| error!(%slot, %err, "Failed to produce attestation data"))
else {
return;
};

// For each registered validator, produce and publish attestation
for validator_id in self.key_manager.validator_ids() {
Expand Down Expand Up @@ -369,14 +379,14 @@ impl BlockChainServer {

fn process_block(&mut self, signed_block: SignedBlock) -> Result<(), StoreError> {
store::on_block(&mut self.store, signed_block)?;
let head_slot = self.store.head_slot();
let head_slot = self.store.head_slot()?;
metrics::update_head_slot(head_slot);
metrics::update_latest_justified_slot(self.store.latest_justified().slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
metrics::update_latest_justified_slot(self.store.latest_justified()?.slot);
metrics::update_latest_finalized_slot(self.store.latest_finalized()?.slot);
metrics::update_validators_count(self.key_manager.validator_ids().len() as u64);

// Update sync status based on head slot vs wall clock slot
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
let current_slot = self.store.time()? / INTERVALS_PER_SLOT;
let status = if head_slot >= current_slot {
metrics::SyncStatus::Synced
} else {
Expand All @@ -399,13 +409,18 @@ impl BlockChainServer {
// Here we process blocks iteratively, to avoid recursive calls that could
// cause a stack overflow.
while let Some(block) = queue.pop_front() {
self.process_or_pend_block(block, &mut queue);
let _ = self
.process_or_pend_block(block, &mut queue)
.inspect_err(|e| error!(%e, "Failed to process or pend block"));
}

// Prune old states and blocks AFTER the entire cascade completes.
// Running this mid-cascade would delete states that pending children
// still need, causing re-processing loops when fallback pruning is active.
self.store.prune_old_data();
let _ = self
.store
.prune_old_data()
.inspect_err(|e| error!(%e, "Failed to prune old data"));
}

/// Try to process a single block. If its parent state is missing, store it
Expand All @@ -415,7 +430,7 @@ impl BlockChainServer {
&mut self,
signed_block: SignedBlock,
queue: &mut VecDeque<SignedBlock>,
) {
) -> Result<(), Error> {
let slot = signed_block.message.slot;
let block_root = signed_block.message.hash_tree_root();
let parent_root = signed_block.message.parent_root;
Expand All @@ -425,9 +440,9 @@ impl BlockChainServer {
// already part of the canonical chain and cannot affect fork choice.
// Discard any pending children: since we won't process this block,
// children referencing it as parent would remain stuck indefinitely.
if slot <= self.store.latest_finalized().slot {
if slot <= self.store.latest_finalized()?.slot {
self.discard_pending_subtree(block_root);
return;
return Ok(());
}

// Reject blocks whose slot has not started locally, mirroring the
Expand All @@ -437,7 +452,7 @@ impl BlockChainServer {
// Catching this early also avoids persisting bogus future blocks to
// RocksDB and triggering BlocksByRoot fan-out for fabricated parents.
let block_start_interval = slot.saturating_mul(INTERVALS_PER_SLOT);
let store_time = self.store.time();
let store_time = self.store.time()?;
if block_start_interval > store_time + GOSSIP_DISPARITY_INTERVALS {
warn!(
%slot,
Expand All @@ -448,11 +463,11 @@ impl BlockChainServer {
"Rejecting block: slot is too far in future"
);
self.discard_pending_subtree(block_root);
return;
return Ok(());
}

// Check if parent state exists before attempting to process
if !self.store.has_state(&parent_root) {
if !self.store.has_state(&parent_root)? {
info!(%slot, %parent_root, %block_root, "Block parent missing, storing as pending");

// Resolve the actual missing ancestor by walking the chain. A stale entry
Expand All @@ -466,7 +481,10 @@ impl BlockChainServer {
self.pending_block_parents.insert(block_root, missing_root);

// Persist block data to DB (no LiveChain entry — invisible to fork choice)
self.store.insert_pending_block(block_root, signed_block);
let _ = self
.store
.insert_pending_block(block_root, signed_block)
.inspect_err(|e| warn!(%block_root, %e, "Failed to persist pending block"));

// Store only the H256 reference in memory
self.pending_blocks
Expand All @@ -478,16 +496,30 @@ impl BlockChainServer {
// session, the actual missing block is further up the chain.
// Note: this loop always terminates — blocks reference parents by hash,
// so a cycle would require a hash collision.
while let Some(header) = self.store.get_block_header(&missing_root) {
if self.store.has_state(&header.parent_root) {
loop {
let header = match self.store.get_block_header(&missing_root) {
Ok(Some(h)) => h,
Ok(None) => break,
Err(e) => {
error!(%missing_root, %e, "DB error during pending-block walk-up");
break;
}
};
if self.store.has_state(&header.parent_root)? {
// Parent state available — enqueue for processing, cascade
// handles the rest via the outer loop.
let block = self
.store
.get_signed_block(&missing_root)
.expect("header and parent state exist, so the full signed block must too");
queue.push_back(block);
return;
match self.store.get_signed_block(&missing_root) {
Ok(Some(block)) => {
queue.push_back(block);
}
Ok(None) => {
warn!(%missing_root, "Pending block missing from DB during walk-up");
}
Err(e) => {
error!(%missing_root, %e, "DB error fetching pending block during walk-up");
}
}
return Ok(());
}
// Block exists but parent doesn't have state — register as pending
// so the cascade works when the true ancestor arrives
Expand All @@ -502,7 +534,7 @@ impl BlockChainServer {

// Request the actual missing block from network
self.request_missing_block(missing_root);
return;
return Ok(());
}

// Parent exists, proceed with processing
Expand Down Expand Up @@ -530,6 +562,7 @@ impl BlockChainServer {
);
}
}
Ok(())
}

fn request_missing_block(&mut self, block_root: H256) {
Expand Down Expand Up @@ -559,7 +592,7 @@ impl BlockChainServer {
self.pending_block_parents.remove(&block_root);

// Load block data from DB
let Some(child_block) = self.store.get_signed_block(&block_root) else {
let Ok(Some(child_block)) = self.store.get_signed_block(&block_root) else {
warn!(
block_root = %ShortRoot(&block_root.0),
"Pending block missing from DB, skipping"
Expand Down Expand Up @@ -619,7 +652,10 @@ impl BlockChainServer {
let timestamp = SystemTime::UNIX_EPOCH
.elapsed()
.expect("already past the unix epoch");
self.on_tick(timestamp.as_millis() as u64, ctx).await;
let _ = self
.on_tick(timestamp.as_millis() as u64, ctx)
.await
.inspect_err(|e| error!(%e, "Tick failed"));
// Schedule the next tick at the next 800ms interval boundary
let ms_since_epoch = timestamp.as_millis() as u64;
let ms_to_next_interval =
Expand Down
Loading