-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathoutput_record_validation.py
More file actions
176 lines (146 loc) · 6.17 KB
/
output_record_validation.py
File metadata and controls
176 lines (146 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from gigl.common.logger import Logger
from gigl.src.common.utils.bq import BqUtils
logger = Logger()
def _check_table_row_count(
bq_utils: BqUtils,
table_path: str,
expected_count: int,
label: str,
) -> str | None:
"""Checks that a BQ table exists and has the expected row count.
Note: uses BqUtils.does_bq_table_exist() which returns False for any
exception (not just NotFound). Permission errors and transient failures
will be reported as "table does not exist."
Args:
bq_utils: BqUtils instance for BigQuery operations.
table_path: Fully qualified BQ table path (project.dataset.table).
expected_count: Expected number of rows.
label: Human-readable label for error messages
(e.g. "[paper] Embeddings").
Returns:
An error message string if validation fails, or None if valid.
"""
if not bq_utils.does_bq_table_exist(table_path):
return f"{label} table does not exist: {table_path}"
actual = bq_utils.count_number_of_rows_in_bq_table(table_path)
logger.info(f"{label} table ({table_path}) has {actual} rows.")
if actual != expected_count:
return (
f"{label} row count mismatch: "
f"expected {expected_count}, got {actual} "
f"(table: {table_path})"
)
return None
def validate_bq_table_row_count(
bq_utils: BqUtils,
table_path: str,
expected_count: int,
label: str,
) -> None:
"""Validates that a BQ table exists and has the expected row count.
This is the atomic building block for single-table validation.
For batch validation across multiple node types, use
validate_node_output_records instead.
Args:
bq_utils: BqUtils instance for BigQuery operations.
table_path: Fully qualified BQ table path (project.dataset.table).
expected_count: Expected number of rows.
label: Human-readable label for error messages
(e.g. "[paper] Embeddings").
Raises:
ValueError: If the table does not exist or the row count does not
match.
"""
error = _check_table_row_count(
bq_utils=bq_utils,
table_path=table_path,
expected_count=expected_count,
label=label,
)
if error is not None:
raise ValueError(error)
def validate_node_output_records(
bq_utils: BqUtils,
expected_count_tables: dict[str, str],
embeddings_tables: dict[str, str] | None = None,
predictions_tables: dict[str, str] | None = None,
) -> None:
"""Validates output BQ tables have matching record counts for all node types.
For each node type in expected_count_tables:
1. Validates the expected-count (enumerated) table is configured and exists.
2. Validates the embeddings table (if provided) exists and has matching
row count.
3. Validates the predictions table (if provided) exists and has matching
row count.
4. Validates at least one of embeddings or predictions is provided.
All errors are collected and reported together before raising.
Args:
bq_utils: BqUtils instance for BigQuery operations.
expected_count_tables: Mapping of node_type label to source-of-truth
BQ table path. Values must be non-empty strings.
embeddings_tables: Mapping of node_type label to embeddings BQ table
path. All keys must also exist in expected_count_tables.
predictions_tables: Mapping of node_type label to predictions BQ table
path. All keys must also exist in expected_count_tables.
Raises:
ValueError: If any validation errors are found, with all errors listed.
Also raised immediately if embeddings_tables or predictions_tables
contain keys not present in expected_count_tables.
"""
embeddings_tables = embeddings_tables or {}
predictions_tables = predictions_tables or {}
unexpected_keys = (
embeddings_tables.keys() | predictions_tables.keys()
) - expected_count_tables.keys()
if unexpected_keys:
raise ValueError(
f"Output tables reference node types not in expected_count_tables: "
f"{sorted(unexpected_keys)}"
)
validation_errors: list[str] = []
for node_type, enumerated_table in expected_count_tables.items():
if not enumerated_table:
validation_errors.append(
f"[{node_type}] No enumerated_node_ids_bq_table configured."
)
continue
if not bq_utils.does_bq_table_exist(enumerated_table):
validation_errors.append(
f"[{node_type}] enumerated_node_ids_bq_table does not exist: {enumerated_table}"
)
continue
expected_count = bq_utils.count_number_of_rows_in_bq_table(enumerated_table)
logger.info(
f"[{node_type}] enumerated_node_ids_bq_table ({enumerated_table}) has {expected_count} rows."
)
has_embeddings = node_type in embeddings_tables
has_predictions = node_type in predictions_tables
if has_embeddings:
error = _check_table_row_count(
bq_utils=bq_utils,
table_path=embeddings_tables[node_type],
expected_count=expected_count,
label=f"[{node_type}] Embeddings",
)
if error is not None:
validation_errors.append(error)
if has_predictions:
error = _check_table_row_count(
bq_utils=bq_utils,
table_path=predictions_tables[node_type],
expected_count=expected_count,
label=f"[{node_type}] Predictions",
)
if error is not None:
validation_errors.append(error)
if not has_embeddings and not has_predictions:
validation_errors.append(
f"[{node_type}] Neither embeddings_path nor predictions_path is set."
)
if validation_errors:
error_summary = "\n".join(validation_errors)
raise ValueError(
f"Record count validation failed with {len(validation_errors)} error(s):\n"
f"{error_summary}"
)
logger.info("All record count validations passed.")