Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, PaimonSparkSession, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{NonReservedContext, QuotedIdentifierContext}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{CreateTableLikeContext, MultipartIdentifierContext, NonReservedContext, QuotedIdentifierContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.VariableSubstitution
Expand Down Expand Up @@ -65,17 +65,29 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
if (isPaimonCommand(sqlTextAfterSubstitution)) {
parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan]
} else if (isCatalogCreateTableLike(sqlTextAfterSubstitution)) {
applyParserRules(
parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan])
} else {
var plan = delegate.parsePlan(sqlText)
val sparkSession = PaimonSparkSession.active
parserRules(sparkSession).foreach(
rule => {
plan = rule.apply(plan)
})
plan
parsePlanWithDelegate(sqlText)
}
}

private def parsePlanWithDelegate(sqlText: String): LogicalPlan = {
applyParserRules(delegate.parsePlan(sqlText))
}

private def applyParserRules(plan: LogicalPlan): LogicalPlan = {
var rewrittenPlan = plan
val sparkSession = PaimonSparkSession.active
parserRules(sparkSession).foreach(
rule => {
rewrittenPlan = rule.apply(rewrittenPlan)
})
rewrittenPlan
}

private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = {
Seq(
RewritePaimonViewCommands(sparkSession),
Expand Down Expand Up @@ -140,6 +152,66 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
normalized.contains("delete tag")))
}

private def isCatalogCreateTableLike(sqlText: String): Boolean = {
if (org.apache.spark.SPARK_VERSION < "3.4") {
return false
}

tokenStream(sqlText) match {
case Some(tokens) if maybeCreateTableLike(tokens) =>
isParsedCatalogCreateTableLike(sqlText)
case _ => false
}
}

private def tokenStream(sqlText: String): Option[CommonTokenStream] = {
try {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(sqlText)))
lexer.removeErrorListeners()
lexer.addErrorListener(PaimonParseErrorListener)

val tokens = new CommonTokenStream(lexer)
tokens.fill()
Some(tokens)
} catch {
case _: PaimonParseException => None
}
}

private def maybeCreateTableLike(tokenStream: CommonTokenStream): Boolean = {
val tokens = tokenStream.getTokens.asScala
.filter(token => token.getChannel == Token.DEFAULT_CHANNEL)
.filterNot(token => token.getType == Token.EOF)

tokens.length >= 5 &&
tokens.head.getType == PaimonSqlExtensionsParser.CREATE &&
tokens(1).getType == PaimonSqlExtensionsParser.TABLE &&
tokens.exists(_.getType == PaimonSqlExtensionsParser.LIKE) &&
tokens.exists(_.getText == ".")
}

private def isParsedCatalogCreateTableLike(sqlText: String): Boolean = {
try {
parse(sqlText) {
parser =>
val singleStatement = parser.singleStatement()
singleStatement.statement() match {
case ctx: CreateTableLikeContext
if isCatalogIdentifier(ctx.target) || isCatalogIdentifier(ctx.source) =>
true
case _ => false
}
}
} catch {
case _: PaimonParseException => false
}
}

private def isCatalogIdentifier(identifier: MultipartIdentifierContext): Boolean = {
identifier.parts.size() == 3
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(command)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,67 @@ statement
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
| CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier
LIKE source=multipartIdentifier
(tableProvider |
rowFormat |
createFileFormat |
locationSpec |
(TBLPROPERTIES tableProps=propertyList))* #createTableLike
;

tableProvider
: USING multipartIdentifier
;

locationSpec
: LOCATION stringLit
;

propertyList
: '(' property (',' property)* ')'
;

property
: key=propertyKey ('='? value=propertyValue)?
;

propertyKey
: identifier ('.' identifier)*
| stringLit
;

propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| stringLit
;

createFileFormat
: STORED AS fileFormat
| STORED BY storageHandler
;

fileFormat
: INPUTFORMAT inFmt=stringLit OUTPUTFORMAT outFmt=stringLit #tableFileFormat
| identifier #genericFileFormat
;

storageHandler
: stringLit (WITH SERDEPROPERTIES propertyList)?
;

rowFormat
: ROW FORMAT SERDE name=stringLit (WITH SERDEPROPERTIES props=propertyList)? #rowFormatSerde
| ROW FORMAT DELIMITED
(FIELDS TERMINATED BY fieldsTerminatedBy=stringLit (ESCAPED BY escapedBy=stringLit)?)?
(COLLECTION ITEMS TERMINATED BY collectionItemsTerminatedBy=stringLit)?
(MAP KEYS TERMINATED BY keysTerminatedBy=stringLit)?
(LINES TERMINATED BY linesSeparatedBy=stringLit)?
(NULL DEFINED AS nullDefinedAs=stringLit)? #rowFormatDelimited
;

callArgument
: expression #positionalArgument
| identifier '=>' expression #namedArgument
Expand Down Expand Up @@ -124,6 +183,10 @@ booleanValue
: TRUE | FALSE
;

stringLit
: STRING+
;

number
: MINUS? EXPONENT_VALUE #exponentLiteral
| MINUS? DECIMAL_VALUE #decimalLiteral
Expand Down Expand Up @@ -151,34 +214,60 @@ quotedIdentifier
;

nonReserved
: ALTER | AS | CALL | CREATE | DAYS | DELETE | EXISTS | HOURS | IF | NOT | OF | OR | TABLE
| REPLACE | RETAIN | VERSION | TAG
: ALTER | AS | BY | CALL | COLLECTION | CREATE | DAYS | DEFINED | DELETE | DELIMITED
| ESCAPED | EXISTS | FIELDS | FORMAT | HOURS | IF | INPUTFORMAT | ITEMS | KEYS | LIKE
| LINES | LOCATION | NOT | NULL | OF | OR | OUTPUTFORMAT | ROW | SERDE | SERDEPROPERTIES
| STORED | TABLE | TBLPROPERTIES | TERMINATED | REPLACE | RETAIN | USING | VERSION | TAG
| WITH
| TRUE | FALSE
| MAP
;

ALTER: 'ALTER';
AS: 'AS';
BY: 'BY';
CALL: 'CALL';
COLLECTION: 'COLLECTION';
CREATE: 'CREATE';
DAYS: 'DAYS';
DEFINED: 'DEFINED';
DELETE: 'DELETE';
DELIMITED: 'DELIMITED';
ESCAPED: 'ESCAPED';
EXISTS: 'EXISTS';
FIELDS: 'FIELDS';
FORMAT: 'FORMAT';
HOURS: 'HOURS';
IF : 'IF';
INPUTFORMAT: 'INPUTFORMAT';
ITEMS: 'ITEMS';
KEYS: 'KEYS';
LIKE: 'LIKE';
LINES: 'LINES';
LOCATION: 'LOCATION';
MINUTES: 'MINUTES';
NOT: 'NOT';
NULL: 'NULL';
OF: 'OF';
OR: 'OR';
OUTPUTFORMAT: 'OUTPUTFORMAT';
RENAME: 'RENAME';
REPLACE: 'REPLACE';
RETAIN: 'RETAIN';
ROW: 'ROW';
SERDE: 'SERDE';
SERDEPROPERTIES: 'SERDEPROPERTIES';
SHOW: 'SHOW';
STORED: 'STORED';
TABLE: 'TABLE';
TAG: 'TAG';
TAGS: 'TAGS';
TBLPROPERTIES: 'TBLPROPERTIES';
TERMINATED: 'TERMINATED';
TO: 'TO';
USING: 'USING';
VERSION: 'VERSION';
WITH: 'WITH';

TRUE: 'TRUE';
FALSE: 'FALSE';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, PaimonSparkSession, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{NonReservedContext, QuotedIdentifierContext}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{CreateTableLikeContext, MultipartIdentifierContext, NonReservedContext, QuotedIdentifierContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.VariableSubstitution
Expand Down Expand Up @@ -65,17 +65,29 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
if (isPaimonCommand(sqlTextAfterSubstitution)) {
parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan]
} else if (isCatalogCreateTableLike(sqlTextAfterSubstitution)) {
applyParserRules(
parse(sqlTextAfterSubstitution)(parser => astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan])
} else {
var plan = delegate.parsePlan(sqlText)
val sparkSession = PaimonSparkSession.active
parserRules(sparkSession).foreach(
rule => {
plan = rule.apply(plan)
})
plan
parsePlanWithDelegate(sqlText)
}
}

private def parsePlanWithDelegate(sqlText: String): LogicalPlan = {
applyParserRules(delegate.parsePlan(sqlText))
}

private def applyParserRules(plan: LogicalPlan): LogicalPlan = {
var rewrittenPlan = plan
val sparkSession = PaimonSparkSession.active
parserRules(sparkSession).foreach(
rule => {
rewrittenPlan = rule.apply(rewrittenPlan)
})
rewrittenPlan
}

private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = {
Seq(
RewritePaimonViewCommands(sparkSession),
Expand Down Expand Up @@ -140,6 +152,66 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
normalized.contains("delete tag")))
}

private def isCatalogCreateTableLike(sqlText: String): Boolean = {
if (org.apache.spark.SPARK_VERSION < "3.4") {
return false
}

tokenStream(sqlText) match {
case Some(tokens) if maybeCreateTableLike(tokens) =>
isParsedCatalogCreateTableLike(sqlText)
case _ => false
}
}

private def tokenStream(sqlText: String): Option[CommonTokenStream] = {
try {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(sqlText)))
lexer.removeErrorListeners()
lexer.addErrorListener(PaimonParseErrorListener)

val tokens = new CommonTokenStream(lexer)
tokens.fill()
Some(tokens)
} catch {
case _: PaimonParseException => None
}
}

private def maybeCreateTableLike(tokenStream: CommonTokenStream): Boolean = {
val tokens = tokenStream.getTokens.asScala
.filter(token => token.getChannel == Token.DEFAULT_CHANNEL)
.filterNot(token => token.getType == Token.EOF)

tokens.length >= 5 &&
tokens.head.getType == PaimonSqlExtensionsParser.CREATE &&
tokens(1).getType == PaimonSqlExtensionsParser.TABLE &&
tokens.exists(_.getType == PaimonSqlExtensionsParser.LIKE) &&
tokens.exists(_.getText == ".")
}

private def isParsedCatalogCreateTableLike(sqlText: String): Boolean = {
try {
parse(sqlText) {
parser =>
val singleStatement = parser.singleStatement()
singleStatement.statement() match {
case ctx: CreateTableLikeContext
if isCatalogIdentifier(ctx.target) || isCatalogIdentifier(ctx.source) =>
true
case _ => false
}
}
} catch {
case _: PaimonParseException => false
}
}

private def isCatalogIdentifier(identifier: MultipartIdentifierContext): Boolean = {
identifier.parts.size() == 3
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(command)))
Expand Down
Loading