Skip to content

Commit b729a9d

Browse files
authored
[INLONG-11877][Manager] Support verifying transform SQL (#11878)
* [INLONG-11877][Manager] Support verifying transform SQL
1 parent 4517d37 commit b729a9d

10 files changed

Lines changed: 267 additions & 0 deletions

File tree

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
3131
import org.apache.inlong.manager.pojo.sink.SinkRequest;
3232
import org.apache.inlong.manager.pojo.sink.StreamSink;
33+
import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
3334

3435
import org.apache.commons.lang3.tuple.Pair;
3536

3637
import java.util.List;
38+
import java.util.Map;
3739

3840
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
3941
import static org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
@@ -171,4 +173,17 @@ public List<SinkField> parseFields(String method, String statement) {
171173
ParseFieldRequest request = ParseFieldRequest.builder().method(method).statement(statement).build();
172174
return parseFields(request);
173175
}
176+
177+
/**
178+
* Parse transform sql for data
179+
*
180+
* @param transformParseRequest the request for parse transform
181+
* @return result of parse result
182+
*/
183+
public Map<String, Object> parseTransform(TransformParseRequest transformParseRequest) {
184+
Response<Map<String, Object>> response =
185+
ClientUtils.executeHttpCall(streamSinkApi.parseTransform(transformParseRequest));
186+
ClientUtils.assertRespSuccess(response);
187+
return response.getData();
188+
}
174189
}

inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
2727
import org.apache.inlong.manager.pojo.sink.SinkRequest;
2828
import org.apache.inlong.manager.pojo.sink.StreamSink;
29+
import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
2930

3031
import retrofit2.Call;
3132
import retrofit2.http.Body;
@@ -36,6 +37,7 @@
3637
import retrofit2.http.Query;
3738

3839
import java.util.List;
40+
import java.util.Map;
3941

4042
public interface StreamSinkApi {
4143

@@ -67,4 +69,6 @@ Call<Response<Boolean>> deleteByKey(@Query("groupId") String groupId, @Query("st
6769
@POST("sink/parseFields")
6870
Call<Response<List<SinkField>>> parseFields(@Body ParseFieldRequest parseFieldRequest);
6971

72+
@POST("sink/parseTransform")
73+
Call<Response<Map<String, Object>>> parseTransform(@Body TransformParseRequest request);
7074
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.manager.pojo.sink;
19+
20+
import org.apache.inlong.manager.common.validation.SaveValidation;
21+
import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
22+
23+
import io.swagger.annotations.ApiModel;
24+
import io.swagger.annotations.ApiModelProperty;
25+
import lombok.Data;
26+
import lombok.EqualsAndHashCode;
27+
import org.hibernate.validator.constraints.Length;
28+
29+
import javax.validation.constraints.NotBlank;
30+
import javax.validation.constraints.Pattern;
31+
32+
import java.util.List;
33+
34+
/**
35+
* Request for Dirty data
36+
*/
37+
@Data
38+
@EqualsAndHashCode(callSuper = false)
39+
@ApiModel("Transform parse request")
40+
public class TransformParseRequest {
41+
42+
@ApiModelProperty("Inlong group id")
43+
@NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, message = "inlongGroupId cannot be blank")
44+
@Length(min = 4, max = 256, message = "length must be between 4 and 200")
45+
@Pattern(regexp = "^[a-zA-Z0-9_.-]{4,200}$", message = "only supports letters, numbers, '.', '-', or '_'")
46+
private String inlongGroupId;
47+
48+
@ApiModelProperty("Inlong stream id")
49+
@NotBlank(groups = {SaveValidation.class, UpdateByKeyValidation.class}, message = "inlongStreamId cannot be blank")
50+
@Length(min = 1, max = 256, message = "inlongStreamId length must be between 1 and 200")
51+
@Pattern(regexp = "^[a-zA-Z0-9_.-]{1,200}$", message = "inlongStreamId only supports letters, numbers, '.', '-', or '_'")
52+
private String inlongStreamId;
53+
54+
@ApiModelProperty("Transform sql")
55+
private String transformSql;
56+
57+
@ApiModelProperty("Data")
58+
private String data;
59+
60+
@ApiModelProperty("Sink field list")
61+
private List<SinkField> sinkFieldList;
62+
63+
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,28 @@
2020
import org.apache.inlong.common.enums.DataTypeEnum;
2121
import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
2222
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
23+
import org.apache.inlong.manager.common.exceptions.BusinessException;
2324
import org.apache.inlong.manager.common.util.CommonBeanUtils;
2425
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
26+
import org.apache.inlong.manager.pojo.sink.SinkField;
2527
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
28+
import org.apache.inlong.manager.pojo.stream.StreamField;
29+
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
2630
import org.apache.inlong.sdk.transform.decode.SplitUtils;
31+
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
32+
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
33+
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
34+
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
35+
import org.apache.inlong.sdk.transform.process.TransformProcessor;
36+
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
2737

2838
import lombok.extern.slf4j.Slf4j;
2939
import org.apache.commons.lang3.StringUtils;
3040
import org.springframework.stereotype.Service;
3141

42+
import java.util.ArrayList;
3243
import java.util.List;
44+
import java.util.Map;
3345

3446
@Slf4j
3547
@Service
@@ -84,4 +96,47 @@ public DataTypeConfig getDataTypeConfig(InlongStreamInfo streamInfo) {
8496
csvConfig.setEscapeChar(escape);
8597
return csvConfig;
8698
}
99+
100+
@Override
101+
public Map<String, Object> parseTransform(InlongStreamInfo streamInfo, List<SinkField> fieldList,
102+
String transformSql,
103+
String data) {
104+
try {
105+
List<org.apache.inlong.sdk.transform.pojo.FieldInfo> srcFields = new ArrayList<>();
106+
List<org.apache.inlong.sdk.transform.pojo.FieldInfo> dstFields = new ArrayList<>();
107+
for (StreamField streamField : streamInfo.getFieldList()) {
108+
if (StringUtils.isNotBlank(streamField.getFieldName())) {
109+
srcFields.add(
110+
new org.apache.inlong.sdk.transform.pojo.FieldInfo(streamField.getFieldName(),
111+
TypeConverter.DefaultTypeConverter()));
112+
}
113+
}
114+
for (SinkField sinkField : fieldList) {
115+
String targetFieldName = sinkField.getFieldName();
116+
if (StringUtils.isNotBlank(targetFieldName)) {
117+
dstFields.add(new org.apache.inlong.sdk.transform.pojo.FieldInfo(targetFieldName));
118+
}
119+
}
120+
char separator = '&';
121+
if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
122+
separator = (char) Integer.parseInt(streamInfo.getDataSeparator());
123+
}
124+
Character escape = null;
125+
if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
126+
escape = streamInfo.getDataEscapeChar().charAt(0);
127+
}
128+
CsvSourceInfo csvSource = new CsvSourceInfo(streamInfo.getDataEncoding(), separator, escape, srcFields);
129+
MapSinkInfo mapSinkInfo = new MapSinkInfo(streamInfo.getDataEncoding(), dstFields);
130+
TransformConfig config = new TransformConfig(transformSql);
131+
TransformProcessor<String, Map<String, Object>> processor = TransformProcessor
132+
.create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
133+
SinkEncoderFactory.createMapEncoder(mapSinkInfo));
134+
List<Map<String, Object>> result = processor.transform(data);
135+
log.info("success parse transform sql result={}", result);
136+
return result.get(0);
137+
} catch (Exception e) {
138+
log.error("parse transform sql failed", e);
139+
throw new BusinessException("parse transform sql failed" + e.getMessage());
140+
}
141+
}
87142
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/DataTypeOperator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.apache.inlong.manager.common.exceptions.BusinessException;
2323
import org.apache.inlong.manager.common.util.CommonBeanUtils;
2424
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
25+
import org.apache.inlong.manager.pojo.sink.SinkField;
2526
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
2627

2728
import java.util.List;
29+
import java.util.Map;
2830

2931
/**
3032
* Data type operator
@@ -51,4 +53,10 @@ default DataTypeConfig getDataTypeConfig(InlongStreamInfo streamInfo) {
5153
String.format("current type is not support for data type=%s", streamInfo.getDataType()));
5254
}
5355

56+
default Map<String, Object> parseTransform(InlongStreamInfo streamInfo, List<SinkField> fieldList,
57+
String transformSql, String data) {
58+
throw new BusinessException(
59+
String.format("current type is not support for data type=%s", streamInfo.getDataType()));
60+
}
61+
5462
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/KvDataTypeOperator.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,26 @@
2020
import org.apache.inlong.common.enums.DataTypeEnum;
2121
import org.apache.inlong.common.pojo.sort.dataflow.dataType.DataTypeConfig;
2222
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
23+
import org.apache.inlong.manager.common.exceptions.BusinessException;
2324
import org.apache.inlong.manager.common.util.CommonBeanUtils;
2425
import org.apache.inlong.manager.pojo.consume.BriefMQMessage.FieldInfo;
26+
import org.apache.inlong.manager.pojo.sink.SinkField;
2527
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
28+
import org.apache.inlong.manager.pojo.stream.StreamField;
2629
import org.apache.inlong.sdk.transform.decode.KvUtils;
30+
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
31+
import org.apache.inlong.sdk.transform.encode.SinkEncoderFactory;
32+
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
33+
import org.apache.inlong.sdk.transform.pojo.MapSinkInfo;
34+
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
35+
import org.apache.inlong.sdk.transform.process.TransformProcessor;
36+
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
2737

2838
import lombok.extern.slf4j.Slf4j;
2939
import org.apache.commons.lang3.StringUtils;
3040
import org.springframework.stereotype.Service;
3141

42+
import java.util.ArrayList;
3243
import java.util.List;
3344
import java.util.Map;
3445

@@ -101,4 +112,54 @@ public DataTypeConfig getDataTypeConfig(InlongStreamInfo streamInfo) {
101112
kvConfig.setEscapeChar(escape);
102113
return kvConfig;
103114
}
115+
116+
@Override
117+
public Map<String, Object> parseTransform(InlongStreamInfo streamInfo, List<SinkField> fieldList,
118+
String transformSql,
119+
String data) {
120+
try {
121+
List<org.apache.inlong.sdk.transform.pojo.FieldInfo> srcFields = new ArrayList<>();
122+
List<org.apache.inlong.sdk.transform.pojo.FieldInfo> dstFields = new ArrayList<>();
123+
for (StreamField streamField : streamInfo.getFieldList()) {
124+
if (StringUtils.isNotBlank(streamField.getFieldName())) {
125+
srcFields.add(
126+
new org.apache.inlong.sdk.transform.pojo.FieldInfo(streamField.getFieldName(),
127+
TypeConverter.DefaultTypeConverter()));
128+
}
129+
}
130+
for (SinkField sinkField : fieldList) {
131+
String targetFieldName = sinkField.getFieldName();
132+
if (StringUtils.isNotBlank(targetFieldName)) {
133+
dstFields.add(new org.apache.inlong.sdk.transform.pojo.FieldInfo(targetFieldName));
134+
}
135+
}
136+
char separator = '&';
137+
if (StringUtils.isNotBlank(streamInfo.getDataSeparator())) {
138+
separator = (char) Integer.parseInt(streamInfo.getDataSeparator());
139+
}
140+
Character escape = null;
141+
if (StringUtils.isNotBlank(streamInfo.getDataEscapeChar())) {
142+
escape = streamInfo.getDataEscapeChar().charAt(0);
143+
}
144+
char kvSeparator = '=';
145+
if (StringUtils.isNotBlank(streamInfo.getKvSeparator())) {
146+
kvSeparator = (char) Integer.parseInt(streamInfo.getKvSeparator());
147+
}
148+
KvSourceInfo kvSourceInfo = new KvSourceInfo(streamInfo.getDataEncoding(), srcFields);
149+
kvSourceInfo.setEscapeChar(escape);
150+
kvSourceInfo.setEntryDelimiter(separator);
151+
kvSourceInfo.setKvDelimiter(kvSeparator);
152+
MapSinkInfo mapSinkInfo = new MapSinkInfo(streamInfo.getDataEncoding(), dstFields);
153+
TransformConfig config = new TransformConfig(transformSql);
154+
TransformProcessor<String, Map<String, Object>> processor = TransformProcessor
155+
.create(config, SourceDecoderFactory.createKvDecoder(kvSourceInfo),
156+
SinkEncoderFactory.createMapEncoder(mapSinkInfo));
157+
List<Map<String, Object>> result = processor.transform(data);
158+
log.info("success parse transform sql result={}", result);
159+
return result.get(0);
160+
} catch (Exception e) {
161+
log.error("parse transform sql failed", e);
162+
throw new BusinessException("parse transform sql failed" + e.getMessage());
163+
}
164+
}
104165
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
3030
import org.apache.inlong.manager.pojo.sink.SinkRequest;
3131
import org.apache.inlong.manager.pojo.sink.StreamSink;
32+
import org.apache.inlong.manager.pojo.sink.TransformParseRequest;
3233
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
3334
import org.apache.inlong.manager.pojo.user.UserInfo;
3435

@@ -247,4 +248,13 @@ public interface StreamSinkService {
247248
* @return list of sink field
248249
*/
249250
List<SinkField> parseFields(ParseFieldRequest parseFieldRequest);
251+
252+
/**
253+
* Parse transform sql for data
254+
*
255+
* @param request the request for parse transform
256+
* @return result of parse result
257+
*/
258+
Map<String, Object> parseTransform(TransformParseRequest request);
259+
250260
}

0 commit comments

Comments
 (0)