Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
"5m";

/**
 * Configuration key for the interval at which the per-DataNode pending
 * container allocation windows roll over.
 */
public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL =
"ozone.scm.pending.container.roll.interval";
/** Default value for the pending container roll interval: five minutes. */
public static final String OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT =
"5m";

public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT =
"ozone.scm.heartbeat.rpc-timeout";
public static final String OZONE_SCM_HEARTBEAT_RPC_TIMEOUT_DEFAULT =
Expand Down
11 changes: 11 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,17 @@
balances the amount of metadata.
</description>
</property>
<property>
<name>ozone.scm.pending.container.roll.interval</name>
<value>5m</value>
<tag>OZONE, SCM, PERFORMANCE, MANAGEMENT</tag>
<description>
The interval at which the two-window tumbling bucket for pending
container allocations rolls over per DataNode. Pending containers
that have not been confirmed within two intervals are automatically
aged out. Default is 5 minutes.
</description>
</property>
<property>
<name>ozone.scm.container.lock.stripes</name>
<value>512</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,17 @@ private ContainerInfo createContainer(Pipeline pipeline, String owner)
private ContainerInfo allocateContainer(final Pipeline pipeline,
final String owner)
throws IOException {
if (!pipelineManager.hasEnoughSpace(pipeline)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
return null;
}

final long uniqueId = sequenceIdGen.getNextId(SequenceIdType.containerId);
Preconditions.checkState(uniqueId > 0,
"Cannot allocate container, negative container id" +
" generated. %s.", uniqueId);
final ContainerID containerID = ContainerID.valueOf(uniqueId);

if (!pipelineManager.checkSpaceAndRecordAllocation(pipeline, containerID)) {
LOG.debug("Cannot allocate a new container because pipeline {} does not have enough space.", pipeline);
return null;
}

final ContainerInfoProto.Builder containerInfoBuilder = ContainerInfoProto
.newBuilder()
.setState(LifeCycleState.OPEN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hdds.scm.container.report.ContainerReportValidator;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
Expand Down Expand Up @@ -153,6 +154,7 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
containerReport.getReportsList();
final Set<ContainerID> expectedContainersInDatanode =
getNodeManager().getContainers(datanodeDetails);
DatanodeInfo datanodeInfo = datanodeDetails instanceof DatanodeInfo ? (DatanodeInfo) datanodeDetails : null;

for (ContainerReplicaProto replica : replicas) {
ContainerID cid = ContainerID.valueOf(replica.getContainerID());
Expand All @@ -175,6 +177,11 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
if (!alreadyInDn) {
// This is a new Container not in the nodeManager -> dn map yet
getNodeManager().addContainer(datanodeDetails, cid);
// Remove from pending tracker when container is added to DN
// This container was just confirmed for the first time on this DN
if (datanodeInfo != null) {
getNodeManager().removePendingAllocationForDatanode(datanodeInfo, cid);
}
}
if (container == null || ContainerReportValidator
.validate(container, datanodeDetails, replica)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hdds.scm.container.report.ContainerReportValidator;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
Expand Down Expand Up @@ -83,6 +84,7 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
// issue between the container list in NodeManager and the replicas in
// ContainerManager.
synchronized (dd) {
DatanodeInfo datanodeInfo = dd instanceof DatanodeInfo ? (DatanodeInfo) dd : null;
for (ContainerReplicaProto replicaProto :
report.getReport().getReportList()) {
Object detailsForLogging = getDetailsForLogging(null, replicaProto, dd);
Expand All @@ -103,6 +105,9 @@ protected void processICR(IncrementalContainerReportFromDatanode report,
}
if (ContainerReportValidator.validate(container, dd, replicaProto)) {
processContainerReplica(dd, container, replicaProto, publisher, detailsForLogging);
if (datanodeInfo != null) {
getNodeManager().removePendingAllocationForDatanode(datanodeInfo, id);
}
}
success = true;
} catch (ContainerNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,23 @@ default int getAllNodeCount() {
DatanodeInfo getDatanodeInfo(DatanodeDetails dn);

/**
* True if the node can accept another container of the given size.
* Atomically checks if the datanode has space for a new container and records the allocation
* if space is available. This prevents race conditions where multiple threads check space
* concurrently and over-allocate.
*
* @param datanodeInfo node info of the datanode receiving the allocation
* @param containerID the container being allocated
* @return true if space was available and allocation was recorded, false otherwise
*/
boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID);
boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, ContainerID containerID);

/**
* Records a pending container allocation for a single DataNode identified by its ID.
* Removes a pending container allocation from a datanode.
*
* @param datanodeID the ID of the DataNode receiving the allocation
* @param containerID the container being allocated
* @param datanodeInfo info about the datanode
* @param containerID the container to remove from pending
*/
void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID);
void removePendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID);

/**
* Return the node stat of the specified datanode.
Expand Down Expand Up @@ -435,4 +441,9 @@ default void removeNode(DatanodeDetails datanodeDetails) throws NodeNotFoundExce
}

int openContainerLimit(List<DatanodeDetails> datanodes);

/**
* SCM-side tracker for container allocations not yet reported by datanodes.
*/
PendingContainerTracker getPendingContainerTracker();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
Expand Down Expand Up @@ -124,7 +125,7 @@ public class NodeStateManager implements Runnable, Closeable {
*/
private final long deadNodeIntervalMs;

private final long containerRollIntervalMs = 5 * 60 * 1000; //TODO
private final long containerRollIntervalMs;

/**
* The future is used to pause/unpause the scheduled checks.
Expand Down Expand Up @@ -214,6 +215,11 @@ public NodeStateManager(ConfigurationSource conf,
scmContext.getFinalizationCheckpoint()) &&
!layoutMatchCondition.test(layout);

containerRollIntervalMs = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL,
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);

scheduleNextHealthCheck();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hadoop.hdds.scm.node;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -120,16 +121,6 @@ synchronized boolean contains(ContainerID containerID) {
return currentWindow.contains(containerID) || previousWindow.contains(containerID);
}

/**
* Add container to current window.
*/
synchronized boolean add(ContainerID containerID) {
boolean added = currentWindow.add(containerID);
LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}",
containerID, datanodeID, added, getCount());
return added;
}

/**
* Remove container from both windows.
*/
Expand All @@ -148,6 +139,34 @@ synchronized boolean remove(ContainerID containerID) {
synchronized int getCount() {
return currentWindow.size() + previousWindow.size();
}

/**
 * Checks for a free container slot and, if one exists, records {@code containerID}
 * in the current window. The whole check-then-add sequence runs while holding this
 * bucket's monitor, so concurrent callers on the same bucket cannot both claim the
 * same slot.
 *
 * <p>Slots are counted per storage report as
 * {@code VolumeUsage.getUsableSpace(report) / maxContainerSize} and summed across
 * reports; the scan stops as soon as the running total exceeds the number of
 * containers already pending in this bucket.
 *
 * @param storageReports storage reports for the datanode
 * @param maxContainerSize maximum size of a single container in bytes
 * @param containerID the container being allocated
 * @return {@code true} if a free slot was found and {@code currentWindow.add} reported
 *     the container as newly added; {@code false} otherwise
 */
synchronized boolean checkSpaceAndAdd(
    List<StorageReportProto> storageReports, long maxContainerSize, ContainerID containerID) {
  // Snapshot of containers already pending in both windows before this allocation.
  final int alreadyPending = getCount();
  long totalSlots = 0;
  for (StorageReportProto volumeReport : storageReports) {
    totalSlots += VolumeUsage.getUsableSpace(volumeReport) / maxContainerSize;
    if (totalSlots <= alreadyPending) {
      // Not enough slots accumulated yet; keep summing the remaining volumes.
      continue;
    }
    final boolean recorded = currentWindow.add(containerID);
    LOG.debug("Recorded pending container {} on DataNode {}. Added={}, Total pending={}",
        containerID, datanodeID, recorded, getCount());
    return recorded;
  }
  return false;
}
}

public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNodeMetrics metrics) {
Expand All @@ -158,52 +177,34 @@ public PendingContainerTracker(long maxContainerSize, long rollIntervalMs, SCMNo
}

/**
* Whether the datanode can fit another container of {@link #maxContainerSize} after accounting for
* SCM pending allocations for {@code node} (this tracker) and usable space across volumes on
* {@code datanodeInfo}. Pending bytes are count × {@code maxContainerSize};
* effective allocatable space sums full-container slots per storage report.
* Atomically checks if the datanode has space for a new container and records the allocation
* if space is available. The check-and-add atomicity is enforced inside
* {@link TwoWindowBucket#checkSpaceAndAdd}.
*
* @param datanodeInfo storage reports for the datanode
* @param datanodeInfo datanode whose storage reports and pending bucket are used for the check
* @param containerID the container being allocated
* @return true if space was available and the allocation was recorded, false otherwise
*/
public boolean hasEffectiveAllocatableSpaceForNewContainer(DatanodeInfo datanodeInfo) {
public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) {
Objects.requireNonNull(datanodeInfo, "datanodeInfo == null");
Objects.requireNonNull(containerID, "containerID == null");

long pendingAllocationSize = datanodeInfo.getPendingContainerAllocations().getCount() * maxContainerSize;
List<StorageReportProto> storageReports = datanodeInfo.getStorageReports();
Objects.requireNonNull(storageReports, "storageReports == null");
if (storageReports.isEmpty()) {
return false;
}
long effectiveAllocatableSpace = 0L;
for (StorageReportProto report : storageReports) {
long usableSpace = VolumeUsage.getUsableSpace(report);
long containersOnThisDisk = usableSpace / maxContainerSize;
effectiveAllocatableSpace += containersOnThisDisk * maxContainerSize;
if (effectiveAllocatableSpace - pendingAllocationSize >= maxContainerSize) {
return true;
}
}
if (metrics != null) {
metrics.incNumSkippedFullNodeContainerAllocation();
}
return false;
}

/**
* Record a pending container allocation for a single DataNode.
* Container is added to the current window.
*
* @param containerID The container being allocated/replicated
*/
public void recordPendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID) {
Objects.requireNonNull(containerID, "containerID == null");
if (datanodeInfo == null) {
return;
}
final boolean added = datanodeInfo.getPendingContainerAllocations().add(containerID);
if (added && metrics != null) {
metrics.incNumPendingContainersAdded();
boolean added = datanodeInfo.getPendingContainerAllocations()
.checkSpaceAndAdd(storageReports, maxContainerSize, containerID);
if (metrics != null) {
if (added) {
metrics.incNumPendingContainersAdded();
} else {
metrics.incNumSkippedFullNodeContainerAllocation();
}
}
return added;
}

/**
Expand All @@ -222,4 +223,9 @@ public void removePendingAllocation(TwoWindowBucket bucket, ContainerID containe
metrics.incNumPendingContainersRemoved();
}
}

/**
 * Returns the metrics object held by this tracker; exposed for tests.
 *
 * @return the {@code SCMNodeMetrics} instance; may be {@code null}, since other
 *     methods of this class null-check it before use
 */
@VisibleForTesting
public SCMNodeMetrics getMetrics() {
return metrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ public SCMNodeManager(
this.pendingContainerTracker = new PendingContainerTracker(
(long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES),
5 * 60 * 1000, // TODO
conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL,
ScmConfigKeys.OZONE_SCM_PENDING_CONTAINER_ROLL_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS),
this.metrics);
this.clusterMap = networkTopology;
this.nodeResolver = nodeResolver;
Expand Down Expand Up @@ -231,6 +234,11 @@ private void unregisterMXBean() {
}
}

/**
 * Returns the {@code PendingContainerTracker} created in this node manager's constructor.
 */
@Override
public PendingContainerTracker getPendingContainerTracker() {
return pendingContainerTracker;
}

protected NodeStateManager getNodeStateManager() {
return nodeStateManager;
}
Expand Down Expand Up @@ -1071,28 +1079,15 @@ public DatanodeInfo getDatanodeInfo(DatanodeDetails dn) {
}
}

/**
* Effective space check aligned with container allocation: per-disk slot model minus
* SCM pending allocations.
*/
@Override
public boolean hasSpaceForNewContainerAllocation(DatanodeID datanodeID) {
DatanodeInfo datanodeInfo = getNode(datanodeID);
if (datanodeInfo == null) {
LOG.warn("DatanodeInfo not found for node {}", datanodeID);
return false;
}
return pendingContainerTracker.hasEffectiveAllocatableSpaceForNewContainer(datanodeInfo);
public boolean checkSpaceAndRecordAllocation(DatanodeInfo datanodeInfo, ContainerID containerID) {
return pendingContainerTracker.checkSpaceAndRecordAllocation(datanodeInfo, containerID);
}

@Override
public void recordPendingAllocationForDatanode(DatanodeID datanodeID, ContainerID containerID) {
DatanodeInfo datanodeInfo = getNode(datanodeID);
if (datanodeInfo == null) {
LOG.warn("DatanodeInfo not found for node {}", datanodeID);
return;
}
pendingContainerTracker.recordPendingAllocationForDatanode(datanodeInfo, containerID);
public void removePendingAllocationForDatanode(DatanodeInfo datanodeInfo, ContainerID containerID) {
pendingContainerTracker.removePendingAllocation(
datanodeInfo.getPendingContainerAllocations(), containerID);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ void incNumPendingContainersRemoved() {
numPendingContainersRemoved.incr();
}

/**
 * Returns the current value of the {@code numPendingContainersAdded} counter.
 */
public long getNumPendingContainersAdded() {
return numPendingContainersAdded.value();
}

/**
 * Returns the current value of the {@code numPendingContainersRemoved} counter.
 */
public long getNumPendingContainersRemoved() {
return numPendingContainersRemoved.value();
}

void incNumSkippedFullNodeContainerAllocation() {
numSkippedFullNodeContainerAllocation.incr();
}
Expand Down
Loading