diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 7c03fefd..995c5f1d 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -14,6 +14,7 @@ import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; @@ -182,8 +183,11 @@ private long submitToArchivers( } Map pvMap = e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); - count += - archiverService.configureAA(getArchiveActions(pvMap, archiverInfo), archiverInfo.url()); + Optional>> actions = + getArchiveActions(pvMap, archiverInfo); + if (!actions.isEmpty()) { + count += archiverService.configureAA(actions.get(), archiverInfo.url()); + } } return count; } @@ -239,10 +243,10 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { return ArchiveAction.NONE; } - private Map> getArchiveActions( + private Optional>> getArchiveActions( Map archivePVS, ArchiverInfo archiverInfo) { if (archiverInfo == null) { - return Map.of(); + return Optional.empty(); } logger.log( @@ -257,13 +261,24 @@ private Map> getArchiveActions( .forEach(archiveAction -> result.put(archiveAction, new ArrayList<>())); // Don't request to archive an empty list. if (archivePVS.isEmpty()) { - return result; + return Optional.of(result); } List pvList = new ArrayList<>(archivePVS.keySet()); - List> statuses = - postSupportArchivers.contains(archiverInfo.alias()) - ? archiverService.getStatusesViaPost(archiverInfo.url(), pvList) - : archiverService.getStatusesViaGet(archiverInfo.url(), pvList); + List> statuses; + try { + statuses = + postSupportArchivers.contains(archiverInfo.alias()) + ? archiverService.getStatusesViaPost(archiverInfo.url(), pvList) + : archiverService.getStatusesViaGet(archiverInfo.url(), pvList); + } catch (ArchiverServiceException e) { + logger.log( + Level.WARNING, + () -> + String.format( + "Status fetch failed for archiver '%s'; skipping %d PVs to avoid spurious ARCHIVE submissions: %s", + archiverInfo.alias(), archivePVS.size(), e.getMessage())); + return Optional.empty(); + } logger.log( Level.FINER, () -> @@ -301,7 +316,7 @@ private Map> getArchiveActions( List archivePVOptionsList = result.get(action); archivePVOptionsList.add(archivePVOptions); }); - return result; + return Optional.of(result); } private ArchivePVOptions createArchivePV( diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 9febf16d..30a3a177 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -58,9 +58,12 @@ String key() { @Autowired public ArchiverService( - @Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) { + @Value("${aa.connect_timeout_seconds:5}") int connectTimeoutSeconds, + @Value("${aa.timeout_seconds:15}") int readTimeoutSeconds, + RestClient.Builder builder) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); - factory.setReadTimeout(timeoutSeconds * 1000); + factory.setConnectTimeout(connectTimeoutSeconds * 1000); + factory.setReadTimeout(readTimeoutSeconds * 1000); this.client = builder.requestFactory(factory).build(); } @@ -87,18 +90,13 @@ private List> getStatusesViaGetBatch(String archiverURL, Lis .queryParam(StatusResponseKey.PV.key(), String.join(",", pvs)) .build() .toUri(); - try { List> result = client.get().uri(pvStatusURI).retrieve().body(new ParameterizedTypeReference<>() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed GET status query to %s: %s", uriString, e.getMessage()), e); } } @@ -115,12 +113,8 @@ public List> getStatusesViaPost(String archiverURL, List() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed POST status query to %s: %s", uriString, e.getMessage()), e); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1002887a..373a2155 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -143,6 +143,8 @@ aa.pva=false aa.archive_property_name=archive aa.archiver_property_name=archiver aa.timeout_seconds=15 +# Connect timeout (seconds); short so unreachable archivers fail fast. +aa.connect_timeout_seconds=5 # Comma-separated list of archivers to use post support aa.post_support= diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index dd0c73d7..888eb7f0 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -8,6 +8,8 @@ import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; +import java.io.IOException; +import java.net.ServerSocket; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -102,9 +104,8 @@ void testGetStatusesGetInvalidResponse() { .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List> result = archiverService.getStatusesViaGet(ARCHIVER_URL, pvs); - - assertTrue(result.isEmpty()); + assertThrows( + ArchiverServiceException.class, () -> archiverService.getStatusesViaGet(ARCHIVER_URL, pvs)); } @Test @@ -116,9 +117,9 @@ void testGetStatusesPostInvalidResponse() { .andExpect(method(HttpMethod.POST)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List> result = archiverService.getStatusesViaPost(ARCHIVER_URL, pvs); - - assertTrue(result.isEmpty()); + assertThrows( + ArchiverServiceException.class, + () -> archiverService.getStatusesViaPost(ARCHIVER_URL, pvs)); } @Test @@ -368,4 +369,77 @@ void testSubmitActionWithRealResponseArchive() throws JacksonException { assertEquals(1, successfulPvs.size()); assertTrue(successfulPvs.contains("PV1")); } + + @Test + void testRequestTimesOutAtConfiguredReadTimeout() throws IOException { + int connectTimeoutSeconds = 30; + int readTimeoutSeconds = 1; + + // Accept the TCP connection but never send a response, simulating a hung archiver host. + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + Thread serverThread = + new Thread( + () -> { + try { + serverSocket.accept(); + } catch (IOException _) { + // Ignored + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + RestClient.Builder builder = RestClient.builder(); + ArchiverService service = + new ArchiverService(connectTimeoutSeconds, readTimeoutSeconds, builder); + + long start = System.currentTimeMillis(); + List successfulPvs = List.of("pv1"); + assertThrows( + ArchiverServiceException.class, + () -> service.getStatusesViaGet("http://localhost:" + port, successfulPvs)); + long elapsedMs = System.currentTimeMillis() - start; + + // The connection is accepted instantly, so this can only be the read timeout. Elapsing + // at ~1s — far below the 30s connect timeout — proves it was the read timeout that + // fired, and well short of the OS TCP default (~2 minutes). + assertTrue( + elapsedMs >= readTimeoutSeconds * 1000L, + "Expected to wait for the read timeout, elapsed: " + elapsedMs + "ms"); + assertTrue(elapsedMs < 10_000, "Expected timeout within 10s, elapsed: " + elapsedMs + "ms"); + } + } + + @Test + void testConnectionTimesOutAtConfiguredConnectTimeout() { + int connectTimeoutSeconds = 1; + int readTimeoutSeconds = 30; + RestClient.Builder builder = RestClient.builder(); + ArchiverService service = + new ArchiverService(connectTimeoutSeconds, readTimeoutSeconds, builder); + + // 192.0.2.0/24 is TEST-NET-1 (RFC 5737): reserved and unrouted, so the TCP SYN is + // silently dropped and the connect attempt hangs until the connect timeout fires. + // A closed local port would instead be refused (RST) instantly, never exercising the + // connect timeout — this address ensures we test connection, not response, timeout. + long start = System.currentTimeMillis(); + List pvs = List.of("pv1"); + assertThrows( + ArchiverServiceException.class, + () -> service.getStatusesViaGet("http://192.0.2.1:81", pvs)); + long elapsedMs = System.currentTimeMillis() - start; + + // Distinct timeouts make this self-distinguishing: elapsing at ~1s — far below the 30s + // read timeout — proves it was the connect timeout that fired, not the read timeout, and + // not the OS TCP default (~2 minutes). The lower bound rules out an instant refusal. + assertTrue( + elapsedMs >= connectTimeoutSeconds * 1000L, + "Expected to wait for the connect timeout, elapsed: " + elapsedMs + "ms"); + assertTrue( + elapsedMs < 10_000, + "Expected to fail at the connect timeout (not the 30s read timeout), elapsed: " + + elapsedMs + + "ms"); + } }