Skip to content

Commit c87fb8a

Browse files
authored
[INLONG-12108][SDK] TransformSDK supports decoding and transformation of ProtoBuf description files with multiple Proto files and multi-level NestedType (#12110)
* [INLONG-12108][SDK] TransformSDK supports decoding and transformation of ProtoBuf description files with multiple Proto files and multi-level NestedType * support that root message type has package name
1 parent 05ee935 commit c87fb8a

1 file changed

Lines changed: 145 additions & 2 deletions

File tree

inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java

Lines changed: 145 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.inlong.sdk.transform.process.Context;
2222

2323
import com.google.protobuf.DescriptorProtos;
24+
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
2425
import com.google.protobuf.Descriptors;
2526
import com.google.protobuf.Descriptors.DescriptorValidationException;
2627
import com.google.protobuf.DynamicMessage;
@@ -29,10 +30,13 @@
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
3132

33+
import java.io.IOException;
3234
import java.nio.charset.Charset;
3335
import java.util.Base64;
36+
import java.util.HashSet;
3437
import java.util.List;
3538
import java.util.Map;
39+
import java.util.Set;
3640
import java.util.concurrent.ConcurrentHashMap;
3741

3842
/**
@@ -71,10 +75,13 @@ public PbSourceDecoder(PbSourceInfo sourceInfo) {
7175
// parse description
7276
byte[] protoBytes = Base64.getDecoder().decode(protoDescription);
7377
DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(protoBytes);
74-
DescriptorProtos.FileDescriptorProto fileDesc = descriptorSet.getFile(0);
78+
// format proto
79+
DescriptorProtos.FileDescriptorProto fileDesc = formatFileDescription(descriptorSet, rootMessageType);
7580
Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(fileDesc,
7681
new Descriptors.FileDescriptor[]{});
77-
this.rootDesc = fileDescriptor.findMessageTypeByName(rootMessageType);
82+
// find root
83+
String fullRootMessageType = formatTypeName(findRootType(descriptorSet, rootMessageType));
84+
this.rootDesc = fileDescriptor.findMessageTypeByName(fullRootMessageType);
7885
// child
7986
this.rowsNodePath = sourceInfo.getRowsNodePath();
8087
this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
@@ -156,4 +163,140 @@ public SourceData decode(String input, Context context) {
156163
byte[] srcBytes = Base64.getDecoder().decode(input);
157164
return decode(srcBytes, context);
158165
}
166+
167+
private static DescriptorProtos.FileDescriptorProto formatFileDescription(
168+
DescriptorProtos.FileDescriptorSet descriptorSet,
169+
String rootMessageType) throws DescriptorValidationException, IOException {
170+
// load
171+
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap = new ConcurrentHashMap<>();
172+
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap = new ConcurrentHashMap<>();
173+
loadTypeFromFileSet(descriptorSet, messageTypeMap, enumTypeMap);
174+
// find root type
175+
String rootTypeName = findRootType(descriptorSet, rootMessageType);
176+
// build root type
177+
DescriptorProtos.FileDescriptorProto.Builder newFileBuilder = DescriptorProtos.FileDescriptorProto.newBuilder();
178+
Set<String> addedTypeNames = new HashSet<>();
179+
formatMessageTypeDescription(rootTypeName, newFileBuilder, messageTypeMap, enumTypeMap, addedTypeNames);
180+
// DescriptorProtos.FileDescriptorSet newDescriptorSet = DescriptorProtos.FileDescriptorSet.newBuilder()
181+
// .addFile(newFileBuilder).build();
182+
// byte[] newProtoBytes = newDescriptorSet.toByteArray();
183+
return newFileBuilder.build();
184+
}
185+
186+
private static void formatMessageTypeDescription(String typeName,
187+
DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
188+
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
189+
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
190+
Set<String> addedTypeNames) {
191+
DescriptorProtos.DescriptorProto typeDesc = messageTypeMap.get(typeName);
192+
if (typeDesc == null) {
193+
return;
194+
}
195+
DescriptorProtos.DescriptorProto.Builder typeBuilder = DescriptorProtos.DescriptorProto.newBuilder();
196+
String newTypeName = formatTypeName(typeName);
197+
typeBuilder.setName(newTypeName);
198+
for (DescriptorProtos.FieldDescriptorProto fieldDesc : typeDesc.getFieldList()) {
199+
DescriptorProtos.FieldDescriptorProto.Builder fieldBuilder = DescriptorProtos.FieldDescriptorProto
200+
.newBuilder().mergeFrom(fieldDesc);
201+
if (fieldDesc.getType().equals(FieldDescriptorProto.Type.TYPE_MESSAGE)) {
202+
String fieldTypeName = fieldDesc.getTypeName();
203+
String newFieldTypeName = formatTypeName(fieldTypeName);
204+
fieldBuilder.setTypeName(newFieldTypeName);
205+
if (!addedTypeNames.contains(newFieldTypeName)) {
206+
addedTypeNames.add(newFieldTypeName);
207+
formatMessageTypeDescription(fieldTypeName, newFileBuilder, messageTypeMap, enumTypeMap,
208+
addedTypeNames);
209+
}
210+
} else if (fieldDesc.getType().equals(FieldDescriptorProto.Type.TYPE_ENUM)) {
211+
String fieldTypeName = fieldDesc.getTypeName();
212+
String newFieldTypeName = formatTypeName(fieldTypeName);
213+
fieldBuilder.setTypeName(newFieldTypeName);
214+
if (!addedTypeNames.contains(newFieldTypeName)) {
215+
addedTypeNames.add(newFieldTypeName);
216+
formatEnumTypeDescription(fieldTypeName, newFileBuilder, enumTypeMap, addedTypeNames);
217+
}
218+
}
219+
typeBuilder.addField(fieldBuilder);
220+
}
221+
newFileBuilder.addMessageType(typeBuilder.build());
222+
}
223+
224+
private static void formatEnumTypeDescription(String typeName,
225+
DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
226+
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
227+
Set<String> addedTypeNames) {
228+
DescriptorProtos.EnumDescriptorProto typeDesc = enumTypeMap.get(typeName);
229+
if (typeDesc == null) {
230+
return;
231+
}
232+
DescriptorProtos.EnumDescriptorProto.Builder typeBuilder = DescriptorProtos.EnumDescriptorProto.newBuilder()
233+
.mergeFrom(typeDesc);
234+
String newTypeName = formatTypeName(typeName);
235+
typeBuilder.setName(newTypeName);
236+
newFileBuilder.addEnumType(typeBuilder.build());
237+
}
238+
239+
private static String formatTypeName(String typeName) {
240+
return typeName.substring(1).replace('.', '_');
241+
}
242+
243+
private static String findRootType(DescriptorProtos.FileDescriptorSet descriptorSet,
244+
String rootMessageType) {
245+
for (DescriptorProtos.FileDescriptorProto fileDesc : descriptorSet.getFileList()) {
246+
String packageName = fileDesc.getPackage();
247+
String packagePrefix = "";
248+
if (!StringUtils.isBlank(packageName)) {
249+
packagePrefix = "." + packageName;
250+
}
251+
// message type
252+
for (DescriptorProtos.DescriptorProto typeDesc : fileDesc.getMessageTypeList()) {
253+
if (StringUtils.equals(typeDesc.getName(), rootMessageType)) {
254+
String fullTypeName = packagePrefix + "." + typeDesc.getName();
255+
return fullTypeName;
256+
}
257+
}
258+
}
259+
return null;
260+
}
261+
262+
private static void loadTypeFromFileSet(DescriptorProtos.FileDescriptorSet descriptorSet,
263+
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
264+
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
265+
for (DescriptorProtos.FileDescriptorProto fileDesc : descriptorSet.getFileList()) {
266+
loadTypeFromFile(fileDesc, messageTypeMap, enumTypeMap);
267+
}
268+
}
269+
270+
private static void loadTypeFromFile(DescriptorProtos.FileDescriptorProto fileDesc,
271+
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
272+
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
273+
String packageName = fileDesc.getPackage();
274+
String packagePrefix = "";
275+
if (!StringUtils.isBlank(packageName)) {
276+
packagePrefix = "." + packageName;
277+
}
278+
for (DescriptorProtos.DescriptorProto typeDesc : fileDesc.getMessageTypeList()) {
279+
loadTypeFromMessage(fileDesc, packagePrefix, typeDesc, messageTypeMap, enumTypeMap);
280+
}
281+
for (DescriptorProtos.EnumDescriptorProto typeDesc : fileDesc.getEnumTypeList()) {
282+
String enumTypeName = packagePrefix + "." + typeDesc.getName();
283+
enumTypeMap.putIfAbsent(enumTypeName, typeDesc);
284+
}
285+
}
286+
287+
private static void loadTypeFromMessage(DescriptorProtos.FileDescriptorProto fileDesc,
288+
String parentTypeName,
289+
DescriptorProtos.DescriptorProto typeDesc,
290+
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
291+
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
292+
String fullTypeName = parentTypeName + "." + typeDesc.getName();
293+
messageTypeMap.putIfAbsent(fullTypeName, typeDesc);
294+
for (DescriptorProtos.DescriptorProto nestedTypeDesc : typeDesc.getNestedTypeList()) {
295+
loadTypeFromMessage(fileDesc, fullTypeName, nestedTypeDesc, messageTypeMap, enumTypeMap);
296+
}
297+
for (DescriptorProtos.EnumDescriptorProto enumTypeDesc : typeDesc.getEnumTypeList()) {
298+
String enumTypeName = fullTypeName + "." + enumTypeDesc.getName();
299+
enumTypeMap.putIfAbsent(enumTypeName, enumTypeDesc);
300+
}
301+
}
159302
}

0 commit comments

Comments
 (0)