Skip to content

Commit b4689cb

Browse files
cdxkerdensumesh
authored andcommitted
feature: only run concensus logic on a new tick.
1 parent 00dade5 commit b4689cb

2 files changed

Lines changed: 76 additions & 77 deletions

File tree

crates/node/src/handlers/consensus/handler.rs

Lines changed: 70 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -27,52 +27,60 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
2727
node: &mut NodeState<N, W>,
2828
message: NetworkEvent,
2929
) -> Result<(), NodeError> {
30-
if self.is_leader && self.current_state == ConsensusPhase::WaitingForPropose {
31-
if let Ok(ChainResponse::GetProposedBlock { block }) = node
32-
.chain_interface_tx
33-
.send_message_with_response(ChainMessage::GetProposedBlock {
34-
previous_block: None,
35-
proposer: node.peer_id.to_bytes(),
36-
})
37-
.await
38-
{
39-
if !block.body.transactions.is_empty() {
40-
let bytes = block.serialize()?;
41-
42-
let broadcast_msg = BroadcastMessage::Block(bytes);
43-
44-
node.network_handle
45-
.send_broadcast(broadcast_msg)
46-
.map_err(|e| {
47-
NodeError::Error(format!("Failed to broadcast block proposal: {e:?}"))
48-
})?;
30+
match message {
31+
NetworkEvent::SelfRequest {
32+
request: SelfRequest::Tick,
33+
..
34+
} => {
35+
if self.is_leader && self.current_state == ConsensusPhase::WaitingForPropose {
36+
if let Ok(ChainResponse::GetProposedBlock { block }) = node
37+
.chain_interface_tx
38+
.send_message_with_response(ChainMessage::GetProposedBlock {
39+
previous_block: None,
40+
proposer: node.peer_id.to_bytes(),
41+
})
42+
.await
43+
{
44+
if !block.body.transactions.is_empty() {
45+
let bytes = block.serialize()?;
4946

50-
info!(
51-
"📦 Proposed block for round {} with {} txs",
52-
self.current_round,
53-
block.body.transactions.len()
54-
);
47+
let broadcast_msg = BroadcastMessage::Block(bytes);
5548

56-
self.current_state = ConsensusPhase::Propose;
57-
}
58-
}
59-
}
49+
node.network_handle
50+
.send_broadcast(broadcast_msg)
51+
.map_err(|e| {
52+
NodeError::Error(format!(
53+
"Failed to broadcast block proposal: {e:?}"
54+
))
55+
})?;
6056

61-
if let Some(start_time) = self.round_start_time {
62-
if start_time.elapsed() >= self.round_timeout && self.is_leader {
63-
info!("Round timeout reached. I am current leader. Proposing new round.");
64-
let next_round = self.current_round + 1;
65-
let new_round_message = ConsensusMessage::NewRound(next_round);
57+
info!(
58+
"📦 Proposed block for round {} with {} txs",
59+
self.current_round,
60+
block.body.transactions.len()
61+
);
6662

67-
node.network_handle
68-
.send_broadcast(BroadcastMessage::Consensus(new_round_message))
69-
.map_err(|e| {
70-
NodeError::Error(format!("Failed to broadcast new round message: {e:?}"))
71-
})?;
72-
}
73-
}
63+
self.current_state = ConsensusPhase::Propose;
64+
}
65+
}
66+
}
7467

75-
match message {
68+
if let Some(start_time) = self.round_start_time {
69+
if start_time.elapsed() >= self.round_timeout && self.is_leader {
70+
info!("Round timeout reached. I am current leader. Proposing new round.");
71+
let next_round = self.current_round + 1;
72+
let new_round_message = ConsensusMessage::NewRound(next_round);
73+
74+
node.network_handle
75+
.send_broadcast(BroadcastMessage::Consensus(new_round_message))
76+
.map_err(|e| {
77+
NodeError::Error(format!(
78+
"Failed to broadcast new round message: {e:?}"
79+
))
80+
})?;
81+
}
82+
}
83+
}
7684
NetworkEvent::SelfRequest {
7785
request: SelfRequest::TriggerConsensusRound { force_round },
7886
response_channel,
@@ -91,9 +99,7 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
9199
},
92100
round_number: u64::from(round_number),
93101
})
94-
.map_err(|e| {
95-
NodeError::Error(format!("Failed to send response: {e}"))
96-
})?;
102+
.map_err(|e| NodeError::Error(format!("Failed to send response: {e}")))?;
97103
}
98104
}
99105
NetworkEvent::Subscribed { peer_id, topic } => {
@@ -118,19 +124,17 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
118124
})?;
119125

120126
match broadcast {
121-
BroadcastMessage::Consensus(consensus_message) => {
122-
match consensus_message {
123-
ConsensusMessage::LeaderAnnouncement(announcement) => {
124-
self.handle_leader_announcement(node, &announcement)?;
125-
}
126-
ConsensusMessage::NewRound(round) => {
127-
self.handle_new_round(node, peer, round)?;
128-
}
129-
ConsensusMessage::Vote(vote) => {
130-
self.handle_vote(node, peer, &vote).await;
131-
}
127+
BroadcastMessage::Consensus(consensus_message) => match consensus_message {
128+
ConsensusMessage::LeaderAnnouncement(announcement) => {
129+
self.handle_leader_announcement(node, &announcement)?;
132130
}
133-
}
131+
ConsensusMessage::NewRound(round) => {
132+
self.handle_new_round(node, peer, round)?;
133+
}
134+
ConsensusMessage::Vote(vote) => {
135+
self.handle_vote(node, peer, &vote).await;
136+
}
137+
},
134138
BroadcastMessage::Block(raw_block) => {
135139
match Block::deserialize(&raw_block) {
136140
Ok(block) => {
@@ -140,17 +144,15 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
140144
peer,
141145
block.body.transactions.len()
142146
);
143-
let Ok(ChainResponse::GetProposedBlock {
144-
block: local_block,
145-
}) = node
146-
.chain_interface_tx
147-
.send_message_with_response(
148-
ChainMessage::GetProposedBlock {
149-
previous_block: None,
150-
proposer: self.proposer.unwrap().to_bytes(),
151-
},
152-
)
153-
.await
147+
let Ok(ChainResponse::GetProposedBlock { block: local_block }) =
148+
node.chain_interface_tx
149+
.send_message_with_response(
150+
ChainMessage::GetProposedBlock {
151+
previous_block: None,
152+
proposer: self.proposer.unwrap().to_bytes(),
153+
},
154+
)
155+
.await
154156
else {
155157
return Err(NodeError::Error(
156158
"Failed to get proposed block".to_string(),
@@ -166,8 +168,7 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
166168
);
167169
info!(
168170
"Local txs: {:?}, Received txs: {:?}",
169-
local_block.body.transactions,
170-
block.body.transactions
171+
local_block.body.transactions, block.body.transactions
171172
);
172173
}
173174
}

crates/node/src/handlers/signing/handler.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,12 @@ impl<N: Network, W: Wallet> Handler<N, W> for SigningState {
4444
.map_err(|e| NodeError::Error(format!("Failed to send response: {e}")))?;
4545
}
4646
}
47-
NetworkEvent::MessageEvent((
48-
peer,
49-
DirectMessage::SignRequest { sign_id, message },
50-
)) => self.handle_sign_request(node, peer, sign_id, message)?,
51-
NetworkEvent::MessageEvent((
52-
peer,
53-
DirectMessage::SignPackage { sign_id, package },
54-
)) => self.handle_sign_package(node, peer, sign_id, &package)?,
47+
NetworkEvent::MessageEvent((peer, DirectMessage::SignRequest { sign_id, message })) => {
48+
self.handle_sign_request(node, peer, sign_id, message)?;
49+
}
50+
NetworkEvent::MessageEvent((peer, DirectMessage::SignPackage { sign_id, package })) => {
51+
self.handle_sign_package(node, peer, sign_id, &package)?;
52+
}
5553
NetworkEvent::MessageEvent((
5654
peer,
5755
DirectMessage::Commitments {

0 commit comments

Comments
 (0)