Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public enum AlertType {

/**
* 0 workflow instance failure, 1 workflow instance success, 2 workflow instance blocked, 3 workflow instance timeout, 4 fault tolerance warning,
* 5 task failure, 6 task success, 7 task timeout, 8 close alert
* 5 task failure, 6 task success, 7 task timeout, 8 task result
*/
WORKFLOW_INSTANCE_FAILURE(0, "workflow instance failure"),
WORKFLOW_INSTANCE_SUCCESS(1, "workflow instance success"),
Expand All @@ -39,6 +39,7 @@ public enum AlertType {
TASK_FAILURE(5, "task failure"),
TASK_SUCCESS(6, "task success"),
TASK_TIMEOUT(7, "task timeout"),
TASK_RESULT(8, "task result");
;

AlertType(int code, String descp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void testDeleteByCode() {
@Test
public void testNullPropertyValueOfLocalParams() {
String definitionJson =
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":[{\\\"direct\\\":2,\\\"type\\\":3,\\\"prop\\\":\\\"key\\\"}],\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class);

Map<String, String> taskParamsMap = definition.getTaskParamMap();
Expand All @@ -157,7 +157,7 @@ public void testNullPropertyValueOfLocalParams() {
@Test
public void testNullLocalParamsOfTaskParams() {
String definitionJson =
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendEmail\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
"{\"failRetryTimes\":\"0\",\"timeoutNotifyStrategy\":\"\",\"code\":\"5195043558720\",\"flag\":\"YES\",\"environmentCode\":\"-1\",\"taskDefinitionIndex\":2,\"taskPriority\":\"MEDIUM\",\"taskParams\":\"{\\\"preStatements\\\":null,\\\"postStatements\\\":null,\\\"type\\\":\\\"ADB_MYSQL\\\",\\\"database\\\":\\\"lijia\\\",\\\"sql\\\":\\\"create table nation_${random_serial_number} as select * from nation\\\",\\\"localParams\\\":null,\\\"Name\\\":\\\"create_table_as_select_nation\\\",\\\"FailRetryTimes\\\":0,\\\"dbClusterId\\\":\\\"amv-bp10o45925jpx959\\\",\\\"sendAlert\\\":false,\\\"displayRows\\\":10,\\\"limit\\\":10000,\\\"agentSource\\\":\\\"Workflow\\\",\\\"agentVersion\\\":\\\"Unkown\\\"}\",\"timeout\":\"0\",\"taskType\":\"ADB_MYSQL\",\"timeoutFlag\":\"CLOSE\",\"projectCode\":\"5191800302720\",\"name\":\"create_table_as_select_nation\",\"delayTime\":\"0\",\"workerGroup\":\"default\"}";
TaskDefinition definition = JSONUtils.parseObject(definitionJson, TaskDefinition.class);

Assertions.assertNull(definition.getTaskParamMap(), "Serialize the task definition success");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.dolphinscheduler.extract.alert.request;

import org.apache.dolphinscheduler.common.enums.AlertType;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand All @@ -32,6 +34,6 @@ public class AlertSendRequest {

private String content;

private int warnType;
private AlertType alertType;

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;

import java.util.Map;
Expand Down Expand Up @@ -58,7 +58,7 @@ public abstract class AbstractTask {

protected boolean needAlert = false;

protected TaskAlertInfo taskAlertInfo;
protected TaskResultAlertInfo taskResultAlertInfo;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't add this into AbstractTask, this should only used at SqlTask?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't add this into AbstractTask, this should only used at SqlTask?

Excellent. The previously flawed design has been corrected.


/**
* constructor
Expand Down Expand Up @@ -117,12 +117,12 @@ public void setNeedAlert(boolean needAlert) {
this.needAlert = needAlert;
}

public TaskAlertInfo getTaskAlertInfo() {
return taskAlertInfo;
public TaskResultAlertInfo getTaskResultAlertInfo() {
return taskResultAlertInfo;
}

public void setTaskAlertInfo(TaskAlertInfo taskAlertInfo) {
this.taskAlertInfo = taskAlertInfo;
public void setTaskResultAlertInfo(TaskResultAlertInfo taskResultAlertInfo) {
this.taskResultAlertInfo = taskResultAlertInfo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.plugin.task.api;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;

public interface TaskCallBack {
Expand All @@ -26,4 +27,6 @@ public interface TaskCallBack {
// todo:The pid should put into runtime context
@Deprecated
void updateTaskInstanceInfo(int taskInstanceId);

void sendAlert(int groupId, String title, String content, AlertType alertType);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add this at TaskCallBack, you can see Deprecated at updateTaskInstanceInfo.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't add this at TaskCallBack, you can see Deprecated at updateTaskInstanceInfo.

Sorry, but given the current design, TaskCallBack is still the most straightforward solution for cross-module calls.
I couldn't find a clean way for task-sql module to reach the EventBus channel; attempting to do so would require massive changes that seem disproportionate.

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,21 @@

package org.apache.dolphinscheduler.plugin.task.api.model;

public class TaskAlertInfo {
import org.apache.dolphinscheduler.common.enums.AlertType;

private String title;
import lombok.AllArgsConstructor;
import lombok.Data;

private String content;
@Data
@AllArgsConstructor
public class TaskResultAlertInfo {

private Integer alertGroupId;

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getContent() {
return content;
}
private String title;

public void setContent(String content) {
this.content = content;
}
private String content;

public Integer getAlertGroupId() {
return alertGroupId;
}
private AlertType alertType;

public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public class SqlParameters extends AbstractParameters {
private int sqlType;

/**
* send email
* send alert
*/
private Boolean sendEmail;
private Boolean sendAlert;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not break compatibility casually.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not break compatibility casually.

OK, Add database migration to rename JSON key sendEmail to sendAlert


/**
* display rows
Expand Down Expand Up @@ -147,12 +147,12 @@ public void setSqlType(int sqlType) {
this.sqlType = sqlType;
}

public Boolean getSendEmail() {
return sendEmail;
public Boolean getSendAlert() {
return sendAlert;
}

public void setSendEmail(Boolean sendEmail) {
this.sendEmail = sendEmail;
public void setSendAlert(Boolean sendAlert) {
this.sendAlert = sendAlert;
}

public int getDisplayRows() {
Expand Down Expand Up @@ -273,7 +273,7 @@ public String toString() {
+ ", datasource=" + datasource
+ ", sql='" + sql + '\''
+ ", sqlType=" + sqlType
+ ", sendEmail=" + sendEmail
+ ", sendAlert=" + sendAlert
+ ", displayRows=" + displayRows
+ ", limit=" + limit
+ ", showType='" + showType + '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class SqlParametersTest {
private final String sql = "select * from t_ds_user";
private final int datasource = 1;
private final int sqlType = 0;
private final Boolean sendEmail = true;
private final Boolean sendAlert = true;
private final int displayRows = 10;
private final String showType = "TABLE";
private final String title = "sql test";
Expand All @@ -58,7 +58,7 @@ public void testSqlParameters() {
sqlParameters.setSql(sql);
sqlParameters.setDatasource(datasource);
sqlParameters.setSqlType(sqlType);
sqlParameters.setSendEmail(sendEmail);
sqlParameters.setSendAlert(sendAlert);
sqlParameters.setDisplayRows(displayRows);
sqlParameters.setShowType(showType);
sqlParameters.setTitle(title);
Expand All @@ -68,7 +68,7 @@ public void testSqlParameters() {
Assertions.assertEquals(sql, sqlParameters.getSql());
Assertions.assertEquals(datasource, sqlParameters.getDatasource());
Assertions.assertEquals(sqlType, sqlParameters.getSqlType());
Assertions.assertEquals(sendEmail, sqlParameters.getSendEmail());
Assertions.assertEquals(sendAlert, sqlParameters.getSendAlert());
Assertions.assertEquals(displayRows, sqlParameters.getDisplayRows());
Assertions.assertEquals(showType, sqlParameters.getShowType());
Assertions.assertEquals(title, sqlParameters.getTitle());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
Expand Down Expand Up @@ -78,6 +79,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
Expand Down Expand Up @@ -86,6 +87,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
Expand Down Expand Up @@ -108,6 +109,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.plugin.task.sql;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
Expand All @@ -32,7 +33,7 @@
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResultAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
Expand Down Expand Up @@ -154,6 +155,12 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
// execute sql task
executeFuncAndSql(mainStatementSqlBinds, preStatementSqlBinds, postStatementSqlBinds);

if (this.getNeedAlert()) {
log.info("Begin to send sql result alert");
taskCallBack.sendAlert(taskResultAlertInfo.getAlertGroupId(), taskResultAlertInfo.getTitle(),
taskResultAlertInfo.getContent(), taskResultAlertInfo.getAlertType());
}

setExitStatusCode(TaskConstants.EXIT_CODE_SUCCESS);

} catch (Exception e) {
Expand Down Expand Up @@ -282,11 +289,30 @@ private String resultProcess(ResultSet resultSet) throws Exception {
String result = resultJSONArray.isEmpty() ? JSONUtils.toJsonString(generateEmptyRow(resultSet))
: JSONUtils.toJsonString(resultJSONArray);

if (Boolean.TRUE.equals(sqlParameters.getSendEmail())) {
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle())
? sqlParameters.getTitle()
: taskExecutionContext.getTaskName() + " query result sets", result);
if (Boolean.TRUE.equals(sqlParameters.getSendAlert())) {
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows()
: TaskConstants.DEFAULT_DISPLAY_ROWS;
String alertContent;
if (resultJSONArray.size() > displayRows) {
ArrayNode truncatedArray = JSONUtils.createArrayNode();
for (int i = 0; i < Math.min(displayRows, resultJSONArray.size()); i++) {
truncatedArray.add(resultJSONArray.get(i));
}
alertContent = JSONUtils.toJsonString(truncatedArray);
log.debug("Alert content truncated to {} rows", displayRows);
} else {
alertContent = result;
}

setNeedAlert(true);
TaskResultAlertInfo taskResultAlertInfo = new TaskResultAlertInfo(sqlParameters.getGroupId(),
StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle()
: taskExecutionContext.getTaskName() + " query result sets",
alertContent,
AlertType.TASK_RESULT);
setTaskResultAlertInfo(taskResultAlertInfo);
}

log.debug("execute sql result : {}", result);
return result;
}
Expand All @@ -311,21 +337,6 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException {
return resultJSONArray;
}

/**
* send alert as an attachment
*
* @param title title
* @param content content
*/
private void sendAttachment(int groupId, String title, String content) {
setNeedAlert(Boolean.TRUE);
TaskAlertInfo taskAlertInfo = new TaskAlertInfo();
taskAlertInfo.setAlertGroupId(groupId);
taskAlertInfo.setContent(content);
taskAlertInfo.setTitle(title);
setTaskAlertInfo(taskAlertInfo);
}

private String executeQuery(Connection connection, SqlBinds sqlBinds, String handlerType) throws Exception {
try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBinds)) {
log.info("{} statement execute query, for sql: {}", handlerType, sqlBinds.getSql());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
Expand Down Expand Up @@ -86,6 +87,11 @@ public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo appl
public void updateTaskInstanceInfo(int taskInstanceId) {

}

@Override
public void sendAlert(int groupId, String title, String content, AlertType alertType) {

}
};

@BeforeEach
Expand Down
Loading
Loading