From e130e4c4283c5369bae3400582bf0b993f1304a9 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 21 May 2026 14:54:07 +0200 Subject: [PATCH 1/5] fix(aa): propagate status fetch failures; skip archiver to prevent spurious ARCHIVE MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, a failed HTTP status batch silently returned an empty list. Those PVs were then treated as "not archived" and submitted for archiving on every subsequent run, spamming the archiver with duplicate requests. ArchiverService now throws ArchiverServiceException on status fetch failure. AAChannelProcessor catches it in getArchiveActions() and returns Optional.empty(), causing submitToArchivers() to skip that archiver entirely for the run. Also renames getStatusesFromPvListQuery/Body → getStatusesViaGet/Post to reflect their transport semantics. Co-Authored-By: Claude Sonnet 4.6 --- .../configuration/AAChannelProcessor.java | 34 +++++++++++++------ .../service/external/ArchiverService.java | 17 +++------- .../service/external/ArchiverServiceTest.java | 11 +++--- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 7c03fefd..a5de3cb1 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -14,6 +14,7 @@ import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; @@ -182,8 +183,10 @@ private long 
submitToArchivers( } Map pvMap = e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); - count += - archiverService.configureAA(getArchiveActions(pvMap, archiverInfo), archiverInfo.url()); + Optional>> actions = + getArchiveActions(pvMap, archiverInfo); + if (actions.isEmpty()) continue; + count += archiverService.configureAA(actions.get(), archiverInfo.url()); } return count; } @@ -239,10 +242,10 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { return ArchiveAction.NONE; } - private Map> getArchiveActions( + private Optional>> getArchiveActions( Map archivePVS, ArchiverInfo archiverInfo) { if (archiverInfo == null) { - return Map.of(); + return Optional.of(Map.of()); } logger.log( @@ -257,13 +260,24 @@ private Map> getArchiveActions( .forEach(archiveAction -> result.put(archiveAction, new ArrayList<>())); // Don't request to archive an empty list. if (archivePVS.isEmpty()) { - return result; + return Optional.of(result); } List pvList = new ArrayList<>(archivePVS.keySet()); - List> statuses = - postSupportArchivers.contains(archiverInfo.alias()) - ? archiverService.getStatusesViaPost(archiverInfo.url(), pvList) - : archiverService.getStatusesViaGet(archiverInfo.url(), pvList); + List> statuses; + try { + statuses = + postSupportArchivers.contains(archiverInfo.alias()) + ? 
archiverService.getStatusesViaPost(archiverInfo.url(), pvList) + : archiverService.getStatusesViaGet(archiverInfo.url(), pvList); + } catch (ArchiverServiceException e) { + logger.log( + Level.WARNING, + () -> + String.format( + "Status fetch failed for archiver '%s'; skipping %d PVs to avoid spurious ARCHIVE submissions: %s", + archiverInfo.alias(), archivePVS.size(), e.getMessage())); + return Optional.empty(); + } logger.log( Level.FINER, () -> @@ -301,7 +315,7 @@ private Map> getArchiveActions( List archivePVOptionsList = result.get(action); archivePVOptionsList.add(archivePVOptions); }); - return result; + return Optional.of(result); } private ArchivePVOptions createArchivePV( diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 9febf16d..5da69fd9 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -87,18 +87,13 @@ private List> getStatusesViaGetBatch(String archiverURL, Lis .queryParam(StatusResponseKey.PV.key(), String.join(",", pvs)) .build() .toUri(); - try { List> result = client.get().uri(pvStatusURI).retrieve().body(new ParameterizedTypeReference<>() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed GET status query to %s: %s", uriString, e.getMessage()), e); } } @@ -115,12 +110,8 @@ public List> getStatusesViaPost(String archiverURL, List() {}); return result != null ? result : List.of(); } catch (Exception e) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. 
Error: %s", - uriString, e.getMessage())); - return List.of(); + throw new ArchiverServiceException( + String.format("Failed POST status query to %s: %s", uriString, e.getMessage()), e); } } diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index dd0c73d7..82bba5f6 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -102,9 +102,8 @@ void testGetStatusesGetInvalidResponse() { .andExpect(method(HttpMethod.GET)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List> result = archiverService.getStatusesViaGet(ARCHIVER_URL, pvs); - - assertTrue(result.isEmpty()); + assertThrows( + ArchiverServiceException.class, () -> archiverService.getStatusesViaGet(ARCHIVER_URL, pvs)); } @Test @@ -116,9 +115,9 @@ void testGetStatusesPostInvalidResponse() { .andExpect(method(HttpMethod.POST)) .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); - List> result = archiverService.getStatusesViaPost(ARCHIVER_URL, pvs); - - assertTrue(result.isEmpty()); + assertThrows( + ArchiverServiceException.class, + () -> archiverService.getStatusesViaPost(ARCHIVER_URL, pvs)); } @Test From c6d9791a73d43f7d0b8d41e5c35acff344b60985 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 25 May 2026 09:19:55 +0200 Subject: [PATCH 2/5] fix(aa): add connect timeout to ArchiverService so unreachable hosts fail fast Without a connect timeout, TCP SYN drops (firewall, no route) stall for the OS default (~2 min) before returning. Setting connect timeout equal to the configured read timeout ensures both slow and unreachable archivers fail at the same bounded deadline. 
Co-Authored-By: Claude Sonnet 4.6 --- .../service/external/ArchiverService.java | 1 + .../service/external/ArchiverServiceTest.java | 36 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 5da69fd9..134ffa99 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -61,6 +61,7 @@ public ArchiverService( @Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); factory.setReadTimeout(timeoutSeconds * 1000); + factory.setConnectTimeout(timeoutSeconds * 1000); this.client = builder.requestFactory(factory).build(); } diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 82bba5f6..38dbd2c8 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -8,6 +8,8 @@ import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; +import java.io.IOException; +import java.net.ServerSocket; import java.util.EnumMap; import java.util.List; import java.util.Map; @@ -367,4 +369,38 @@ void testSubmitActionWithRealResponseArchive() throws JacksonException { assertEquals(1, successfulPvs.size()); assertTrue(successfulPvs.contains("PV1")); } + + @Test + void testRequestTimesOutAtConfiguredTimeout() throws IOException { + int timeoutSeconds = 1; + + // Accept the TCP connection but never send a response, simulating 
a hung archiver host. + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + Thread serverThread = + new Thread( + () -> { + try { + serverSocket.accept(); + } catch (IOException _) { + // Ignored + } + }); + serverThread.setDaemon(true); + serverThread.start(); + + RestClient.Builder builder = RestClient.builder(); + ArchiverService service = new ArchiverService(timeoutSeconds, builder); + + long start = System.currentTimeMillis(); + List successfulPvs = List.of("pv1"); + assertThrows( + ArchiverServiceException.class, + () -> service.getStatusesViaGet("http://localhost:" + port, successfulPvs)); + long elapsedMs = System.currentTimeMillis() - start; + + // Should timeout at ~1s — well short of the OS TCP default (~2 minutes) + assertTrue(elapsedMs < 10_000, "Expected timeout within 10s, elapsed: " + elapsedMs + "ms"); + } + } } From ac6a65c45f0de5d320b807196f1db2f3f95eb62b Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 1 Jun 2026 12:35:33 +0200 Subject: [PATCH 3/5] Swap to Optional.empty() Makes clearer the response, and avoids jumping into the empty map --- .../phoebus/channelfinder/configuration/AAChannelProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index a5de3cb1..b5c95639 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -245,7 +245,7 @@ private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { private Optional>> getArchiveActions( Map archivePVS, ArchiverInfo archiverInfo) { if (archiverInfo == null) { - return Optional.of(Map.of()); + return Optional.empty(); } logger.log( From 59d74c78c6e4e657061561814f32f30444405457 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: 
Mon, 1 Jun 2026 12:42:55 +0200 Subject: [PATCH 4/5] test(aa): add a true connection-timeout test; split connect/read timeouts Point the connect-timeout test at TEST-NET-1 (192.0.2.1) so the SYN is dropped and the connect timeout actually fires, rather than a closed local port that returns connection-refused instantly. Give the connect and read timeouts distinct values so each test's elapsed time proves which timeout fired. Add aa.connect_timeout_seconds (default 5s) for fail-fast on unreachable archivers; aa.timeout_seconds remains the read timeout, unchanged. Co-Authored-By: Claude Sonnet 4.6 --- .../service/external/ArchiverService.java | 8 ++-- src/main/resources/application.properties | 2 + .../service/external/ArchiverServiceTest.java | 47 +++++++++++++++++-- 3 files changed, 50 insertions(+), 7 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 134ffa99..30a3a177 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -58,10 +58,12 @@ String key() { @Autowired public ArchiverService( - @Value("${aa.timeout_seconds:15}") int timeoutSeconds, RestClient.Builder builder) { + @Value("${aa.connect_timeout_seconds:5}") int connectTimeoutSeconds, + @Value("${aa.timeout_seconds:15}") int readTimeoutSeconds, + RestClient.Builder builder) { SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); - factory.setReadTimeout(timeoutSeconds * 1000); - factory.setConnectTimeout(timeoutSeconds * 1000); + factory.setConnectTimeout(connectTimeoutSeconds * 1000); + factory.setReadTimeout(readTimeoutSeconds * 1000); this.client = builder.requestFactory(factory).build(); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 1002887a..373a2155 100644 
--- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -143,6 +143,8 @@ aa.pva=false aa.archive_property_name=archive aa.archiver_property_name=archiver aa.timeout_seconds=15 +# Connect timeout (seconds); short so unreachable archivers fail fast. +aa.connect_timeout_seconds=5 # Comma-separated list of archivers to use post support aa.post_support= diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java index 38dbd2c8..888eb7f0 100644 --- a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -371,8 +371,9 @@ void testSubmitActionWithRealResponseArchive() throws JacksonException { } @Test - void testRequestTimesOutAtConfiguredTimeout() throws IOException { - int timeoutSeconds = 1; + void testRequestTimesOutAtConfiguredReadTimeout() throws IOException { + int connectTimeoutSeconds = 30; + int readTimeoutSeconds = 1; // Accept the TCP connection but never send a response, simulating a hung archiver host. 
try (ServerSocket serverSocket = new ServerSocket(0)) { @@ -390,7 +391,8 @@ void testRequestTimesOutAtConfiguredTimeout() throws IOException { serverThread.start(); RestClient.Builder builder = RestClient.builder(); - ArchiverService service = new ArchiverService(timeoutSeconds, builder); + ArchiverService service = + new ArchiverService(connectTimeoutSeconds, readTimeoutSeconds, builder); long start = System.currentTimeMillis(); List successfulPvs = List.of("pv1"); @@ -399,8 +401,45 @@ void testRequestTimesOutAtConfiguredTimeout() throws IOException { () -> service.getStatusesViaGet("http://localhost:" + port, successfulPvs)); long elapsedMs = System.currentTimeMillis() - start; - // Should timeout at ~1s — well short of the OS TCP default (~2 minutes) + // The connection is accepted instantly, so this can only be the read timeout. Elapsing + // at ~1s — far below the 30s connect timeout — proves it was the read timeout that + // fired, and well short of the OS TCP default (~2 minutes). + assertTrue( + elapsedMs >= readTimeoutSeconds * 1000L, + "Expected to wait for the read timeout, elapsed: " + elapsedMs + "ms"); assertTrue(elapsedMs < 10_000, "Expected timeout within 10s, elapsed: " + elapsedMs + "ms"); } } + + @Test + void testConnectionTimesOutAtConfiguredConnectTimeout() { + int connectTimeoutSeconds = 1; + int readTimeoutSeconds = 30; + RestClient.Builder builder = RestClient.builder(); + ArchiverService service = + new ArchiverService(connectTimeoutSeconds, readTimeoutSeconds, builder); + + // 192.0.2.0/24 is TEST-NET-1 (RFC 5737): reserved and unrouted, so the TCP SYN is + // silently dropped and the connect attempt hangs until the connect timeout fires. + // A closed local port would instead be refused (RST) instantly, never exercising the + // connect timeout — this address ensures we test connection, not response, timeout. 
+ long start = System.currentTimeMillis(); + List pvs = List.of("pv1"); + assertThrows( + ArchiverServiceException.class, + () -> service.getStatusesViaGet("http://192.0.2.1:81", pvs)); + long elapsedMs = System.currentTimeMillis() - start; + + // Distinct timeouts make this self-distinguishing: elapsing at ~1s — far below the 30s + // read timeout — proves it was the connect timeout that fired, not the read timeout, and + // not the OS TCP default (~2 minutes). The lower bound rules out an instant refusal. + assertTrue( + elapsedMs >= connectTimeoutSeconds * 1000L, + "Expected to wait for the connect timeout, elapsed: " + elapsedMs + "ms"); + assertTrue( + elapsedMs < 10_000, + "Expected to fail at the connect timeout (not the 30s read timeout), elapsed: " + + elapsedMs + + "ms"); + } } From f0a18623d83732f118e390736179598ae26908a1 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 2 Jun 2026 09:12:09 +0200 Subject: [PATCH 5/5] Swap to if without a continue --- .../channelfinder/configuration/AAChannelProcessor.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index b5c95639..995c5f1d 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -185,8 +185,9 @@ private long submitToArchivers( e.getValue().stream().collect(Collectors.toMap(ArchivePVOptions::getPv, pv -> pv)); Optional>> actions = getArchiveActions(pvMap, archiverInfo); - if (actions.isEmpty()) continue; - count += archiverService.configureAA(actions.get(), archiverInfo.url()); + if (!actions.isEmpty()) { + count += archiverService.configureAA(actions.get(), archiverInfo.url()); + } } return count; }