Skip to content

Commit 3cf9685

Browse files
authored
[INLONG-12071][Manager] Fix the incorrect filtering logic that checks whether messages match (#12072)
* [INLONG-12071][Manager] Fix the filtering logic that incorrectly determines whether messages match * [INLONG-12071][Manager] Fix the compile warning issues
1 parent b49fb90 commit 3cf9685

6 files changed

Lines changed: 522 additions & 101 deletions

File tree

inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/BriefMQMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static class FieldInfo {
7272

7373
private String fieldName;
7474

75-
private String FieldValue;
75+
private String fieldValue;
7676

7777
}
7878

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/DeserializeOperator.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,27 +70,41 @@ default DeserializationConfig getDeserializationConfig(InlongStreamInfo streamIn
7070
streamInfo.getWrapType()));
7171
}
7272

73-
default Boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo> streamFieldList) {
74-
if (StringUtils.isBlank(request.getFieldName()) || StringUtils.isBlank(request.getOperationType())
75-
|| StringUtils.isBlank(request.getTargetValue())) {
73+
/**
74+
* Check whether the message should be filtered by the filter condition
75+
*
76+
* @param request query message request
77+
* @param streamFieldList stream field list
78+
* @return true if the message should be filtered
79+
*/
80+
default boolean checkIfFilter(QueryMessageRequest request, List<FieldInfo> streamFieldList) {
81+
String fieldName = request.getFieldName();
82+
if (StringUtils.isAnyBlank(fieldName, request.getOperationType())) {
7683
return false;
7784
}
78-
boolean ifFilter = false;
85+
7986
FieldInfo fieldInfo = streamFieldList.stream()
80-
.filter(v -> Objects.equals(v.getFieldName(), request.getFieldName())).findFirst()
87+
.filter(field -> fieldName.equals(field.getFieldName()))
88+
.findFirst()
8189
.orElse(null);
82-
if (fieldInfo != null) {
83-
switch (request.getOperationType()) {
84-
case "=":
85-
ifFilter = !Objects.equals(request.getTargetValue(), fieldInfo.getFieldValue());
86-
break;
87-
case "!=":
88-
ifFilter = Objects.equals(request.getTargetValue(), fieldInfo.getFieldValue());
89-
break;
90-
case "like":
91-
ifFilter = fieldInfo.getFieldValue() != null
92-
&& !fieldInfo.getFieldValue().contains(request.getTargetValue());
93-
}
90+
if (fieldInfo == null) {
91+
return false;
92+
}
93+
94+
boolean ifFilter = false;
95+
String fieldValue = fieldInfo.getFieldValue();
96+
// support targetValue == blank string (null or "") or targetValue != blank string
97+
String targetValue = request.getTargetValue();
98+
switch (request.getOperationType()) {
99+
case "=":
100+
ifFilter = !Objects.equals(targetValue, fieldValue);
101+
break;
102+
case "!=":
103+
ifFilter = Objects.equals(targetValue, fieldValue);
104+
break;
105+
case "like":
106+
// fieldValue or targetValue is null, like operation is invalid, so this record should be filtered
107+
ifFilter = fieldValue == null || targetValue == null || !fieldValue.contains(targetValue);
94108
}
95109
return ifFilter;
96110
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/message/RawMsgDeserializeOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.springframework.stereotype.Service;
3636

3737
import java.nio.charset.Charset;
38-
import java.util.Collections;
3938
import java.util.List;
4039
import java.util.Map;
4140

@@ -65,6 +64,7 @@ public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, List<BriefMQM
6564
if (checkIfFilter(request, fieldList)) {
6665
return briefMQMessages;
6766
}
67+
6868
BriefMQMessage briefMQMessage = BriefMQMessage.builder()
6969
.id(index)
7070
.inlongGroupId(groupId)
@@ -75,12 +75,12 @@ public List<BriefMQMessage> decodeMsg(InlongStreamInfo streamInfo, List<BriefMQM
7575
.body(body)
7676
.fieldList(fieldList)
7777
.build();
78-
briefMQMessages.addAll(Collections.singletonList(briefMQMessage));
78+
briefMQMessages.add(briefMQMessage);
7979
return briefMQMessages;
8080
} catch (Exception e) {
81-
String errMsg = String.format("decode msg failed for groupId=%s, streamId=%s", groupId, streamId);
81+
String errMsg = String.format("Failed to decode msg for groupId=%s, streamId=%s", groupId, streamId);
8282
log.error(errMsg, e);
83-
throw new BusinessException(errMsg);
83+
throw new BusinessException(errMsg + ", message: " + e.getMessage());
8484
}
8585
}
8686

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -448,18 +448,21 @@ private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, Pulsa
448448
int index, InlongStreamInfo streamInfo, int messagePosition, QueryMessageRequest request) {
449449
List<BriefMQMessage> briefMQMessages = new ArrayList<>();
450450
try {
451-
ResponseEntity<byte[]> httpResponse =
452-
PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo, topicPartition, "latest",
453-
messagePosition);
451+
ResponseEntity<byte[]> httpResponse = PulsarUtils.examineMessage(restTemplate, pulsarClusterInfo,
452+
topicPartition, "latest", messagePosition);
454453
PulsarMessageInfo messageInfo = PulsarUtils.getMessageFromHttpResponse(httpResponse, topicPartition);
454+
if (messageInfo == null) {
455+
return briefMQMessages;
456+
}
457+
455458
Map<String, String> headers = messageInfo.getProperties();
456459
if (headers == null) {
457460
headers = new HashMap<>();
458461
}
459462
MessageWrapType messageWrapType = MessageWrapType.forType(streamInfo.getWrapType());
460-
if (headers.get(InlongConstants.MSG_ENCODE_VER) != null) {
461-
messageWrapType =
462-
MessageWrapType.valueOf(Integer.parseInt(headers.get(InlongConstants.MSG_ENCODE_VER)));
463+
String encodeType = headers.get(InlongConstants.MSG_ENCODE_VER);
464+
if (StringUtils.isNotBlank(encodeType)) {
465+
messageWrapType = MessageWrapType.valueOf(Integer.parseInt(encodeType));
463466
}
464467
DeserializeOperator deserializeOperator = deserializeOperatorFactory.getInstance(messageWrapType);
465468
deserializeOperator.decodeMsg(streamInfo, briefMQMessages, messageInfo.getBody(), headers, index, request);

0 commit comments

Comments
 (0)