diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerDomainSocket.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerDomainSocket.java index 2c1b19ad5bfb..ef5aa60b661d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerDomainSocket.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerDomainSocket.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; -import java.util.HashMap; import java.util.List; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -69,8 +69,8 @@ public final class XceiverServerDomainSocket implements XceiverServerSpi, Runnab private DomainSocket domainSocket; private final ConfigurationSource config; private final String threadPrefix; - private final HashMap peers = new HashMap<>(); - private final HashMap peersReceiver = new HashMap<>(); + private final ConcurrentHashMap peers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap peersReceiver = new ConcurrentHashMap<>(); private int readTimeoutMs; private int writeTimeoutMs; private final ThreadPoolExecutor readExecutors; diff --git a/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh b/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh index bec702c2129b..c56341896f60 100755 --- a/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh +++ b/hadoop-ozone/dist/src/main/compose/ozone/test-short-circuit.sh @@ -21,7 +21,7 @@ COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" export COMPOSE_DIR export SECURITY_ENABLED=false -export OZONE_REPLICATION_FACTOR=3 +export OZONE_REPLICATION_FACTOR=1 export SHORT_CIRCUIT_READ_ENABLED=true # shellcheck source=/dev/null @@ -29,7 +29,8 @@ source "$COMPOSE_DIR/../testlib.sh" export COMPOSE_FILE=docker-compose.yaml:short-circuit.yaml -start_docker_env 3 +start_docker_env 1 execute_robot_test datanode freon/read-write-key.robot +execute_robot_test datanode short-circuit diff --git a/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot b/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot index fa6104c4c7e0..3db4bd4b7490 100644 --- a/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot +++ b/hadoop-ozone/dist/src/main/smoketest/freon/read-write-key.robot @@ -66,6 +66,6 @@ Run rk with key validation through short-circuit channel Pass Execution If '${SHORT_CIRCUIT_READ_ENABLED}' == 'false' Skip when short-circuit read is disabled ${keysCount} = BuiltIn.Set Variable 10 - ${result} = Execute ozone freon rk --numOfVolumes 1 --numOfBuckets 1 --numOfKeys ${keysCount} --keySize 1MB --replication-type=RATIS --factor=THREE --validate-writes --validation-channel=short-circuit + ${result} = Execute ozone freon rk --numOfVolumes 1 --numOfBuckets 1 --numOfKeys ${keysCount} --keySize 1MB --replication-type=RATIS --factor=ONE --validate-writes --validation-channel=short-circuit Should contain ${result} Status: Success Should contain ${result} XceiverClientShortCircuit is created for pipeline diff --git a/hadoop-ozone/dist/src/main/smoketest/short-circuit/short-circuit.robot b/hadoop-ozone/dist/src/main/smoketest/short-circuit/short-circuit.robot new file mode 100644 index 000000000000..a104d031159f --- /dev/null +++ b/hadoop-ozone/dist/src/main/smoketest/short-circuit/short-circuit.robot @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +*** Settings *** +Documentation Test short-circuit read feature +Library OperatingSystem +Library String +Resource ../commonlib.robot +Test Timeout 5 minutes + +*** Variables *** +${VOLUME} sc-vol +${BUCKET} sc-bucket +${KEY} sc-key + +*** Test Cases *** +Test Short Circuit Read Metrics + Pass Execution If '${SHORT_CIRCUIT_READ_ENABLED}' == 'false' Skip when short-circuit read is disabled + + ${random} = Generate Random String 5 [NUMBERS] + ${vol} = Set Variable ${VOLUME}${random} + ${buck} = Set Variable ${BUCKET}${random} + + # Create volume and bucket + Execute ozone sh volume create /${vol} + Execute ozone sh bucket create /${vol}/${buck} + + # Create a dummy file + Execute dd if=/dev/urandom of=/tmp/testfile bs=1024 count=1024 + + # Put key + Execute ozone sh key put /${vol}/${buck}/${KEY} /tmp/testfile + + # Get key + ${result} = Execute ozone sh key get /${vol}/${buck}/${KEY} /tmp/downloadedfile + + # Verify short circuit read metrics from datanode JMX + # The metric is numLocalGetBlock + ${jmx_output} = Execute curl -s 'http://localhost:9882/jmx?qry=Hadoop:service=HddsDatanode,name=StorageContainerMetrics' | grep -o '"numLocalGetBlock" : [0-9]*' | awk -F: '{print $2}' | tr -d ' ' + Should Be True ${jmx_output} > 0 + + # Clean up + Execute rm /tmp/testfile /tmp/downloadedfile diff --git a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java index b649f948617e..a156d176e45f 100644 --- a/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java +++ b/hadoop-ozone/freon/src/main/java/org/apache/hadoop/ozone/freon/RandomKeyGenerator.java @@ -1258,6 +1258,7 @@ public void run() { try { KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS); if (kv != null) { + long validationStartTime = System.nanoTime(); try (OzoneInputStream is = kv.bucket.readKey(kv.keyName)) { dig.getMessageDigest().reset(); byte[] curDigest = dig.digest(is); @@ -1271,6 +1272,7 @@ public void run() { LOG.warn("Expected checksum: {}, Actual checksum: {}", kv.digest, curDigest); } + keyReadTime.addAndGet(System.nanoTime() - validationStartTime); } } } catch (IOException ex) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestShortCircuitChunkInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestLocalChunkInputStream.java similarity index 98% rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestShortCircuitChunkInputStream.java rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestLocalChunkInputStream.java index 5d0fd16cd6a2..a9d9f90335f1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestShortCircuitChunkInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestLocalChunkInputStream.java @@ -54,7 +54,7 @@ * Tests {@link LocalChunkInputStream}. */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class TestShortCircuitChunkInputStream extends TestChunkInputStream { +public class TestLocalChunkInputStream extends TestChunkInputStream { @TempDir private File dir;