Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ public class ServiceConfigKeys {
public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed";
public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;

// Comma-separated list of flow group prefixes for which concurrent execution defaults to true.
// Only used as a fallback when a flow does not have flow.allowConcurrentExecution explicitly set
// and the service-level flowConcurrencyAllowed is false.
public static final String CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES =
GOBBLIN_SERVICE_PREFIX + "concurrencyAllowedFlowGroupPrefixes";

public static final String LEADER_URL = "leaderUrl";

public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.reflect.ConstructorUtils;

Expand Down Expand Up @@ -82,6 +85,7 @@ public class FlowCompilationValidationHelper {
private final EventSubmitter eventSubmitter;
private final DagManagementStateStore dagManagementStateStore;
private final boolean isFlowConcurrencyEnabled;
private final List<String> concurrencyAllowedFlowGroupPrefixes;

@Inject
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Expand All @@ -102,6 +106,9 @@ public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton
this.dagManagementStateStore = dagManagementStateStore;
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
String prefixesCsv = ConfigUtils.getString(config, ServiceConfigKeys.CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES, "");
this.concurrencyAllowedFlowGroupPrefixes = prefixesCsv.isEmpty() ? Collections.emptyList()
: Arrays.stream(prefixesCsv.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -148,13 +155,16 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
/**
* Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
* already running and emits a FLOW FAILED event. Otherwise, this check passes.
* Concurrent execution is resolved in the following order:
* 1. Per-flow {@link ConfigurationKeys#FLOW_ALLOW_CONCURRENT_EXECUTION} if explicitly set
* 2. Service-level {@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED} (if true, allows concurrency)
* 3. Flow group prefix match via {@link ServiceConfigKeys#CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES}
* @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile flowSpec, else Optional.absent()
* @throws IOException
*/
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
String flowGroup, String flowName, Map<String,String> flowMetadata) throws IOException {
boolean allowConcurrentExecution = Boolean.parseBoolean(ConfigUtils.getString(flowConfig,
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, String.valueOf(this.isFlowConcurrencyEnabled)));
boolean allowConcurrentExecution = resolveAllowConcurrentExecution(flowConfig, flowGroup);

Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);

Expand Down Expand Up @@ -212,6 +222,32 @@ private boolean isExecutionPermitted(String flowGroup, String flowName, long flo
return allowConcurrentExecution || !isPriorFlowExecutionRunning(flowGroup, flowName, flowExecutionId, dagManagementStateStore);
}

/**
* Resolves whether concurrent execution is allowed for a flow. The resolution order is:
* 1. Per-flow {@link ConfigurationKeys#FLOW_ALLOW_CONCURRENT_EXECUTION} if explicitly set in the flow config
* 2. Service-level {@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED} if true
* 3. Flow group prefix match via {@link ServiceConfigKeys#CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES}
*/
@VisibleForTesting
boolean resolveAllowConcurrentExecution(Config flowConfig, String flowGroup) {
if (flowConfig.hasPath(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION)) {
return flowConfig.getBoolean(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION);
}
if (this.isFlowConcurrencyEnabled) {
return true;
}
return isFlowGroupConcurrencyAllowed(flowGroup);
}

private boolean isFlowGroupConcurrencyAllowed(String flowGroup) {
for (String prefix : this.concurrencyAllowedFlowGroupPrefixes) {
if (flowGroup.startsWith(prefix)) {
return true;
}
}
return false;
}

/**
* Returns true if any previous execution for the flow determined by the provided flowGroup, flowName, flowExecutionId is running.
* We ignore the execution that has the provided flowExecutionId so that if first attempt of some LaunchDagProc fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,22 @@
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTestUtils;
import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatus;
import org.apache.gobblin.service.monitoring.JobStatus;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -207,6 +210,115 @@ public void testSameFlowExecAlreadyCompiledWithinJobStartDeadline() throws IOExc
flowStartTime, this.dagManagementStateStore));
}

/**
* Helper to build a {@link FlowCompilationValidationHelper} with the given service-level config.
*/
private FlowCompilationValidationHelper buildHelper(Config serviceConfig) throws Exception {
ITestMetastoreDatabase db = TestMetastoreDatabaseFactory.get();
MySqlDagManagementStateStore dmss = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(db));
LaunchDagProcTest.mockDMSSCommonBehavior(dmss);
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(serviceConfig);
return new FlowCompilationValidationHelper(serviceConfig, sharedFlowMetricsSingleton,
mock(UserQuotaManager.class), dmss);
}

@Test
public void testResolveAllowConcurrentExecution_explicitFlowConfigTrue() throws Exception {
// Per-flow setting should take precedence even when service-level is false and no prefix match
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Config flowConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, ConfigValueFactory.fromAnyRef(true));
Assert.assertTrue(helper.resolveAllowConcurrentExecution(flowConfig, "unmatched-group"));
}

@Test
public void testResolveAllowConcurrentExecution_explicitFlowConfigFalse() throws Exception {
// Per-flow setting of false should take precedence even when service-level is true
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(true));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Config flowConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, ConfigValueFactory.fromAnyRef(false));
Assert.assertFalse(helper.resolveAllowConcurrentExecution(flowConfig, "any-group"));
}

@Test
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelTrue() throws Exception {
// When flow config is absent and service-level is true, should allow without checking prefixes
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(true));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "unmatched-group"));
}

@Test
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_prefixMatch() throws Exception {
// When flow config is absent and service-level is false, should fall back to prefix matching
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
.withValue(ServiceConfigKeys.CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
ConfigValueFactory.fromAnyRef("teamA,teamB-prod"));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamA-pipeline"));
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamB-prod-etl"));
}

@Test
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_noPrefixMatch() throws Exception {
// When flow config is absent, service-level is false, and no prefix matches, should disallow
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
.withValue(ServiceConfigKeys.CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
ConfigValueFactory.fromAnyRef("teamA,teamB-prod"));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Assert.assertFalse(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamC-pipeline"));
}

@Test
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_noPrefixesConfigured() throws Exception {
// When no prefixes are configured at all, should disallow
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Assert.assertFalse(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "any-group"));
}

@Test
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_blankPrefixesIgnored() throws Exception {
// Blank entries from trailing/double commas should not match all flow groups
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
.withValue(ServiceConfigKeys.CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
ConfigValueFactory.fromAnyRef("teamA,,teamB, "));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamA-pipeline"));
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamB-pipeline"));
Assert.assertFalse(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamC-pipeline"));
}

@Test
public void testResolveAllowConcurrentExecution_explicitFlowConfigFalse_overridesPrefixMatch() throws Exception {
// Explicit per-flow false should take precedence even when prefix matches
Config serviceConfig = ConfigFactory.empty()
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
.withValue(ServiceConfigKeys.CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
ConfigValueFactory.fromAnyRef("teamA"));
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);

Config flowConfig = ConfigFactory.empty()
.withValue(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, ConfigValueFactory.fromAnyRef(false));
Assert.assertFalse(helper.resolveAllowConcurrentExecution(flowConfig, "teamA-pipeline"));
}

private void insertFlowIntoDMSSMock(String flowGroup, String flowName, long flowStartTime, ExecutionStatus executionStatus, Config config)
throws URISyntaxException, IOException {
List<FlowStatus> list = new ArrayList<>();
Expand Down
Loading