diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala index 0398df809f00..c9645bb8c950 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala @@ -28,8 +28,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} -import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{NonReservedContext, QuotedIdentifierContext} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{CreateTableLikeContext, MultipartIdentifierContext, NonReservedContext, QuotedIdentifierContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.VariableSubstitution @@ -65,17 +65,29 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf if (isPaimonCommand(sqlTextAfterSubstitution)) { parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) .asInstanceOf[LogicalPlan] + } else if (isCatalogCreateTableLike(sqlTextAfterSubstitution)) { + applyParserRules( + parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) + .asInstanceOf[LogicalPlan]) } else { - var plan = delegate.parsePlan(sqlText) - val sparkSession = PaimonSparkSession.active - parserRules(sparkSession).foreach( - rule => { - plan = rule.apply(plan) - }) - plan + parsePlanWithDelegate(sqlText) } } + private def parsePlanWithDelegate(sqlText: String): LogicalPlan = { + applyParserRules(delegate.parsePlan(sqlText)) + } + + private def applyParserRules(plan: LogicalPlan): LogicalPlan = { + var rewrittenPlan = plan + val sparkSession = PaimonSparkSession.active + parserRules(sparkSession).foreach( + rule => { + rewrittenPlan = rule.apply(rewrittenPlan) + }) + rewrittenPlan + } + private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = { Seq( RewritePaimonViewCommands(sparkSession), @@ -140,6 +152,66 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf normalized.contains("delete tag"))) } + private def isCatalogCreateTableLike(sqlText: String): Boolean = { + if (org.apache.spark.SPARK_VERSION < "3.4") { + return false + } + + tokenStream(sqlText) match { + case Some(tokens) if maybeCreateTableLike(tokens) => + isParsedCatalogCreateTableLike(sqlText) + case _ => false + } + } + + private def tokenStream(sqlText: String): Option[CommonTokenStream] = { + try { + val lexer = new PaimonSqlExtensionsLexer( + new UpperCaseCharStream(CharStreams.fromString(sqlText))) + lexer.removeErrorListeners() + lexer.addErrorListener(PaimonParseErrorListener) + + val tokens = new CommonTokenStream(lexer) + tokens.fill() + Some(tokens) + } catch { + case _: PaimonParseException => None + } + } + + private def maybeCreateTableLike(tokenStream: CommonTokenStream): Boolean = { + val tokens = tokenStream.getTokens.asScala + .filter(token => token.getChannel == Token.DEFAULT_CHANNEL) + .filterNot(token => token.getType == Token.EOF) + + tokens.length >= 5 && + tokens.head.getType == PaimonSqlExtensionsParser.CREATE && + tokens(1).getType == PaimonSqlExtensionsParser.TABLE && + tokens.exists(_.getType == PaimonSqlExtensionsParser.LIKE) && + tokens.exists(_.getText == ".") + } + + private def isParsedCatalogCreateTableLike(sqlText: String): Boolean = { + try { + parse(sqlText) { + parser => + val singleStatement = parser.singleStatement() + singleStatement.statement() match { + case ctx: CreateTableLikeContext + if isCatalogIdentifier(ctx.target) || isCatalogIdentifier(ctx.source) => + true + case _ => false + } + } + } catch { + case _: PaimonParseException => false + } + } + + private def isCatalogIdentifier(identifier: MultipartIdentifierContext): Boolean = { + identifier.parts.size() == 3 + } + protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { val lexer = new PaimonSqlExtensionsLexer( new UpperCaseCharStream(CharStreams.fromString(command))) diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 207d9732160f..3c67f49906c5 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -74,8 +74,67 @@ statement | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag + | CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier + LIKE source=multipartIdentifier + (tableProvider | + rowFormat | + createFileFormat | + locationSpec | + (TBLPROPERTIES tableProps=propertyList))* #createTableLike ; +tableProvider + : USING multipartIdentifier + ; + +locationSpec + : LOCATION stringLit + ; + +propertyList + : '(' property (',' property)* ')' + ; + +property + : key=propertyKey ('='? value=propertyValue)? + ; + +propertyKey + : identifier ('.' identifier)* + | stringLit + ; + +propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | stringLit + ; + +createFileFormat + : STORED AS fileFormat + | STORED BY storageHandler + ; + +fileFormat + : INPUTFORMAT inFmt=stringLit OUTPUTFORMAT outFmt=stringLit #tableFileFormat + | identifier #genericFileFormat + ; + +storageHandler + : stringLit (WITH SERDEPROPERTIES propertyList)? + ; + +rowFormat + : ROW FORMAT SERDE name=stringLit (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde + | ROW FORMAT DELIMITED + (FIELDS TERMINATED BY fieldsTerminatedBy=stringLit (ESCAPED BY escapedBy=stringLit)?)? + (COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=stringLit)? + (MAP KEYS TERMINATED BY keysTerminatedBy=stringLit)? + (LINES TERMINATED BY linesSeparatedBy=stringLit)? + (NULL DEFINED AS nullDefinedAs=stringLit)? #rowFormatDelimited + ; + callArgument : expression #positionalArgument | identifier '=>' expression #namedArgument @@ -124,6 +183,10 @@ booleanValue : TRUE | FALSE ; +stringLit + : STRING+ + ; + number : MINUS? EXPONENT_VALUE #exponentLiteral | MINUS? DECIMAL_VALUE #decimalLiteral @@ -151,34 +214,60 @@ quotedIdentifier ; nonReserved - : ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE - | REPLACE | RETAIN | VERSION | TAG + : ALTER | AS | BY | CALL | COLLECTION | CREATE | DAYS | DEFINED | DELETE | DELIMITED + | ESCAPED | EXISTS | FIELDS | FORMAT | HOURS | IF | INPUTFORMAT | ITEMS | KEYS | LIKE + | LINES | LOCATION | NOT | NULL | OF | OR | OUTPUTFORMAT | ROW | SERDE | SERDEPROPERTIES + | STORED | TABLE | TBLPROPERTIES | TERMINATED | REPLACE | RETAIN | USING | VERSION | TAG + | WITH | TRUE | FALSE | MAP ; ALTER: 'ALTER'; AS: 'AS'; +BY: 'BY'; CALL: 'CALL'; +COLLECTION: 'COLLECTION'; CREATE: 'CREATE'; DAYS: 'DAYS'; +DEFINED: 'DEFINED'; DELETE: 'DELETE'; +DELIMITED: 'DELIMITED'; +ESCAPED: 'ESCAPED'; EXISTS: 'EXISTS'; +FIELDS: 'FIELDS'; +FORMAT: 'FORMAT'; HOURS: 'HOURS'; IF : 'IF'; +INPUTFORMAT: 'INPUTFORMAT'; +ITEMS: 'ITEMS'; +KEYS: 'KEYS'; +LIKE: 'LIKE'; +LINES: 'LINES'; +LOCATION: 'LOCATION'; MINUTES: 'MINUTES'; NOT: 'NOT'; +NULL: 'NULL'; OF: 'OF'; OR: 'OR'; +OUTPUTFORMAT: 'OUTPUTFORMAT'; RENAME: 'RENAME'; REPLACE: 'REPLACE'; RETAIN: 'RETAIN'; +ROW: 'ROW'; +SERDE: 'SERDE'; +SERDEPROPERTIES: 'SERDEPROPERTIES'; SHOW: 'SHOW'; +STORED: 'STORED'; TABLE: 'TABLE'; TAG: 'TAG'; TAGS: 'TAGS'; +TBLPROPERTIES: 'TBLPROPERTIES'; +TERMINATED: 'TERMINATED'; TO: 'TO'; +USING: 'USING'; VERSION: 'VERSION'; +WITH: 'WITH'; TRUE: 'TRUE'; FALSE: 'FALSE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala index 0398df809f00..c9645bb8c950 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala @@ -28,8 +28,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, PaimonSparkSession, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} -import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{NonReservedContext, QuotedIdentifierContext} +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{CreateTableLikeContext, MultipartIdentifierContext, NonReservedContext, QuotedIdentifierContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.VariableSubstitution @@ -65,17 +65,29 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf if (isPaimonCommand(sqlTextAfterSubstitution)) { parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) .asInstanceOf[LogicalPlan] + } else if (isCatalogCreateTableLike(sqlTextAfterSubstitution)) { + applyParserRules( + parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement())) + .asInstanceOf[LogicalPlan]) } else { - var plan = delegate.parsePlan(sqlText) - val sparkSession = PaimonSparkSession.active - parserRules(sparkSession).foreach( - rule => { - plan = rule.apply(plan) - }) - plan + parsePlanWithDelegate(sqlText) } } + private def parsePlanWithDelegate(sqlText: String): LogicalPlan = { + applyParserRules(delegate.parsePlan(sqlText)) + } + + private def applyParserRules(plan: LogicalPlan): LogicalPlan = { + var rewrittenPlan = plan + val sparkSession = PaimonSparkSession.active + parserRules(sparkSession).foreach( + rule => { + rewrittenPlan = rule.apply(rewrittenPlan) + }) + rewrittenPlan + } + private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = { Seq( RewritePaimonViewCommands(sparkSession), @@ -140,6 +152,66 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf normalized.contains("delete tag"))) } + private def isCatalogCreateTableLike(sqlText: String): Boolean = { + if (org.apache.spark.SPARK_VERSION < "3.4") { + return false + } + + tokenStream(sqlText) match { + case Some(tokens) if maybeCreateTableLike(tokens) => + isParsedCatalogCreateTableLike(sqlText) + case _ => false + } + } + + private def tokenStream(sqlText: String): Option[CommonTokenStream] = { + try { + val lexer = new PaimonSqlExtensionsLexer( + new UpperCaseCharStream(CharStreams.fromString(sqlText))) + lexer.removeErrorListeners() + lexer.addErrorListener(PaimonParseErrorListener) + + val tokens = new CommonTokenStream(lexer) + tokens.fill() + Some(tokens) + } catch { + case _: PaimonParseException => None + } + } + + private def maybeCreateTableLike(tokenStream: CommonTokenStream): Boolean = { + val tokens = tokenStream.getTokens.asScala + .filter(token => token.getChannel == Token.DEFAULT_CHANNEL) + .filterNot(token => token.getType == Token.EOF) + + tokens.length >= 5 && + tokens.head.getType == PaimonSqlExtensionsParser.CREATE && + tokens(1).getType == PaimonSqlExtensionsParser.TABLE && + tokens.exists(_.getType == PaimonSqlExtensionsParser.LIKE) && + tokens.exists(_.getText == ".") + } + + private def isParsedCatalogCreateTableLike(sqlText: String): Boolean = { + try { + parse(sqlText) { + parser => + val singleStatement = parser.singleStatement() + singleStatement.statement() match { + case ctx: CreateTableLikeContext + if isCatalogIdentifier(ctx.target) || isCatalogIdentifier(ctx.source) => + true + case _ => false + } + } + } catch { + case _: PaimonParseException => false + } + } + + private def isCatalogIdentifier(identifier: MultipartIdentifierContext): Boolean = { + identifier.parts.size() == 3 + } + protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { val lexer = new PaimonSqlExtensionsLexer( new UpperCaseCharStream(CharStreams.fromString(command))) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index a1289a5f0b50..556b4d5d1a30 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -26,11 +26,13 @@ import org.antlr.v4.runtime._ import org.antlr.v4.runtime.misc.Interval import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode} import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.parser.extensions.PaimonParserUtils.withOrigin import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.execution.command.{CreateTableLikeCommand => SparkCreateTableLikeCommand} import scala.collection.JavaConverters._ @@ -98,6 +100,13 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ShowTagsCommand(typedVisit[Seq[String]](ctx.multipartIdentifier)) } + /** Create a CREATE TABLE LIKE logical command. */ + override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan = withOrigin(ctx) { + sparkCreateTableLikeCommand(ctx).copy( + targetTable = toTableIdentifier(typedVisit[Seq[String]](ctx.target)), + sourceTable = toTableIdentifier(typedVisit[Seq[String]](ctx.source))) + } + /** Create a CREATE OR REPLACE TAG logical command. */ override def visitCreateOrReplaceTag(ctx: CreateOrReplaceTagContext): CreateOrReplaceTagCommand = withOrigin(ctx) { @@ -157,6 +166,46 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) private def toSeq[T](list: java.util.List[T]) = toBuffer(list) + private def toTableIdentifier(identifier: Seq[String]): TableIdentifier = { + identifier match { + case Seq(table) => + TableIdentifier(table) + case Seq(database, table) => + TableIdentifier(table, Some(database)) + case parts => + TableIdentifier( + parts.last, + Some(parts.slice(1, parts.length - 1).mkString(".")), + Some(parts.head)) + } + } + + private def sparkCreateTableLikeCommand( + ctx: CreateTableLikeContext): SparkCreateTableLikeCommand = { + delegate.parsePlan(createSparkCreateTableLikeSql(ctx)) match { + case command: SparkCreateTableLikeCommand => command + case plan => + throw new UnsupportedOperationException( + s"Expected Spark CREATE TABLE LIKE command, but got ${plan.nodeName}.") + } + } + + private def createSparkCreateTableLikeSql(ctx: CreateTableLikeContext): String = { + val ifNotExists = if (ctx.EXISTS() != null) " IF NOT EXISTS" else "" + s"CREATE TABLE$ifNotExists __paimon_create_like_target " + + s"LIKE __paimon_create_like_source${createTableLikeClausesText(ctx)}" + } + + private def createTableLikeClausesText(ctx: CreateTableLikeContext): String = { + val start = ctx.source.getStop.getStopIndex + 1 + val stop = ctx.getStop.getStopIndex + if (start <= stop) { + ctx.getStart.getInputStream.getText(Interval.of(start, stop)) + } else { + "" + } + } + private def reconstructSqlString(ctx: ParserRuleContext): String = { toBuffer(ctx.children) .map { diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala new file mode 100644 index 000000000000..3d6200a3e1ea --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.junit.jupiter.api.Assertions + +import scala.collection.JavaConverters._ + +class CatalogQualifiedCreateTableLikeTest extends PaimonSparkTestBase { + + test("Create table like with catalog-qualified identifiers") { + assume(gteqSpark3_4) + withTable("source_tbl", "target_tbl", "target_from_qualified_source", "qualified_target") { + createSourceTable() + + sql(s"CREATE TABLE paimon.$dbName0.target_tbl LIKE paimon.$dbName0.source_tbl") + assertCreatedLike("target_tbl") + + sql(s"CREATE TABLE target_from_qualified_source LIKE paimon.$dbName0.source_tbl") + assertCreatedLike("target_from_qualified_source") + + sql(s"CREATE TABLE paimon.$dbName0.qualified_target LIKE source_tbl") + assertCreatedLike("qualified_target") + } + } + + test("Create table like if not exists with catalog-qualified identifiers") { + assume(gteqSpark3_4) + withTable("source_tbl", "target_tbl") { + createSourceTable() + sql(""" + |CREATE TABLE target_tbl ( + | id BIGINT, + | pt STRING + |) COMMENT 'target comment' + |PARTITIONED BY (pt) + |TBLPROPERTIES ( + | 'primary-key' = 'id,pt', + | 'bucket' = '3' + |) + |""".stripMargin) + + val targetSchema = spark.table("target_tbl").schema + val targetLocation = loadTable("target_tbl").location().toString + + sql(s""" + |CREATE TABLE IF NOT EXISTS paimon.$dbName0.target_tbl + |LIKE paimon.$dbName0.source_tbl + |""".stripMargin) + + val target = loadTable("target_tbl") + Assertions.assertEquals(targetSchema, spark.table("target_tbl").schema) + Assertions.assertFalse(spark.table("target_tbl").schema.fieldNames.contains("name")) + Assertions.assertEquals("target comment", target.comment().get()) + Assertions.assertEquals("3", target.options().get("bucket")) + Assertions.assertEquals(targetLocation, target.location().toString) + } + } + + test("Create table like clauses with catalog-qualified identifiers") { + assume(gteqSpark3_4) + withTable("source_tbl", "target_tbl") { + createSourceTable() + + sql(s""" + |CREATE TABLE paimon.$dbName0.target_tbl + |LIKE paimon.$dbName0.source_tbl + |USING paimon + |TBLPROPERTIES ( + | 'bucket' = '8', + | 'target-file-size' = '256MB' + |) + |""".stripMargin) + + val source = loadTable("source_tbl") + val target = loadTable("target_tbl") + Assertions.assertEquals(spark.table("source_tbl").schema, spark.table("target_tbl").schema) + Assertions.assertEquals("source comment", target.comment().get()) + Assertions.assertEquals(List("pt"), target.partitionKeys().asScala.toList) + Assertions.assertEquals(List("id", "pt"), target.primaryKeys().asScala.toList) + Assertions.assertEquals("8", target.options().get("bucket")) + Assertions.assertEquals("256MB", target.options().get("target-file-size")) + Assertions.assertNotEquals(source.location().toString, target.location().toString) + } + } + + test("Create table like stored as is unsupported with catalog-qualified identifiers") { + assume(gteqSpark3_4) + withTable("source_tbl", "target_tbl") { + sql("CREATE TABLE source_tbl (id INT)") + + val error = intercept[Exception] { + sql(s""" + |CREATE TABLE paimon.$dbName0.target_tbl + |LIKE paimon.$dbName0.source_tbl + |STORED AS PARQUET + |""".stripMargin) + }.getMessage + + Assertions.assertTrue( + error.contains("CREATE TABLE LIKE ... STORED AS is not supported for SparkCatalog.")) + } + } + + private def createSourceTable(): Unit = { + sql(""" + |CREATE TABLE source_tbl ( + | id BIGINT, + | name STRING COMMENT 'name column', + | pt STRING + |) COMMENT 'source comment' + |PARTITIONED BY (pt) + |TBLPROPERTIES ( + | 'primary-key' = 'id,pt', + | 'bucket' = '2', + | 'target-file-size' = '64MB' + |) + |""".stripMargin) + } + + private def assertCreatedLike(tableName: String): Unit = { + val target = loadTable(tableName) + + Assertions.assertEquals(spark.table("source_tbl").schema, spark.table(tableName).schema) + Assertions.assertEquals("source comment", target.comment().get()) + Assertions.assertEquals(List("pt"), target.partitionKeys().asScala.toList) + Assertions.assertEquals(List("id", "pt"), target.primaryKeys().asScala.toList) + Assertions.assertEquals("2", target.options().get("bucket")) + Assertions.assertEquals("64MB", target.options().get("target-file-size")) + } +}