11package com .linkedin .databus2 .producers ;
22
3+ import java .math .BigInteger ;
34import java .nio .ByteBuffer ;
45import java .nio .charset .Charset ;
56import java .sql .Time ;
1617import java .util .concurrent .TimeUnit ;
1718
1819import org .apache .avro .Schema ;
20+ import org .apache .avro .Schema .Field ;
1921import org .apache .avro .generic .GenericData ;
2022import org .apache .avro .generic .GenericRecord ;
2123import org .apache .log4j .Logger ;
@@ -138,6 +140,13 @@ public interface TransactionProcessor
138140
139141 /** Milli sec timeout for _binlogEventQueue operation **/
140142 private long _queueTimeoutMs = 100L ;
143+
144+ /** correct unsigned int type*/
145+ public static final int TINYINT_MAX_VALUE = 256 ;
146+ public static final int SMALLINT_MAX_VALUE = 65536 ;
147+ public static final int MEDIUMINT_MAX_VALUE = 16777216 ;
148+ public static final long INTEGER_MAX_VALUE = 4294967296L ;
149+ public static final BigInteger BIGINT_MAX_VALUE = new BigInteger ("18446744073709551616" );
141150
142151 public ORListener (String name ,
143152 int currentFileNumber ,
@@ -444,7 +453,7 @@ private void insertFieldIntoRecord(
444453 try
445454 {
446455 if (! isFieldNull )
447- fieldValueObj = orToAvroType (fieldValue );
456+ fieldValueObj = orToAvroType (fieldValue , avroField );
448457 else
449458 fieldValueObj = null ;
450459
@@ -461,8 +470,9 @@ private void insertFieldIntoRecord(
461470 /**
462471 * Given a OR Column, it returns a corresponding Java object that can be inserted into
463472 * AVRO record
473+ * @param avroField
464474 */
465- private Object orToAvroType (Column s )
475+ private Object orToAvroType (Column s , Field avroField )
466476 throws DatabusException
467477 {
468478 if (s instanceof BitColumn )
@@ -523,19 +533,31 @@ else if (s instanceof Int24Column)
523533 {
524534 Int24Column ic = (Int24Column ) s ;
525535 Integer i = ic .getValue ();
536+ if (i < 0 && SchemaHelper .getMetaField (avroField , "dbFieldType" ).contains ("UNSIGNED" ))
537+ {
538+ i += ORListener .MEDIUMINT_MAX_VALUE ;
539+ }
526540 return i ;
527541 }
528542 else if (s instanceof LongColumn )
529543 {
530544 LongColumn lc = (LongColumn ) s ;
531- Integer i = lc .getValue ();
532- return i ;
545+ Long l = lc .getValue ().longValue ();
546+ if (l < 0 && SchemaHelper .getMetaField (avroField , "dbFieldType" ).contains ("UNSIGNED" ))
547+ {
548+ l += ORListener .INTEGER_MAX_VALUE ;
549+ }
550+ return l ;
533551 }
534552 else if (s instanceof LongLongColumn )
535553 {
536554 LongLongColumn llc = (LongLongColumn ) s ;
537- Long l = llc .getValue ();
538- return l ;
555+ BigInteger b = new BigInteger (llc .getValue ()+"" );
556+ if (b .compareTo (BigInteger .ZERO ) < 0 && SchemaHelper .getMetaField (avroField , "dbFieldType" ).contains ("UNSIGNED" ))
557+ {
558+ b = b .add (ORListener .BIGINT_MAX_VALUE );
559+ }
560+ return b ;
539561 }
540562 else if (s instanceof NullColumn )
541563 {
@@ -551,6 +573,10 @@ else if (s instanceof ShortColumn)
551573 {
552574 ShortColumn sc = (ShortColumn ) s ;
553575 Integer i = sc .getValue ();
576+ if (i < 0 && SchemaHelper .getMetaField (avroField , "dbFieldType" ).contains ("UNSIGNED" ))
577+ {
578+ i = i + ORListener .SMALLINT_MAX_VALUE ;
579+ }
554580 return i ;
555581 }
556582 else if (s instanceof StringColumn )
@@ -587,6 +613,10 @@ else if (s instanceof TinyColumn)
587613 {
588614 TinyColumn tc = (TinyColumn ) s ;
589615 Integer i = tc .getValue ();
616+ if (i < 0 && SchemaHelper .getMetaField (avroField , "dbFieldType" ).contains ("UNSIGNED" ))
617+ {
618+ i = i + ORListener .TINYINT_MAX_VALUE ;
619+ }
590620 return i ;
591621 }
592622 else if (s instanceof YearColumn )
0 commit comments