diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java index 9ecc2aa2c138..b3f06a61ea3e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java @@ -88,6 +88,7 @@ public class ReconTaskControllerImpl implements ReconTaskController { private final ReconTaskStatusUpdaterManager taskStatusUpdaterManager; private final OMUpdateEventBuffer eventBuffer; private ExecutorService eventProcessingExecutor; + private volatile boolean running = false; private final AtomicBoolean tasksFailed = new AtomicBoolean(false); private volatile ReconOMMetadataManager currentOMMetadataManager; private final OzoneConfiguration configuration; @@ -359,6 +360,7 @@ public synchronized void start() { .build()); // Start async event processing thread + running = true; eventProcessingExecutor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setNameFormat("ReconEventProcessor-%d") .build()); @@ -369,6 +371,9 @@ public synchronized void start() { @Override public synchronized void stop() { LOG.info("Stopping Recon Task Controller."); + // Signal the event processing loop to exit on its next poll cycle so the + // graceful shutdown below can complete without waiting out the timeout. 
+ running = false; shutdownExecutorGracefully(this.executorService, "main task executor"); shutdownExecutorGracefully(this.eventProcessingExecutor, "event processing executor"); } @@ -481,7 +486,7 @@ private void processTasks( private void processBufferedEventsAsync() { LOG.info("Started async buffered event processing thread"); - while (!Thread.currentThread().isInterrupted()) { + while (running && !Thread.currentThread().isInterrupted()) { try { ReconEvent event = eventBuffer.poll(1000); // 1 second timeout if (event != null) { diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java index da1f790ccdb8..c2636701bf30 100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestReconTaskControllerImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider; import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater; import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager; +import org.apache.hadoop.util.Time; import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao; import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus; import org.apache.ozone.test.GenericTestUtils; @@ -95,6 +96,19 @@ public void setUp() throws IOException { reconTaskController.start(); } + @Test + public void testStopCompletesPromptly() { + // stop() must not block on the graceful shutdown timeout. Without the + // running flag the event loop would exit only on interrupt, so shutdown() + // could not drain it and awaitTermination would burn the full 30s timeout. 
+ long start = Time.monotonicNow(); + reconTaskController.stop(); + long elapsed = Time.monotonicNow() - start; + assertThat(elapsed) + .as("stop() should return promptly, not wait out the shutdown timeout") + .isLessThan(5000L); + } + @Test public void testRegisterTask() { String taskName = "Dummy_" + System.currentTimeMillis(); @@ -596,9 +610,11 @@ public void testProcessReInitializationEventWithTaskFailuresAndRetry() throws Ex .thenReturn(false) // First call fails .thenReturn(true); // Second call succeeds - // Stop async processing to control event processing manually - controllerSpy.stop(); - + // Stop async processing on the real controller so we can drive event + // processing manually. Stopping controllerSpy would only flip the flag on + // the Mockito copy, not the live event-processing thread. + controllerImpl.stop(); + // Create and manually process a reinitialization event ReconTaskReInitializationEvent reinitEvent = new ReconTaskReInitializationEvent( ReconTaskReInitializationEvent.ReInitializationReason.TASK_FAILURES, @@ -732,9 +748,11 @@ public void testProcessReInitializationEventWithCheckpointedManager() throws Exc when(controllerSpy.reInitializeTasks(any(ReconOMMetadataManager.class), any())) .thenReturn(true); // Succeed - // Stop async processing to control event processing manually - controllerSpy.stop(); - + // Stop async processing on the real controller so we can drive event + // processing manually. Stopping controllerSpy would only flip the flag on + // the Mockito copy, not the live event-processing thread. + controllerImpl.stop(); + // Create reinitialization event with checkpointed manager ReconTaskReInitializationEvent reinitEvent = new ReconTaskReInitializationEvent( ReconTaskReInitializationEvent.ReInitializationReason.BUFFER_OVERFLOW,