Skip to content

Commit 4517d37

Browse files
authored
[INLONG-11873][SDK] Support parsing field values from extended parameters (#11874)
1 parent f272ccf commit 4517d37

23 files changed

Lines changed: 164 additions & 53 deletions
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.sdk.transform.decode;
19+
20+
import org.apache.inlong.sdk.transform.process.Context;
21+
22+
/**
23+
* AbstractSourceData
24+
*
25+
*/
26+
public abstract class AbstractSourceData implements SourceData {
27+
28+
public static final String CTX_KEY = "$ctx.";
29+
30+
protected Context context;
31+
32+
protected boolean isContextField(String fieldName) {
33+
return fieldName.startsWith(CTX_KEY);
34+
}
35+
36+
protected String getContextField(String fieldName) {
37+
if (context == null) {
38+
return "";
39+
}
40+
if (!isContextField(fieldName)) {
41+
return null;
42+
}
43+
String realFieldName = fieldName.substring(CTX_KEY.length());
44+
String fieldValue = this.context.getStringOrDefault(realFieldName, "");
45+
return fieldValue;
46+
}
47+
}

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceData.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sdk.transform.decode;
1919

20+
import org.apache.inlong.sdk.transform.process.Context;
21+
2022
import org.apache.avro.Schema;
2123
import org.apache.avro.Schema.Type;
2224
import org.apache.avro.generic.GenericRecord;
@@ -29,7 +31,7 @@
2931
import java.util.List;
3032
import java.util.Map;
3133

32-
public class AvroSourceData implements SourceData {
34+
public class AvroSourceData extends AbstractSourceData {
3335

3436
public static final String ROOT_KEY = "$root";
3537

@@ -41,10 +43,11 @@ public class AvroSourceData implements SourceData {
4143

4244
private Charset srcCharset;
4345

44-
public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot, Charset srcCharset) {
46+
public AvroSourceData(GenericRecord root, List<GenericRecord> childRoot, Charset srcCharset, Context context) {
4547
this.root = root;
4648
this.childRoot = childRoot;
4749
this.srcCharset = srcCharset;
50+
this.context = context;
4851
}
4952

5053
@Override
@@ -59,6 +62,9 @@ public int getRowCount() {
5962
@Override
6063
public String getField(int rowNum, String fieldName) {
6164
try {
65+
if (isContextField(fieldName)) {
66+
return getContextField(fieldName);
67+
}
6268
List<AvroNode> childNodes = new ArrayList<>();
6369
String[] nodeStrings = fieldName.split("\\.");
6470
for (String nodeString : nodeStrings) {

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public SourceData decode(byte[] srcBytes, Context context) {
7474
GenericRecord root = dataFileStream.next();
7575
List<GenericRecord> childRoot = null;
7676
if (CollectionUtils.isEmpty(childNodes)) {
77-
return new AvroSourceData(root, null, srcCharset);
77+
return new AvroSourceData(root, null, srcCharset, context);
7878
}
7979

8080
Object current = root;
@@ -83,12 +83,12 @@ public SourceData decode(byte[] srcBytes, Context context) {
8383
for (AvroNode node : childNodes) {
8484
if (curSchema.getType() != Type.RECORD) {
8585
// error data
86-
return new AvroSourceData(root, null, srcCharset);
86+
return new AvroSourceData(root, null, srcCharset, context);
8787
}
8888
Object newElement = ((GenericRecord) current).get(node.getName());
8989
if (newElement == null) {
9090
// error data
91-
return new AvroSourceData(root, null, srcCharset);
91+
return new AvroSourceData(root, null, srcCharset, context);
9292
}
9393
// node is not array
9494
if (!node.isArray()) {
@@ -100,15 +100,15 @@ public SourceData decode(byte[] srcBytes, Context context) {
100100
current = getElementFromArray(node, newElement, curSchema);
101101
if (current == null) {
102102
// error data
103-
return new AvroSourceData(root, null, srcCharset);
103+
return new AvroSourceData(root, null, srcCharset, context);
104104
}
105105
}
106106
if (curSchema.getType() != Type.ARRAY) {
107107
// error data
108-
return new AvroSourceData(root, null, srcCharset);
108+
return new AvroSourceData(root, null, srcCharset, context);
109109
}
110110
childRoot = (List<GenericRecord>) current;
111-
return new AvroSourceData(root, childRoot, srcCharset);
111+
return new AvroSourceData(root, childRoot, srcCharset, context);
112112
} catch (Exception e) {
113113
LOG.error(e.getMessage(), e);
114114
return null;

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceData.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sdk.transform.decode;
1919

20+
import org.apache.inlong.sdk.transform.process.Context;
21+
2022
import com.google.gson.JsonArray;
2123
import com.google.gson.JsonObject;
2224

@@ -25,7 +27,7 @@
2527
*/
2628
public class BsonSourceData extends JsonSourceData {
2729

28-
public BsonSourceData(JsonObject root, JsonArray childRoot) {
29-
super(root, childRoot);
30+
public BsonSourceData(JsonObject root, JsonArray childRoot, Context context) {
31+
super(root, childRoot, context);
3032
}
3133
}

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceData.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sdk.transform.decode;
1919

20+
import org.apache.inlong.sdk.transform.process.Context;
21+
2022
import java.util.ArrayList;
2123
import java.util.HashMap;
2224
import java.util.List;
@@ -26,13 +28,14 @@
2628
* CsvSourceData
2729
*
2830
*/
29-
public class CsvSourceData implements SourceData {
31+
public class CsvSourceData extends AbstractSourceData {
3032

3133
private List<Map<String, Object>> rows = new ArrayList<>();
3234

3335
private Map<String, Object> currentRow;
3436

35-
public CsvSourceData() {
37+
public CsvSourceData(Context context) {
38+
this.context = context;
3639
}
3740

3841
public void putField(String fieldName, Object fieldValue) {
@@ -54,6 +57,9 @@ public Object getField(int rowNum, String fieldName) {
5457
if (rowNum >= this.rows.size()) {
5558
return null;
5659
}
60+
if (isContextField(fieldName)) {
61+
return getContextField(fieldName);
62+
}
5763
Map<String, Object> targetRow = this.rows.get(rowNum);
5864
return targetRow.get(fieldName);
5965
}

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public SourceData decode(byte[] srcBytes, Context context) {
6060
@Override
6161
public SourceData decode(String srcString, Context context) {
6262
String[][] rowValues = SplitUtils.splitCsv(srcString, delimiter, escapeChar, '\"', '\n', true);
63-
CsvSourceData sourceData = new CsvSourceData();
63+
CsvSourceData sourceData = new CsvSourceData(context);
6464
for (int i = 0; i < rowValues.length; i++) {
6565
String[] fieldValues = rowValues[i];
6666
sourceData.addRow();

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceData.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sdk.transform.decode;
1919

20+
import org.apache.inlong.sdk.transform.process.Context;
21+
2022
import com.google.gson.JsonArray;
2123
import com.google.gson.JsonElement;
2224
import com.google.gson.JsonObject;
@@ -29,7 +31,7 @@
2931
* JsonSourceData
3032
*
3133
*/
32-
public class JsonSourceData implements SourceData {
34+
public class JsonSourceData extends AbstractSourceData {
3335

3436
public static final String ROOT_KEY = "$root";
3537

@@ -43,10 +45,12 @@ public class JsonSourceData implements SourceData {
4345
* Constructor
4446
* @param root
4547
* @param childRoot
48+
* @param context
4649
*/
47-
public JsonSourceData(JsonObject root, JsonArray childRoot) {
50+
public JsonSourceData(JsonObject root, JsonArray childRoot, Context context) {
4851
this.root = root;
4952
this.childRoot = childRoot;
53+
this.context = context;
5054
}
5155

5256
/**
@@ -71,6 +75,10 @@ public int getRowCount() {
7175
@Override
7276
public String getField(int rowNum, String fieldName) {
7377
try {
78+
if (isContextField(fieldName)) {
79+
return getContextField(fieldName);
80+
}
81+
// split field name
7482
List<JsonNode> childNodes = new ArrayList<>();
7583
String[] nodeStrings = fieldName.split("\\.");
7684
for (String nodeString : nodeStrings) {

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,18 @@ public SourceData decode(String srcString, Context context) {
8686
JsonObject root = gson.fromJson(srcString, JsonObject.class);
8787
JsonArray childRoot = null;
8888
if (CollectionUtils.isEmpty(childNodes)) {
89-
return new JsonSourceData(root, null);
89+
return new JsonSourceData(root, null, context);
9090
}
9191
JsonElement current = root;
9292
for (JsonNode node : childNodes) {
9393
if (!current.isJsonObject()) {
9494
// error data
95-
return new JsonSourceData(root, null);
95+
return new JsonSourceData(root, null, context);
9696
}
9797
JsonElement newElement = current.getAsJsonObject().get(node.getName());
9898
if (newElement == null) {
9999
// error data
100-
return new JsonSourceData(root, null);
100+
return new JsonSourceData(root, null, context);
101101
}
102102
// node is not array
103103
if (!node.isArray()) {
@@ -108,15 +108,15 @@ public SourceData decode(String srcString, Context context) {
108108
current = getElementFromArray(node, newElement);
109109
if (current == null) {
110110
// error data
111-
return new JsonSourceData(root, null);
111+
return new JsonSourceData(root, null, context);
112112
}
113113
}
114114
if (!current.isJsonArray()) {
115115
// error data
116-
return new JsonSourceData(root, null);
116+
return new JsonSourceData(root, null, context);
117117
}
118118
childRoot = current.getAsJsonArray();
119-
return new JsonSourceData(root, childRoot);
119+
return new JsonSourceData(root, childRoot, context);
120120
}
121121

122122
private JsonElement getElementFromArray(JsonNode node, JsonElement curElement) {

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceData.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sdk.transform.decode;
1919

20+
import org.apache.inlong.sdk.transform.process.Context;
21+
2022
import java.util.ArrayList;
2123
import java.util.HashMap;
2224
import java.util.List;
@@ -25,13 +27,14 @@
2527
/**
2628
* KvSourceData
2729
*/
28-
public class KvSourceData implements SourceData {
30+
public class KvSourceData extends AbstractSourceData {
2931

3032
private List<Map<String, String>> rows = new ArrayList<>();
3133

3234
private Map<String, String> currentRow;
3335

34-
public KvSourceData() {
36+
public KvSourceData(Context context) {
37+
this.context = context;
3538
}
3639

3740
public void putField(String fieldName, String fieldValue) {
@@ -53,6 +56,9 @@ public String getField(int rowNum, String fieldName) {
5356
if (rowNum >= this.rows.size()) {
5457
return null;
5558
}
59+
if (isContextField(fieldName)) {
60+
return getContextField(fieldName);
61+
}
5662
Map<String, String> targetRow = this.rows.get(rowNum);
5763
return targetRow.get(fieldName);
5864
}

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public SourceData decode(byte[] srcBytes, Context context) {
7575
public SourceData decode(String srcString, Context context) {
7676
List<Map<String, String>> rowValues = KvUtils.splitKv(srcString, entryDelimiter, kvDelimiter,
7777
escapeChar, quoteChar, lineDelimiter);
78-
KvSourceData sourceData = new KvSourceData();
78+
KvSourceData sourceData = new KvSourceData(context);
7979
if (CollectionUtils.isEmpty(fields)) {
8080
for (Map<String, String> row : rowValues) {
8181
sourceData.addRow();

0 commit comments

Comments
 (0)