Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.inlong.sdk.transform.process.Context;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.DynamicMessage;
Expand All @@ -29,10 +30,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -71,10 +75,13 @@ public PbSourceDecoder(PbSourceInfo sourceInfo) {
// parse description
byte[] protoBytes = Base64.getDecoder().decode(protoDescription);
DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(protoBytes);
DescriptorProtos.FileDescriptorProto fileDesc = descriptorSet.getFile(0);
// format proto
DescriptorProtos.FileDescriptorProto fileDesc = formatFileDescription(descriptorSet, rootMessageType);
Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(fileDesc,
new Descriptors.FileDescriptor[]{});
this.rootDesc = fileDescriptor.findMessageTypeByName(rootMessageType);
// find root
String fullRootMessageType = formatTypeName(findRootType(descriptorSet, rootMessageType));
this.rootDesc = fileDescriptor.findMessageTypeByName(fullRootMessageType);
// child
this.rowsNodePath = sourceInfo.getRowsNodePath();
this.childNodes = PbNode.parseNodePath(rootDesc, rowsNodePath);
Expand Down Expand Up @@ -156,4 +163,140 @@ public SourceData decode(String input, Context context) {
byte[] srcBytes = Base64.getDecoder().decode(input);
return decode(srcBytes, context);
}

private static DescriptorProtos.FileDescriptorProto formatFileDescription(
DescriptorProtos.FileDescriptorSet descriptorSet,
String rootMessageType) throws DescriptorValidationException, IOException {
// load
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap = new ConcurrentHashMap<>();
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap = new ConcurrentHashMap<>();
loadTypeFromFileSet(descriptorSet, messageTypeMap, enumTypeMap);
// find root type
String rootTypeName = findRootType(descriptorSet, rootMessageType);
// build root type
DescriptorProtos.FileDescriptorProto.Builder newFileBuilder = DescriptorProtos.FileDescriptorProto.newBuilder();
Set<String> addedTypeNames = new HashSet<>();
formatMessageTypeDescription(rootTypeName, newFileBuilder, messageTypeMap, enumTypeMap, addedTypeNames);
// DescriptorProtos.FileDescriptorSet newDescriptorSet = DescriptorProtos.FileDescriptorSet.newBuilder()
// .addFile(newFileBuilder).build();
// byte[] newProtoBytes = newDescriptorSet.toByteArray();
return newFileBuilder.build();
}

private static void formatMessageTypeDescription(String typeName,
DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
Set<String> addedTypeNames) {
DescriptorProtos.DescriptorProto typeDesc = messageTypeMap.get(typeName);
if (typeDesc == null) {
return;
}
DescriptorProtos.DescriptorProto.Builder typeBuilder = DescriptorProtos.DescriptorProto.newBuilder();
String newTypeName = formatTypeName(typeName);
typeBuilder.setName(newTypeName);
for (DescriptorProtos.FieldDescriptorProto fieldDesc : typeDesc.getFieldList()) {
DescriptorProtos.FieldDescriptorProto.Builder fieldBuilder = DescriptorProtos.FieldDescriptorProto
.newBuilder().mergeFrom(fieldDesc);
if (fieldDesc.getType().equals(FieldDescriptorProto.Type.TYPE_MESSAGE)) {
String fieldTypeName = fieldDesc.getTypeName();
String newFieldTypeName = formatTypeName(fieldTypeName);
fieldBuilder.setTypeName(newFieldTypeName);
if (!addedTypeNames.contains(newFieldTypeName)) {
addedTypeNames.add(newFieldTypeName);
formatMessageTypeDescription(fieldTypeName, newFileBuilder, messageTypeMap, enumTypeMap,
addedTypeNames);
}
} else if (fieldDesc.getType().equals(FieldDescriptorProto.Type.TYPE_ENUM)) {
String fieldTypeName = fieldDesc.getTypeName();
String newFieldTypeName = formatTypeName(fieldTypeName);
fieldBuilder.setTypeName(newFieldTypeName);
if (!addedTypeNames.contains(newFieldTypeName)) {
addedTypeNames.add(newFieldTypeName);
formatEnumTypeDescription(fieldTypeName, newFileBuilder, enumTypeMap, addedTypeNames);
}
}
typeBuilder.addField(fieldBuilder);
}
newFileBuilder.addMessageType(typeBuilder.build());
}

private static void formatEnumTypeDescription(String typeName,
DescriptorProtos.FileDescriptorProto.Builder newFileBuilder,
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap,
Set<String> addedTypeNames) {
DescriptorProtos.EnumDescriptorProto typeDesc = enumTypeMap.get(typeName);
if (typeDesc == null) {
return;
}
DescriptorProtos.EnumDescriptorProto.Builder typeBuilder = DescriptorProtos.EnumDescriptorProto.newBuilder()
.mergeFrom(typeDesc);
String newTypeName = formatTypeName(typeName);
typeBuilder.setName(newTypeName);
newFileBuilder.addEnumType(typeBuilder.build());
}

private static String formatTypeName(String typeName) {
return typeName.substring(1).replace('.', '_');
}

private static String findRootType(DescriptorProtos.FileDescriptorSet descriptorSet,
String rootMessageType) {
for (DescriptorProtos.FileDescriptorProto fileDesc : descriptorSet.getFileList()) {
String packageName = fileDesc.getPackage();
String packagePrefix = "";
if (!StringUtils.isBlank(packageName)) {
packagePrefix = "." + packageName;
}
// message type
for (DescriptorProtos.DescriptorProto typeDesc : fileDesc.getMessageTypeList()) {
if (StringUtils.equals(typeDesc.getName(), rootMessageType)) {
String fullTypeName = packagePrefix + "." + typeDesc.getName();
return fullTypeName;
}
}
}
return null;
}

private static void loadTypeFromFileSet(DescriptorProtos.FileDescriptorSet descriptorSet,
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
for (DescriptorProtos.FileDescriptorProto fileDesc : descriptorSet.getFileList()) {
loadTypeFromFile(fileDesc, messageTypeMap, enumTypeMap);
}
}

private static void loadTypeFromFile(DescriptorProtos.FileDescriptorProto fileDesc,
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
String packageName = fileDesc.getPackage();
String packagePrefix = "";
if (!StringUtils.isBlank(packageName)) {
packagePrefix = "." + packageName;
}
for (DescriptorProtos.DescriptorProto typeDesc : fileDesc.getMessageTypeList()) {
loadTypeFromMessage(fileDesc, packagePrefix, typeDesc, messageTypeMap, enumTypeMap);
}
for (DescriptorProtos.EnumDescriptorProto typeDesc : fileDesc.getEnumTypeList()) {
String enumTypeName = packagePrefix + "." + typeDesc.getName();
enumTypeMap.putIfAbsent(enumTypeName, typeDesc);
}
}

private static void loadTypeFromMessage(DescriptorProtos.FileDescriptorProto fileDesc,
String parentTypeName,
DescriptorProtos.DescriptorProto typeDesc,
Map<String, DescriptorProtos.DescriptorProto> messageTypeMap,
Map<String, DescriptorProtos.EnumDescriptorProto> enumTypeMap) {
String fullTypeName = parentTypeName + "." + typeDesc.getName();
messageTypeMap.putIfAbsent(fullTypeName, typeDesc);
for (DescriptorProtos.DescriptorProto nestedTypeDesc : typeDesc.getNestedTypeList()) {
loadTypeFromMessage(fileDesc, fullTypeName, nestedTypeDesc, messageTypeMap, enumTypeMap);
}
for (DescriptorProtos.EnumDescriptorProto enumTypeDesc : typeDesc.getEnumTypeList()) {
String enumTypeName = fullTypeName + "." + enumTypeDesc.getName();
enumTypeMap.putIfAbsent(enumTypeName, enumTypeDesc);
}
}
}
Loading