-
Notifications
You must be signed in to change notification settings - Fork 26
feat(p2p): use BlocksByRange for long-range sync #351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
576b191
cba8385
0130146
815814e
9fc8d73
9028a29
71bc472
300a1af
659456e
1e97021
41bcebf
f201a38
50e85ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,13 +12,13 @@ use ethlambda_types::primitives::HashTreeRoot as _; | |
| use ethlambda_types::{block::SignedBlock, primitives::H256}; | ||
|
|
||
| use super::{ | ||
| BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, BlocksByRootRequest, MAX_REQUEST_BLOCKS, | ||
| Request, Response, ResponsePayload, Status, | ||
| BLOCKS_BY_RANGE_PROTOCOL_V1, BLOCKS_BY_ROOT_PROTOCOL_V1, BlocksByRangeRequest, | ||
| BlocksByRootRequest, MAX_REQUEST_BLOCKS, Request, Response, ResponsePayload, Status, | ||
| messages::{ResponseCode, error_message}, | ||
| }; | ||
| use crate::{ | ||
| BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, P2PServer, PendingRequest, | ||
| p2p_protocol, req_resp::RequestedBlockRoots, | ||
| BACKOFF_MULTIPLIER, INITIAL_BACKOFF_MS, MAX_FETCH_RETRIES, MAX_SYNC_RANGE, P2PServer, | ||
| PendingRequest, p2p_protocol, req_resp::RequestedBlockRoots, | ||
| }; | ||
|
|
||
| pub async fn handle_req_resp_message( | ||
|
|
@@ -62,12 +62,18 @@ pub async fn handle_req_resp_message( | |
| Response::Success { payload } => match payload { | ||
| ResponsePayload::Status(status) => { | ||
| info!(kind = "status_response", peer_count, "P2P message received"); | ||
| handle_status_response(status, peer).await; | ||
| handle_status_response(server, status, peer).await; | ||
| } | ||
| ResponsePayload::Blocks(blocks) => { | ||
| info!(kind = "blocks_response", peer_count, "P2P message received"); | ||
| handle_blocks_by_root_response(server, blocks, peer, request_id, ctx) | ||
| if server.range_request_ids.remove(&request_id) { | ||
| handle_blocks_by_range_response(server, blocks, peer).await; | ||
| } else { | ||
| handle_blocks_by_root_response( | ||
| server, blocks, peer, request_id, ctx, | ||
| ) | ||
| .await; | ||
| } | ||
| } | ||
| }, | ||
| Response::Error { code, message } => { | ||
|
|
@@ -88,6 +94,8 @@ pub async fn handle_req_resp_message( | |
| // Check if this was a block fetch request | ||
| if let Some(root) = server.request_id_map.remove(&request_id) { | ||
| handle_fetch_failure(server, root, peer, ctx).await; | ||
| } else if server.range_request_ids.remove(&request_id) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| warn!(%peer, ?request_id, "BlocksByRange request failed"); | ||
|
Comment on lines
+97
to
+98
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should look into adding retry logic here. |
||
| } | ||
| } | ||
| request_response::Event::InboundFailure { | ||
|
|
@@ -118,8 +126,18 @@ async fn handle_status_request( | |
| server.swarm_handle.send_response(channel, response); | ||
| } | ||
|
|
||
| async fn handle_status_response(status: Status, peer: PeerId) { | ||
| async fn handle_status_response(server: &mut P2PServer, status: Status, peer: PeerId) { | ||
| info!(finalized_slot=%status.finalized.slot, head_slot=%status.head.slot, "Received status response from peer {peer}"); | ||
|
|
||
| let our_head_slot = server.store.head_slot(); | ||
| if status.head.slot <= our_head_slot { | ||
| return; | ||
| } | ||
| let gap = status.head.slot - our_head_slot; | ||
| let count = gap.min(MAX_SYNC_RANGE); | ||
| let start_slot = our_head_slot.saturating_add(1); | ||
| request_blocks_by_range_from_peer(server, peer, start_slot, count).await; | ||
| info!(%peer, start_slot, gap, "Long-range sync: using BlocksByRange"); | ||
| } | ||
|
|
||
| async fn handle_blocks_by_root_request( | ||
|
|
@@ -268,6 +286,36 @@ async fn handle_blocks_by_root_response( | |
| } | ||
| } | ||
|
|
||
| async fn handle_blocks_by_range_response( | ||
| server: &mut P2PServer, | ||
| blocks: Vec<SignedBlock>, | ||
| peer: PeerId, | ||
| ) { | ||
| info!(%peer, count = blocks.len(), "Received BlocksByRange response"); | ||
|
|
||
| if blocks.is_empty() { | ||
| warn!(%peer, "Received empty BlocksByRange response"); | ||
| return; | ||
| } | ||
|
|
||
| if let Some(ref blockchain) = server.blockchain { | ||
| for block in blocks { | ||
| let block_root = block.message.hash_tree_root(); | ||
| let slot = block.message.slot; | ||
| // TODO: validate block.message.slot is within the originally requested range. | ||
| let _ = blockchain.new_block(block).inspect_err(|err| { | ||
| error!( | ||
| %peer, | ||
| %slot, | ||
| block_root = %ethlambda_types::ShortRoot(&block_root.0), | ||
| %err, | ||
| "Failed to forward range-fetched block to blockchain" | ||
| ) | ||
| }); | ||
| } | ||
| } | ||
|
dicethedev marked this conversation as resolved.
|
||
| } | ||
|
|
||
| /// Build a Status message from the current Store state. | ||
| pub fn build_status(store: &Store) -> Status { | ||
| let finalized = store.latest_finalized(); | ||
|
|
@@ -366,6 +414,68 @@ pub async fn fetch_block_from_peer(server: &mut P2PServer, root: H256) -> bool { | |
| true | ||
| } | ||
|
|
||
| async fn request_blocks_by_range_from_peer( | ||
| server: &mut P2PServer, | ||
| peer: PeerId, | ||
| start_slot: u64, | ||
| count: u64, | ||
| ) -> bool { | ||
| if count == 0 { | ||
| return true; | ||
| } | ||
| let end_slot = start_slot.saturating_add(count).saturating_sub(1); | ||
|
|
||
| // Deduplicate: skip if we already have this range in-flight | ||
| if server.pending_sync_ranges.contains(&(start_slot, end_slot)) { | ||
| info!(%peer, start_slot, end_slot, "BlocksByRange already in-flight, skipping duplicate"); | ||
| return true; | ||
| } | ||
| server.pending_sync_ranges.insert((start_slot, end_slot)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Also, we should look into ways to deduplicate ranges. If we receive (122, 189) and (122, 190), we should only make a request for (189, 190). |
||
|
|
||
| let mut remaining = count; | ||
| let mut next_slot = start_slot; | ||
|
|
||
| while remaining > 0 { | ||
| let batch_count = remaining.min(MAX_REQUEST_BLOCKS); | ||
| let request = BlocksByRangeRequest { | ||
| start_slot: next_slot, | ||
| count: batch_count, | ||
| }; | ||
|
|
||
| info!( | ||
| %peer, | ||
| start_slot = next_slot, | ||
| count = batch_count, | ||
| "Sending BlocksByRange request" | ||
| ); | ||
|
|
||
| let Some(request_id) = server | ||
| .swarm_handle | ||
| .send_request( | ||
| peer, | ||
| Request::BlocksByRange(request), | ||
| libp2p::StreamProtocol::new(BLOCKS_BY_RANGE_PROTOCOL_V1), | ||
|
Comment on lines
+452
to
+457
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should space these in time. We can send a single request and wait for a response before requesting more. |
||
| ) | ||
| .await | ||
| else { | ||
| warn!( | ||
| %peer, | ||
| start_slot = next_slot, | ||
| count = batch_count, | ||
| "Failed to send BlocksByRange request (swarm adapter closed)" | ||
| ); | ||
| return false; | ||
| }; | ||
|
|
||
| server.range_request_ids.insert(request_id); | ||
|
dicethedev marked this conversation as resolved.
|
||
|
|
||
| remaining -= batch_count; | ||
| next_slot = next_slot.saturating_add(batch_count); | ||
| } | ||
|
|
||
| true | ||
| } | ||
|
|
||
| async fn handle_fetch_failure( | ||
| server: &mut P2PServer, | ||
| root: H256, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should name these similarly, maybe
`pending_root_requests` and `pending_range_requests`, `request_ids_to_root` and `request_ids_to_range`. Also, maybe we should merge
`request_id_map` with `range_request_ids`, making the value an enum. That way we can do a single map get and then match on the map value, instead of one get and map per response type.