@@ -90,6 +90,9 @@ public class DataPartitionTableIntegrityCheckProcedure
9090 // how long to check all datanode are alive, the unit is ms
9191 private static final long CHECK_ALL_DATANODE_IS_ALIVE_INTERVAL = 10000 ;
9292
93+ // how long to roll back the next state, the unit is ms
94+ private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000 ;
95+
9396 NodeManager dataNodeManager ;
9497 private List <TDataNodeConfiguration > allDataNodes = new ArrayList <>();
9598
@@ -276,7 +279,8 @@ private Flow collectEarliestTimeslots() {
276279 }
277280
278281 if (failedDataNodes .size () == allDataNodes .size ()) {
279- setNextState (DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
282+ delayRollbackNextState (
283+ DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
280284 } else {
281285 setNextState (DataPartitionTableIntegrityCheckProcedureState .ANALYZE_MISSING_PARTITIONS );
282286 }
@@ -439,7 +443,8 @@ private Flow requestPartitionTables() {
439443 }
440444
441445 if (failedDataNodes .size () == allDataNodes .size ()) {
442- setNextState (DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
446+ delayRollbackNextState (
447+ DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
443448 return Flow .HAS_MORE_STATE ;
444449 }
445450
@@ -524,7 +529,8 @@ private Flow requestPartitionTablesHeartBeat() {
524529 // Don't find any one data partition table generation task on all registered DataNodes, go back
525530 // to the REQUEST_PARTITION_TABLES step and re-execute
526531 if (failedDataNodes .size () == allDataNodes .size ()) {
527- setNextState (DataPartitionTableIntegrityCheckProcedureState .REQUEST_PARTITION_TABLES );
532+ delayRollbackNextState (
533+ DataPartitionTableIntegrityCheckProcedureState .REQUEST_PARTITION_TABLES );
528534 return Flow .HAS_MORE_STATE ;
529535 }
530536
@@ -554,7 +560,8 @@ private Flow mergePartitionTables(final ConfigNodeProcedureEnv env) {
554560 if (dataPartitionTables .isEmpty ()) {
555561 LOG .error (
556562 "[DataPartitionIntegrity] No DataPartitionTables to merge, dataPartitionTables is empty" );
557- setNextState (DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
563+ delayRollbackNextState (
564+ DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
558565 return Flow .HAS_MORE_STATE ;
559566 }
560567
@@ -675,14 +682,25 @@ private Flow getFlow() {
675682 if (!failedDataNodes .isEmpty ()) {
676683 allDataNodes .removeAll (failedDataNodes );
677684 skipDataNodes = new HashSet <>(allDataNodes );
678- setNextState (DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
685+ delayRollbackNextState (
686+ DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS );
679687 return Flow .HAS_MORE_STATE ;
680688 } else {
681689 skipDataNodes .clear ();
682690 return Flow .NO_MORE_STATE ;
683691 }
684692 }
685693
694+ /** Delay to jump to next state, avoid write raft logs frequently when exception occur */
695+ private void delayRollbackNextState (DataPartitionTableIntegrityCheckProcedureState state ) {
696+ sleep (
697+ ROLL_BACK_NEXT_STATE_INTERVAL ,
698+ String .format (
699+ "[DataPartitionIntegrity] Error waiting for roll back the %s state due to thread interruption." ,
700+ state ));
701+ setNextState (state );
702+ }
703+
686704 @ Override
687705 public void serialize (final DataOutputStream stream ) throws IOException {
688706 super .serialize (stream );
0 commit comments