diff --git a/docs/content/primary-key-table/chain-table.md b/docs/content/primary-key-table/chain-table.md index 839eb12fc954..2eb4a387fe29 100644 --- a/docs/content/primary-key-table/chain-table.md +++ b/docs/content/primary-key-table/chain-table.md @@ -87,7 +87,6 @@ Notice that: - Chain table is only supported for primary key table, which means you should define `bucket` and `bucket-key` for the table. - Chain table should ensure that the schema of each branch is consistent. - Only spark support now, flink will be supported later. -- Chain compact is not supported for now, and it will be supported later. - Deletion vector is not supported for chain table. After creating a chain table, you can read and write data in the following ways. diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index f27478bbbd26..17bcbf2c5c23 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -20,6 +20,7 @@ import org.apache.paimon.spark.procedure.AlterFunctionProcedure; import org.apache.paimon.spark.procedure.AlterViewDialectProcedure; +import org.apache.paimon.spark.procedure.ChainMergeProcedure; import org.apache.paimon.spark.procedure.ClearConsumersProcedure; import org.apache.paimon.spark.procedure.CompactDatabaseProcedure; import org.apache.paimon.spark.procedure.CompactManifestProcedure; @@ -123,6 +124,7 @@ private static Map> initProcedureBuilders() { "trigger_tag_automatic_creation", TriggerTagAutomaticCreationProcedure::builder); procedureBuilders.put("rewrite_file_index", RewriteFileIndexProcedure::builder); procedureBuilders.put("copy", CopyFilesProcedure::builder); + procedureBuilders.put("chain_merge", ChainMergeProcedure::builder); return procedureBuilders.build(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ChainMergeProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ChainMergeProcedure.java new file mode 100644 index 000000000000..79f89bd43a37 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ChainMergeProcedure.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.disk.IOManager; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.spark.SparkTable; +import org.apache.paimon.spark.SparkUtils; +import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils; +import org.apache.paimon.table.ChainGroupReadTable; +import org.apache.paimon.table.FallbackReadFileStoreTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageSerializer; +import org.apache.paimon.table.source.ChainSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.utils.InternalRowPartitionComputer; +import org.apache.paimon.utils.ParameterUtils; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import scala.Option; + +import static org.apache.paimon.spark.utils.SparkProcedureUtils.toWhere; +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Chain merge procedure. Usage: + * + *

+ *  CALL sys.compact(table => 'tableId', partitions => 'p1=0,p2=0', target_branch => 'snapshot')
+ * 
+ */ +public class ChainMergeProcedure extends BaseProcedure { + + private static final Logger LOG = LoggerFactory.getLogger(ChainMergeProcedure.class); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.required("partitions", StringType), + ProcedureParameter.optional("target_branch", StringType), + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + protected ChainMergeProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + private boolean blank(InternalRow args, int index) { + return args.isNullAt(index) || StringUtils.isNullOrWhitespaceOnly(args.getString(index)); + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + String partitions = args.getString(1); + SparkTable sparkTable = loadSparkTable(tableIdent); + String targetBranch = + blank(args, 2) + ? sparkTable.coreOptions().scanFallbackSnapshotBranch() + : args.getString(2); + List> compactPartitions = + ParameterUtils.getPartitions(partitions.split(";")); + validataChainMerge(sparkTable, targetBranch, partitions, compactPartitions); + DataSourceV2Relation relation = createRelation(tableIdent); + Expression condition = + getPartitionCondition(relation, sparkTable.table(), toWhere(partitions)); + boolean executed = + executeChainMerge( + (FallbackReadFileStoreTable) sparkTable.getTable(), + condition, + relation, + targetBranch); + return new InternalRow[] {newInternalRow(executed)}; + } + + public boolean executeChainMerge( + FallbackReadFileStoreTable chainTable, + Expression partCondition, + DataSourceV2Relation relation, + String targetBranch) { + + // build scan for the specific partition + Preconditions.checkArgument( + chainTable.other() instanceof ChainGroupReadTable, + "The chain merge should perform on the ChainFileStoreTable"); + + Option filter = + ExpressionUtils.convertConditionToPaimonPredicate( + partCondition, + ((LogicalPlan) relation).output(), + chainTable.rowType(), + false); + + ChainGroupReadTable chainGroupReadTable = (ChainGroupReadTable) chainTable.other(); + ChainGroupReadTable.ChainTableBatchScan scan = + (ChainGroupReadTable.ChainTableBatchScan) chainGroupReadTable.newScan(); + if (filter.isDefined()) { + scan.withFilter(filter.get()); + } + List splits = scan.plan().splits(); + + if (splits.isEmpty()) { + LOG.info("The target partition={} is empty", partCondition); + return false; + } + Preconditions.checkArgument( + splits.stream().allMatch(s -> (s instanceof ChainSplit)), + "The chain merge only accepts ChainDataSplit"); + + // build snapshot branch write builder with static partition overwrite + FileStoreTable targetTable = ((ChainGroupReadTable) chainTable.other()).wrapped(); + checkArgument( + targetBranch.equals(targetTable.coreOptions().branch()), + "chain_merge should merge to snapshot branch"); + InternalRowPartitionComputer computer = + new InternalRowPartitionComputer( + chainTable.coreOptions().partitionDefaultName(), + chainTable.schema().logicalPartitionType(), + chainTable.schema().partitionKeys().toArray(new String[0]), + chainTable.coreOptions().legacyPartitionName()); + Map targetPartition = + computer.generatePartValues(((ChainSplit) splits.get(0)).logicalPartition()); + + LOG.info( + "Direct chain_merge plan built, splits: {}, target partition: {}, target branch: {}", + splits.size(), + targetPartition, + targetBranch); + BatchWriteBuilder writeBuilder = + targetTable.newBatchWriteBuilder().withOverwrite(targetPartition); + JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext()); + + JavaRDD commitMessageJavaRDD = + javaSparkContext + .parallelize(splits) + .mapPartitions( + (FlatMapFunction, byte[]>) + splitIterator -> { + List serializedMessages = new ArrayList<>(); + IOManager ioManager = SparkUtils.createIOManager(); + BatchTableWrite write = writeBuilder.newWrite(); + write.withIOManager(ioManager); + while (splitIterator.hasNext()) { + Split split = splitIterator.next(); + try { + TableRead read = + chainGroupReadTable + .newRead() + .withIOManager(ioManager); + RecordReader + reader = read.createReader(split); + try (RecordReader< + org.apache.paimon.data + .InternalRow> + rr = reader) { + RecordReaderIterator< + org.apache.paimon.data + .InternalRow> + it = new RecordReaderIterator<>(rr); + org.apache.paimon.data.InternalRow row; + while ((row = it.next()) != null) { + write.write(row); + } + } + CommitMessageSerializer serializer = + new CommitMessageSerializer(); + List messages = + write.prepareCommit(); + for (CommitMessage commitMessage : messages) { + serializedMessages.add( + serializer.serialize( + commitMessage)); + } + } finally { + write.close(); + ioManager.close(); + } + } + return serializedMessages.iterator(); + }); + + try (BatchTableCommit commit = writeBuilder.newCommit()) { + CommitMessageSerializer serializer = new CommitMessageSerializer(); + List serializedMessages = commitMessageJavaRDD.collect(); + List messages = new ArrayList<>(serializedMessages.size()); + for (byte[] serializedMessage : serializedMessages) { + messages.add(serializer.deserialize(serializer.getVersion(), serializedMessage)); + } + commit.commit(messages); + } catch (Exception e) { + throw new RuntimeException(e); + } + return true; + } + + private Expression getPartitionCondition( + DataSourceV2Relation relation, Table table, String where) { + Expression condition = null; + if (!StringUtils.isNullOrWhitespaceOnly(where)) { + condition = ExpressionUtils.resolveFilter(spark(), relation, where); + checkArgument( + ExpressionUtils.isValidPredicate( + spark(), condition, table.partitionKeys().toArray(new String[0])), + "Only partition predicate is supported, your predicate is %s, but partition keys are %s", + condition, + table.partitionKeys()); + } + return condition; + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public ChainMergeProcedure doBuild() { + return new ChainMergeProcedure(tableCatalog()); + } + }; + } + + private void validataChainMerge( + SparkTable sparkTable, + String targetBranch, + String partitions, + List> compactPartitions) { + checkArgument( + sparkTable.coreOptions().isChainTable(), "chain_merge only supports chain table"); + checkArgument( + targetBranch.equals(sparkTable.coreOptions().scanFallbackSnapshotBranch()), + "chain_merge should merge to snapshot branch"); + checkArgument( + sparkTable.getTable() instanceof FallbackReadFileStoreTable, + "The chain merge should perform on the chain table"); + checkArgument( + compactPartitions.size() == 1 + && compactPartitions.get(0).size() + == sparkTable.table().partitionKeys().size(), + "chain_merge only supports one partition %s", + partitions); + } +} diff --git a/paimon-spark/paimon-spark-ut/pom.xml b/paimon-spark/paimon-spark-ut/pom.xml index 28025755224f..235be7348cfe 100644 --- a/paimon-spark/paimon-spark-ut/pom.xml +++ b/paimon-spark/paimon-spark-ut/pom.xml @@ -95,6 +95,12 @@ under the License. ${project.version} test + + org.apache.paimon + paimon-spark-common_2.12 + 1.5-SNAPSHOT + test + diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java index 1907c7fcf3cc..f39d53e45bc0 100644 --- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java @@ -2298,4 +2298,71 @@ public void testChainTableWithMultiGroupPartition(@TempDir java.nio.file.Path te spark.close(); } + + @Test + public void testChainMerge(@TempDir java.nio.file.Path tempDir) throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = createSparkSessionBuilder(warehousePath); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + /** Create table */ + spark.sql( + "CREATE TABLE IF NOT EXISTS \n" + + " `my_db1`.`chain_merge_test` (\n" + + " `t1` BIGINT COMMENT 't1',\n" + + " `t2` BIGINT COMMENT 't2',\n" + + " `t3` STRING COMMENT 't3'\n" + + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n" + + "WITH\n" + + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n" + + " 'bucket-key' = 't1',\n" + + " 'primary-key' = 'dt,t1',\n" + + " 'partition.timestamp-pattern' = '$dt',\n" + + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n" + + " 'chain-table.enabled' = 'true',\n" + + " 'bucket' = '2',\n" + + " 'merge-engine' = 'deduplicate', \n" + + " 'sequence.field' = 't2'\n" + + " )"); + + setupChainTableBranches(spark, "chain_merge_test"); + spark.close(); + + spark = builder.getOrCreate(); + /** Write delta branch */ + spark.sql("set spark.paimon.branch=delta;"); + spark.sql( + "insert overwrite table `my_db1`.`chain_merge_test` partition (dt = '20260514') values (1, 1, '1'),(2, 1, '1');"); + spark.sql( + "insert overwrite table `my_db1`.`chain_merge_test` partition (dt = '20260515') values (1, 2, '1-1' ),(3, 1, '1' );"); + spark.close(); + + /** Chain merge */ + spark = builder.getOrCreate(); + Row[] res = + spark.sql( + "CALL sys.chain_merge(table => 'my_db1.chain_merge_test', partitions => \"dt='20260515'\");") + .collectAsList() + .toArray(new Row[0]); + assertThat(res[0].getBoolean(0)).isTrue(); + spark.close(); + + spark = builder.getOrCreate(); + assertThat( + spark + .sql( + "SELECT t1,t2,t3 FROM `my_db1`.`chain_merge_test$branch_snapshot` where dt = '20260515'") + .collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,2,1-1]", "[2,1,1]", "[3,1,1]"); + spark.close(); + + /** Drop table */ + spark = builder.getOrCreate(); + spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_merge_test`;"); + spark.close(); + } }