diff --git a/paimon-mosaic/pom.xml b/paimon-mosaic/pom.xml new file mode 100644 index 000000000000..95c11789cf95 --- /dev/null +++ b/paimon-mosaic/pom.xml @@ -0,0 +1,86 @@ + + + + 4.0.0 + + + paimon-parent + org.apache.paimon + 1.5-SNAPSHOT + + + paimon-mosaic + Paimon : Mosaic Format + + + + org.apache.paimon + mosaic + 0.1.0-SNAPSHOT + + + + org.apache.paimon + paimon-arrow + ${project.version} + + + + org.apache.paimon + paimon-common + ${project.version} + provided + + + + org.apache.paimon + paimon-core + ${project.version} + provided + + + + + + org.apache.paimon + paimon-common + ${project.version} + test-jar + test + + + + org.apache.paimon + paimon-test-utils + ${project.version} + test + + + + org.apache.paimon + paimon-core + ${project.version} + test-jar + test + + + diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java new file mode 100644 index 000000000000..bff850a4e06c --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormat.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BlobType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +/** Mosaic {@link FileFormat}. */ +public class MosaicFileFormat extends FileFormat { + + public static final ConfigOption STATS_COLUMNS = + ConfigOptions.key("mosaic.stats-columns") + .stringType() + .defaultValue("") + .withDescription( + "Comma-separated list of column names to collect statistics for. " + + "Empty means no statistics collection."); + + public static final ConfigOption NUM_BUCKETS = + ConfigOptions.key("mosaic.num-buckets") + .intType() + .noDefaultValue() + .withDescription("Number of column buckets for parallel IO."); + + static { + System.setProperty("arrow.enable_unsafe_memory_access", "true"); + } + + private final FileFormatFactory.FormatContext formatContext; + + public MosaicFileFormat(FileFormatFactory.FormatContext formatContext) { + super("mosaic"); + this.formatContext = formatContext; + } + + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List predicates) { + return new MosaicReaderFactory(dataSchemaRowType, projectedRowType, predicates); + } + + @Override + public FormatWriterFactory createWriterFactory(RowType type) { + return new MosaicWriterFactory(type, formatContext); + } + + @Override + public void validateDataFields(RowType rowType) { + MosaicRowTypeVisitor visitor = new MosaicRowTypeVisitor(); + for (DataType fieldType : rowType.getFieldTypes()) { + fieldType.accept(visitor); + } + } + + @Override + public Optional createStatsExtractor( + RowType type, SimpleColStatsCollector.Factory[] statsCollectors) { + return Optional.of(new MosaicSimpleStatsExtractor(type, statsCollectors)); + } + + static class MosaicRowTypeVisitor implements DataTypeVisitor { + + @Override + public Void visit(CharType charType) { + return null; + } + + @Override + public Void visit(VarCharType varCharType) { + return null; + } + + @Override + public Void visit(BooleanType booleanType) { + return null; + } + + @Override + public Void visit(BinaryType binaryType) { + return null; + } + + @Override + public Void visit(VarBinaryType varBinaryType) { + return null; + } + + @Override + public Void visit(DecimalType decimalType) { + return null; + } + + @Override + public Void visit(TinyIntType tinyIntType) { + return null; + } + + @Override + public Void visit(SmallIntType smallIntType) { + return null; + } + + @Override + public Void visit(IntType intType) { + return null; + } + + @Override + public Void visit(BigIntType bigIntType) { + return null; + } + + @Override + public Void visit(FloatType floatType) { + return null; + } + + @Override + public Void visit(DoubleType doubleType) { + return null; + } + + @Override + public Void visit(DateType dateType) { + return null; + } + + @Override + public Void visit(TimeType timeType) { + return null; + } + + @Override + public Void visit(TimestampType timestampType) { + return null; + } + + @Override + public Void visit(LocalZonedTimestampType localZonedTimestampType) { + return null; + } + + @Override + public Void visit(VariantType variantType) { + throw new UnsupportedOperationException( + "Mosaic file format does not support type VARIANT"); + } + + @Override + public Void visit(BlobType blobType) { + throw new UnsupportedOperationException( + "Mosaic file format does not support type BLOB"); + } + + @Override + public Void visit(ArrayType arrayType) { + throw new UnsupportedOperationException( + "Mosaic file format does not support type ARRAY"); + } + + @Override + public Void visit(VectorType vectorType) { + throw new UnsupportedOperationException( + "Mosaic file format does not support type VECTOR"); + } + + @Override + public Void visit(MultisetType multisetType) { + throw new UnsupportedOperationException( + "Mosaic file format does not support type MULTISET"); + } + + @Override + public Void visit(MapType mapType) { + throw new UnsupportedOperationException("Mosaic file format does not support type MAP"); + } + + @Override + public Void visit(RowType rowType) { + throw new UnsupportedOperationException("Mosaic file format does not support type ROW"); + } + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java new file mode 100644 index 000000000000..782faba3e8f9 --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicFileFormatFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; + +/** Factory to create {@link MosaicFileFormat}. */ +public class MosaicFileFormatFactory implements FileFormatFactory { + + public static final String IDENTIFIER = "mosaic"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public FileFormat create(FormatContext formatContext) { + return new MosaicFileFormat(formatContext); + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java new file mode 100644 index 000000000000..3a307ea0f296 --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicInputFileAdapter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.VectoredReadable; +import org.apache.paimon.mosaic.InputFile; + +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; + +/** + * Adapts Paimon's {@link FileIO} to Mosaic's {@link InputFile} interface. + * + *

Maintains a single {@link SeekableInputStream}. If the stream implements {@link + * VectoredReadable}, reads use {@link VectoredReadable#preadFully} which is thread-safe. Otherwise, + * reads are synchronized to protect seek+read sequences. + */ +public class MosaicInputFileAdapter implements InputFile, Closeable { + + private final Path path; + private final SeekableInputStream in; + private final VectoredReadable vectoredReadable; + + public MosaicInputFileAdapter(FileIO fileIO, Path path) throws IOException { + this.path = path; + this.in = fileIO.newInputStream(path); + this.vectoredReadable = in instanceof VectoredReadable ? (VectoredReadable) in : null; + } + + @Override + public void readFully(long position, byte[] buffer, int offset, int length) throws IOException { + if (vectoredReadable != null) { + vectoredReadable.preadFully(position, buffer, offset, length); + } else { + synchronized (in) { + in.seek(position); + int remaining = length; + int off = offset; + while (remaining > 0) { + int read = in.read(buffer, off, remaining); + if (read < 0) { + throw new EOFException( + "Reached end of file while reading " + + path + + " at position " + + position); + } + off += read; + remaining -= read; + } + } + } + } + + @Override + public void close() throws IOException { + in.close(); + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java new file mode 100644 index 000000000000..54d15c43c09e --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicObjects.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.TimestampType; + +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; + +/** Converts Mosaic's byte[] statistics to Paimon objects. */ +public class MosaicObjects { + + @Nullable + public static Object convertStatsValue(byte[] bytes, DataType dataType) { + if (bytes == null) { + return null; + } + switch (dataType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return BinaryString.fromBytes(bytes); + case BINARY: + case VARBINARY: + return bytes; + default: + break; + } + if (bytes.length == 0) { + return null; + } + ByteBuffer buf = ByteBuffer.wrap(bytes); + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return bytes[0] != 0; + case TINYINT: + return bytes[0]; + case SMALLINT: + return buf.getShort(); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return buf.getInt(); + case BIGINT: + return buf.getLong(); + case FLOAT: + return buf.getFloat(); + case DOUBLE: + return buf.getDouble(); + case DECIMAL: + DecimalType decimalType = (DecimalType) dataType; + BigInteger unscaled = new BigInteger(bytes); + BigDecimal decimal = new BigDecimal(unscaled, decimalType.getScale()); + return Decimal.fromBigDecimal( + decimal, decimalType.getPrecision(), decimalType.getScale()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return convertTimestamp(buf, ((TimestampType) dataType).getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return convertTimestamp(buf, ((LocalZonedTimestampType) dataType).getPrecision()); + default: + return null; + } + } + + private static Timestamp convertTimestamp(ByteBuffer buf, int precision) { + if (precision <= 3) { + return Timestamp.fromEpochMillis(buf.getLong()); + } else if (precision <= 6) { + return Timestamp.fromMicros(buf.getLong()); + } else { + // precision 7-9: 12 bytes = i64 millis (BE) + i32 nanos_of_milli (BE) + long millis = buf.getLong(); + int nanosOfMilli = buf.getInt(); + return Timestamp.fromEpochMillis(millis, nanosOfMilli); + } + } + + private MosaicObjects() {} +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java new file mode 100644 index 000000000000..5b39c867e290 --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicReaderFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.List; + +/** A factory to create Mosaic reader. */ +public class MosaicReaderFactory implements FormatReaderFactory { + + private final RowType dataSchemaRowType; + private final RowType projectedRowType; + @Nullable private final List predicates; + + public MosaicReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List predicates) { + this.dataSchemaRowType = dataSchemaRowType; + this.projectedRowType = projectedRowType; + this.predicates = predicates; + } + + @Override + public FileRecordReader createReader(Context context) throws IOException { + MosaicInputFileAdapter inputFile = + new MosaicInputFileAdapter(context.fileIO(), context.filePath()); + return new MosaicRecordsReader( + inputFile, + context.fileSize(), + dataSchemaRowType, + projectedRowType, + predicates, + context.filePath()); + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java new file mode 100644 index 000000000000..24cdcbf05c96 --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsReader.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.arrow.reader.ArrowBatchReader; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.mosaic.ColumnStatistics; +import org.apache.paimon.mosaic.MosaicReader; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.paimon.format.mosaic.MosaicObjects.convertStatsValue; + +/** File reader for Mosaic format. */ +public class MosaicRecordsReader implements FileRecordReader { + + private final MosaicInputFileAdapter inputFileAdapter; + private final MosaicReader reader; + private final ArrowBatchReader arrowBatchReader; + private final Path filePath; + private final BufferAllocator allocator; + private final int numRowGroups; + private final RowType dataSchemaRowType; + @Nullable private final List predicates; + + private int currentRowGroup; + private long returnedPosition = -1; + private VectorSchemaRoot currentVsr; + + public MosaicRecordsReader( + MosaicInputFileAdapter inputFileAdapter, + long fileSize, + RowType dataSchemaRowType, + RowType projectedRowType, + @Nullable List predicates, + Path filePath) { + this.filePath = filePath; + this.inputFileAdapter = inputFileAdapter; + this.dataSchemaRowType = dataSchemaRowType; + this.predicates = predicates; + this.allocator = new RootAllocator(); + + try { + this.reader = MosaicReader.open(inputFileAdapter, fileSize, allocator); + } catch (Exception e) { + allocator.close(); + throw e; + } + + Schema fileSchema = reader.getSchema(); + Set fileColumnNames = new HashSet<>(); + for (Field field : fileSchema.getFields()) { + fileColumnNames.add(field.getName()); + } + List projectedNames = projectedRowType.getFieldNames(); + List existingColumns = new ArrayList<>(); + for (String name : projectedNames) { + if (fileColumnNames.contains(name)) { + existingColumns.add(name); + } + } + if (!existingColumns.isEmpty()) { + reader.project(existingColumns.toArray(new String[0])); + } + + this.numRowGroups = reader.numRowGroups(); + this.currentRowGroup = 0; + this.arrowBatchReader = new ArrowBatchReader(projectedRowType, true); + } + + @Nullable + @Override + public FileRecordIterator readBatch() throws IOException { + while (currentRowGroup < numRowGroups) { + int numRows = reader.rowGroupNumRows(currentRowGroup); + if (!matchesRowGroup(currentRowGroup, numRows)) { + returnedPosition += numRows; + currentRowGroup++; + continue; + } + + releaseCurrentVsr(); + + VectorSchemaRoot vsr = reader.readRowGroup(currentRowGroup, allocator); + currentRowGroup++; + this.currentVsr = vsr; + + Iterator rows = arrowBatchReader.readBatch(vsr).iterator(); + + return new FileRecordIterator() { + @Override + public long returnedPosition() { + return returnedPosition; + } + + @Override + public Path filePath() { + return filePath; + } + + @Nullable + @Override + public InternalRow next() { + if (rows.hasNext()) { + returnedPosition++; + return rows.next(); + } + return null; + } + + @Override + public void releaseBatch() { + releaseCurrentVsr(); + } + }; + } + return null; + } + + private boolean matchesRowGroup(int rowGroupIndex, long rowCount) { + if (predicates == null || predicates.isEmpty()) { + return true; + } + + Map statsMap = reader.getRowGroupStatistics(rowGroupIndex); + if (statsMap.isEmpty()) { + return true; + } + + int fieldCount = dataSchemaRowType.getFieldCount(); + GenericRow minValues = new GenericRow(fieldCount); + GenericRow maxValues = new GenericRow(fieldCount); + long[] nullCounts = new long[fieldCount]; + + List fields = dataSchemaRowType.getFields(); + for (int i = 0; i < fieldCount; i++) { + String colName = fields.get(i).name(); + ColumnStatistics stats = statsMap.get(colName); + if (stats == null) { + continue; + } + + nullCounts[i] = stats.getNullCount(); + if (stats.hasMinMax()) { + DataType dataType = fields.get(i).type(); + Object min = convertStatsValue(stats.getMin(), dataType); + Object max = convertStatsValue(stats.getMax(), dataType); + minValues.setField(i, min); + maxValues.setField(i, max); + } + } + + for (Predicate predicate : predicates) { + if (!predicate.test(rowCount, minValues, maxValues, new GenericArray(nullCounts))) { + return false; + } + } + return true; + } + + private void releaseCurrentVsr() { + if (currentVsr != null) { + currentVsr.close(); + currentVsr = null; + } + } + + @Override + public void close() throws IOException { + releaseCurrentVsr(); + reader.close(); + allocator.close(); + inputFileAdapter.close(); + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java new file mode 100644 index 000000000000..fdef0eb3652b --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicRecordsWriter.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.arrow.ArrowBundleRecords; +import org.apache.paimon.arrow.vector.ArrowFormatWriter; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.format.BundleFormatWriter; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.io.BundleRecords; +import org.apache.paimon.mosaic.ColumnStatistics; +import org.apache.paimon.mosaic.MosaicWriter; +import org.apache.paimon.mosaic.WriterOptions; +import org.apache.paimon.options.MemorySize; +import org.apache.paimon.types.RowType; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Mosaic records writer. */ +public class MosaicRecordsWriter implements BundleFormatWriter { + + private final ArrowFormatWriter arrowFormatWriter; + private final MosaicWriter nativeWriter; + private final BufferAllocator allocator; + private final List statsColumnNames; + @Nullable private MosaicWriterMetadata metadata; + + public MosaicRecordsWriter( + OutputStream outputStream, + RowType rowType, + FileFormatFactory.FormatContext formatContext, + List statsColumnNames, + @Nullable Integer numBuckets) { + this.statsColumnNames = statsColumnNames; + this.allocator = new RootAllocator(); + + int writeBatchSize = formatContext.writeBatchSize(); + long writeBatchMemory = formatContext.writeBatchMemory().getBytes(); + + this.arrowFormatWriter = + new ArrowFormatWriter(rowType, writeBatchSize, true, allocator, writeBatchMemory); + + WriterOptions options = new WriterOptions().zstdLevel(formatContext.zstdLevel()); + if (numBuckets != null) { + options = options.numBuckets(numBuckets); + } + MemorySize blockSize = formatContext.blockSize(); + if (blockSize != null) { + options = options.rowGroupMaxSize(blockSize.getBytes()); + } + if (!statsColumnNames.isEmpty()) { + options.statsColumns(statsColumnNames.toArray(new String[0])); + } + + Schema arrowSchema = arrowFormatWriter.getVectorSchemaRoot().getSchema(); + this.nativeWriter = new MosaicWriter(outputStream, arrowSchema, options, allocator); + } + + @Override + public void addElement(InternalRow internalRow) { + if (!arrowFormatWriter.write(internalRow)) { + flush(); + if (!arrowFormatWriter.write(internalRow)) { + throw new RuntimeException("Failed to write row to Mosaic file"); + } + } + } + + @Override + public void writeBundle(BundleRecords bundleRecords) { + if (bundleRecords instanceof ArrowBundleRecords) { + flush(); + nativeWriter.write(((ArrowBundleRecords) bundleRecords).getVectorSchemaRoot()); + } else { + for (InternalRow row : bundleRecords) { + addElement(row); + } + } + } + + @Override + public boolean reachTargetSize(boolean suggestedCheck, long targetSize) { + if (!suggestedCheck) { + return false; + } + return nativeWriter.estimatedFileSize() >= targetSize; + } + + @Override + public void close() throws IOException { + Throwable throwable = null; + + try { + flush(); + } catch (Throwable t) { + throwable = t; + } + + try { + nativeWriter.close(); + } catch (Throwable t) { + throwable = addSuppressed(throwable, t); + } + + try { + collectMetadata(); + } catch (Throwable t) { + throwable = addSuppressed(throwable, t); + } + + try { + arrowFormatWriter.close(); + } catch (Throwable t) { + throwable = addSuppressed(throwable, t); + } + + try { + allocator.close(); + } catch (Throwable t) { + throwable = addSuppressed(throwable, t); + } + + if (throwable != null) { + rethrow(throwable); + } + } + + @Nullable + @Override + public Object writerMetadata() { + return metadata; + } + + private void collectMetadata() { + int numRowGroups = nativeWriter.numRowGroups(); + List> allStats = new ArrayList<>(numRowGroups); + for (int i = 0; i < numRowGroups; i++) { + allStats.add(nativeWriter.getRowGroupStatistics(i)); + } + this.metadata = new MosaicWriterMetadata(numRowGroups, allStats, statsColumnNames); + } + + private void flush() { + arrowFormatWriter.flush(); + if (!arrowFormatWriter.empty()) { + VectorSchemaRoot vsr = arrowFormatWriter.getVectorSchemaRoot(); + nativeWriter.write(vsr); + } + arrowFormatWriter.reset(); + } + + private static Throwable addSuppressed(Throwable throwable, Throwable suppressed) { + if (throwable == null) { + return suppressed; + } + throwable.addSuppressed(suppressed); + return throwable; + } + + private static void rethrow(Throwable throwable) throws IOException { + if (throwable instanceof IOException) { + throw (IOException) throwable; + } + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } + if (throwable instanceof Error) { + throw (Error) throwable; + } + throw new IOException(throwable); + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java new file mode 100644 index 000000000000..f426b27cfa81 --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractor.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.mosaic.ColumnStatistics; +import org.apache.paimon.mosaic.MosaicReader; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.paimon.format.mosaic.MosaicObjects.convertStatsValue; + +/** Extracts statistics from Mosaic file metadata. */ +public class MosaicSimpleStatsExtractor implements SimpleStatsExtractor { + + private final RowType rowType; + private final SimpleColStatsCollector.Factory[] statsCollectors; + + public MosaicSimpleStatsExtractor( + RowType rowType, SimpleColStatsCollector.Factory[] statsCollectors) { + this.rowType = rowType; + this.statsCollectors = statsCollectors; + } + + @Override + public SimpleColStats[] extract(FileIO fileIO, Path path, long length) { + try (MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path); + BufferAllocator allocator = new RootAllocator(); + MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) { + return extractFromStats(reader.numRowGroups(), reader::getRowGroupStatistics, null); + } catch (IOException e) { + throw new RuntimeException("Failed to extract stats from " + path, e); + } + } + + @Override + public SimpleColStats[] extract( + FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) { + if (writerMetadata instanceof MosaicWriterMetadata) { + MosaicWriterMetadata meta = (MosaicWriterMetadata) writerMetadata; + Set statsFieldIndices = resolveStatsFieldIndices(meta.statsColumnNames()); + return extractFromStats( + meta.numRowGroups(), meta::getRowGroupStatistics, statsFieldIndices); + } + return extract(fileIO, path, length); + } + + @Override + public Pair extractWithFileInfo( + FileIO fileIO, Path path, long length) { + try (MosaicInputFileAdapter inputFile = new MosaicInputFileAdapter(fileIO, path); + BufferAllocator allocator = new RootAllocator(); + MosaicReader reader = MosaicReader.open(inputFile, length, allocator)) { + int numRowGroups = reader.numRowGroups(); + SimpleColStats[] stats = + extractFromStats(numRowGroups, reader::getRowGroupStatistics, null); + long rowCount = 0; + for (int rg = 0; rg < numRowGroups; rg++) { + rowCount += reader.rowGroupNumRows(rg); + } + return Pair.of(stats, new FileInfo(rowCount)); + } catch (IOException e) { + throw new RuntimeException("Failed to extract stats from " + path, e); + } + } + + @SuppressWarnings("unchecked") + private SimpleColStats[] extractFromStats( + int numRowGroups, + RowGroupStatsProvider statsProvider, + @Nullable Set statsFieldIndices) { + int fieldCount = rowType.getFieldCount(); + List fieldNames = rowType.getFieldNames(); + Object[] minValues = new Object[fieldCount]; + Object[] maxValues = new Object[fieldCount]; + long[] nullCounts = new long[fieldCount]; + Set seenColumns = new HashSet<>(); + + for (int rg = 0; rg < numRowGroups; rg++) { + Map statsMap = statsProvider.getRowGroupStatistics(rg); + for (Map.Entry entry : statsMap.entrySet()) { + int colIdx = fieldNames.indexOf(entry.getKey()); + if (colIdx < 0) { + continue; + } + + ColumnStatistics stat = entry.getValue(); + seenColumns.add(colIdx); + nullCounts[colIdx] += stat.getNullCount(); + + if (stat.hasMinMax()) { + DataType dataType = rowType.getFields().get(colIdx).type(); + Object min = convertStatsValue(stat.getMin(), dataType); + Object max = convertStatsValue(stat.getMax(), dataType); + if (min instanceof Comparable) { + if (minValues[colIdx] == null) { + minValues[colIdx] = min; + } else { + if (((Comparable) min).compareTo(minValues[colIdx]) < 0) { + minValues[colIdx] = min; + } + } + } + if (max instanceof Comparable) { + if (maxValues[colIdx] == null) { + maxValues[colIdx] = max; + } else { + if (((Comparable) max).compareTo(maxValues[colIdx]) > 0) { + maxValues[colIdx] = max; + } + } + } + } + } + } + + Set trackedColumns = statsFieldIndices != null ? statsFieldIndices : seenColumns; + SimpleColStatsCollector[] collectors = SimpleColStatsCollector.create(statsCollectors); + SimpleColStats[] result = new SimpleColStats[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + if (!trackedColumns.contains(i) || !seenColumns.contains(i)) { + result[i] = collectors[i].convert(new SimpleColStats(null, null, null)); + } else { + SimpleColStats fieldStats = + new SimpleColStats(minValues[i], maxValues[i], nullCounts[i]); + result[i] = collectors[i].convert(fieldStats); + } + } + return result; + } + + private Set resolveStatsFieldIndices(List statsColumnNames) { + Set indices = new HashSet<>(); + List fieldNames = rowType.getFieldNames(); + for (String name : statsColumnNames) { + int idx = fieldNames.indexOf(name); + if (idx >= 0) { + indices.add(idx); + } + } + return indices; + } + + @FunctionalInterface + private interface RowGroupStatsProvider { + Map getRowGroupStatistics(int rowGroupIndex); + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java new file mode 100644 index 000000000000..ca67647f8cab --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +/** A factory to create Mosaic {@link FormatWriter}. */ +public class MosaicWriterFactory implements FormatWriterFactory { + + private final RowType rowType; + private final FileFormatFactory.FormatContext formatContext; + private final List statsColumnNames; + private final @Nullable Integer numBuckets; + + public MosaicWriterFactory(RowType rowType, FileFormatFactory.FormatContext formatContext) { + this.rowType = rowType; + this.formatContext = formatContext; + String statsColumnsValue = formatContext.options().get(MosaicFileFormat.STATS_COLUMNS); + if (statsColumnsValue == null || statsColumnsValue.trim().isEmpty()) { + this.statsColumnNames = new ArrayList<>(); + } else { + this.statsColumnNames = + Arrays.stream(statsColumnsValue.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + } + this.numBuckets = formatContext.options().get(MosaicFileFormat.NUM_BUCKETS); + } + + @Override + public FormatWriter create(PositionOutputStream out, String compression) { + validateCompression(compression); + return new MosaicRecordsWriter(out, rowType, formatContext, statsColumnNames, numBuckets); + } + + private static void validateCompression(String compression) { + if (compression == null) { + return; + } + String normalized = compression.toLowerCase(Locale.ROOT); + if (!normalized.equals("zstd")) { + throw new UnsupportedOperationException( + "Mosaic format only supports zstd compression, but got: " + compression); + } + } +} diff --git a/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java new file mode 100644 index 000000000000..cd3149fd4470 --- /dev/null +++ b/paimon-mosaic/src/main/java/org/apache/paimon/format/mosaic/MosaicWriterMetadata.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.mosaic.ColumnStatistics; + +import java.util.List; +import java.util.Map; + +/** In-memory metadata captured from MosaicWriter after close. */ +public class MosaicWriterMetadata { + + private final int numRowGroups; + private final List> rowGroupStats; + private final List statsColumnNames; + + public MosaicWriterMetadata( + int numRowGroups, + List> rowGroupStats, + List statsColumnNames) { + this.numRowGroups = numRowGroups; + this.rowGroupStats = rowGroupStats; + this.statsColumnNames = statsColumnNames; + } + + public int numRowGroups() { + return numRowGroups; + } + + public Map getRowGroupStatistics(int rowGroupIndex) { + return rowGroupStats.get(rowGroupIndex); + } + + public List statsColumnNames() { + return statsColumnNames; + } +} diff --git a/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory new file mode 100644 index 000000000000..bc955c493506 --- /dev/null +++ b/paimon-mosaic/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.format.mosaic.MosaicFileFormatFactory diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java new file mode 100644 index 000000000000..8e53164e8627 --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFileFormatTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Unit tests for {@link MosaicFileFormat} and {@link MosaicFileFormatFactory}. */ +class MosaicFileFormatTest { + + @Test + void testFactoryIdentifier() { + MosaicFileFormatFactory factory = new MosaicFileFormatFactory(); + assertThat(factory.identifier()).isEqualTo("mosaic"); + } + + @Test + void testFactoryCreate() { + MosaicFileFormatFactory factory = new MosaicFileFormatFactory(); + FileFormatFactory.FormatContext context = + new FileFormatFactory.FormatContext(new Options(), 1024, 1024); + assertThat(factory.create(context)).isInstanceOf(MosaicFileFormat.class); + } + + @Test + void testCreateReaderFactory() { + MosaicFileFormat format = createFormat(); + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + FormatReaderFactory readerFactory = + format.createReaderFactory(rowType, rowType, new ArrayList<>()); + assertThat(readerFactory).isInstanceOf(MosaicReaderFactory.class); + } + + @Test + void testCreateWriterFactory() { + MosaicFileFormat format = createFormat(); + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + assertThat(writerFactory).isInstanceOf(MosaicWriterFactory.class); + } + + @Test + void testValidateDataFieldsSupported() { + MosaicFileFormat format = createFormat(); + RowType rowType = + DataTypes.ROW( + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.STRING(), + DataTypes.DOUBLE(), + DataTypes.FLOAT(), + DataTypes.BOOLEAN(), + DataTypes.DATE(), + DataTypes.TIMESTAMP(3), + DataTypes.DECIMAL(10, 2), + DataTypes.BYTES()); + format.validateDataFields(rowType); + } + + @Test + void testValidateDataFieldsMapUnsupported() { + MosaicFileFormat format = createFormat(); + RowType rowType = DataTypes.ROW(DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + assertThatThrownBy(() -> format.validateDataFields(rowType)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("MAP"); + } + + @Test + void testValidateDataFieldsMultisetUnsupported() { + MosaicFileFormat format = createFormat(); + RowType rowType = DataTypes.ROW(DataTypes.MULTISET(DataTypes.STRING())); + assertThatThrownBy(() -> format.validateDataFields(rowType)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("MULTISET"); + } + + @Test + void testCreateStatsExtractor() { + MosaicFileFormat format = createFormat(); + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + assertThat( + format.createStatsExtractor( + rowType, + new org.apache.paimon.statistics.SimpleColStatsCollector.Factory[] { + org.apache.paimon.statistics.SimpleColStatsCollector.from( + "full"), + org.apache.paimon.statistics.SimpleColStatsCollector.from( + "full") + })) + .isPresent(); + } + + private static MosaicFileFormat createFormat() { + return new MosaicFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); + } +} diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java new file mode 100644 index 000000000000..41f632b3ee3f --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicFormatReadWriteTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatReadWriteTest; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.BeforeAll; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Round-trip read/write tests for Mosaic format. */ +class MosaicFormatReadWriteTest extends FormatReadWriteTest { + + MosaicFormatReadWriteTest() { + super("mosaic"); + } + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Mosaic native library not available"); + } + + @Override + protected FileFormat fileFormat() { + return new MosaicFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024, 1024)); + } + + @Override + public String compression() { + return "zstd"; + } + + @Override + public boolean supportNestedReadPruning() { + return false; + } + + @Override + protected RowType rowTypeForFullTypesTest() { + return RowType.builder() + .field("f_int", DataTypes.INT().notNull()) + .field("f_string", DataTypes.STRING()) + .field("f_double", DataTypes.DOUBLE().notNull()) + .field("f_boolean", DataTypes.BOOLEAN()) + .field("f_tinyint", DataTypes.TINYINT()) + .field("f_smallint", DataTypes.SMALLINT()) + .field("f_bigint", DataTypes.BIGINT()) + .field("f_float", DataTypes.FLOAT()) + .field("f_binary", DataTypes.BYTES()) + .field("f_date", DataTypes.DATE()) + .field("f_timestamp3", DataTypes.TIMESTAMP(3)) + .field("f_timestamp6", DataTypes.TIMESTAMP(6)) + .field("f_decimal_5_2", DataTypes.DECIMAL(5, 2)) + .field("f_decimal_20_0", DataTypes.DECIMAL(20, 0)) + .build(); + } + + @Override + protected GenericRow expectedRowForFullTypesTest() { + return GenericRow.of( + 42, + BinaryString.fromString("hello mosaic"), + 3.14d, + true, + (byte) 7, + (short) 256, + 9876543210L, + 1.5f, + new byte[] {1, 2, 3}, + 18000, + Timestamp.fromEpochMillis(1700000000000L), + Timestamp.fromMicros(1700000000000000L), + Decimal.fromBigDecimal(new BigDecimal("123.45"), 5, 2), + Decimal.fromBigDecimal(new BigDecimal("12345678901234567890"), 20, 0)); + } + + @Override + protected void validateFullTypesResult(InternalRow actual, InternalRow expected) { + for (int i = 0; i < 14; i++) { + if (expected.isNullAt(i)) { + assertThat(actual.isNullAt(i)).isTrue(); + } + } + assertThat(actual.getInt(0)).isEqualTo(expected.getInt(0)); + assertThat(actual.getString(1)).isEqualTo(expected.getString(1)); + assertThat(actual.getDouble(2)).isEqualTo(expected.getDouble(2)); + assertThat(actual.getBoolean(3)).isEqualTo(expected.getBoolean(3)); + assertThat(actual.getByte(4)).isEqualTo(expected.getByte(4)); + assertThat(actual.getShort(5)).isEqualTo(expected.getShort(5)); + assertThat(actual.getLong(6)).isEqualTo(expected.getLong(6)); + assertThat(actual.getFloat(7)).isEqualTo(expected.getFloat(7)); + assertThat(actual.getBinary(8)).isEqualTo(expected.getBinary(8)); + assertThat(actual.getInt(9)).isEqualTo(expected.getInt(9)); + assertThat(actual.getTimestamp(10, 3)).isEqualTo(expected.getTimestamp(10, 3)); + assertThat(actual.getTimestamp(11, 6)).isEqualTo(expected.getTimestamp(11, 6)); + assertThat(actual.getDecimal(12, 5, 2)).isEqualTo(expected.getDecimal(12, 5, 2)); + assertThat(actual.getDecimal(13, 20, 0)).isEqualTo(expected.getDecimal(13, 20, 0)); + } + + private static boolean isNativeAvailable() { + try { + Class.forName("org.apache.paimon.mosaic.NativeLib"); + return true; + } catch (Throwable t) { + return false; + } + } +} diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java new file mode 100644 index 000000000000..e05ed1709c4a --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicObjectsTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link MosaicObjects}. */ +class MosaicObjectsTest { + + @Test + void testNullBytes() { + assertThat(MosaicObjects.convertStatsValue(null, DataTypes.INT())).isNull(); + } + + @Test + void testEmptyBytes() { + assertThat(MosaicObjects.convertStatsValue(new byte[0], DataTypes.INT())).isNull(); + } + + @Test + void testBoolean() { + assertThat(MosaicObjects.convertStatsValue(new byte[] {1}, DataTypes.BOOLEAN())) + .isEqualTo(true); + assertThat(MosaicObjects.convertStatsValue(new byte[] {0}, DataTypes.BOOLEAN())) + .isEqualTo(false); + } + + @Test + void testTinyInt() { + assertThat(MosaicObjects.convertStatsValue(new byte[] {42}, DataTypes.TINYINT())) + .isEqualTo((byte) 42); + assertThat(MosaicObjects.convertStatsValue(new byte[] {(byte) -1}, DataTypes.TINYINT())) + .isEqualTo((byte) -1); + } + + @Test + void testSmallInt() { + byte[] bytes = ByteBuffer.allocate(2).putShort((short) 1234).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.SMALLINT())) + .isEqualTo((short) 1234); + } + + @Test + void testInt() { + byte[] bytes = ByteBuffer.allocate(4).putInt(123456).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.INT())).isEqualTo(123456); + } + + @Test + void testIntNegative() { + byte[] bytes = ByteBuffer.allocate(4).putInt(-999).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.INT())).isEqualTo(-999); + } + + @Test + void testBigInt() { + byte[] bytes = ByteBuffer.allocate(8).putLong(9876543210L).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.BIGINT())) + .isEqualTo(9876543210L); + } + + @Test + void testFloat() { + byte[] bytes = ByteBuffer.allocate(4).putFloat(3.14f).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.FLOAT())).isEqualTo(3.14f); + } + + @Test + void testDouble() { + byte[] bytes = ByteBuffer.allocate(8).putDouble(2.718281828).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.DOUBLE())) + .isEqualTo(2.718281828); + } + + @Test + void testVarChar() { + byte[] bytes = "hello".getBytes(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.STRING())) + .isEqualTo(BinaryString.fromString("hello")); + } + + @Test + void testBinary() { + byte[] bytes = new byte[] {1, 2, 3, 4, 5}; + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.BYTES())).isEqualTo(bytes); + } + + @Test + void testDate() { + byte[] bytes = ByteBuffer.allocate(4).putInt(18000).array(); + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.DATE())).isEqualTo(18000); + } + + @Test + void testTimestampMillis() { + long millis = 1700000000000L; + byte[] bytes = ByteBuffer.allocate(8).putLong(millis).array(); + Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(3)); + assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis)); + } + + @Test + void testTimestampMicros() { + long micros = 1700000000000000L; + byte[] bytes = ByteBuffer.allocate(8).putLong(micros).array(); + Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(6)); + assertThat(result).isEqualTo(Timestamp.fromMicros(micros)); + } + + @Test + void testDecimal() { + // 1000 in big-endian two's complement = 0x03E8 + byte[] beBytes = new byte[] {0x03, (byte) 0xE8}; + Object result = MosaicObjects.convertStatsValue(beBytes, DataTypes.DECIMAL(10, 2)); + assertThat(result).isInstanceOf(Decimal.class); + Decimal decimal = (Decimal) result; + assertThat(decimal.toBigDecimal().intValue()).isEqualTo(10); + } + + @Test + void testTimestampNanos() { + long millis = 1700000000123L; + int nanosOfMilli = 456789; + byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array(); + Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(9)); + assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli)); + } + + @Test + void testTimestampNanosPrecision7() { + long millis = 1700000000000L; + int nanosOfMilli = 100000; + byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array(); + Object result = MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP(7)); + assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli)); + } + + @Test + void testTimestampWithLocalTimeZoneMillis() { + long millis = 1700000000000L; + byte[] bytes = ByteBuffer.allocate(8).putLong(millis).array(); + Object result = + MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)); + assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis)); + } + + @Test + void testTimestampWithLocalTimeZoneMicros() { + long micros = 1700000000000000L; + byte[] bytes = ByteBuffer.allocate(8).putLong(micros).array(); + Object result = + MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)); + assertThat(result).isEqualTo(Timestamp.fromMicros(micros)); + } + + @Test + void testTimestampWithLocalTimeZoneNanos() { + long millis = 1700000000123L; + int nanosOfMilli = 456789; + byte[] bytes = ByteBuffer.allocate(12).putLong(millis).putInt(nanosOfMilli).array(); + Object result = + MosaicObjects.convertStatsValue(bytes, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)); + assertThat(result).isEqualTo(Timestamp.fromEpochMillis(millis, nanosOfMilli)); + } + + @Test + void testEmptyStringVarChar() { + Object result = MosaicObjects.convertStatsValue(new byte[0], DataTypes.STRING()); + assertThat(result).isEqualTo(BinaryString.fromString("")); + } + + @Test + void testEmptyBinary() { + Object result = MosaicObjects.convertStatsValue(new byte[0], DataTypes.BYTES()); + assertThat(result).isEqualTo(new byte[0]); + } + + @Test + void testUnsupportedTypeReturnsNull() { + byte[] bytes = new byte[] {1, 2, 3}; + assertThat(MosaicObjects.convertStatsValue(bytes, DataTypes.ARRAY(DataTypes.INT()))) + .isNull(); + } +} diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java new file mode 100644 index 000000000000..60efceed08e3 --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicReaderWriterTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatReaderContext; +import org.apache.paimon.format.FormatReaderFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Integration tests for Mosaic reader and writer. */ +class MosaicReaderWriterTest { + + @TempDir java.nio.file.Path tempDir; + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Mosaic native library not available"); + } + + @Test + void testWriteAndRead() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + + writeRows( + rowType, + path, + GenericRow.of(1, BinaryString.fromString("hello")), + GenericRow.of(2, BinaryString.fromString("world"))); + + List result = readAll(rowType, rowType, path, null); + assertThat(result).hasSize(2); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).getString(1).toString()).isEqualTo("hello"); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).getString(1).toString()).isEqualTo("world"); + } + + @Test + void testNullValues() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + + writeRows( + rowType, + path, + GenericRow.of(1, null), + GenericRow.of(null, BinaryString.fromString("test")), + GenericRow.of(null, null)); + + List result = readAll(rowType, rowType, path, null); + assertThat(result).hasSize(3); + assertThat(result.get(0).isNullAt(1)).isTrue(); + assertThat(result.get(1).isNullAt(0)).isTrue(); + assertThat(result.get(2).isNullAt(0)).isTrue(); + assertThat(result.get(2).isNullAt(1)).isTrue(); + } + + @Test + void testColumnProjection() throws IOException { + RowType writeType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .field("f_double", DataTypes.DOUBLE()) + .build(); + RowType readType = RowType.builder().field("f_string", DataTypes.STRING()).build(); + Path path = newPath(); + + writeRows( + writeType, + path, + GenericRow.of(1, BinaryString.fromString("aaa"), 1.1), + GenericRow.of(2, BinaryString.fromString("bbb"), 2.2)); + + List result = readAll(writeType, readType, path, null); + assertThat(result).hasSize(2); + assertThat(result.get(0).getString(0).toString()).isEqualTo("aaa"); + assertThat(result.get(1).getString(0).toString()).isEqualTo("bbb"); + } + + @Test + void testLargeDataset() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + + int numRows = 10000; + GenericRow[] rows = new GenericRow[numRows]; + for (int i = 0; i < numRows; i++) { + rows[i] = GenericRow.of(i, BinaryString.fromString("row" + i)); + } + writeRows(rowType, path, rows); + + List result = readAll(rowType, rowType, path, null); + assertThat(result).hasSize(numRows); + assertThat(result.get(0).getInt(0)).isEqualTo(0); + assertThat(result.get(numRows - 1).getInt(0)).isEqualTo(numRows - 1); + } + + @Test + void testRowGroupPredicateFiltering() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .build(); + Path path = newPath(); + + int numRows = 10000; + GenericRow[] rows = new GenericRow[numRows]; + for (int i = 0; i < numRows; i++) { + rows[i] = GenericRow.of(i, BinaryString.fromString("v" + i)); + } + writeRows(rowType, path, "f_int", rows); + + // Predicate that cannot match any row group (all values are 0..9999) + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate predicate = builder.greaterThan(0, 99999); + List result = + readAll(rowType, rowType, path, Collections.singletonList(predicate)); + assertThat(result).isEmpty(); + + // Predicate that matches the row group (values include range 0..9999) + Predicate matchPredicate = builder.greaterThan(0, 5000); + List matchResult = + readAll(rowType, rowType, path, Collections.singletonList(matchPredicate)); + assertThat(matchResult).hasSize(numRows); + } + + @Test + void testReturnedPosition() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + + writeRows( + rowType, + path, + GenericRow.of(1, BinaryString.fromString("a")), + GenericRow.of(2, BinaryString.fromString("b")), + GenericRow.of(3, BinaryString.fromString("c"))); + + MosaicFileFormat format = createFormat(); + FormatReaderFactory readerFactory = format.createReaderFactory(rowType, rowType, null); + LocalFileIO fileIO = new LocalFileIO(); + RecordReader reader = + readerFactory.createReader( + new FormatReaderContext(fileIO, path, fileIO.getFileSize(path))); + + RecordReader.RecordIterator batch = reader.readBatch(); + assertThat(batch).isNotNull(); + FileRecordIterator fileIter = (FileRecordIterator) batch; + + fileIter.next(); + assertThat(fileIter.returnedPosition()).isEqualTo(0); + fileIter.next(); + assertThat(fileIter.returnedPosition()).isEqualTo(1); + fileIter.next(); + assertThat(fileIter.returnedPosition()).isEqualTo(2); + + reader.close(); + } + + @Test + void testProjectionWithMissingColumns() throws IOException { + RowType writeType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .build(); + // Read type has a column that doesn't exist in the file (schema evolution) + RowType readType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_new_col", DataTypes.BIGINT()) + .field("f_string", DataTypes.STRING()) + .build(); + Path path = newPath(); + + writeRows( + writeType, + path, + GenericRow.of(1, BinaryString.fromString("aaa")), + GenericRow.of(2, BinaryString.fromString("bbb"))); + + List result = readAll(writeType, readType, path, null); + assertThat(result).hasSize(2); + assertThat(result.get(0).getInt(0)).isEqualTo(1); + assertThat(result.get(0).isNullAt(1)).isTrue(); + assertThat(result.get(0).getString(2).toString()).isEqualTo("aaa"); + assertThat(result.get(1).getInt(0)).isEqualTo(2); + assertThat(result.get(1).isNullAt(1)).isTrue(); + assertThat(result.get(1).getString(2).toString()).isEqualTo("bbb"); + } + + @Test + void testProjectionAllColumnsMissing() throws IOException { + RowType writeType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .build(); + // Read type has only columns that don't exist in the file + RowType readType = + RowType.builder() + .field("f_new_a", DataTypes.INT()) + .field("f_new_b", DataTypes.STRING()) + .build(); + Path path = newPath(); + + writeRows( + writeType, + path, + GenericRow.of(1, BinaryString.fromString("x")), + GenericRow.of(2, BinaryString.fromString("y"))); + + List result = readAll(writeType, readType, path, null); + assertThat(result).hasSize(2); + assertThat(result.get(0).isNullAt(0)).isTrue(); + assertThat(result.get(0).isNullAt(1)).isTrue(); + assertThat(result.get(1).isNullAt(0)).isTrue(); + assertThat(result.get(1).isNullAt(1)).isTrue(); + } + + @Test + void testUnsupportedCompressionThrows() { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + MosaicFileFormat format = createFormat(); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + LocalFileIO fileIO = new LocalFileIO(); + + assertThatThrownBy(() -> writerFactory.create(fileIO.newOutputStream(path, false), "lz4")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("lz4"); + } + + @Test + void testReachTargetSize() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + MosaicFileFormat format = createFormat(); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + + LocalFileIO fileIO = new LocalFileIO(); + FormatWriter writer = writerFactory.create(fileIO.newOutputStream(path, false), "zstd"); + + boolean reached = false; + for (int i = 0; i < 100000; i++) { + writer.addElement(GenericRow.of(i, BinaryString.fromString("value_" + i + "_padding"))); + if (writer.reachTargetSize(true, 1024)) { + reached = true; + break; + } + } + writer.close(); + assertThat(reached).isTrue(); + } + + private Path newPath() { + return new Path(tempDir.toUri().toString(), UUID.randomUUID() + ".mosaic"); + } + + private void writeRows(RowType rowType, Path path, GenericRow... rows) throws IOException { + writeRows(rowType, path, "", rows); + } + + private void writeRows(RowType rowType, Path path, String statsColumns, GenericRow... rows) + throws IOException { + MosaicFileFormat format = createFormat(statsColumns); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + LocalFileIO fileIO = new LocalFileIO(); + FormatWriter writer = writerFactory.create(fileIO.newOutputStream(path, false), "zstd"); + for (GenericRow row : rows) { + writer.addElement(row); + } + writer.close(); + } + + private List readAll( + RowType dataType, RowType readType, Path path, List predicates) + throws IOException { + MosaicFileFormat format = createFormat(); + FormatReaderFactory readerFactory = + format.createReaderFactory(dataType, readType, predicates); + LocalFileIO fileIO = new LocalFileIO(); + RecordReader reader = + readerFactory.createReader( + new FormatReaderContext(fileIO, path, fileIO.getFileSize(path))); + + InternalRowSerializer serializer = new InternalRowSerializer(readType); + List result = new ArrayList<>(); + reader.forEachRemaining(row -> result.add(serializer.copy(row))); + reader.close(); + return result; + } + + private static MosaicFileFormat createFormat() { + return createFormat(""); + } + + private static MosaicFileFormat createFormat(String statsColumns) { + Options options = new Options(); + if (!statsColumns.isEmpty()) { + options.set(MosaicFileFormat.STATS_COLUMNS, statsColumns); + } + return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + } + + private static boolean isNativeAvailable() { + try { + Class.forName("org.apache.paimon.mosaic.NativeLib"); + return true; + } catch (Throwable t) { + return false; + } + } +} diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java new file mode 100644 index 000000000000..8477c5b06540 --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicSimpleStatsExtractorTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleColStatsExtractorTest; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Tests for {@link MosaicSimpleStatsExtractor}. */ +class MosaicSimpleStatsExtractorTest extends SimpleColStatsExtractorTest { + + @TempDir java.nio.file.Path statsTestTempDir; + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Mosaic native library not available"); + } + + @Override + protected FileFormat createFormat() { + Options options = new Options(); + options.set( + MosaicFileFormat.STATS_COLUMNS, + "f_boolean,f_tinyint,f_smallint,f_int,f_bigint,f_float," + + "f_double,f_string,f_decimal_5_2,f_date,f_timestamp3,f_timestamp6"); + return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + } + + @Override + protected RowType rowType() { + return RowType.builder() + .field("f_boolean", DataTypes.BOOLEAN()) + .field("f_tinyint", DataTypes.TINYINT()) + .field("f_smallint", DataTypes.SMALLINT()) + .field("f_int", DataTypes.INT()) + .field("f_bigint", DataTypes.BIGINT()) + .field("f_float", DataTypes.FLOAT()) + .field("f_double", DataTypes.DOUBLE()) + .field("f_string", DataTypes.VARCHAR(100)) + .field("f_decimal_5_2", DataTypes.DECIMAL(5, 2)) + .field("f_date", DataTypes.DATE()) + .field("f_timestamp3", DataTypes.TIMESTAMP(3)) + .field("f_timestamp6", DataTypes.TIMESTAMP(6)) + .build(); + } + + @Override + protected String fileCompression() { + return "zstd"; + } + + @Test + void testUntrackedColumnsReturnNone() throws IOException { + // stats_columns only tracks f_int, but the table has f_int + f_string + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .build(); + Options options = new Options(); + options.set(MosaicFileFormat.STATS_COLUMNS, "f_int"); + MosaicFileFormat format = + new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + + Path path = new Path(statsTestTempDir.toUri().toString(), UUID.randomUUID() + ".mosaic"); + LocalFileIO fileIO = new LocalFileIO(); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + FormatWriter writer = writerFactory.create(fileIO.newOutputStream(path, false), "zstd"); + writer.addElement(GenericRow.of(1, BinaryString.fromString("a"))); + writer.addElement(GenericRow.of(2, BinaryString.fromString("b"))); + writer.close(); + + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, rowType.getFieldCount()) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + SimpleColStats[] stats = extractor.extract(fileIO, path, fileIO.getFileSize(path)); + + // f_int is tracked, should have real stats + assertThat(stats[0].min()).isEqualTo(1); + assertThat(stats[0].max()).isEqualTo(2); + assertThat(stats[0].nullCount()).isEqualTo(0L); + // f_string is NOT tracked, should be NONE (null nullCount) + assertThat(stats[1].min()).isNull(); + assertThat(stats[1].max()).isNull(); + assertThat(stats[1].nullCount()).isNull(); + } + + @Test + void testBinaryColumnStatsNoException() throws Exception { + // Binary columns produce byte[] from convertStatsValue, which is not Comparable. + // Verify multi-row-group aggregation doesn't throw ClassCastException. + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_binary", DataTypes.VARBINARY(100)) + .build(); + // Build a fake MosaicWriterMetadata with binary stats across 2 row groups + java.lang.reflect.Constructor ctor = + org.apache.paimon.mosaic.ColumnStatistics.class.getDeclaredConstructor( + long.class, byte[].class, byte[].class); + ctor.setAccessible(true); + + java.util.Map rg0 = + new java.util.HashMap<>(); + rg0.put( + "f_int", + (org.apache.paimon.mosaic.ColumnStatistics) + ctor.newInstance(0L, intBytes(0), intBytes(100))); + rg0.put( + "f_binary", + (org.apache.paimon.mosaic.ColumnStatistics) + ctor.newInstance(0L, new byte[] {1, 2}, new byte[] {3, 4})); + + java.util.Map rg1 = + new java.util.HashMap<>(); + rg1.put( + "f_int", + (org.apache.paimon.mosaic.ColumnStatistics) + ctor.newInstance(0L, intBytes(50), intBytes(200))); + rg1.put( + "f_binary", + (org.apache.paimon.mosaic.ColumnStatistics) + ctor.newInstance(0L, new byte[] {5, 6}, new byte[] {7, 8})); + + java.util.List> allStats = + java.util.Arrays.asList(rg0, rg1); + MosaicWriterMetadata metadata = + new MosaicWriterMetadata(2, allStats, java.util.Arrays.asList("f_int", "f_binary")); + + // Write a minimal file (only f_int in stats_columns since native rejects binary) + Options options = new Options(); + options.set(MosaicFileFormat.STATS_COLUMNS, "f_int"); + MosaicFileFormat format = + new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + Path path = new Path(statsTestTempDir.toUri().toString(), UUID.randomUUID() + ".mosaic"); + LocalFileIO fileIO = new LocalFileIO(); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + FormatWriter writer = writerFactory.create(fileIO.newOutputStream(path, false), "zstd"); + writer.addElement(GenericRow.of(1, new byte[] {1})); + writer.close(); + + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, rowType.getFieldCount()) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + // Should not throw ClassCastException + SimpleColStats[] stats = + extractor.extract(fileIO, path, fileIO.getFileSize(path), metadata); + + // f_int aggregated across row groups: min=0, max=200 + assertThat(stats[0].min()).isEqualTo(0); + assertThat(stats[0].max()).isEqualTo(200); + // f_binary min/max should be null (byte[] not Comparable, skipped) + assertThat(stats[1].min()).isNull(); + assertThat(stats[1].max()).isNull(); + } + + private static byte[] intBytes(int value) { + return java.nio.ByteBuffer.allocate(4).putInt(value).array(); + } + + private static boolean isNativeAvailable() { + try { + Class.forName("org.apache.paimon.mosaic.NativeLib"); + return true; + } catch (Throwable t) { + return false; + } + } +} diff --git a/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java new file mode 100644 index 000000000000..4caf65d66176 --- /dev/null +++ b/paimon-mosaic/src/test/java/org/apache/paimon/format/mosaic/MosaicWriterMetadataTest.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.mosaic; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.format.FileFormatFactory; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.format.SimpleStatsExtractor.FileInfo; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.UUID; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Tests for writer metadata based stats extraction in Mosaic format. */ +class MosaicWriterMetadataTest { + + @TempDir java.nio.file.Path tempDir; + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Mosaic native library not available"); + } + + @Test + void testWriterMetadataNotNull() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + + FormatWriter writer = createWriter(rowType, path, "f0,f1"); + writer.addElement(GenericRow.of(1, BinaryString.fromString("hello"))); + writer.addElement(GenericRow.of(2, BinaryString.fromString("world"))); + writer.close(); + + Object metadata = writer.writerMetadata(); + assertThat(metadata).isNotNull(); + assertThat(metadata).isInstanceOf(MosaicWriterMetadata.class); + + MosaicWriterMetadata mosaicMeta = (MosaicWriterMetadata) metadata; + assertThat(mosaicMeta.numRowGroups()).isGreaterThan(0); + } + + @Test + void testStatsFromMetadataMatchesStatsFromFile() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_bigint", DataTypes.BIGINT()) + .field("f_string", DataTypes.STRING()) + .field("f_double", DataTypes.DOUBLE()) + .build(); + Path path = newPath(); + String statsColumns = "f_int,f_bigint,f_string,f_double"; + + FormatWriter writer = createWriter(rowType, path, statsColumns); + for (int i = 0; i < 1000; i++) { + writer.addElement( + GenericRow.of(i, (long) i * 100, BinaryString.fromString("val_" + i), i * 1.1)); + } + writer.close(); + + Object metadata = writer.writerMetadata(); + assertThat(metadata).isNotNull(); + + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize); + SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata); + + assertThat(fromMetadata).isEqualTo(fromFile); + } + + @Test + void testStatsFromMetadataWithNullValues() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .build(); + Path path = newPath(); + String statsColumns = "f_int,f_string"; + + FormatWriter writer = createWriter(rowType, path, statsColumns); + writer.addElement(GenericRow.of(1, null)); + writer.addElement(GenericRow.of(null, BinaryString.fromString("a"))); + writer.addElement(GenericRow.of(3, BinaryString.fromString("b"))); + writer.close(); + + Object metadata = writer.writerMetadata(); + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata); + assertThat(fromMetadata).isNotNull(); + assertThat(fromMetadata[0].nullCount()).isEqualTo(1L); + assertThat(fromMetadata[1].nullCount()).isEqualTo(1L); + } + + @Test + void testExtractWithFileInfoRowCount() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .build(); + Path path = newPath(); + String statsColumns = "f_int,f_string"; + + int numRows = 500; + FormatWriter writer = createWriter(rowType, path, statsColumns); + for (int i = 0; i < numRows; i++) { + writer.addElement(GenericRow.of(i, BinaryString.fromString("row_" + i))); + } + writer.close(); + + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + Pair result = + extractor.extractWithFileInfo(fileIO, path, fileSize); + assertThat(result.getRight().getRowCount()).isEqualTo(numRows); + assertThat(result.getLeft()).isNotNull(); + assertThat(result.getLeft()).hasSize(fieldCount); + } + + @Test + void testPartialStatsColumnsFromMetadata() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .field("f_double", DataTypes.DOUBLE()) + .build(); + Path path = newPath(); + String statsColumns = "f_int"; + + FormatWriter writer = createWriter(rowType, path, statsColumns); + writer.addElement(GenericRow.of(1, BinaryString.fromString("a"), 1.0)); + writer.addElement(GenericRow.of(null, BinaryString.fromString("b"), 2.0)); + writer.addElement(GenericRow.of(3, null, null)); + writer.close(); + + Object metadata = writer.writerMetadata(); + assertThat(metadata).isInstanceOf(MosaicWriterMetadata.class); + MosaicWriterMetadata mosaicMeta = (MosaicWriterMetadata) metadata; + assertThat(mosaicMeta.statsColumnNames()).containsExactly("f_int"); + + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata); + + // f_int has stats: min=1, max=3, nullCount=1 + assertThat(fromMetadata[0].min()).isEqualTo(1); + assertThat(fromMetadata[0].max()).isEqualTo(3); + assertThat(fromMetadata[0].nullCount()).isEqualTo(1L); + + // f_string and f_double have no stats (not in statsColumns) + assertThat(fromMetadata[1].min()).isNull(); + assertThat(fromMetadata[1].max()).isNull(); + assertThat(fromMetadata[1].nullCount()).isNull(); + assertThat(fromMetadata[2].min()).isNull(); + assertThat(fromMetadata[2].max()).isNull(); + assertThat(fromMetadata[2].nullCount()).isNull(); + } + + @Test + void testStatsOnMiddleColumn() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .field("f_double", DataTypes.DOUBLE()) + .build(); + Path path = newPath(); + String statsColumns = "f_string"; + + FormatWriter writer = createWriter(rowType, path, statsColumns); + writer.addElement(GenericRow.of(1, BinaryString.fromString("banana"), 1.0)); + writer.addElement(GenericRow.of(2, BinaryString.fromString("apple"), 2.0)); + writer.addElement(GenericRow.of(3, null, 3.0)); + writer.close(); + + Object metadata = writer.writerMetadata(); + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + SimpleColStats[] fromMetadata = extractor.extract(fileIO, path, fileSize, metadata); + + // f_int has no stats + assertThat(fromMetadata[0].min()).isNull(); + assertThat(fromMetadata[0].max()).isNull(); + assertThat(fromMetadata[0].nullCount()).isNull(); + + // f_string has stats: min="apple", max="banana", nullCount=1 + assertThat(fromMetadata[1].min()).isEqualTo(BinaryString.fromString("apple")); + assertThat(fromMetadata[1].max()).isEqualTo(BinaryString.fromString("banana")); + assertThat(fromMetadata[1].nullCount()).isEqualTo(1L); + + // f_double has no stats + assertThat(fromMetadata[2].min()).isNull(); + assertThat(fromMetadata[2].max()).isNull(); + assertThat(fromMetadata[2].nullCount()).isNull(); + } + + @Test + void testPartialStatsColumnsFromFile() throws IOException { + RowType rowType = + RowType.builder() + .field("f_int", DataTypes.INT()) + .field("f_string", DataTypes.STRING()) + .field("f_double", DataTypes.DOUBLE()) + .build(); + Path path = newPath(); + String statsColumns = "f_string"; + + FormatWriter writer = createWriter(rowType, path, statsColumns); + writer.addElement(GenericRow.of(1, BinaryString.fromString("banana"), 1.0)); + writer.addElement(GenericRow.of(2, BinaryString.fromString("apple"), 2.0)); + writer.addElement(GenericRow.of(3, null, 3.0)); + writer.close(); + + // Extract from file (no writer metadata), simulating fallback path + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize); + + // f_int has no stats in file + assertThat(fromFile[0].min()).isNull(); + assertThat(fromFile[0].max()).isNull(); + assertThat(fromFile[0].nullCount()).isNull(); + + // f_string has stats + assertThat(fromFile[1].min()).isEqualTo(BinaryString.fromString("apple")); + assertThat(fromFile[1].max()).isEqualTo(BinaryString.fromString("banana")); + assertThat(fromFile[1].nullCount()).isEqualTo(1L); + + // f_double has no stats in file + assertThat(fromFile[2].min()).isNull(); + assertThat(fromFile[2].max()).isNull(); + assertThat(fromFile[2].nullCount()).isNull(); + } + + @Test + void testFallbackToFileWhenMetadataIsNull() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()); + Path path = newPath(); + String statsColumns = "f0,f1"; + + FormatWriter writer = createWriter(rowType, path, statsColumns); + writer.addElement(GenericRow.of(10, BinaryString.fromString("test"))); + writer.close(); + + MosaicFileFormat format = createFormat(statsColumns); + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] collectors = + IntStream.range(0, fieldCount) + .mapToObj(i -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + + SimpleStatsExtractor extractor = format.createStatsExtractor(rowType, collectors).get(); + LocalFileIO fileIO = new LocalFileIO(); + long fileSize = fileIO.getFileSize(path); + + SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize); + SimpleColStats[] fromNull = extractor.extract(fileIO, path, fileSize, null); + + assertThat(fromNull).isEqualTo(fromFile); + } + + private Path newPath() { + return new Path(tempDir.toUri().toString(), UUID.randomUUID() + ".mosaic"); + } + + private FormatWriter createWriter(RowType rowType, Path path, String statsColumns) + throws IOException { + MosaicFileFormat format = createFormat(statsColumns); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + LocalFileIO fileIO = new LocalFileIO(); + return writerFactory.create(fileIO.newOutputStream(path, false), "zstd"); + } + + private static MosaicFileFormat createFormat() { + return createFormat(""); + } + + private static MosaicFileFormat createFormat(String statsColumns) { + Options options = new Options(); + if (!statsColumns.isEmpty()) { + options.set(MosaicFileFormat.STATS_COLUMNS, statsColumns); + } + return new MosaicFileFormat(new FileFormatFactory.FormatContext(options, 1024, 1024)); + } + + private static boolean isNativeAvailable() { + try { + Class.forName("org.apache.paimon.mosaic.NativeLib"); + return true; + } catch (Throwable t) { + return false; + } + } +} diff --git a/pom.xml b/pom.xml index 22fddbb60dde..991c6c487421 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ under the License. paimon-api paimon-lumina paimon-vortex + paimon-mosaic paimon-tantivy