From e2993540efe4e90f2f361cc04e8b722e6aeb300f Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Fri, 29 May 2026 10:59:37 +0200 Subject: [PATCH 1/6] refactor(test): clean up MultiArchiverIT remove duplicate stubs, add missing post verification Co-Authored-By: Claude Sonnet 4.6 --- .../aa/AAChannelProcessorMultiArchiverIT.java | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 46a5aa09..583e6e69 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -108,18 +108,6 @@ void testProcessMultiArchivers( .toList(); when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Requests to archiver - actionsToNames.forEach( - (key, value) -> { - when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); - }); - - // Request to policies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - // Request to archiver status - when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Requests to archiver actionsToNames.forEach( (key, value) -> { @@ -132,7 +120,8 @@ void testProcessMultiArchivers( verify(archiverService, times(2)).getAAPolicies(anyString()); if (!namesToStatuses.isEmpty()) { - verify(archiverService, times(1)).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaPost(anyString(), anyList()); } } } From 9bed6ae2413261b5c38a12f7f465945f2cd7d984 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Fri, 29 May 2026 11:18:13 +0200 Subject: [PATCH 2/6] feat(aa): cache AA policies with scheduled background refresh Policies are fetched once at startup (@PostConstruct) and refreshed on a configurable fixed delay (aa.policy_refresh_interval_seconds, default 1 h) rather than on every process() call. process() now reads a volatile snapshot, so policy fetches can no longer block or fail mid-run. processorInfo() exposes LastPolicyRefresh and per-archiver policy counts for observability. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 105 +++++++++++++++++- .../service/external/ArchiverService.java | 11 +- .../model/archiver/aa/ArchiverPolicies.java | 10 ++ src/main/resources/application.properties | 2 + .../processors/aa/AAChannelProcessorIT.java | 12 +- .../aa/AAChannelProcessorMultiArchiverIT.java | 14 ++- .../aa/AAChannelProcessorMultiIT.java | 11 +- .../aa/AAChannelProcessorNoDefaultIT.java | 9 ++ .../aa/AAChannelProcessorNoPauseIT.java | 9 ++ .../aa/AAChannelProcessorPolicyCacheIT.java | 89 +++++++++++++++ .../aa/AAChannelProcessorStatusPauseIT.java | 9 ++ .../aa/AAChannelProcessorTagPauseIT.java | 9 ++ .../service/external/ArchiverServiceTest.java | 4 +- 13 files changed, 264 insertions(+), 30 deletions(-) create mode 100644 src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java create mode 100644 src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 995c5f1d..ffdf0fa4 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -1,12 +1,16 @@ package org.phoebus.channelfinder.configuration; +import jakarta.annotation.PostConstruct; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -20,10 +24,12 @@ import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverInfo; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverPolicies; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; import tools.jackson.core.JacksonException; /** @@ -69,6 +75,12 @@ public class AAChannelProcessor implements ChannelProcessor { @Autowired private ArchiverService archiverService; + @Value("${aa.policy_refresh_interval_seconds:3600}") + private long policyRefreshIntervalSeconds; + + private volatile Map cachedPolicies = Map.of(); + private volatile Instant lastPolicyRefresh; + @Override public boolean enabled() { return aaEnabled; @@ -79,8 +91,24 @@ public void setEnabled(boolean enabled) { this.aaEnabled = enabled; } + @PostConstruct + public void initPolicyCache() { + refreshPolicies(); + } + + @Scheduled( + fixedDelayString = "${aa.policy_refresh_interval_seconds:3600}", + timeUnit = TimeUnit.SECONDS) + public void scheduledPolicyRefresh() { + refreshPolicies(); + } + @Override public ChannelProcessorInfo processorInfo() { + String policyCounts = + cachedPolicies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(", ")); return new ChannelProcessorInfo( "AAChannelProcessor", aaEnabled, @@ -92,7 +120,15 @@ public ChannelProcessorInfo processorInfo() { "Archivers", aaURLs.keySet().toString(), "AutoPauseOn", - autoPauseOptions.toString())); + autoPauseOptions.toString(), + "PostSupportArchivers", + postSupportArchivers.toString(), + "PolicyRefreshIntervalSeconds", + String.valueOf(policyRefreshIntervalSeconds), + "LastPolicyRefresh", + lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(), + "CachedPoliciesPerArchiver", + policyCounts.isEmpty() ? "none" : policyCounts)); } /** @@ -111,7 +147,7 @@ public long process(List channels) throws JacksonException { return 0; } - Map archiversInfo = getArchiversInfo(aaURLs); + Map archiversInfo = getArchiversInfoFromCache(); if (archiversInfo.isEmpty()) { logger.log( Level.WARNING, @@ -332,14 +368,71 @@ private ArchivePVOptions createArchivePV( return newArchiverPV; } - private Map getArchiversInfo(Map aaURLs) { + private Map getArchiversInfoFromCache() { + Map snapshot = cachedPolicies; return aaURLs.entrySet().stream() - .filter(aa -> !StringUtils.isEmpty(aa.getValue())) + .filter(e -> !StringUtils.isEmpty(e.getValue())) + .filter( + e -> { + if (!snapshot.containsKey(e.getKey())) { + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' has no cached policies (unreachable at last refresh); skipping.", + e.getKey())); + return false; + } + return true; + }) .collect( Collectors.toMap( Map.Entry::getKey, - aa -> + e -> new ArchiverInfo( - aa.getKey(), aa.getValue(), archiverService.getAAPolicies(aa.getValue())))); + e.getKey(), e.getValue(), snapshot.get(e.getKey()).policies()))); + } + + private void refreshPolicies() { + if (aaURLs.isEmpty()) { + logger.log(Level.FINE, "No archivers configured; skipping policy cache refresh."); + return; + } + Map current = cachedPolicies; + Map updated = new HashMap<>(current); + List changed = new ArrayList<>(); + + for (Map.Entry entry : aaURLs.entrySet()) { + if (StringUtils.isEmpty(entry.getValue())) continue; + String alias = entry.getKey(); + try { + List fresh = archiverService.getAAPolicies(entry.getValue()); + ArchiverPolicies existing = current.get(alias); + if (existing == null || !existing.policies().equals(fresh)) { + updated.put(alias, new ArchiverPolicies(fresh)); + changed.add(alias); + } + } catch (ArchiverServiceException ex) { + logger.log( + Level.WARNING, + () -> + String.format( + "Policy fetch failed for archiver '%s'; retaining cached policies: %s", + alias, ex.getMessage())); + } + } + + lastPolicyRefresh = Instant.now(); + if (!changed.isEmpty()) { + cachedPolicies = Collections.unmodifiableMap(updated); + logger.log( + Level.INFO, + "AA policy cache updated: {0}", + cachedPolicies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(", "))); + } else { + logger.log(Level.FINE, "AA policy cache unchanged after refresh."); + } } } diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 30a3a177..18a92081 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -239,15 +239,14 @@ public List getAAPolicies(String aaURL) { .retrieve() .body(new ParameterizedTypeReference<>() {}); if (policyMap == null) { - return List.of(); + throw new ArchiverServiceException("Null response from " + uriString); } return new ArrayList<>(policyMap.keySet()); + } catch (ArchiverServiceException e) { + throw e; } catch (Exception e) { - // problem collecting policies from AA, so warn and return empty list - logger.log( - Level.WARNING, - () -> "Could not get AA policies list from " + aaURL + ": " + e.getMessage()); - return List.of(); + throw new ArchiverServiceException( + "Could not get AA policies from " + aaURL + ": " + e.getMessage(), e); } } } diff --git a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java new file mode 100644 index 00000000..d45836af --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java @@ -0,0 +1,10 @@ +package org.phoebus.channelfinder.service.model.archiver.aa; + +import java.util.List; + +public record ArchiverPolicies(List policies) { + @Override + public String toString() { + return policies.size() + " policies"; + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5e4e7da1..52219b79 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -165,6 +165,8 @@ aa.archiver_property_name=archiver aa.timeout_seconds=15 # Connect timeout (seconds); short so unreachable archivers fail fast. aa.connect_timeout_seconds=5 +# Interval in seconds between automatic AA policy cache refreshes (default: 1 hour) +aa.policy_refresh_interval_seconds=3600 # Comma-separated list of archivers to use post support aa.post_support= diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index d490d9eb..d7c521f1 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -45,6 +46,12 @@ class AAChannelProcessorIT { @MockitoBean ArchiverService archiverService; @Autowired AAChannelProcessor aaChannelProcessor; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + @NotNull private static Stream processSource() { return Stream.of( @@ -101,9 +108,6 @@ public static void paramableAAChannelProcessorTest( String archiveStatus, String archiverEndpoint) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - if (!archiveStatus.isEmpty()) { // Mock getStatuses List> archivePVStatuses = @@ -124,8 +128,6 @@ public static void paramableAAChannelProcessorTest( assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); // Verifications - verify(archiverService).getAAPolicies(anyString()); - if (!archiveStatus.isEmpty()) { verify(archiverService).getStatusesViaGet(anyString(), anyList()); } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 583e6e69..9001e3f2 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -3,7 +3,6 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; @@ -15,6 +14,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -39,6 +39,12 @@ class AAChannelProcessorMultiArchiverIT { @Autowired AAChannelProcessor aaChannelProcessor; @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + static Stream provideArguments() { List channels = List.of( @@ -99,8 +105,6 @@ void testProcessMultiArchivers( Map namesToStatuses, Map> actionsToNames) throws JacksonException { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Request to archiver status List> archivePVStatuses = namesToStatuses.entrySet().stream() @@ -116,9 +120,7 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); - // Verifications - verify(archiverService, times(2)).getAAPolicies(anyString()); - + // Verifications: query archiver uses GET, post archiver uses POST (aa.post_support=post) if (!namesToStatuses.isEmpty()) { verify(archiverService).getStatusesViaGet(anyString(), anyList()); verify(archiverService).getStatusesViaPost(anyString(), anyList()); diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 68ac2ec9..98ae8573 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -16,6 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -42,6 +43,12 @@ class AAChannelProcessorMultiIT { @Autowired AAChannelProcessor aaChannelProcessor; @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + static Stream provideArguments() { List channels = List.of( @@ -104,9 +111,6 @@ void testProcessMulti( int expectedProcessedChannels) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Mock getStatuses List> archivePVStatuses = namesToStatuses.entrySet().stream() @@ -121,7 +125,6 @@ void testProcessMulti( long count = aaChannelProcessor.process(channels); assertEquals(expectedProcessedChannels, count); - verify(archiverService).getAAPolicies(anyString()); verify(archiverService).getStatusesViaGet(anyString(), anyList()); ArgumentCaptor>> captor = diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java index 38cdb1e4..7c3ff55e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -30,6 +33,12 @@ class AAChannelProcessorNoDefaultIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java index 9c6342bf..612b1a3e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorNoPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java new file mode 100644 index 00000000..a099b886 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -0,0 +1,89 @@ +package org.phoebus.channelfinder.processors.aa; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.phoebus.channelfinder.configuration.AAChannelProcessor; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; +import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import tools.jackson.core.JacksonException; + +@WebMvcTest(AAChannelProcessor.class) +@ExtendWith(MockitoExtension.class) +@TestPropertySource(value = "classpath:application_aa_proc_test.properties") +class AAChannelProcessorPolicyCacheIT { + + @MockitoBean ArchiverService archiverService; + @Autowired AAChannelProcessor aaChannelProcessor; + + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + + @Test + void testProcessDoesNotCallGetAAPolicies() throws JacksonException { + when(archiverService.getStatusesViaGet(anyString(), anyList())) + .thenReturn(List.of(Map.of("pvName", "PVNoneActive", "status", "Not being archived"))); + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(1L); + clearInvocations(archiverService); + + Channel channel = + new Channel( + "PVNoneActive", + "owner", + List.of( + new Property("archive", "owner", "default"), + new Property("pvStatus", "owner", "Active")), + List.of()); + aaChannelProcessor.process(List.of(channel)); + + verify(archiverService, never()).getAAPolicies(anyString()); + } + + @Test + void testProcessorInfoShowsCacheMetadata() { + ChannelProcessorInfo info = aaChannelProcessor.processorInfo(); + + assertNotEquals("never", info.properties().get("LastPolicyRefresh")); + assertTrue(info.properties().get("CachedPoliciesPerArchiver").contains("default=")); + assertEquals("3600", info.properties().get("PolicyRefreshIntervalSeconds")); + } + + @Test + void testScheduledRefreshUpdatesCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("p1", "p2", "p3")); + clearInvocations(archiverService); + + aaChannelProcessor.scheduledPolicyRefresh(); + + verify(archiverService).getAAPolicies(anyString()); + assertTrue( + aaChannelProcessor + .processorInfo() + .properties() + .get("CachedPoliciesPerArchiver") + .contains("default=3")); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java index 86b4f49c..fd48434a 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorStatusPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java index 1aeafff0..38f46057 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java @@ -1,11 +1,14 @@ package org.phoebus.channelfinder.processors.aa; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import java.util.List; import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -28,6 +31,12 @@ class AAChannelProcessorTagPauseIT { @MockitoBean ArchiverService archiverService; + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + private static Stream processNoPauseSource() { return Stream.of( diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 888eb7f0..909561d7 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -306,9 +306,7 @@ void testGetAAPoliciesInvalidResponse() { .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List result = archiverService.getAAPolicies(ARCHIVER_URL); - - assertTrue(result.isEmpty()); + assertThrows(ArchiverServiceException.class, () -> archiverService.getAAPolicies(ARCHIVER_URL)); } @Test From 7d798c4831878258c6bdb9ae122bdd15d23fb7bd Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 2 Jun 2026 12:56:40 +0200 Subject: [PATCH 3/6] Make ChannelProcessorInfo keys consistent --- .../configuration/AAChannelProcessor.java | 16 ++++++++-------- .../aa/AAChannelProcessorPolicyCacheIT.java | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index ffdf0fa4..81f117ea 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -113,21 +113,21 @@ public ChannelProcessorInfo processorInfo() { "AAChannelProcessor", aaEnabled, Map.of( - "archiveProperty", + "archivePropertyName", archivePropertyName, - "archiverProperty", + "archiverPropertyName", archiverPropertyName, - "Archivers", + "archiverURLs", aaURLs.keySet().toString(), - "AutoPauseOn", + "autoPauseOptions", autoPauseOptions.toString(), - "PostSupportArchivers", + "postSupportArchivers", postSupportArchivers.toString(), - "PolicyRefreshIntervalSeconds", + "policyCacheRefreshIntervalSeconds", String.valueOf(policyRefreshIntervalSeconds), - "LastPolicyRefresh", + "lastPolicyRefresh", lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(), - "CachedPoliciesPerArchiver", + "cachedPoliciesPerArchiver", policyCounts.isEmpty() ? "none" : policyCounts)); } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java index a099b886..67780f27 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -66,9 +66,9 @@ void testProcessDoesNotCallGetAAPolicies() throws JacksonException { void testProcessorInfoShowsCacheMetadata() { ChannelProcessorInfo info = aaChannelProcessor.processorInfo(); - assertNotEquals("never", info.properties().get("LastPolicyRefresh")); - assertTrue(info.properties().get("CachedPoliciesPerArchiver").contains("default=")); - assertEquals("3600", info.properties().get("PolicyRefreshIntervalSeconds")); + assertNotEquals("never", info.properties().get("lastPolicyRefresh")); + assertTrue(info.properties().get("cachedPoliciesPerArchiver").contains("default=")); + assertEquals("3600", info.properties().get("policyCacheRefreshIntervalSeconds")); } @Test @@ -83,7 +83,7 @@ void testScheduledRefreshUpdatesCache() { aaChannelProcessor .processorInfo() .properties() - .get("CachedPoliciesPerArchiver") + .get("cachedPoliciesPerArchiver") .contains("default=3")); } } From 1064dfc1d7dff88c8e45883b7451429b538dbeca Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 2 Jun 2026 12:58:17 +0200 Subject: [PATCH 4/6] Remove over descriptive comment --- .../processors/aa/AAChannelProcessorMultiArchiverIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 9001e3f2..64449f64 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -120,7 +120,7 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); - // Verifications: query archiver uses GET, post archiver uses POST (aa.post_support=post) + // Verifications if (!namesToStatuses.isEmpty()) { verify(archiverService).getStatusesViaGet(anyString(), anyList()); verify(archiverService).getStatusesViaPost(anyString(), anyList()); From ca32ec82616d26cd55446191750996d40595a05a Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 2 Jun 2026 13:10:18 +0200 Subject: [PATCH 5/6] refactor(test): consolidate AA processor IT tests Create a base class AAChannelProcessorBaseIT with shared fields (archiveProperty, activeProperty, inactiveProperty), @MockitoBean/@Autowired declarations, @BeforeEach primeCache(), and the paramableAAChannelProcessorTest helper. Co-Authored-By: Claude Sonnet 4.6 --- .../aa/AAChannelProcessorBaseIT.java | 86 +++++++++++++++++ .../processors/aa/AAChannelProcessorIT.java | 96 +------------------ .../aa/AAChannelProcessorMultiArchiverIT.java | 20 +--- .../aa/AAChannelProcessorMultiIT.java | 19 +--- .../aa/AAChannelProcessorNoDefaultIT.java | 27 +----- .../aa/AAChannelProcessorNoPauseIT.java | 26 +---- .../aa/AAChannelProcessorPolicyCacheIT.java | 15 +-- .../aa/AAChannelProcessorStatusPauseIT.java | 26 +---- .../aa/AAChannelProcessorTagPauseIT.java | 26 +---- 9 files changed, 100 insertions(+), 241 deletions(-) create mode 100644 src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java new file mode 100644 index 00000000..af365330 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java @@ -0,0 +1,86 @@ +package org.phoebus.channelfinder.processors.aa; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.ArgumentCaptor; +import org.phoebus.channelfinder.configuration.AAChannelProcessor; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import tools.jackson.core.JacksonException; + +abstract class AAChannelProcessorBaseIT { + + protected static Property archiveProperty = new Property("archive", "owner", "default"); + protected static Property activeProperty = new Property("pvStatus", "owner", "Active"); + protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive"); + + @MockitoBean protected ArchiverService archiverService; + @Autowired protected AAChannelProcessor aaChannelProcessor; + + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + + protected void paramableAAChannelProcessorTest( + List channels, String archiveStatus, String archiverEndpoint) + throws JacksonException { + if (!archiveStatus.isEmpty()) { + List> archivePVStatuses = + channels.stream() + .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) + .toList(); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); + } + + if (!archiverEndpoint.isEmpty()) { + when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) channels.size()); + } else { + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(0L); + } + + long count = aaChannelProcessor.process(channels); + assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); + + if (!archiveStatus.isEmpty()) { + verify(archiverService).getStatusesViaGet(anyString(), anyList()); + } + + if (!archiverEndpoint.isEmpty()) { + ArgumentCaptor>> captor = + ArgumentCaptor.forClass(Map.class); + verify(archiverService).configureAA(captor.capture(), anyString()); + Map> map = captor.getValue(); + + ArchiveAction expectedAction = getActionFromEndpoint(archiverEndpoint); + if (expectedAction != null) { + assertTrue(map.containsKey(expectedAction)); + List options = map.get(expectedAction); + assertFalse(options.isEmpty()); + } + } + } + + private static ArchiveAction getActionFromEndpoint(String endpoint) { + if (endpoint.contains("resumeArchivingPV")) return ArchiveAction.RESUME; + if (endpoint.contains("pauseArchivingPV")) return ArchiveAction.PAUSE; + if (endpoint.contains("archivePV")) return ArchiveAction.ARCHIVE; + return null; + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index d7c521f1..ff33b9c3 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -1,56 +1,24 @@ package org.phoebus.channelfinder.processors.aa; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.List; -import java.util.Map; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.ArgumentCaptor; import org.mockito.junit.jupiter.MockitoExtension; import org.phoebus.channelfinder.configuration.AAChannelProcessor; -import org.phoebus.channelfinder.configuration.ChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; -import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @ExtendWith(MockitoExtension.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") -class AAChannelProcessorIT { - - protected static Property archiveProperty = new Property("archive", "owner", "default"); - protected static Property activeProperty = new Property("pvStatus", "owner", "Active"); - protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive"); - - @MockitoBean ArchiverService archiverService; - @Autowired AAChannelProcessor aaChannelProcessor; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } +class AAChannelProcessorIT extends AAChannelProcessorBaseIT { @NotNull private static Stream processSource() { @@ -101,75 +69,15 @@ private static Stream processSource() { "[\"PVArchivedNotag\"]")); } - public static void paramableAAChannelProcessorTest( - ArchiverService archiverService, - ChannelProcessor aaChannelProcessor, - List channels, - String archiveStatus, - String archiverEndpoint) - throws JacksonException { - if (!archiveStatus.isEmpty()) { - // Mock getStatuses - List> archivePVStatuses = - channels.stream() - .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) - .toList(); - when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - } - - if (!archiverEndpoint.isEmpty()) { - // Mock configureAA - when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) channels.size()); - } else { - when(archiverService.configureAA(anyMap(), anyString())).thenReturn(0L); - } - - long count = aaChannelProcessor.process(channels); - assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); - - // Verifications - if (!archiveStatus.isEmpty()) { - verify(archiverService).getStatusesViaGet(anyString(), anyList()); - } - - if (!archiverEndpoint.isEmpty()) { - ArgumentCaptor>> captor = - ArgumentCaptor.forClass(Map.class); - verify(archiverService).configureAA(captor.capture(), anyString()); - Map> map = captor.getValue(); - - ArchiveAction expectedAction = getActionFromEndpoint(archiverEndpoint); - if (expectedAction != null) { - assertTrue(map.containsKey(expectedAction)); - List options = map.get(expectedAction); - assertFalse(options.isEmpty()); - // We could parse submissionBody to be more strict, but checking the action is likely - // sufficient for now - // as we trust the mapping logic in AAChannelProcessor - } - } - } - - private static ArchiveAction getActionFromEndpoint(String endpoint) { - if (endpoint.contains("resumeArchivingPV")) return ArchiveAction.RESUME; - if (endpoint.contains("pauseArchivingPV")) return ArchiveAction.PAUSE; - if (endpoint.contains("archivePV")) return ArchiveAction.ARCHIVE; - return null; - } - @Test void testProcessNoPVs() throws JacksonException { aaChannelProcessor.process(List.of()); - - // verify interactions are minimal or none if empty - // But since list is empty, process returns 0 early } @ParameterizedTest @MethodSource("processSource") void testProcessNotArchivedActive(Channel channel, String archiveStatus, String archiverEndpoint) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 64449f64..3ff08abf 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -5,45 +5,30 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource(value = "classpath:application_test_multi.properties") -class AAChannelProcessorMultiArchiverIT { +class AAChannelProcessorMultiArchiverIT extends AAChannelProcessorBaseIT { public static final String BEING_ARCHIVED = "Being archived"; public static final String PAUSED = "Paused"; public static final String NOT_BEING_ARCHIVED = "Not being archived"; public static final String OWNER = "owner"; - @Autowired AAChannelProcessor aaChannelProcessor; - @MockitoBean ArchiverService archiverService; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } static Stream provideArguments() { List channels = @@ -105,14 +90,12 @@ void testProcessMultiArchivers( Map namesToStatuses, Map> actionsToNames) throws JacksonException { - // Request to archiver status List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Requests to archiver actionsToNames.forEach( (key, value) -> { when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); @@ -120,7 +103,6 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); - // Verifications if (!namesToStatuses.isEmpty()) { verify(archiverService).getStatusesViaGet(anyString(), anyList()); verify(archiverService).getStatusesViaPost(anyString(), anyList()); diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 98ae8573..122b3f1f 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -7,47 +7,32 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") -class AAChannelProcessorMultiIT { +class AAChannelProcessorMultiIT extends AAChannelProcessorBaseIT { public static final String BEING_ARCHIVED = "Being archived"; public static final String PAUSED = "Paused"; public static final String NOT_BEING_ARCHIVED = "Not being archived"; public static final String OWNER = "owner"; - @Autowired AAChannelProcessor aaChannelProcessor; - @MockitoBean ArchiverService archiverService; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } static Stream provideArguments() { List channels = @@ -111,14 +96,12 @@ void testProcessMulti( int expectedProcessedChannels) throws JacksonException { - // Mock getStatuses List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Mock configureAA when(archiverService.configureAA(anyMap(), anyString())) .thenReturn((long) expectedProcessedChannels); diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java index 7c3ff55e..f2c9ba43 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java @@ -1,46 +1,26 @@ package org.phoebus.channelfinder.processors.aa; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.urls:{'default': '','aa': 'http://localhost:17665'}") -class AAChannelProcessorNoDefaultIT { - protected static Property archiverProperty = new Property("archiver", "owner", "aa"); - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; +class AAChannelProcessorNoDefaultIT extends AAChannelProcessorBaseIT { - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } + protected static Property archiverProperty = new Property("archiver", "owner", "aa"); private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -64,7 +44,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java index 612b1a3e..83bae401 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java @@ -1,44 +1,23 @@ package org.phoebus.channelfinder.processors.aa; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.auto_pause=none") -class AAChannelProcessorNoPauseIT { - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } +class AAChannelProcessorNoPauseIT extends AAChannelProcessorBaseIT { private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -57,7 +36,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java index 67780f27..72f6215a 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -13,34 +13,21 @@ import java.util.List; import java.util.Map; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @ExtendWith(MockitoExtension.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") -class AAChannelProcessorPolicyCacheIT { - - @MockitoBean ArchiverService archiverService; - @Autowired AAChannelProcessor aaChannelProcessor; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } +class AAChannelProcessorPolicyCacheIT extends AAChannelProcessorBaseIT { @Test void testProcessDoesNotCallGetAAPolicies() throws JacksonException { diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java index fd48434a..6833493c 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java @@ -1,44 +1,23 @@ package org.phoebus.channelfinder.processors.aa; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.auto_pause=pvStatus") -class AAChannelProcessorStatusPauseIT { - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } +class AAChannelProcessorStatusPauseIT extends AAChannelProcessorBaseIT { private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -57,7 +36,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java index 38f46057..d3567fcb 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java @@ -1,44 +1,23 @@ package org.phoebus.channelfinder.processors.aa; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.auto_pause=archive") -class AAChannelProcessorTagPauseIT { - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; - - @BeforeEach - void primeCache() { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - aaChannelProcessor.scheduledPolicyRefresh(); - } +class AAChannelProcessorTagPauseIT extends AAChannelProcessorBaseIT { private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -61,7 +40,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } From eb7226acfeb63b8c75b12033f561dc5704609e7f Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 2 Jun 2026 13:52:54 +0200 Subject: [PATCH 6/6] refactor(aa): policy string representation Make a method cachedPoliciesRepresentation to make printing of cachedPolicies more DRY --- .../configuration/AAChannelProcessor.java | 19 +++++++++---------- .../model/archiver/aa/ArchiverPolicies.java | 7 +------ .../aa/AAChannelProcessorPolicyCacheIT.java | 9 +++------ 3 files changed, 13 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 81f117ea..c2566a8f 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -105,10 +105,7 @@ public void scheduledPolicyRefresh() { @Override public ChannelProcessorInfo processorInfo() { - String policyCounts = - cachedPolicies.entrySet().stream() - .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.joining(", ")); + String cachedPoliciesStringValue = cachedPoliciesRepresentation(cachedPolicies); return new ChannelProcessorInfo( "AAChannelProcessor", aaEnabled, @@ -128,7 +125,7 @@ public ChannelProcessorInfo processorInfo() { "lastPolicyRefresh", lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(), "cachedPoliciesPerArchiver", - policyCounts.isEmpty() ? "none" : policyCounts)); + cachedPoliciesStringValue.isEmpty() ? "none" : cachedPoliciesStringValue)); } /** @@ -393,6 +390,12 @@ private Map getArchiversInfoFromCache() { e.getKey(), e.getValue(), snapshot.get(e.getKey()).policies()))); } + private static String cachedPoliciesRepresentation(Map policies) { + return policies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(", ")); + } + private void refreshPolicies() { if (aaURLs.isEmpty()) { logger.log(Level.FINE, "No archivers configured; skipping policy cache refresh."); @@ -426,11 +429,7 @@ private void refreshPolicies() { if (!changed.isEmpty()) { cachedPolicies = Collections.unmodifiableMap(updated); logger.log( - Level.INFO, - "AA policy cache updated: {0}", - cachedPolicies.entrySet().stream() - .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.joining(", "))); + Level.INFO, "AA policy cache updated: {0}", cachedPoliciesRepresentation(cachedPolicies)); } else { logger.log(Level.FINE, "AA policy cache unchanged after refresh."); } diff --git a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java index d45836af..458513db 100644 --- a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java +++ b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java @@ -2,9 +2,4 @@ import java.util.List; -public record ArchiverPolicies(List policies) { - @Override - public String toString() { - return policies.size() + " policies"; - } -} +public record ArchiverPolicies(List policies) {} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java index 72f6215a..c3692e5e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -66,11 +66,8 @@ void testScheduledRefreshUpdatesCache() { aaChannelProcessor.scheduledPolicyRefresh(); verify(archiverService).getAAPolicies(anyString()); - assertTrue( - aaChannelProcessor - .processorInfo() - .properties() - .get("cachedPoliciesPerArchiver") - .contains("default=3")); + String cachedPoliciePerArchiver = + aaChannelProcessor.processorInfo().properties().get("cachedPoliciesPerArchiver"); + assertTrue(cachedPoliciePerArchiver.contains("default=ArchiverPolicies[policies=[p1, p2, p3]")); } }