Skip to content

Commit 3a41e3e

Browse files
abhishekmjainclaude
andcommitted
Add flow group prefix-based concurrency override
When service-level flowConcurrencyAllowed is false and a flow does not have flow.allowConcurrentExecution explicitly set, allow concurrent execution based on a configurable list of flow group prefixes. This avoids bulk-updating 80k flow configs in the DB which would overwhelm the Brooklin CDC stream. New config: gobblin.service.flowConcurrencyAllowed.flowGroupPrefixes Resolution order: 1. Per-flow flow.allowConcurrentExecution if explicitly set 2. Service-level flowConcurrencyAllowed if true 3. Flow group prefix match (new fallback) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent dd659cd commit 3a41e3e

3 files changed

Lines changed: 156 additions & 2 deletions

File tree

gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ public class ServiceConfigKeys {
9494
public static final String FLOW_CONCURRENCY_ALLOWED = GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed";
9595
public static final Boolean DEFAULT_FLOW_CONCURRENCY_ALLOWED = true;
9696

97+
// Comma-separated list of flow group prefixes for which concurrent execution defaults to true.
98+
// Only used as a fallback when a flow does not have flow.allowConcurrentExecution explicitly set
99+
// and the service-level flowConcurrencyAllowed is false.
100+
public static final String FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES =
101+
GOBBLIN_SERVICE_PREFIX + "flowConcurrencyAllowed.flowGroupPrefixes";
102+
97103
public static final String LEADER_URL = "leaderUrl";
98104

99105
public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class";

gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
import java.io.IOException;
2121
import java.lang.reflect.InvocationTargetException;
22+
import java.util.Arrays;
23+
import java.util.Collections;
2224
import java.util.List;
2325
import java.util.Map;
26+
import java.util.stream.Collectors;
2427

2528
import org.apache.commons.lang3.reflect.ConstructorUtils;
2629

@@ -82,6 +85,7 @@ public class FlowCompilationValidationHelper {
8285
private final EventSubmitter eventSubmitter;
8386
private final DagManagementStateStore dagManagementStateStore;
8487
private final boolean isFlowConcurrencyEnabled;
88+
private final List<String> concurrencyAllowedFlowGroupPrefixes;
8589

8690
@Inject
8791
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
@@ -102,6 +106,9 @@ public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton
102106
this.dagManagementStateStore = dagManagementStateStore;
103107
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
104108
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
109+
String prefixesCsv = ConfigUtils.getString(config, ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES, "");
110+
this.concurrencyAllowedFlowGroupPrefixes = prefixesCsv.isEmpty() ? Collections.emptyList()
111+
: Arrays.stream(prefixesCsv.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
105112
}
106113

107114
/**
@@ -148,13 +155,16 @@ public Optional<Dag<JobExecutionPlan>> createExecutionPlanIfValid(FlowSpec flowS
148155
/**
149156
* Checks if flowSpec disallows concurrent executions, and if so then checks if another instance of the flow is
150157
* already running and emits a FLOW FAILED event. Otherwise, this check passes.
158+
* Concurrent execution is resolved in the following order:
159+
* 1. Per-flow {@link ConfigurationKeys#FLOW_ALLOW_CONCURRENT_EXECUTION} if explicitly set
160+
* 2. Service-level {@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED} (if true, allows concurrency)
161+
* 3. Flow group prefix match via {@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES}
151162
* @return Optional<Dag<JobExecutionPlan>> if caller allowed to execute flow and compile flowSpec, else Optional.absent()
152163
* @throws IOException
153164
*/
154165
public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Config flowConfig, FlowSpec flowSpec,
155166
String flowGroup, String flowName, Map<String,String> flowMetadata) throws IOException {
156-
boolean allowConcurrentExecution = Boolean.parseBoolean(ConfigUtils.getString(flowConfig,
157-
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, String.valueOf(this.isFlowConcurrencyEnabled)));
167+
boolean allowConcurrentExecution = resolveAllowConcurrentExecution(flowConfig, flowGroup);
158168

159169
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
160170

@@ -212,6 +222,32 @@ private boolean isExecutionPermitted(String flowGroup, String flowName, long flo
212222
return allowConcurrentExecution || !isPriorFlowExecutionRunning(flowGroup, flowName, flowExecutionId, dagManagementStateStore);
213223
}
214224

225+
/**
226+
* Resolves whether concurrent execution is allowed for a flow. The resolution order is:
227+
* 1. Per-flow {@link ConfigurationKeys#FLOW_ALLOW_CONCURRENT_EXECUTION} if explicitly set in the flow config
228+
* 2. Service-level {@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED} if true
229+
* 3. Flow group prefix match via {@link ServiceConfigKeys#FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES}
230+
*/
231+
@VisibleForTesting
232+
boolean resolveAllowConcurrentExecution(Config flowConfig, String flowGroup) {
233+
if (flowConfig.hasPath(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION)) {
234+
return flowConfig.getBoolean(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION);
235+
}
236+
if (this.isFlowConcurrencyEnabled) {
237+
return true;
238+
}
239+
return isFlowGroupConcurrencyAllowed(flowGroup);
240+
}
241+
242+
private boolean isFlowGroupConcurrencyAllowed(String flowGroup) {
243+
for (String prefix : this.concurrencyAllowedFlowGroupPrefixes) {
244+
if (flowGroup.startsWith(prefix)) {
245+
return true;
246+
}
247+
}
248+
return false;
249+
}
250+
215251
/**
216252
* Returns true if any previous execution for the flow determined by the provided flowGroup, flowName, flowExecutionId is running.
217253
* We ignore the execution that has the provided flowExecutionId so that if first attempt of some LaunchDagProc fails

gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,22 @@
4141
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
4242
import org.apache.gobblin.metrics.event.TimingEvent;
4343
import org.apache.gobblin.service.ExecutionStatus;
44+
import org.apache.gobblin.service.ServiceConfigKeys;
4445
import org.apache.gobblin.service.modules.flowgraph.Dag;
4546
import org.apache.gobblin.service.modules.orchestration.DagManagerTest;
4647
import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
4748
import org.apache.gobblin.service.modules.orchestration.DagTestUtils;
4849
import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStore;
4950
import org.apache.gobblin.service.modules.orchestration.MySqlDagManagementStateStoreTest;
51+
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
5052
import org.apache.gobblin.service.modules.orchestration.proc.LaunchDagProcTest;
5153
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
5254
import org.apache.gobblin.service.monitoring.FlowStatus;
5355
import org.apache.gobblin.service.monitoring.JobStatus;
5456
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
5557

5658
import static org.mockito.ArgumentMatchers.anyString;
59+
import static org.mockito.Mockito.mock;
5760
import static org.mockito.Mockito.spy;
5861
import static org.mockito.Mockito.when;
5962

@@ -207,6 +210,115 @@ public void testSameFlowExecAlreadyCompiledWithinJobStartDeadline() throws IOExc
207210
flowStartTime, this.dagManagementStateStore));
208211
}
209212

213+
/**
214+
* Helper to build a {@link FlowCompilationValidationHelper} with the given service-level config.
215+
*/
216+
private FlowCompilationValidationHelper buildHelper(Config serviceConfig) throws Exception {
217+
ITestMetastoreDatabase db = TestMetastoreDatabaseFactory.get();
218+
MySqlDagManagementStateStore dmss = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(db));
219+
LaunchDagProcTest.mockDMSSCommonBehavior(dmss);
220+
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(serviceConfig);
221+
return new FlowCompilationValidationHelper(serviceConfig, sharedFlowMetricsSingleton,
222+
mock(UserQuotaManager.class), dmss);
223+
}
224+
225+
@Test
226+
public void testResolveAllowConcurrentExecution_explicitFlowConfigTrue() throws Exception {
227+
// Per-flow setting should take precedence even when service-level is false and no prefix match
228+
Config serviceConfig = ConfigFactory.empty()
229+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false));
230+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
231+
232+
Config flowConfig = ConfigFactory.empty()
233+
.withValue(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, ConfigValueFactory.fromAnyRef(true));
234+
Assert.assertTrue(helper.resolveAllowConcurrentExecution(flowConfig, "unmatched-group"));
235+
}
236+
237+
@Test
238+
public void testResolveAllowConcurrentExecution_explicitFlowConfigFalse() throws Exception {
239+
// Per-flow setting of false should take precedence even when service-level is true
240+
Config serviceConfig = ConfigFactory.empty()
241+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(true));
242+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
243+
244+
Config flowConfig = ConfigFactory.empty()
245+
.withValue(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, ConfigValueFactory.fromAnyRef(false));
246+
Assert.assertFalse(helper.resolveAllowConcurrentExecution(flowConfig, "any-group"));
247+
}
248+
249+
@Test
250+
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelTrue() throws Exception {
251+
// When flow config is absent and service-level is true, should allow without checking prefixes
252+
Config serviceConfig = ConfigFactory.empty()
253+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(true));
254+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
255+
256+
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "unmatched-group"));
257+
}
258+
259+
@Test
260+
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_prefixMatch() throws Exception {
261+
// When flow config is absent and service-level is false, should fall back to prefix matching
262+
Config serviceConfig = ConfigFactory.empty()
263+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
264+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
265+
ConfigValueFactory.fromAnyRef("teamA,teamB-prod"));
266+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
267+
268+
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamA-pipeline"));
269+
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamB-prod-etl"));
270+
}
271+
272+
@Test
273+
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_noPrefixMatch() throws Exception {
274+
// When flow config is absent, service-level is false, and no prefix matches, should disallow
275+
Config serviceConfig = ConfigFactory.empty()
276+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
277+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
278+
ConfigValueFactory.fromAnyRef("teamA,teamB-prod"));
279+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
280+
281+
Assert.assertFalse(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamC-pipeline"));
282+
}
283+
284+
@Test
285+
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_noPrefixesConfigured() throws Exception {
286+
// When no prefixes are configured at all, should disallow
287+
Config serviceConfig = ConfigFactory.empty()
288+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false));
289+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
290+
291+
Assert.assertFalse(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "any-group"));
292+
}
293+
294+
@Test
295+
public void testResolveAllowConcurrentExecution_noFlowConfig_serviceLevelFalse_blankPrefixesIgnored() throws Exception {
296+
// Blank entries from trailing/double commas should not match all flow groups
297+
Config serviceConfig = ConfigFactory.empty()
298+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
299+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
300+
ConfigValueFactory.fromAnyRef("teamA,,teamB, "));
301+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
302+
303+
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamA-pipeline"));
304+
Assert.assertTrue(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamB-pipeline"));
305+
Assert.assertFalse(helper.resolveAllowConcurrentExecution(ConfigFactory.empty(), "teamC-pipeline"));
306+
}
307+
308+
@Test
309+
public void testResolveAllowConcurrentExecution_explicitFlowConfigFalse_overridesPrefixMatch() throws Exception {
310+
// Explicit per-flow false should take precedence even when prefix matches
311+
Config serviceConfig = ConfigFactory.empty()
312+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED, ConfigValueFactory.fromAnyRef(false))
313+
.withValue(ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED_FLOWGROUP_PREFIXES,
314+
ConfigValueFactory.fromAnyRef("teamA"));
315+
FlowCompilationValidationHelper helper = buildHelper(serviceConfig);
316+
317+
Config flowConfig = ConfigFactory.empty()
318+
.withValue(ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, ConfigValueFactory.fromAnyRef(false));
319+
Assert.assertFalse(helper.resolveAllowConcurrentExecution(flowConfig, "teamA-pipeline"));
320+
}
321+
210322
private void insertFlowIntoDMSSMock(String flowGroup, String flowName, long flowStartTime, ExecutionStatus executionStatus, Config config)
211323
throws URISyntaxException, IOException {
212324
List<FlowStatus> list = new ArrayList<>();

0 commit comments

Comments
 (0)