Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions src/main/java/org/phoebus/channelfinder/Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.FileCopyUtils;

@EnableAutoConfiguration
Expand Down Expand Up @@ -113,19 +111,4 @@ public List<ChannelProcessor> channelProcessors() {
});
return processors;
}

/**
* {@link TaskExecutor} used when calling {@link ChannelProcessor}s.
*
* @return A {@link TaskExecutor}
*/
@Bean("channelFinderTaskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(25);

return taskExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.phoebus.channelfinder.configuration;

import java.util.logging.Level;
import java.util.logging.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

/**
* {@link ThreadPoolTaskExecutor} used when calling {@link ChannelProcessor}s.
*
* <p>Pool parameters are derived from {@code processors.max_concurrent_updates}. Individual values
* can be overridden via the {@code processors.task_executor.*} properties (values ≤ 0 mean "use the
* derived value").
*/
@Component("channelFinderTaskExecutor")
public class ChannelFinderProcessorExecutor extends ThreadPoolTaskExecutor {

private static final Logger logger =
Logger.getLogger(ChannelFinderProcessorExecutor.class.getName());

public ChannelFinderProcessorExecutor(
@Value("${processors.max_concurrent_updates:10}") int maxConcurrent,
@Value("${processors.task_executor.core_pool_size:-1}") int overrideCore,
@Value("${processors.task_executor.max_pool_size:-1}") int overrideMax,
@Value("${processors.task_executor.queue_capacity:-1}") int overrideQueue) {

int core = overrideCore > 0 ? overrideCore : maxConcurrent;
int max = overrideMax > 0 ? overrideMax : maxConcurrent;
int queue = overrideQueue > 0 ? overrideQueue : Math.max(1, maxConcurrent / 4);

setCorePoolSize(core);
setMaxPoolSize(max);
setQueueCapacity(queue);
setRejectedExecutionHandler(
(runnable, executor) -> {
if (!executor.isShutdown()) {
executor.getQueue().poll(); // evict oldest (stale) task to make room
executor.getQueue().offer(runnable);
logger.log(
Level.WARNING,
() ->
"ChannelFinderProcessorExecutor task queue full — evicted oldest task to admit fresher update"
+ " (active="
+ executor.getActiveCount()
+ ", queued="
+ executor.getQueue().size()
+ ")");
}
});
}
}
20 changes: 20 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,26 @@ logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO
################ Processor ##################################################
processors.chunking.size=10000

# ChannelFinderProcessorExecutor: controls the thread pool used to dispatch
# channel processor tasks (e.g. archiver sync on IOC reconnect).
#
# How many batches can be processed concurrently. Each in-flight batch holds
# one thread for the duration of its archiver HTTP calls. Increase this if you
# have more archivers or expect large-scale maintenance windows with many IOC
# restarts happening at once.
#
# Pool parameters are derived from this value:
# corePoolSize = max_concurrent_updates
# maxPoolSize = max_concurrent_updates
# queueCapacity = max(1, max_concurrent_updates / 4)
#
# Advanced: uncomment the task_executor properties below to override
# individual values (values <= 0 mean "use the derived value").
processors.max_concurrent_updates=10
# processors.task_executor.core_pool_size=-1
# processors.task_executor.max_pool_size=-1
# processors.task_executor.queue_capacity=-1

################ Archiver Appliance Configuration Processor #################
aa.urls={'default': 'http://localhost:17665'}
# Comma-separated list of archivers to use if archiver_property_name is null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.phoebus.channelfinder.processors;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.phoebus.channelfinder.configuration.ChannelFinderProcessorExecutor;

class ChannelFinderProcessorExecutorTest {

@Test
void testDefaultPoolSizing() throws Exception {
ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(4, -1, -1, -1);
ex.initialize();
Assertions.assertEquals(4, ex.getCorePoolSize());
Assertions.assertEquals(4, ex.getMaxPoolSize());
Assertions.assertEquals(1, ex.getQueueCapacity()); // max(1, 4/4) = 1
ex.shutdown();
}

@Test
void testMinQueueCapacity() throws Exception {
// max(1, 1/4) = max(1, 0) = 1
ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, -1);
ex.initialize();
Assertions.assertEquals(1, ex.getQueueCapacity());
ex.shutdown();
}

@Test
void testOverridesTakePrecedence() throws Exception {
ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(10, 2, 6, 8);
ex.initialize();
Assertions.assertEquals(2, ex.getCorePoolSize());
Assertions.assertEquals(6, ex.getMaxPoolSize());
Assertions.assertEquals(8, ex.getQueueCapacity());
ex.shutdown();
}

@Test
void testRejectionHandlerEvictsOldestAndAdmitsNew() throws Exception {
// 1 thread, queue=1 → third submit triggers the rejection handler
ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, 1);
ex.initialize();

CountDownLatch blocker = new CountDownLatch(1);
CountDownLatch task1Ready = new CountDownLatch(1);
AtomicBoolean task2Ran = new AtomicBoolean(false);
AtomicBoolean task3Ran = new AtomicBoolean(false);
CountDownLatch task3Done = new CountDownLatch(1);

// task1: blocks the sole thread
ex.execute(
() -> {
task1Ready.countDown();
try {
blocker.await();
} catch (InterruptedException e) {

Check warning on line 59 in src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace "e" with an unnamed pattern.

See more on https://sonarcloud.io/project/issues?id=ChannelFinder_ChannelFinderService&issues=AZ5o8-fvsmFUYkBRz41R&open=AZ5o8-fvsmFUYkBRz41R&pullRequest=224
Thread.currentThread().interrupt();
}
});
task1Ready.await(2, TimeUnit.SECONDS);

// task2: sits in queue (the stale task to be evicted)
ex.execute(() -> task2Ran.set(true));

// task3: triggers rejection handler → evicts task2, admits task3
ex.execute(
() -> {
task3Ran.set(true);
task3Done.countDown();
});

blocker.countDown(); // release the blocked thread
Assertions.assertTrue(task3Done.await(2, TimeUnit.SECONDS), "task3 did not complete in time");

Assertions.assertTrue(task3Ran.get(), "Newer task must run after eviction");
Assertions.assertFalse(task2Ran.get(), "Older queued task must be evicted");

ex.shutdown();
}
}
Loading