Skip to content

Commit 237cb1c

Browse files
committed
Merge pull request #68 from andyqzb/master
ORListener block for new events to reduce cpu usage
2 parents 8c31209 + 9c3be97 commit 237cb1c

2 files changed

Lines changed: 24 additions & 4 deletions

File tree

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,9 @@ public interface TransactionProcessor
136136
/** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/
137137
private BlockingQueue<BinlogEventV4> _binlogEventQueue = null;
138138

139+
/** Milli sec timeout for _binlogEventQueue operation **/
140+
private long _queueTimeoutMs = 100L;
141+
139142
public ORListener(String name,
140143
int currentFileNumber,
141144
Logger log,
@@ -144,7 +147,8 @@ public ORListener(String name,
144147
Map<String, Short> tableUriToSrcIdMap,
145148
Map<String, String> tableUriToSrcNameMap,
146149
SchemaRegistryService schemaRegistryService,
147-
int maxQueueSize)
150+
int maxQueueSize,
151+
long queueTimeoutMs)
148152
{
149153
super("ORListener_" + name);
150154
_log = log;
@@ -155,6 +159,7 @@ public ORListener(String name,
155159
_schemaRegistryService = schemaRegistryService;
156160
_currFileNum = currentFileNumber;
157161
_binlogEventQueue = new LinkedBlockingQueue<BinlogEventV4>(maxQueueSize);
162+
_queueTimeoutMs = queueTimeoutMs;
158163
}
159164

160165
@Override
@@ -163,7 +168,7 @@ public void onEvents(BinlogEventV4 event)
163168
boolean isPut = false;
164169
do {
165170
try {
166-
isPut = _binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
171+
isPut = _binlogEventQueue.offer(event, _queueTimeoutMs, TimeUnit.MILLISECONDS);
167172
} catch (InterruptedException e) {
168173
_log.error("failed to put binlog event to binlogEventQueue event: " + event, e);
169174
}
@@ -638,9 +643,24 @@ public void run() {
638643

639644
eventList.clear();
640645
int eventNumber = _binlogEventQueue.drainTo(eventList);
646+
if (eventNumber == 0)
647+
{
648+
try
649+
{
650+
event = _binlogEventQueue.poll(_queueTimeoutMs, TimeUnit.MILLISECONDS);
651+
if (event != null)
652+
{
653+
eventList.add(event);
654+
eventNumber = eventList.size();
655+
}
656+
}
657+
catch (InterruptedException e)
658+
{
659+
_log.info("Interrupted when poll from _binlogEventQueue!!");
660+
}
661+
}
641662
for (int i = 0; i < eventNumber; i++)
642663
{
643-
644664
event = eventList.get(i);
645665
if (event == null)
646666
{

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ void initOpenReplicator(long scn)
330330
String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);
331331
// we should use a new ORListener to drop the left events in binlogEventQueue and the half processed transaction.
332332
_orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
333-
_tableUriToSrcNameMap, _schemaRegistryService, 200);
333+
_tableUriToSrcNameMap, _schemaRegistryService, 200, 100L);
334334

335335
_or.setBinlogFileName(binlogFile);
336336
_or.setBinlogPosition(offset);

0 commit comments

Comments
 (0)