@@ -25,7 +25,7 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
2525 async fn handle (
2626 & mut self ,
2727 node : & mut NodeState < N , W > ,
28- message : Option < NetworkEvent > ,
28+ message : NetworkEvent ,
2929 ) -> Result < ( ) , NodeError > {
3030 if self . is_leader && self . current_state == ConsensusPhase :: WaitingForPropose {
3131 if let Ok ( ChainResponse :: GetProposedBlock { block } ) = node
@@ -72,115 +72,113 @@ impl<N: Network, W: Wallet> Handler<N, W> for ConsensusState {
7272 }
7373 }
7474
75- if let Some ( message) = message {
76- match message {
77- NetworkEvent :: SelfRequest {
78- request : SelfRequest :: TriggerConsensusRound { force_round } ,
79- response_channel,
80- } => {
81- self . start_new_round ( node) ?;
82- let round_number = self . current_round ;
83-
84- if let Some ( response_channel) = response_channel {
85- response_channel
86- . send ( SelfResponse :: TriggerConsensusRoundResponse {
87- success : true ,
88- message : if force_round {
89- "Forced consensus round triggered" . to_string ( )
90- } else {
91- "Consensus round triggered" . to_string ( )
92- } ,
93- round_number : u64:: from ( round_number) ,
94- } )
95- . map_err ( |e| {
96- NodeError :: Error ( format ! ( "Failed to send response: {e}" ) )
97- } ) ?;
98- }
75+ match message {
76+ NetworkEvent :: SelfRequest {
77+ request : SelfRequest :: TriggerConsensusRound { force_round } ,
78+ response_channel,
79+ } => {
80+ self . start_new_round ( node) ?;
81+ let round_number = self . current_round ;
82+
83+ if let Some ( response_channel) = response_channel {
84+ response_channel
85+ . send ( SelfResponse :: TriggerConsensusRoundResponse {
86+ success : true ,
87+ message : if force_round {
88+ "Forced consensus round triggered" . to_string ( )
89+ } else {
90+ "Consensus round triggered" . to_string ( )
91+ } ,
92+ round_number : u64:: from ( round_number) ,
93+ } )
94+ . map_err ( |e| {
95+ NodeError :: Error ( format ! ( "Failed to send response: {e}" ) )
96+ } ) ?;
9997 }
100- NetworkEvent :: Subscribed { peer_id , topic } => {
101- if topic == self . broadcast_topic . hash ( ) {
102- self . validators . insert ( peer_id ) ;
103-
104- info ! (
105- "🔗 Peer {} subscribed to broadcast topic. Validator set size: {}" ,
106- node . network_handle . peer_name ( & peer_id ) ,
107- self . validators . len ( )
108- ) ;
109-
110- if self . current_round == 0 {
111- self . start_new_round ( node ) ? ;
112- }
98+ }
99+ NetworkEvent :: Subscribed { peer_id , topic } => {
100+ if topic == self . broadcast_topic . hash ( ) {
101+ self . validators . insert ( peer_id ) ;
102+
103+ info ! (
104+ "🔗 Peer {} subscribed to broadcast topic. Validator set size: {}" ,
105+ node . network_handle . peer_name ( & peer_id ) ,
106+ self . validators . len ( )
107+ ) ;
108+
109+ if self . current_round == 0 {
110+ self . start_new_round ( node ) ? ;
113111 }
114112 }
115- NetworkEvent :: GossipsubMessage ( message) => {
116- if let Some ( peer) = message. source {
117- let broadcast = BroadcastMessage :: decode ( & message. data ) . map_err ( |e| {
118- NodeError :: Error ( format ! ( "Failed to decode broadcast message: {e}" ) )
119- } ) ?;
113+ }
114+ NetworkEvent :: GossipsubMessage ( message) => {
115+ if let Some ( peer) = message. source {
116+ let broadcast = BroadcastMessage :: decode ( & message. data ) . map_err ( |e| {
117+ NodeError :: Error ( format ! ( "Failed to decode broadcast message: {e}" ) )
118+ } ) ?;
120119
121- match broadcast {
122- BroadcastMessage :: Consensus ( consensus_message) => {
123- match consensus_message {
124- ConsensusMessage :: LeaderAnnouncement ( announcement) => {
125- self . handle_leader_announcement ( node, & announcement) ?;
126- }
127- ConsensusMessage :: NewRound ( round) => {
128- self . handle_new_round ( node, peer, round) ?;
129- }
130- ConsensusMessage :: Vote ( vote) => {
131- self . handle_vote ( node, peer, & vote) . await ;
132- }
120+ match broadcast {
121+ BroadcastMessage :: Consensus ( consensus_message) => {
122+ match consensus_message {
123+ ConsensusMessage :: LeaderAnnouncement ( announcement) => {
124+ self . handle_leader_announcement ( node, & announcement) ?;
125+ }
126+ ConsensusMessage :: NewRound ( round) => {
127+ self . handle_new_round ( node, peer, round) ?;
128+ }
129+ ConsensusMessage :: Vote ( vote) => {
130+ self . handle_vote ( node, peer, & vote) . await ;
133131 }
134132 }
135- BroadcastMessage :: Block ( raw_block) => {
136- match Block :: deserialize ( & raw_block) {
137- Ok ( block) => {
133+ }
134+ BroadcastMessage :: Block ( raw_block) => {
135+ match Block :: deserialize ( & raw_block) {
136+ Ok ( block) => {
137+ info ! (
138+ "📥 Received block proposal for round {} from {} with {} txs" ,
139+ self . current_round,
140+ peer,
141+ block. body. transactions. len( )
142+ ) ;
143+ let Ok ( ChainResponse :: GetProposedBlock {
144+ block : local_block,
145+ } ) = node
146+ . chain_interface_tx
147+ . send_message_with_response (
148+ ChainMessage :: GetProposedBlock {
149+ previous_block : None ,
150+ proposer : self . proposer . unwrap ( ) . to_bytes ( ) ,
151+ } ,
152+ )
153+ . await
154+ else {
155+ return Err ( NodeError :: Error (
156+ "Failed to get proposed block" . to_string ( ) ,
157+ ) ) ;
158+ } ;
159+
160+ if local_block == block {
161+ info ! ( "Block is valid. Sending prevote." ) ;
162+ self . send_vote ( node, & block, VoteType :: Prevote ) ?;
163+ } else {
164+ info ! (
165+ "Block is invalid. Not voting - transaction mismatch"
166+ ) ;
138167 info ! (
139- "📥 Received block proposal for round {} from {} with {} txs" ,
140- self . current_round,
141- peer,
142- block. body. transactions. len( )
168+ "Local txs: {:?}, Received txs: {:?}" ,
169+ local_block. body. transactions,
170+ block. body. transactions
143171 ) ;
144- let Ok ( ChainResponse :: GetProposedBlock {
145- block : local_block,
146- } ) = node
147- . chain_interface_tx
148- . send_message_with_response (
149- ChainMessage :: GetProposedBlock {
150- previous_block : None ,
151- proposer : self . proposer . unwrap ( ) . to_bytes ( ) ,
152- } ,
153- )
154- . await
155- else {
156- return Err ( NodeError :: Error (
157- "Failed to get proposed block" . to_string ( ) ,
158- ) ) ;
159- } ;
160-
161- if local_block == block {
162- info ! ( "Block is valid. Sending prevote." ) ;
163- self . send_vote ( node, & block, VoteType :: Prevote ) ?;
164- } else {
165- info ! (
166- "Block is invalid. Not voting - transaction mismatch"
167- ) ;
168- info ! (
169- "Local txs: {:?}, Received txs: {:?}" ,
170- local_block. body. transactions,
171- block. body. transactions
172- ) ;
173- }
174172 }
175- Err ( e) => warn ! ( "Failed to deserialize block: {e}" ) ,
176173 }
174+ Err ( e) => warn ! ( "Failed to deserialize block: {e}" ) ,
177175 }
178- _ => { }
179176 }
177+ _ => { }
180178 }
181179 }
182- _ => { }
183180 }
181+ _ => { }
184182 }
185183 Ok ( ( ) )
186184 }
0 commit comments