Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ use crate::{
publish_attestation, publish_block,
},
req_resp::{
BLOCKS_BY_ROOT_PROTOCOL_V1, Codec, MAX_COMPRESSED_PAYLOAD_SIZE, Request,
STATUS_PROTOCOL_V1, build_status, fetch_block_from_peer,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, Codec,
MAX_COMPRESSED_PAYLOAD_SIZE, Request, STATUS_PROTOCOL_V1, build_status,
fetch_block_from_peer,
},
swarm_adapter::SwarmHandle,
};
Expand All @@ -58,6 +59,7 @@ const MAX_FETCH_RETRIES: u32 = 10;
const INITIAL_BACKOFF_MS: u64 = 5;
const BACKOFF_MULTIPLIER: u64 = 2;
const PEER_REDIAL_INTERVAL_SECS: u64 = 12;
pub const LONG_RANGE_SYNC_THRESHOLD: u64 = 2;

pub(crate) struct PendingRequest {
pub(crate) attempts: u32,
Expand Down Expand Up @@ -154,6 +156,10 @@ pub fn build_swarm(
StreamProtocol::new(BLOCKS_BY_ROOT_PROTOCOL_V1),
request_response::ProtocolSupport::Full,
),
(
StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1),
request_response::ProtocolSupport::Full,
),
],
Default::default(),
);
Expand Down
41 changes: 36 additions & 5 deletions crates/net/p2p/src/req_resp/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tracing::{debug, trace, warn};
use super::{
encoding::{MAX_PAYLOAD_SIZE, decode_payload, write_payload},
messages::{
BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response, ResponseCode, ResponsePayload,
STATUS_PROTOCOL_V1, Status,
BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, ErrorMessage, Request, Response,
ResponseCode, ResponsePayload, STATUS_PROTOCOL_V1, Status,
},
};

Expand All @@ -21,6 +21,7 @@ fn protocol_label(protocol: &str) -> &'static str {
match protocol {
STATUS_PROTOCOL_V1 => "status",
BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root",
BLOCKS_BY_RANGE_PROTOCOL_V1 => "blocks_by_range",
_ => "unknown",
}
}
Expand Down Expand Up @@ -59,6 +60,12 @@ impl libp2p::request_response::Codec for Codec {
})?;
Ok(Request::BlocksByRoot(request))
}
BLOCKS_BY_RANGE_PROTOCOL_V1 => {
let request = SszDecode::from_ssz_bytes(&payload).map_err(|err| {
io::Error::new(io::ErrorKind::InvalidData, format!("{err:?}"))
})?;
Ok(Request::BlocksByRange(request))
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand All @@ -78,6 +85,7 @@ impl libp2p::request_response::Codec for Codec {
match protocol.as_ref() {
STATUS_PROTOCOL_V1 => decode_status_response(io, label).await,
BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await,
BLOCKS_BY_RANGE_PROTOCOL_V1 => decode_blocks_by_range_response(io, label).await,
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unknown protocol: {}", protocol.as_ref()),
Expand All @@ -99,6 +107,7 @@ impl libp2p::request_response::Codec for Codec {
let encoded = match req {
Request::Status(status) => status.to_ssz(),
Request::BlocksByRoot(request) => request.to_ssz(),
Request::BlocksByRange(request) => request.to_ssz(),
};

let compressed_size = write_payload(io, &encoded).await?;
Expand Down Expand Up @@ -132,7 +141,8 @@ impl libp2p::request_response::Codec for Codec {
);
Ok(())
}
ResponsePayload::BlocksByRoot(blocks) => {
ResponsePayload::BlocksByRoot(blocks)
| ResponsePayload::BlocksByRange(blocks) => {
// Write each block as a separate chunk.
// Encode first, then check size before writing the SUCCESS
// code byte. This avoids corrupting the stream if a block
Expand All @@ -143,7 +153,7 @@ impl libp2p::request_response::Codec for Codec {
if encoded.len() > MAX_PAYLOAD_SIZE - 1024 {
warn!(
size = encoded.len(),
"Skipping oversized block in BlocksByRoot response"
"Skipping oversized block in block response"
);
continue;
}
Expand Down Expand Up @@ -254,6 +264,27 @@ where
/// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this
/// function to return `Err` - they are logged and skipped.
async fn decode_blocks_by_root_response<T>(io: &mut T, protocol_label: &str) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
decode_blocks_response(io, protocol_label, ResponsePayload::BlocksByRoot).await
}

async fn decode_blocks_by_range_response<T>(
io: &mut T,
protocol_label: &str,
) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
decode_blocks_response(io, protocol_label, ResponsePayload::BlocksByRange).await
}

async fn decode_blocks_response<T>(
io: &mut T,
protocol_label: &str,
payload: fn(Vec<SignedBlock>) -> ResponsePayload,
) -> io::Result<Response>
where
T: AsyncRead + Unpin + Send,
{
Expand Down Expand Up @@ -291,5 +322,5 @@ where
blocks.push(block);
}

Ok(Response::success(ResponsePayload::BlocksByRoot(blocks)))
Ok(Response::success(payload(blocks)))
}
Loading