Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -69,8 +69,8 @@ public final class XceiverServerDomainSocket implements XceiverServerSpi, Runnab
private DomainSocket domainSocket;
private final ConfigurationSource config;
private final String threadPrefix;
private final HashMap<DomainPeer, Thread> peers = new HashMap<>();
private final HashMap<DomainPeer, Receiver> peersReceiver = new HashMap<>();
private final ConcurrentHashMap<DomainPeer, Thread> peers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<DomainPeer, Receiver> peersReceiver = new ConcurrentHashMap<>();
private int readTimeoutMs;
private int writeTimeoutMs;
private final ThreadPoolExecutor readExecutors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ COMPOSE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
export COMPOSE_DIR

export SECURITY_ENABLED=false
export OZONE_REPLICATION_FACTOR=3
export OZONE_REPLICATION_FACTOR=1
export SHORT_CIRCUIT_READ_ENABLED=true

# shellcheck source=/dev/null
source "$COMPOSE_DIR/../testlib.sh"

export COMPOSE_FILE=docker-compose.yaml:short-circuit.yaml

start_docker_env 3
start_docker_env 1

execute_robot_test datanode freon/read-write-key.robot
execute_robot_test datanode short-circuit

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ Run rk with key validation through short-circuit channel
Pass Execution If '${SHORT_CIRCUIT_READ_ENABLED}' == 'false' Skip when short-circuit read is disabled

${keysCount} = BuiltIn.Set Variable 10
${result} = Execute ozone freon rk --numOfVolumes 1 --numOfBuckets 1 --numOfKeys ${keysCount} --keySize 1MB --replication-type=RATIS --factor=THREE --validate-writes --validation-channel=short-circuit
${result} = Execute ozone freon rk --numOfVolumes 1 --numOfBuckets 1 --numOfKeys ${keysCount} --keySize 1MB --replication-type=RATIS --factor=ONE --validate-writes --validation-channel=short-circuit
Should contain ${result} Status: Success
Should contain ${result} XceiverClientShortCircuit is created for pipeline
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

*** Settings ***
Documentation Test short-circuit read feature
Library OperatingSystem
Library String
Resource ../commonlib.robot
Test Timeout 5 minutes

*** Variables ***
${VOLUME} sc-vol
${BUCKET} sc-bucket
${KEY} sc-key

*** Test Cases ***
Test Short Circuit Read Metrics
Pass Execution If '${SHORT_CIRCUIT_READ_ENABLED}' == 'false' Skip when short-circuit read is disabled

${random} = Generate Random String 5 [NUMBERS]
${vol} = Set Variable ${VOLUME}${random}
${buck} = Set Variable ${BUCKET}${random}

# Create volume and bucket
Execute ozone sh volume create /${vol}
Execute ozone sh bucket create /${vol}/${buck}

# Create a dummy file
Execute dd if=/dev/urandom of=/tmp/testfile bs=1024 count=1024

# Put key
Execute ozone sh key put /${vol}/${buck}/${KEY} /tmp/testfile

# Get key
${result} = Execute ozone sh key get /${vol}/${buck}/${KEY} /tmp/downloadedfile

# Verify short circuit read metrics from datanode JMX
# The metric is numLocalGetBlock
${jmx_output} = Execute curl -s 'http://localhost:9882/jmx?qry=Hadoop:service=HddsDatanode,name=StorageContainerMetrics' | grep -o '"numLocalGetBlock" : [0-9]*' | awk -F: '{print $2}' | tr -d ' '
Should Be True ${jmx_output} > 0

# Clean up
Execute rm /tmp/testfile /tmp/downloadedfile
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,7 @@ public void run() {
try {
KeyValidate kv = validationQueue.poll(5, TimeUnit.SECONDS);
if (kv != null) {
long validationStartTime = System.nanoTime();
try (OzoneInputStream is = kv.bucket.readKey(kv.keyName)) {
dig.getMessageDigest().reset();
byte[] curDigest = dig.digest(is);
Expand All @@ -1271,6 +1272,7 @@ public void run() {
LOG.warn("Expected checksum: {}, Actual checksum: {}",
kv.digest, curDigest);
}
keyReadTime.addAndGet(System.nanoTime() - validationStartTime);
}
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* Tests {@link LocalChunkInputStream}.
*/
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestShortCircuitChunkInputStream extends TestChunkInputStream {
public class TestLocalChunkInputStream extends TestChunkInputStream {

@TempDir
private File dir;
Expand Down