|
| 1 | +"""PII masking utilities for redacting sensitive data in the UI.""" |
| 2 | +import pandas as pd |
| 3 | + |
| 4 | +from testgen.ui.services.database_service import fetch_all_from_db |
| 5 | + |
# Placeholder written over any value that must not be displayed.
PII_REDACTED = "[PII Redacted]"

# Profiling-result fields that mask_profiling_pii overwrites with PII_REDACTED
# for PII columns.
PROFILING_PII_FIELDS = (
    "top_freq_values",
    "min_text",
    "max_text",
    "min_value",
    "min_value_over_0",
    "max_value",
    "min_date",
    "max_date",
)
| 13 | + |
| 14 | + |
def get_pii_columns(table_group_id: str, schema: str | None = None, table_name: str | None = None) -> set[str]:
    """Return the names of the PII-flagged columns recorded in data_column_chars.

    A column counts as PII when its ``pii_flag`` is not NULL. Results are always
    restricted to ``table_group_id``; ``schema`` and ``table_name`` narrow the
    lookup further only when they are truthy.

    Only ``column_name`` is selected and the result is a set, so identically
    named columns from different tables collapse into a single entry.
    """
    # Optional filters are spliced in as fixed SQL fragments; the values
    # themselves are always passed as bind parameters, never interpolated.
    schema_filter = "AND schema_name = :schema" if schema else ""
    table_filter = "AND table_name = :table_name" if table_name else ""

    query = f"""
    SELECT column_name
    FROM data_column_chars
    WHERE table_groups_id = :table_group_id
        AND pii_flag IS NOT NULL
        {schema_filter}
        {table_filter}
    """

    # All three binds are supplied even when a filter fragment is omitted.
    rows = fetch_all_from_db(
        query,
        {
            "table_group_id": table_group_id,
            "schema": schema,
            "table_name": table_name,
        },
    )
    return {row.column_name for row in rows}
| 34 | + |
| 35 | + |
def mask_source_data_pii(df: pd.DataFrame, pii_columns: set[str]) -> None:
    """Overwrite every value in the PII columns of ``df`` with PII_REDACTED, in place.

    Column labels are compared to ``pii_columns`` case-insensitively. Labels
    that are not strings (for example integer or tuple labels) cannot match a
    PII column name and are skipped rather than raising ``AttributeError``.

    Args:
        df: Frame to mask; modified in place. An empty frame is left untouched.
        pii_columns: Names of the columns to redact. Nothing happens when empty.
    """
    if df.empty or not pii_columns:
        return

    # Lower-case the PII names once so each frame label needs a single set lookup
    # instead of a scan over every PII name.
    pii_lower = {name.lower() for name in pii_columns}

    for label in list(df.columns):
        if isinstance(label, str) and label.lower() in pii_lower:
            df[label] = PII_REDACTED
| 45 | + |
| 46 | + |
def mask_hygiene_detail(data: pd.DataFrame | list[dict], pii_columns: set[str] | None = None) -> None:
    """Redact, in place, the detail of redactable hygiene issues on PII columns.

    Accepts:
    - DataFrame with detail_redactable, pii_flag, and detail columns (hygiene issues
      grid/export). Rows where detail_redactable is true (nulls count as false) and
      pii_flag is not null get their detail replaced with PII_REDACTED. A frame that
      is empty or has no detail_redactable column is left untouched. pii_columns is
      not consulted for DataFrames.
    - List of issue dicts, each with detail_redactable and either pii_flag or
      column_name. When a non-empty pii_columns set is provided, an issue is PII if
      its column_name matches the set case-insensitively; otherwise the issue's own
      pii_flag decides. Issues whose detail_redactable is falsy are never changed.
    """
    if isinstance(data, pd.DataFrame):
        if data.empty or "detail_redactable" not in data.columns:
            return
        redactable = data["detail_redactable"].fillna(False)
        data.loc[redactable & data["pii_flag"].notna(), "detail"] = PII_REDACTED
        return

    if not data:
        return

    pii_lower = {name.lower() for name in pii_columns} if pii_columns else None
    for issue in data:
        if not issue.get("detail_redactable"):
            continue
        if pii_lower is None:
            is_pii = bool(issue.get("pii_flag"))
        else:
            # The key can be present with a None value, in which case a
            # .get() default would not apply; `or ""` covers both the
            # missing and the None case without raising on .lower().
            is_pii = (issue.get("column_name") or "").lower() in pii_lower
        if is_pii:
            issue["detail"] = PII_REDACTED
| 73 | + |
| 74 | + |
def mask_profiling_pii(data: pd.DataFrame | dict, pii_columns: set[str]) -> None:
    """Replace profiling values of PII columns with PII_REDACTED, in place.

    ``data`` is either a single profiling row given as a dict or a DataFrame of
    rows with a ``column_name`` column. Only the fields listed in
    PROFILING_PII_FIELDS that are actually present are overwritten. Column names
    are matched against ``pii_columns`` case-insensitively, and nothing is
    changed when ``pii_columns`` is empty.

    Dict input: a row whose ``column_name`` is missing or falsy is masked
    unconditionally; otherwise it is masked only if the name is a PII column.

    DataFrame input: an empty frame is left untouched. Each present profiling
    field that is not already object dtype is cast to object before the
    placeholder is assigned; the cast happens even when no row matches.
    """
    if isinstance(data, dict):
        if not pii_columns:
            return
        name = data.get("column_name")
        if name:
            known_pii = {pii.lower() for pii in pii_columns}
            if name.lower() not in known_pii:
                return
        data.update({key: PII_REDACTED for key in PROFILING_PII_FIELDS if key in data})
        return

    if data.empty or not pii_columns:
        return

    known_pii = {pii.lower() for pii in pii_columns}
    pii_rows = data["column_name"].str.lower().isin(known_pii)
    present_fields = [key for key in PROFILING_PII_FIELDS if key in data.columns]
    for key in present_fields:
        if data[key].dtype != object:
            data[key] = data[key].astype(object)
        data.loc[pii_rows, key] = PII_REDACTED
0 commit comments