Skip to content

Commit f3f6f92

Browse files
committed
batch limit control for items and bytes (bytes only on logs for now)
1 parent c530bf4 commit f3f6f92

11 files changed

Lines changed: 185 additions & 75 deletions

File tree

proxy/src/main/java/com/wavefront/agent/core/buffers/ActiveMQBuffer.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import com.wavefront.agent.core.queues.QueueInfo;
77
import com.wavefront.agent.core.queues.QueueStats;
8-
import com.wavefront.agent.data.EntityRateLimiter;
98
import com.yammer.metrics.Metrics;
109
import com.yammer.metrics.core.Gauge;
1110
import com.yammer.metrics.core.Histogram;
@@ -35,6 +34,8 @@ public abstract class ActiveMQBuffer implements Buffer {
3534
private static final Logger log =
3635
LoggerFactory.getLogger(ActiveMQBuffer.class.getCanonicalName());
3736
private static final Logger slowLog = log;
37+
public static final String MSG_ITEMS = "items";
38+
public static final String MSG_BYTES = "bytes";
3839
// new
3940
// MessageDedupingLogger(LoggerFactory.getLogger(ActiveMQBuffer.class.getCanonicalName()), 1000,
4041
// 1);
@@ -221,8 +222,10 @@ public void doSendPoints(String queue, List<String> points) throws ActiveMQAddre
221222

222223
try {
223224
ClientMessage message = mqCtx.session.createMessage(true);
224-
message.writeBodyBufferString(String.join("\n", points));
225-
message.putIntProperty("points", points.size());
225+
String str = String.join("\n", points);
226+
message.writeBodyBufferString(str);
227+
message.putIntProperty(MSG_ITEMS, points.size());
228+
message.putIntProperty(MSG_BYTES, str.length());
226229
mqCtx.producer.send(message);
227230
} catch (ActiveMQAddressFullException e) {
228231
log.info("queue full: " + e.getMessage());
@@ -253,8 +256,7 @@ private void checkConnection() throws Exception {
253256
}
254257

255258
@Override
256-
public void onMsgBatch(
257-
QueueInfo queue, int idx, int batchSize, EntityRateLimiter rateLimiter, OnMsgFunction func) {
259+
public void onMsgBatch(QueueInfo queue, int idx, OnMsgDelegate delegate) {
258260
String sessionKey = "onMsgBatch." + queue.getName() + "." + Thread.currentThread().getName();
259261
Session mqCtx =
260262
consumers.computeIfAbsent(
@@ -277,20 +279,29 @@ public void onMsgBatch(
277279
try {
278280
long start = System.currentTimeMillis();
279281
mqCtx.session.start();
280-
List<String> batch = new ArrayList<>(batchSize);
282+
List<String> batch = new ArrayList<>();
281283
List<ClientMessage> toACK = new ArrayList<>();
282284
boolean done = false;
283285
boolean needRollBack = false;
284-
while ((batch.size() < batchSize) && !done && ((System.currentTimeMillis() - start) < 1000)) {
286+
int batchBytes = 0;
287+
while (!done && ((System.currentTimeMillis() - start) < 1000)) {
285288
ClientMessage msg = mqCtx.consumer.receive(100);
286289
if (msg != null) {
287290
List<String> points = Arrays.asList(msg.getReadOnlyBodyBuffer().readString().split("\n"));
288-
boolean ok = rateLimiter.tryAcquire(points.size());
289-
if (ok) {
291+
boolean ok_size =
292+
delegate.checkBatchSize(
293+
batch.size(), batchBytes, points.size(), msg.getIntProperty(MSG_BYTES));
294+
boolean ok_rate = delegate.checkRates(points.size(), batchBytes);
295+
if (ok_size && ok_rate) {
290296
toACK.add(msg);
291297
batch.addAll(points);
298+
batchBytes += msg.getIntProperty(MSG_BYTES);
292299
} else {
293-
slowLog.info("rate limit reached on queue '" + queue.getName() + "'");
300+
if (!ok_rate) {
301+
slowLog.info("rate limit reached on queue '" + queue.getName() + "'");
302+
} else {
303+
slowLog.info("payload limit reached on queue '" + queue.getName() + "'");
304+
}
294305
done = true;
295306
needRollBack = true;
296307
}
@@ -301,7 +312,7 @@ public void onMsgBatch(
301312

302313
try {
303314
if (batch.size() > 0) {
304-
func.run(batch);
315+
delegate.processBatch(batch);
305316
}
306317
// commit all messages ACKed
307318
toACK.forEach(

proxy/src/main/java/com/wavefront/agent/core/buffers/Bridge.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.wavefront.agent.core.buffers;
22

3+
import static com.wavefront.agent.core.buffers.ActiveMQBuffer.MSG_ITEMS;
4+
35
import com.wavefront.agent.core.queues.QueueStats;
46
import java.util.Arrays;
57
import java.util.List;
@@ -55,10 +57,10 @@ public void messageAcknowledged(MessageReference ref, AckReason reason, ServerCo
5557
diskBuffer.sendPoints(queue, points);
5658
switch (reason) {
5759
case KILLED:
58-
stats.queuedFailed.inc(ref.getMessage().getIntProperty("points"));
60+
stats.queuedFailed.inc(ref.getMessage().getIntProperty(MSG_ITEMS));
5961
break;
6062
case EXPIRED:
61-
stats.queuedExpired.inc(ref.getMessage().getIntProperty("points"));
63+
stats.queuedExpired.inc(ref.getMessage().getIntProperty(MSG_ITEMS));
6264
break;
6365
}
6466
} catch (ActiveMQAddressFullException e) {

proxy/src/main/java/com/wavefront/agent/core/buffers/Buffer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package com.wavefront.agent.core.buffers;
22

33
import com.wavefront.agent.core.queues.QueueInfo;
4-
import com.wavefront.agent.data.EntityRateLimiter;
54
import java.util.List;
65
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
76

87
public interface Buffer {
98
void registerNewQueueInfo(QueueInfo key);
109

11-
void onMsgBatch(
12-
QueueInfo key, int idx, int batchSize, EntityRateLimiter rateLimiter, OnMsgFunction func);
10+
void onMsgBatch(QueueInfo key, int idx, OnMsgDelegate func);
1311

1412
void sendPoints(String queue, List<String> strPoint) throws ActiveMQAddressFullException;
1513

proxy/src/main/java/com/wavefront/agent/core/buffers/BuffersManager.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,9 @@ public static void sendMsg(QueueInfo queue, String strPoint) {
7676
public static void onMsgBatch(
7777
QueueInfo handler,
7878
int idx,
79-
int batchSize,
8079
EntityRateLimiter rateLimiter,
81-
OnMsgFunction func) {
82-
memoryBuffer.onMsgBatch(handler, idx, batchSize, rateLimiter, func);
80+
OnMsgDelegate func) {
81+
memoryBuffer.onMsgBatch(handler, idx, func);
8382
}
8483

8584
public static void truncateBacklog() {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.wavefront.agent.core.buffers;
2+
3+
import java.util.List;
4+
5+
public interface OnMsgDelegate {
6+
void processBatch(List<String> batch) throws Exception;
7+
8+
boolean checkBatchSize(int items, int bytes, int newItems, int newBytes);
9+
10+
boolean checkRates(int newItems, int newBytes);
11+
}

proxy/src/main/java/com/wavefront/agent/core/buffers/OnMsgFunction.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

proxy/src/main/java/com/wavefront/agent/core/buffers/PointsGauge.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.wavefront.agent.core.buffers;
22

3+
import static com.wavefront.agent.core.buffers.ActiveMQBuffer.MSG_ITEMS;
4+
35
import com.wavefront.agent.PushAgent;
46
import com.wavefront.agent.core.queues.QueueInfo;
57
import com.wavefront.common.NamedThreadFactory;
@@ -47,7 +49,7 @@ long doCount() {
4749
(QueueControl) amq.getManagementService().getResource(ResourceNames.QUEUE + queueName);
4850
Map<String, Object>[] messages = queueControl.listMessages("");
4951
for (Map<String, Object> message : messages) {
50-
int p = (int) message.get("points");
52+
int p = (int) message.get(MSG_ITEMS);
5153
count += p;
5254
}
5355
}

proxy/src/main/java/com/wavefront/agent/core/buffers/SQSBuffer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
66
import com.amazonaws.services.sqs.model.*;
77
import com.wavefront.agent.core.queues.QueueInfo;
8-
import com.wavefront.agent.data.EntityRateLimiter;
98
import java.util.*;
109
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
1110
import org.slf4j.Logger;
@@ -71,15 +70,14 @@ public void registerNewQueueInfo(QueueInfo queue) {
7170
}
7271

7372
@Override
74-
public void onMsgBatch(
75-
QueueInfo queue, int idx, int batchSize, EntityRateLimiter rateLimiter, OnMsgFunction func) {
73+
public void onMsgBatch(QueueInfo queue, int idx, OnMsgDelegate func) {
7674

7775
String queueUrl = queuesUrls.get(queue.getName());
7876
long start = System.currentTimeMillis();
79-
List<String> batch = new ArrayList<>(batchSize);
77+
List<String> batch = new ArrayList<>();
8078
List<Message> messagesToDelete = new ArrayList<>();
8179
boolean done = false;
82-
while ((batch.size() < batchSize) && !done && ((System.currentTimeMillis() - start) < 1000)) {
80+
while (!done && ((System.currentTimeMillis() - start) < 1000)) {
8381
ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);
8482
receiveRequest.setMaxNumberOfMessages(1);
8583
receiveRequest.setWaitTimeSeconds(1);
@@ -89,14 +87,15 @@ public void onMsgBatch(
8987
List<String> points = Arrays.asList(messages.get(0).getBody().split("\n"));
9088
batch.addAll(points);
9189
messagesToDelete.addAll(messages);
90+
done = !func.checkBatchSize(batch.size(), 0,0,0);
9291
} else {
9392
done = true;
9493
}
9594
}
9695

9796
try {
9897
if (batch.size() > 0) {
99-
func.run(batch);
98+
func.processBatch(batch);
10099
}
101100
messagesToDelete.forEach(
102101
message -> {

proxy/src/main/java/com/wavefront/agent/core/senders/LogSenderTask.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class LogSenderTask extends SenderTask {
2121
private final QueueInfo queue;
2222
private final LogAPI logAPI;
2323
private final UUID proxyId;
24+
private EntityProperties properties;
2425

2526
/**
2627
* @param queue handler key, that serves as an identifier of the log pipeline.
@@ -40,6 +41,11 @@ public class LogSenderTask extends SenderTask {
4041
this.queue = queue;
4142
this.logAPI = logAPI;
4243
this.proxyId = proxyId;
44+
this.properties = properties;
45+
}
46+
47+
public boolean checkBatchSize(int items, int bytes, int newItems, int newBytes) {
48+
return bytes + newBytes <= properties.getDataPerBatch();
4349
}
4450

4551
protected Response submit(List<String> logs) {

proxy/src/main/java/com/wavefront/agent/core/senders/SenderTask.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.wavefront.common.Utils.isWavefrontResponse;
44

55
import com.wavefront.agent.core.buffers.Buffer;
6+
import com.wavefront.agent.core.buffers.OnMsgDelegate;
67
import com.wavefront.agent.core.queues.QueueInfo;
78
import com.wavefront.agent.core.queues.QueueStats;
89
import com.wavefront.agent.data.EntityProperties;
@@ -22,7 +23,7 @@
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

25-
abstract class SenderTask implements Runnable {
26+
abstract class SenderTask implements Runnable, OnMsgDelegate {
2627
private static final Logger log = LoggerFactory.getLogger(SenderTask.class.getCanonicalName());
2728
// new MessageDedupingLogger(LoggerFactory.getLogger(SenderTask.class.getCanonicalName()),
2829
// 1000, 1);
@@ -45,18 +46,24 @@ abstract class SenderTask implements Runnable {
4546
@Override
4647
public void run() {
4748
try {
48-
buffer.onMsgBatch(
49-
queue,
50-
idx,
51-
properties.getDataPerBatch(),
52-
properties.getRateLimiter(),
53-
this::processBatch);
49+
buffer.onMsgBatch(queue, idx, this);
5450
} catch (Throwable e) {
5551
log.error("error sending " + queue.getEntityType().name(), e);
5652
}
5753
}
5854

59-
private void processBatch(List<String> batch) throws SenderTaskException {
55+
@Override
56+
public boolean checkBatchSize(int items, int bytes, int newItems, int newBytes) {
57+
return items + newItems <= properties.getDataPerBatch();
58+
}
59+
60+
@Override
61+
public boolean checkRates(int newItems, int newBytes) {
62+
return properties.getRateLimiter().tryAcquire(newItems);
63+
}
64+
65+
@Override
66+
public void processBatch(List<String> batch) throws SenderTaskException {
6067
TimerContext timer =
6168
Metrics.newTimer(
6269
new MetricName("push." + queue.getName(), "", "duration"),

0 commit comments

Comments
 (0)