From 60593aa286b16a0b30d6455a76914433e2090073 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Sun, 31 May 2026 23:34:06 +0800 Subject: [PATCH 1/2] HDDS-10237. Dynamic reconfiguration of replication supervisor thread pool --- .../hadoop/ozone/HddsDatanodeService.java | 5 ++- .../replication/ReplicationSupervisor.java | 32 ++++++++++++------- .../TestReplicationSupervisor.java | 32 +++++++++++++++++++ 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java index 1f08dacc90eb..0e26a5dd4943 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java @@ -710,8 +710,11 @@ private String reconfigDeletingServiceWorkers(String value) { } private String reconfigReplicationStreamsLimit(String value) { + int poolSize = Integer.parseInt(value); getDatanodeStateMachine().getContainer().getReplicationServer() - .setPoolSize(Integer.parseInt(value)); + .setPoolSize(poolSize); + getDatanodeStateMachine().getSupervisor() + .setReplicationMaxStreams(poolSize); return value; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 8dee840db226..3184fb2ed2e0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -346,21 +346,31 @@ public int getMaxQueueSize() { public void nodeStateUpdated(HddsProtos.NodeOperationalState newState) { if (state.getAndSet(newState) != newState) { - int threadCount = replicationConfig.getReplicationMaxStreams(); - int newMaxQueueSize = datanodeConfig.getCommandQueueLimit(); + resize(newState); + } + } - if (isMaintenance(newState) || isDecommission(newState)) { - threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount); - newMaxQueueSize = - replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize); - } + public void setReplicationMaxStreams(int replicationMaxStreams) { + replicationConfig.setReplicationMaxStreams(replicationMaxStreams); + resize(state.get()); + } - LOG.info("Node state updated to {}, scaling executor pool size to {}", - newState, threadCount); + private void resize(HddsProtos.NodeOperationalState nodeState) { + int threadCount = replicationConfig.getReplicationMaxStreams(); + int newMaxQueueSize = datanodeConfig.getCommandQueueLimit(); - maxQueueSize = newMaxQueueSize; - executorThreadUpdater.accept(threadCount); + if (isMaintenance(nodeState) || isDecommission(nodeState)) { + threadCount = replicationConfig.scaleOutOfServiceLimit(threadCount); + newMaxQueueSize = + replicationConfig.scaleOutOfServiceLimit(newMaxQueueSize); } + + LOG.info("Scaling replication supervisor for node state {} to executor " + + "pool size {} and queue size {}", nodeState, threadCount, + newMaxQueueSize); + + maxQueueSize = newMaxQueueSize; + executorThreadUpdater.accept(threadCount); } /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index abfef6fbffdc..1ac303e833c0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -20,6 +20,7 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; @@ -1139,6 +1140,37 @@ public void poolSizeCanBeDecreased() { } } + @ContainerLayoutTestInfo.ContainerTest + public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() { + final int replicationMaxStreams = 5; + ReplicationServer.ReplicationConfig repConf = + new ReplicationServer.ReplicationConfig(); + repConf.setReplicationMaxStreams(replicationMaxStreams); + + AtomicInteger threadPoolSize = new AtomicInteger(); + + ReplicationSupervisor rs = ReplicationSupervisor.newBuilder() + .executor(new DiscardingExecutorService()) + .executorThreadUpdater(threadPoolSize::set) + .replicationConfig(repConf) + .build(); + + rs.nodeStateUpdated(IN_SERVICE); + assertEquals(replicationMaxStreams, threadPoolSize.get()); + + rs.setReplicationMaxStreams(7); + assertEquals(7, threadPoolSize.get()); + + rs.nodeStateUpdated(DECOMMISSIONING); + assertEquals(14, threadPoolSize.get()); + + rs.setReplicationMaxStreams(3); + assertEquals(6, threadPoolSize.get()); + + rs.nodeStateUpdated(IN_SERVICE); + assertEquals(3, threadPoolSize.get()); + } + @ContainerLayoutTestInfo.ContainerTest public void testMaxQueueSize() { List datanodes = new ArrayList<>(); From f53cafbaccc36666487b6b0263187c045ba8d823 Mon Sep 17 00:00:00 2001 From: Russole <850905junior@gmail.com> Date: Tue, 2 Jun 2026 21:50:38 +0800 Subject: [PATCH 2/2] Use config value for out-of-service test expectation --- .../container/replication/TestReplicationSupervisor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 1ac303e833c0..a8b590e671e8 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -1162,10 +1162,10 @@ public void poolSizeCanBeUpdatedByReplicationStreamsLimitReconfiguration() { assertEquals(7, threadPoolSize.get()); rs.nodeStateUpdated(DECOMMISSIONING); - assertEquals(14, threadPoolSize.get()); + assertEquals(repConf.scaleOutOfServiceLimit(7), threadPoolSize.get()); rs.setReplicationMaxStreams(3); - assertEquals(6, threadPoolSize.get()); + assertEquals(repConf.scaleOutOfServiceLimit(3), threadPoolSize.get()); rs.nodeStateUpdated(IN_SERVICE); assertEquals(3, threadPoolSize.get());