Skip to content

Commit 055d9f4

Browse files
committed
exporter and midBuffer items
1 parent f3f6f92 commit 055d9f4

12 files changed

Lines changed: 290 additions & 78 deletions

File tree

proxy/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@
389389
<dependency>
390390
<groupId>org.apache.activemq</groupId>
391391
<artifactId>artemis-server</artifactId>
392-
<version>2.27.0</version>
392+
<version>2.27.1</version>
393393
</dependency>
394394
<dependency>
395395
<groupId>com.wavefront</groupId>

proxy/src/main/java/com/wavefront/agent/AbstractAgent.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.wavefront.agent.api.APIContainer;
1616
import com.wavefront.agent.config.LogsIngestionConfig;
1717
import com.wavefront.agent.core.buffers.BuffersManager;
18+
import com.wavefront.agent.core.buffers.Exporter;
1819
import com.wavefront.agent.core.senders.SenderTasksManager;
1920
import com.wavefront.agent.data.EntityPropertiesFactoryImpl;
2021
import com.wavefront.agent.logsharvesting.InteractiveLogsTester;
@@ -236,9 +237,17 @@ public void start(String[] args) {
236237
}
237238

238239
// If we are exporting data from the queue, run export and exit
239-
// TODO: queue exporter
240-
if (proxyConfig.getExportQueueOutputFile() != null
241-
&& proxyConfig.getExportQueuePorts() != null) {
240+
if (proxyConfig.getExportQueueOutputDir() != null
241+
&& proxyConfig.getExportQueueAtoms() != null) {
242+
try {
243+
Exporter.export(
244+
proxyConfig.getBufferFile(),
245+
proxyConfig.getExportQueueOutputDir(),
246+
proxyConfig.getExportQueueAtoms(),
247+
proxyConfig.isExportQueueRetainData());
248+
} catch (Throwable e) {
249+
System.out.println(e.getMessage());
250+
}
242251
System.exit(0);
243252
}
244253

proxy/src/main/java/com/wavefront/agent/ProxyConfig.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -165,19 +165,19 @@ public class ProxyConfig extends Configuration {
165165

166166
// TODO: review export buffer
167167
@Parameter(
168-
names = {"--exportQueuePorts"},
168+
names = {"--exportQueueAtoms"},
169169
description =
170170
"Export queued data in plaintext "
171-
+ "format for specified ports (comma-delimited list) and exit. Set to 'all' to export "
172-
+ "everything. Default: none")
173-
String exportQueuePorts = null;
171+
+ "format for specified atoms (comma-delimited list) and exit. Set to 'all' to export "
172+
+ "everything. Default: none, valid values: points, deltaCounters, histograms, sourceTags, spans, spanLogs, events, logs")
173+
String exportQueueAtoms = null;
174174

175175
@Parameter(
176-
names = {"--exportQueueOutputFile"},
176+
names = {"--exportQueueOutputDir"},
177177
description =
178178
"Export queued data in plaintext "
179179
+ "format for specified ports (comma-delimited list) and exit. Default: none")
180-
String exportQueueOutputFile = null;
180+
String exportQueueOutputDir = null;
181181

182182
@Parameter(
183183
names = {"--exportQueueRetainData"},
@@ -1236,12 +1236,12 @@ public String getSqsQueueIdentifier() {
12361236
return sqsQueueIdentifier;
12371237
}
12381238

1239-
public String getExportQueuePorts() {
1240-
return exportQueuePorts;
1239+
public String getExportQueueAtoms() {
1240+
return exportQueueAtoms;
12411241
}
12421242

1243-
public String getExportQueueOutputFile() {
1244-
return exportQueueOutputFile;
1243+
public String getExportQueueOutputDir() {
1244+
return exportQueueOutputDir;
12451245
}
12461246

12471247
public boolean isExportQueueRetainData() {
@@ -2233,8 +2233,8 @@ public void verifyAndInit() {
22332233
customApplicationTags = config.getString("customApplicationTags", customApplicationTags);
22342234
customServiceTags = config.getString("customServiceTags", customServiceTags);
22352235

2236-
exportQueuePorts = config.getString("exportQueuePorts", exportQueuePorts);
2237-
exportQueueOutputFile = config.getString("exportQueueOutputFile", exportQueueOutputFile);
2236+
exportQueueAtoms = config.getString("exportQueueAtoms", exportQueueAtoms);
2237+
exportQueueOutputDir = config.getString("exportQueueOutputDir", exportQueueOutputDir);
22382238
exportQueueRetainData = config.getBoolean("exportQueueRetainData", exportQueueRetainData);
22392239
flushThreads = config.getInteger("flushThreads", flushThreads);
22402240
flushThreadsEvents = config.getInteger("flushThreadsEvents", flushThreadsEvents);

proxy/src/main/java/com/wavefront/agent/core/buffers/ActiveMQBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public abstract class ActiveMQBuffer implements Buffer {
4444
final String name;
4545
private final Map<String, Session> producers = new ConcurrentHashMap<>();
4646
private final Map<String, Session> consumers = new ConcurrentHashMap<>();
47-
private final Map<String, Gauge<Object>> sizeMetrics = new HashMap<>(); // TODO review
47+
private final Map<String, Gauge<Object>> sizeMetrics = new HashMap<>();
4848
private final Map<String, Histogram> timeMetrics = new HashMap<>();
4949
private final int serverID;
5050
protected Buffer nextBuffer;

proxy/src/main/java/com/wavefront/agent/core/buffers/BuffersManager.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,7 @@ public static void sendMsg(QueueInfo queue, String strPoint) {
7474
}
7575

7676
public static void onMsgBatch(
77-
QueueInfo handler,
78-
int idx,
79-
EntityRateLimiter rateLimiter,
80-
OnMsgDelegate func) {
77+
QueueInfo handler, int idx, EntityRateLimiter rateLimiter, OnMsgDelegate func) {
8178
memoryBuffer.onMsgBatch(handler, idx, func);
8279
}
8380

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.wavefront.agent.core.buffers;
2+
3+
import com.wavefront.data.ReportableEntityType;
4+
import java.io.BufferedWriter;
5+
import java.io.File;
6+
import java.io.FileWriter;
7+
import java.io.IOException;
8+
import java.util.Arrays;
9+
import java.util.List;
10+
import java.util.concurrent.atomic.AtomicInteger;
11+
import java.util.stream.Collectors;
12+
import org.apache.activemq.artemis.api.core.SimpleString;
13+
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
14+
import org.apache.activemq.artemis.core.server.ActiveMQServer;
15+
import org.apache.activemq.artemis.core.server.MessageReference;
16+
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
17+
18+
public class Exporter {
19+
public static void export(String bufferStr, String dirStr, String atomsStr, boolean retainData) {
20+
List<String> atomsList = Arrays.asList(atomsStr.split(","));
21+
atomsList.replaceAll(String::trim);
22+
List<ReportableEntityType> atoms =
23+
atomsList.stream()
24+
.map(
25+
s -> {
26+
ReportableEntityType atom = ReportableEntityType.fromString(s);
27+
if (atom == null) {
28+
throw new IllegalArgumentException("invalid atom '" + s + "'");
29+
}
30+
return atom;
31+
})
32+
.collect(Collectors.toList());
33+
File dir = new File(dirStr);
34+
35+
DiskBufferConfig config = new DiskBufferConfig();
36+
config.buffer = new File(bufferStr);
37+
DiskBuffer buffer = new DiskBuffer(1, "disk", config);
38+
atoms.forEach(
39+
atom -> {
40+
ActiveMQServer amq = buffer.activeMQServer;
41+
try {
42+
File outFile = new File(dir, atom.toString().toLowerCase() + ".txt");
43+
System.out.println(
44+
"Exporting '" + atom + "' from '" + dirStr + "' to '" + outFile + "'");
45+
AtomicInteger c= new AtomicInteger();
46+
BufferedWriter out = new BufferedWriter(new FileWriter(outFile));
47+
amq.getPostOffice()
48+
.listQueuesForAddress(SimpleString.toSimpleString(atom.name()))
49+
.forEach(
50+
queue -> {
51+
LinkedListIterator<MessageReference> it = queue.browserIterator();
52+
while (it.hasNext()) {
53+
CoreMessage msg = (CoreMessage) it.next().getMessage();
54+
List<String> points =
55+
Arrays.asList(msg.getReadOnlyBodyBuffer().readString().split("\n"));
56+
try {
57+
out.write(String.join("\n", points));
58+
out.write("\n");
59+
} catch (IOException e) {
60+
throw new RuntimeException("Error writing on the output file.", e);
61+
}
62+
if (!retainData) {
63+
try {
64+
queue.deleteReference(msg.getMessageID());
65+
} catch (Exception e) {
66+
throw new RuntimeException("Error deleting data from the buffer", e);
67+
}
68+
}
69+
if (c.addAndGet(points.size()) %100_000==0){
70+
System.out.print(".");
71+
}
72+
}
73+
});
74+
out.flush();
75+
out.close();
76+
System.out.println(".\nDone, exported "+(retainData?"":"and deleted ")+c+" "+atom.toString()+"\n");
77+
} catch (Exception e) {
78+
throw new RuntimeException(e);
79+
}
80+
});
81+
buffer.shutdown();
82+
}
83+
}

proxy/src/main/java/com/wavefront/agent/core/buffers/MemoryBuffer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public void shutdown() {
6565
for (Object obj : queues) {
6666
QueueControl queue = (QueueControl) obj;
6767
int c = queue.expireMessages("");
68-
System.out.println("-> queue: " + queue.getName() + " - " + c);
6968
counter += c;
7069
}
7170
} catch (Exception e) {
@@ -138,7 +137,7 @@ public void run() {
138137
boolean done = false;
139138
while (!done) {
140139
ArrayList<String> metrics = new ArrayList<>();
141-
if (midBuffer.drainTo(metrics, 100) != 0) {
140+
if (midBuffer.drainTo(metrics, queue.getMaxItemsPerMessage()) != 0) {
142141
try {
143142
sendPoints(queue.getName(), metrics);
144143
} catch (ActiveMQAddressFullException e) {

proxy/src/main/java/com/wavefront/agent/core/buffers/SQSBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void onMsgBatch(QueueInfo queue, int idx, OnMsgDelegate func) {
8787
List<String> points = Arrays.asList(messages.get(0).getBody().split("\n"));
8888
batch.addAll(points);
8989
messagesToDelete.addAll(messages);
90-
done = !func.checkBatchSize(batch.size(), 0,0,0);
90+
done = !func.checkBatchSize(batch.size(), 0, 0, 0);
9191
} else {
9292
done = true;
9393
}

proxy/src/main/java/com/wavefront/agent/core/queues/Queue.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,23 @@ class Queue implements QueueInfo {
1212
private final String tenant;
1313
private final int threads;
1414
private final Map<String, QueueInfo> tenants = new HashMap<>();
15+
private final int midBufferItems;
1516

1617
Queue(ReportableEntityType entityType, String tenant, int threads) {
1718
this.name = entityType + (tenant.equalsIgnoreCase(CENTRAL_TENANT_NAME) ? "" : "." + tenant);
1819
this.entityType = entityType;
1920
this.tenant = tenant;
2021
this.threads = threads;
22+
switch (entityType) {
23+
case LOGS:
24+
midBufferItems = 10;
25+
break;
26+
case POINT:
27+
midBufferItems = 255;
28+
break;
29+
default:
30+
midBufferItems = 100;
31+
}
2132
QueueStats.register(this);
2233
}
2334

@@ -48,6 +59,11 @@ public int getNumberThreads() {
4859
return threads;
4960
}
5061

62+
@Override
63+
public int getMaxItemsPerMessage() {
64+
return midBufferItems;
65+
}
66+
5167
public void addTenant(String tenant, Queue queue) {
5268
tenants.put(tenant, queue);
5369
}

proxy/src/main/java/com/wavefront/agent/core/queues/QueueInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@ public interface QueueInfo {
1515
String getName();
1616

1717
int getNumberThreads();
18+
19+
int getMaxItemsPerMessage();
1820
}

0 commit comments

Comments
 (0)