Skip to content

Commit cfe8dd7

Browse files
authored
Merge branch 'dev' into feature/17843-task-timeout-alert-it
2 parents 5f76638 + 70ddf72 commit cfe8dd7

25 files changed

Lines changed: 470 additions & 46 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.dolphinscheduler.api.utils.CheckUtils;
5454
import org.apache.dolphinscheduler.api.utils.PageInfo;
5555
import org.apache.dolphinscheduler.api.utils.Result;
56+
import org.apache.dolphinscheduler.api.validator.GlobalParamsValidator;
5657
import org.apache.dolphinscheduler.common.constants.Constants;
5758
import org.apache.dolphinscheduler.common.enums.ReleaseState;
5859
import org.apache.dolphinscheduler.common.enums.UserType;
@@ -211,6 +212,9 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
211212
@Autowired
212213
private WorkflowLineageService workflowLineageService;
213214

215+
@Autowired
216+
private GlobalParamsValidator globalParamsValidator;
217+
214218
/**
215219
* create workflow definition
216220
*
@@ -257,6 +261,9 @@ public Map<String, Object> createWorkflowDefinition(User loginUser,
257261
definition.getName(), definition.getCode());
258262
throw new ServiceException(Status.WORKFLOW_DEFINITION_NAME_EXIST, name);
259263
}
264+
265+
globalParamsValidator.validate(globalParams);
266+
260267
List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
261268
List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
262269

@@ -691,6 +698,9 @@ public Map<String, Object> updateWorkflowDefinition(User loginUser,
691698
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
692699
return result;
693700
}
701+
702+
globalParamsValidator.validate(globalParams);
703+
694704
List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
695705
List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
696706

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
8282
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
8383
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
84+
import org.apache.dolphinscheduler.plugin.task.api.utils.GlobalParameterUtils;
8485
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
8586
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
8687
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
@@ -700,7 +701,7 @@ private void setWorkflowInstance(WorkflowInstance workflowInstance, String sched
700701
schedule = DateUtils.stringToDate(scheduleTime);
701702
}
702703
workflowInstance.setScheduleTime(schedule);
703-
List<Property> globalParamList = JSONUtils.toList(globalParams, Property.class);
704+
List<Property> globalParamList = GlobalParameterUtils.deserializeGlobalParameter(globalParams);
704705
Map<String, String> globalParamMap =
705706
globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
706707
globalParams = curingGlobalParamsService.curingGlobalParams(workflowInstance.getId(), globalParamMap,
@@ -825,14 +826,15 @@ public Map<String, Object> viewVariables(long projectCode, Integer workflowInsta
825826

826827
// global param string
827828
String globalParamStr =
828-
ParameterUtils.convertParameterPlaceholders(JSONUtils.toJsonString(globalParams), timeParams);
829-
globalParams = JSONUtils.toList(globalParamStr, Property.class);
829+
ParameterUtils.convertParameterPlaceholders(GlobalParameterUtils.serializeGlobalParameter(globalParams),
830+
timeParams);
831+
globalParams = GlobalParameterUtils.deserializeGlobalParameter(globalParamStr);
830832
for (Property property : globalParams) {
831833
timeParams.put(property.getProp(), property.getValue());
832834
}
833835

834836
if (userDefinedParams != null && userDefinedParams.length() > 0) {
835-
globalParams = JSONUtils.toList(userDefinedParams, Property.class);
837+
globalParams = GlobalParameterUtils.deserializeGlobalParameter(userDefinedParams);
836838
}
837839

838840
Map<String, Map<String, Object>> localUserDefParams = getLocalParams(workflowInstance, timeParams);
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.validator;
19+
20+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
21+
import org.apache.dolphinscheduler.plugin.task.api.utils.GlobalParameterUtils;
22+
23+
import org.apache.commons.lang3.StringUtils;
24+
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Set;
28+
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
import org.springframework.stereotype.Component;
32+
33+
/**
34+
* Validates global parameters: non-empty keys, no duplicates
35+
* <p> If globalParams is not valid, an {@link IllegalArgumentException} will be thrown. </p>
36+
*/
37+
@Slf4j
38+
@Component
39+
public class GlobalParamsValidator implements IValidator<String> {
40+
41+
@Override
42+
public void validate(String globalParams) {
43+
if (StringUtils.isBlank(globalParams)) {
44+
return;
45+
}
46+
47+
List<Property> params;
48+
try {
49+
params = GlobalParameterUtils.deserializeGlobalParameter(globalParams);
50+
} catch (Exception ex) {
51+
throw new IllegalArgumentException("Invalid globalParams", ex);
52+
}
53+
54+
if (params == null || params.isEmpty()) {
55+
return;
56+
}
57+
58+
Set<String> keys = new HashSet<>();
59+
for (Property p : params) {
60+
if (StringUtils.isBlank(p.getProp())) {
61+
throw new IllegalArgumentException("Global param key cannot be empty");
62+
}
63+
64+
String key = p.getProp().trim();
65+
if (!keys.add(key)) {
66+
throw new IllegalArgumentException("Duplicate global param key: " + key);
67+
}
68+
}
69+
}
70+
}

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
4040
import org.apache.dolphinscheduler.api.service.impl.WorkflowDefinitionServiceImpl;
4141
import org.apache.dolphinscheduler.api.utils.PageInfo;
42+
import org.apache.dolphinscheduler.api.validator.GlobalParamsValidator;
4243
import org.apache.dolphinscheduler.common.constants.Constants;
4344
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
4445
import org.apache.dolphinscheduler.common.enums.Priority;
@@ -194,6 +195,9 @@ public class WorkflowDefinitionServiceTest extends BaseServiceTestTool {
194195
@Mock
195196
private UserMapper userMapper;
196197

198+
@Mock
199+
private GlobalParamsValidator globalParamsValidator;
200+
197201
protected User user;
198202
protected Exception exception;
199203
protected final static long projectCode = 1L;
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.validator;
19+
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
22+
import org.junit.jupiter.api.Test;
23+
import org.junit.jupiter.api.extension.ExtendWith;
24+
import org.mockito.InjectMocks;
25+
import org.mockito.junit.jupiter.MockitoExtension;
26+
27+
@ExtendWith(MockitoExtension.class)
28+
class GlobalParamsValidatorTest {
29+
30+
@InjectMocks
31+
private GlobalParamsValidator globalParamsValidator;
32+
33+
@Test
34+
void testValidate_blankInput() {
35+
globalParamsValidator.validate(null);
36+
globalParamsValidator.validate("");
37+
globalParamsValidator.validate(" ");
38+
}
39+
40+
@Test
41+
void testValidate_invalidJsonFormat() {
42+
String invalidJson = "{key: value}";
43+
44+
assertThatThrownBy(() -> globalParamsValidator.validate(invalidJson))
45+
.isInstanceOf(IllegalArgumentException.class)
46+
.hasMessage("Invalid globalParams");
47+
}
48+
49+
@Test
50+
void testValidate_emptyList() {
51+
globalParamsValidator.validate("[]");
52+
}
53+
54+
@Test
55+
void testValidate_duplicateKeys() {
56+
String jsonWithDupes = "[{\"prop\":\"app_name\",\"value\":\"A\"},{\"prop\":\"app_name\",\"value\":\"B\"}]";
57+
58+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonWithDupes))
59+
.isInstanceOf(IllegalArgumentException.class)
60+
.hasMessageContaining("Duplicate global param key: app_name");
61+
}
62+
63+
@Test
64+
void testValidate_duplicateKeysAfterTrim() {
65+
String jsonWithSpaces = "[{\"prop\":\" app_name \",\"value\":\"A\"},{\"prop\":\"app_name\",\"value\":\"B\"}]";
66+
67+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonWithSpaces))
68+
.isInstanceOf(IllegalArgumentException.class)
69+
.hasMessageContaining("Duplicate global param key: app_name");
70+
}
71+
72+
@Test
73+
void testValidate_emptyKey() {
74+
String jsonEmptyKey = "[{\"prop\":\"\",\"value\":\"test\"}]";
75+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonEmptyKey))
76+
.isInstanceOf(IllegalArgumentException.class)
77+
.hasMessage("Global param key cannot be empty");
78+
79+
String jsonTab = "[{\"prop\":\"\\t\",\"value\":\"test\"}]";
80+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonTab))
81+
.isInstanceOf(IllegalArgumentException.class)
82+
.hasMessage("Global param key cannot be empty");
83+
84+
String jsonNewLine = "[{\"prop\":\"\\n\",\"value\":\"test\"}]";
85+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonNewLine))
86+
.isInstanceOf(IllegalArgumentException.class)
87+
.hasMessage("Global param key cannot be empty");
88+
89+
String jsonMixed = "[{\"prop\":\" \\t \",\"value\":\"test\"}]";
90+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonMixed))
91+
.isInstanceOf(IllegalArgumentException.class)
92+
.hasMessage("Global param key cannot be empty");
93+
}
94+
95+
@Test
96+
void testValidate_missingKeyField() {
97+
String jsonMissingKey = "[{\"value\":\"test\"}]";
98+
99+
assertThatThrownBy(() -> globalParamsValidator.validate(jsonMissingKey))
100+
.isInstanceOf(IllegalArgumentException.class)
101+
.hasMessage("Global param key cannot be empty");
102+
}
103+
104+
@Test
105+
void testValidate_validParameters() {
106+
String validJson = "[" +
107+
"{\"prop\":\"workflow_id\",\"value\":\"1001\",\"direct\":\"IN\"}," +
108+
"{\"prop\":\"env\",\"value\":\"\",\"direct\":\"IN\"}," +
109+
"{\"prop\":\"env2\",\"value\":\" \",\"direct\":\"IN\"}," +
110+
"{\"prop\":\"env3\",\"value\":\" \",\"direct\":\"OUT\"}" +
111+
"]";
112+
113+
globalParamsValidator.validate(validJson);
114+
}
115+
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowDefinition.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import org.apache.dolphinscheduler.common.enums.Flag;
2121
import org.apache.dolphinscheduler.common.enums.ReleaseState;
2222
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionTypeEnum;
23-
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2423
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
24+
import org.apache.dolphinscheduler.plugin.task.api.utils.GlobalParameterUtils;
2525

2626
import java.util.ArrayList;
2727
import java.util.Date;
@@ -134,7 +134,7 @@ public void set(long projectCode,
134134
}
135135

136136
public void setGlobalParams(String globalParams) {
137-
this.globalParamList = JSONUtils.toList(globalParams, Property.class);
137+
this.globalParamList = GlobalParameterUtils.deserializeGlobalParameter(globalParams);
138138
if (this.globalParamList == null) {
139139
this.globalParamList = new ArrayList<>();
140140
}
@@ -143,7 +143,7 @@ public void setGlobalParams(String globalParams) {
143143

144144
public Map<String, String> getGlobalParamMap() {
145145
if (globalParamMap == null && !Strings.isNullOrEmpty(globalParams)) {
146-
List<Property> propList = JSONUtils.toList(globalParams, Property.class);
146+
List<Property> propList = GlobalParameterUtils.deserializeGlobalParameter(globalParams);
147147
globalParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
148148
}
149149

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.dolphinscheduler.extract.master.command.ICommandParam;
2828
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
2929
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
30+
import org.apache.dolphinscheduler.plugin.task.api.utils.GlobalParameterUtils;
3031
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3132
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
3233
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
@@ -123,7 +124,8 @@ private String mergeCommandParamsWithWorkflowParams(final Command command,
123124
Optional.ofNullable(JSONUtils.parseObject(command.getCommandParam(), ICommandParam.class))
124125
.map(ICommandParam::getCommandParams)
125126
.orElse(null);
126-
final List<Property> globalParamsList = JSONUtils.toList(workflowDefinition.getGlobalParams(), Property.class);
127+
final List<Property> globalParamsList =
128+
GlobalParameterUtils.deserializeGlobalParameter(workflowDefinition.getGlobalParams());
127129
Map<String, Property> finalParams = new HashMap<>();
128130
if (CollectionUtils.isNotEmpty(globalParamsList)) {
129131
globalParamsList.forEach(globalParam -> finalParams.put(globalParam.getProp(), globalParam));

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/plugin/switchtask/SwitchLogicTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
2525
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
2626
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
27+
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
2728
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.AbstractLogicTask;
2829
import org.apache.dolphinscheduler.server.master.engine.executor.plugin.ITaskParameterDeserializer;
2930
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
@@ -90,8 +91,7 @@ private void moveToDefaultBranch() {
9091
private void calculateSwitchBranch() {
9192
List<SwitchResultVo> switchResultVos = taskParameters.getSwitchResult().getDependTaskList();
9293
Map<String, Property> globalParams = taskExecutionContext.getPrepareParamsMap();
93-
Map<String, Property> varParams = JSONUtils
94-
.toList(taskInstance.getVarPool(), Property.class)
94+
Map<String, Property> varParams = VarPoolUtils.deserializeVarPool(taskInstance.getVarPool())
9595
.stream()
9696
.collect(Collectors.toMap(Property::getProp, Property -> Property));
9797

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.dolphinscheduler.common.constants.Constants;
2323
import org.apache.dolphinscheduler.common.enums.Flag;
2424
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
25-
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2625
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2726
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
2827
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -41,6 +40,7 @@
4140
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
4241
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
4342
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
43+
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
4444
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
4545
import org.apache.dolphinscheduler.service.process.ProcessService;
4646

@@ -299,7 +299,7 @@ private DependResult dependResultBySingleTaskInstance(WorkflowInstance workflowI
299299
* @param endTime
300300
*/
301301
private void addItemVarPool(String varPoolStr, Long endTime) {
302-
List<Property> varPool = new ArrayList<>(JSONUtils.toList(varPoolStr, Property.class));
302+
List<Property> varPool = new ArrayList<>(VarPoolUtils.deserializeVarPool(varPoolStr));
303303
if (!varPool.isEmpty()) {
304304
Map<String, Property> varPoolPropertyMap = varPool.stream().filter(p -> p.getDirect().equals(Direct.OUT))
305305
.collect(Collectors.toMap(Property::getProp, Function.identity()));

0 commit comments

Comments
 (0)