Skip to content

Commit 72331f1

Browse files
buptyzcchavdar
authored andcommitted
Refactor or to avro method (#101)
* refactor orToAvro method * style format * use single instance * code simplification
1 parent 85255c3 commit 72331f1

2 files changed

Lines changed: 342 additions & 103 deletions

File tree

databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java

Lines changed: 5 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.linkedin.databus2.producers.ds.PerSourceTransaction;
5252
import com.linkedin.databus2.producers.ds.PrimaryKeySchema;
5353
import com.linkedin.databus2.producers.ds.Transaction;
54+
import com.linkedin.databus2.producers.util.Or2AvroConvert;
5455
import com.linkedin.databus2.schemas.NoSuchSchemaException;
5556
import com.linkedin.databus2.schemas.SchemaRegistryService;
5657
import com.linkedin.databus2.schemas.VersionedSchema;
@@ -510,113 +511,14 @@ private void insertFieldIntoRecord(
510511
private Object orToAvroType(Column s, Field avroField)
511512
throws DatabusException
512513
{
513-
if (s instanceof NullColumn)
514+
try
514515
{
515-
return null;
516+
return Or2AvroConvert.convert(s, avroField);
516517
}
517-
String fieldTypeSchemaStr = avroField.schema().toString();
518-
if (fieldTypeSchemaStr.contains("int"))
518+
catch (Exception e)
519519
{
520-
return Integer.parseInt(s.getValue() + "") + (int) unsignedOffset(s, avroField);
520+
throw new DatabusRuntimeException("Unknown MySQL type in the event" + s.getClass() + " Object = " + s, e);
521521
}
522-
if (fieldTypeSchemaStr.contains("long"))
523-
{
524-
if (s.getValue() instanceof Date)
525-
{
526-
return ((Date) s.getValue()).getTime();
527-
}
528-
if (s instanceof LongLongColumn)
529-
{
530-
LongLongColumn llc = (LongLongColumn) s;
531-
BigInteger b = new BigInteger(llc.getValue() + "");
532-
return b.add(new BigInteger(unsignedOffset(s, avroField) + "")).longValue();
533-
}
534-
return Long.parseLong(s.getValue() + "") + unsignedOffset(s, avroField);
535-
}
536-
if (fieldTypeSchemaStr.contains("double"))
537-
{
538-
return Double.parseDouble(s.getValue() + "");
539-
}
540-
if (fieldTypeSchemaStr.contains("string"))
541-
{
542-
if (s.getValue() instanceof byte[])
543-
{
544-
return new String((byte[]) s.getValue(), Charset.defaultCharset());
545-
}
546-
return s.getValue() + "";
547-
}
548-
if (fieldTypeSchemaStr.contains("bytes"))
549-
{
550-
if (!(s.getValue() instanceof byte[]))
551-
{
552-
throw new DatabusException(avroField.name()+" need convert to bytes,but the column type is " + s.getClass());
553-
}
554-
byte[] byteArr = (byte[]) s.getValue();
555-
return ByteBuffer.wrap(byteArr);
556-
}
557-
if (fieldTypeSchemaStr.contains("float"))
558-
{
559-
return Float.parseFloat(s.getValue() + "");
560-
}
561-
else
562-
{
563-
throw new DatabusRuntimeException("Unknown schema type " + fieldTypeSchemaStr + " Object = " + s);
564-
}
565-
}
566-
567-
private long unsignedOffset(Column s, Field avroField)
568-
{
569-
if (s instanceof Int24Column)
570-
{
571-
Int24Column ic = (Int24Column) s;
572-
Integer i = ic.getValue();
573-
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
574-
{
575-
return ORListener.MEDIUMINT_MAX_VALUE;
576-
}
577-
return 0;
578-
}
579-
else if (s instanceof LongColumn)
580-
{
581-
LongColumn lc = (LongColumn) s;
582-
Long i = lc.getValue().longValue();
583-
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
584-
{
585-
return ORListener.INTEGER_MAX_VALUE;
586-
}
587-
return 0;
588-
}
589-
else if (s instanceof LongLongColumn)
590-
{
591-
LongLongColumn llc = (LongLongColumn) s;
592-
Long l = llc.getValue();
593-
if (l < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
594-
{
595-
return ORListener.BIGINT_MAX_VALUE.longValue();
596-
}
597-
return 0;
598-
}
599-
else if (s instanceof ShortColumn)
600-
{
601-
ShortColumn sc = (ShortColumn) s;
602-
Integer i = sc.getValue();
603-
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
604-
{
605-
return ORListener.SMALLINT_MAX_VALUE;
606-
}
607-
return 0;
608-
}
609-
else if (s instanceof TinyColumn)
610-
{
611-
TinyColumn tc = (TinyColumn) s;
612-
Integer i = tc.getValue();
613-
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
614-
{
615-
return ORListener.TINYINT_MAX_VALUE;
616-
}
617-
return 0;
618-
}
619-
return 0;
620522
}
621523

622524
/**

0 commit comments

Comments
 (0)