diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 995c5f1d..c2566a8f 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -1,12 +1,16 @@ package org.phoebus.channelfinder.configuration; +import jakarta.annotation.PostConstruct; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -20,10 +24,12 @@ import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverInfo; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverPolicies; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.Scheduled; import tools.jackson.core.JacksonException; /** @@ -69,6 +75,12 @@ public class AAChannelProcessor implements ChannelProcessor { @Autowired private ArchiverService archiverService; + @Value("${aa.policy_refresh_interval_seconds:3600}") + private long policyRefreshIntervalSeconds; + + private volatile Map cachedPolicies = Map.of(); + private volatile Instant lastPolicyRefresh; + @Override public boolean enabled() { return aaEnabled; @@ -79,20 +91,41 @@ public void setEnabled(boolean enabled) { this.aaEnabled = enabled; } + @PostConstruct + public void initPolicyCache() { + refreshPolicies(); + } + + @Scheduled( + fixedDelayString = "${aa.policy_refresh_interval_seconds:3600}", + timeUnit = TimeUnit.SECONDS) + public void scheduledPolicyRefresh() { + refreshPolicies(); + } + @Override public ChannelProcessorInfo processorInfo() { + String cachedPoliciesStringValue = cachedPoliciesRepresentation(cachedPolicies); return new ChannelProcessorInfo( "AAChannelProcessor", aaEnabled, Map.of( - "archiveProperty", + "archivePropertyName", archivePropertyName, - "archiverProperty", + "archiverPropertyName", archiverPropertyName, - "Archivers", + "archiverURLs", aaURLs.keySet().toString(), - "AutoPauseOn", - autoPauseOptions.toString())); + "autoPauseOptions", + autoPauseOptions.toString(), + "postSupportArchivers", + postSupportArchivers.toString(), + "policyCacheRefreshIntervalSeconds", + String.valueOf(policyRefreshIntervalSeconds), + "lastPolicyRefresh", + lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(), + "cachedPoliciesPerArchiver", + cachedPoliciesStringValue.isEmpty() ? "none" : cachedPoliciesStringValue)); } /** @@ -111,7 +144,7 @@ public long process(List channels) throws JacksonException { return 0; } - Map archiversInfo = getArchiversInfo(aaURLs); + Map archiversInfo = getArchiversInfoFromCache(); if (archiversInfo.isEmpty()) { logger.log( Level.WARNING, @@ -332,14 +365,73 @@ private ArchivePVOptions createArchivePV( return newArchiverPV; } - private Map getArchiversInfo(Map aaURLs) { + private Map getArchiversInfoFromCache() { + Map snapshot = cachedPolicies; return aaURLs.entrySet().stream() - .filter(aa -> !StringUtils.isEmpty(aa.getValue())) + .filter(e -> !StringUtils.isEmpty(e.getValue())) + .filter( + e -> { + if (!snapshot.containsKey(e.getKey())) { + logger.log( + Level.WARNING, + () -> + String.format( + "Archiver '%s' has no cached policies (unreachable at last refresh); skipping.", + e.getKey())); + return false; + } + return true; + }) .collect( Collectors.toMap( Map.Entry::getKey, - aa -> + e -> new ArchiverInfo( - aa.getKey(), aa.getValue(), archiverService.getAAPolicies(aa.getValue())))); + e.getKey(), e.getValue(), snapshot.get(e.getKey()).policies()))); + } + + private static String cachedPoliciesRepresentation(Map policies) { + return policies.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining(", ")); + } + + private void refreshPolicies() { + if (aaURLs.isEmpty()) { + logger.log(Level.FINE, "No archivers configured; skipping policy cache refresh."); + return; + } + Map current = cachedPolicies; + Map updated = new HashMap<>(current); + List changed = new ArrayList<>(); + + for (Map.Entry entry : aaURLs.entrySet()) { + if (StringUtils.isEmpty(entry.getValue())) continue; + String alias = entry.getKey(); + try { + List fresh = archiverService.getAAPolicies(entry.getValue()); + ArchiverPolicies existing = current.get(alias); + if (existing == null || !existing.policies().equals(fresh)) { + updated.put(alias, new ArchiverPolicies(fresh)); + changed.add(alias); + } + } catch (ArchiverServiceException ex) { + logger.log( + Level.WARNING, + () -> + String.format( + "Policy fetch failed for archiver '%s'; retaining cached policies: %s", + alias, ex.getMessage())); + } + } + + lastPolicyRefresh = Instant.now(); + if (!changed.isEmpty()) { + cachedPolicies = Collections.unmodifiableMap(updated); + logger.log( + Level.INFO, "AA policy cache updated: {0}", cachedPoliciesRepresentation(cachedPolicies)); + } else { + logger.log(Level.FINE, "AA policy cache unchanged after refresh."); + } } } diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 30a3a177..18a92081 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -239,15 +239,14 @@ public List getAAPolicies(String aaURL) { .retrieve() .body(new ParameterizedTypeReference<>() {}); if (policyMap == null) { - return List.of(); + throw new ArchiverServiceException("Null response from " + uriString); } return new ArrayList<>(policyMap.keySet()); + } catch (ArchiverServiceException e) { + throw e; } catch (Exception e) { - // problem collecting policies from AA, so warn and return empty list - logger.log( - Level.WARNING, - () -> "Could not get AA policies list from " + aaURL + ": " + e.getMessage()); - return List.of(); + throw new ArchiverServiceException( + "Could not get AA policies from " + aaURL + ": " + e.getMessage(), e); } } } diff --git a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java new file mode 100644 index 00000000..458513db --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverPolicies.java @@ -0,0 +1,5 @@ +package org.phoebus.channelfinder.service.model.archiver.aa; + +import java.util.List; + +public record ArchiverPolicies(List policies) {} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 5e4e7da1..52219b79 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -165,6 +165,8 @@ aa.archiver_property_name=archiver aa.timeout_seconds=15 # Connect timeout (seconds); short so unreachable archivers fail fast. aa.connect_timeout_seconds=5 +# Interval in seconds between automatic AA policy cache refreshes (default: 1 hour) +aa.policy_refresh_interval_seconds=3600 # Comma-separated list of archivers to use post support aa.post_support= diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java new file mode 100644 index 00000000..af365330 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorBaseIT.java @@ -0,0 +1,86 @@ +package org.phoebus.channelfinder.processors.aa; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.BeforeEach; +import org.mockito.ArgumentCaptor; +import org.phoebus.channelfinder.configuration.AAChannelProcessor; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.bean.override.mockito.MockitoBean; +import tools.jackson.core.JacksonException; + +abstract class AAChannelProcessorBaseIT { + + protected static Property archiveProperty = new Property("archive", "owner", "default"); + protected static Property activeProperty = new Property("pvStatus", "owner", "Active"); + protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive"); + + @MockitoBean protected ArchiverService archiverService; + @Autowired protected AAChannelProcessor aaChannelProcessor; + + @BeforeEach + void primeCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); + aaChannelProcessor.scheduledPolicyRefresh(); + } + + protected void paramableAAChannelProcessorTest( + List channels, String archiveStatus, String archiverEndpoint) + throws JacksonException { + if (!archiveStatus.isEmpty()) { + List> archivePVStatuses = + channels.stream() + .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) + .toList(); + when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); + } + + if (!archiverEndpoint.isEmpty()) { + when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) channels.size()); + } else { + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(0L); + } + + long count = aaChannelProcessor.process(channels); + assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); + + if (!archiveStatus.isEmpty()) { + verify(archiverService).getStatusesViaGet(anyString(), anyList()); + } + + if (!archiverEndpoint.isEmpty()) { + ArgumentCaptor>> captor = + ArgumentCaptor.forClass(Map.class); + verify(archiverService).configureAA(captor.capture(), anyString()); + Map> map = captor.getValue(); + + ArchiveAction expectedAction = getActionFromEndpoint(archiverEndpoint); + if (expectedAction != null) { + assertTrue(map.containsKey(expectedAction)); + List options = map.get(expectedAction); + assertFalse(options.isEmpty()); + } + } + } + + private static ArchiveAction getActionFromEndpoint(String endpoint) { + if (endpoint.contains("resumeArchivingPV")) return ArchiveAction.RESUME; + if (endpoint.contains("pauseArchivingPV")) return ArchiveAction.PAUSE; + if (endpoint.contains("archivePV")) return ArchiveAction.ARCHIVE; + return null; + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index d490d9eb..ff33b9c3 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -1,16 +1,6 @@ package org.phoebus.channelfinder.processors.aa; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import java.util.List; -import java.util.Map; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; @@ -18,32 +8,17 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.ArgumentCaptor; import org.mockito.junit.jupiter.MockitoExtension; import org.phoebus.channelfinder.configuration.AAChannelProcessor; -import org.phoebus.channelfinder.configuration.ChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; -import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @ExtendWith(MockitoExtension.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") -class AAChannelProcessorIT { - - protected static Property archiveProperty = new Property("archive", "owner", "default"); - protected static Property activeProperty = new Property("pvStatus", "owner", "Active"); - protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive"); - - @MockitoBean ArchiverService archiverService; - @Autowired AAChannelProcessor aaChannelProcessor; +class AAChannelProcessorIT extends AAChannelProcessorBaseIT { @NotNull private static Stream processSource() { @@ -94,80 +69,15 @@ private static Stream processSource() { "[\"PVArchivedNotag\"]")); } - public static void paramableAAChannelProcessorTest( - ArchiverService archiverService, - ChannelProcessor aaChannelProcessor, - List channels, - String archiveStatus, - String archiverEndpoint) - throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - if (!archiveStatus.isEmpty()) { - // Mock getStatuses - List> archivePVStatuses = - channels.stream() - .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) - .toList(); - when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - } - - if (!archiverEndpoint.isEmpty()) { - // Mock configureAA - when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) channels.size()); - } else { - when(archiverService.configureAA(anyMap(), anyString())).thenReturn(0L); - } - - long count = aaChannelProcessor.process(channels); - assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); - - // Verifications - verify(archiverService).getAAPolicies(anyString()); - - if (!archiveStatus.isEmpty()) { - verify(archiverService).getStatusesViaGet(anyString(), anyList()); - } - - if (!archiverEndpoint.isEmpty()) { - ArgumentCaptor>> captor = - ArgumentCaptor.forClass(Map.class); - verify(archiverService).configureAA(captor.capture(), anyString()); - Map> map = captor.getValue(); - - ArchiveAction expectedAction = getActionFromEndpoint(archiverEndpoint); - if (expectedAction != null) { - assertTrue(map.containsKey(expectedAction)); - List options = map.get(expectedAction); - assertFalse(options.isEmpty()); - // We could parse submissionBody to be more strict, but checking the action is likely - // sufficient for now - // as we trust the mapping logic in AAChannelProcessor - } - } - } - - private static ArchiveAction getActionFromEndpoint(String endpoint) { - if (endpoint.contains("resumeArchivingPV")) return ArchiveAction.RESUME; - if (endpoint.contains("pauseArchivingPV")) return ArchiveAction.PAUSE; - if (endpoint.contains("archivePV")) return ArchiveAction.ARCHIVE; - return null; - } - @Test void testProcessNoPVs() throws JacksonException { aaChannelProcessor.process(List.of()); - - // verify interactions are minimal or none if empty - // But since list is empty, process returns 0 early } @ParameterizedTest @MethodSource("processSource") void testProcessNotArchivedActive(Channel channel, String archiveStatus, String archiverEndpoint) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 46a5aa09..3ff08abf 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -3,12 +3,8 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import java.util.List; import java.util.Map; @@ -20,24 +16,19 @@ import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource(value = "classpath:application_test_multi.properties") -class AAChannelProcessorMultiArchiverIT { +class AAChannelProcessorMultiArchiverIT extends AAChannelProcessorBaseIT { public static final String BEING_ARCHIVED = "Being archived"; public static final String PAUSED = "Paused"; public static final String NOT_BEING_ARCHIVED = "Not being archived"; public static final String OWNER = "owner"; - @Autowired AAChannelProcessor aaChannelProcessor; - @MockitoBean ArchiverService archiverService; static Stream provideArguments() { List channels = @@ -99,28 +90,12 @@ void testProcessMultiArchivers( Map namesToStatuses, Map> actionsToNames) throws JacksonException { - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - // Request to archiver status List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Requests to archiver - actionsToNames.forEach( - (key, value) -> { - when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); - }); - - // Request to policies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - // Request to archiver status - when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - - // Requests to archiver actionsToNames.forEach( (key, value) -> { when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); @@ -128,11 +103,9 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); - // Verifications - verify(archiverService, times(2)).getAAPolicies(anyString()); - if (!namesToStatuses.isEmpty()) { - verify(archiverService, times(1)).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaGet(anyString(), anyList()); + verify(archiverService).getStatusesViaPost(anyString(), anyList()); } } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 68ac2ec9..122b3f1f 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -7,9 +7,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import java.util.List; import java.util.Map; @@ -22,25 +19,20 @@ import org.mockito.ArgumentCaptor; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") -class AAChannelProcessorMultiIT { +class AAChannelProcessorMultiIT extends AAChannelProcessorBaseIT { public static final String BEING_ARCHIVED = "Being archived"; public static final String PAUSED = "Paused"; public static final String NOT_BEING_ARCHIVED = "Not being archived"; public static final String OWNER = "owner"; - @Autowired AAChannelProcessor aaChannelProcessor; - @MockitoBean ArchiverService archiverService; static Stream provideArguments() { List channels = @@ -104,24 +96,18 @@ void testProcessMulti( int expectedProcessedChannels) throws JacksonException { - // Mock getAAPolicies - when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - - // Mock getStatuses List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses); - // Mock configureAA when(archiverService.configureAA(anyMap(), anyString())) .thenReturn((long) expectedProcessedChannels); long count = aaChannelProcessor.process(channels); assertEquals(expectedProcessedChannels, count); - verify(archiverService).getAAPolicies(anyString()); verify(archiverService).getStatusesViaGet(anyString(), anyList()); ArgumentCaptor>> captor = diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java index 38cdb1e4..f2c9ba43 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java @@ -1,9 +1,5 @@ package org.phoebus.channelfinder.processors.aa; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -12,26 +8,19 @@ import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.urls:{'default': '','aa': 'http://localhost:17665'}") -class AAChannelProcessorNoDefaultIT { - protected static Property archiverProperty = new Property("archiver", "owner", "aa"); - - @Autowired AAChannelProcessor aaChannelProcessor; +class AAChannelProcessorNoDefaultIT extends AAChannelProcessorBaseIT { - @MockitoBean ArchiverService archiverService; + protected static Property archiverProperty = new Property("archiver", "owner", "aa"); private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -55,7 +44,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java index 9c6342bf..83bae401 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java @@ -1,9 +1,5 @@ package org.phoebus.channelfinder.processors.aa; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -11,25 +7,17 @@ import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.auto_pause=none") -class AAChannelProcessorNoPauseIT { - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; +class AAChannelProcessorNoPauseIT extends AAChannelProcessorBaseIT { private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -48,7 +36,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java new file mode 100644 index 00000000..c3692e5e --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorPolicyCacheIT.java @@ -0,0 +1,73 @@ +package org.phoebus.channelfinder.processors.aa; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.phoebus.channelfinder.configuration.AAChannelProcessor; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; +import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; +import org.springframework.test.context.TestPropertySource; +import tools.jackson.core.JacksonException; + +@WebMvcTest(AAChannelProcessor.class) +@ExtendWith(MockitoExtension.class) +@TestPropertySource(value = "classpath:application_aa_proc_test.properties") +class AAChannelProcessorPolicyCacheIT extends AAChannelProcessorBaseIT { + + @Test + void testProcessDoesNotCallGetAAPolicies() throws JacksonException { + when(archiverService.getStatusesViaGet(anyString(), anyList())) + .thenReturn(List.of(Map.of("pvName", "PVNoneActive", "status", "Not being archived"))); + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(1L); + clearInvocations(archiverService); + + Channel channel = + new Channel( + "PVNoneActive", + "owner", + List.of( + new Property("archive", "owner", "default"), + new Property("pvStatus", "owner", "Active")), + List.of()); + aaChannelProcessor.process(List.of(channel)); + + verify(archiverService, never()).getAAPolicies(anyString()); + } + + @Test + void testProcessorInfoShowsCacheMetadata() { + ChannelProcessorInfo info = aaChannelProcessor.processorInfo(); + + assertNotEquals("never", info.properties().get("lastPolicyRefresh")); + assertTrue(info.properties().get("cachedPoliciesPerArchiver").contains("default=")); + assertEquals("3600", info.properties().get("policyCacheRefreshIntervalSeconds")); + } + + @Test + void testScheduledRefreshUpdatesCache() { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("p1", "p2", "p3")); + clearInvocations(archiverService); + + aaChannelProcessor.scheduledPolicyRefresh(); + + verify(archiverService).getAAPolicies(anyString()); + String cachedPoliciePerArchiver = + aaChannelProcessor.processorInfo().properties().get("cachedPoliciesPerArchiver"); + assertTrue(cachedPoliciePerArchiver.contains("default=ArchiverPolicies[policies=[p1, p2, p3]")); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java index 86b4f49c..6833493c 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java @@ -1,9 +1,5 @@ package org.phoebus.channelfinder.processors.aa; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -11,25 +7,17 @@ import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.auto_pause=pvStatus") -class AAChannelProcessorStatusPauseIT { - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; +class AAChannelProcessorStatusPauseIT extends AAChannelProcessorBaseIT { private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -48,7 +36,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java index 1aeafff0..d3567fcb 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java @@ -1,9 +1,5 @@ package org.phoebus.channelfinder.processors.aa; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; -import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; - import java.util.List; import java.util.stream.Stream; import org.junit.jupiter.params.ParameterizedTest; @@ -11,25 +7,17 @@ import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; -import org.phoebus.channelfinder.service.external.ArchiverService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.webmvc.test.autoconfigure.WebMvcTest; import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.bean.override.mockito.MockitoBean; import tools.jackson.core.JacksonException; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( locations = "classpath:application_aa_proc_test.properties", properties = "aa.auto_pause=archive") -class AAChannelProcessorTagPauseIT { - - @Autowired AAChannelProcessor aaChannelProcessor; - - @MockitoBean ArchiverService archiverService; +class AAChannelProcessorTagPauseIT extends AAChannelProcessorBaseIT { private static Stream processNoPauseSource() { - return Stream.of( Arguments.of( new Channel( @@ -52,7 +40,6 @@ private static Stream processNoPauseSource() { void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) throws JacksonException { - paramableAAChannelProcessorTest( - archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); + paramableAAChannelProcessorTest(List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 888eb7f0..909561d7 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -306,9 +306,7 @@ void testGetAAPoliciesInvalidResponse() { .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List result = archiverService.getAAPolicies(ARCHIVER_URL); - - assertTrue(result.isEmpty()); + assertThrows(ArchiverServiceException.class, () -> archiverService.getAAPolicies(ARCHIVER_URL)); } @Test