Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.commons.lang3.StringUtils;
import org.phoebus.channelfinder.entity.Channel;
import org.phoebus.channelfinder.entity.Property;
import org.phoebus.channelfinder.exceptions.ArchiverServiceException;
import org.phoebus.channelfinder.service.external.ArchiverService;
import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo;
import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction;
Expand Down Expand Up @@ -182,8 +183,11 @@ private long submitToArchivers(
}
Map<String, ArchivePVOptions> pvMap =
e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv));
count +=
archiverService.configureAA(getArchiveActions(pvMap, archiverInfo), archiverInfo.url());
Optional<Map<ArchiveAction, List<ArchivePVOptions>>> actions =
getArchiveActions(pvMap, archiverInfo);
if (!actions.isEmpty()) {
count += archiverService.configureAA(actions.get(), archiverInfo.url());
}
}
return count;
}
Expand Down Expand Up @@ -239,10 +243,10 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) {
return ArchiveAction.NONE;
}

private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
private Optional<Map<ArchiveAction, List<ArchivePVOptions>>> getArchiveActions(
Map<String, ArchivePVOptions> archivePVS, ArchiverInfo archiverInfo) {
if (archiverInfo == null) {
return Map.of();
return Optional.empty();
}

logger.log(
Expand All @@ -257,13 +261,24 @@ private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
.forEach(archiveAction -> result.put(archiveAction, new ArrayList<>()));
// Don't request to archive an empty list.
if (archivePVS.isEmpty()) {
return result;
return Optional.of(result);
}
List<String> pvList = new ArrayList<>(archivePVS.keySet());
List<Map<String, String>> statuses =
postSupportArchivers.contains(archiverInfo.alias())
? archiverService.getStatusesViaPost(archiverInfo.url(), pvList)
: archiverService.getStatusesViaGet(archiverInfo.url(), pvList);
List<Map<String, String>> statuses;
try {
statuses =
postSupportArchivers.contains(archiverInfo.alias())
? archiverService.getStatusesViaPost(archiverInfo.url(), pvList)
: archiverService.getStatusesViaGet(archiverInfo.url(), pvList);
} catch (ArchiverServiceException e) {
logger.log(
Level.WARNING,
() ->
String.format(
"Status fetch failed for archiver '%s'; skipping %d PVs to avoid spurious ARCHIVE submissions: %s",
archiverInfo.alias(), archivePVS.size(), e.getMessage()));
return Optional.empty();
}
logger.log(
Level.FINER,
() ->
Expand Down Expand Up @@ -301,7 +316,7 @@ private Map<ArchiveAction, List<ArchivePVOptions>> getArchiveActions(
List<ArchivePVOptions> archivePVOptionsList = result.get(action);
archivePVOptionsList.add(archivePVOptions);
});
return result;
return Optional.of(result);
}

private ArchivePVOptions createArchivePV(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ String key() {

@Autowired
public ArchiverService(
@Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) {
@Value("${aa.connect_timeout_seconds:5}") int connectTimeoutSeconds,
@Value("${aa.timeout_seconds:15}") int readTimeoutSeconds,
RestClient.Builder builder) {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setReadTimeout(timeoutSeconds * 1000);
factory.setConnectTimeout(connectTimeoutSeconds * 1000);
factory.setReadTimeout(readTimeoutSeconds * 1000);
this.client = builder.requestFactory(factory).build();
}

Expand All @@ -87,18 +90,13 @@ private List<Map<String, String>> getStatusesViaGetBatch(String archiverURL, Lis
.queryParam(StatusResponseKey.PV.key(), String.join(",", pvs))
.build()
.toUri();

try {
List<Map<String, String>> result =
client.get().uri(pvStatusURI).retrieve().body(new ParameterizedTypeReference<>() {});
return result != null ? result : List.of();
} catch (Exception e) {
logger.log(
Level.WARNING,
String.format(
"There was an error getting a response with URI: %s. Error: %s",
uriString, e.getMessage()));
return List.of();
throw new ArchiverServiceException(
String.format("Failed GET status query to %s: %s", uriString, e.getMessage()), e);
}
}

Expand All @@ -115,12 +113,8 @@ public List<Map<String, String>> getStatusesViaPost(String archiverURL, List<Str
.body(new ParameterizedTypeReference<>() {});
return result != null ? result : List.of();
} catch (Exception e) {
logger.log(
Level.WARNING,
String.format(
"There was an error getting a response with URI: %s. Error: %s",
uriString, e.getMessage()));
return List.of();
throw new ArchiverServiceException(
String.format("Failed POST status query to %s: %s", uriString, e.getMessage()), e);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ aa.pva=false
aa.archive_property_name=archive
aa.archiver_property_name=archiver
aa.timeout_seconds=15
# Connect timeout (seconds); short so unreachable archivers fail fast.
aa.connect_timeout_seconds=5

# Comma-separated list of archivers to use post support
aa.post_support=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo;
import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -102,9 +104,8 @@ void testGetStatusesGetInvalidResponse() {
.andExpect(method(HttpMethod.GET))
.andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON));

List<Map<String, String>> result = archiverService.getStatusesViaGet(ARCHIVER_URL, pvs);

assertTrue(result.isEmpty());
assertThrows(
ArchiverServiceException.class, () -> archiverService.getStatusesViaGet(ARCHIVER_URL, pvs));
}

@Test
Expand All @@ -116,9 +117,9 @@ void testGetStatusesPostInvalidResponse() {
.andExpect(method(HttpMethod.POST))
.andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON));

List<Map<String, String>> result = archiverService.getStatusesViaPost(ARCHIVER_URL, pvs);

assertTrue(result.isEmpty());
assertThrows(
ArchiverServiceException.class,
() -> archiverService.getStatusesViaPost(ARCHIVER_URL, pvs));
}

@Test
Expand Down Expand Up @@ -368,4 +369,77 @@ void testSubmitActionWithRealResponseArchive() throws JacksonException {
assertEquals(1, successfulPvs.size());
assertTrue(successfulPvs.contains("PV1"));
}

@Test
void testRequestTimesOutAtConfiguredReadTimeout() throws IOException {
int connectTimeoutSeconds = 30;
int readTimeoutSeconds = 1;

// Accept the TCP connection but never send a response, simulating a hung archiver host.
try (ServerSocket serverSocket = new ServerSocket(0)) {
int port = serverSocket.getLocalPort();
Thread serverThread =
new Thread(
() -> {
try {
serverSocket.accept();
} catch (IOException _) {
// Ignored
}
});
serverThread.setDaemon(true);
serverThread.start();

RestClient.Builder builder = RestClient.builder();
ArchiverService service =
new ArchiverService(connectTimeoutSeconds, readTimeoutSeconds, builder);

long start = System.currentTimeMillis();
List<String> successfulPvs = List.of("pv1");
assertThrows(
ArchiverServiceException.class,
() -> service.getStatusesViaGet("http://localhost:" + port, successfulPvs));
long elapsedMs = System.currentTimeMillis() - start;

// The connection is accepted instantly, so this can only be the read timeout. Elapsing
// at ~1s — far below the 30s connect timeout — proves it was the read timeout that
// fired, and well short of the OS TCP default (~2 minutes).
assertTrue(
elapsedMs >= readTimeoutSeconds * 1000L,
"Expected to wait for the read timeout, elapsed: " + elapsedMs + "ms");
assertTrue(elapsedMs < 10_000, "Expected timeout within 10s, elapsed: " + elapsedMs + "ms");
}
}

@Test
void testConnectionTimesOutAtConfiguredConnectTimeout() {
int connectTimeoutSeconds = 1;
int readTimeoutSeconds = 30;
RestClient.Builder builder = RestClient.builder();
ArchiverService service =
new ArchiverService(connectTimeoutSeconds, readTimeoutSeconds, builder);

// 192.0.2.0/24 is TEST-NET-1 (RFC 5737): reserved and unrouted, so the TCP SYN is
// silently dropped and the connect attempt hangs until the connect timeout fires.
// A closed local port would instead be refused (RST) instantly, never exercising the
// connect timeout — this address ensures we test connection, not response, timeout.
long start = System.currentTimeMillis();
List<String> pvs = List.of("pv1");
assertThrows(
ArchiverServiceException.class,
() -> service.getStatusesViaGet("http://192.0.2.1:81", pvs));
long elapsedMs = System.currentTimeMillis() - start;

// Distinct timeouts make this self-distinguishing: elapsing at ~1s — far below the 30s
// read timeout — proves it was the connect timeout that fired, not the read timeout, and
// not the OS TCP default (~2 minutes). The lower bound rules out an instant refusal.
assertTrue(
elapsedMs >= connectTimeoutSeconds * 1000L,
"Expected to wait for the connect timeout, elapsed: " + elapsedMs + "ms");
assertTrue(
elapsedMs < 10_000,
"Expected to fail at the connect timeout (not the 30s read timeout), elapsed: "
+ elapsedMs
+ "ms");
}
}
Loading