Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
},
req_resp::{
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
MAX_COMPRESSED_PAYLOAD_SIZE, MAX_REQUEST_BLOCKS, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
Expand All @@ -59,6 +59,7 @@ const MAX_FETCH_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
const MAX_SYNC_RANGE: u64 = MAX_REQUEST_BLOCKS * 64; // 65,536 slots (~3 days)

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
Expand Down Expand Up @@ -302,6 +303,8 @@ impl P2P {
connected_peers: HashSet::new(),
pending_requests: HashMap::new(),
request_id_map: HashMap::new(),
range_request_ids: HashSet::new(),
pending_sync_ranges: HashSet::new(),
Comment on lines 304 to +307
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should name these similarly, maybe pending_root_requests and pending_range_requests, request_ids_to_root and request_ids_to_range.

Also, maybe we should merge request_id_map with range_request_ids, making the value an enum. That way we can do a single map get and then match on the map value, instead of one get and map per response type.

bootnode_addrs: built.bootnode_addrs,
node_names,
};
Expand Down Expand Up @@ -338,6 +341,8 @@ pub struct P2PServer {
pub(crate) connected_peers: HashSet<PeerId>,
pub(crate) pending_requests: HashMap<H256, PendingRequest>,
pub(crate) request_id_map: HashMap<OutboundRequestId, H256>,
pub(crate) range_request_ids: HashSet<OutboundRequestId>,
pub(crate) pending_sync_ranges: HashSet<(u64, u64)>,
bootnode_addrs: HashMap<PeerId, Multiaddr>,
node_names: HashMap<PeerId, String>,
}
Expand Down
124 changes: 117 additions & 7 deletions crates/net/p2p/src/req_resp/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use ethlambda_types::primitives::HashTreeRoot as _;
use ethlambda_types::{block::SignedBlock, primitives::H256};

use super::{
BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS,
Request, Response, ResponsePayload, Status,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest,
BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status,
messages::{ResponseCode, error_message},
};
use crate::{
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest,
p2p_protocol, req_resp::RequestedBlockRoots,
BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer,
PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots,
};

pub async fn handle_req_resp_message(
Expand Down Expand Up @@ -62,12 +62,18 @@ pub async fn handle_req_resp_message(
Response::Success { payload } => match payload {
ResponsePayload::Status(status) => {
info!(kind = "status_response", peer_count, "P2P message received");
handle_status_response(status, peer).await;
handle_status_response(server, status, peer).await;
}
ResponsePayload::Blocks(blocks) => {
info!(kind = "blocks_response", peer_count, "P2P message received");
handle_blocks_by_root_response(server, blocks, peer, request_id, ctx)
if server.range_request_ids.remove(&request_id) {
handle_blocks_by_range_response(server, blocks, peer).await;
} else {
handle_blocks_by_root_response(
server, blocks, peer, request_id, ctx,
)
.await;
}
}
},
Response::Error { code, message } => {
Expand All @@ -88,6 +94,8 @@ pub async fn handle_req_resp_message(
// Check if this was a block fetch request
if let Some(root) = server.request_id_map.remove(&request_id) {
handle_fetch_failure(server, root, peer, ctx).await;
} else if server.range_request_ids.remove(&request_id) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

range_request_ids should store the requested range for validation

warn!(%peer, ?request_id, "BlocksByRange request failed");
Comment on lines +97 to +98
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should look into adding retry logic here.

}
}
request_response::Event::InboundFailure {
Expand Down Expand Up @@ -118,8 +126,18 @@ async fn handle_status_request(
server.swarm_handle.send_response(channel, response);
}

async fn handle_status_response(status: Status, peer: PeerId) {
async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) {
info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}");

let our_head_slot = server.store.head_slot();
if status.head.slot <= our_head_slot {
return;
}
let gap = status.head.slot - our_head_slot;
let count = gap.min(MAX_SYNC_RANGE);
let start_slot = our_head_slot.saturating_add(1);
request_blocks_by_range_from_peer(server, peer, start_slot, count).await;
info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange");
}

async fn handle_blocks_by_root_request(
Expand Down Expand Up @@ -268,6 +286,36 @@ async fn handle_blocks_by_root_response(
}
}

async fn handle_blocks_by_range_response(
server: &mut P2PServer,
blocks: Vec<SignedBlock>,
peer: PeerId,
) {
info!(%peer, count = blocks.len(), "Received BlocksByRange response");

if blocks.is_empty() {
warn!(%peer, "Received empty BlocksByRange response");
return;
}

if let Some(ref blockchain) = server.blockchain {
for block in blocks {
let block_root = block.message.hash_tree_root();
let slot = block.message.slot;
// TODO: validate block.message.slot is within the originally requested range.
let _ = blockchain.new_block(block).inspect_err(|err| {
error!(
%peer,
%slot,
block_root = %ethlambda_types::ShortRoot(&block_root.0),
%err,
"Failed to forward range-fetched block to blockchain"
)
});
}
}
Comment thread
dicethedev marked this conversation as resolved.
}

/// Build a Status message from the current Store state.
pub fn build_status(store: &Store) -> Status {
let finalized = store.latest_finalized();
Expand Down Expand Up @@ -366,6 +414,68 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool {
true
}

async fn request_blocks_by_range_from_peer(
server: &mut P2PServer,
peer: PeerId,
start_slot: u64,
count: u64,
) -> bool {
if count == 0 {
return true;
}
let end_slot = start_slot.saturating_add(count).saturating_sub(1);

// Deduplicate: skip if we already have this range in-flight
if server.pending_sync_ranges.contains(&(start_slot, end_slot)) {
info!(%peer, start_slot, end_slot, "BlocksByRange already in-flight, skipping duplicate");
return true;
}
server.pending_sync_ranges.insert((start_slot, end_slot));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pending_sync_ranges is never cleared. We should remove entries once we get a response.

Also, we should look into ways to deduplicate ranges. If we receive (122, 189) and (122, 190), we should only make a request for (189, 190).


let mut remaining = count;
let mut next_slot = start_slot;

while remaining > 0 {
let batch_count = remaining.min(MAX_REQUEST_BLOCKS);
let request = BlocksByRangeRequest {
start_slot: next_slot,
count: batch_count,
};

info!(
%peer,
start_slot = next_slot,
count = batch_count,
"Sending BlocksByRange request"
);

let Some(request_id) = server
.swarm_handle
.send_request(
peer,
Request::BlocksByRange(request),
libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1),
Comment on lines +452 to +457
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should space these in time. We can send a single request and wait for a response before requesting more.

)
.await
else {
warn!(
%peer,
start_slot = next_slot,
count = batch_count,
"Failed to send BlocksByRange request (swarm adapter closed)"
);
return false;
};

server.range_request_ids.insert(request_id);
Comment thread
dicethedev marked this conversation as resolved.

remaining -= batch_count;
next_slot = next_slot.saturating_add(batch_count);
}

true
}

async fn handle_fetch_failure(
server: &mut P2PServer,
root: H256,
Expand Down