diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java index 4b29f95aeef8ae..38d59167015c09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; +import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.SetOperation; @@ -58,6 +59,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; +import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.ExpressionUtils; @@ -69,6 +71,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -198,8 +201,7 @@ public Void visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, PartitionInc Set> shuttledEqualSlotSet = context.getShuttledEqualSlotSet(); for (Set equalSlotSet : shuttledEqualSlotSet) { if (equalSlotSet.contains(consumerSlot)) { - Expression shuttledSlot = ExpressionUtils.shuttleExpressionWithLineage( - producerSlot, producerPlan); + Expression shuttledSlot = context.shuttleExpressionWithLineage(producerSlot, producerPlan); if (shuttledSlot instanceof Slot) { equalSlotSet.add((Slot) shuttledSlot); } @@ -239,7 +241,7 @@ public Void visitLogicalJoin(LogicalJoin join, continue; } Pair, Set> partitionEqualSlotPair = - calEqualSet((SlotReference) partitionSlotToCheck, join); + calEqualSet((SlotReference) partitionSlotToCheck, join, context); if (!partitionEqualSlotPair.value().isEmpty()) { context.getShuttledEqualSlotSet().add(partitionEqualSlotPair.value()); } @@ -526,31 +528,24 @@ private Set getPartitionColumnsToCheck(PartitionIncrementCheckCon */ private static boolean checkPartition(Collection expressionsToCheck, Plan plan, PartitionIncrementCheckContext context) { - Set> partitionAndExprEntrySet - = new HashSet<>(context.getPartitionAndRefExpressionMap().entrySet()); + List> partitionAndExprEntryList + = new ArrayList<>(context.getPartitionAndRefExpressionMap().entrySet()); + List partitionExpressions = new ArrayList<>(partitionAndExprEntryList.size()); + for (Entry entry : partitionAndExprEntryList) { + partitionExpressions.add(entry.getValue().getPartitionExpression().orElse(entry.getKey())); + } + List partitionExpressionActualList = + context.shuttleAndNormalizeExpressionWithLineage(partitionExpressions, context.getOriginalPlan()); + List expressionsShuttledToCheck = + context.shuttleAndNormalizeExpressionWithLineage(expressionsToCheck, context.getOriginalPlan()); boolean checked = false; - for (Map.Entry partitionExpressionEntry - : partitionAndExprEntrySet) { - NamedExpression partitionNamedExpression = partitionExpressionEntry.getKey(); + for (int i = 0; i < partitionAndExprEntryList.size(); i++) { + Map.Entry partitionExpressionEntry = + partitionAndExprEntryList.get(i); RelatedTableColumnInfo partitionTableColumnInfo = partitionExpressionEntry.getValue(); - Optional partitionExpressionOpt = partitionTableColumnInfo.getPartitionExpression(); - Expression partitionExpressionActual = partitionExpressionOpt - .map(expr -> ExpressionUtils.shuttleExpressionWithLineage(expr, - context.getOriginalPlan())) - .orElseGet(() -> ExpressionUtils.shuttleExpressionWithLineage(partitionNamedExpression, - context.getOriginalPlan())); - // merge date_trunc - partitionExpressionActual = new ExpressionNormalization().rewrite(partitionExpressionActual, - new ExpressionRewriteContext(context.getCascadesContext())); + Expression partitionExpressionActual = partitionExpressionActualList.get(i); OUTER_CHECK: - for (Expression projectSlotToCheck : expressionsToCheck) { - Expression expressionShuttledToCheck = - ExpressionUtils.shuttleExpressionWithLineage(projectSlotToCheck, - context.getOriginalPlan()); - // merge date_trunc - expressionShuttledToCheck = new ExpressionNormalization().rewrite(expressionShuttledToCheck, - new ExpressionRewriteContext(context.getCascadesContext())); - + for (Expression expressionShuttledToCheck : expressionsShuttledToCheck) { Set expressionToCheckSlots = expressionShuttledToCheck.collectToSet(SlotReference.class::isInstance); Set partitionColumnSlots = @@ -683,8 +678,19 @@ public static final class PartitionIncrementCheckContext { private final Set> shuttledEqualSlotSet = new HashSet<>(); private final Map producerCteIdToPlanMap; private final Plan originalPlan; + // Cache lineage-visible named expressions per plan identity to avoid repeated full plan walks. + private final Map> planLineageExpressionIndexes = new IdentityHashMap<>(); + // Cache normalized expressions within this check context; normalization uses the same CascadesContext. + private final Map normalizedExpressionMap = new IdentityHashMap<>(); + // Reuse the normalization rewriter during one partition lineage check. + private final ExpressionNormalization expressionNormalization = new ExpressionNormalization(); + // Reuse the rewrite context because all normalization in this checker shares the same CascadesContext. + private final ExpressionRewriteContext expressionRewriteContext; private boolean failFast = false; + /** + * Construct partition increment check context. + */ public PartitionIncrementCheckContext(NamedExpression mvPartitionColumn, Expression mvPartitionExpression, Map producerCteIdToPlanMap, Plan originalPlan, @@ -694,6 +700,7 @@ public PartitionIncrementCheckContext(NamedExpression mvPartitionColumn, this.cascadesContext = cascadesContext; this.producerCteIdToPlanMap = producerCteIdToPlanMap; this.originalPlan = originalPlan; + this.expressionRewriteContext = new ExpressionRewriteContext(cascadesContext); } public Set getFailReasons() { @@ -743,6 +750,64 @@ public Plan getOriginalPlan() { return originalPlan; } + private Expression shuttleExpressionWithLineage(Expression expression, Plan plan) { + return shuttleExpressionWithLineage(ImmutableList.of(expression), plan).get(0); + } + + private List shuttleExpressionWithLineage(List expressions, + Plan plan) { + if (expressions.isEmpty()) { + return ImmutableList.of(); + } + ExpressionLineageReplacer.ExpressionReplaceContext replaceContext = + new ExpressionLineageReplacer.ExpressionReplaceContext(expressions); + for (NamedExpression namedExpression : getLineageExpressionIndex(plan)) { + if (!replaceContext.getUsedExprIdSet().contains(namedExpression.getExprId())) { + continue; + } + namedExpression.accept(ExpressionLineageReplacer.NamedExpressionCollector.INSTANCE, replaceContext); + } + List replacedExpressions = replaceContext.getReplacedExpressions(); + if (replacedExpressions == null || expressions.size() != replacedExpressions.size()) { + return ExpressionUtils.shuttleExpressionWithLineage(expressions, plan); + } + return replacedExpressions; + } + + private List shuttleAndNormalizeExpressionWithLineage( + Collection expressions, Plan plan) { + if (expressions.isEmpty()) { + return ImmutableList.of(); + } + List shuttledExpressions = + shuttleExpressionWithLineage(ImmutableList.copyOf(expressions), plan); + List normalizedExpressions = new ArrayList<>(shuttledExpressions.size()); + for (Expression expression : shuttledExpressions) { + normalizedExpressions.add(normalizeExpression(expression)); + } + return normalizedExpressions; + } + + private Expression normalizeExpression(Expression expression) { + Expression normalizedExpression = normalizedExpressionMap.get(expression); + if (normalizedExpression == null) { + normalizedExpression = expressionNormalization.rewrite(expression, expressionRewriteContext); + normalizedExpressionMap.put(expression, normalizedExpression); + } + return normalizedExpression; + } + + private List getLineageExpressionIndex(Plan plan) { + List lineageExpressionIndex = planLineageExpressionIndexes.get(plan); + if (lineageExpressionIndex == null) { + List collectedIndex = new ArrayList<>(); + plan.accept(LineageExpressionCollector.INSTANCE, collectedIndex); + lineageExpressionIndex = ImmutableList.copyOf(collectedIndex); + planLineageExpressionIndexes.put(plan, lineageExpressionIndex); + } + return lineageExpressionIndex; + } + /** * collect invalid table set to check self join */ @@ -772,6 +837,25 @@ public Void visitLogicalCatalogRelation(LogicalCatalogRelation relation, } } + private static final class LineageExpressionCollector extends DefaultPlanVisitor> { + private static final LineageExpressionCollector INSTANCE = new LineageExpressionCollector(); + + @Override + public Void visitGroupPlan(GroupPlan groupPlan, List lineageExpressionIndex) { + return null; + } + + @Override + public Void visit(Plan plan, List lineageExpressionIndex) { + for (Expression expression : plan.getExpressions()) { + if (expression instanceof NamedExpression) { + lineageExpressionIndex.add((NamedExpression) expression); + } + } + return super.visit(plan, lineageExpressionIndex); + } + } + /** * Add partitionEqualSlot to partitionAndRefExpressionToCheck if partitionExpression use the partitionSlot */ @@ -816,7 +900,8 @@ public Expression visitNamedExpression(NamedExpression namedExpression, Void con * the value equal set contain the slot itself */ private static Pair, Set> calEqualSet(Slot slot, - LogicalJoin join) { + LogicalJoin join, + PartitionIncrementCheckContext context) { Set partitionEqualSlotSet = new HashSet<>(); JoinType joinType = join.getJoinType(); if (joinType.isInnerJoin() || joinType.isSemiJoin()) { @@ -829,7 +914,7 @@ private static Pair, Set> calEqualSet(Slot slot, } List extendedPartitionEqualSlotSet = new ArrayList<>(partitionEqualSlotSet); extendedPartitionEqualSlotSet.add(slot); - List shuttledEqualExpressions = ExpressionUtils.shuttleExpressionWithLineage( + List shuttledEqualExpressions = context.shuttleExpressionWithLineage( extendedPartitionEqualSlotSet, join); for (Expression shuttledEqualExpression : shuttledEqualExpressions) { Set objects = shuttledEqualExpression.collectToSet(expr -> expr instanceof SlotReference); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java index 33bbb6122ce0bb..a4b61b80988528 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java @@ -1156,6 +1156,66 @@ public void test40() { }); } + // CTE + union all + wide aggregate should keep partition lineage inside each plan boundary. + @Test + public void testCteUnionAllWideAggregatePartitionLineage() { + PlanChecker.from(connectContext) + .checkExplain("with union_src as (\n" + + " select\n" + + " L_SHIPDATE as part_date,\n" + + " L_ORDERKEY as order_key,\n" + + " L_QUANTITY as metric1,\n" + + " L_EXTENDEDPRICE as metric2,\n" + + " L_DISCOUNT as metric3,\n" + + " L_TAX as metric4,\n" + + " L_RETURNFLAG as flag\n" + + " from lineitem\n" + + " union all\n" + + " select\n" + + " O_ORDERDATE as part_date,\n" + + " O_ORDERKEY as order_key,\n" + + " O_TOTALPRICE as metric1,\n" + + " O_TOTALPRICE as metric2,\n" + + " O_TOTALPRICE as metric3,\n" + + " O_TOTALPRICE as metric4,\n" + + " O_ORDERSTATUS as flag\n" + + " from orders\n" + + "), wide_project as (\n" + + " select\n" + + " date_trunc(part_date, 'day') as part_day,\n" + + " part_date,\n" + + " order_key,\n" + + " metric1,\n" + + " metric2,\n" + + " metric3,\n" + + " metric4,\n" + + " flag\n" + + " from union_src\n" + + ")\n" + + "select\n" + + " part_day,\n" + + " flag,\n" + + " count(*) as cnt,\n" + + " sum(metric1) as sum_metric1,\n" + + " sum(metric2) as sum_metric2,\n" + + " sum(metric3) as sum_metric3,\n" + + " sum(metric4) as sum_metric4,\n" + + " max(order_key) as max_key,\n" + + " min(order_key) as min_key\n" + + "from wide_project\n" + + "group by part_day, flag", + nereidsPlanner -> { + Plan rewrittenPlan = nereidsPlanner.getRewrittenPlan(); + RelatedTableInfo relatedTableInfo = + MaterializedViewUtils.getRelatedTableInfos("part_day", null, + rewrittenPlan, nereidsPlanner.getCascadesContext()); + successWith(relatedTableInfo, ImmutableSet.of( + ImmutableList.of("lineitem", "l_shipdate", "true", "true"), + ImmutableList.of("orders", "o_orderdate", "true", "true")), + "day"); + }); + } + // test with union but not union all @Test diff --git a/regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy b/regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy new file mode 100644 index 00000000000000..d16cc7c35a99b2 --- /dev/null +++ b/regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy @@ -0,0 +1,666 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mtmv_partition_lineage_performance", "mtmv") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=false" + + sql """DROP MATERIALIZED VIEW IF EXISTS mtmv_partition_lineage_perf_mv""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_fact""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_header""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_detail""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_attr""" + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_fact ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + doc_id BIGINT NOT NULL, + line_id BIGINT NOT NULL, + dim_id INT NOT NULL, + entity_id INT NOT NULL, + attr_id INT NOT NULL, + doc_type INT NOT NULL, + status INT NOT NULL, + measure_a DECIMAL(18, 2) NOT NULL, + value_a DECIMAL(18, 2) NOT NULL, + value_b DECIMAL(18, 2) NOT NULL, + value_c DECIMAL(18, 2) NOT NULL, + value_d DECIMAL(18, 2) NOT NULL, + value_e DECIMAL(18, 2) NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, doc_id, line_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(doc_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_header ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + doc_id BIGINT NOT NULL, + doc_type INT NOT NULL, + status INT NOT NULL, + flag_value INT NOT NULL, + source_id INT NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, doc_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(doc_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_detail ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + doc_id BIGINT NOT NULL, + line_id BIGINT NOT NULL, + measure_b DECIMAL(18, 2) NOT NULL, + value_f DECIMAL(18, 2) NOT NULL, + factor_a DECIMAL(18, 2) NOT NULL, + factor_b DECIMAL(18, 2) NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, doc_id, line_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(doc_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_attr ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + attr_id INT NOT NULL, + category_id INT NOT NULL, + active_flag INT NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, attr_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(attr_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + INSERT INTO mtmv_partition_lineage_perf_fact VALUES + (1, '2024-01-10 00:00:00', 1001, 1, 10, 20, 30, 601, 20, 1.00, 100.00, 6.00, 2.00, 3.00, 4.00) + """ + sql """ + INSERT INTO mtmv_partition_lineage_perf_header VALUES + (1, '2024-01-10 00:00:00', 1001, 601, 20, 0, 1) + """ + sql """ + INSERT INTO mtmv_partition_lineage_perf_detail VALUES + (1, '2024-01-10 00:00:00', 1001, 1, 1.00, 60.00, 2.00, 3.00) + """ + sql """ + INSERT INTO mtmv_partition_lineage_perf_attr VALUES + (1, '2024-01-10 00:00:00', 30, 4, 1) + """ + + long createTimeoutMs = 60000 + long createStartMs = System.currentTimeMillis() + sql """ + CREATE MATERIALIZED VIEW mtmv_partition_lineage_perf_mv + BUILD DEFERRED REFRESH AUTO ON MANUAL + PARTITION BY (date_trunc(event_time, 'month')) + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ("replication_num" = "1") + AS + SELECT + profile_id, + event_time, + dim_id, + entity_id, + SUM(m01) AS m01, + SUM(m02) AS m02, + SUM(m03) AS m03, + SUM(m04) AS m04, + SUM(m05) AS m05, + SUM(m06) AS m06, + SUM(m07) AS m07, + SUM(m08) AS m08, + SUM(m09) AS m09, + SUM(m10) AS m10, + SUM(m11) AS m11, + SUM(m12) AS m12, + SUM(m13) AS m13, + SUM(m14) AS m14, + SUM(m15) AS m15, + SUM(m16) AS m16, + SUM(m17) AS m17, + SUM(m18) AS m18 + FROM ( + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (601, 701) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (602, 702) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (603, 703) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (604, 704) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (605, 705) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (606, 706) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (607, 707) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (608, 708) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (609, 709) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (610, 710) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (611, 711) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (612, 712) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + ) union_src + GROUP BY profile_id, event_time, dim_id, entity_id + """ + + long createElapsedMs = System.currentTimeMillis() - createStartMs + logger.info("partition lineage MTMV create elapsed: ${createElapsedMs} ms") + assertTrue(createElapsedMs < createTimeoutMs, + "partition lineage MTMV create took ${createElapsedMs} ms, expected less than ${createTimeoutMs} ms") + + def mvPartitions = sql """SHOW PARTITIONS FROM mtmv_partition_lineage_perf_mv""" + logger.info("mtmv_partition_lineage_perf_mv partitions: " + mvPartitions.toString()) + assertTrue(mvPartitions.toString().contains("p_20240101000000_20240201000000") + || mvPartitions.toString().contains("p_20240101_20240201")) +}