From 455af9cfe7f73d24c7dde98825a4c781123388d5 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 25 May 2026 14:37:49 +0200 Subject: [PATCH] feat(processors): give channel processors a tunable, named thread pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channel processors (e.g. archiver sync on IOC reconnect) are dispatched asynchronously on every channel write. Previously the pool was anonymous, sized by a single hard-coded property, and used AbortPolicy — meaning a saturated queue would propagate an exception to the HTTP caller and cause receivers to retry, increasing load further. ChannelFinderProcessorExecutor replaces it with a named @Component whose size is driven by processors.max_concurrent_updates (default 10). Core and max are equal to avoid ramp-up lag during bursts of channel updates. The queue is deliberately shallow (N/4) with a discard-oldest rejection policy: when the pool is saturated, the stalest pending batch is evicted in favour of the fresher update, since a newer channel snapshot always supersedes an older one. Individual pool parameters can be overridden via processors.task_executor.* properties without touching the shared default. Tests cover pool-size derivation from defaults and overrides, and confirm the eviction behaviour under a saturated queue. Co-Authored-By: Claude Sonnet 4.6 --- .../phoebus/channelfinder/Application.java | 17 ---- .../ChannelFinderProcessorExecutor.java | 52 ++++++++++++ src/main/resources/application.properties | 20 +++++ .../ChannelFinderProcessorExecutorTest.java | 83 +++++++++++++++++++ 4 files changed, 155 insertions(+), 17 deletions(-) create mode 100644 src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java create mode 100644 src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java diff --git a/src/main/java/org/phoebus/channelfinder/Application.java b/src/main/java/org/phoebus/channelfinder/Application.java index 28d4bee9..7f7e9799 100644 --- a/src/main/java/org/phoebus/channelfinder/Application.java +++ b/src/main/java/org/phoebus/channelfinder/Application.java @@ -31,9 +31,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; -import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.FileCopyUtils; @EnableAutoConfiguration @@ -113,19 +111,4 @@ public List channelProcessors() { }); return processors; } - - /** - * {@link TaskExecutor} used when calling {@link ChannelProcessor}s. - * - * @return A {@link TaskExecutor} - */ - @Bean("channelFinderTaskExecutor") - public TaskExecutor taskExecutor() { - ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); - taskExecutor.setCorePoolSize(3); - taskExecutor.setMaxPoolSize(10); - taskExecutor.setQueueCapacity(25); - - return taskExecutor; - } } diff --git a/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java b/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java new file mode 100644 index 00000000..dff1a099 --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/configuration/ChannelFinderProcessorExecutor.java @@ -0,0 +1,52 @@ +package org.phoebus.channelfinder.configuration; + +import java.util.logging.Level; +import java.util.logging.Logger; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +/** + * {@link ThreadPoolTaskExecutor} used when calling {@link ChannelProcessor}s. + * + *

Pool parameters are derived from {@code processors.max_concurrent_updates}. Individual values + * can be overridden via the {@code processors.task_executor.*} properties (values ≤ 0 mean "use the + * derived value"). + */ +@Component("channelFinderTaskExecutor") +public class ChannelFinderProcessorExecutor extends ThreadPoolTaskExecutor { + + private static final Logger logger = + Logger.getLogger(ChannelFinderProcessorExecutor.class.getName()); + + public ChannelFinderProcessorExecutor( + @Value("${processors.max_concurrent_updates:10}") int maxConcurrent, + @Value("${processors.task_executor.core_pool_size:-1}") int overrideCore, + @Value("${processors.task_executor.max_pool_size:-1}") int overrideMax, + @Value("${processors.task_executor.queue_capacity:-1}") int overrideQueue) { + + int core = overrideCore > 0 ? overrideCore : maxConcurrent; + int max = overrideMax > 0 ? overrideMax : maxConcurrent; + int queue = overrideQueue > 0 ? overrideQueue : Math.max(1, maxConcurrent / 4); + + setCorePoolSize(core); + setMaxPoolSize(max); + setQueueCapacity(queue); + setRejectedExecutionHandler( + (runnable, executor) -> { + if (!executor.isShutdown()) { + executor.getQueue().poll(); // evict oldest (stale) task to make room + executor.getQueue().offer(runnable); + logger.log( + Level.WARNING, + () -> + "ChannelFinderProcessorExecutor task queue full — evicted oldest task to admit fresher update" + + " (active=" + + executor.getActiveCount() + + ", queued=" + + executor.getQueue().size() + + ")"); + } + }); + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1002887a..84af4758 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -134,6 +134,26 @@ logging.level.org.springframework.web.filter.CommonsRequestLoggingFilter=INFO ################ Processor ################################################## processors.chunking.size=10000 +# ChannelFinderProcessorExecutor: controls the thread pool used to dispatch +# channel processor tasks (e.g. archiver sync on IOC reconnect). +# +# How many batches can be processed concurrently. Each in-flight batch holds +# one thread for the duration of its archiver HTTP calls. Increase this if you +# have more archivers or expect large-scale maintenance windows with many IOC +# restarts happening at once. +# +# Pool parameters are derived from this value: +# corePoolSize = max_concurrent_updates +# maxPoolSize = max_concurrent_updates +# queueCapacity = max(1, max_concurrent_updates / 4) +# +# Advanced: uncomment the task_executor properties below to override +# individual values (values <= 0 mean "use the derived value"). +processors.max_concurrent_updates=10 +# processors.task_executor.core_pool_size=-1 +# processors.task_executor.max_pool_size=-1 +# processors.task_executor.queue_capacity=-1 + ################ Archiver Appliance Configuration Processor ################# aa.urls={'default': 'http://localhost:17665'} # Comma-separated list of archivers to use if archiver_property_name is null diff --git a/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java b/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java new file mode 100644 index 00000000..8fead183 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/ChannelFinderProcessorExecutorTest.java @@ -0,0 +1,83 @@ +package org.phoebus.channelfinder.processors; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.phoebus.channelfinder.configuration.ChannelFinderProcessorExecutor; + +class ChannelFinderProcessorExecutorTest { + + @Test + void testDefaultPoolSizing() throws Exception { + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(4, -1, -1, -1); + ex.initialize(); + Assertions.assertEquals(4, ex.getCorePoolSize()); + Assertions.assertEquals(4, ex.getMaxPoolSize()); + Assertions.assertEquals(1, ex.getQueueCapacity()); // max(1, 4/4) = 1 + ex.shutdown(); + } + + @Test + void testMinQueueCapacity() throws Exception { + // max(1, 1/4) = max(1, 0) = 1 + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, -1); + ex.initialize(); + Assertions.assertEquals(1, ex.getQueueCapacity()); + ex.shutdown(); + } + + @Test + void testOverridesTakePrecedence() throws Exception { + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(10, 2, 6, 8); + ex.initialize(); + Assertions.assertEquals(2, ex.getCorePoolSize()); + Assertions.assertEquals(6, ex.getMaxPoolSize()); + Assertions.assertEquals(8, ex.getQueueCapacity()); + ex.shutdown(); + } + + @Test + void testRejectionHandlerEvictsOldestAndAdmitsNew() throws Exception { + // 1 thread, queue=1 → third submit triggers the rejection handler + ChannelFinderProcessorExecutor ex = new ChannelFinderProcessorExecutor(1, -1, -1, 1); + ex.initialize(); + + CountDownLatch blocker = new CountDownLatch(1); + CountDownLatch task1Ready = new CountDownLatch(1); + AtomicBoolean task2Ran = new AtomicBoolean(false); + AtomicBoolean task3Ran = new AtomicBoolean(false); + CountDownLatch task3Done = new CountDownLatch(1); + + // task1: blocks the sole thread + ex.execute( + () -> { + task1Ready.countDown(); + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + task1Ready.await(2, TimeUnit.SECONDS); + + // task2: sits in queue (the stale task to be evicted) + ex.execute(() -> task2Ran.set(true)); + + // task3: triggers rejection handler → evicts task2, admits task3 + ex.execute( + () -> { + task3Ran.set(true); + task3Done.countDown(); + }); + + blocker.countDown(); // release the blocked thread + Assertions.assertTrue(task3Done.await(2, TimeUnit.SECONDS), "task3 did not complete in time"); + + Assertions.assertTrue(task3Ran.get(), "Newer task must run after eviction"); + Assertions.assertFalse(task2Ran.get(), "Older queued task must be evicted"); + + ex.shutdown(); + } +}