Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
Expand All @@ -59,6 +59,9 @@ const MAX_FETCH_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
const LONG_RANGE_SYNC_THRESHOLD: u64 = 2;
const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days)
const MAX_SLOT_LOOKBACK: u64 = MAX_REQUEST_BLOCKS * 4; // 4 096 slots

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
Expand Down Expand Up @@ -302,6 +305,7 @@ impl P2P {
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
range_request_ids: HashSet::new(),
bootnode_addrs: built.bootnode_addrs,
node_names,
};
Expand Down Expand Up @@ -338,6 +342,7 @@ pub struct P2PServer {
pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
pub(crate) range_request_ids: HashSet<OutboundRequestId>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}
Expand Down
132 changes: 125 additions & 7 deletions crates/net/p2p/src/req_resp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::{block::SignedBlock, primitives::H256};

use super::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS,
Request, Response, ResponsePayload, Status,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest,
BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status,
messages::{ResponseCode, error_message},
};
use crate::{
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
p2p_protocol, req_resp::RequestedBlockRoots,
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, LONG_RANGE_SYNC_THRESHOLD, MAX_FETCH_RETRIES,
MAX_SLOT_LOOKBACK, MAX_SYNC_RANGE, P2PServer, PendingRequest, p2p_protocol,
req_resp::RequestedBlockRoots,
};

pub async fn handle_req_resp_message(
Expand Down Expand Up @@ -62,12 +63,18 @@ pub async fn handle_req_resp_message(
Response::Success { payload } => match payload {
ResponsePayload::Status(status) => {
info!(kind = "status_response", peer_count, "P2P message received");
handle_status_response(status, peer).await;
handle_status_response(server, status, peer).await;
}
ResponsePayload::Blocks(blocks) => {
info!(kind = "blocks_response", peer_count, "P2P message received");
handle_blocks_by_root_response(server, blocks, peer, request_id, ctx)
if server.range_request_ids.remove(&request_id) {
handle_blocks_by_range_response(server, blocks, peer).await;
} else {
handle_blocks_by_root_response(
server, blocks, peer, request_id, ctx,
)
.await;
}
}
},
Response::Error { code, message } => {
Expand All @@ -88,6 +95,8 @@ pub async fn handle_req_resp_message(
// Check if this was a block fetch request
if let Some(root) = server.request_id_map.remove(&request_id) {
handle_fetch_failure(server, root, peer, ctx).await;
} else if server.range_request_ids.remove(&request_id) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

range_request_ids should store the requested range for validation

warn!(%peer, ?request_id, "BlocksByRange request failed");
Comment on lines +97 to +98
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should look into adding retry logic here.

}
}
request_response::Event::InboundFailure {
Expand Down Expand Up @@ -118,8 +127,26 @@ async fn handle_status_request(
server.swarm_handle.send_response(channel, response);
}

async fn handle_status_response(status: Status, peer: PeerId) {
async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) {
info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}");

let our_head_slot = server.store.head_slot();
if status.head.slot <= our_head_slot {
return;
}
let gap = status.head.slot - our_head_slot;

if gap > LONG_RANGE_SYNC_THRESHOLD {
Comment thread
dicethedev marked this conversation as resolved.
Outdated
// Long-range sync: request blocks by range to efficiently fill large gap.
let start_slot = our_head_slot.saturating_add(1);
// Cap the range to avoid requesting an excessive number of blocks if the peer is very far ahead.
let count = gap.min(MAX_SYNC_RANGE);
info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange");
request_blocks_by_range_from_peer(server, peer, start_slot, count).await;
} else {
// Short-range sync: fetch individual blocks by root, relying on gossip to fill any small gaps.
info!(%peer, gap, "Short gap, relying on gossip / FetchBlock for missing slots");
}
}

async fn handle_blocks_by_root_request(
Expand Down Expand Up @@ -207,6 +234,12 @@ fn canonical_blocks_by_range(
return Vec::new();
};

// Avoid expensive lookups if the requested range is too far in the past (beyond recent gossip history).
let head_slot = store.head_slot();
if head_slot.saturating_sub(end_slot) > MAX_SLOT_LOOKBACK {
return Vec::new();
}
Comment thread
dicethedev marked this conversation as resolved.
Outdated

let mut roots_by_slot = HashMap::new();
let mut current_root = store.head();

Expand Down Expand Up @@ -282,6 +315,36 @@ async fn handle_blocks_by_root_response(
}
}

async fn handle_blocks_by_range_response(
server: &mut P2PServer,
blocks: Vec<SignedBlock>,
peer: PeerId,
) {
info!(%peer, count = blocks.len(), "Received BlocksByRange response");

if blocks.is_empty() {
warn!(%peer, "Received empty BlocksByRange response");
return;
}

if let Some(ref blockchain) = server.blockchain {
for block in blocks {
let block_root = block.message.hash_tree_root();
let slot = block.message.slot;
// TODO: validate block.message.slot is within the originally requested range.
let _ = blockchain.new_block(block).inspect_err(|err| {
error!(
%peer,
%slot,
block_root = %ethlambda_types::ShortRoot(&block_root.0),
%err,
"Failed to forward range-fetched block to blockchain"
)
});
}
}
Comment thread
dicethedev marked this conversation as resolved.
}

/// Build a Status message from the current Store state.
pub fn build_status(store: &Store) -> Status {
let finalized = store.latest_finalized();
Expand Down Expand Up @@ -380,6 +443,61 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool {
true
}

async fn request_blocks_by_range_from_peer(
server: &mut P2PServer,
peer: PeerId,
start_slot: u64,
count: u64,
) -> bool {
if count == 0 {
return true;
}

let mut remaining = count;
let mut next_slot = start_slot;

while remaining > 0 {
let batch_count = remaining.min(MAX_REQUEST_BLOCKS);
let request = BlocksByRangeRequest {
start_slot: next_slot,
count: batch_count,
step: 1,
};

info!(
%peer,
start_slot = next_slot,
count = batch_count,
"Sending BlocksByRange request"
);

let Some(request_id) = server
.swarm_handle
.send_request(
peer,
Request::BlocksByRange(request),
libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1),
Comment on lines +452 to +457
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should space these in time. We can send a single request and wait for a response before requesting more.

)
.await
else {
warn!(
%peer,
start_slot = next_slot,
count = batch_count,
"Failed to send BlocksByRange request (swarm adapter closed)"
);
return false;
};

server.range_request_ids.insert(request_id);
Comment thread
dicethedev marked this conversation as resolved.

remaining -= batch_count;
next_slot = next_slot.saturating_add(batch_count);
}

true
}

async fn handle_fetch_failure(
server: &mut P2PServer,
root: H256,
Expand Down