diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md index 74bec3f9e7d3..3a05500cccf8 100644 --- a/docs/docs/spark/sql-write.md +++ b/docs/docs/spark/sql-write.md @@ -294,3 +294,158 @@ INSERT INTO t VALUES (1, '1'), (2, '2'); -- Need using `BY NAME` statement (requires Spark 3.5+) INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c; ``` + +## COPY INTO + +`COPY INTO` provides a SQL command for bulk loading CSV files into Paimon tables and writing table data to CSV files. + +### CSV Import + +```sql +COPY INTO table_name [(col1, col2, ...)] +FROM 'source_path' +FILE_FORMAT = (TYPE = CSV [, option = value, ...]) +[PATTERN = 'regex'] +[FORCE = TRUE|FALSE] +[ON_ERROR = ABORT_STATEMENT] +``` + +**Basic import:** + +```sql +COPY INTO my_db.my_table +FROM '/data/csv_files/' +FILE_FORMAT = (TYPE = CSV); +``` + +**Import with explicit column mapping:** + +```sql +-- Only load into specified columns; omitted columns use their DEFAULT value or NULL +COPY INTO my_db.users (id, name) +FROM '/data/new_users/' +FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1); +``` + +**Import with NULL_IF and PATTERN:** + +```sql +COPY INTO my_db.events +FROM '/data/logs/' +FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|', NULL_IF = ('NULL', '\\N', '')) +PATTERN = '.*\.csv' +FORCE = FALSE; +``` + +### Write CSV Files + +```sql +COPY INTO 'target_path' +FROM table_name +FILE_FORMAT = (TYPE = CSV [, option = value, ...]) +[OVERWRITE = TRUE|FALSE] +``` + +**Write with header and overwrite:** + +```sql +COPY INTO '/export/users_backup/' +FROM my_db.users +FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',') +OVERWRITE = TRUE; +``` + +### FILE_FORMAT Options + +`FILE_FORMAT` is required and must include `TYPE = CSV`. + +**Import options:** + +| Option | Description | Default | +|--------|-------------|---------| +| TYPE | File format type. Must be `CSV`. | (required) | +| FIELD_DELIMITER | Column delimiter character. | `,` | +| SKIP_HEADER | Skip the first line as header. 
Only `0` or `1`. | `0` | +| QUOTE | Quote character for enclosing fields. | `"` | +| ESCAPE | Escape character within quoted fields. | `\` | +| NULL_IF | List of string values to interpret as NULL, e.g. `('NULL', '\\N')`. | (none) | +| EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. `TRUE` or `FALSE`. | `FALSE` | +| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` | + +**Write options:** + +| Option | Description | Default | +|--------|-------------|---------| +| TYPE | File format type. Must be `CSV`. | (required) | +| FIELD_DELIMITER | Column delimiter character. | `,` | +| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` | +| QUOTE | Quote character for enclosing fields. | `"` | +| ESCAPE | Escape character within quoted fields. | `\` | +| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` | + +### Import Options + +| Option | Description | Default | +|--------|-------------|---------| +| PATTERN | Regex to filter source files by base file name. Only matching files are loaded. | (all files) | +| FORCE | `FALSE`: skip files already loaded (idempotent). `TRUE`: reload all files. | `FALSE` | +| ON_ERROR | Error handling strategy. Only `ABORT_STATEMENT` is supported. | `ABORT_STATEMENT` | + +### File Write Options + +| Option | Description | Default | +|--------|-------------|---------| +| OVERWRITE | `FALSE`: fail if target path exists. `TRUE`: overwrite existing files. | `FALSE` | + +### Column Mapping + +When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM ...`): + +- CSV columns are mapped **positionally** to the specified column list. +- The number of CSV columns must match the column list length. +- Columns not in the list are filled with their **DEFAULT value** (if defined in the table schema) or **NULL**. +- Non-nullable columns without a default value that are not in the list will cause an error. 
+ +When no column list is provided: + +- CSV columns are mapped positionally to all writable columns in the target table. +- The number of CSV columns must match the number of writable columns. + +### Repeated Imports + +By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfully loaded. A file is identified by its path, size, and last-modified timestamp. + +- Re-running the same COPY INTO command will **skip** already-loaded files and return status `SKIPPED`. +- If a source file is modified (size or timestamp changes), it becomes eligible for re-loading. +- `FORCE = TRUE` bypasses load history and always re-imports all matching files. + +### Result Output + +**Import** returns one row per source file: + +| Column | Type | Description | +|--------|------|-------------| +| file_name | STRING | Source file name | +| status | STRING | `LOADED` or `SKIPPED` | +| rows_loaded | BIGINT | Number of rows written | +| rows_parsed | BIGINT | Number of rows parsed from the file | + +**File write** returns a single row: + +| Column | Type | Description | +|--------|------|-------------| +| output_path | STRING | Target output path | +| file_count | INT | Number of files written | +| rows_written | BIGINT | Total rows written | + +### Limitations + +- Only **CSV** format is supported. +- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported. +- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the entire command. +- `SINGLE = TRUE` (single-file output) is not supported. +- File format options must be specified inline in `FILE_FORMAT = (...)`. +- File listing is **non-recursive**: only direct files under the source path are processed. Subdirectories are ignored. +- `PATTERN` matches the **base file name** only (not the full path). +- Concurrent COPY INTO commands targeting the same table may produce duplicate data. +- `SKIP_HEADER` only supports values `0` or `1`. 
/**
 * COPY INTO test entry point for the `paimon-spark-3.2` module.
 *
 * Declares no members of its own; everything it runs is inherited from [[CopyIntoTestBase]].
 */
class CopyIntoTest extends CopyIntoTestBase
/**
 * COPY INTO test entry point for the `paimon-spark-3.3` module.
 *
 * Declares no members of its own; everything it runs is inherited from [[CopyIntoTestBase]].
 */
class CopyIntoTest extends CopyIntoTestBase
/**
 * COPY INTO test entry point for the `paimon-spark-3.4` module.
 *
 * Declares no members of its own; everything it runs is inherited from [[CopyIntoTestBase]].
 */
class CopyIntoTest extends CopyIntoTestBase
/**
 * Tells whether the statement text starts with the literal prefix `copy into`.
 *
 * The comparison is case-sensitive and expects exactly one space between the two words.
 * NOTE(review): the caller presumably lower-cases and collapses whitespace before calling
 * this; only the tail of that normalization is visible in this hunk - confirm.
 *
 * @param normalized the pre-processed SQL text
 * @return `true` when the text begins with `copy into`
 */
private def isCopyInto(normalized: String): Boolean =
  normalized.startsWith("copy into")
/**
 * COPY INTO test entry point for the `paimon-spark-4.0` module.
 *
 * Declares no members of its own; everything it runs is inherited from [[CopyIntoTestBase]].
 */
class CopyIntoTest extends CopyIntoTestBase
/**
 * COPY INTO test entry point for the `paimon-spark-4.1` module.
 *
 * Declares no members of its own; everything it runs is inherited from [[CopyIntoTestBase]].
 */
class CopyIntoTest extends CopyIntoTestBase
// Comma-separated target columns, used inside the optional parentheses of
// `COPY INTO <table> (col1, col2, ...)`.
columnList
    : identifier (',' identifier)*
    ;

// `FILE_FORMAT = ( option [, option ...] )`. The rule requires at least one option.
fileFormatClause
    : FILE_FORMAT '=' '(' fileFormatOption (',' fileFormatOption)* ')'
    ;

// One `key = value` entry. The key is an arbitrary identifier, so option names are
// not restricted by the grammar itself.
fileFormatOption
    : key=identifier '=' fileFormatValue
    ;

// Value forms accepted on the right-hand side of an option; each alternative is labeled.
// NOTE(review): TRUE and FALSE are listed in nonReserved. If `identifier` accepts
// nonReserved words, a boolean literal may be matched by the earlier `identifier`
// alternative instead of `booleanValue` - confirm which label boolean literals reach.
fileFormatValue
    : STRING                          #stringFormatValue
    | identifier                      #identFormatValue
    | booleanValue                    #boolFormatValue
    | INTEGER_VALUE                   #intFormatValue
    | '(' STRING (',' STRING)* ')'    #listFormatValue
    ;

// `PATTERN = '<string>'`.
patternClause
    : PATTERN '=' STRING
    ;

// `FORCE = TRUE|FALSE`.
forceClause
    : FORCE '=' booleanValue
    ;

// `ON_ERROR = ABORT_STATEMENT`; no other strategy is accepted by the grammar.
onErrorClause
    : ON_ERROR '=' ABORT_STATEMENT
    ;

// `OVERWRITE = TRUE|FALSE`.
overwriteClause
    : OVERWRITE '=' booleanValue
    ;
/**
 * Logical plan node for `COPY INTO '<target_path>' FROM <table> FILE_FORMAT = (...)`.
 *
 * The node is a leaf command; it only carries the parsed statement.
 *
 * @param targetPath destination path exactly as written in the statement
 * @param table      name parts of the source table identifier
 * @param fileFormat parsed `FILE_FORMAT` clause
 * @param overwrite  value of the `OVERWRITE` option
 */
case class CopyIntoLocationCommand(
    targetPath: String,
    table: Seq[String],
    fileFormat: CopyFileFormat,
    overwrite: Boolean)
  extends PaimonLeafCommand {

  /**
   * Result schema: `output_path`, `file_count`, `rows_written`.
   *
   * Built once per node. `AttributeReference(...)()` allocates a new expression id each time
   * it is evaluated, so a plain `def` would hand out differently-identified attributes on
   * every call.
   */
  override lazy val output: Seq[Attribute] = Seq(
    AttributeReference("output_path", StringType, nullable = false)(),
    AttributeReference("file_count", IntegerType, nullable = false)(),
    AttributeReference("rows_written", LongType, nullable = false)()
  )

  /** One-line description for plan output; the table name is rendered dot-separated. */
  override def simpleString(maxFields: Int): String =
    s"CopyIntoLocation: target=$targetPath, source=${table.mkString(".")}"
}
/**
 * Logical plan node for `COPY INTO <table> [(columns)] FROM '<source_path>' FILE_FORMAT = (...)`.
 *
 * The node is a leaf command; it only carries the parsed statement.
 *
 * @param table      name parts of the target table identifier
 * @param columns    explicit target column list, or `None` when the statement has none
 * @param sourcePath source path exactly as written in the statement
 * @param fileFormat parsed `FILE_FORMAT` clause
 * @param pattern    value of the `PATTERN` option, if given
 * @param force      value of the `FORCE` option
 */
case class CopyIntoTableCommand(
    table: Seq[String],
    columns: Option[Seq[String]],
    sourcePath: String,
    fileFormat: CopyFileFormat,
    pattern: Option[String],
    force: Boolean)
  extends PaimonLeafCommand {

  /**
   * Result schema: `file_name`, `status`, `rows_loaded`, `rows_parsed`.
   *
   * Built once per node. `AttributeReference(...)()` allocates a new expression id each time
   * it is evaluated, so a plain `def` would hand out differently-identified attributes on
   * every call.
   */
  override lazy val output: Seq[Attribute] = Seq(
    AttributeReference("file_name", StringType, nullable = false)(),
    AttributeReference("status", StringType, nullable = false)(),
    AttributeReference("rows_loaded", LongType, nullable = false)(),
    AttributeReference("rows_parsed", LongType, nullable = false)()
  )

  /** One-line description for plan output; the table name is rendered dot-separated. */
  override def simpleString(maxFields: Int): String =
    s"CopyIntoTable: table=${table.mkString(".")}, source=$sourcePath"
}
/** File format named by the `TYPE` entry of a `FILE_FORMAT` clause. */
sealed trait FileFormatType

object FileFormatType {

  /** CSV, the only format that passes [[CopyFileFormat]] validation. */
  case object CSV extends FileFormatType

  /** Any other format name, retained so that validation can report it. */
  case class Unsupported(name: String) extends FileFormatType
}

/**
 * Parsed `FILE_FORMAT = (...)` clause of a COPY INTO statement.
 *
 * Option keys are matched case-sensitively against the upper-case names in
 * [[CopyFileFormat.VALID_IMPORT_KEYS]] and [[CopyFileFormat.VALID_EXPORT_KEYS]].
 *
 * @param formatType format named by `TYPE`
 * @param options    remaining options as key -> raw string value
 */
case class CopyFileFormat(formatType: FileFormatType, options: Map[String, String]) {

  /**
   * Options for Spark's CSV reader.
   *
   * Always contains `mode -> FAILFAST`. `SKIP_HEADER` becomes `header`, which is `"true"`
   * for `1` (or `TRUE`, ignoring case) and `"false"` otherwise. Keys without a reader
   * counterpart (e.g. `NULL_IF`) are left out.
   */
  def toSparkReaderOptions: Map[String, String] = {
    val translated = options.collect {
      case ("FIELD_DELIMITER", v) => "sep" -> v
      case ("QUOTE", v) => "quote" -> v
      case ("ESCAPE", v) => "escape" -> v
      case ("COMPRESSION", v) => "compression" -> v
      case ("SKIP_HEADER", v) =>
        "header" -> (if (v == "1" || v.equalsIgnoreCase("TRUE")) "true" else "false")
    }
    Map("mode" -> "FAILFAST") ++ translated
  }

  /** Options for Spark's CSV writer; keys without a writer counterpart are left out. */
  def toSparkWriterOptions: Map[String, String] =
    options.collect {
      case ("FIELD_DELIMITER", v) => "sep" -> v
      // Locale.ROOT keeps the result independent of the JVM's default locale.
      case ("HEADER", v) => "header" -> v.toLowerCase(java.util.Locale.ROOT)
      case ("QUOTE", v) => "quote" -> v
      case ("ESCAPE", v) => "escape" -> v
      case ("COMPRESSION", v) => "compression" -> v
    }

  /**
   * Values of `NULL_IF`, decoded from their [[CopyFileFormat.LIST_SEPARATOR]]-joined form.
   *
   * An empty encoded value is a single empty string, so `NULL_IF = ('')` yields `Seq("")`
   * rather than being dropped. Returns an empty sequence only when `NULL_IF` is absent.
   */
  def nullIfValues: Seq[String] =
    options
      .get("NULL_IF")
      // limit -1 keeps trailing empty strings, e.g. the '' in ('NULL', '').
      .map(_.split(CopyFileFormat.LIST_SEPARATOR, -1).toSeq)
      .getOrElse(Seq.empty)

  /** `true` only when `EMPTY_FIELD_AS_NULL` is present and equals `TRUE`, ignoring case. */
  def emptyFieldAsNull: Boolean =
    options.get("EMPTY_FIELD_AS_NULL").exists(_.equalsIgnoreCase("TRUE"))

  /**
   * Validates the clause for loading files into a table.
   *
   * @throws IllegalArgumentException for a non-CSV type, a `MODE` option, any key outside
   *                                  [[CopyFileFormat.VALID_IMPORT_KEYS]], or a `SKIP_HEADER`
   *                                  other than `0` or `1`
   */
  def validateForImport(): Unit = {
    validateFormatType()
    if (options.contains("MODE")) {
      throw new IllegalArgumentException(
        "MODE cannot be specified in FILE_FORMAT options; it is reserved for ON_ERROR handling")
    }
    rejectUnknownKeys(CopyFileFormat.VALID_IMPORT_KEYS, "import")
    options.get("SKIP_HEADER").foreach {
      v =>
        val accepted = scala.util.Try(v.toInt).toOption.exists(n => n == 0 || n == 1)
        if (!accepted) {
          throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 or 1, got: $v")
        }
    }
  }

  /**
   * Validates the clause for writing a table out to files.
   *
   * @throws IllegalArgumentException for a non-CSV type, any key outside
   *                                  [[CopyFileFormat.VALID_EXPORT_KEYS]], or a `HEADER` that
   *                                  is neither `TRUE` nor `FALSE` (ignoring case)
   */
  def validateForExport(): Unit = {
    validateFormatType()
    rejectUnknownKeys(CopyFileFormat.VALID_EXPORT_KEYS, "export")
    // Reject a malformed HEADER here instead of letting the CSV writer fail on it later.
    options.get("HEADER").foreach {
      v =>
        if (!v.equalsIgnoreCase("TRUE") && !v.equalsIgnoreCase("FALSE")) {
          throw new IllegalArgumentException(s"HEADER supports only TRUE or FALSE, got: $v")
        }
    }
  }

  /** Fails when any option key is not in `allowed`; `direction` only appears in the message. */
  private def rejectUnknownKeys(allowed: Set[String], direction: String): Unit = {
    val invalid = options.keys.filterNot(allowed.contains)
    if (invalid.nonEmpty) {
      throw new IllegalArgumentException(
        s"Unsupported FILE_FORMAT options for $direction: ${invalid.mkString(", ")}")
    }
  }

  /** Fails unless the format type is CSV. */
  private def validateFormatType(): Unit = {
    formatType match {
      case FileFormatType.CSV =>
      case FileFormatType.Unsupported(name) =>
        throw new IllegalArgumentException(
          s"Unsupported file format type: $name. Only CSV is currently supported")
    }
  }
}

object CopyFileFormat {

  /** Option keys accepted by [[CopyFileFormat.validateForImport]]. */
  val VALID_IMPORT_KEYS: Set[String] = Set(
    "FIELD_DELIMITER",
    "SKIP_HEADER",
    "QUOTE",
    "ESCAPE",
    "NULL_IF",
    "EMPTY_FIELD_AS_NULL",
    "COMPRESSION"
  )

  /** Option keys accepted by [[CopyFileFormat.validateForExport]]. */
  val VALID_EXPORT_KEYS: Set[String] = Set(
    "FIELD_DELIMITER",
    "HEADER",
    "QUOTE",
    "ESCAPE",
    "COMPRESSION"
  )

  // Unit Separator (U+001F) used to encode multi-value lists in a single string
  val LIST_SEPARATOR: String = "\u001f"

  /**
   * Maps a `TYPE` value to a [[FileFormatType]], ignoring case.
   *
   * @return [[FileFormatType.CSV]] for `csv`; otherwise [[FileFormatType.Unsupported]]
   *         carrying the upper-cased name
   */
  def parseFormatType(typeStr: String): FileFormatType = {
    typeStr.toUpperCase(java.util.Locale.ROOT) match {
      case "CSV" => FileFormatType.CSV
      case other => FileFormatType.Unsupported(other)
    }
  }
}
/**
 * One load-history entry: a source file and the outcome of loading it.
 *
 * @param filePath     source file path; also the input of the history file-name hash
 * @param fileSize     source file size recorded at load time
 * @param lastModified source file modification time recorded at load time
 * @param loadedAt     load timestamp; used as the suffix of the history file name
 * @param snapshotId   snapshot id stored with the record
 * @param rowsLoaded   row count stored with the record
 */
case class CopyLoadRecord(
    filePath: String,
    fileSize: Long,
    lastModified: Long,
    loadedAt: Long,
    snapshotId: Long,
    rowsLoaded: Long) {

  /** Serializes the record as a JSON object, fields inserted in declaration order. */
  def toJson: String = {
    val map = new JLinkedHashMap[String, AnyRef]()
    map.put("filePath", filePath)
    map.put("fileSize", java.lang.Long.valueOf(fileSize))
    map.put("lastModified", java.lang.Long.valueOf(lastModified))
    map.put("loadedAt", java.lang.Long.valueOf(loadedAt))
    map.put("snapshotId", java.lang.Long.valueOf(snapshotId))
    map.put("rowsLoaded", java.lang.Long.valueOf(rowsLoaded))
    JsonSerdeUtil.toJson(map)
  }
}

object CopyLoadRecord {

  /**
   * Parses a record produced by [[CopyLoadRecord.toJson]].
   *
   * @throws IllegalArgumentException when a field is absent (reported by name) or, as a
   *                                  `NumberFormatException`, when a numeric field cannot
   *                                  be read as a `Long`
   */
  def fromJson(json: String): CopyLoadRecord = {
    val fields = JsonSerdeUtil.parseJsonMap(json, classOf[AnyRef])

    // Name the missing field instead of failing with a bare NullPointerException.
    def text(name: String): String =
      Option(fields.get(name))
        .map(_.toString)
        .getOrElse(
          throw new IllegalArgumentException(s"Load history record has no '$name' field"))

    CopyLoadRecord(
      filePath = text("filePath"),
      fileSize = text("fileSize").toLong,
      lastModified = text("lastModified").toLong,
      loadedAt = text("loadedAt").toLong,
      snapshotId = text("snapshotId").toLong,
      rowsLoaded = text("rowsLoaded").toLong
    )
  }
}

/**
 * Reads and writes COPY INTO load-history records under `<tablePath>/copy-into/history`.
 *
 * Each record is a file named `load-<hash>-<loadedAt>`, where `<hash>` is the first 16 hex
 * characters of the SHA-256 of the source file path.
 *
 * @param fileIO    file system access used for all history reads and writes
 * @param tablePath table root under which the history directory lives
 */
class CopyLoadHistoryManager(fileIO: FileIO, tablePath: Path) {

  private val LOG = org.slf4j.LoggerFactory.getLogger(classOf[CopyLoadHistoryManager])

  private val historyDir = new Path(tablePath, "copy-into/history")

  /**
   * Tells whether a record exists for this exact path, size and modification time.
   *
   * Best effort: if the history directory cannot be listed, or a record cannot be read, the
   * failure is logged and treated as "not loaded", which makes the file eligible for loading.
   * An error from the initial existence check is not caught.
   */
  def isLoaded(filePath: String, fileSize: Long, lastModified: Long): Boolean = {
    val prefix = s"load-${sha256(filePath)}-"
    if (!fileIO.exists(historyDir)) {
      false
    } else {
      try {
        fileIO.listStatus(historyDir).exists {
          status =>
            status.getPath.getName.startsWith(prefix) &&
            recordMatches(status.getPath, filePath, fileSize, lastModified)
        }
      } catch {
        case scala.util.control.NonFatal(e) =>
          LOG.warn(s"Failed to list load history directory $historyDir: ${e.getMessage}", e)
          false
      }
    }
  }

  /** Writes `record` to its history file, creating the history directory if needed. */
  def recordLoaded(record: CopyLoadRecord): Unit = {
    fileIO.mkdirs(historyDir)
    val hash = sha256(record.filePath)
    val recordPath = new Path(historyDir, s"load-$hash-${record.loadedAt}")
    fileIO.overwriteFileUtf8(recordPath, record.toJson)
  }

  /** Reads one history file and compares it with the given file identity. */
  private def recordMatches(
      recordPath: Path,
      filePath: String,
      fileSize: Long,
      lastModified: Long): Boolean = {
    try {
      val record = CopyLoadRecord.fromJson(fileIO.readFileUtf8(recordPath))
      // The file name carries only a truncated hash, so the stored path is compared as well;
      // two different paths sharing a hash prefix must not be mistaken for each other.
      record.filePath == filePath &&
      record.fileSize == fileSize &&
      record.lastModified == lastModified
    } catch {
      case scala.util.control.NonFatal(e) =>
        LOG.warn(s"Failed to read load history record $recordPath: ${e.getMessage}", e)
        false
    }
  }

  /** First 16 lower-case hex characters of the SHA-256 digest of `input` (UTF-8). */
  private def sha256(input: String): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    val hash = digest.digest(input.getBytes(StandardCharsets.UTF_8))
    hash.map("%02x".format(_)).mkString.take(16)
  }
}
/**
 * Physical execution of `COPY INTO '<target_path>' FROM <table>`: writes the table as CSV
 * and reports the target path, the number of data files and the number of rows.
 *
 * @param spark      session used to read the table and to build the Hadoop configuration
 * @param catalog    catalog whose name qualifies the source table
 * @param ident      identifier of the source table
 * @param targetPath destination path handed to the CSV writer
 * @param fileFormat parsed `FILE_FORMAT` clause; validated for export before any work starts
 * @param overwrite  `true` writes with `SaveMode.Overwrite`, `false` with `ErrorIfExists`
 * @param out        output attributes of the command
 */
case class CopyIntoLocationExec(
    spark: SparkSession,
    catalog: TableCatalog,
    ident: Identifier,
    targetPath: String,
    fileFormat: CopyFileFormat,
    overwrite: Boolean,
    out: Seq[Attribute])
  extends PaimonLeafV2CommandExec {

  override def output: Seq[Attribute] = out

  /** Validates the options, writes the table, and returns the single result row. */
  override protected def run(): Seq[InternalRow] = {
    fileFormat.validateForExport()

    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
    val df = spark.table(tableName)

    // NOTE(review): the count and the write are two separate actions on `df`, so the table is
    // read twice; the reported count presumably can differ from what is written if the table
    // changes in between - confirm whether both reads see the same snapshot.
    val rowCount = df.count()

    val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
    df.write.options(fileFormat.toSparkWriterOptions).mode(saveMode).csv(targetPath)

    Seq(InternalRow(UTF8String.fromString(targetPath), countDataFiles(), rowCount))
  }

  /**
   * Counts the data files directly under the target path.
   *
   * Names starting with `_` or `.` are skipped: they are job markers such as `_SUCCESS` and
   * hidden side files, not output data, and would otherwise inflate `file_count`.
   */
  private def countDataFiles(): Int = {
    val fsPath = new Path(targetPath)
    val fs = fsPath.getFileSystem(spark.sessionState.newHadoopConf())
    if (fs.exists(fsPath)) {
      fs.listStatus(fsPath).count {
        status =>
          val name = status.getPath.getName
          status.isFile && !name.startsWith("_") && !name.startsWith(".")
      }
    } else {
      0
    }
  }
}
/**
 * Physical command for `COPY INTO <table> [(columns)] FROM '<path>'`: loads CSV files found
 * directly under `sourcePath` into a Paimon table.
 *
 * Processing steps:
 *  1. list the candidate files (optional `pattern`; files already recorded in the load history
 *     are skipped unless `force` is set);
 *  1. read them with every column typed as string and apply NULL_IF / EMPTY_FIELD_AS_NULL;
 *  1. map the CSV columns onto the table columns, filling unmapped columns from their default
 *     value or NULL;
 *  1. cast to the table types, aborting the statement when a non-null value becomes NULL;
 *  1. append to the table and record one load-history entry per loaded file.
 *
 * Result rows are (file name, "LOADED" or "SKIPPED", row count, row count).
 */
case class CopyIntoTableExec(
    spark: SparkSession,
    catalog: TableCatalog,
    ident: Identifier,
    sourcePath: String,
    columns: Option[Seq[String]],
    fileFormat: CopyFileFormat,
    pattern: Option[String],
    force: Boolean,
    out: Seq[Attribute])
  extends PaimonLeafV2CommandExec {

  override def output: Seq[Attribute] = out

  override protected def run(): Seq[InternalRow] = {
    fileFormat.validateForImport()

    val paimonTable = loadFileStoreTable()
    val tableSchema = paimonTable.schema()
    val writableColumns = tableSchema.fieldNames().asScala.toSeq
    val fields = tableSchema.fields().asScala.toSeq
    val targetColumns = resolveTargetColumns(writableColumns)

    validateNonNullableDefaults(writableColumns, targetColumns, fields)

    val (filesToLoad, skippedFiles) = listAndFilterFiles(paimonTable)

    if (filesToLoad.isEmpty) {
      buildSkippedResults(skippedFiles)
    } else {
      val filePaths = filesToLoad.map(_.getPath.toString)
      // Every CSV column is read as a string first; typing happens in castAndValidate.
      val stringSchema = StructType(
        targetColumns.indices.map(i => StructField(s"_c$i", StringType, nullable = true)))
      val readerOptions = fileFormat.toSparkReaderOptions

      val csvDf = readAndProcessCsv(filePaths, stringSchema, readerOptions)
      val finalDf = buildFinalDataFrame(csvDf, targetColumns, writableColumns, fields)
      val castedDf = castAndValidate(finalDf, writableColumns, fields)

      val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
      castedDf.write.format("paimon").mode("append").insertInto(tableName)

      recordHistoryAndBuildResults(
        paimonTable,
        filesToLoad,
        skippedFiles,
        filePaths,
        stringSchema,
        readerOptions)
    }
  }

  /**
   * Loads the target table and checks that it is a Paimon [[FileStoreTable]].
   *
   * A pattern match replaces `assert` + `asInstanceOf` so that an unsupported target produces a
   * descriptive error even when assertions are elided.
   */
  private def loadFileStoreTable(): FileStoreTable = {
    catalog.loadTable(ident) match {
      case sparkTable: SparkTable =>
        sparkTable.getTable match {
          case fileStoreTable: FileStoreTable => fileStoreTable
          case other =>
            throw new IllegalArgumentException(
              s"COPY INTO requires a Paimon file store table, but $ident is backed by " +
                other.getClass.getName)
        }
      case other =>
        throw new IllegalArgumentException(
          s"COPY INTO requires a Paimon table, but $ident is a ${other.getClass.getName}")
    }
  }

  /**
   * Resolves the optional column list against the table columns using the session resolver.
   *
   * @return the table's own spelling of each listed column, in list order, or all table columns
   *         when no list was given
   * @throws IllegalArgumentException on a duplicate or unknown column
   */
  private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String] = {
    columns match {
      case Some(cols) =>
        val resolver = spark.sessionState.conf.resolver
        // Report the first column that is repeated later in the list.
        cols.zipWithIndex
          .collectFirst { case (c, i) if cols.drop(i + 1).exists(resolver(c, _)) => c }
          .foreach {
            dup =>
              throw new IllegalArgumentException(s"Duplicate columns in column list: $dup")
          }
        cols.map {
          c =>
            writableColumns.find(w => resolver(w, c)).getOrElse {
              throw new IllegalArgumentException(
                s"Column '$c' does not exist in target table. Available columns: ${writableColumns.mkString(", ")}")
            }
        }
      case None => writableColumns
    }
  }

  /**
   * When an explicit column list is used, rejects the statement if a column left out of the list
   * is NOT NULL and has no default value.
   */
  private def validateNonNullableDefaults(
      writableColumns: Seq[String],
      targetColumns: Seq[String],
      fields: Seq[DataField]): Unit = {
    if (columns.isDefined) {
      val unmapped = writableColumns.filterNot(targetColumns.contains)
      fields.filter(f => unmapped.contains(f.name())).foreach {
        field =>
          if (!field.`type`().isNullable && field.defaultValue() == null) {
            val colName = field.name()
            throw new IllegalArgumentException(
              s"Non-nullable column '$colName' is not in the column list and has no default value")
          }
      }
    }
  }

  /**
   * Lists the files directly under `sourcePath` and splits them into files to load and files to
   * skip.
   *
   * @return (files to load, files skipped because the load history already contains them)
   */
  private def listAndFilterFiles(
      paimonTable: FileStoreTable): (Array[FileStatus], Array[FileStatus]) = {
    val fsPath = new Path(sourcePath)
    val fs = fsPath.getFileSystem(spark.sessionState.newHadoopConf())
    val allFiles = fs.listStatus(fsPath).filter(_.isFile)

    val patternFiltered = pattern match {
      case Some(p) =>
        // NOTE(review): the regex is searched within the file name, it is not anchored to the
        // whole name; confirm this matches the documented PATTERN semantics.
        val regex = p.r
        allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
      case None => allFiles
    }

    if (patternFiltered.isEmpty) {
      (Array.empty[FileStatus], Array.empty[FileStatus])
    } else if (force) {
      (patternFiltered, Array.empty[FileStatus])
    } else {
      val historyManager = newHistoryManager(paimonTable)
      val (skip, load) = patternFiltered.partition {
        f => historyManager.isLoaded(f.getPath.toString, f.getLen, f.getModificationTime)
      }
      (load, skip)
    }
  }

  /** Creates the load-history manager rooted at the table location. */
  private def newHistoryManager(paimonTable: FileStoreTable): CopyLoadHistoryManager = {
    val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString)
    new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath)
  }

  /**
   * Reads the files with the all-string schema and applies NULL_IF, then EMPTY_FIELD_AS_NULL, to
   * every column in a single projection.
   */
  private def readAndProcessCsv(
      filePaths: Array[String],
      stringSchema: StructType,
      readerOptions: Map[String, String]): DataFrame = {
    val rawDf = spark.read
      .options(readerOptions)
      .schema(stringSchema)
      .csv(filePaths: _*)

    val nullIfVals = fileFormat.nullIfValues
    val emptyAsNull = fileFormat.emptyFieldAsNull

    if (nullIfVals.isEmpty && !emptyAsNull) {
      rawDf
    } else {
      val nullString = lit(null).cast(StringType)
      val cleaned = rawDf.columns.map {
        colName =>
          val afterNullIf =
            if (nullIfVals.nonEmpty) {
              when(col(colName).isin(nullIfVals: _*), nullString).otherwise(col(colName))
            } else {
              col(colName)
            }
          val afterEmpty =
            if (emptyAsNull) {
              when(afterNullIf === lit(""), nullString).otherwise(afterNullIf)
            } else {
              afterNullIf
            }
          afterEmpty.as(colName)
      }
      rawDf.select(cleaned: _*)
    }
  }

  /**
   * Renames the positional CSV columns to the target columns and, when an explicit column list
   * was given, projects them into table column order, filling the unmapped columns.
   */
  private def buildFinalDataFrame(
      csvDf: DataFrame,
      targetColumns: Seq[String],
      writableColumns: Seq[String],
      fields: Seq[DataField]): DataFrame = {
    val renamedDf = targetColumns.zipWithIndex.foldLeft(csvDf) {
      case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", targetCol)
    }

    if (columns.isDefined) {
      val selectExprs: Seq[Column] = writableColumns.map {
        colName =>
          if (targetColumns.contains(colName)) {
            col(colName)
          } else {
            defaultValueColumn(fields.find(_.name() == colName).get)
          }
      }
      renamedDf.select(selectExprs: _*)
    } else {
      renamedDf
    }
  }

  /**
   * Builds the column used for a table column that is absent from the column list: its parsed
   * default expression cast to the column type, or NULL when it has no default.
   *
   * @throws IllegalArgumentException when the default of a NOT NULL column cannot be turned into
   *                                  an expression (NULL would not be a valid substitute)
   */
  private def defaultValueColumn(field: DataField): Column = {
    val colName = field.name()
    val defaultVal = field.defaultValue()
    if (defaultVal == null) {
      lit(null).as(colName)
    } else {
      val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
      try {
        val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal)
        SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
      } catch {
        case scala.util.control.NonFatal(e) if !field.`type`().isNullable =>
          throw new IllegalArgumentException(
            s"Cannot evaluate default value '$defaultVal' of non-nullable column '$colName'",
            e)
        case scala.util.control.NonFatal(_) =>
          // Best effort for nullable columns: an unusable default degrades to NULL.
          lit(null).cast(sparkType).as(colName)
      }
    }
  }

  /**
   * Casts every column to its table type and aborts when a non-null source value of a non-string
   * column turns into NULL after the cast.
   *
   * @return `finalDf` with all columns cast to the table types
   * @throws IllegalArgumentException naming one offending column when a cast loses a value
   */
  private def castAndValidate(
      finalDf: DataFrame,
      writableColumns: Seq[String],
      fields: Seq[DataField]): DataFrame = {
    val typedColumns = writableColumns.zip(fields).map {
      case (colName, field) =>
        colName -> org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
    }
    val castedDf = finalDf.select(typedColumns.map {
      case (colName, sparkType) => col(colName).cast(sparkType).as(colName)
    }: _*)

    val nonStringCols = typedColumns.filter { case (_, sparkType) => sparkType != StringType }
    if (nonStringCols.nonEmpty) {
      val castSuffix = "__cv"
      // (source column, helper column holding the cast result, target type)
      val checks = nonStringCols.zipWithIndex.map {
        case ((colName, sparkType), idx) => (colName, castSuffix + idx, sparkType)
      }
      val validationDf = checks.foldLeft(finalDf) {
        case (df, (colName, checkCol, sparkType)) =>
          df.withColumn(checkCol, col(colName).cast(sparkType))
      }
      val badCastFilter = checks
        .map { case (colName, checkCol, _) => col(colName).isNotNull && col(checkCol).isNull }
        .reduce(_ || _)

      validationDf.filter(badCastFilter).limit(1).collect().headOption.foreach {
        row =>
          val schema = validationDf.schema
          val example = checks.collectFirst {
            case (colName, checkCol, _)
                if !row.isNullAt(schema.fieldIndex(colName)) &&
                  row.isNullAt(schema.fieldIndex(checkCol)) =>
              colName
          }
          throw new IllegalArgumentException(
            s"ON_ERROR = ABORT_STATEMENT: Cast failure in column '${example.getOrElse("unknown")}'. Source data contains values that cannot be converted to the target type.")
      }
    }

    castedDf
  }

  /**
   * Counts the rows of each loaded file, writes one load-history entry per file and returns the
   * LOADED rows followed by the SKIPPED rows.
   */
  private def recordHistoryAndBuildResults(
      paimonTable: FileStoreTable,
      filesToLoad: Array[FileStatus],
      skippedFiles: Array[FileStatus],
      filePaths: Array[String],
      stringSchema: StructType,
      readerOptions: Map[String, String]): Seq[InternalRow] = {
    val historyManager = newHistoryManager(paimonTable)
    // NOTE(review): latestSnapshotId() may be absent when nothing was committed; confirm that
    // CopyLoadRecord accepts that case.
    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
    val loadedAt = System.currentTimeMillis()

    val rowCounts = spark.read
      .options(readerOptions)
      .schema(stringSchema)
      .csv(filePaths: _*)
      .groupBy(input_file_name().as("file"))
      .count()
      .collect()

    val fileCountMap = rowCounts.map(row => fileBaseName(row.getString(0)) -> row.getLong(1)).toMap

    val loadedResults = filesToLoad.map {
      fileStatus =>
        val baseName = fileStatus.getPath.getName
        val rowCount = fileCountMap.getOrElse(baseName, 0L)

        historyManager.recordLoaded(
          CopyLoadRecord(
            filePath = fileStatus.getPath.toString,
            fileSize = fileStatus.getLen,
            lastModified = fileStatus.getModificationTime,
            loadedAt = loadedAt,
            snapshotId = snapshotId,
            rowsLoaded = rowCount
          ))

        InternalRow(
          UTF8String.fromString(baseName),
          UTF8String.fromString("LOADED"),
          rowCount,
          rowCount)
    }.toSeq

    loadedResults ++ buildSkippedResults(skippedFiles)
  }

  /**
   * Extracts the file name from a path reported by `input_file_name()`.
   *
   * Spark reports the path in URL-encoded form (a space becomes `%20`), while
   * `FileStatus.getPath.getName` is decoded. Parsing the value as a URI makes both sides
   * comparable; a value that is not a valid URI falls back to the plain text after the last `/`.
   */
  private def fileBaseName(reportedPath: String): String = {
    try {
      new Path(new java.net.URI(reportedPath)).getName
    } catch {
      case _: java.net.URISyntaxException | _: IllegalArgumentException =>
        reportedPath.substring(reportedPath.lastIndexOf('/') + 1)
    }
  }

  /** Builds one SKIPPED result row (with zero counts) per file. */
  private def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow] = {
    files.map {
      f =>
        InternalRow(
          UTF8String.fromString(f.getPath.getName),
          UTF8String.fromString("SKIPPED"),
          0L,
          0L)
    }.toSeq
  }
}
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala new file mode 100644 index 000000000000..dd85af058d56 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/** Helpers shared by the COPY INTO physical commands. */
object CopyIntoUtils {

  /**
   * Renders a fully qualified, backtick-quoted table name.
   *
   * The parts are the catalog name, every namespace level and the table name, in that order.
   * Empty parts are dropped, a backtick inside a part is doubled, and the quoted parts are
   * joined with `.`.
   */
  def quoteIdentifier(catalogName: String, ident: Identifier): String = {
    val segments = (catalogName +: ident.namespace().toSeq) :+ ident.name()
    segments
      .collect { case segment if segment.nonEmpty => "`" + segment.replace("`", "``") + "`" }
      .mkString(".")
  }
}
/**
 * Returns true when the normalized (lower-cased, trimmed) statement starts with the
 * `COPY INTO` keywords.
 *
 * Normalization replaces a block comment with a space without collapsing whitespace afterwards,
 * so the two keywords may be separated by more than one whitespace character. The trailing word
 * boundary keeps identifiers that merely start with `into` (such as `intox`) from matching.
 */
private def isCopyInto(normalized: String): Boolean = {
  // (?s) lets `.` span line breaks in case the remainder of the statement contains any.
  normalized.matches("(?s)copy\\s+into\\b.*")
}
/** Create a COPY INTO TABLE (import) logical command. */
override def visitCopyIntoTable(ctx: CopyIntoTableContext): logical.CopyIntoTableCommand =
  withOrigin(ctx) {
    val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
    // Each optional clause is absent (null) when it was not written in the statement.
    val columns = Option(ctx.columnList()).map(_.identifier().asScala.map(_.getText).toSeq)
    val sourcePath = unquoteString(ctx.sourcePath.getText)
    val fileFormat = buildFileFormat(ctx.fileFormatClause())
    val pattern = Option(ctx.patternClause()).map(p => unquoteString(p.STRING().getText))
    // FORCE defaults to false when the clause is missing.
    val force = Option(ctx.forceClause()).exists(_.booleanValue().TRUE() != null)
    logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat, pattern, force)
  }

/** Create a COPY INTO LOCATION (export) logical command. */
override def visitCopyIntoLocation(
    ctx: CopyIntoLocationContext): logical.CopyIntoLocationCommand = withOrigin(ctx) {
  val targetPath = unquoteString(ctx.targetPath.getText)
  val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
  val fileFormat = buildFileFormat(ctx.fileFormatClause())
  // OVERWRITE defaults to false when the clause is missing.
  val overwrite = Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
  logical.CopyIntoLocationCommand(targetPath, table, fileFormat, overwrite)
}

/**
 * Builds a [[CopyFileFormat]] from a `FILE_FORMAT = (...)` clause.
 *
 * Option keys are case-insensitive: they are upper-cased with `Locale.ROOT` so that the result
 * does not depend on the JVM default locale. `TYPE` is mandatory and is removed from the option
 * map handed to the format.
 *
 * @throws IllegalArgumentException on a duplicate option key or when `TYPE` is missing
 */
private def buildFileFormat(ctx: FileFormatClauseContext): CopyFileFormat = {
  // Insertion-ordered so the options keep the order in which they were written.
  val options = mutable.LinkedHashMap[String, String]()

  ctx.fileFormatOption().asScala.foreach {
    opt =>
      val key = opt.key.getText.toUpperCase(java.util.Locale.ROOT)
      if (options.contains(key)) {
        throw new IllegalArgumentException(s"Duplicate FILE_FORMAT option: $key")
      }
      options(key) = extractFormatValue(opt.fileFormatValue())
  }

  val formatType = options.remove("TYPE") match {
    case Some(typeValue) => CopyFileFormat.parseFormatType(typeValue)
    case None => throw new IllegalArgumentException("FILE_FORMAT must include TYPE")
  }

  CopyFileFormat(formatType, options.toMap)
}

/**
 * Converts a parsed FILE_FORMAT option value to its string form.
 *
 * Strings are unquoted, booleans become `TRUE`/`FALSE`, and the elements of a list are unquoted
 * and joined with `CopyFileFormat.LIST_SEPARATOR`.
 *
 * @throws IllegalArgumentException for a value kind that is not handled here
 */
private def extractFormatValue(ctx: FileFormatValueContext): String = {
  ctx match {
    case c: StringFormatValueContext =>
      unquoteString(c.STRING().getText)
    case c: IdentFormatValueContext =>
      c.identifier().getText
    case c: BoolFormatValueContext =>
      if (c.booleanValue().TRUE() != null) "TRUE" else "FALSE"
    case c: IntFormatValueContext =>
      c.INTEGER_VALUE().getText
    case c: ListFormatValueContext =>
      c.STRING()
        .asScala
        .map(s => unquoteString(s.getText))
        .mkString(CopyFileFormat.LIST_SEPARATOR)
    case other =>
      // Fail with a readable message instead of a MatchError.
      val text = Option(other).map(_.getText).getOrElse("<missing>")
      throw new IllegalArgumentException(s"Unsupported FILE_FORMAT value: $text")
  }
}

/**
 * Strips the surrounding quotes of a SQL string literal and decodes its escapes.
 *
 * Input that is null, shorter than two characters, or not wrapped in a matching pair of `'` or
 * `"` is returned unchanged. Inside the quotes:
 *  - `\n`, `\t`, `\r` and `\\` become the corresponding character;
 *  - a backslash followed by the quote character yields the quote character;
 *  - any other backslash sequence is kept verbatim (so a regex such as `\.` survives);
 *  - a doubled quote character yields a single one.
 */
private def unquoteString(s: String): String = {
  val quoted = s != null && s.length >= 2 && {
    val first = s.charAt(0)
    (first == '\'' || first == '"') && s.charAt(s.length - 1) == first
  }
  if (!quoted) {
    s
  } else {
    val quote = s.charAt(0)
    val inner = s.substring(1, s.length - 1)
    val sb = new StringBuilder
    var i = 0
    while (i < inner.length) {
      val c = inner.charAt(i)
      val hasNext = i + 1 < inner.length
      if (c == '\\' && hasNext) {
        inner.charAt(i + 1) match {
          case 'n' => sb.append('\n')
          case 't' => sb.append('\t')
          case 'r' => sb.append('\r')
          case '\\' => sb.append('\\')
          case q if q == quote => sb.append(q)
          case other => sb.append('\\'); sb.append(other)
        }
        i += 2
      } else if (c == quote && hasNext && inner.charAt(i + 1) == quote) {
        sb.append(quote)
        i += 2
      } else {
        sb.append(c)
        i += 1
      }
    }
    sb.toString()
  }
}
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +import java.io.{File, PrintWriter} +import java.nio.file.Files + +class CopyIntoTestBase extends PaimonSparkTestBase { + + private def createCsvFile(dir: File, name: String, content: String): File = { + val file = new File(dir, name) + val writer = new PrintWriter(file) + try writer.write(content) + finally writer.close() + file + } + + private def withCsvDir(testBody: File => Unit): Unit = { + val dir = Files.createTempDirectory("copy_into_test").toFile + try testBody(dir) + finally deleteRecursively(dir) + } + + private def deleteRecursively(file: File): Unit = { + if (file.isDirectory) { + file.listFiles().foreach(deleteRecursively) + } + file.delete() + } + + test("COPY INTO: basic CSV import") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_basic") + spark.sql(s"CREATE TABLE $dbName0.copy_basic (id INT, name STRING, age INT)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "1,Alice,30\n2,Bob,25\n") + + val result = spark.sql(s"""COPY INTO $dbName0.copy_basic + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + assert(result.collect().length > 0) + val row = result.collect()(0) + assert(row.getString(1) == "LOADED") + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_basic ORDER BY id"), + Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_basic") + } + + test("COPY INTO: CSV with custom delimiter") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_delim") + spark.sql(s"CREATE TABLE $dbName0.copy_delim (id INT, name STRING)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "1|Alice\n2|Bob\n") + + spark.sql(s"""COPY INTO $dbName0.copy_delim + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|') + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_delim ORDER 
BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_delim") + } + + test("COPY INTO: CSV with SKIP_HEADER") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_header") + spark.sql(s"CREATE TABLE $dbName0.copy_header (id INT, name STRING)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "id,name\n1,Alice\n2,Bob\n") + + spark.sql(s"""COPY INTO $dbName0.copy_header + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_header ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_header") + } + + test("COPY INTO: CSV with quote and escape") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_quote") + spark.sql(s"CREATE TABLE $dbName0.copy_quote (id INT, name STRING)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "1,\"Alice \"\"the great\"\"\"\n2,\"Bob\"\n") + + spark.sql(s"""COPY INTO $dbName0.copy_quote + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV, QUOTE = '"', ESCAPE = '"') + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_quote ORDER BY id"), + Seq(Row(1, "Alice \"the great\""), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_quote") + } + + test("COPY INTO: NULL_IF with multiple values") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_null") + spark.sql(s"CREATE TABLE $dbName0.copy_null (id INT, name STRING)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "1,NULL\n2,\\N\n3,Alice\n") + + spark.sql(s"""COPY INTO $dbName0.copy_null + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV, NULL_IF = ('NULL', '\\N')) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_null ORDER BY id"), + Seq(Row(1, null), Row(2, null), Row(3, "Alice"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_null") + } + + test("COPY INTO: 
EMPTY_FIELD_AS_NULL") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_empty") + spark.sql(s"CREATE TABLE $dbName0.copy_empty (id INT, name STRING)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "1,\n2,Alice\n") + + spark.sql(s"""COPY INTO $dbName0.copy_empty + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV, EMPTY_FIELD_AS_NULL = TRUE) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_empty ORDER BY id"), + Seq(Row(1, null), Row(2, "Alice"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_empty") + } + + test("COPY INTO: PATTERN filters files") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_pattern") + spark.sql(s"CREATE TABLE $dbName0.copy_pattern (id INT, name STRING)") + + withCsvDir { + dir => + createCsvFile(dir, "data1.csv", "1,Alice\n") + createCsvFile(dir, "data2.csv", "2,Bob\n") + createCsvFile(dir, "ignore.txt", "3,Charlie\n") + + spark.sql(s"""COPY INTO $dbName0.copy_pattern + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV) + |PATTERN = '.*\\.csv' + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_pattern ORDER BY id"), + Seq(Row(1, "Alice"), Row(2, "Bob"))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_pattern") + } + + test("COPY INTO: explicit column list") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_cols") + spark.sql(s"CREATE TABLE $dbName0.copy_cols (id INT, name STRING, age INT)") + + withCsvDir { + dir => + createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n") + + spark.sql(s"""COPY INTO $dbName0.copy_cols (id, name) + |FROM '${dir.getAbsolutePath}' + |FILE_FORMAT = (TYPE = CSV) + |""".stripMargin) + + checkAnswer( + spark.sql(s"SELECT * FROM $dbName0.copy_cols ORDER BY id"), + Seq(Row(1, "Alice", null), Row(2, "Bob", null))) + } + + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_cols") + } + + test("COPY INTO: unsupported TYPE errors at validation time") { + spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup") + 
spark.sql(s"CREATE TABLE $dbName0.copy_unsup (id INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1\n")

        val e = intercept[IllegalArgumentException] {
          spark.sql(s"""COPY INTO $dbName0.copy_unsup
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = PARQUET)
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("Unsupported file format type"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
  }

  // A FILE_FORMAT clause that carries no TYPE entry must be rejected with an
  // IllegalArgumentException.
  test("COPY INTO: missing TYPE errors") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_notype")
    spark.sql(s"CREATE TABLE $dbName0.copy_notype (id INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1\n")

        val e = intercept[IllegalArgumentException] {
          spark.sql(s"""COPY INTO $dbName0.copy_notype
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (FIELD_DELIMITER = ',')
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("FILE_FORMAT must include TYPE"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_notype")
  }

  // Repeating the same key inside FILE_FORMAT must be rejected with an
  // IllegalArgumentException.
  test("COPY INTO: duplicate option key errors") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup")
    spark.sql(s"CREATE TABLE $dbName0.copy_dup (id INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1\n")

        val e = intercept[IllegalArgumentException] {
          spark.sql(s"""COPY INTO $dbName0.copy_dup
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV, TYPE = CSV)
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("Duplicate FILE_FORMAT option"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup")
  }

  // Exports a two-row table to a fresh directory and checks both the result row returned by the
  // command and the number of rows Spark reads back from the written CSV.
  test("COPY INTO: export table to CSV") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export")
    spark.sql(s"CREATE TABLE $dbName0.copy_export (id INT, name STRING)")
    spark.sql(s"INSERT INTO $dbName0.copy_export VALUES (1, 'Alice'), (2, 'Bob')")

    withCsvDir {
      dir =>
        val outputPath = new File(dir, "output").getAbsolutePath

        val result = spark.sql(s"""COPY INTO '$outputPath'
                                  |FROM $dbName0.copy_export
                                  |FILE_FORMAT = (TYPE = CSV, HEADER = TRUE)
                                  |""".stripMargin)

        // Column 0 echoes the target path; column 2 is expected to equal the exported row count.
        val row = result.collect()(0)
        assert(row.getString(0) == outputPath)
        assert(row.getLong(2) == 2L)

        // Verify exported data
        val exported = spark.read.option("header", "true").csv(outputPath)
        assert(exported.count() == 2)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export")
  }

  // Exporting into a directory that already contains a file, without an OVERWRITE clause,
  // must fail. Only the fact that some exception is thrown is checked.
  test("COPY INTO: export OVERWRITE=FALSE fails on existing path") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_fail")
    spark.sql(s"CREATE TABLE $dbName0.copy_export_fail (id INT)")
    spark.sql(s"INSERT INTO $dbName0.copy_export_fail VALUES (1)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "existing.csv", "data\n")

        intercept[Exception] {
          spark.sql(s"""COPY INTO '${dir.getAbsolutePath}'
                       |FROM $dbName0.copy_export_fail
                       |FILE_FORMAT = (TYPE = CSV)
                       |""".stripMargin)
        }
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_fail")
  }

  // With OVERWRITE = TRUE the export into a non-empty directory succeeds and column 2 of the
  // result row equals the single exported row.
  test("COPY INTO: export OVERWRITE=TRUE succeeds") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_ow")
    spark.sql(s"CREATE TABLE $dbName0.copy_export_ow (id INT, name STRING)")
    spark.sql(s"INSERT INTO $dbName0.copy_export_ow VALUES (1, 'Alice')")

    withCsvDir {
      dir =>
        createCsvFile(dir, "existing.csv", "old data\n")

        val result = spark.sql(s"""COPY INTO '${dir.getAbsolutePath}'
                                  |FROM $dbName0.copy_export_ow
                                  |FILE_FORMAT = (TYPE = CSV)
                                  |OVERWRITE = TRUE
                                  |""".stripMargin)

        assert(result.collect()(0).getLong(2) == 1L)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_ow")
  }

  // Loading the same directory twice with FORCE = FALSE reports the file as SKIPPED on the
  // second run and leaves the table contents untouched.
  test("COPY INTO: FORCE=FALSE skips already-loaded files") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force")
    spark.sql(s"CREATE TABLE $dbName0.copy_force (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")

        // First load
        spark.sql(s"""COPY INTO $dbName0.copy_force
                     |FROM '${dir.getAbsolutePath}'
                     |FILE_FORMAT = (TYPE = CSV)
                     |""".stripMargin)

        // Second load with FORCE=FALSE (default) should skip
        val result = spark.sql(s"""COPY INTO $dbName0.copy_force
                                  |FROM '${dir.getAbsolutePath}'
                                  |FILE_FORMAT = (TYPE = CSV)
                                  |FORCE = FALSE
                                  |""".stripMargin)

        val rows = result.collect()
        assert(rows.length == 1)
        assert(rows(0).getString(1) == "SKIPPED")

        // Table should still have only original 2 rows
        checkAnswer(
          spark.sql(s"SELECT * FROM $dbName0.copy_force ORDER BY id"),
          Seq(Row(1, "Alice"), Row(2, "Bob")))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force")
  }

  // Loading the same directory twice with FORCE = TRUE reports the file as LOADED again and
  // the table ends up with the row duplicated.
  test("COPY INTO: FORCE=TRUE reloads already-loaded files") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force_true")
    spark.sql(s"CREATE TABLE $dbName0.copy_force_true (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n")

        // First load
        spark.sql(s"""COPY INTO $dbName0.copy_force_true
                     |FROM '${dir.getAbsolutePath}'
                     |FILE_FORMAT = (TYPE = CSV)
                     |""".stripMargin)

        // Second load with FORCE=TRUE should re-import
        val result = spark.sql(s"""COPY INTO $dbName0.copy_force_true
                                  |FROM '${dir.getAbsolutePath}'
                                  |FILE_FORMAT = (TYPE = CSV)
                                  |FORCE = TRUE
                                  |""".stripMargin)

        val rows = result.collect()
        assert(rows.length == 1)
        assert(rows(0).getString(1) == "LOADED")

        // Table should have duplicated data (2 rows total)
        assert(spark.sql(s"SELECT * FROM $dbName0.copy_force_true").count() == 2)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force_true")
  }

  // A non-numeric value destined for an INT column must make the statement fail.
  test("COPY INTO: bad numeric cast fails with ABORT_STATEMENT") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_badcast")
    spark.sql(s"CREATE TABLE $dbName0.copy_badcast (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "abc,Alice\n")

        val e = intercept[Exception] {
          spark.sql(s"""COPY INTO $dbName0.copy_badcast
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV)
                       |""".stripMargin)
        }
        // Throwable.getMessage may return null; normalise it so a message-less exception is
        // judged by the assertion below instead of surfacing as a NullPointerException.
        val msg = Option(e.getMessage).getOrElse("")
        // NOTE(review): the `e.getCause != null` alternative accepts any wrapped exception, so
        // this check is weak; tighten it once the exact error text is confirmed.
        assert(
          msg.contains("Cast failure") ||
            msg.contains("ABORT_STATEMENT") ||
            msg.contains("CAST_INVALID_INPUT") ||
            msg.contains("cannot be cast to") ||
            e.getCause != null)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_badcast")
  }

  // The target table may be referenced without a database qualifier.
  test("COPY INTO: no namespace table works") {
    spark.sql("DROP TABLE IF EXISTS copy_no_ns")
    spark.sql("CREATE TABLE copy_no_ns (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n")

        spark.sql(s"""COPY INTO copy_no_ns
                     |FROM '${dir.getAbsolutePath}'
                     |FILE_FORMAT = (TYPE = CSV)
                     |""".stripMargin)

        checkAnswer(spark.sql("SELECT * FROM copy_no_ns"), Seq(Row(1, "Alice")))
    }

    spark.sql("DROP TABLE IF EXISTS copy_no_ns")
  }

  // Option keys and the format name are accepted in lower case.
  test("COPY INTO: lowercase options are accepted") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_lcase")
    spark.sql(s"CREATE TABLE $dbName0.copy_lcase (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1|Alice\n")

        spark.sql(s"""COPY INTO $dbName0.copy_lcase
                     |FROM '${dir.getAbsolutePath}'
                     |FILE_FORMAT = (type = csv, field_delimiter = '|')
                     |""".stripMargin)

        checkAnswer(spark.sql(s"SELECT * FROM $dbName0.copy_lcase"), Seq(Row(1, "Alice")))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_lcase")
  }

  // An option key that the command does not know must be rejected.
  test("COPY INTO: unknown option key errors") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unknown_opt")
    spark.sql(s"CREATE TABLE $dbName0.copy_unknown_opt (id INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1\n")

        val e = intercept[Exception] {
          spark.sql(s"""COPY INTO $dbName0.copy_unknown_opt
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV, BOGUS_OPTION = TRUE)
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("Unsupported FILE_FORMAT options"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unknown_opt")
  }

  // SKIP_HEADER = 2 must be rejected; the expected message states that only 0 or 1 is supported.
  test("COPY INTO: SKIP_HEADER > 1 errors") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skip2")
    spark.sql(s"CREATE TABLE $dbName0.copy_skip2 (id INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "h1\nh2\n1\n")

        val e = intercept[Exception] {
          spark.sql(s"""COPY INTO $dbName0.copy_skip2
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 2)
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("SKIP_HEADER supports only 0 or 1"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skip2")
  }

  // Passing SKIP_HEADER on an export statement must be rejected as an unsupported export option.
  test("COPY INTO: export rejects import-only options") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_bad")
    spark.sql(s"CREATE TABLE $dbName0.copy_export_bad (id INT)")
    spark.sql(s"INSERT INTO $dbName0.copy_export_bad VALUES (1)")

    withCsvDir {
      dir =>
        val outputPath = new File(dir, "out").getAbsolutePath
        val e = intercept[Exception] {
          spark.sql(s"""COPY INTO '$outputPath'
                       |FROM $dbName0.copy_export_bad
                       |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("Unsupported FILE_FORMAT options for export"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_bad")
  }

  // Columns omitted from the explicit column list receive their declared DEFAULT value.
  // Skipped unless `gteqSpark3_4` holds.
  test("COPY INTO: explicit column list with default value column") {
    assume(gteqSpark3_4)
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_default")
    spark.sql(
      s"CREATE TABLE $dbName0.copy_default (id INT, name STRING, status STRING DEFAULT 'active')")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")

        spark.sql(s"""COPY INTO $dbName0.copy_default (id, name)
                     |FROM '${dir.getAbsolutePath}'
                     |FILE_FORMAT = (TYPE = CSV)
                     |""".stripMargin)

        checkAnswer(
          spark.sql(s"SELECT * FROM $dbName0.copy_default ORDER BY id"),
          Seq(Row(1, "Alice", "active"), Row(2, "Bob", "active")))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_default")
  }

  // A CSV row with more fields than the table has columns must fail and load nothing.
  test("COPY INTO: too many CSV columns fails") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toomany")
    spark.sql(s"CREATE TABLE $dbName0.copy_toomany (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice,extra,columns\n")

        intercept[Exception] {
          spark.sql(s"""COPY INTO $dbName0.copy_toomany
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV)
                       |""".stripMargin)
        }
        assert(spark.sql(s"SELECT * FROM $dbName0.copy_toomany").count() == 0)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toomany")
  }

  // A CSV row with fewer fields than the table has columns must fail and load nothing.
  test("COPY INTO: too few CSV columns fails") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toofew")
    spark.sql(s"CREATE TABLE $dbName0.copy_toofew (id INT, name STRING, age INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n")

        intercept[Exception] {
          spark.sql(s"""COPY INTO $dbName0.copy_toofew
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV)
                       |""".stripMargin)
        }
        assert(spark.sql(s"SELECT * FROM $dbName0.copy_toofew").count() == 0)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toofew")
  }

  // Loads a three-row file and checks the single result row: status LOADED, and both numeric
  // columns (indices 2 and 3) equal to 3.
  // NOTE(review): presumably these are rows-parsed / rows-loaded; confirm against the command's
  // output schema.
  test("COPY INTO: rows_loaded count is accurate") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_count")
    spark.sql(s"CREATE TABLE $dbName0.copy_count (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n3,Charlie\n")

        val result = spark.sql(s"""COPY INTO $dbName0.copy_count
                                  |FROM '${dir.getAbsolutePath}'
                                  |FILE_FORMAT = (TYPE = CSV)
                                  |""".stripMargin)

        val rows = result.collect()
        assert(rows.length == 1)
        assert(rows(0).getString(1) == "LOADED")
        assert(rows(0).getLong(2) == 3L)
        assert(rows(0).getLong(3) == 3L)
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_count")
  }

  // Naming the same column twice in the explicit column list must be rejected with an
  // IllegalArgumentException.
  test("COPY INTO: duplicate column list errors") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
    spark.sql(s"CREATE TABLE $dbName0.copy_dup_col (id INT, name STRING)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n")

        val e = intercept[IllegalArgumentException] {
          spark.sql(s"""COPY INTO $dbName0.copy_dup_col (id, id)
                       |FROM '${dir.getAbsolutePath}'
                       |FILE_FORMAT = (TYPE = CSV)
                       |""".stripMargin)
        }
        assert(e.getMessage.contains("Duplicate columns"))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
  }

  // Upper-case names in the column list resolve to the lower-case table columns; the column
  // left out of the list (age) is loaded as NULL.
  test("COPY INTO: case-insensitive column matching") {
    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
    spark.sql(s"CREATE TABLE $dbName0.copy_case (id INT, name STRING, age INT)")

    withCsvDir {
      dir =>
        createCsvFile(dir, "data.csv", "1,Alice\n")

        spark.sql(s"""COPY INTO $dbName0.copy_case (ID, NAME)
                     |FROM '${dir.getAbsolutePath}'
                     |FILE_FORMAT = (TYPE = CSV)
                     |""".stripMargin)

        checkAnswer(spark.sql(s"SELECT * FROM $dbName0.copy_case"), Seq(Row(1, "Alice", null)))
    }

    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
  }
}