Skip to content

Commit fa22d67

Browse files
handmadecodejnturton
authored andcommitted
DRILL-8421: Parquet microsecond columns (#2793)
* Read parquet TIME_MICROS columns as 64-bit values before truncating to 32-bits * Truncate parquet min and max metadata values for microsecond columns to milliseconds * Express parquet TIME_MICROS metadata as Integer values
1 parent 20f9f2c commit fa22d67

6 files changed

Lines changed: 379 additions & 5 deletions

File tree

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetTableMetadataUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,11 @@ public static Object getValue(Object value, PrimitiveType.PrimitiveTypeName prim
344344
case INT64:
345345
if (originalType == OriginalType.DECIMAL) {
346346
return BigInteger.valueOf(getLong(value));
347+
} else if (originalType == OriginalType.TIME_MICROS) {
348+
return getInt(value);
347349
} else {
348350
return getLong(value);
349351
}
350-
351352
case FLOAT:
352353
return getFloat(value);
353354

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ static class NullableDictionaryTimeMicrosReader extends NullableColumnReader<Nul
193193
protected void readField(long recordsToReadInThisPass) {
194194
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
195195
for (int i = 0; i < recordsToReadInThisPass; i++) {
196-
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
196+
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
197197
}
198198
}
199199
}

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,13 @@ protected void readField(long recordsToReadInThisPass) {
168168
if (recordsRequireDecoding()) {
169169
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
170170
for (int i = 0; i < recordsReadInThisIteration; i++) {
171-
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
171+
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
172172
}
173173
} else {
174174
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
175175
for (int i = 0; i < recordsReadInThisIteration; i++) {
176-
int value = pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes);
177-
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, value / 1000);
176+
long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
177+
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (value / 1000));
178178
}
179179
}
180180
}

exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/FileMetadataCollector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ private void addColumnMetadata(String[] columnName,
208208
minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
209209
maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
210210
}
211+
if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
212+
// DRILL-8241: truncate the min/max of microsecond columns to milliseconds, otherwise the
213+
// initial scanning of files when filtering will compare to the wrong values.
214+
minValue = truncateMicros(minValue);
215+
maxValue = truncateMicros(maxValue);
216+
}
211217
}
212218
long numNulls = stats.getNumNulls();
213219
Metadata_V4.ColumnMetadata_v4 columnMetadata = new Metadata_V4.ColumnMetadata_v4(columnTypeMetadata.name,
@@ -218,6 +224,18 @@ private void addColumnMetadata(String[] columnName,
218224
columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
219225
}
220226

227+
private static boolean isMicrosecondColumnType(OriginalType columnType) {
228+
return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
229+
}
230+
231+
private static Object truncateMicros(Object microSeconds) {
232+
if (microSeconds instanceof Number) {
233+
return Long.valueOf(((Number) microSeconds).longValue() / 1000);
234+
} else {
235+
return microSeconds;
236+
}
237+
}
238+
221239
/**
222240
* Get the host affinity for a row group.
223241
*

0 commit comments

Comments
 (0)