|
| 1 | +SET SEARCH_PATH TO {SCHEMA_NAME}; |
| 2 | + |
-- Column-level profiling inputs for score history.
-- Emits one row per (score definition, cutoff time, profiling run, table, column)
-- for the profiling runs referenced by score_history_latest_runs
-- (matched on last_profiling_run_id). Descriptive tags fall back from the
-- column level (dcc) to the table level (dtc) to the table-group level (tg).
-- NOTE(review): SUM_LN is not a PostgreSQL built-in aggregate; it is presumably
-- defined elsewhere in this schema -- confirm it exists before this script runs.
CREATE OR REPLACE VIEW v_dq_profile_scoring_history_by_column
AS
SELECT tg.project_code,
       sr.definition_id,
       sr.score_history_cutoff_time,
       pr.table_groups_id,
       pr.profile_run_id,
       tg.table_groups_name,
       tg.data_location,
       COALESCE(dcc.data_source, dtc.data_source, tg.data_source) AS data_source,
       COALESCE(dcc.source_system, dtc.source_system, tg.source_system) AS source_system,
       COALESCE(dcc.source_process, dtc.source_process, tg.source_process) AS source_process,
       COALESCE(dcc.business_domain, dtc.business_domain, tg.business_domain) AS business_domain,
       COALESCE(dcc.stakeholder_group, dtc.stakeholder_group, tg.stakeholder_group) AS stakeholder_group,
       COALESCE(dcc.transform_level, dtc.transform_level, tg.transform_level) AS transform_level,
       COALESCE(dcc.critical_data_element, dtc.critical_data_element) AS critical_data_element,
       COALESCE(dcc.data_product, dtc.data_product, tg.data_product) AS data_product,
       dcc.functional_data_type AS semantic_data_type,
       pr.table_name,
       pr.column_name,
       pr.run_date,
       MAX(pr.record_ct) AS record_ct,
       -- Counts only anomalies that survived the disposition filter in the join below.
       COUNT(p.anomaly_id) AS issue_ct,
       -- Columns without a matching anomaly contribute a prevalence of 0.0.
       SUM_LN(COALESCE(p.dq_prevalence, 0.0)) AS good_data_pct
  FROM profile_results pr
INNER JOIN score_history_latest_runs sr
   ON pr.profile_run_id = sr.last_profiling_run_id
INNER JOIN data_column_chars dcc
   ON pr.table_groups_id = dcc.table_groups_id
  AND pr.table_name = dcc.table_name
  AND pr.column_name = dcc.column_name
INNER JOIN data_table_chars dtc
   ON dcc.table_id = dtc.table_id
INNER JOIN table_groups tg
   ON pr.table_groups_id = tg.id
 LEFT JOIN (profile_anomaly_results p
            INNER JOIN profile_anomaly_types t
               ON p.anomaly_id = t.id)
   ON pr.profile_run_id = p.profile_run_id
  AND pr.column_name = p.column_name
  AND pr.table_name = p.table_name
  -- The disposition filter lives in the join condition, not in WHERE.
  -- As a WHERE predicate it removed every row of a column whose anomalies all
  -- carry another disposition, so the column (and its record_ct) vanished from
  -- the view. Here such a column is kept and treated like one with no anomalies.
  AND (p.disposition = 'Confirmed' OR p.disposition IS NULL)
GROUP BY pr.profile_run_id,
         sr.definition_id,
         sr.score_history_cutoff_time,
         pr.table_groups_id,
         pr.table_name,
         pr.column_name,
         tg.table_groups_name,
         tg.data_location,
         COALESCE(dcc.data_source, dtc.data_source, tg.data_source),
         COALESCE(dcc.source_system, dtc.source_system, tg.source_system),
         COALESCE(dcc.source_process, dtc.source_process, tg.source_process),
         COALESCE(dcc.business_domain, dtc.business_domain, tg.business_domain),
         COALESCE(dcc.stakeholder_group, dtc.stakeholder_group, tg.stakeholder_group),
         COALESCE(dcc.transform_level, dtc.transform_level, tg.transform_level),
         COALESCE(dcc.critical_data_element, dtc.critical_data_element),
         COALESCE(dcc.data_product, dtc.data_product, tg.data_product),
         dcc.functional_data_type,
         pr.run_date,
         tg.project_code;
| 61 | + |
-- Column-level test-result inputs for score history.
-- Emits one row per (score definition, cutoff time, test suite, test run,
-- table, column_names value) for the test runs referenced by
-- score_history_latest_runs (matched on last_test_run_id).
-- Only results with a non-NULL dq_prevalence, from suites where
-- dq_score_exclude is FALSE, and with a NULL or 'Confirmed' disposition count.
CREATE OR REPLACE VIEW v_dq_test_scoring_history_by_column
AS
SELECT
    tg.project_code,
    sr.definition_id,
    sr.score_history_cutoff_time,
    r.table_groups_id,
    r.test_suite_id,
    r.test_run_id,
    tg.table_groups_name,
    tg.data_location,
    -- Tags: column level first, then table level, then table-group level.
    COALESCE(dcc.data_source, dtc.data_source, tg.data_source) AS data_source,
    COALESCE(dcc.source_system, dtc.source_system, tg.source_system) AS source_system,
    COALESCE(dcc.source_process, dtc.source_process, tg.source_process) AS source_process,
    COALESCE(dcc.business_domain, dtc.business_domain, tg.business_domain) AS business_domain,
    COALESCE(dcc.stakeholder_group, dtc.stakeholder_group, tg.stakeholder_group) AS stakeholder_group,
    COALESCE(dcc.transform_level, dtc.transform_level, tg.transform_level) AS transform_level,
    COALESCE(dcc.critical_data_element, dtc.critical_data_element) AS critical_data_element,
    COALESCE(dcc.data_product, dtc.data_product, tg.data_product) AS data_product,
    dcc.functional_data_type AS semantic_data_type,
    r.test_time,
    r.table_name,
    r.column_names AS column_name,
    COUNT(*) AS test_ct,
    SUM(r.result_code) AS passed_ct,
    SUM(1 - r.result_code) AS issue_ct,
    MAX(r.dq_record_ct) AS dq_record_ct,
    SUM_LN(COALESCE(r.dq_prevalence, 0.0)) AS good_data_pct
FROM test_results r
INNER JOIN test_suites s
    ON r.test_suite_id = s.id
INNER JOIN score_history_latest_runs sr
    ON r.test_run_id = sr.last_test_run_id
INNER JOIN table_groups tg
    ON r.table_groups_id = tg.id
LEFT JOIN data_table_chars dtc
    ON r.table_groups_id = dtc.table_groups_id
    AND r.table_name = dtc.table_name
LEFT JOIN data_column_chars dcc
    ON r.table_groups_id = dcc.table_groups_id
    AND r.table_name = dcc.table_name
    AND r.column_names = dcc.column_name
WHERE r.dq_prevalence IS NOT NULL
    AND s.dq_score_exclude = FALSE
    AND COALESCE(r.disposition, 'Confirmed') = 'Confirmed'
GROUP BY
    -- score-history keys
    sr.definition_id,
    sr.score_history_cutoff_time,
    -- test-result keys
    r.table_groups_id,
    r.table_name,
    r.column_names,
    r.test_suite_id,
    r.test_run_id,
    r.test_time,
    -- table-group attributes
    tg.project_code,
    tg.table_groups_name,
    tg.data_location,
    -- raw inputs of every COALESCE'd tag in the select list
    dcc.data_source, dtc.data_source, tg.data_source,
    dcc.source_system, dtc.source_system, tg.source_system,
    dcc.source_process, dtc.source_process, tg.source_process,
    dcc.business_domain, dtc.business_domain, tg.business_domain,
    dcc.stakeholder_group, dtc.stakeholder_group, tg.stakeholder_group,
    dcc.transform_level, dtc.transform_level, tg.transform_level,
    dcc.critical_data_element, dtc.critical_data_element,
    dcc.data_product, dtc.data_product, tg.data_product,
    dcc.functional_data_type;
| 118 | + |
-- Backfill score_definition_results_history.
-- For every score definition of every project, compute a combined score and a
-- combined CDE score per score_history_cutoff_time from the two
-- *_scoring_history_by_column views, and insert a 'score' and a 'cde_score'
-- row for each cutoff time that has no history entry yet for that definition.
DO $$
DECLARE
    current_project            VARCHAR(30);
    current_definition         UUID;
    current_definition_filter  RECORD;
    where_condition            TEXT;
    existing_history_entries   TIMESTAMP[];
    history_entry              RECORD;
BEGIN
    FOR current_project IN SELECT project_code FROM projects LOOP
        FOR current_definition IN
            SELECT id FROM score_definitions WHERE project_code = current_project
        LOOP

            -- Build the WHERE clause shared by both score sub-queries.
            -- %L quotes the values as literals and %I quotes the filter field as
            -- an identifier, so stored filter text cannot break out of the query.
            where_condition := format(
                'WHERE definition_id = %L AND project_code = %L',
                current_definition,
                current_project
            );
            FOR current_definition_filter IN
                SELECT field,
                       string_agg(quote_literal(value), ', ') AS value_list
                  FROM score_definition_filters
                 WHERE definition_id = current_definition
                 GROUP BY field
            LOOP
                where_condition := where_condition
                    || format(' AND %I IN (%s)',
                              current_definition_filter.field,
                              current_definition_filter.value_list);
            END LOOP;

            -- Cutoff times already recorded for this definition.
            -- ARRAY_AGG yields NULL when there are none; "= ANY(NULL)" is then NULL,
            -- which CONTINUE WHEN treats as false, so nothing is skipped.
            SELECT ARRAY_AGG(last_run_time)
              INTO existing_history_entries
              FROM score_definition_results_history
             WHERE definition_id = current_definition;

            -- This query must stay dynamic because its WHERE clause is assembled above.
            -- A side with no row for a cutoff time contributes a factor of 1.
            FOR history_entry IN EXECUTE format('
                SELECT DISTINCT ON (last_run_time)
                       COALESCE(profiling_scores.project_code, test_scores.project_code) AS project_code,
                       COALESCE(profiling_scores.definition_id, test_scores.definition_id) AS definition_id,
                       COALESCE(profiling_scores.last_run_time, test_scores.last_run_time) AS last_run_time,
                       (COALESCE(profiling_scores.score, 1) * COALESCE(test_scores.score, 1)) AS score,
                       (COALESCE(profiling_scores.cde_score, 1) * COALESCE(test_scores.cde_score, 1)) AS cde_score
                  FROM (
                        SELECT project_code,
                               definition_id,
                               score_history_cutoff_time AS last_run_time,
                               SUM(good_data_pct * record_ct) / NULLIF(SUM(record_ct), 0) AS score,
                               SUM(CASE critical_data_element WHEN true THEN (good_data_pct * record_ct) ELSE 0 END)
                                   / NULLIF(SUM(CASE critical_data_element WHEN true THEN record_ct ELSE 0 END), 0) AS cde_score
                          FROM v_dq_profile_scoring_history_by_column
                          %s
                         GROUP BY project_code, definition_id, score_history_cutoff_time
                       ) AS profiling_scores
                  FULL OUTER JOIN (
                        SELECT project_code,
                               definition_id,
                               score_history_cutoff_time AS last_run_time,
                               SUM(good_data_pct * dq_record_ct) / NULLIF(SUM(dq_record_ct), 0) AS score,
                               SUM(CASE critical_data_element WHEN true THEN (good_data_pct * dq_record_ct) ELSE 0 END)
                                   / NULLIF(SUM(CASE critical_data_element WHEN true THEN dq_record_ct ELSE 0 END), 0) AS cde_score
                          FROM v_dq_test_scoring_history_by_column
                          %s
                         GROUP BY project_code, definition_id, score_history_cutoff_time
                       ) AS test_scores
                    ON (
                        test_scores.project_code = profiling_scores.project_code
                        AND test_scores.definition_id = profiling_scores.definition_id
                        AND test_scores.last_run_time = profiling_scores.last_run_time
                       )
            ', where_condition, where_condition) LOOP

                -- Skip cutoff times that already have a history entry.
                CONTINUE WHEN history_entry.last_run_time = ANY(existing_history_entries);

                -- Nothing in this INSERT is dynamic, so it is plain static SQL:
                -- the record fields are passed as typed values instead of being
                -- rendered to text literals and re-parsed through EXECUTE.
                INSERT INTO score_definition_results_history
                       (definition_id, category, score, last_run_time)
                VALUES (history_entry.definition_id, 'score',
                        history_entry.score, history_entry.last_run_time),
                       (history_entry.definition_id, 'cde_score',
                        history_entry.cde_score, history_entry.last_run_time);
            END LOOP;
        END LOOP;
    END LOOP;
END $$;
0 commit comments