Skip to content

Commit 8effb56

Browse files
committed
Merge pull request #58 from andyqzb/master
the patch to improve mysql open replicator event producer's performance
2 parents c4301ed + fd6495c commit 8effb56

2 files changed

Lines changed: 183 additions & 123 deletions

File tree

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java

Lines changed: 170 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import java.util.HashMap;
1212
import java.util.List;
1313
import java.util.Map;
14+
import java.util.concurrent.BlockingQueue;
15+
import java.util.concurrent.LinkedBlockingQueue;
16+
import java.util.concurrent.TimeUnit;
1417

1518
import org.apache.avro.Schema;
1619
import org.apache.avro.generic.GenericData;
@@ -23,6 +26,7 @@
2326
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;
2427
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
2528
import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
29+
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
2630
import com.google.code.or.binlog.impl.event.QueryEvent;
2731
import com.google.code.or.binlog.impl.event.RotateEvent;
2832
import com.google.code.or.binlog.impl.event.TableMapEvent;
@@ -54,6 +58,7 @@
5458
import com.google.code.or.common.glossary.column.TinyColumn;
5559
import com.google.code.or.common.glossary.column.YearColumn;
5660
import com.linkedin.databus.core.DatabusRuntimeException;
61+
import com.linkedin.databus.core.DatabusThreadBase;
5762
import com.linkedin.databus.core.DbusOpcode;
5863
import com.linkedin.databus2.core.DatabusException;
5964
import com.linkedin.databus2.producers.ds.DbChangeEntry;
@@ -74,7 +79,7 @@
7479
* This class is responsible for converting Bin log events to Avro records using schemaRegistry and calling an application callback
7580
* to let the application generate DbusEvent and append to EventBuffer.
7681
*/
77-
class ORListener implements BinlogEventListener
82+
class ORListener extends DatabusThreadBase implements BinlogEventListener
7883
{
7984
/**
8085
* Application Callback from this class to process one transaction
@@ -134,143 +139,40 @@ public interface TransactionProcessor
134139
/** Flag to indicate the begining of the txn is seen. Used to indicate **/
135140
private boolean _isBeginTxnSeen = false;
136141

137-
public ORListener(int currentFileNumber,
142+
private BlockingQueue<BinlogEventV4> binlogEventQueue = null;
143+
144+
public ORListener(String name,
145+
int currentFileNumber,
138146
Logger log,
139147
String binlogFilePrefix,
140148
TransactionProcessor txnProcessor,
141149
Map<String, Short> tableUriToSrcIdMap,
142150
Map<String, String> tableUriToSrcNameMap,
143-
SchemaRegistryService schemaRegistryService)
151+
SchemaRegistryService schemaRegistryService,
152+
int maxQueueSize)
144153
{
154+
super("ORListener_" + name);
145155
_log = log;
146156
_txnProcessor = txnProcessor;
147157
_binlogFilePrefix = binlogFilePrefix;
148158
_tableUriToSrcIdMap = tableUriToSrcIdMap;
149159
_tableUriToSrcNameMap = tableUriToSrcNameMap;
150160
_schemaRegistryService = schemaRegistryService;
151161
_currFileNum = currentFileNumber;
162+
binlogEventQueue = new LinkedBlockingQueue<BinlogEventV4>(maxQueueSize);
152163
}
153164

154165
@Override
155166
public void onEvents(BinlogEventV4 event)
156167
{
157-
if ( event == null)
158-
{
159-
_log.error("Received null event");
160-
return;
161-
}
162-
163-
// Beginning of Txn
164-
if (event instanceof QueryEvent)
165-
{
166-
QueryEvent qe = (QueryEvent)event;
167-
String sql = qe.getSql().toString();
168-
if ("BEGIN".equalsIgnoreCase(sql))
169-
{
170-
_isBeginTxnSeen = true;
171-
_log.info("BEGIN sql: " + sql);
172-
_currTxnSizeInBytes = event.getHeader().getEventLength();
173-
startXtion(qe);
174-
return;
175-
}
176-
}
177-
178-
if ( ! _isBeginTxnSeen )
179-
{
180-
if (_log.isDebugEnabled())
181-
{
182-
_log.debug("Skipping event (" + event
183-
+ ") as this is before the start of first transaction");
168+
boolean isPut = false;
169+
do {
170+
try {
171+
isPut = binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
172+
} catch (InterruptedException e) {
173+
_log.error("failed to put binlog event to binlogEventQueue event: " + event, e);
184174
}
185-
return;
186-
}
187-
188-
_currTxnSizeInBytes += event.getHeader().getEventLength();
189-
190-
if (event instanceof QueryEvent)
191-
{
192-
QueryEvent qe = (QueryEvent)event;
193-
String sql = qe.getSql().toString();
194-
if ("COMMIT".equalsIgnoreCase(sql))
195-
{
196-
_log.debug("COMMIT sql: " + sql);
197-
endXtion(qe);
198-
return;
199-
}
200-
else if ("ROLLBACK".equalsIgnoreCase(sql))
201-
{
202-
_log.debug("ROLLBACK sql: " + sql);
203-
rollbackXtion(qe);
204-
return;
205-
}
206-
else
207-
{
208-
// Ignore DDL statements for now
209-
_log.debug("Likely DDL statement sql: " + sql);
210-
return;
211-
}
212-
}
213-
else if (event instanceof RotateEvent)
214-
{
215-
RotateEvent re = (RotateEvent)event;
216-
String fileName = re.getBinlogFileName().toString();
217-
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
218-
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
219-
_currFileNum = Integer.parseInt(fileNumStr);
220-
}
221-
else if (event instanceof XidEvent)
222-
{
223-
XidEvent xe = (XidEvent)event;
224-
long xid = xe.getXid();
225-
_log.debug("Treating XID event with xid = " + xid + " as commit for the transaction");
226-
endXtion(xe);
227-
return;
228-
}
229-
else if (event instanceof WriteRowsEvent)
230-
{
231-
WriteRowsEvent wre = (WriteRowsEvent)event;
232-
insertRows(wre);
233-
}
234-
else if (event instanceof WriteRowsEventV2)
235-
{
236-
WriteRowsEventV2 wre = (WriteRowsEventV2)event;
237-
insertRows(wre);
238-
}
239-
else if (event instanceof UpdateRowsEvent)
240-
{
241-
UpdateRowsEvent ure = (UpdateRowsEvent)event;
242-
updateRows(ure);
243-
}
244-
else if (event instanceof UpdateRowsEventV2)
245-
{
246-
UpdateRowsEventV2 ure = (UpdateRowsEventV2)event;
247-
updateRows(ure);
248-
}
249-
else if (event instanceof DeleteRowsEventV2)
250-
{
251-
DeleteRowsEventV2 dre = (DeleteRowsEventV2)event;
252-
deleteRows(dre);
253-
}
254-
else if (event instanceof DeleteRowsEvent)
255-
{
256-
DeleteRowsEvent dre = (DeleteRowsEvent)event;
257-
deleteRows(dre);
258-
}
259-
else if (event instanceof TableMapEvent)
260-
{
261-
TableMapEvent tme = (TableMapEvent)event;
262-
processTableMapEvent(tme);
263-
}
264-
else
265-
{
266-
_log.warn("Skipping !! Unknown OR event e: " + event);
267-
return;
268-
}
269-
270-
if ( _log.isDebugEnabled())
271-
{
272-
_log.debug("e: " + event);
273-
}
175+
} while (!isPut && !isShutdownRequested());
274176
}
275177

276178

@@ -774,4 +676,153 @@ public static long scn(int logId, int offset)
774676
scn |= offset;
775677
return scn;
776678
}
679+
680+
@Override
681+
public void run() {
682+
List<BinlogEventV4> eventList = new ArrayList<BinlogEventV4>();
683+
BinlogEventV4 event;
684+
boolean isShutdown = false;
685+
while (!isShutdown)
686+
{
687+
eventList.clear();
688+
int eventNumber = binlogEventQueue.drainTo(eventList);
689+
for (int i = 0; i < eventNumber; i++)
690+
{
691+
if (isShutdownRequested())
692+
{
693+
isShutdown = true;
694+
break;
695+
}
696+
697+
event = eventList.get(i);
698+
if (event == null)
699+
{
700+
_log.error("Received null event");
701+
continue;
702+
}
703+
704+
try {
705+
// Beginning of Txn
706+
if (event instanceof QueryEvent)
707+
{
708+
QueryEvent qe = (QueryEvent)event;
709+
String sql = qe.getSql().toString();
710+
if ("BEGIN".equalsIgnoreCase(sql))
711+
{
712+
_isBeginTxnSeen = true;
713+
_log.info("BEGIN sql: " + sql);
714+
_currTxnSizeInBytes = event.getHeader().getEventLength();
715+
startXtion(qe);
716+
continue;
717+
}
718+
}
719+
720+
if ( ! _isBeginTxnSeen )
721+
{
722+
if (_log.isDebugEnabled())
723+
{
724+
_log.debug("Skipping event (" + event
725+
+ ") as this is before the start of first transaction");
726+
}
727+
continue;
728+
}
729+
730+
_currTxnSizeInBytes += event.getHeader().getEventLength();
731+
732+
if (event instanceof QueryEvent)
733+
{
734+
QueryEvent qe = (QueryEvent)event;
735+
String sql = qe.getSql().toString();
736+
if ("COMMIT".equalsIgnoreCase(sql))
737+
{
738+
_log.debug("COMMIT sql: " + sql);
739+
endXtion(qe);
740+
continue;
741+
}
742+
else if ("ROLLBACK".equalsIgnoreCase(sql))
743+
{
744+
_log.debug("ROLLBACK sql: " + sql);
745+
rollbackXtion(qe);
746+
continue;
747+
}
748+
else
749+
{
750+
// Ignore DDL statements for now
751+
_log.debug("Likely DDL statement sql: " + sql);
752+
continue;
753+
}
754+
}
755+
else if (event instanceof RotateEvent)
756+
{
757+
RotateEvent re = (RotateEvent)event;
758+
String fileName = re.getBinlogFileName().toString();
759+
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
760+
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
761+
_currFileNum = Integer.parseInt(fileNumStr);
762+
}
763+
else if (event instanceof XidEvent)
764+
{
765+
XidEvent xe = (XidEvent)event;
766+
long xid = xe.getXid();
767+
_log.debug("Treating XID event with xid = " + xid + " as commit for the transaction");
768+
endXtion(xe);
769+
continue;
770+
}
771+
else if (event instanceof WriteRowsEvent)
772+
{
773+
WriteRowsEvent wre = (WriteRowsEvent)event;
774+
insertRows(wre);
775+
}
776+
else if (event instanceof WriteRowsEventV2)
777+
{
778+
WriteRowsEventV2 wre = (WriteRowsEventV2)event;
779+
insertRows(wre);
780+
}
781+
else if (event instanceof UpdateRowsEvent)
782+
{
783+
UpdateRowsEvent ure = (UpdateRowsEvent)event;
784+
updateRows(ure);
785+
}
786+
else if (event instanceof UpdateRowsEventV2)
787+
{
788+
UpdateRowsEventV2 ure = (UpdateRowsEventV2)event;
789+
updateRows(ure);
790+
}
791+
else if (event instanceof DeleteRowsEventV2)
792+
{
793+
DeleteRowsEventV2 dre = (DeleteRowsEventV2)event;
794+
deleteRows(dre);
795+
}
796+
else if (event instanceof DeleteRowsEvent)
797+
{
798+
DeleteRowsEvent dre = (DeleteRowsEvent)event;
799+
deleteRows(dre);
800+
}
801+
else if (event instanceof TableMapEvent)
802+
{
803+
TableMapEvent tme = (TableMapEvent)event;
804+
processTableMapEvent(tme);
805+
}
806+
else if (event instanceof FormatDescriptionEvent)
807+
{
808+
// we don't need process this event
809+
_log.info("receive FormatDescriptionEvent event");
810+
}
811+
else
812+
{
813+
_log.warn("Skipping !! Unknown OR event e: " + event);
814+
continue;
815+
}
816+
817+
if ( _log.isDebugEnabled())
818+
{
819+
_log.debug("e: " + event);
820+
}
821+
} catch (Exception e) {
822+
_log.error("failed to process binlog event, event: " + event, e);
823+
}
824+
}
825+
}
826+
_log.info("ORListener Thread done");
827+
}
777828
}

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,14 @@ public class EventProducerThread extends DatabusThreadBase implements Transactio
308308

309309
private final long _sinceScn;
310310

311+
private ORListener _orListener;
312+
313+
private String _sourceName;
314+
311315
public EventProducerThread(String sourceName, long sinceScn)
312316
{
313317
super("OpenReplicator_" + sourceName);
318+
_sourceName = sourceName;
314319
_sinceScn = sinceScn;
315320
}
316321

@@ -324,18 +329,19 @@ public void run()
324329
int logid = logid(_sinceScn);
325330

326331
String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);
327-
ORListener orl = new ORListener(logid, _log, _binlogFilePrefix,
328-
_producerThread, _tableUriToSrcIdMap, _tableUriToSrcNameMap,
329-
_schemaRegistryService);
332+
_orListener = new ORListener(_sourceName, logid, _log,
333+
_binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
334+
_tableUriToSrcNameMap, _schemaRegistryService, 200);
330335

331336
_or.setBinlogFileName(binlogFile);
332337
_or.setBinlogPosition(offset);
333-
_or.setBinlogEventListener(orl);
338+
_or.setBinlogEventListener(_orListener);
334339

335340
try
336341
{
337342
_log.info(String.format("Open Replicator starting from %s@%d", binlogFile, offset));
338343
_or.start();
344+
_orListener.start();
339345
} catch (Exception e)
340346
{
341347
throw new DatabusRuntimeException("failed to start open replicator: " + e.getMessage(), e);
@@ -382,6 +388,9 @@ public void onEndTransaction(Transaction txn)
382388
{
383389
try
384390
{
391+
// Because the current thread is orListener thread, so shutdown() will block the thread itself.
392+
// So we just set orListener's shutdown flag, the orListener thead will exit immediately after this function.
393+
_orListener.shutdownAsynchronously();
385394
_or.stop(10, TimeUnit.SECONDS);
386395
}
387396
catch (Exception e)

0 commit comments

Comments
 (0)