Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions docs/docs/spark/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,158 @@ INSERT INTO t VALUES (1, '1'), (2, '2');
-- Need using `BY NAME` statement (requires Spark 3.5+)
INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
```

## COPY INTO

`COPY INTO` provides a SQL command for bulk loading CSV files into Paimon tables and writing table data to CSV files.

### CSV Import

```sql
COPY INTO table_name [(col1, col2, ...)]
FROM 'source_path'
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
[ON_ERROR = ABORT_STATEMENT]
```

**Basic import:**

```sql
COPY INTO my_db.my_table
FROM '/data/csv_files/'
FILE_FORMAT = (TYPE = CSV);
```

**Import with explicit column mapping:**

```sql
-- Only load into specified columns; omitted columns use their DEFAULT value or NULL
COPY INTO my_db.users (id, name)
FROM '/data/new_users/'
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);
```

**Import with NULL_IF and PATTERN:**

```sql
COPY INTO my_db.events
FROM '/data/logs/'
FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|', NULL_IF = ('NULL', '\\N', ''))
PATTERN = '.*\.csv'
FORCE = FALSE;
```

### Write CSV Files

```sql
COPY INTO 'target_path'
FROM table_name
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```

**Write with header and overwrite:**

```sql
COPY INTO '/export/users_backup/'
FROM my_db.users
FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',')
OVERWRITE = TRUE;
```

### FILE_FORMAT Options

`FILE_FORMAT` is required and must include `TYPE = CSV`.

**Import options:**

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. Must be `CSV`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
| QUOTE | Quote character for enclosing fields. | `"` |
| ESCAPE | Escape character within quoted fields. | `\` |
| NULL_IF | List of string values to interpret as NULL, e.g. `('NULL', '\\N')`. | (none) |
| EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. `TRUE` or `FALSE`. | `FALSE` |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |

**Write options:**

| Option | Description | Default |
|--------|-------------|---------|
| TYPE | File format type. Must be `CSV`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
| QUOTE | Quote character for enclosing fields. | `"` |
| ESCAPE | Escape character within quoted fields. | `\` |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |

### Import Options

| Option | Description | Default |
|--------|-------------|---------|
| PATTERN | Regex to filter source files by base file name. Only matching files are loaded. | (all files) |
| FORCE | `FALSE`: skip files already loaded (idempotent). `TRUE`: reload all files. | `FALSE` |
| ON_ERROR | Error handling strategy. Only `ABORT_STATEMENT` is supported. | `ABORT_STATEMENT` |

### File Write Options

| Option | Description | Default |
|--------|-------------|---------|
| OVERWRITE | `FALSE`: fail if target path exists. `TRUE`: overwrite existing files. | `FALSE` |

### Column Mapping

When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM ...`):

- CSV columns are mapped **positionally** to the specified column list.
- The number of CSV columns must match the column list length.
- Columns not in the list are filled with their **DEFAULT value** (if defined in the table schema) or **NULL**.
- Non-nullable columns without a default value that are not in the list will cause an error.

When no column list is provided:

- CSV columns are mapped positionally to all writable columns in the target table.
- The number of CSV columns must match the number of writable columns.

### Repeated Imports

By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfully loaded. A file is identified by its path, size, and last-modified timestamp.

- Re-running the same COPY INTO command will **skip** already-loaded files and return status `SKIPPED`.
- If a source file is modified (size or timestamp changes), it becomes eligible for re-loading.
- `FORCE = TRUE` bypasses load history and always re-imports all matching files.

### Result Output

**Import** returns one row per source file:

| Column | Type | Description |
|--------|------|-------------|
| file_name | STRING | Source file name |
| status | STRING | `LOADED` or `SKIPPED` |
| rows_loaded | BIGINT | Number of rows written |
| rows_parsed | BIGINT | Number of rows parsed from the file |

**File write** returns a single row:

| Column | Type | Description |
|--------|------|-------------|
| output_path | STRING | Target output path |
| file_count | INT | Number of files written |
| rows_written | BIGINT | Total rows written |

### Limitations

- Only **CSV** format is supported.
- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported.
- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the entire command.
- `SINGLE = TRUE` (single-file output) is not supported.
- File format options must be specified inline in `FILE_FORMAT = (...)`.
- File listing is **non-recursive**: only direct files under the source path are processed. Subdirectories are ignored.
- `PATTERN` matches the **base file name** only (not the full path).
- Concurrent COPY INTO commands targeting the same table may produce duplicate data.
- `SKIP_HEADER` only supports values `0` or `1`.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class CopyIntoTest extends CopyIntoTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class CopyIntoTest extends CopyIntoTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class CopyIntoTest extends CopyIntoTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class CopyIntoTest extends CopyIntoTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
.replaceAll("/\\*.*?\\*/", " ")
.replaceAll("`", "")
.trim()
isPaimonProcedure(normalized) || isTagRefDdl(normalized)
isPaimonProcedure(normalized) || isTagRefDdl(normalized) || isCopyInto(normalized)
}

// All builtin paimon procedures are under the 'sys' namespace
Expand All @@ -140,6 +140,10 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
normalized.contains("delete tag")))
}

private def isCopyInto(normalized: String): Boolean = {
normalized.startsWith("copy into")
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(command)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class CopyIntoTest extends CopyIntoTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class CopyIntoTest extends CopyIntoTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ statement
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
| COPY INTO multipartIdentifier ('(' columnList ')')?
FROM sourcePath=STRING
fileFormatClause
patternClause?
forceClause?
onErrorClause? #copyIntoTable
| COPY INTO targetPath=STRING
FROM multipartIdentifier
fileFormatClause
overwriteClause? #copyIntoLocation
;

callArgument
Expand Down Expand Up @@ -104,6 +114,42 @@ timeUnit
| MINUTES
;

columnList
: identifier (',' identifier)*
;

fileFormatClause
: FILE_FORMAT '=' '(' fileFormatOption (',' fileFormatOption)* ')'
;

fileFormatOption
: key=identifier '=' fileFormatValue
;

fileFormatValue
: STRING #stringFormatValue
| identifier #identFormatValue
| booleanValue #boolFormatValue
| INTEGER_VALUE #intFormatValue
| '(' STRING (',' STRING)* ')' #listFormatValue
;

patternClause
: PATTERN '=' STRING
;

forceClause
: FORCE '=' booleanValue
;

onErrorClause
: ON_ERROR '=' ABORT_STATEMENT
;

overwriteClause
: OVERWRITE '=' booleanValue
;

expression
: constant
| stringMap
Expand Down Expand Up @@ -155,6 +201,8 @@ nonReserved
| REPLACE | RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
| COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | ABORT_STATEMENT | OVERWRITE
| CSV
;

ALTER: 'ALTER';
Expand Down Expand Up @@ -185,6 +233,17 @@ FALSE: 'FALSE';

MAP: 'MAP';

COPY: 'COPY';
INTO: 'INTO';
FROM: 'FROM';
FILE_FORMAT: 'FILE_FORMAT';
PATTERN: 'PATTERN';
FORCE: 'FORCE';
ON_ERROR: 'ON_ERROR';
ABORT_STATEMENT: 'ABORT_STATEMENT';
OVERWRITE: 'OVERWRITE';
CSV: 'CSV';

PLUS: '+';
MINUS: '-';

Expand Down
Loading
Loading