From 360ec82475bd579c0669580ee08daa74622f35f0 Mon Sep 17 00:00:00 2001 From: Ferenc Csaky Date: Wed, 20 May 2026 16:48:15 +0200 Subject: [PATCH] chore: Bump Flink version to 2.2.1 --- CLAUDE.md | 2 +- flink-sql-runner/pom.xml | 46 +- flink-sql-runner/src/main/docker/Dockerfile | 4 +- .../exec/serde/RexNodeJsonDeserializer.java | 618 ------------------ pom.xml | 2 +- 5 files changed, 4 insertions(+), 668 deletions(-) delete mode 100644 flink-sql-runner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java diff --git a/CLAUDE.md b/CLAUDE.md index 84e75c55..cf2fadcc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -62,7 +62,7 @@ java -jar flink-sql-runner/target/flink-sql-runner.jar --sqlfile script.sql --ud ## Technology Stack - **Java 17** - Base language version -- **Apache Flink 2.2.0** - Stream processing framework +- **Apache Flink 2.2.1** - Stream processing framework - **Maven** - Build system - **Lombok** - Code generation - **PicoCLI** - Command line interface diff --git a/flink-sql-runner/pom.xml b/flink-sql-runner/pom.xml index aaea8d97..94f73954 100644 --- a/flink-sql-runner/pom.xml +++ b/flink-sql-runner/pom.xml @@ -51,17 +51,9 @@ provided - - - - - - - - org.apache.flink - flink-table-planner_2.12 + flink-table-planner-loader ${flink.version} provided @@ -521,26 +513,6 @@ org.apache.maven.plugins maven-dependency-plugin - - unpack-flink-table-planner-without-rex-json-deser - - unpack - - prepare-package - - - - org.apache.flink - flink-table-planner_2.12 - ${flink.version} - jar - true - ${project.build.directory}/flink-table-planner-temp - org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer** - - - - unpack-iceberg-aws-bundle-without-logging @@ -675,22 +647,6 @@ org.apache.maven.plugins maven-antrun-plugin - - - repack-flink-table-planner - - run - - prepare-package - - - - - - - - - repack-iceberg-aws-bundle diff --git a/flink-sql-runner/src/main/docker/Dockerfile b/flink-sql-runner/src/main/docker/Dockerfile index f72d7c77..e34f229d 100644 --- a/flink-sql-runner/src/main/docker/Dockerfile +++ b/flink-sql-runner/src/main/docker/Dockerfile @@ -46,7 +46,6 @@ RUN set -eu; \ && rm -f /tmp/async-profiler.tar.gz \ && rm -rf /var/lib/apt/lists/* -COPY flink-table-planner_2.12-*.jar /opt/flink/lib COPY flink-s3-fs-hadoop-*.jar /opt/flink/lib COPY hadoop-common-*.jar /opt/flink/lib COPY hadoop-hdfs-client-*.jar /opt/flink/lib @@ -58,8 +57,7 @@ COPY flink-sql-runner.uber.jar /opt/flink/lib/sql-runner.uber.jar COPY --chmod=755 sql-runner /opt/flink/bin/sql-runner COPY --chmod=755 entrypoint.sh /entrypoint.sh -RUN rm -rf /opt/flink/lib/flink-table-planner-loader-*.jar \ - && echo 'UEsDBBQACAgIAG2UllwAAAAAAAAAAAAAAAAUAAQATUVUQS1JTkYvTUFOSUZFU1QuTUb+ygAA803My0xLLS7RDUstKs7Mz7NSMNQz4HIuSk0sSU3Rdaq0UkjLyczL1i0uzNEtKs3LSy3i4gIAUEsHCL/cUjU2AAAANAAAAFBLAQIUABQACAgIAG2Ully/3FI1NgAAADQAAAAUAAQAAAAAAAAAAAAAAAAAAABNRVRBLUlORi9NQU5JRkVTVC5NRv7KAABQSwUGAAAAAAEAAQBGAAAAfAAAAAAA' | base64 -d > /opt/flink/noop.jar \ +RUN echo 'UEsDBBQACAgIAG2UllwAAAAAAAAAAAAAAAAUAAQATUVUQS1JTkYvTUFOSUZFU1QuTUb+ygAA803My0xLLS7RDUstKs7Mz7NSMNQz4HIuSk0sSU3Rdaq0UkjLyczL1i0uzNEtKs3LSy3i4gIAUEsHCL/cUjU2AAAANAAAAFBLAQIUABQACAgIAG2Ully/3FI1NgAAADQAAAAUAAQAAAAAAAAAAAAAAAAAAABNRVRBLUlORi9NQU5JRkVTVC5NRv7KAABQSwUGAAAAAAEAAQBGAAAAfAAAAAAA' | base64 -d > /opt/flink/noop.jar \ && mkdir -p /opt/flink/hadoop-conf \ && echo 'fs.s3a.aws.credentials.providercom.amazonaws.auth.DefaultAWSCredentialsProviderChain' > /opt/flink/hadoop-conf/core-site.xml \ && chown -R flink:flink /opt/flink diff --git a/flink-sql-runner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-sql-runner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java deleted file mode 100644 index 9b076063..00000000 --- a/flink-sql-runner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java +++ /dev/null @@ -1,618 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.serde; - -import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore.IDENTIFIER; -import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS; -import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.loadClass; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_ALPHA; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_BOUND_LOWER; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_BOUND_TYPE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_BOUND_UPPER; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CATALOG_NAME; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CLASS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_CORREL; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_EXPR; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_INPUT_INDEX; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_INTERNAL_NAME; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_KIND; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NAME; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_NULL_AS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_OPERANDS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_ORDER_KEYS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_PARTITION_KEYS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_RANGES; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SARG; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SQL_KIND; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYMBOL; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYNTAX; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_SYSTEM_NAME; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_TYPE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.FIELD_NAME_VALUE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_CALL; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_CORREL_VARIABLE; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_FIELD_ACCESS; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_INPUT_REF; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_LITERAL; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_PATTERN_INPUT_REF; -import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer.KIND_TABLE_ARG_CALL; -import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableToCalcite; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import javax.annotation.Nullable; -import org.apache.calcite.avatica.util.ByteString; -import org.apache.calcite.rel.core.CorrelationId; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexBuilder; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexUnknownAs; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlSyntax; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlNameMatchers; -import org.apache.calcite.util.DateString; -import org.apache.calcite.util.Sarg; -import org.apache.calcite.util.TimeString; -import org.apache.calcite.util.TimestampString; -import org.apache.flink.annotation.Internal; -import org.apache.flink.calcite.shaded.com.google.common.collect.BoundType; -import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableRangeSet; -import org.apache.flink.calcite.shaded.com.google.common.collect.Range; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation; -import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore; -import org.apache.flink.table.catalog.ContextResolvedFunction; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.functions.FunctionIdentifier; -import org.apache.flink.table.functions.UserDefinedFunction; -import org.apache.flink.table.functions.UserDefinedFunctionHelper; -import org.apache.flink.table.planner.calcite.RexTableArgCall; -import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction; -import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; -import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator; -import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol; - -/** - * Copied over from the Flink repo until the PTF fix is not released upstream. TODO: delete when we - * update to Flink 2.2.1+ - */ -@Internal -final class RexNodeJsonDeserializer extends StdDeserializer { - private static final long serialVersionUID = 1L; - - RexNodeJsonDeserializer() { - super(RexNode.class); - } - - @Override - public RexNode deserialize(JsonParser jsonParser, DeserializationContext ctx) throws IOException { - final JsonNode jsonNode = jsonParser.readValueAsTree(); - final SerdeContext serdeContext = SerdeContext.get(ctx); - return deserialize(jsonNode, serdeContext); - } - - private static RexNode deserialize(JsonNode jsonNode, SerdeContext serdeContext) - throws IOException { - final String kind = jsonNode.required(FIELD_NAME_KIND).asText(); - switch (kind) { - case KIND_INPUT_REF: - return deserializeInputRef(jsonNode, serdeContext); - case KIND_LITERAL: - return deserializeLiteral(jsonNode, serdeContext); - case KIND_FIELD_ACCESS: - return deserializeFieldAccess(jsonNode, serdeContext); - case KIND_CORREL_VARIABLE: - return deserializeCorrelVariable(jsonNode, serdeContext); - case KIND_PATTERN_INPUT_REF: - return deserializePatternFieldRef(jsonNode, serdeContext); - case KIND_TABLE_ARG_CALL: - return deserializeTableArgCall(jsonNode, serdeContext); - case KIND_CALL: - return deserializeCall(jsonNode, serdeContext); - default: - throw new TableException("Cannot convert to RexNode: " + jsonNode.toPrettyString()); - } - } - - private static RexNode deserializeInputRef(JsonNode jsonNode, SerdeContext serdeContext) { - final int inputIndex = jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue(); - final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE); - final RelDataType fieldType = - RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); - return serdeContext.getRexBuilder().makeInputRef(fieldType, inputIndex); - } - - private static RexNode deserializeLiteral(JsonNode jsonNode, SerdeContext serdeContext) { - final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE); - final RelDataType relDataType = - RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); - if (jsonNode.has(FIELD_NAME_SARG)) { - return deserializeSarg(jsonNode.required(FIELD_NAME_SARG), relDataType, serdeContext); - } else if (jsonNode.has(FIELD_NAME_VALUE)) { - final Object value = - deserializeLiteralValue(jsonNode, relDataType.getSqlTypeName(), serdeContext); - if (value == null) { - return serdeContext.getRexBuilder().makeNullLiteral(relDataType); - } - return serdeContext.getRexBuilder().makeLiteral(value, relDataType, true); - } else { - throw new TableException("Unknown literal: " + jsonNode.toPrettyString()); - } - } - - @SuppressWarnings({"UnstableApiUsage", "rawtypes", "unchecked"}) - private static RexNode deserializeSarg( - JsonNode sargNode, RelDataType relDataType, SerdeContext serdeContext) { - final RexBuilder rexBuilder = serdeContext.getRexBuilder(); - final ArrayNode rangesNode = (ArrayNode) sargNode.required(FIELD_NAME_RANGES); - final ImmutableRangeSet.Builder builder = ImmutableRangeSet.builder(); - for (JsonNode rangeNode : rangesNode) { - Range range = Range.all(); - if (rangeNode.has(FIELD_NAME_BOUND_LOWER)) { - final JsonNode lowerNode = rangeNode.required(FIELD_NAME_BOUND_LOWER); - final Comparable boundValue = - (Comparable) - deserializeLiteralValue(lowerNode, relDataType.getSqlTypeName(), serdeContext); - assert boundValue != null; - final BoundType boundType = - serializableToCalcite( - BoundType.class, lowerNode.required(FIELD_NAME_BOUND_TYPE).asText()); - final Range r = - boundType == BoundType.OPEN ? Range.greaterThan(boundValue) : Range.atLeast(boundValue); - range = range.intersection(r); - } - if (rangeNode.has(FIELD_NAME_BOUND_UPPER)) { - final JsonNode upperNode = rangeNode.required(FIELD_NAME_BOUND_UPPER); - final Comparable boundValue = - (Comparable) - deserializeLiteralValue(upperNode, relDataType.getSqlTypeName(), serdeContext); - assert boundValue != null; - final BoundType boundType = - serializableToCalcite( - BoundType.class, upperNode.required(FIELD_NAME_BOUND_TYPE).asText()); - final Range r = - boundType == BoundType.OPEN ? Range.lessThan(boundValue) : Range.atMost(boundValue); - range = range.intersection(r); - } - if (range.hasUpperBound() || range.hasLowerBound()) { - builder.add(range); - } - } - final RexUnknownAs nullAs = - serializableToCalcite(RexUnknownAs.class, sargNode.required(FIELD_NAME_NULL_AS).asText()); - return rexBuilder.makeSearchArgumentLiteral(Sarg.of(nullAs, builder.build()), relDataType); - } - - private static @Nullable Object deserializeLiteralValue( - JsonNode literalNode, SqlTypeName sqlTypeName, SerdeContext serdeContext) { - final JsonNode valueNode = literalNode.required(FIELD_NAME_VALUE); - if (valueNode.isNull()) { - return null; - } - - switch (sqlTypeName) { - case BOOLEAN: - return valueNode.booleanValue(); - case TINYINT: - case SMALLINT: - case INTEGER: - case BIGINT: - case FLOAT: - case DOUBLE: - case DECIMAL: - case INTERVAL_YEAR: - case INTERVAL_YEAR_MONTH: - case INTERVAL_MONTH: - case INTERVAL_DAY: - case INTERVAL_DAY_HOUR: - case INTERVAL_DAY_MINUTE: - case INTERVAL_DAY_SECOND: - case INTERVAL_HOUR: - case INTERVAL_HOUR_MINUTE: - case INTERVAL_HOUR_SECOND: - case INTERVAL_MINUTE: - case INTERVAL_MINUTE_SECOND: - case INTERVAL_SECOND: - return new BigDecimal(valueNode.asText()); - case DATE: - return new DateString(valueNode.asText()); - case TIME: - return new TimeString(valueNode.asText()); - case TIMESTAMP: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - return new TimestampString(valueNode.asText()); - case BINARY: - case VARBINARY: - return ByteString.ofBase64(valueNode.asText()); - case CHAR: - case VARCHAR: - return serdeContext.getRexBuilder().makeLiteral(valueNode.asText()).getValue(); - case SYMBOL: - final JsonNode symbolNode = literalNode.required(FIELD_NAME_SYMBOL); - final SerializableSymbol symbol = - SerializableSymbol.of(symbolNode.asText(), valueNode.asText()); - return serializableToCalcite(symbol); - default: - throw new TableException("Unknown literal: " + valueNode); - } - } - - private static RexNode deserializeFieldAccess(JsonNode jsonNode, SerdeContext serdeContext) - throws IOException { - final String fieldName = jsonNode.required(FIELD_NAME_NAME).asText(); - final JsonNode exprNode = jsonNode.required(FIELD_NAME_EXPR); - final RexNode refExpr = deserialize(exprNode, serdeContext); - return serdeContext.getRexBuilder().makeFieldAccess(refExpr, fieldName, true); - } - - private static RexNode deserializeCorrelVariable(JsonNode jsonNode, SerdeContext serdeContext) { - final String correl = jsonNode.required(FIELD_NAME_CORREL).asText(); - final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE); - final RelDataType fieldType = - RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); - return serdeContext.getRexBuilder().makeCorrel(fieldType, new CorrelationId(correl)); - } - - private static RexNode deserializePatternFieldRef(JsonNode jsonNode, SerdeContext serdeContext) { - final int inputIndex = jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue(); - final String alpha = jsonNode.required(FIELD_NAME_ALPHA).asText(); - final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE); - final RelDataType fieldType = - RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); - return serdeContext.getRexBuilder().makePatternFieldRef(alpha, fieldType, inputIndex); - } - - private static RexNode deserializeTableArgCall(JsonNode jsonNode, SerdeContext serdeContext) { - final JsonNode logicalTypeNode = jsonNode.required(FIELD_NAME_TYPE); - final RelDataType callType = - RelDataTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext); - - final int inputIndex = jsonNode.required(FIELD_NAME_INPUT_INDEX).intValue(); - - final JsonNode partitionKeysNode = jsonNode.required(FIELD_NAME_PARTITION_KEYS); - final int[] partitionKeys = new int[partitionKeysNode.size()]; - for (int i = 0; i < partitionKeysNode.size(); ++i) { - partitionKeys[i] = partitionKeysNode.get(i).asInt(); - } - - final JsonNode orderKeysNode = jsonNode.required(FIELD_NAME_ORDER_KEYS); - final int[] orderKeys = new int[orderKeysNode.size()]; - for (int i = 0; i < orderKeysNode.size(); ++i) { - orderKeys[i] = orderKeysNode.get(i).asInt(); - } - - return new RexTableArgCall(callType, inputIndex, partitionKeys, orderKeys); - } - - private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext serdeContext) - throws IOException { - final SqlOperator operator = deserializeSqlOperator(jsonNode, serdeContext); - final ArrayNode operandNodes = (ArrayNode) jsonNode.get(FIELD_NAME_OPERANDS); - final List rexOperands; - if (operandNodes == null) { - rexOperands = List.of(); - } else { - rexOperands = new ArrayList<>(); - for (JsonNode node : operandNodes) { - rexOperands.add(deserialize(node, serdeContext)); - } - } - final RelDataType callType; - if (jsonNode.has(FIELD_NAME_TYPE)) { - final JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE); - callType = RelDataTypeJsonDeserializer.deserialize(typeNode, serdeContext); - } else { - callType = serdeContext.getRexBuilder().deriveReturnType(operator, rexOperands); - } - return serdeContext.getRexBuilder().makeCall(callType, operator, rexOperands); - } - - // -------------------------------------------------------------------------------------------- - - /** Logic shared with {@link AggregateCallJsonDeserializer}. */ - static SqlOperator deserializeSqlOperator(JsonNode jsonNode, SerdeContext serdeContext) { - final SqlSyntax syntax; - if (jsonNode.has(FIELD_NAME_SYNTAX)) { - syntax = - serializableToCalcite(SqlSyntax.class, jsonNode.required(FIELD_NAME_SYNTAX).asText()); - } else { - syntax = SqlSyntax.FUNCTION; - } - - if (jsonNode.has(FIELD_NAME_INTERNAL_NAME)) { - return deserializeInternalFunction( - jsonNode.required(FIELD_NAME_INTERNAL_NAME).asText(), syntax, serdeContext); - } else if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) { - return deserializeCatalogFunction(jsonNode, syntax, serdeContext); - } else if (jsonNode.has(FIELD_NAME_CLASS)) { - return deserializeFunctionClass(jsonNode, serdeContext); - } else if (jsonNode.has(FIELD_NAME_SYSTEM_NAME)) { - return deserializeSystemFunction( - jsonNode.required(FIELD_NAME_SYSTEM_NAME).asText(), syntax, serdeContext); - } else if (jsonNode.has(FIELD_NAME_SQL_KIND)) { - return deserializeInternalFunction( - syntax, SqlKind.valueOf(jsonNode.get(FIELD_NAME_SQL_KIND).asText())); - } else { - throw new TableException("Invalid function call."); - } - } - - private static SqlOperator deserializeSystemFunction( - String systemName, SqlSyntax syntax, SerdeContext serdeContext) { - // This method covers both temporary system functions and permanent system - // functions from a module - final Optional systemOperator = - lookupOptionalSqlOperator(FunctionIdentifier.of(systemName), syntax, serdeContext, true); - if (systemOperator.isPresent()) { - return systemOperator.get(); - } - throw missingSystemFunction(systemName); - } - - private static SqlOperator deserializeInternalFunction( - String internalName, SqlSyntax syntax, SerdeContext serdeContext) { - // Try $FUNC$1 - final Optional internalOperator = - lookupOptionalSqlOperator(FunctionIdentifier.of(internalName), syntax, serdeContext, false); - if (internalOperator.isPresent()) { - return internalOperator.get(); - } - // Try FUNC - final String publicName = BuiltInSqlOperator.extractNameFromQualifiedName(internalName); - final Optional latestOperator = - lookupOptionalSqlOperator(FunctionIdentifier.of(publicName), syntax, serdeContext, true); - if (latestOperator.isPresent()) { - return latestOperator.get(); - } - - Optional sqlStdOperator = lookupOptionalSqlStdOperator(publicName, syntax, null); - if (sqlStdOperator.isPresent()) { - return sqlStdOperator.get(); - } - - throw new TableException( - String.format( - "Could not resolve internal system function '%s'. " - + "This is a bug, please file an issue.", - internalName)); - } - - private static SqlOperator deserializeInternalFunction(SqlSyntax syntax, SqlKind sqlKind) { - final Optional stdOperator = lookupOptionalSqlStdOperator("", syntax, sqlKind); - if (stdOperator.isPresent()) { - return stdOperator.get(); - } - - throw new TableException( - String.format( - "Could not resolve internal system function '%s'. " - + "This is a bug, please file an issue.", - sqlKind.name())); - } - - private static SqlOperator deserializeFunctionClass( - JsonNode jsonNode, SerdeContext serdeContext) { - final String className = jsonNode.required(FIELD_NAME_CLASS).asText(); - final Class functionClass = loadClass(className, serdeContext, "function"); - final UserDefinedFunction functionInstance = - UserDefinedFunctionHelper.instantiateFunction(functionClass); - - final ContextResolvedFunction resolvedFunction; - // This can never be a system function - // because we never serialize classes for system functions - if (jsonNode.has(FIELD_NAME_CATALOG_NAME)) { - final ObjectIdentifier objectIdentifier = - ObjectIdentifierJsonDeserializer.deserialize( - jsonNode.required(FIELD_NAME_CATALOG_NAME).asText(), serdeContext); - resolvedFunction = - ContextResolvedFunction.permanent( - FunctionIdentifier.of(objectIdentifier), functionInstance); - } else { - resolvedFunction = ContextResolvedFunction.anonymous(functionInstance); - } - - switch (functionInstance.getKind()) { - case SCALAR: - case ASYNC_SCALAR: - case TABLE: - case PROCESS_TABLE: - return BridgingSqlFunction.of( - serdeContext.getFlinkContext(), serdeContext.getTypeFactory(), resolvedFunction); - case AGGREGATE: - return BridgingSqlAggFunction.of( - serdeContext.getFlinkContext(), serdeContext.getTypeFactory(), resolvedFunction); - default: - throw new TableException( - String.format( - "Unsupported anonymous function kind '%s' for class '%s'.", - functionInstance.getKind(), className)); - } - } - - private static SqlOperator deserializeCatalogFunction( - JsonNode jsonNode, SqlSyntax syntax, SerdeContext serdeContext) { - final CatalogPlanRestore restoreStrategy = - serdeContext.getConfiguration().get(PLAN_RESTORE_CATALOG_OBJECTS); - final FunctionIdentifier identifier = - FunctionIdentifier.of( - ObjectIdentifierJsonDeserializer.deserialize( - jsonNode.required(FIELD_NAME_CATALOG_NAME).asText(), serdeContext)); - - switch (restoreStrategy) { - case ALL: - { - final Optional lookupOperator = - lookupOptionalSqlOperator(identifier, syntax, serdeContext, false); - if (lookupOperator.isPresent()) { - return lookupOperator.get(); - } else if (jsonNode.has(FIELD_NAME_CLASS)) { - return deserializeFunctionClass(jsonNode, serdeContext); - } - throw missingFunctionFromCatalog(identifier, false); - } - case ALL_ENFORCED: - { - if (jsonNode.has(FIELD_NAME_CLASS)) { - return deserializeFunctionClass(jsonNode, serdeContext); - } - final Optional lookupOperator = - lookupOptionalSqlOperator(identifier, syntax, serdeContext, false); - if (lookupOperator.map(RexNodeJsonDeserializer::isTemporary).orElse(false)) { - return lookupOperator.get(); - } - throw lookupDisabled(identifier); - } - case IDENTIFIER: - final Optional lookupOperator = - lookupOptionalSqlOperator(identifier, syntax, serdeContext, true); - if (lookupOperator.isPresent()) { - return lookupOperator.get(); - } else { - throw missingFunctionFromCatalog(identifier, true); - } - default: - throw new TableException("Unsupported restore strategy: " + restoreStrategy); - } - } - - private static boolean isTemporary(SqlOperator sqlOperator) { - if (sqlOperator instanceof BridgingSqlFunction) { - return ((BridgingSqlFunction) sqlOperator).getResolvedFunction().isTemporary(); - } else if (sqlOperator instanceof BridgingSqlAggFunction) { - return ((BridgingSqlAggFunction) sqlOperator).getResolvedFunction().isTemporary(); - } - return false; - } - - private static Optional lookupOptionalSqlOperator( - FunctionIdentifier identifier, - SqlSyntax syntax, - SerdeContext serdeContext, - boolean throwOnError) { - final List foundOperators = new ArrayList<>(); - try { - serdeContext - .getOperatorTable() - .lookupOperatorOverloads( - new SqlIdentifier(identifier.toList(), new SqlParserPos(0, 0)), - null, // category - syntax, - foundOperators, - SqlNameMatchers.liberal()); - if (foundOperators.size() != 1) { - return Optional.empty(); - } - return Optional.of(foundOperators.get(0)); - } catch (Throwable t) { - if (throwOnError) { - throw new TableException( - String.format("Error during lookup of function '%s'.", identifier), t); - } - return Optional.empty(); - } - } - - private static Optional lookupOptionalSqlStdOperator( - String operatorName, SqlSyntax syntax, @Nullable SqlKind sqlKind) { - List foundOperators = new ArrayList<>(); - // try to find operator from std operator table. - SqlStdOperatorTable.instance() - .lookupOperatorOverloads( - new SqlIdentifier(operatorName, new SqlParserPos(0, 0)), - null, // category - syntax, - foundOperators, - SqlNameMatchers.liberal()); - if (foundOperators.size() == 1) { - return Optional.of(foundOperators.get(0)); - } - // in case different operator has the same kind, check with both name and kind. - return foundOperators.stream() - .filter(o -> sqlKind != null && o.getKind() == sqlKind) - .findFirst(); - } - - private static TableException missingSystemFunction(String systemName) { - return new TableException( - String.format( - "Could not lookup system function '%s'. " - + "Make sure it has been registered before because temporary " - + "functions are not contained in the persisted plan. " - + "If the function was provided by a module, make sure to reloaded " - + "all used modules in the correct order.", - systemName)); - } - - private static TableException lookupDisabled(FunctionIdentifier identifier) { - return new TableException( - String.format( - "The persisted plan does not include all required catalog metadata for function '%s'. " - + "However, lookup is disabled because option '%s' = '%s'. " - + "Either enable the catalog lookup with '%s' = '%s' / '%s' or " - + "regenerate the plan with '%s' != '%s'. " - + "Make sure the function is not compiled as a temporary function.", - identifier.asSummaryString(), - PLAN_RESTORE_CATALOG_OBJECTS.key(), - CatalogPlanRestore.ALL_ENFORCED.name(), - PLAN_RESTORE_CATALOG_OBJECTS.key(), - IDENTIFIER.name(), - CatalogPlanRestore.ALL.name(), - PLAN_COMPILE_CATALOG_OBJECTS.key(), - CatalogPlanCompilation.IDENTIFIER.name())); - } - - private static TableException missingFunctionFromCatalog( - FunctionIdentifier identifier, boolean forcedLookup) { - final String initialReason; - if (forcedLookup) { - initialReason = - String.format( - "Cannot resolve function '%s' and catalog lookup is forced because '%s' = '%s'. ", - identifier, PLAN_RESTORE_CATALOG_OBJECTS.key(), CatalogPlanRestore.IDENTIFIER); - } else { - initialReason = - String.format( - "Cannot resolve function '%s' and the persisted plan does not include " - + "all required catalog function metadata. ", - identifier); - } - return new TableException( - initialReason - + String.format( - "Make sure a registered catalog contains the function when restoring or " - + "the function is available as a temporary function. " - + "Otherwise regenerate the plan with '%s' != '%s' and make " - + "sure the function was not compiled as a temporary function.", - PLAN_COMPILE_CATALOG_OBJECTS.key(), CatalogPlanCompilation.IDENTIFIER.name())); - } -} diff --git a/pom.xml b/pom.xml index 1ee2e6e1..c59774b8 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ 17 - 2.2.0 + 2.2.1 flink-2.2 ${flink.version}-java17 3.6.0-2.2