Skip to content
Open
138 changes: 97 additions & 41 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,20 @@ struct CliOptions {
/// The node ID to look up in annotated_validators.yaml (e.g., "ethlambda_0")
#[arg(long)]
node_id: String,
/// Base URL of a checkpoint-sync peer's API server (e.g., http://peer:5052).
/// Base URL(s) of checkpoint-sync peer API servers (e.g., http://peer:5052).
/// When set, skips genesis initialization and fetches the finalized state
/// and block from the peer's `/lean/v0/states/finalized` and
/// and block from each peer's `/lean/v0/states/finalized` and
/// `/lean/v0/blocks/finalized` endpoints. For backward compatibility, a
/// URL ending in `/lean/v0/states/finalized` is accepted and the trailing
/// path is stripped.
#[arg(long)]
checkpoint_sync_url: Option<String>,
///
/// Multiple URLs may be supplied for redundancy, either comma-separated
/// (`--checkpoint-sync-url u1,u2`) or by repeating the flag
/// (`--checkpoint-sync-url u1 --checkpoint-sync-url u2`). URLs are tried
/// in order; the first one that succeeds is used and any failures fall
/// over to the next URL. Startup only aborts if every URL fails.
#[arg(long, value_delimiter = ',')]
checkpoint_sync_url: Option<Vec<String>>,
/// Whether this node acts as a committee aggregator.
///
/// Seeds the initial value of the live aggregator flag shared by the
Expand Down Expand Up @@ -207,7 +213,7 @@ async fn main() -> eyre::Result<()> {
let backend = Arc::new(RocksDBBackend::open(&data_dir).expect("Failed to open RocksDB"));

let store = fetch_initial_state(
options.checkpoint_sync_url.as_deref(),
options.checkpoint_sync_url.as_deref().unwrap_or(&[]),
&genesis_config,
backend.clone(),
)
Expand Down Expand Up @@ -558,12 +564,60 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
bytes
}

/// Fetch the finalized anchor (state + signed block) from a single checkpoint
/// URL, retrying the transient race where the peer advances finalization
/// between the state and block fetches.
///
/// Only `CheckpointSyncError::AnchorPairingMismatch` is retried, up to
/// `MAX_ANCHOR_FETCH_ATTEMPTS` total attempts with a fixed
/// `ANCHOR_FETCH_RETRY_DELAY` pause between them. Any other error, or a
/// mismatch on the final attempt, is returned to the caller unchanged.
///
/// # Arguments
///
/// * `url` - Base URL of the peer's API server, passed through to
///   `checkpoint_sync::fetch_finalized_anchor` as-is
/// * `genesis_time` - Forwarded to `fetch_finalized_anchor`
/// * `validators` - Forwarded to `fetch_finalized_anchor`
///
/// # Errors
///
/// Returns the `CheckpointSyncError` produced by the last fetch attempt.
async fn try_checkpoint_url(
    url: &str,
    genesis_time: u64,
    validators: &[ethlambda_types::state::Validator],
) -> Result<(State, ethlambda_types::block::SignedBlock), checkpoint_sync::CheckpointSyncError> {
    const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
    const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);

    let mut attempt = 1;
    loop {
        match checkpoint_sync::fetch_finalized_anchor(url, genesis_time, validators).await {
            Ok(pair) => return Ok(pair),
            // Retry only the pairing mismatch, and only while attempts remain;
            // on the last attempt the guard fails and the arm below returns it.
            Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
                if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
            {
                warn!(
                    // The URL may embed basic-auth credentials or token query
                    // parameters, so log the redacted form, consistent with
                    // the other per-URL log lines in this file.
                    url = %redact_url(url),
                    attempt,
                    max = MAX_ANCHOR_FETCH_ATTEMPTS,
                    "Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
                );
                tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
                attempt += 1;
            }
            Err(err) => return Err(err),
        }
    }
}

/// Produce a log-safe rendering of `url`.
///
/// The username, password, query string, and fragment are removed so that
/// embedded credentials (basic-auth or token query params) never reach the
/// logs. Input that does not parse as a URL is never echoed back; a fixed
/// placeholder is returned instead.
fn redact_url(url: &str) -> String {
    let Ok(mut sanitized) = reqwest::Url::parse(url) else {
        return "<unparseable url>".to_string();
    };

    // Errors from the userinfo setters are deliberately ignored.
    let _ = sanitized.set_username("");
    let _ = sanitized.set_password(None);
    sanitized.set_query(None);
    sanitized.set_fragment(None);

    sanitized.to_string()
}
Comment thread
Aliemeka marked this conversation as resolved.
Outdated

/// Fetch the initial state for the node.
///
/// If `checkpoint_url` is provided, performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block in parallel from a
/// remote peer. Otherwise, creates a genesis state from the local genesis
/// configuration.
/// If `checkpoint_urls` is empty, creates a genesis state from the local
/// genesis configuration. Otherwise performs checkpoint sync by downloading
/// and verifying the finalized state AND signed block from a peer. URLs are
/// tried in order: the first peer that succeeds wins, and failures fall over
/// to the next URL. Startup only aborts if every URL fails.
///
/// Fetching the matching signed block lets the local store serve a valid
/// anchor via the `BlocksByRoot` req-resp protocol; without it, peers
Expand All @@ -572,7 +626,7 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
///
/// # Arguments
///
/// * `checkpoint_url` - Optional base URL to a peer's API server
/// * `checkpoint_urls` - Zero or more base URLs of peer API servers
/// * `genesis` - Genesis configuration (for genesis_time verification and genesis state creation)
/// * `backend` - Storage backend for Store creation
///
Expand All @@ -581,51 +635,53 @@ fn read_hex_file_bytes(path: impl AsRef<Path>) -> Vec<u8> {
/// `Ok(Store)` on success, or `Err(CheckpointSyncError)` if checkpoint sync fails.
/// Genesis path is infallible and always returns `Ok`.
async fn fetch_initial_state(
checkpoint_url: Option<&str>,
checkpoint_urls: &[String],
genesis: &GenesisConfig,
backend: Arc<dyn StorageBackend>,
) -> Result<Store, checkpoint_sync::CheckpointSyncError> {
let validators = genesis.validators();

let Some(checkpoint_url) = checkpoint_url else {
let Some((first_url, rest_urls)) = checkpoint_urls.split_first() else {
info!("No checkpoint sync URL provided, initializing from genesis state");
let genesis_state = State::from_genesis(genesis.genesis_time, validators);
return Ok(Store::from_anchor_state(backend, genesis_state));
};

// Checkpoint sync path
info!(%checkpoint_url, "Starting checkpoint sync");
// Checkpoint sync path: try URLs in order, fail over to the next on error.
// Log only the count — URLs may carry basic-auth credentials or token query
// parameters; per-URL log lines below redact those before emission.
info!(
url_count = checkpoint_urls.len(),
"Starting checkpoint sync"
);

// The state and block are fetched in parallel; if the peer advances
// finalization between the two requests the pair won't match. Retry a
// small number of times so this transient race doesn't fail node startup.
const MAX_ANCHOR_FETCH_ATTEMPTS: u32 = 3;
const ANCHOR_FETCH_RETRY_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
let mut result = try_checkpoint_url(first_url, genesis.genesis_time, &validators).await;
if let Err(err) = &result {
let url = redact_url(first_url);
if !rest_urls.is_empty() {
warn!(%url, %err, "Checkpoint sync failed for this peer; trying next URL");
} else {
warn!(%url, %err, "Checkpoint sync failed for this peer; no more URLs to try");
}
}
Comment thread
Aliemeka marked this conversation as resolved.
Outdated

let mut attempt = 1;
let (state, signed_block) = loop {
match checkpoint_sync::fetch_finalized_anchor(
checkpoint_url,
genesis.genesis_time,
&validators,
)
.await
{
Ok(pair) => break pair,
Err(checkpoint_sync::CheckpointSyncError::AnchorPairingMismatch)
if attempt < MAX_ANCHOR_FETCH_ATTEMPTS =>
{
warn!(
attempt,
max = MAX_ANCHOR_FETCH_ATTEMPTS,
"Anchor state and block disagree (peer likely advanced finalization mid-fetch); retrying"
);
tokio::time::sleep(ANCHOR_FETCH_RETRY_DELAY).await;
attempt += 1;
for (idx, url) in rest_urls.iter().enumerate() {
if result.is_ok() {
break;
}
result = try_checkpoint_url(url, genesis.genesis_time, &validators).await;
if let Err(err) = &result {
let url = redact_url(url);
let has_more = idx + 1 < rest_urls.len();
if has_more {
warn!(%url, %err, "Checkpoint sync failed for this peer; trying next URL");
} else {
warn!(%url, %err, "Checkpoint sync failed for this peer; no more URLs to try");
}
Err(err) => return Err(err),
}
};
}

let (state, signed_block) = result?;

info!(
slot = state.slot,
Expand Down