Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package org.phoebus.channelfinder.configuration;

import jakarta.annotation.PostConstruct;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -20,10 +24,12 @@
import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction;
import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions;
import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverInfo;
import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverPolicies;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import tools.jackson.core.JacksonException;

/**
Expand Down Expand Up @@ -69,6 +75,12 @@ public class AAChannelProcessor implements ChannelProcessor {

@Autowired private ArchiverService archiverService;

@Value("${aa.policy_refresh_interval_seconds:3600}")
private long policyRefreshIntervalSeconds;

private volatile Map<String, ArchiverPolicies> cachedPolicies = Map.of();
private volatile Instant lastPolicyRefresh;

@Override
public boolean enabled() {
return aaEnabled;
Expand All @@ -79,20 +91,41 @@ public void setEnabled(boolean enabled) {
this.aaEnabled = enabled;
}

@PostConstruct
public void initPolicyCache() {
refreshPolicies();
}

@Scheduled(
fixedDelayString = "${aa.policy_refresh_interval_seconds:3600}",
timeUnit = TimeUnit.SECONDS)
public void scheduledPolicyRefresh() {
refreshPolicies();
}

@Override
public ChannelProcessorInfo processorInfo() {
String cachedPoliciesStringValue = cachedPoliciesRepresentation(cachedPolicies);
return new ChannelProcessorInfo(
"AAChannelProcessor",
aaEnabled,
Map.of(
"archiveProperty",
"archivePropertyName",
archivePropertyName,
"archiverProperty",
"archiverPropertyName",
archiverPropertyName,
"Archivers",
"archiverURLs",
aaURLs.keySet().toString(),
"AutoPauseOn",
autoPauseOptions.toString()));
"autoPauseOptions",
autoPauseOptions.toString(),
"postSupportArchivers",
postSupportArchivers.toString(),
"policyCacheRefreshIntervalSeconds",
String.valueOf(policyRefreshIntervalSeconds),
"lastPolicyRefresh",
lastPolicyRefresh == null ? "never" : lastPolicyRefresh.toString(),
"cachedPoliciesPerArchiver",
cachedPoliciesStringValue.isEmpty() ? "none" : cachedPoliciesStringValue));
}

/**
Expand All @@ -111,7 +144,7 @@ public long process(List<Channel> channels) throws JacksonException {
return 0;
}

Map<String, ArchiverInfo> archiversInfo = getArchiversInfo(aaURLs);
Map<String, ArchiverInfo> archiversInfo = getArchiversInfoFromCache();
if (archiversInfo.isEmpty()) {
logger.log(
Level.WARNING,
Expand Down Expand Up @@ -332,14 +365,73 @@ private ArchivePVOptions createArchivePV(
return newArchiverPV;
}

private Map<String, ArchiverInfo> getArchiversInfo(Map<String, String> aaURLs) {
private Map<String, ArchiverInfo> getArchiversInfoFromCache() {
Map<String, ArchiverPolicies> snapshot = cachedPolicies;
return aaURLs.entrySet().stream()
.filter(aa -> !StringUtils.isEmpty(aa.getValue()))
.filter(e -> !StringUtils.isEmpty(e.getValue()))
.filter(
e -> {
if (!snapshot.containsKey(e.getKey())) {
logger.log(
Level.WARNING,
() ->
String.format(
"Archiver '%s' has no cached policies (unreachable at last refresh); skipping.",
e.getKey()));
return false;
}
return true;
})
.collect(
Collectors.toMap(
Map.Entry::getKey,
aa ->
e ->
new ArchiverInfo(
aa.getKey(), aa.getValue(), archiverService.getAAPolicies(aa.getValue()))));
e.getKey(), e.getValue(), snapshot.get(e.getKey()).policies())));
}

private static String cachedPoliciesRepresentation(Map<String, ArchiverPolicies> policies) {
return policies.entrySet().stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(", "));
}

private void refreshPolicies() {
if (aaURLs.isEmpty()) {
logger.log(Level.FINE, "No archivers configured; skipping policy cache refresh.");
return;
}
Map<String, ArchiverPolicies> current = cachedPolicies;
Map<String, ArchiverPolicies> updated = new HashMap<>(current);
List<String> changed = new ArrayList<>();

for (Map.Entry<String, String> entry : aaURLs.entrySet()) {
if (StringUtils.isEmpty(entry.getValue())) continue;
String alias = entry.getKey();
try {
List<String> fresh = archiverService.getAAPolicies(entry.getValue());
ArchiverPolicies existing = current.get(alias);
if (existing == null || !existing.policies().equals(fresh)) {
updated.put(alias, new ArchiverPolicies(fresh));
changed.add(alias);
}
} catch (ArchiverServiceException ex) {
logger.log(
Level.WARNING,
() ->
String.format(
"Policy fetch failed for archiver '%s'; retaining cached policies: %s",
alias, ex.getMessage()));
}
}

lastPolicyRefresh = Instant.now();
if (!changed.isEmpty()) {
cachedPolicies = Collections.unmodifiableMap(updated);
logger.log(
Level.INFO, "AA policy cache updated: {0}", cachedPoliciesRepresentation(cachedPolicies));
} else {
logger.log(Level.FINE, "AA policy cache unchanged after refresh.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,14 @@ public List<String> getAAPolicies(String aaURL) {
.retrieve()
.body(new ParameterizedTypeReference<>() {});
if (policyMap == null) {
return List.of();
throw new ArchiverServiceException("Null response from " + uriString);
}
return new ArrayList<>(policyMap.keySet());
} catch (ArchiverServiceException e) {
throw e;
} catch (Exception e) {
// problem collecting policies from AA, so warn and return empty list
logger.log(
Level.WARNING,
() -> "Could not get AA policies list from " + aaURL + ": " + e.getMessage());
return List.of();
throw new ArchiverServiceException(
"Could not get AA policies from " + aaURL + ": " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.phoebus.channelfinder.service.model.archiver.aa;

import java.util.List;

public record ArchiverPolicies(List<String> policies) {}
2 changes: 2 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ aa.archiver_property_name=archiver
aa.timeout_seconds=15
# Connect timeout (seconds); short so unreachable archivers fail fast.
aa.connect_timeout_seconds=5
# Interval in seconds between automatic AA policy cache refreshes (default: 1 hour)
aa.policy_refresh_interval_seconds=3600

# Comma-separated list of archivers to use post support
aa.post_support=
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.phoebus.channelfinder.processors.aa;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.ArgumentCaptor;
import org.phoebus.channelfinder.configuration.AAChannelProcessor;
import org.phoebus.channelfinder.entity.Channel;
import org.phoebus.channelfinder.entity.Property;
import org.phoebus.channelfinder.service.external.ArchiverService;
import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction;
import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import tools.jackson.core.JacksonException;

abstract class AAChannelProcessorBaseIT {

protected static Property archiveProperty = new Property("archive", "owner", "default");
protected static Property activeProperty = new Property("pvStatus", "owner", "Active");
protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive");

@MockitoBean protected ArchiverService archiverService;
@Autowired protected AAChannelProcessor aaChannelProcessor;

@BeforeEach
void primeCache() {
when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy"));
aaChannelProcessor.scheduledPolicyRefresh();
}

protected void paramableAAChannelProcessorTest(
List<Channel> channels, String archiveStatus, String archiverEndpoint)
throws JacksonException {
if (!archiveStatus.isEmpty()) {
List<Map<String, String>> archivePVStatuses =
channels.stream()
.map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus))
.toList();
when(archiverService.getStatusesViaGet(anyString(), anyList())).thenReturn(archivePVStatuses);
}

if (!archiverEndpoint.isEmpty()) {
when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) channels.size());
} else {
when(archiverService.configureAA(anyMap(), anyString())).thenReturn(0L);
}

long count = aaChannelProcessor.process(channels);
assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size());

if (!archiveStatus.isEmpty()) {
verify(archiverService).getStatusesViaGet(anyString(), anyList());
}

if (!archiverEndpoint.isEmpty()) {
ArgumentCaptor<Map<ArchiveAction, List<ArchivePVOptions>>> captor =
ArgumentCaptor.forClass(Map.class);
verify(archiverService).configureAA(captor.capture(), anyString());
Map<ArchiveAction, List<ArchivePVOptions>> map = captor.getValue();

ArchiveAction expectedAction = getActionFromEndpoint(archiverEndpoint);
if (expectedAction != null) {
assertTrue(map.containsKey(expectedAction));
List<ArchivePVOptions> options = map.get(expectedAction);
assertFalse(options.isEmpty());
}
}
}

private static ArchiveAction getActionFromEndpoint(String endpoint) {
if (endpoint.contains("resumeArchivingPV")) return ArchiveAction.RESUME;
if (endpoint.contains("pauseArchivingPV")) return ArchiveAction.PAUSE;
if (endpoint.contains("archivePV")) return ArchiveAction.ARCHIVE;
return null;
}
}
Loading
Loading