Skip to content

Commit f2a9ed1

Browse files
committed
process transaction produced by mysql trigger
1 parent 754bb6b commit f2a9ed1

1 file changed

Lines changed: 24 additions & 87 deletions

File tree

  • databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java

Lines changed: 24 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,9 @@ public interface TransactionProcessor
9595
public abstract void onEndTransaction(Transaction txn) throws DatabusException;
9696
};
9797

98-
/** Track current table Id as reported in bin logs */
99-
private long _currTableId = -1;
100-
101-
/** Track current table name as reported in bin logs */
102-
private String _currTableName = "";
103-
10498
/** Transaction object containing the current transaction that is being built from the Binlog **/
10599
private Transaction _transaction = null;
106100

107-
/** Batch of events corresponding to single source (table) in the current transaction **/
108-
private PerSourceTransaction _perSourceTransaction = null;
109-
110101
/** Track current file number for generating SCN **/
111102
private int _currFileNum;
112103

@@ -139,6 +130,10 @@ public interface TransactionProcessor
139130
/** Flag to indicate the begining of the txn is seen. Used to indicate **/
140131
private boolean _isBeginTxnSeen = false;
141132

133+
/** Track all the table map events, cleared when the binlog rotated **/
134+
private final Map<Long, TableMapEvent> _tableMapEvents = new HashMap<Long, TableMapEvent>();
135+
136+
/** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/
142137
private BlockingQueue<BinlogEventV4> _binlogEventQueue = null;
143138

144139
public ORListener(String name,
@@ -177,43 +172,10 @@ public void onEvents(BinlogEventV4 event)
177172

178173
private void processTableMapEvent(TableMapEvent tme)
179174
{
180-
String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
181-
long newTableId = tme.getTableId();
182-
183-
final boolean areTableNamesEqual = _currTableName.equals(newTableName);
184-
final boolean areTableIdsEqual = (_currTableId == newTableId);
185-
final boolean didTableNameChange = !(areTableNamesEqual && areTableIdsEqual);
186-
final boolean errorTransition = (areTableNamesEqual && !areTableIdsEqual) || (!areTableNamesEqual && areTableIdsEqual);
175+
_tableMapEvents.put(tme.getTableId(), tme);
187176

188-
if (_currTableName.isEmpty() && (_currTableId == -1))
189-
{
190-
// First TableMapEvent for the transaction. Indicates the first event in the transaction is yet to come
191-
startSource(newTableName, newTableId);
192-
}
193-
else if (didTableNameChange)
194-
{
195-
// Event will come for a new source. Invoke an endSource on currTableName, and a startSource on newTableName
196-
endSource();
197-
startSource(newTableName, newTableId);
198-
}
199-
else
200-
{
201-
_log.error("Unexpected : TableMap Event obtained :" + tme);
202-
throw new DatabusRuntimeException("Unexpected : TableMap Event obtained :" +
203-
" _currTableName = " + _currTableName +
204-
" _curTableId = " + _currTableId +
205-
" newTableName = " + newTableName +
206-
" newTableId = " + newTableId);
207-
}
208-
209-
if (errorTransition)
210-
{
211-
throw new DatabusRuntimeException("TableName and TableId should change simultaneously or not" +
212-
" _currTableName = " + _currTableName +
213-
" _curTableId = " + _currTableId +
214-
" newTableName = " + newTableName +
215-
" newTableId = " + newTableId);
216-
}
177+
String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
178+
startSource(newTableName);
217179
}
218180

219181
private void startXtion(QueryEvent e)
@@ -273,57 +235,29 @@ private void rollbackXtion(QueryEvent e)
273235

274236
private void reset()
275237
{
276-
_currTableName = "";
277-
_currTableId = -1;
278-
_perSourceTransaction = null;
279238
_transaction = null;
280239
_currTxnSizeInBytes = 0;
281240
}
282241

283-
private void startSource(String newTableName, long newTableId)
242+
private void startSource(String newTableName)
284243
{
285-
_currTableName = newTableName;
286-
_currTableId = newTableId;
287-
if (_perSourceTransaction == null)
288-
{
289-
Short srcId = _tableUriToSrcIdMap.get(_currTableName);
244+
Short srcId = _tableUriToSrcIdMap.get(newTableName);
290245

291-
if (null == srcId)
292-
{
293-
throw new DatabusRuntimeException("Could not find a matching logical source for table Uri (" + _currTableName + ")" );
294-
}
295-
assert(_transaction != null);
296-
_perSourceTransaction = new PerSourceTransaction(srcId);
297-
_transaction.mergePerSourceTransaction(_perSourceTransaction);
298-
}
299-
else
300-
{
301-
throw new DatabusRuntimeException("Seems like a startSource has been received without an endSource for previous source");
302-
}
303-
}
304-
305-
private void endSource()
306-
{
307-
if (_perSourceTransaction != null)
308-
{
309-
_perSourceTransaction = null;
310-
}
311-
else
246+
assert (_transaction != null);
247+
if (_transaction.getPerSourceTransaction(srcId) == null)
312248
{
313-
throw new DatabusRuntimeException("_perSourceTransaction should not be null in endSource()");
249+
_transaction.mergePerSourceTransaction(new PerSourceTransaction(srcId));
314250
}
315251
}
316252

317253
private void deleteRows(DeleteRowsEventV2 dre)
318254
{
319-
List<Row> lp = dre.getRows();
320-
frameAvroRecord(dre.getHeader(), lp, DbusOpcode.DELETE);
255+
frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE);
321256
}
322257

323258
private void deleteRows(DeleteRowsEvent dre)
324259
{
325-
List<Row> lp = dre.getRows();
326-
frameAvroRecord(dre.getHeader(), lp, DbusOpcode.DELETE);
260+
frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE);
327261
}
328262

329263
private void updateRows(UpdateRowsEvent ure)
@@ -335,7 +269,7 @@ private void updateRows(UpdateRowsEvent ure)
335269
Row r = pr.getAfter();
336270
lr.add(r);
337271
}
338-
frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT);
272+
frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT);
339273
}
340274

341275
private void updateRows(UpdateRowsEventV2 ure)
@@ -347,27 +281,29 @@ private void updateRows(UpdateRowsEventV2 ure)
347281
Row r = pr.getAfter();
348282
lr.add(r);
349283
}
350-
frameAvroRecord(ure.getHeader(), lr, DbusOpcode.UPSERT);
284+
frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT);
351285
}
352286

353287
private void insertRows(WriteRowsEvent wre)
354288
{
355-
frameAvroRecord(wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
289+
frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
356290
}
357291

358292
private void insertRows(WriteRowsEventV2 wre)
359293
{
360-
frameAvroRecord(wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
294+
frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT);
361295
}
362296

363-
private void frameAvroRecord(BinlogEventV4Header bh, List<Row> rl, final DbusOpcode doc)
297+
private void frameAvroRecord(long tableId, BinlogEventV4Header bh, List<Row> rl, final DbusOpcode doc)
364298
{
365299
try
366300
{
367301
final long timestampInNanos = bh.getTimestamp() * 1000000L;
368302
final long scn = scn(_currFileNum, (int)bh.getPosition());
369303
final boolean isReplicated = false;
370-
VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(_currTableName));
304+
final TableMapEvent tme = _tableMapEvents.get(tableId);
305+
String tableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase();
306+
VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(tableName));
371307
Schema schema = vs.getSchema();
372308

373309
if ( _log.isDebugEnabled())
@@ -382,7 +318,7 @@ private void frameAvroRecord(BinlogEventV4Header bh, List<Row> rl, final DbusOpc
382318
List<KeyPair> kps = generateKeyPair(cl, schema);
383319

384320
DbChangeEntry db = new DbChangeEntry(scn, timestampInNanos, gr, doc, isReplicated, schema, kps);
385-
_perSourceTransaction.mergeDbChangeEntrySet(db);
321+
_transaction.getPerSourceTransaction(_tableUriToSrcIdMap.get(tableName)).mergeDbChangeEntrySet(db);
386322
}
387323
} catch (NoSuchSchemaException ne)
388324
{
@@ -734,6 +670,7 @@ else if (event instanceof RotateEvent)
734670
_log.info("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix);
735671
String fileNumStr = fileName.substring(fileName.lastIndexOf(_binlogFilePrefix) + _binlogFilePrefix.length() + 1);
736672
_currFileNum = Integer.parseInt(fileNumStr);
673+
_tableMapEvents.clear();
737674
continue;
738675
}
739676

0 commit comments

Comments
 (0)