Skip to content

Commit 754bb6b

Browse files
committed
add auto reconnect to or and fix rotate bug
1 parent fd6495c commit 754bb6b

2 files changed

Lines changed: 153 additions & 61 deletions

File tree

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public interface TransactionProcessor
139139
/** Flag to indicate the begining of the txn is seen. Used to indicate **/
140140
private boolean _isBeginTxnSeen = false;
141141

142-
private BlockingQueue<BinlogEventV4> binlogEventQueue = null;
142+
private BlockingQueue<BinlogEventV4> _binlogEventQueue = null;
143143

144144
public ORListener(String name,
145145
int currentFileNumber,
@@ -159,7 +159,7 @@ public ORListener(String name,
159159
_tableUriToSrcNameMap = tableUriToSrcNameMap;
160160
_schemaRegistryService = schemaRegistryService;
161161
_currFileNum = currentFileNumber;
162-
binlogEventQueue = new LinkedBlockingQueue<BinlogEventV4>(maxQueueSize);
162+
_binlogEventQueue = new LinkedBlockingQueue<BinlogEventV4>(maxQueueSize);
163163
}
164164

165165
@Override
@@ -168,14 +168,13 @@ public void onEvents(BinlogEventV4 event)
168168
boolean isPut = false;
169169
do {
170170
try {
171-
isPut = binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
171+
isPut = _binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS);
172172
} catch (InterruptedException e) {
173173
_log.error("failed to put binlog event to binlogEventQueue event: " + event, e);
174174
}
175175
} while (!isPut && !isShutdownRequested());
176176
}
177177

178-
179178
private void processTableMapEvent(TableMapEvent tme)
180179
{
181180
String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
@@ -681,18 +680,30 @@ public static long scn(int logId, int offset)
681680
public void run() {
682681
List<BinlogEventV4> eventList = new ArrayList<BinlogEventV4>();
683682
BinlogEventV4 event;
684-
boolean isShutdown = false;
685-
while (!isShutdown)
683+
while (!isShutdownRequested())
686684
{
687-
eventList.clear();
688-
int eventNumber = binlogEventQueue.drainTo(eventList);
689-
for (int i = 0; i < eventNumber; i++)
685+
if (isPauseRequested())
690686
{
691-
if (isShutdownRequested())
687+
LOG.info("Pause requested for ORListener. Pausing !!");
688+
signalPause();
689+
LOG.info("Pausing. Waiting for resume command");
690+
try
691+
{
692+
awaitUnPauseRequest();
693+
}
694+
catch (InterruptedException e)
692695
{
693-
isShutdown = true;
694-
break;
696+
_log.info("Interrupted !!");
695697
}
698+
LOG.info("Resuming ORListener !!");
699+
signalResumed();
700+
LOG.info("ORListener resumed !!");
701+
}
702+
703+
eventList.clear();
704+
int eventNumber = _binlogEventQueue.drainTo(eventList);
705+
for (int i = 0; i < eventNumber; i++)
706+
{
696707

697708
event = eventList.get(i);
698709
if (event == null)
@@ -716,6 +727,15 @@ public void run() {
716727
continue;
717728
}
718729
}
730+
else if (event instanceof RotateEvent)
731+
{
732+
RotateEvent re = (RotateEvent) event;
733+
String fileName = re.getBinlogFileName().toString();
734+
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
735+
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
736+
_currFileNum = Integer.parseInt(fileNumStr);
737+
continue;
738+
}
719739

720740
if ( ! _isBeginTxnSeen )
721741
{
@@ -752,14 +772,6 @@ else if ("ROLLBACK".equalsIgnoreCase(sql))
752772
continue;
753773
}
754774
}
755-
else if (event instanceof RotateEvent)
756-
{
757-
RotateEvent re = (RotateEvent)event;
758-
String fileName = re.getBinlogFileName().toString();
759-
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
760-
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
761-
_currFileNum = Integer.parseInt(fileNumStr);
762-
}
763775
else if (event instanceof XidEvent)
764776
{
765777
XidEvent xe = (XidEvent)event;
@@ -768,6 +780,12 @@ else if (event instanceof XidEvent)
768780
endXtion(xe);
769781
continue;
770782
}
783+
else if (event instanceof FormatDescriptionEvent)
784+
{
785+
// we don't need process this event
786+
_log.info("received FormatDescriptionEvent event");
787+
continue;
788+
}
771789
else if (event instanceof WriteRowsEvent)
772790
{
773791
WriteRowsEvent wre = (WriteRowsEvent)event;
@@ -803,18 +821,13 @@ else if (event instanceof TableMapEvent)
803821
TableMapEvent tme = (TableMapEvent)event;
804822
processTableMapEvent(tme);
805823
}
806-
else if (event instanceof FormatDescriptionEvent)
807-
{
808-
// we don't need process this event
809-
_log.info("receive FormatDescriptionEvent event");
810-
}
811824
else
812825
{
813826
_log.warn("Skipping !! Unknown OR event e: " + event);
814827
continue;
815828
}
816829

817-
if ( _log.isDebugEnabled())
830+
if (_log.isDebugEnabled())
818831
{
819832
_log.debug("e: " + event);
820833
}
@@ -824,5 +837,6 @@ else if (event instanceof FormatDescriptionEvent)
824837
}
825838
}
826839
_log.info("ORListener Thread done");
840+
doShutdownNotify();
827841
}
828842
}

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java

Lines changed: 113 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -312,93 +312,171 @@ public class EventProducerThread extends DatabusThreadBase implements Transactio
312312

313313
private String _sourceName;
314314

315+
private final long _reconnectIntervalMs = 2000;
316+
private final long _workIntervalMs = 100;
317+
315318
public EventProducerThread(String sourceName, long sinceScn)
316319
{
317320
super("OpenReplicator_" + sourceName);
318321
_sourceName = sourceName;
319322
_sinceScn = sinceScn;
320323
}
321324

322-
@Override
323-
public void run()
325+
void initOpenReplicator(long scn)
324326
{
325-
_eventBuffer.start(_sinceScn);
326-
_startPrevScn.set(_sinceScn);
327-
328-
int offset = offset(_sinceScn);
329-
int logid = logid(_sinceScn);
327+
int offset = offset(scn);
328+
int logid = logid(scn);
330329

331330
String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);
332-
_orListener = new ORListener(_sourceName, logid, _log,
333-
_binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
331+
// we should use a new ORListener to drop the left events in binlogEventQueue and the half processed transaction.
332+
_orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
334333
_tableUriToSrcNameMap, _schemaRegistryService, 200);
335334

336335
_or.setBinlogFileName(binlogFile);
337336
_or.setBinlogPosition(offset);
338337
_or.setBinlogEventListener(_orListener);
339338

339+
//must set transport and binlogParser to null to drop the old connection environment in reinit case
340+
_or.setTransport(null);
341+
_or.setBinlogParser(null);
342+
343+
_log.info(String.format("Open Replicator starting from %s@%d", binlogFile, offset));
344+
}
345+
346+
@Override
347+
public void run()
348+
{
349+
_eventBuffer.start(_sinceScn);
350+
_startPrevScn.set(_sinceScn);
351+
352+
initOpenReplicator(_sinceScn);
340353
try
341354
{
342-
_log.info(String.format("Open Replicator starting from %s@%d", binlogFile, offset));
343355
_or.start();
344356
_orListener.start();
345357
} catch (Exception e)
346358
{
347359
throw new DatabusRuntimeException("failed to start open replicator: " + e.getMessage(), e);
348360
}
349-
_log.info("Event Producer Thread done");
350361

351-
}
352-
353-
@Override
354-
public void onEndTransaction(Transaction txn)
355-
throws DatabusException
356-
{
357-
if (! isShutdownRequested())
362+
long lastConnectMs = System.currentTimeMillis();
363+
while (!isShutdownRequested())
358364
{
359365
if (isPauseRequested())
360366
{
361367
LOG.info("Pause requested for OpenReplicator. Pausing !!");
362368
signalPause();
363369
LOG.info("Pausing. Waiting for resume command");
364-
try { awaitUnPauseRequest(); } catch (InterruptedException e) { _log.info("Interrupted !!"); }
370+
try
371+
{
372+
if (_orListener.isAlive())
373+
{
374+
_orListener.pause();
375+
}
376+
awaitUnPauseRequest();
377+
}
378+
catch (InterruptedException e)
379+
{
380+
_log.info("Interrupted !!");
381+
}
365382
LOG.info("Resuming OpenReplicator !!");
383+
if (_orListener.isAlive())
384+
{
385+
try
386+
{
387+
_orListener.unpause();
388+
}
389+
catch (InterruptedException e)
390+
{
391+
_log.info("Interrupted !!");
392+
}
393+
}
366394
signalResumed();
367395
LOG.info("OpenReplicator resumed !!");
368396
}
369397

370-
try
398+
if (!_or.isRunning() && System.currentTimeMillis() - lastConnectMs > _reconnectIntervalMs)
371399
{
372-
addTxnToBuffer(txn);
373-
_maxSCNReaderWriter.saveMaxScn(txn.getScn());
400+
lastConnectMs = System.currentTimeMillis();
401+
try
402+
{
403+
//should stop orListener first to get the final maxScn used for init open replicator.
404+
if (_orListener.isAlive())
405+
{
406+
_orListener.shutdown();
407+
}
408+
long maxScn = _maxSCNReaderWriter.getMaxScn();
409+
_startPrevScn.set(maxScn);
410+
initOpenReplicator(maxScn);
411+
_or.start();
412+
_orListener.start();
413+
_log.info("start Open Replicator successfully");
414+
}
415+
catch (Exception e)
416+
{
417+
_log.error("failed to start Open Replicator", e);
418+
if (_or.isRunning())
419+
{
420+
try
421+
{
422+
_or.stop(10, TimeUnit.SECONDS);
423+
}
424+
catch (Exception e2)
425+
{
426+
_log.error("failed to stop Open Replicator", e2);
427+
}
428+
}
429+
}
374430
}
375-
catch (UnsupportedKeyException e)
431+
432+
try
376433
{
377-
_log.fatal("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer", e);
378-
throw new DatabusException(e);
434+
Thread.sleep(_workIntervalMs);
379435
}
380-
catch (EventCreationException e)
436+
catch (InterruptedException e)
381437
{
382-
_log.fatal("Got EventCreationException exception while adding txn (" + txn + ") to the buffer", e);
383-
throw new DatabusException(e);
438+
_log.info("Interrupted !!");
384439
}
385440
}
386441

387-
if (isShutdownRequested())
442+
if (_or.isRunning())
388443
{
389444
try
390445
{
391-
// Because the current thread is orListener thread, so shutdown() will block the thread itself.
392-
// So we just set orListener's shutdown flag, the orListener thead will exit immediately after this function.
393-
_orListener.shutdownAsynchronously();
394446
_or.stop(10, TimeUnit.SECONDS);
395447
}
396448
catch (Exception e)
397449
{
398-
_log.error("Got exception while stopping open replicator",e);
450+
_log.error("failed to stop Open Replicator", e);
399451
}
400-
doShutdownNotify();
401-
return;
452+
}
453+
if (_orListener.isAlive())
454+
{
455+
_orListener.shutdown();
456+
}
457+
458+
_log.info("Event Producer Thread done");
459+
doShutdownNotify();
460+
}
461+
462+
@Override
463+
public void onEndTransaction(Transaction txn)
464+
throws DatabusException
465+
{
466+
try
467+
{
468+
addTxnToBuffer(txn);
469+
_maxSCNReaderWriter.saveMaxScn(txn.getScn());
470+
}
471+
catch (UnsupportedKeyException e)
472+
{
473+
_log.fatal("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer", e);
474+
throw new DatabusException(e);
475+
}
476+
catch (EventCreationException e)
477+
{
478+
_log.fatal("Got EventCreationException exception while adding txn (" + txn + ") to the buffer", e);
479+
throw new DatabusException(e);
402480
}
403481
}
404482

0 commit comments

Comments
 (0)