From 838ebc2f092a73702f1bea0d4781fdf2ef6689a6 Mon Sep 17 00:00:00 2001 From: seawinde Date: Fri, 29 May 2026 17:14:48 +0800 Subject: [PATCH] [improvement](fe) Optimize MTMV partition lineage check ### What problem does this PR solve? Issue Number: N/A Related PR: N/A Problem Summary: Complex partitioned async MTMV creation can spend excessive FE CPU in partition lineage analysis. The hot path repeatedly shuttles partition and checked expressions through the full plan lineage replacer, so wide UNION ALL, join, and aggregate plans multiply the same plan walks during CREATE MATERIALIZED VIEW analysis. Root cause: In PartitionIncrementMaintainer.PartitionIncrementChecker.checkPartition(), each partition candidate and checked expression calls ExpressionUtils.shuttleExpressionWithLineage() separately. Each call traverses the plan through ExpressionLineageReplacer and rebuilds equivalent normalized expressions. Change Summary: | File | Change Description | |------|-------------------| | PartitionIncrementMaintainer.java | Batch lineage shuttle calls, cache lineage-visible named expressions by plan identity, cache normalized expressions, and reuse the normalization rewrite context during one partition increment check. | | PartitionColumnTraceTest.java | Add a CTE plus UNION ALL plus wide aggregate lineage test to keep partition lineage behavior covered. | | test_mtmv_partition_lineage_performance.groovy | Add a desensitized static SQL performance regression case for the complex partitioned MTMV shape. | Design Rationale: The change keeps the existing ExpressionLineageReplacer semantics and limits caching to a single PartitionIncrementCheckContext. This avoids sharing mutable analysis state across optimizer contexts while removing repeated full plan walks for the same plan and expression set. ### Release note Improve performance when creating complex partitioned async materialized views. ### Check List (For Author) - Test: Unit Test / Manual test - Unit Test: ./run-fe-ut.sh --run org.apache.doris.nereids.rules.exploration.mv.PartitionColumnTraceTest - Manual test: git diff --check - Manual test: Tried ./run-regression-test.sh --run -d performance_p0 -s test_mtmv_partition_lineage_performance, but the local Doris FE was not running on 127.0.0.1:9030, so the regression could not execute SQL. - Behavior changed: No - Does this need documentation: No --- .../mv/PartitionIncrementMaintainer.java | 139 +++- .../mv/PartitionColumnTraceTest.java | 60 ++ ..._mtmv_partition_lineage_performance.groovy | 666 ++++++++++++++++++ 3 files changed, 838 insertions(+), 27 deletions(-) create mode 100644 regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java index 4b29f95aeef8ae..38d59167015c09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionIncrementMaintainer.java @@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; +import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.SetOperation; @@ -58,6 +59,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; +import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.ExpressionUtils; @@ -69,6 +71,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -198,8 +201,7 @@ public Void visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, PartitionInc Set> shuttledEqualSlotSet = context.getShuttledEqualSlotSet(); for (Set equalSlotSet : shuttledEqualSlotSet) { if (equalSlotSet.contains(consumerSlot)) { - Expression shuttledSlot = ExpressionUtils.shuttleExpressionWithLineage( - producerSlot, producerPlan); + Expression shuttledSlot = context.shuttleExpressionWithLineage(producerSlot, producerPlan); if (shuttledSlot instanceof Slot) { equalSlotSet.add((Slot) shuttledSlot); } @@ -239,7 +241,7 @@ public Void visitLogicalJoin(LogicalJoin join, continue; } Pair, Set> partitionEqualSlotPair = - calEqualSet((SlotReference) partitionSlotToCheck, join); + calEqualSet((SlotReference) partitionSlotToCheck, join, context); if (!partitionEqualSlotPair.value().isEmpty()) { context.getShuttledEqualSlotSet().add(partitionEqualSlotPair.value()); } @@ -526,31 +528,24 @@ private Set getPartitionColumnsToCheck(PartitionIncrementCheckCon */ private static boolean checkPartition(Collection expressionsToCheck, Plan plan, PartitionIncrementCheckContext context) { - Set> partitionAndExprEntrySet - = new HashSet<>(context.getPartitionAndRefExpressionMap().entrySet()); + List> partitionAndExprEntryList + = new ArrayList<>(context.getPartitionAndRefExpressionMap().entrySet()); + List partitionExpressions = new ArrayList<>(partitionAndExprEntryList.size()); + for (Entry entry : partitionAndExprEntryList) { + partitionExpressions.add(entry.getValue().getPartitionExpression().orElse(entry.getKey())); + } + List partitionExpressionActualList = + context.shuttleAndNormalizeExpressionWithLineage(partitionExpressions, context.getOriginalPlan()); + List expressionsShuttledToCheck = + context.shuttleAndNormalizeExpressionWithLineage(expressionsToCheck, context.getOriginalPlan()); boolean checked = false; - for (Map.Entry partitionExpressionEntry - : partitionAndExprEntrySet) { - NamedExpression partitionNamedExpression = partitionExpressionEntry.getKey(); + for (int i = 0; i < partitionAndExprEntryList.size(); i++) { + Map.Entry partitionExpressionEntry = + partitionAndExprEntryList.get(i); RelatedTableColumnInfo partitionTableColumnInfo = partitionExpressionEntry.getValue(); - Optional partitionExpressionOpt = partitionTableColumnInfo.getPartitionExpression(); - Expression partitionExpressionActual = partitionExpressionOpt - .map(expr -> ExpressionUtils.shuttleExpressionWithLineage(expr, - context.getOriginalPlan())) - .orElseGet(() -> ExpressionUtils.shuttleExpressionWithLineage(partitionNamedExpression, - context.getOriginalPlan())); - // merge date_trunc - partitionExpressionActual = new ExpressionNormalization().rewrite(partitionExpressionActual, - new ExpressionRewriteContext(context.getCascadesContext())); + Expression partitionExpressionActual = partitionExpressionActualList.get(i); OUTER_CHECK: - for (Expression projectSlotToCheck : expressionsToCheck) { - Expression expressionShuttledToCheck = - ExpressionUtils.shuttleExpressionWithLineage(projectSlotToCheck, - context.getOriginalPlan()); - // merge date_trunc - expressionShuttledToCheck = new ExpressionNormalization().rewrite(expressionShuttledToCheck, - new ExpressionRewriteContext(context.getCascadesContext())); - + for (Expression expressionShuttledToCheck : expressionsShuttledToCheck) { Set expressionToCheckSlots = expressionShuttledToCheck.collectToSet(SlotReference.class::isInstance); Set partitionColumnSlots = @@ -683,8 +678,19 @@ public static final class PartitionIncrementCheckContext { private final Set> shuttledEqualSlotSet = new HashSet<>(); private final Map producerCteIdToPlanMap; private final Plan originalPlan; + // Cache lineage-visible named expressions per plan identity to avoid repeated full plan walks. + private final Map> planLineageExpressionIndexes = new IdentityHashMap<>(); + // Cache normalized expressions within this check context; normalization uses the same CascadesContext. + private final Map normalizedExpressionMap = new IdentityHashMap<>(); + // Reuse the normalization rewriter during one partition lineage check. + private final ExpressionNormalization expressionNormalization = new ExpressionNormalization(); + // Reuse the rewrite context because all normalization in this checker shares the same CascadesContext. + private final ExpressionRewriteContext expressionRewriteContext; private boolean failFast = false; + /** + * Construct partition increment check context. + */ public PartitionIncrementCheckContext(NamedExpression mvPartitionColumn, Expression mvPartitionExpression, Map producerCteIdToPlanMap, Plan originalPlan, @@ -694,6 +700,7 @@ public PartitionIncrementCheckContext(NamedExpression mvPartitionColumn, this.cascadesContext = cascadesContext; this.producerCteIdToPlanMap = producerCteIdToPlanMap; this.originalPlan = originalPlan; + this.expressionRewriteContext = new ExpressionRewriteContext(cascadesContext); } public Set getFailReasons() { @@ -743,6 +750,64 @@ public Plan getOriginalPlan() { return originalPlan; } + private Expression shuttleExpressionWithLineage(Expression expression, Plan plan) { + return shuttleExpressionWithLineage(ImmutableList.of(expression), plan).get(0); + } + + private List shuttleExpressionWithLineage(List expressions, + Plan plan) { + if (expressions.isEmpty()) { + return ImmutableList.of(); + } + ExpressionLineageReplacer.ExpressionReplaceContext replaceContext = + new ExpressionLineageReplacer.ExpressionReplaceContext(expressions); + for (NamedExpression namedExpression : getLineageExpressionIndex(plan)) { + if (!replaceContext.getUsedExprIdSet().contains(namedExpression.getExprId())) { + continue; + } + namedExpression.accept(ExpressionLineageReplacer.NamedExpressionCollector.INSTANCE, replaceContext); + } + List replacedExpressions = replaceContext.getReplacedExpressions(); + if (replacedExpressions == null || expressions.size() != replacedExpressions.size()) { + return ExpressionUtils.shuttleExpressionWithLineage(expressions, plan); + } + return replacedExpressions; + } + + private List shuttleAndNormalizeExpressionWithLineage( + Collection expressions, Plan plan) { + if (expressions.isEmpty()) { + return ImmutableList.of(); + } + List shuttledExpressions = + shuttleExpressionWithLineage(ImmutableList.copyOf(expressions), plan); + List normalizedExpressions = new ArrayList<>(shuttledExpressions.size()); + for (Expression expression : shuttledExpressions) { + normalizedExpressions.add(normalizeExpression(expression)); + } + return normalizedExpressions; + } + + private Expression normalizeExpression(Expression expression) { + Expression normalizedExpression = normalizedExpressionMap.get(expression); + if (normalizedExpression == null) { + normalizedExpression = expressionNormalization.rewrite(expression, expressionRewriteContext); + normalizedExpressionMap.put(expression, normalizedExpression); + } + return normalizedExpression; + } + + private List getLineageExpressionIndex(Plan plan) { + List lineageExpressionIndex = planLineageExpressionIndexes.get(plan); + if (lineageExpressionIndex == null) { + List collectedIndex = new ArrayList<>(); + plan.accept(LineageExpressionCollector.INSTANCE, collectedIndex); + lineageExpressionIndex = ImmutableList.copyOf(collectedIndex); + planLineageExpressionIndexes.put(plan, lineageExpressionIndex); + } + return lineageExpressionIndex; + } + /** * collect invalid table set to check self join */ @@ -772,6 +837,25 @@ public Void visitLogicalCatalogRelation(LogicalCatalogRelation relation, } } + private static final class LineageExpressionCollector extends DefaultPlanVisitor> { + private static final LineageExpressionCollector INSTANCE = new LineageExpressionCollector(); + + @Override + public Void visitGroupPlan(GroupPlan groupPlan, List lineageExpressionIndex) { + return null; + } + + @Override + public Void visit(Plan plan, List lineageExpressionIndex) { + for (Expression expression : plan.getExpressions()) { + if (expression instanceof NamedExpression) { + lineageExpressionIndex.add((NamedExpression) expression); + } + } + return super.visit(plan, lineageExpressionIndex); + } + } + /** * Add partitionEqualSlot to partitionAndRefExpressionToCheck if partitionExpression use the partitionSlot */ @@ -816,7 +900,8 @@ public Expression visitNamedExpression(NamedExpression namedExpression, Void con * the value equal set contain the slot itself */ private static Pair, Set> calEqualSet(Slot slot, - LogicalJoin join) { + LogicalJoin join, + PartitionIncrementCheckContext context) { Set partitionEqualSlotSet = new HashSet<>(); JoinType joinType = join.getJoinType(); if (joinType.isInnerJoin() || joinType.isSemiJoin()) { @@ -829,7 +914,7 @@ private static Pair, Set> calEqualSet(Slot slot, } List extendedPartitionEqualSlotSet = new ArrayList<>(partitionEqualSlotSet); extendedPartitionEqualSlotSet.add(slot); - List shuttledEqualExpressions = ExpressionUtils.shuttleExpressionWithLineage( + List shuttledEqualExpressions = context.shuttleExpressionWithLineage( extendedPartitionEqualSlotSet, join); for (Expression shuttledEqualExpression : shuttledEqualExpressions) { Set objects = shuttledEqualExpression.collectToSet(expr -> expr instanceof SlotReference); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java index 33bbb6122ce0bb..a4b61b80988528 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionColumnTraceTest.java @@ -1156,6 +1156,66 @@ public void test40() { }); } + // CTE + union all + wide aggregate should keep partition lineage inside each plan boundary. + @Test + public void testCteUnionAllWideAggregatePartitionLineage() { + PlanChecker.from(connectContext) + .checkExplain("with union_src as (\n" + + " select\n" + + " L_SHIPDATE as part_date,\n" + + " L_ORDERKEY as order_key,\n" + + " L_QUANTITY as metric1,\n" + + " L_EXTENDEDPRICE as metric2,\n" + + " L_DISCOUNT as metric3,\n" + + " L_TAX as metric4,\n" + + " L_RETURNFLAG as flag\n" + + " from lineitem\n" + + " union all\n" + + " select\n" + + " O_ORDERDATE as part_date,\n" + + " O_ORDERKEY as order_key,\n" + + " O_TOTALPRICE as metric1,\n" + + " O_TOTALPRICE as metric2,\n" + + " O_TOTALPRICE as metric3,\n" + + " O_TOTALPRICE as metric4,\n" + + " O_ORDERSTATUS as flag\n" + + " from orders\n" + + "), wide_project as (\n" + + " select\n" + + " date_trunc(part_date, 'day') as part_day,\n" + + " part_date,\n" + + " order_key,\n" + + " metric1,\n" + + " metric2,\n" + + " metric3,\n" + + " metric4,\n" + + " flag\n" + + " from union_src\n" + + ")\n" + + "select\n" + + " part_day,\n" + + " flag,\n" + + " count(*) as cnt,\n" + + " sum(metric1) as sum_metric1,\n" + + " sum(metric2) as sum_metric2,\n" + + " sum(metric3) as sum_metric3,\n" + + " sum(metric4) as sum_metric4,\n" + + " max(order_key) as max_key,\n" + + " min(order_key) as min_key\n" + + "from wide_project\n" + + "group by part_day, flag", + nereidsPlanner -> { + Plan rewrittenPlan = nereidsPlanner.getRewrittenPlan(); + RelatedTableInfo relatedTableInfo = + MaterializedViewUtils.getRelatedTableInfos("part_day", null, + rewrittenPlan, nereidsPlanner.getCascadesContext()); + successWith(relatedTableInfo, ImmutableSet.of( + ImmutableList.of("lineitem", "l_shipdate", "true", "true"), + ImmutableList.of("orders", "o_orderdate", "true", "true")), + "day"); + }); + } + // test with union but not union all @Test diff --git a/regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy b/regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy new file mode 100644 index 00000000000000..d16cc7c35a99b2 --- /dev/null +++ b/regression-test/suites/performance_p0/test_mtmv_partition_lineage_performance.groovy @@ -0,0 +1,666 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mtmv_partition_lineage_performance", "mtmv") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=false" + + sql """DROP MATERIALIZED VIEW IF EXISTS mtmv_partition_lineage_perf_mv""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_fact""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_header""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_detail""" + sql """DROP TABLE IF EXISTS mtmv_partition_lineage_perf_attr""" + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_fact ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + doc_id BIGINT NOT NULL, + line_id BIGINT NOT NULL, + dim_id INT NOT NULL, + entity_id INT NOT NULL, + attr_id INT NOT NULL, + doc_type INT NOT NULL, + status INT NOT NULL, + measure_a DECIMAL(18, 2) NOT NULL, + value_a DECIMAL(18, 2) NOT NULL, + value_b DECIMAL(18, 2) NOT NULL, + value_c DECIMAL(18, 2) NOT NULL, + value_d DECIMAL(18, 2) NOT NULL, + value_e DECIMAL(18, 2) NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, doc_id, line_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(doc_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_header ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + doc_id BIGINT NOT NULL, + doc_type INT NOT NULL, + status INT NOT NULL, + flag_value INT NOT NULL, + source_id INT NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, doc_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(doc_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_detail ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + doc_id BIGINT NOT NULL, + line_id BIGINT NOT NULL, + measure_b DECIMAL(18, 2) NOT NULL, + value_f DECIMAL(18, 2) NOT NULL, + factor_a DECIMAL(18, 2) NOT NULL, + factor_b DECIMAL(18, 2) NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, doc_id, line_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(doc_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + CREATE TABLE mtmv_partition_lineage_perf_attr ( + profile_id INT NOT NULL, + event_time DATETIME NOT NULL, + attr_id INT NOT NULL, + category_id INT NOT NULL, + active_flag INT NOT NULL + ) + DUPLICATE KEY(profile_id, event_time, attr_id) + AUTO PARTITION BY RANGE (date_trunc(event_time, 'month')) () + DISTRIBUTED BY HASH(attr_id) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + + sql """ + INSERT INTO mtmv_partition_lineage_perf_fact VALUES + (1, '2024-01-10 00:00:00', 1001, 1, 10, 20, 30, 601, 20, 1.00, 100.00, 6.00, 2.00, 3.00, 4.00) + """ + sql """ + INSERT INTO mtmv_partition_lineage_perf_header VALUES + (1, '2024-01-10 00:00:00', 1001, 601, 20, 0, 1) + """ + sql """ + INSERT INTO mtmv_partition_lineage_perf_detail VALUES + (1, '2024-01-10 00:00:00', 1001, 1, 1.00, 60.00, 2.00, 3.00) + """ + sql """ + INSERT INTO mtmv_partition_lineage_perf_attr VALUES + (1, '2024-01-10 00:00:00', 30, 4, 1) + """ + + long createTimeoutMs = 60000 + long createStartMs = System.currentTimeMillis() + sql """ + CREATE MATERIALIZED VIEW mtmv_partition_lineage_perf_mv + BUILD DEFERRED REFRESH AUTO ON MANUAL + PARTITION BY (date_trunc(event_time, 'month')) + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ("replication_num" = "1") + AS + SELECT + profile_id, + event_time, + dim_id, + entity_id, + SUM(m01) AS m01, + SUM(m02) AS m02, + SUM(m03) AS m03, + SUM(m04) AS m04, + SUM(m05) AS m05, + SUM(m06) AS m06, + SUM(m07) AS m07, + SUM(m08) AS m08, + SUM(m09) AS m09, + SUM(m10) AS m10, + SUM(m11) AS m11, + SUM(m12) AS m12, + SUM(m13) AS m13, + SUM(m14) AS m14, + SUM(m15) AS m15, + SUM(m16) AS m16, + SUM(m17) AS m17, + SUM(m18) AS m18 + FROM ( + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 601 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (601, 701) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 602 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (602, 702) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 603 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (603, 703) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 604 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (604, 704) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 605 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (605, 705) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 606 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (606, 706) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 607 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (607, 707) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 608 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (608, 708) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 609 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (609, 709) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 610 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (610, 710) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 611 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (611, 711) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + UNION ALL + SELECT + f.profile_id, + f.event_time, + f.dim_id, + f.entity_id, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m01, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m02, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m03, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m04, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m05, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m06, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m07, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m08, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m09, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m10, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m11, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m12, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.measure_a ELSE 0 END) AS m13, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_a ELSE 0 END) AS m14, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_b ELSE 0 END) AS m15, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN d.value_f ELSE 0 END) AS m16, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_d ELSE 0 END) AS m17, + SUM(CASE WHEN h.doc_type = 612 AND a.active_flag = 1 THEN f.value_e ELSE 0 END) AS m18 + FROM mtmv_partition_lineage_perf_fact f + INNER JOIN mtmv_partition_lineage_perf_header h + ON f.profile_id = h.profile_id + AND f.event_time = h.event_time + AND f.doc_id = h.doc_id + INNER JOIN mtmv_partition_lineage_perf_detail d + ON f.profile_id = d.profile_id + AND f.event_time = d.event_time + AND f.doc_id = d.doc_id + AND f.line_id = d.line_id + INNER JOIN mtmv_partition_lineage_perf_attr a + ON f.profile_id = a.profile_id + AND f.event_time = a.event_time + AND f.attr_id = a.attr_id + WHERE h.status >= 20 + AND h.doc_type IN (612, 712) + AND h.flag_value = 0 + GROUP BY f.profile_id, f.event_time, f.dim_id, f.entity_id + ) union_src + GROUP BY profile_id, event_time, dim_id, entity_id + """ + + long createElapsedMs = System.currentTimeMillis() - createStartMs + logger.info("partition lineage MTMV create elapsed: ${createElapsedMs} ms") + assertTrue(createElapsedMs < createTimeoutMs, + "partition lineage MTMV create took ${createElapsedMs} ms, expected less than ${createTimeoutMs} ms") + + def mvPartitions = sql """SHOW PARTITIONS FROM mtmv_partition_lineage_perf_mv""" + logger.info("mtmv_partition_lineage_perf_mv partitions: " + mvPartitions.toString()) + assertTrue(mvPartitions.toString().contains("p_20240101000000_20240201000000") + || mvPartitions.toString().contains("p_20240101_20240201")) +}