Skip to content
Open
117 changes: 75 additions & 42 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,20 @@ struct CliOptions {
/// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0")
#[arg(long)]
node_id: String,
/// Base URL of a checkpoint-sync peer's API server (e.g., http://peer:5052).
/// Base URL(s) of checkpoint-sync peer API servers (e.g., http://peer:5052).
/// When set, skips genesis initialization and fetches the finalized state
/// and block from the peer's `/lean/v0/states/finalized` and
/// and block from each peer's `/lean/v0/states/finalized` and
/// `/lean/v0/blocks/finalized` endpoints. For backward compatibility, a
/// URL ending in `/lean/v0/states/finalized` is accepted and the trailing
/// path is stripped.
#[arg(long)]
checkpoint_sync_url: Option<String>,
///
/// Multiple URLs may be supplied for redundancy, either comma-separated
/// (`--checkpoint-sync-url u1,u2`) or by repeating the flag
/// (`--checkpoint-sync-url u1 --checkpoint-sync-url u2`). URLs are tried
/// in order; the first one that succeeds is used and any failures fall
/// over to the next URL. Startup only aborts if every URL fails.
#[arg(long, value_delimiter = ',')]
checkpoint_sync_url: Option<Vec<String>>,
/// Whether this node acts as a committee aggregator.
///
/// Seeds the initial value of the live aggregator flag shared by the
Expand Down Expand Up @@ -207,7 +213,7 @@ async fn main() -> eyre::Result<()> {
let backend = Arc::new(RocksDBBackend::open(&data_dir).expect("Failed to open RocksDB"));

let store = fetch_initial_state(
options.checkpoint_sync_url.as_deref(),
options.checkpoint_sync_url.as_deref().unwrap_or(&[]),
&genesis_config,
backend.clone(),
)
Expand Down Expand Up @@ -558,12 +564,45 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
bytes
}

/// Download the finalized `(State, SignedBlock)` anchor pair from one
/// checkpoint-sync peer.
///
/// A `CheckpointSyncError::AnchorPairingMismatch` is treated as transient
/// (the peer likely advanced finalization between the state and block
/// requests): the fetch is repeated after a short pause, up to
/// `MAX_ANCHOR_FETCH_ATTEMPTS` attempts in total. Any other error, or a
/// mismatch on the final attempt, is returned to the caller unchanged.
///
/// # Arguments
///
/// * `url` - Base URL of the peer's API server
/// * `genesis_time` - Genesis time forwarded to `fetch_finalized_anchor`
/// * `validators` - Validator set forwarded to `fetch_finalized_anchor`
async fn try_checkpoint_url(
    url: &str,
    genesis_time: u64,
    validators: &[ethlambda_types::state::Validator],
) -> Result<(State, ethlambda_types::block::SignedBlock), checkpoint_sync::CheckpointSyncError> {
    const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
    const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    // Every attempt except the last may be retried on a pairing mismatch.
    for attempt in 1..MAX_ANCHOR_FETCH_ATTEMPTS {
        match checkpoint_sync::fetch_finalized_anchor(url, genesis_time, validators).await {
            Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch) => {
                warn!(
                    %url,
                    attempt,
                    max = MAX_ANCHOR_FETCH_ATTEMPTS,
                    "Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
                );
                tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
            }
            // Success or a non-retryable error: hand it straight back.
            settled => return settled,
        }
    }

    // Final attempt: whatever comes back (including a mismatch) is the answer.
    checkpoint_sync::fetch_finalized_anchor(url, genesis_time, validators).await
}

/// Fetch the initial state for the node.
///
/// If `checkpoint_url` is provided, performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block in parallel from a
/// remote peer. Otherwise, creates a genesis state from the local genesis
/// configuration.
/// If `checkpoint_urls` is empty, creates a genesis state from the local
/// genesis configuration. Otherwise performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block from a peer. URLs are
/// tried in order: the first peer that succeeds wins, and failures fall over
/// to the next URL. Startup only aborts if every URL fails.
///
/// Fetching the matching signed block lets the local store serve a valid
/// anchor via the `BlocksByRoot` req-resp protocol; without it, peers
Expand All @@ -572,7 +611,7 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
///
/// # Arguments
///
/// * `checkpoint_url` - Optional base URL to a peer's API server
/// * `checkpoint_urls` - Zero or more base URLs of peer API servers
/// * `genesis` - Genesis configuration (for genesis_time verification and genesis state creation)
/// * `backend` - Storage backend for Store creation
///
Expand All @@ -581,51 +620,45 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
/// `Ok(Store)` on success, or `Err(CheckpointSyncError)` if checkpoint sync fails
/// for every URL (the error from the last URL tried is the one returned).
/// Genesis path is infallible and always returns `Ok`.
async fn fetch_initial_state(
checkpoint_url: Option<&str>,
checkpoint_urls: &[String],
genesis: &GenesisConfig,
backend: Arc<dyn StorageBackend>,
) -> Result<Store, checkpoint_sync::CheckpointSyncError> {
let validators = genesis.validators();

let Some(checkpoint_url) = checkpoint_url else {
let Some((first_url, rest_urls)) = checkpoint_urls.split_first() else {
info!("No checkpoint sync URL provided, initializing from genesis state");
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
return Ok(Store::from_anchor_state(backend, genesis_state));
};

// Checkpoint sync path
info!(%checkpoint_url, "Starting checkpoint sync");
// Checkpoint sync path: try URLs in order, fail over to the next on error.
info!(urls = ?checkpoint_urls, "Starting checkpoint sync");
Comment thread
Aliemeka marked this conversation as resolved.
Outdated

// The state and block are fetched in parallel; if the peer advances
// finalization between the two requests the pair won't match. Retry a
// small number of times so this transient race doesn't fail node startup.
const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
let mut result = try_checkpoint_url(first_url, genesis.genesis_time, &validators).await;
if let Err(err) = &result {
warn!(
url = %first_url,
%err,
"Checkpoint sync failed for this peer; trying next URL"
);
}

let mut attempt = 1;
let (state, signed_block) = loop {
match checkpoint_sync::fetch_finalized_anchor(
checkpoint_url,
genesis.genesis_time,
&validators,
)
.await
{
Ok(pair) => break pair,
Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
{
warn!(
attempt,
max = MAX_ANCHOR_FETCH_ATTEMPTS,
"Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
);
tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
attempt += 1;
}
Err(err) => return Err(err),
for url in rest_urls {
if result.is_ok() {
break;
}
};
result = try_checkpoint_url(url, genesis.genesis_time, &validators).await;
if let Err(err) = &result {
warn!(
%url,
%err,
"Checkpoint sync failed for this peer; trying next URL"
);
}
Comment thread
Aliemeka marked this conversation as resolved.
Outdated
}

let (state, signed_block) = result?;

info!(
slot = state.slot,
Expand Down