@@ -95,18 +95,9 @@ public interface TransactionProcessor
9595 public abstract void onEndTransaction (Transaction txn ) throws DatabusException ;
9696 };
9797
98- /** Track current table Id as reported in bin logs */
99- private long _currTableId = -1 ;
100-
101- /** Track current table name as reported in bin logs */
102- private String _currTableName = "" ;
103-
10498 /** Transaction object containing the current transaction that is being built from the Binlog **/
10599 private Transaction _transaction = null ;
106100
107- /** Batch of events corresponding to single source (table) in the current transaction **/
108- private PerSourceTransaction _perSourceTransaction = null ;
109-
110101 /** Track current file number for generating SCN **/
111102 private int _currFileNum ;
112103
@@ -139,6 +130,10 @@ public interface TransactionProcessor
139130 /** Flag to indicate the begining of the txn is seen. Used to indicate **/
140131 private boolean _isBeginTxnSeen = false ;
141132
133+ /** Track all the table map events, cleared when the binlog rotated **/
134+ private final Map <Long , TableMapEvent > _tableMapEvents = new HashMap <Long , TableMapEvent >();
135+
136+ /** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/
142137 private BlockingQueue <BinlogEventV4 > _binlogEventQueue = null ;
143138
144139 public ORListener (String name ,
@@ -177,43 +172,10 @@ public void onEvents(BinlogEventV4 event)
177172
178173 private void processTableMapEvent (TableMapEvent tme )
179174 {
180- String newTableName = tme .getDatabaseName ().toString ().toLowerCase () + "." + tme .getTableName ().toString ().toLowerCase ();
181- long newTableId = tme .getTableId ();
182-
183- final boolean areTableNamesEqual = _currTableName .equals (newTableName );
184- final boolean areTableIdsEqual = (_currTableId == newTableId );
185- final boolean didTableNameChange = !(areTableNamesEqual && areTableIdsEqual );
186- final boolean errorTransition = (areTableNamesEqual && !areTableIdsEqual ) || (!areTableNamesEqual && areTableIdsEqual );
175+ _tableMapEvents .put (tme .getTableId (), tme );
187176
188- if (_currTableName .isEmpty () && (_currTableId == -1 ))
189- {
190- // First TableMapEvent for the transaction. Indicates the first event in the transaction is yet to come
191- startSource (newTableName , newTableId );
192- }
193- else if (didTableNameChange )
194- {
195- // Event will come for a new source. Invoke an endSource on currTableName, and a startSource on newTableName
196- endSource ();
197- startSource (newTableName , newTableId );
198- }
199- else
200- {
201- _log .error ("Unexpected : TableMap Event obtained :" + tme );
202- throw new DatabusRuntimeException ("Unexpected : TableMap Event obtained :" +
203- " _currTableName = " + _currTableName +
204- " _curTableId = " + _currTableId +
205- " newTableName = " + newTableName +
206- " newTableId = " + newTableId );
207- }
208-
209- if (errorTransition )
210- {
211- throw new DatabusRuntimeException ("TableName and TableId should change simultaneously or not" +
212- " _currTableName = " + _currTableName +
213- " _curTableId = " + _currTableId +
214- " newTableName = " + newTableName +
215- " newTableId = " + newTableId );
216- }
177+ String newTableName = tme .getDatabaseName ().toString ().toLowerCase () + "." + tme .getTableName ().toString ().toLowerCase ();
178+ startSource (newTableName );
217179 }
218180
219181 private void startXtion (QueryEvent e )
@@ -273,57 +235,29 @@ private void rollbackXtion(QueryEvent e)
273235
274236 private void reset ()
275237 {
276- _currTableName = "" ;
277- _currTableId = -1 ;
278- _perSourceTransaction = null ;
279238 _transaction = null ;
280239 _currTxnSizeInBytes = 0 ;
281240 }
282241
283- private void startSource (String newTableName , long newTableId )
242+ private void startSource (String newTableName )
284243 {
285- _currTableName = newTableName ;
286- _currTableId = newTableId ;
287- if (_perSourceTransaction == null )
288- {
289- Short srcId = _tableUriToSrcIdMap .get (_currTableName );
244+ Short srcId = _tableUriToSrcIdMap .get (newTableName );
290245
291- if (null == srcId )
292- {
293- throw new DatabusRuntimeException ("Could not find a matching logical source for table Uri (" + _currTableName + ")" );
294- }
295- assert (_transaction != null );
296- _perSourceTransaction = new PerSourceTransaction (srcId );
297- _transaction .mergePerSourceTransaction (_perSourceTransaction );
298- }
299- else
300- {
301- throw new DatabusRuntimeException ("Seems like a startSource has been received without an endSource for previous source" );
302- }
303- }
304-
305- private void endSource ()
306- {
307- if (_perSourceTransaction != null )
308- {
309- _perSourceTransaction = null ;
310- }
311- else
246+ assert (_transaction != null );
247+ if (_transaction .getPerSourceTransaction (srcId ) == null )
312248 {
313- throw new DatabusRuntimeException ( "_perSourceTransaction should not be null in endSource()" );
249+ _transaction . mergePerSourceTransaction ( new PerSourceTransaction ( srcId ) );
314250 }
315251 }
316252
317253 private void deleteRows (DeleteRowsEventV2 dre )
318254 {
319- List <Row > lp = dre .getRows ();
320- frameAvroRecord (dre .getHeader (), lp , DbusOpcode .DELETE );
255+ frameAvroRecord (dre .getTableId (), dre .getHeader (), dre .getRows (), DbusOpcode .DELETE );
321256 }
322257
323258 private void deleteRows (DeleteRowsEvent dre )
324259 {
325- List <Row > lp = dre .getRows ();
326- frameAvroRecord (dre .getHeader (), lp , DbusOpcode .DELETE );
260+ frameAvroRecord (dre .getTableId (), dre .getHeader (), dre .getRows (), DbusOpcode .DELETE );
327261 }
328262
329263 private void updateRows (UpdateRowsEvent ure )
@@ -335,7 +269,7 @@ private void updateRows(UpdateRowsEvent ure)
335269 Row r = pr .getAfter ();
336270 lr .add (r );
337271 }
338- frameAvroRecord (ure .getHeader (), lr , DbusOpcode .UPSERT );
272+ frameAvroRecord (ure .getTableId (), ure . getHeader (), lr , DbusOpcode .UPSERT );
339273 }
340274
341275 private void updateRows (UpdateRowsEventV2 ure )
@@ -347,27 +281,29 @@ private void updateRows(UpdateRowsEventV2 ure)
347281 Row r = pr .getAfter ();
348282 lr .add (r );
349283 }
350- frameAvroRecord (ure .getHeader (), lr , DbusOpcode .UPSERT );
284+ frameAvroRecord (ure .getTableId (), ure . getHeader (), lr , DbusOpcode .UPSERT );
351285 }
352286
353287 private void insertRows (WriteRowsEvent wre )
354288 {
355- frameAvroRecord (wre .getHeader (), wre .getRows (), DbusOpcode .UPSERT );
289+ frameAvroRecord (wre .getTableId (), wre . getHeader (), wre .getRows (), DbusOpcode .UPSERT );
356290 }
357291
358292 private void insertRows (WriteRowsEventV2 wre )
359293 {
360- frameAvroRecord (wre .getHeader (), wre .getRows (), DbusOpcode .UPSERT );
294+ frameAvroRecord (wre .getTableId (), wre . getHeader (), wre .getRows (), DbusOpcode .UPSERT );
361295 }
362296
363- private void frameAvroRecord (BinlogEventV4Header bh , List <Row > rl , final DbusOpcode doc )
297+ private void frameAvroRecord (long tableId , BinlogEventV4Header bh , List <Row > rl , final DbusOpcode doc )
364298 {
365299 try
366300 {
367301 final long timestampInNanos = bh .getTimestamp () * 1000000L ;
368302 final long scn = scn (_currFileNum , (int )bh .getPosition ());
369303 final boolean isReplicated = false ;
370- VersionedSchema vs = _schemaRegistryService .fetchLatestVersionedSchemaBySourceName (_tableUriToSrcNameMap .get (_currTableName ));
304+ final TableMapEvent tme = _tableMapEvents .get (tableId );
305+ String tableName = tme .getDatabaseName ().toString ().toLowerCase () + "." + tme .getTableName ().toString ().toLowerCase ();
306+ VersionedSchema vs = _schemaRegistryService .fetchLatestVersionedSchemaBySourceName (_tableUriToSrcNameMap .get (tableName ));
371307 Schema schema = vs .getSchema ();
372308
373309 if ( _log .isDebugEnabled ())
@@ -382,7 +318,7 @@ private void frameAvroRecord(BinlogEventV4Header bh, List<Row> rl, final DbusOpc
382318 List <KeyPair > kps = generateKeyPair (cl , schema );
383319
384320 DbChangeEntry db = new DbChangeEntry (scn , timestampInNanos , gr , doc , isReplicated , schema , kps );
385- _perSourceTransaction .mergeDbChangeEntrySet (db );
321+ _transaction . getPerSourceTransaction ( _tableUriToSrcIdMap . get ( tableName )) .mergeDbChangeEntrySet (db );
386322 }
387323 } catch (NoSuchSchemaException ne )
388324 {
@@ -734,6 +670,7 @@ else if (event instanceof RotateEvent)
734670 _log .info ("File Rotated : FileName :" + fileName + ", _binlogFilePrefix :" + _binlogFilePrefix );
735671 String fileNumStr = fileName .substring (fileName .lastIndexOf (_binlogFilePrefix ) + _binlogFilePrefix .length () + 1 );
736672 _currFileNum = Integer .parseInt (fileNumStr );
673+ _tableMapEvents .clear ();
737674 continue ;
738675 }
739676
0 commit comments