diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index ffc7005f42c1..8f0917af3240 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -502,5 +502,34 @@ This section introduce all available spark procedures about paimon.
CALL sys.rescale(table => 'default.T', bucket_num => 16, partitions => 'dt=20250217,hh=08;dt=20250217,hh=09')
+
> initProcedureBuilders() {
"trigger_tag_automatic_creation", TriggerTagAutomaticCreationProcedure::builder);
procedureBuilders.put("rewrite_file_index", RewriteFileIndexProcedure::builder);
procedureBuilders.put("copy", CopyFilesProcedure::builder);
+ procedureBuilders.put("load_csv", LoadCsvProcedure::builder);
+ procedureBuilders.put("export_csv", ExportCsvProcedure::builder);
return procedureBuilders.build();
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExportCsvProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExportCsvProcedure.java
new file mode 100644
index 000000000000..384b0709f9d8
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExportCsvProcedure.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Export data from a Paimon table to CSV. Usage:
+ *
+ *
+ * CALL sys.export_csv(
+ * table => 'db.tbl',
+ * path => 'hdfs:///tmp/output',
+ * where => 'dt = "2024-01-01"',
+ * options => map('sep', ',', 'header', 'true'))
+ *
+ *
+ * The output is written as a standard Spark CSV directory at the specified path. If the output
+ * path already exists, it is overwritten. Nested columns (Struct/Array/Map) are serialized as JSON
+ * strings; {@code quoteAll=true} is enabled by default to ensure JSON values containing commas are
+ * properly quoted.
+ *
+ *
Returns a single row with {@code (result, exported_count)}.
+ */
+public class ExportCsvProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.required("path", StringType),
+ ProcedureParameter.optional("where", StringType),
+ ProcedureParameter.optional(
+ "options", DataTypes.createMapType(StringType, StringType)),
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, false, Metadata.empty()),
+ new StructField(
+ "exported_count", DataTypes.LongType, false, Metadata.empty())
+ });
+
+ protected ExportCsvProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ String tableArg = args.getString(0);
+ String path = args.getString(1);
+ String whereClause = args.isNullAt(2) ? null : args.getString(2);
+ Map options =
+ args.isNullAt(3) ? new HashMap<>() : toJavaMap(args.getMap(3));
+
+ options.putIfAbsent("sep", ",");
+ options.putIfAbsent("header", "true");
+ options.putIfAbsent("escape", "\"");
+ options.putIfAbsent("quoteAll", "true");
+
+ Identifier ident = toIdentifier(tableArg, PARAMETERS[0].name());
+ loadSparkTable(ident);
+
+ Dataset dataset = spark().table(fullyQualifiedName(ident));
+
+ if (whereClause != null && !whereClause.trim().isEmpty()) {
+ dataset = dataset.filter(whereClause);
+ }
+
+ dataset = flattenNestedColumns(dataset);
+
+ dataset.cache();
+ long exportedCount;
+ try {
+ exportedCount = dataset.count();
+ dataset.write().format("csv").options(options).mode("overwrite").save(path);
+ } finally {
+ dataset.unpersist();
+ }
+
+ return new InternalRow[] {newInternalRow(true, exportedCount)};
+ }
+
+ private static Dataset flattenNestedColumns(Dataset dataset) {
+ StructType schema = dataset.schema();
+ List cols = new ArrayList<>(schema.fields().length);
+ boolean hasNested = false;
+ for (StructField f : schema.fields()) {
+ if (isNested(f.dataType())) {
+ cols.add(functions.to_json(functions.col(quote(f.name()))).as(f.name()));
+ hasNested = true;
+ } else {
+ cols.add(functions.col(quote(f.name())));
+ }
+ }
+ return hasNested ? dataset.select(cols.toArray(new Column[0])) : dataset;
+ }
+
+ private static boolean isNested(DataType type) {
+ return type instanceof StructType || type instanceof ArrayType || type instanceof MapType;
+ }
+
+ private String fullyQualifiedName(Identifier ident) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(quote(tableCatalog().name()));
+ for (String ns : ident.namespace()) {
+ sb.append('.').append(quote(ns));
+ }
+ sb.append('.').append(quote(ident.name()));
+ return sb.toString();
+ }
+
+ private static String quote(String segment) {
+ return "`" + segment.replace("`", "``") + "`";
+ }
+
+ private static Map toJavaMap(MapData mapData) {
+ HashMap map = new HashMap<>();
+ if (mapData != null) {
+ for (int i = 0; i < mapData.numElements(); i++) {
+ String key = mapData.keyArray().getUTF8String(i).toString();
+ String value =
+ mapData.valueArray().isNullAt(i)
+ ? null
+ : mapData.valueArray().getUTF8String(i).toString();
+ map.put(key, value);
+ }
+ }
+ return map;
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder() {
+ @Override
+ public ExportCsvProcedure doBuild() {
+ return new ExportCsvProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "Export data from a Paimon table to a CSV directory.";
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/LoadCsvProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/LoadCsvProcedure.java
new file mode 100644
index 000000000000..7c48321b20c9
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/LoadCsvProcedure.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.spark.SparkTable;
+
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Load CSV files into an existing Paimon table. Usage:
+ *
+ *
+ * CALL sys.load_csv(
+ * table => 'db.tbl',
+ * path => 'hdfs:///tmp/data.csv',
+ * options => map('sep', ',', 'header', 'true'))
+ *
+ *
+ * The reader uses the target table's schema. CSV header columns are matched to target table
+ * columns by exact name. Missing columns become null, extra columns are dropped. Nested columns
+ * (Struct/Array/Map) are read as STRING and restored via {@code from_json}; if the JSON is invalid,
+ * the nested fields will be null but the row is still written.
+ *
+ *
The procedure always uses PERMISSIVE mode: malformed rows are counted in {@code invalid_count}
+ * and excluded from the write.
+ *
+ *
Returns a single row with {@code (result, imported_count, invalid_count)}.
+ */
+public class LoadCsvProcedure extends BaseProcedure {
+
+ private static final String CORRUPT_RECORD_COL_BASE = "_corrupt_record";
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.required("path", StringType),
+ ProcedureParameter.optional(
+ "options", DataTypes.createMapType(StringType, StringType)),
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, false, Metadata.empty()),
+ new StructField(
+ "imported_count", DataTypes.LongType, false, Metadata.empty()),
+ new StructField(
+ "invalid_count", DataTypes.LongType, false, Metadata.empty())
+ });
+
+ protected LoadCsvProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ String tableArg = args.getString(0);
+ String path = args.getString(1);
+ Map options =
+ args.isNullAt(2) ? new HashMap<>() : toJavaMap(args.getMap(2));
+
+ options.putIfAbsent("header", "true");
+ options.putIfAbsent("sep", ",");
+ options.put("mode", "PERMISSIVE");
+ options.putIfAbsent("multiLine", "true");
+ options.putIfAbsent("escape", "\"");
+
+ if (!"true".equalsIgnoreCase(options.get("header"))) {
+ throw new IllegalArgumentException(
+ "load_csv requires header=true; columns are matched by name.");
+ }
+
+ Identifier ident = toIdentifier(tableArg, PARAMETERS[0].name());
+ SparkTable sparkTable = loadSparkTable(ident);
+ StructType targetSchema = sparkTable.schema();
+
+ CsvReadPlan plan = buildCsvReadPlan(targetSchema, options, path);
+ String corruptCol = plan.corruptRecordCol;
+ options.put("columnNameOfCorruptRecord", corruptCol);
+
+ DataFrameReader reader = spark().read().options(options).schema(plan.readSchema);
+ Dataset raw = reader.csv(path);
+
+ List probeCols = new ArrayList<>();
+ for (StructField f : plan.readSchema.fields()) {
+ if (!corruptCol.equals(f.name())) {
+ probeCols.add(functions.col(quote(f.name())));
+ }
+ }
+ Column probeStruct = functions.struct(probeCols.toArray(new Column[0]));
+
+ Row counts =
+ raw.agg(
+ functions.count(functions.lit(1)).as("total"),
+ functions
+ .sum(
+ functions
+ .when(
+ functions
+ .col(quote(corruptCol))
+ .isNotNull(),
+ 1L)
+ .otherwise(0L))
+ .as("invalid"),
+ functions.first(probeStruct, true).as("_probe"))
+ .first();
+
+ long total = counts.getLong(0);
+ long invalidCount = counts.isNullAt(1) ? 0L : counts.getLong(1);
+ long importedCount = total - invalidCount;
+
+ Dataset notCorrupt = raw.filter(functions.col(quote(corruptCol)).isNull());
+ Dataset valid = notCorrupt.select(plan.projection);
+
+ try {
+ valid.writeTo(fullyQualifiedName(ident)).append();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException("Failed to write to table: " + fullyQualifiedName(ident), e);
+ }
+
+ refreshSparkCache(ident, sparkTable);
+
+ return new InternalRow[] {newInternalRow(true, importedCount, invalidCount)};
+ }
+
+ private CsvReadPlan buildCsvReadPlan(
+ StructType target, Map options, String path) {
+ Set targetNames = new HashSet<>();
+ for (StructField f : target.fields()) {
+ targetNames.add(f.name());
+ }
+
+ Map peekOpts = new HashMap<>(options);
+ peekOpts.remove("columnNameOfCorruptRecord");
+ peekOpts.put("inferSchema", "false");
+ String[] headerCols = spark().read().options(peekOpts).csv(path).schema().fieldNames();
+ Set headerSet = new HashSet<>(Arrays.asList(headerCols));
+
+ String corruptCol = resolveCorruptRecordCol(headerSet, targetNames);
+
+ StructType readSchema = new StructType();
+ for (String h : headerCols) {
+ DataType rt = StringType;
+ for (StructField f : target.fields()) {
+ if (f.name().equals(h)) {
+ rt = isNested(f.dataType()) ? StringType : f.dataType();
+ break;
+ }
+ }
+ readSchema = readSchema.add(h, rt, true, Metadata.empty());
+ }
+ readSchema = readSchema.add(corruptCol, StringType, true, Metadata.empty());
+
+ List cols = new ArrayList<>(target.fields().length);
+ for (StructField f : target.fields()) {
+ String name = f.name();
+ boolean hasCol = headerSet.contains(name);
+ Column src = hasCol ? functions.col(quote(name)) : functions.lit(null);
+ if (isNested(f.dataType()) && hasCol) {
+ cols.add(functions.from_json(src, f.dataType()).as(name));
+ } else {
+ cols.add(src.cast(f.dataType()).as(name));
+ }
+ }
+ return new CsvReadPlan(readSchema, cols.toArray(new Column[0]), corruptCol);
+ }
+
+ private static String resolveCorruptRecordCol(Set headerCols, Set targetCols) {
+ Set reserved = new HashSet<>();
+ for (String s : headerCols) {
+ reserved.add(s.toLowerCase(Locale.ROOT));
+ }
+ for (String s : targetCols) {
+ reserved.add(s.toLowerCase(Locale.ROOT));
+ }
+ String candidate = CORRUPT_RECORD_COL_BASE;
+ while (reserved.contains(candidate.toLowerCase(Locale.ROOT))) {
+ candidate = "_" + candidate;
+ }
+ return candidate;
+ }
+
+ private static boolean isNested(DataType type) {
+ return type instanceof StructType || type instanceof ArrayType || type instanceof MapType;
+ }
+
+ private String fullyQualifiedName(Identifier ident) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(quote(tableCatalog().name()));
+ for (String ns : ident.namespace()) {
+ sb.append('.').append(quote(ns));
+ }
+ sb.append('.').append(quote(ident.name()));
+ return sb.toString();
+ }
+
+ private static String quote(String segment) {
+ return "`" + segment.replace("`", "``") + "`";
+ }
+
+ private static Map toJavaMap(MapData mapData) {
+ HashMap map = new HashMap<>();
+ if (mapData != null) {
+ for (int i = 0; i < mapData.numElements(); i++) {
+ String key = mapData.keyArray().getUTF8String(i).toString();
+ String value =
+ mapData.valueArray().isNullAt(i)
+ ? null
+ : mapData.valueArray().getUTF8String(i).toString();
+ map.put(key, value);
+ }
+ }
+ return map;
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder() {
+ @Override
+ public LoadCsvProcedure doBuild() {
+ return new LoadCsvProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "Load CSV files into an existing Paimon table.";
+ }
+
+ private static final class CsvReadPlan {
+ final StructType readSchema;
+ final Column[] projection;
+ final String corruptRecordCol;
+
+ CsvReadPlan(StructType readSchema, Column[] projection, String corruptRecordCol) {
+ this.readSchema = readSchema;
+ this.projection = projection;
+ this.corruptRecordCol = corruptRecordCol;
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CsvProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CsvProcedureTest.scala
new file mode 100644
index 000000000000..414344a83abf
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CsvProcedureTest.scala
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Files
+
+class CsvProcedureTest extends PaimonSparkTestBase {
+
+ test("Paimon procedure: load_csv basic") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, age INT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,name,age")
+ writer.println("1,Alice,30")
+ writer.println("2,Bob,25")
+ writer.println("3,Charlie,35")
+ writer.close()
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 3L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Row(1, "Alice", 30) :: Row(2, "Bob", 25) :: Row(3, "Charlie", 35) :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv with custom separator") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_sep_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id|name")
+ writer.println("1|Alice")
+ writer.println("2|Bob")
+ writer.close()
+
+ checkAnswer(
+ spark.sql(s"""CALL paimon.sys.load_csv(
+ | table => 'test.T',
+ | path => '${csvFile.getPath}',
+ | options => map('sep', '|'))""".stripMargin),
+ Row(true, 2L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Row(1, "Alice") :: Row(2, "Bob") :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv with missing columns") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, city STRING)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_missing_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,name")
+ writer.println("1,Alice")
+ writer.close()
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 1L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T"),
+ Row(1, "Alice", null) :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv with nested types") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, info STRUCT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_nested_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,info")
+ writer.println("1,\"{\"\"city\"\":\"\"NYC\"\",\"\"zip\"\":10001}\"")
+ writer.close()
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 1L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT id, info.city, info.zip FROM T"),
+ Row(1, "NYC", 10001) :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv with invalid nested JSON writes null fields") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, info STRUCT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_nested_malformed_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,info")
+ writer.println("1,\"{\"\"city\"\":\"\"NYC\"\",\"\"zip\"\":10001}\"")
+ writer.println("2,not-json")
+ writer.println("3,\"{\"\"city\"\":\"\"LA\"\",\"\"zip\"\":90001}\"")
+ writer.close()
+
+ // Invalid nested JSON is not detected as a malformed row — from_json produces null fields
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 3L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT id, info.city, info.zip FROM T ORDER BY id"),
+ Row(1, "NYC", 10001) :: Row(2, null, null) :: Row(3, "LA", 90001) :: Nil
+ )
+ }
+
+ test("Paimon procedure: export_csv basic") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, age INT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES (1, 'Alice', 30), (2, 'Bob', 25), (3, 'Charlie', 35)")
+
+ val outputPath = Files.createTempDirectory("csv_export_test").toFile.getPath + "/output.csv"
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.export_csv(table => 'test.T', path => '$outputPath')"),
+ Row(true, 3L) :: Nil
+ )
+
+ val exported = spark.read.option("header", "true").option("inferSchema", "true").csv(outputPath)
+ assert(exported.count() == 3)
+ checkAnswer(
+ exported.select("id", "name", "age").orderBy("id"),
+ Row(1, "Alice", 30) :: Row(2, "Bob", 25) :: Row(3, "Charlie", 35) :: Nil
+ )
+ }
+
+ test("Paimon procedure: export_csv with where clause") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING)
+ |USING PAIMON
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')")
+
+ val outputPath = Files.createTempDirectory("csv_export_where_test").toFile.getPath + "/out.csv"
+
+ checkAnswer(
+ spark.sql(s"""CALL paimon.sys.export_csv(
+ | table => 'test.T',
+ | path => '$outputPath',
+ | where => 'id > 1')""".stripMargin),
+ Row(true, 2L) :: Nil
+ )
+
+ val exported = spark.read.option("header", "true").option("inferSchema", "true").csv(outputPath)
+ assert(exported.count() == 2)
+ }
+
+ test("Paimon procedure: export_csv with nested types") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, tags ARRAY)
+ |USING PAIMON
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES (1, array('a', 'b'))")
+
+ val outputPath =
+ Files.createTempDirectory("csv_export_nested_test").toFile.getPath + "/out.csv"
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.export_csv(table => 'test.T', path => '$outputPath')"),
+ Row(true, 1L) :: Nil
+ )
+
+ val exported = spark.read.option("header", "true").option("escape", "\"").csv(outputPath)
+ val tagsStr = exported.collect()(0).getString(1)
+ assert(tagsStr.contains("a") && tagsStr.contains("b"))
+ }
+
+ test("Paimon procedure: load_csv and export_csv roundtrip") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, score DOUBLE)
+ |USING PAIMON
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO T VALUES (1, 'Alice', 95.5), (2, 'Bob', 88.0), (3, 'Charlie', 72.3)")
+
+ val exportPath =
+ Files.createTempDirectory("csv_roundtrip_test").toFile.getPath + "/export.csv"
+
+ spark.sql(s"CALL paimon.sys.export_csv(table => 'test.T', path => '$exportPath')")
+
+ spark.sql("DROP TABLE T")
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, score DOUBLE)
+ |USING PAIMON
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '$exportPath')"),
+ Row(true, 3L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Row(1, "Alice", 95.5) :: Row(2, "Bob", 88.0) :: Row(3, "Charlie", 72.3) :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv with _corrupt_record column in target table") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, _corrupt_record STRING, value INT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_corrupt_col_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,_corrupt_record,value")
+ writer.println("1,some_text,100")
+ writer.println("2,other_text,not_int")
+ writer.println("3,more_text,300")
+ writer.close()
+
+ // Row 2 has a malformed 'value' (not_int for INT column) —
+ // internal bad-row counting should still work despite the column name collision
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 2L, 1L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Row(1, "some_text", 100) :: Row(3, "more_text", 300) :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv with reserved corrupt record column name collision") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, `_CORRUPT_RECORD` STRING, value INT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_corrupt_upper_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,_CORRUPT_RECORD,value")
+ writer.println("1,hello,10")
+ writer.println("2,world,not_int")
+ writer.println("3,foo,30")
+ writer.close()
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 2L, 1L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Row(1, "hello", 10) :: Row(3, "foo", 30) :: Nil
+ )
+ }
+
+ test("Paimon procedure: load_csv and export_csv with dotted column names") {
+ spark.sql("""
+ |CREATE TABLE T (`id` INT, `user.name` STRING, `user.age` INT)
+ |USING PAIMON
+ |""".stripMargin)
+
+ val csvDir = Files.createTempDirectory("csv_load_dotted_test").toFile
+ val csvFile = new File(csvDir, "data.csv")
+ val writer = new PrintWriter(csvFile)
+ writer.println("id,user.name,user.age")
+ writer.println("1,Alice,30")
+ writer.println("2,Bob,25")
+ writer.close()
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.load_csv(table => 'test.T', path => '${csvFile.getPath}')"),
+ Row(true, 2L, 0L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SELECT `id`, `user.name`, `user.age` FROM T ORDER BY id"),
+ Row(1, "Alice", 30) :: Row(2, "Bob", 25) :: Nil
+ )
+
+ val exportPath =
+ Files.createTempDirectory("csv_export_dotted_test").toFile.getPath + "/out.csv"
+
+ checkAnswer(
+ spark.sql(s"CALL paimon.sys.export_csv(table => 'test.T', path => '$exportPath')"),
+ Row(true, 2L) :: Nil
+ )
+
+ val exported = spark.read.option("header", "true").option("inferSchema", "true").csv(exportPath)
+ assert(exported.columns.contains("user.name"))
+ assert(exported.count() == 2)
+ }
+}