Skip to content

Commit 8a54dc4

Browse files
gosonzhanggosonzhang
andauthored
[INLONG-11863][SDK] Enhance Event Attribute Validation and Decoding Logic in SDK (#11864)
Co-authored-by: gosonzhang <gosonzhang@tencent.com>
1 parent 12c2998 commit 8a54dc4

11 files changed

Lines changed: 108 additions & 43 deletions

File tree

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ private boolean removeClusterIdSender(BaseSender msgSender, Map<String, BaseSend
324324
private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
325325
int totalSenderCnt = 0;
326326
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
327-
if (entry == null || entry.getValue() == null) {
327+
if (entry == null || entry.getKey() == null || entry.getValue() == null) {
328328
continue;
329329
}
330330
try {

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,12 @@ protected EventInfo(String groupId, String streamId, long dtMs, Long auditId, St
7373
// attrs
7474
if (attrs != null && !attrs.isEmpty()) {
7575
for (Map.Entry<String, String> entry : attrs.entrySet()) {
76-
if (StringUtils.isBlank(entry.getKey())) {
76+
if (entry == null
77+
|| StringUtils.isBlank(entry.getKey())
78+
|| entry.getValue() == null) {
7779
continue;
7880
}
79-
innSetAttr(entry.getKey().trim(), entry.getValue());
81+
innSetAttr(entry.getKey().trim(), entry.getValue().trim());
8082
}
8183
}
8284
if (auditId != null && auditId != -1L) {
@@ -129,20 +131,16 @@ protected void innSetAttr(String key, String value) throws ProxyEventException {
129131
+ AttributeConstants.KEY_VALUE_SEPARATOR + " or "
130132
+ AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
131133
}
132-
String valValue = value;
133-
if (valValue != null) {
134-
valValue = valValue.trim();
135-
if (valValue.contains(AttributeConstants.SEPARATOR)
136-
|| valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
137-
if (exceptCnt.shouldPrint()) {
138-
logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
139-
valValue, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
140-
}
141-
throw new ProxyEventException("Attribute value(" + valValue + ") include reserved word("
142-
+ AttributeConstants.KEY_VALUE_SEPARATOR + " or "
143-
+ AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
134+
if (value.contains(AttributeConstants.SEPARATOR)
135+
|| value.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
136+
if (exceptCnt.shouldPrint()) {
137+
logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
138+
value, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
144139
}
140+
throw new ProxyEventException("Attribute value(" + value + ") include reserved word("
141+
+ AttributeConstants.KEY_VALUE_SEPARATOR + " or "
142+
+ AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
145143
}
146-
this.attrs.put(key, valValue);
144+
this.attrs.put(key, value);
147145
}
148146
}

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ public void getAndResetValue(StringBuilder strBuff) {
5353
long curCnt = 0;
5454
strBuff.append("\"ms\":{\"errT\":{");
5555
for (Map.Entry<Integer, LongAdder> entry : syncErrInfo.entrySet()) {
56+
if (entry == null
57+
|| entry.getKey() == null
58+
|| entry.getValue() == null) {
59+
continue;
60+
}
5661
if (curCnt++ > 0) {
5762
strBuff.append(",");
5863
}

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ public void getAndResetValue(StringBuilder strBuff) {
421421
metaSyncInfo.getAndResetValue(strBuff);
422422
strBuff.append(",\"tr\":[");
423423
for (Map.Entry<String, TrafficInfo> entry : trafficMap.entrySet()) {
424+
if (entry == null
425+
|| entry.getKey() == null
426+
|| entry.getValue() == null) {
427+
continue;
428+
}
424429
if (count++ > 0) {
425430
strBuff.append(",");
426431
}
@@ -429,6 +434,11 @@ public void getAndResetValue(StringBuilder strBuff) {
429434
strBuff.append("],\"errs\":{");
430435
count = 0;
431436
for (Map.Entry<Integer, LongAdder> entry : errCodeMap.entrySet()) {
437+
if (entry == null
438+
|| entry.getKey() == null
439+
|| entry.getValue() == null) {
440+
continue;
441+
}
432442
if (count++ > 0) {
433443
strBuff.append(",");
434444
}

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ public void getAndResetValue(StringBuilder strBuff) {
7878
long bucketCnt = 0;
7979
strBuff.append("\"").append(name).append("\":{\"bkts\":{");
8080
for (Map.Entry<String, LongAdder> entry : sendTimeBucketT.entrySet()) {
81+
if (entry == null
82+
|| entry.getKey() == null
83+
|| entry.getValue() == null) {
84+
continue;
85+
}
8186
if (bucketCnt++ > 0) {
8287
strBuff.append(",");
8388
}

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,22 +275,22 @@ public boolean reportEvent(SendQos sendQos, TcpNettyClient client,
275275
timerObj.newTimeout(new TimeoutTask(encObject.getMessageId()),
276276
tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS));
277277
if (!client.write(clientTerm, encObject, procResult)) {
278-
Timeout timeout = reqTimeouts.remove(encObject.getMessageId());
278+
Timeout timeout = reqTimeouts.remove(newFuture.getMessageId());
279279
if (timeout != null) {
280280
timeout.cancel();
281281
}
282-
rmvMsgStubInfo(encObject.getMessageId());
282+
rmvMsgStubInfo(newFuture.getMessageId());
283283
}
284284
return procResult.isSuccess();
285285
} else {
286286
// process sync report
287287
if (!client.write(clientTerm, encObject, procResult)) {
288-
rmvMsgStubInfo(encObject.getMessageId());
288+
rmvMsgStubInfo(newFuture.getMessageId());
289289
return false;
290290
}
291291
boolean retValue = newFuture.get(procResult,
292292
tcpConfig.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
293-
if (rmvMsgStubInfo(encObject.getMessageId())) {
293+
if (rmvMsgStubInfo(newFuture.getMessageId())) {
294294
if (procResult.getErrCode() == ErrorCode.SEND_WAIT_TIMEOUT.getErrCode()) {
295295
client.setBusy(clientTerm);
296296
}

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/DecodeObject.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
2424
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
2525

26-
import com.google.common.base.Splitter;
2726
import org.apache.commons.lang3.StringUtils;
2827

2928
import java.util.HashMap;
@@ -36,10 +35,6 @@
3635
*/
3736
public class DecodeObject {
3837

39-
private static final Splitter.MapSplitter MAP_SPLITTER =
40-
Splitter.on(AttributeConstants.SEPARATOR).trimResults()
41-
.withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
42-
4338
private final MsgType msgType;
4439
private int messageId;
4540
private String dpIp;
@@ -87,12 +82,30 @@ private void handleAttr(String attributes) {
8782
this.procResult = new ProcessResult(ErrorCode.OK);
8883
return;
8984
}
90-
retAttr = new HashMap<>(MAP_SPLITTER.split(attributes));
85+
// decode attribute string
86+
this.retAttr = new HashMap<>();
87+
String[] keyValSet = attributes.split(AttributeConstants.SEPARATOR);
88+
for (String keyVal : keyValSet) {
89+
if (StringUtils.isBlank(keyVal)) {
90+
continue;
91+
}
92+
String[] keyValSplit = keyVal.split(AttributeConstants.KEY_VALUE_SEPARATOR);
93+
if (keyValSplit.length == 1) {
94+
if (StringUtils.isBlank(keyValSplit[0])) {
95+
continue;
96+
}
97+
retAttr.put(keyValSplit[0].trim(), "");
98+
} else {
99+
if (StringUtils.isBlank(keyValSplit[0]) || keyValSplit[1] == null) {
100+
continue;
101+
}
102+
retAttr.put(keyValSplit[0].trim(), keyValSplit[1].trim());
103+
}
104+
}
91105
if (retAttr.containsKey(AttributeConstants.MESSAGE_ID)) {
92106
this.messageId = Integer.parseInt(retAttr.get(AttributeConstants.MESSAGE_ID));
93107
}
94108
dpIp = retAttr.get(AttributeConstants.MESSAGE_DP_IP);
95-
96109
String errCode = retAttr.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
97110
// errCode is empty or equals 0 -> success
98111
if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.inlong.common.msg.MsgType;
2222

2323
import com.google.common.base.Joiner;
24+
import org.apache.commons.lang3.StringUtils;
2425

2526
import java.nio.charset.StandardCharsets;
2627
import java.util.HashMap;
@@ -86,10 +87,10 @@ public void setAttrInfo(int intMsgType, boolean isCompress, byte[] aesKey, Map<S
8687
this.aesKey = aesKey;
8788
if (tgtAttrs != null && !tgtAttrs.isEmpty()) {
8889
for (Map.Entry<String, String> entry : tgtAttrs.entrySet()) {
89-
if (entry == null || entry.getKey() == null) {
90+
if (entry == null || StringUtils.isBlank(entry.getKey()) || entry.getValue() == null) {
9091
continue;
9192
}
92-
this.attrMap.put(entry.getKey(), entry.getValue());
93+
this.attrMap.put(entry.getKey().trim(), entry.getValue().trim());
9394
}
9495
String preAttrStr = mapJoiner.join(this.attrMap);
9596
this.attrData = preAttrStr.getBytes(StandardCharsets.UTF_8);

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,12 @@ public List<byte[]> getBodyList() {
7979

8080
public void setAttr(String key, String value) throws ProxyEventException {
8181
if (StringUtils.isBlank(key)) {
82-
throw new ProxyEventException("Key is blank!");
82+
throw new ProxyEventException("Parameter key is blank!");
8383
}
84-
innSetAttr(key.trim(), value);
84+
if (value == null) {
85+
throw new ProxyEventException("Parameter value is null!");
86+
}
87+
innSetAttr(key.trim(), value.trim());
8588
}
8689

8790
@Override

inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,25 +165,25 @@ public static Map<String, String> getValidAttrs(Map<String, String> attrsMap) {
165165
if (attrsMap == null || attrsMap.isEmpty()) {
166166
return attrsMap;
167167
}
168+
String tmpKey;
168169
String tmpValue;
169170
Map<String, String> validAttrsMap = new HashMap<>();
170171
for (Map.Entry<String, String> entry : attrsMap.entrySet()) {
171-
if (StringUtils.isBlank(entry.getKey())
172-
|| entry.getKey().contains(AttributeConstants.SEPARATOR)
173-
|| entry.getKey().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
172+
if (entry == null
173+
|| StringUtils.isBlank(entry.getKey())
174+
|| entry.getValue() == null) {
174175
continue;
175176
}
176-
tmpValue = entry.getKey().trim();
177-
if (ProxyUtils.SdkReservedWords.contains(tmpValue)) {
177+
tmpKey = entry.getKey().trim();
178+
tmpValue = entry.getValue().trim();
179+
if (tmpKey.contains(AttributeConstants.SEPARATOR)
180+
|| tmpKey.contains(AttributeConstants.KEY_VALUE_SEPARATOR)
181+
|| ProxyUtils.SdkReservedWords.contains(tmpKey)
182+
|| tmpValue.contains(AttributeConstants.SEPARATOR)
183+
|| tmpValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
178184
continue;
179185
}
180-
if (entry.getValue() != null) {
181-
if (entry.getValue().contains(AttributeConstants.SEPARATOR)
182-
|| entry.getValue().contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
183-
continue;
184-
}
185-
}
186-
validAttrsMap.put(tmpValue, entry.getValue());
186+
validAttrsMap.put(tmpKey, tmpValue);
187187
}
188188
return validAttrsMap;
189189
}

0 commit comments

Comments
 (0)