Skip to content

Commit bd9ce03

Browse files
[INLONG-11815][Agent] Add a unified reporting point for events (#11816)
* [INLONG-11815][Agent] Add a unified reporting point for events * [INLONG-11815][Agent] Close all senders when creating sender error
1 parent 8cfa815 commit bd9ce03

10 files changed

Lines changed: 325 additions & 20 deletions

File tree

inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class CommonConstants {
4141
public static final boolean DEFAULT_PROXY_IS_COMPRESS = true;
4242

4343
public static final String PROXY_MAX_SENDER_PER_GROUP = "proxy.max.sender.per.group";
44-
public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 10;
44+
public static final int DEFAULT_PROXY_MAX_SENDER_PER_GROUP = 3;
4545

4646
// max size of message list
4747
public static final String PROXY_PACKAGE_MAX_SIZE = "proxy.package.maxSize";
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.agent.metrics;
19+
20+
import org.apache.inlong.common.metric.CountMetric;
21+
import org.apache.inlong.common.metric.Dimension;
22+
import org.apache.inlong.common.metric.MetricDomain;
23+
import org.apache.inlong.common.metric.MetricItem;
24+
25+
import java.text.SimpleDateFormat;
26+
import java.util.concurrent.atomic.AtomicLong;
27+
28+
@MetricDomain(name = "AgentEvent")
29+
public class AgentEventMetricItem extends MetricItem {
30+
31+
public final static SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
32+
public static final String KEY_INLONG_EVENT_TIME = "eventTime";
33+
public static final String KEY_INLONG_GROUP_ID = "groupId";
34+
public static final String KEY_INLONG_STREAM_ID = "streamId";
35+
public static final String KEY_INLONG_COMPONENT_TYPE = "componentType";
36+
public static final String KEY_INLONG_COMPONENT_NAME = "componentName";
37+
public static final String KEY_INLONG_AGENT_IP = "agentIp";
38+
public static final String KEY_INLONG_COMPONENT_VERSION = "componentVersion";
39+
public static final String KEY_INLONG_EVENT_TYPE = "eventType";
40+
public static final String KEY_INLONG_EVENT_LEVEL = "eventLevel";
41+
public static final String KEY_INLONG_EVENT_CODE = "eventCode";
42+
public static final String KEY_INLONG_EXT = "ext";
43+
public static final String KEY_INLONG_EVENT_DESC = "eventDesc";
44+
45+
@Dimension
46+
public String eventTime;
47+
@Dimension
48+
public String groupId;
49+
@Dimension
50+
public String streamId;
51+
@Dimension
52+
public String componentType;
53+
@Dimension
54+
public String componentName;
55+
@Dimension
56+
public String agentIp;
57+
@Dimension
58+
public String componentVersion;
59+
@Dimension
60+
public String eventType;
61+
@Dimension
62+
public String eventLevel;
63+
@Dimension
64+
public String eventCode;
65+
@Dimension
66+
public String ext;
67+
@Dimension
68+
public String eventDesc;
69+
70+
@CountMetric
71+
public AtomicLong count = new AtomicLong(0);
72+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.agent.metrics;
19+
20+
import org.apache.inlong.common.metric.MetricDomain;
21+
import org.apache.inlong.common.metric.MetricItemSet;
22+
23+
@MetricDomain(name = "AgentEvent")
24+
public class AgentEventMetricItemSet extends MetricItemSet<AgentEventMetricItem> {
25+
26+
/**
27+
* Constructor
28+
*
29+
* @param name
30+
*/
31+
public AgentEventMetricItemSet(String name) {
32+
super(name);
33+
}
34+
35+
@Override
36+
protected AgentEventMetricItem createItem() {
37+
return new AgentEventMetricItem();
38+
}
39+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.agent.utils;
19+
20+
import org.apache.inlong.agent.conf.AgentConfiguration;
21+
import org.apache.inlong.agent.metrics.AgentEventMetricItem;
22+
import org.apache.inlong.agent.metrics.AgentEventMetricItemSet;
23+
import org.apache.inlong.common.metric.MetricRegister;
24+
25+
import java.util.Date;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
30+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_AGENT_IP;
31+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_NAME;
32+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_TYPE;
33+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_COMPONENT_VERSION;
34+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_CODE;
35+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_DESC;
36+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_LEVEL;
37+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TIME;
38+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EVENT_TYPE;
39+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_EXT;
40+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_GROUP_ID;
41+
import static org.apache.inlong.agent.metrics.AgentEventMetricItem.KEY_INLONG_STREAM_ID;
42+
43+
/**
44+
* DiagUtils
45+
*/
46+
public class EventReportUtils {
47+
48+
public enum EvenCodeEnum {
49+
50+
CONFIG_UPDATE_SUC(0, "config update suc"),
51+
CONFIG_NO_UPDATE(1, "config no update"),
52+
CONFIG_UPDATE_VERSION_NO_CHANGE(2, "config update version no change"),
53+
CONFIG_INVALID_RET_CODE(3, "config invalid ret code"),
54+
CONFIG_INVALID_RESULT(4, "config invalid result maybe visit manager failed"),
55+
TASK_ADD(5, "task add"),
56+
TASK_DELETE(6, "task delete");
57+
58+
private final int code;
59+
private final String message;
60+
61+
EvenCodeEnum(int code, String message) {
62+
this.code = code;
63+
this.message = message;
64+
}
65+
66+
public int getCode() {
67+
return code;
68+
}
69+
70+
public String getMessage() {
71+
return message;
72+
}
73+
}
74+
75+
private final static String COMPONENT_TYPE_AGENT = "AGENT";
76+
private final static String COMPONENT_NAME_AGENT = "AGENT";
77+
public static final String EVENT_TYPE_CONFIG_UPDATE = "CONFIG_UPDATE";
78+
public static final String EVENT_LEVEL_INFO = "INFO";
79+
public static final String EVENT_LEVEL_WARN = "WARN";
80+
public static final String EVENT_LEVEL_ERROR = "ERROR";
81+
private static AgentEventMetricItemSet metricItemSet;
82+
83+
private EventReportUtils() {
84+
}
85+
86+
public static void init() {
87+
metricItemSet = new AgentEventMetricItemSet(COMPONENT_NAME_AGENT);
88+
MetricRegister.register(metricItemSet);
89+
}
90+
91+
public static void report(String groupId, String streamId, long eventTime, String eventType,
92+
String eventLevel, EvenCodeEnum evenCode, String ext, String desc) {
93+
Map<String, String> dims = new HashMap<>();
94+
dims.put(KEY_INLONG_GROUP_ID, groupId);
95+
dims.put(KEY_INLONG_STREAM_ID, streamId);
96+
dims.put(KEY_INLONG_COMPONENT_TYPE, COMPONENT_TYPE_AGENT);
97+
dims.put(KEY_INLONG_COMPONENT_NAME, COMPONENT_NAME_AGENT);
98+
dims.put(KEY_INLONG_AGENT_IP, AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP));
99+
dims.put(KEY_INLONG_COMPONENT_VERSION, EventReportUtils.class.getPackage().getImplementationVersion());
100+
dims.put(KEY_INLONG_EVENT_TIME, AgentEventMetricItem.FORMAT.format(new Date(eventTime)));
101+
dims.put(KEY_INLONG_EVENT_TYPE, eventType);
102+
dims.put(KEY_INLONG_EVENT_LEVEL, eventLevel);
103+
dims.put(KEY_INLONG_EVENT_CODE, String.valueOf(evenCode.getCode()));
104+
dims.put(KEY_INLONG_EXT, ext.replaceAll("\\|", "-"));
105+
dims.put(KEY_INLONG_EVENT_DESC, desc);
106+
metricItemSet.findMetricItem(dims).count.addAndGet(1);
107+
}
108+
}

inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.inlong.agent.core.task.MemoryManager;
2222
import org.apache.inlong.agent.core.task.OffsetManager;
2323
import org.apache.inlong.agent.core.task.TaskManager;
24+
import org.apache.inlong.agent.metrics.audit.AuditUtils;
2425
import org.apache.inlong.agent.utils.AgentUtils;
2526
import org.apache.inlong.agent.utils.ExcuteLinux;
2627
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
@@ -167,10 +168,20 @@ private void doSendStatusMsg(TcpMsgSender sender) {
167168
}
168169
try {
169170
ProcessResult procResult = new ProcessResult();
171+
long dataTime = AgentUtils.getCurrentTime();
172+
byte[] body = data.getFieldsString().getBytes(StandardCharsets.UTF_8);
170173
if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
171-
INLONG_AGENT_STATUS, AgentUtils.getCurrentTime(), null,
172-
data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) {
174+
INLONG_AGENT_STATUS, dataTime, null, body), procResult)) {
173175
LOGGER.error("send status failed: ret = {}", procResult);
176+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
177+
dataTime, 1, body.length);
178+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, INLONG_AGENT_SYSTEM,
179+
INLONG_AGENT_STATUS, dataTime, 1, body.length);
180+
} else {
181+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, INLONG_AGENT_SYSTEM, INLONG_AGENT_STATUS,
182+
dataTime, 1, body.length);
183+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, INLONG_AGENT_SYSTEM,
184+
INLONG_AGENT_STATUS, dataTime, 1, body.length);
174185
}
175186
} catch (Throwable ex) {
176187
LOGGER.error("send status throw exception", ex);

inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.inlong.agent.core;
1919

2020
import org.apache.inlong.agent.conf.AgentConfiguration;
21+
import org.apache.inlong.agent.metrics.audit.AuditUtils;
2122
import org.apache.inlong.agent.utils.AgentUtils;
2223
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
2324
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
@@ -134,10 +135,20 @@ private void doSendStaticMsg(TcpMsgSender sender) {
134135
}
135136
try {
136137
ProcessResult procResult = new ProcessResult();
138+
long dataTime = AgentUtils.getCurrentTime();
139+
byte[] body = data.getFieldsString().getBytes(StandardCharsets.UTF_8);
137140
if (!sender.sendMessage(new TcpEventInfo(INLONG_AGENT_SYSTEM,
138-
INLONG_FILE_STATIC, AgentUtils.getCurrentTime(), null,
139-
data.getFieldsString().getBytes(StandardCharsets.UTF_8)), procResult)) {
141+
INLONG_FILE_STATIC, dataTime, null, body), procResult)) {
140142
LOGGER.error("send static failed: ret = {}", procResult);
143+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC,
144+
dataTime, 1, body.length);
145+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, INLONG_AGENT_SYSTEM,
146+
INLONG_FILE_STATIC, dataTime, 1, body.length);
147+
} else {
148+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, INLONG_AGENT_SYSTEM, INLONG_FILE_STATIC,
149+
dataTime, 1, body.length);
150+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, INLONG_AGENT_SYSTEM,
151+
INLONG_FILE_STATIC, dataTime, 1, body.length);
141152
}
142153
} catch (Throwable ex) {
143154
LOGGER.error("send static throw exception", ex);

inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ private HeartbeatManager(AgentManager agentManager) {
7777
httpManager = new HttpManager(conf);
7878
baseManagerUrl = httpManager.getBaseUrl();
7979
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
80+
createMessageSender();
81+
AgentStatusManager.init(agentManager);
82+
FileStaticManager.init();
8083
}
8184

8285
public static HeartbeatManager getInstance(AgentManager agentManager) {
@@ -123,6 +126,9 @@ private Runnable heartbeatReportThread() {
123126
if (LOGGER.isDebugEnabled()) {
124127
LOGGER.debug(" {} report heartbeat to manager", heartbeatMsg);
125128
}
129+
if (sender == null) {
130+
createMessageSender();
131+
}
126132
AgentStatusManager.sendStatusMsg(sender);
127133
FileStaticManager.sendStaticMsg(sender);
128134
} catch (Throwable e) {
@@ -212,6 +218,7 @@ private void createMessageSender() {
212218
// start sender object
213219
ProcessResult procResult = new ProcessResult();
214220
if (!sender.start(procResult)) {
221+
sender.close();
215222
throw new ProxySdkException("Sender start failure, " + procResult);
216223
}
217224
} catch (Throwable ex) {

inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import org.apache.inlong.agent.store.Store;
2828
import org.apache.inlong.agent.store.TaskStore;
2929
import org.apache.inlong.agent.utils.AgentUtils;
30+
import org.apache.inlong.agent.utils.EventReportUtils;
31+
import org.apache.inlong.agent.utils.EventReportUtils.EvenCodeEnum;
3032
import org.apache.inlong.agent.utils.ThreadUtils;
3133
import org.apache.inlong.common.enums.TaskStateEnum;
3234

@@ -144,6 +146,7 @@ public TaskManager() {
144146
pendingTasks = new LinkedBlockingQueue<>(taskMaxLimit);
145147
configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
146148
actionQueue = new LinkedBlockingQueue<>(ACTION_QUEUE_CAPACITY);
149+
EventReportUtils.init();
147150
}
148151

149152
public static TaskStore getTaskStore() {
@@ -299,6 +302,11 @@ private void traverseManagerTasksToStore(Map<String, TaskProfile> tasksFromManag
299302
profileFromManager.getTaskId(),
300303
profileFromManager.isRetry(), profileFromManager.getState());
301304
addTask(profileFromManager);
305+
EventReportUtils.report(profileFromManager.getInlongGroupId(),
306+
profileFromManager.getInlongStreamId(), AgentUtils.getCurrentTime(),
307+
EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO,
308+
EvenCodeEnum.TASK_ADD, profileFromManager.toJsonStr(),
309+
EvenCodeEnum.TASK_ADD.getMessage());
302310
} else {
303311
TaskStateEnum managerState = profileFromManager.getState();
304312
TaskStateEnum storeState = taskFromStore.getState();
@@ -331,6 +339,11 @@ private void traverseStoreTasksToManager(Map<String, TaskProfile> tasksFromManag
331339
taskStore.getTasks().forEach((profileFromStore) -> {
332340
if (!tasksFromManager.containsKey(profileFromStore.getTaskId())) {
333341
LOGGER.info("traverseStoreTasksToManager try to delete task {}", profileFromStore.getTaskId());
342+
EventReportUtils.report(profileFromStore.getInlongGroupId(),
343+
profileFromStore.getInlongStreamId(), AgentUtils.getCurrentTime(),
344+
EventReportUtils.EVENT_TYPE_CONFIG_UPDATE, EventReportUtils.EVENT_LEVEL_INFO,
345+
EvenCodeEnum.TASK_DELETE, profileFromStore.toJsonStr(),
346+
EvenCodeEnum.TASK_DELETE.getMessage());
334347
deleteTask(profileFromStore);
335348
}
336349
});

0 commit comments

Comments
 (0)