Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -379,11 +380,18 @@ public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clust

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, toBeStoppedInstances,
false);
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances,
boolean preserveOrder) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
toBeStoppedInstances);
toBeStoppedInstances, preserveOrder);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
toBeStoppedInstances, finalStoppableChecks, getMapFromJsonPayload(jsonContent));
Expand Down Expand Up @@ -476,12 +484,16 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {
Set<String> toBeStoppedInstances, boolean preserveOrder) {

// Perform all but min_active replicas check in parallel
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
Collectors.toMap(
Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)),
(existing, replacement) -> existing,
preserveOrder ? LinkedHashMap::new : HashMap::new
Comment thread
LZD-PratyushBhatt marked this conversation as resolved.
Comment thread
LZD-PratyushBhatt marked this conversation as resolved.
));

// Perform min_active replicas check sequentially
addMinActiveReplicaChecks(clusterId, helixInstanceChecks, toBeStoppedInstances);
Expand Down Expand Up @@ -618,7 +630,7 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
Collections.emptySet());
Collections.emptySet(), false);
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;

/**
* This class is used to select stoppable instances based on different selection criteria.
* Selection criteria include:
* 1. Zone-based selection - Select instances from a single zone
* 2. Cross-zone selection - Select instances across multiple zones
* 3. Non-zone-based selection - Select instances regardless of zone
*
* For zone-based selection, instances can be ordered either lexicographically (default) or
* by preserving the original input order when preserveOrder is set to true.
*/
public class StoppableInstancesSelector {
// This type does not belong to real HealthCheck failed reason. Also, if we add this type
// to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl
Expand All @@ -53,16 +63,19 @@ public class StoppableInstancesSelector {
private final MaintenanceManagementService _maintenanceService;
private final ClusterTopology _clusterTopology;
private final ZKHelixDataAccessor _dataAccessor;
private final boolean _preserveOrder;

private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
String customizedInput, MaintenanceManagementService maintenanceService,
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) {
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor,
boolean preserveOrder) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
_dataAccessor = dataAccessor;
_preserveOrder = preserveOrder;
}

/**
Expand All @@ -76,7 +89,7 @@ private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
* @return An ObjectNode containing:
* - 'stoppableNode': List of instances that can be stopped.
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
* a list of reasons for non-stoppability as the value.
* a list of getZoneBasedInstancesreasons for non-stoppability as the value.
* @throws IOException
*/
public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
Expand All @@ -91,7 +104,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, _preserveOrder);
processNonexistentInstances(instances, failedStoppableInstances);

return result;
Expand Down Expand Up @@ -128,7 +141,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
continue;
}
populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, false);
}
processNonexistentInstances(instances, failedStoppableInstances);
return result;
Expand Down Expand Up @@ -162,16 +175,16 @@ public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
List<String> instancesToCheck = new ArrayList<>(instances);
instancesToCheck.removeAll(nonExistingInstances);
populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, stoppableInstances,
failedStoppableInstances);
failedStoppableInstances, false);

return result;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, boolean preserveOrder) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
_customizedInput, toBeStoppedInstances);
_customizedInput, toBeStoppedInstances, preserveOrder);

for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
Expand Down Expand Up @@ -251,9 +264,10 @@ public void calculateOrderOfZone(List<String> instances, boolean random) {
* The order of zones can directly come from user input. If user did not specify it, Helix will order
* zones by the number of associated instances in descending order.
*
* @param instances
* @param zoneMapping
* @return
* @param instances List of instances to be considered
* @param zoneMapping Mapping from zone to instances
* @return List of instances in the first non-empty zone. If preserveOrder is true, the original order
* of instances is maintained. If preserveOrder is false (default), instances are sorted lexicographically.
*/
private List<String> getZoneBasedInstances(List<String> instances,
Map<String, Set<String>> zoneMapping) {
Expand All @@ -263,11 +277,21 @@ private List<String> getZoneBasedInstances(List<String> instances,

Set<String> instanceSet = null;
for (String zone : _orderOfZone) {
instanceSet = new TreeSet<>(instances);
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
instanceSet.retainAll(currentZoneInstanceSet);
if (instanceSet.size() > 0) {
return new ArrayList<>(instanceSet);
if (_preserveOrder) {
List<String> filteredInstances = new ArrayList<>(instances);
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
filteredInstances.removeIf(instance -> !currentZoneInstanceSet.contains(instance));
if (!filteredInstances.isEmpty()) {
return filteredInstances;
}
} else {
// Original behavior - lexicographical ordering via TreeSet
instanceSet = new TreeSet<>(instances);
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
instanceSet.retainAll(currentZoneInstanceSet);
if (!instanceSet.isEmpty()) {
return new ArrayList<>(instanceSet);
}
Comment thread
LZD-PratyushBhatt marked this conversation as resolved.
Outdated
}
}

Expand Down Expand Up @@ -319,6 +343,7 @@ public static class StoppableInstancesSelectorBuilder {
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
private ZKHelixDataAccessor _dataAccessor;
private boolean _preserveOrder = false; // Default to false for backward compatibility
Comment thread
LZD-PratyushBhatt marked this conversation as resolved.
Outdated

public StoppableInstancesSelectorBuilder setClusterId(String clusterId) {
_clusterId = clusterId;
Expand Down Expand Up @@ -351,9 +376,14 @@ public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dat
return this;
}

public StoppableInstancesSelectorBuilder setPreserveOrder(boolean preserveOrder) {
_preserveOrder = preserveOrder;
return this;
}

public StoppableInstancesSelector build() {
return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput,
_maintenanceService, _clusterTopology, _dataAccessor);
_maintenanceService, _clusterTopology, _dataAccessor, _preserveOrder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public enum InstancesProperties {
to_be_stopped_instances,
skip_stoppable_check_list,
customized_values,
preserve_order,
Comment thread
LZD-PratyushBhatt marked this conversation as resolved.
Outdated
instance_stoppable_parallel,
instance_not_stoppable_with_reasons
}
Expand Down Expand Up @@ -304,6 +305,11 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.asBoolean();
}

boolean preserveOrder = false;
if (node.get(InstancesProperties.preserve_order.name()) != null) {
preserveOrder = node.get(InstancesProperties.preserve_order.name()).asBoolean();
}

ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
if (!clusterService.isClusterTopologyAware(clusterId)) {
Expand Down Expand Up @@ -354,6 +360,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.setPreserveOrder(preserveOrder)
.build();
ObjectNode result;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,74 @@ public void testMultipleReplicasInSameMZ() throws Exception {
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@DataProvider(name = "preserveOrderProvider")
public Object[][] preserveOrderProvider() {
return new Object[][] {
{ true },
{ false }
};
}

@Test(dataProvider = "preserveOrderProvider",
dependsOnMethods = "testMultipleReplicasInSameMZ"
)
public void testMultipleReplicasInSameMZWithPreserveOrder(boolean preserveOrder) throws Exception {
System.out.println("Start test :" + TestHelper.getTestMethodName());
// Create SemiAuto DB so that we can control assignment
String testDb = TestHelper.getTestMethodName() + "_resource_" + preserveOrder;
_gSetupTool.getClusterManagementTool().addResource(STOPPABLE_CLUSTER2, testDb, 3, "MasterSlave",
IdealState.RebalanceMode.SEMI_AUTO.toString());
_gSetupTool.getClusterManagementTool().rebalance(STOPPABLE_CLUSTER2, testDb, 3);

// Manually set ideal state to have the 3 replcias assigned to 3 instances all in the same zone
List<String> preferenceList = Arrays.asList("instance0", "instance1", "instance2");
IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(STOPPABLE_CLUSTER2, testDb);
for (String p : is.getPartitionSet()) {
is.setPreferenceList(p, preferenceList);
}
is.setMinActiveReplicas(2);
_gSetupTool.getClusterManagementTool().setResourceIdealState(STOPPABLE_CLUSTER2, testDb, is);

// Wait for assignments to take place
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(verifier.verifyByPolling());

// Run stoppable check against the 3 instances where SemiAuto DB was assigned
String content =
String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"],\"%s\":%s}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", "instance0",
InstancesAccessor.InstancesProperties.preserve_order.name(),
preserveOrder);
Response response =
new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));

String stoppableNode = "instance0";
List<String> nonStoppableNodes = Arrays.asList("instance1", "instance2");
if (preserveOrder) {
stoppableNode = "instance1";
nonStoppableNodes = Arrays.asList("instance0", "instance2");
}
Set<String> stoppableSet = getStringSet(jsonNode,
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
Assert.assertTrue(Collections.singleton(stoppableNode).equals(stoppableSet));

// Next 2 instances should fail stoppable due to MIN_ACTIVE_REPLICA_CHECK_FAILED
JsonNode nonStoppableInstances = jsonNode.get(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Assert.assertFalse(getStringSet(nonStoppableInstances, stoppableNode)
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertTrue(getStringSet(nonStoppableInstances, nonStoppableNodes.get(0))
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertTrue(getStringSet(nonStoppableInstances, nonStoppableNodes.get(1))
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testMultipleReplicasInSameMZ")
public void testSkipClusterLevelHealthCheck() throws IOException {
System.out.println("Start test :" + TestHelper.getTestMethodName());
Expand Down