Skip to content

Commit 47ea799

Browse files
author
ci bot
committed
Merge branch 'tg-965-email-notifications' into 'enterprise'
feat(notifications): Implementing email notifications See merge request dkinternal/testgen/dataops-testgen!348
2 parents 20453cb + 98ec5ba commit 47ea799

47 files changed

Lines changed: 3389 additions & 151 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

testgen/__main__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
from testgen.common.models.profiling_run import ProfilingRun
4747
from testgen.common.models.test_run import TestRun
4848
from testgen.common.models.test_suite import TestSuite
49+
from testgen.common.notifications.profiling_run import send_profiling_run_notifications
50+
from testgen.common.notifications.test_run import send_test_run_notifications
4951
from testgen.scheduler import register_scheduler_job, run_scheduler
5052
from testgen.utils import plugins
5153

@@ -647,8 +649,10 @@ def run_ui():
647649
@with_database_session
648650
def cancel_all_running():
649651
try:
650-
ProfilingRun.cancel_all_running()
651-
TestRun.cancel_all_running()
652+
for profiling_run_id in ProfilingRun.cancel_all_running():
653+
send_profiling_run_notifications(ProfilingRun.get(profiling_run_id))
654+
for test_run_id in TestRun.cancel_all_running():
655+
send_test_run_notifications(TestRun.get(test_run_id))
652656
except Exception:
653657
LOG.warning("Failed to cancel 'Running' profiling/test runs")
654658

testgen/commands/run_profiling.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from testgen.common.models.profiling_run import ProfilingRun
2828
from testgen.common.models.table_group import TableGroup
2929
from testgen.common.models.test_suite import TestSuite
30+
from testgen.common.notifications.profiling_run import send_profiling_run_notifications
3031
from testgen.ui.session import session
3132
from testgen.utils import get_exception_message
3233

@@ -53,7 +54,7 @@ def run_profiling_in_background(table_group_id: str | UUID) -> None:
5354
def run_profiling(table_group_id: str | UUID, username: str | None = None, run_date: datetime | None = None) -> str:
5455
if table_group_id is None:
5556
raise ValueError("Table Group ID was not specified")
56-
57+
5758
LOG.info(f"Starting profiling run for table group {table_group_id}")
5859
time_delta = (run_date - datetime.now(UTC)) if run_date else timedelta()
5960

@@ -104,12 +105,16 @@ def run_profiling(table_group_id: str | UUID, username: str | None = None, run_d
104105
profiling_run.profiling_endtime = datetime.now(UTC) + time_delta
105106
profiling_run.status = "Error"
106107
profiling_run.save()
108+
109+
send_profiling_run_notifications(profiling_run)
107110
else:
108111
LOG.info("Setting profiling run status to Completed")
109112
profiling_run.profiling_endtime = datetime.now(UTC) + time_delta
110113
profiling_run.status = "Complete"
111114
profiling_run.save()
112115

116+
send_profiling_run_notifications(profiling_run)
117+
113118
_rollup_profiling_scores(profiling_run, table_group)
114119

115120
if bool(table_group.monitor_test_suite_id) and not table_group.last_complete_profile_run_id:

testgen/commands/run_refresh_score_cards_results.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ScoreDefinitionResult,
1212
ScoreDefinitionResultHistoryEntry,
1313
)
14+
from testgen.common.notifications.score_drop import collect_score_notification_data, send_score_drop_notifications
1415

1516
LOG = logging.getLogger("testgen")
1617

@@ -26,16 +27,16 @@ def run_refresh_score_cards_results(
2627
_refresh_date = refresh_date or datetime.datetime.now(datetime.UTC)
2728

2829
try:
29-
definitions = []
3030
if not definition_id:
3131
definitions = ScoreDefinition.all(project_code=project_code)
3232
else:
33-
definitions.append(ScoreDefinition.get(str(definition_id)))
33+
definitions = [ScoreDefinition.get(str(definition_id))]
3434
except Exception:
3535
LOG.exception("Stopping scorecards results refresh after unexpected error")
3636
return
3737

3838
db_session = get_current_session()
39+
score_notification_data = []
3940

4041
for definition in definitions:
4142
LOG.info(
@@ -46,6 +47,9 @@ def run_refresh_score_cards_results(
4647

4748
try:
4849
fresh_score_card = definition.as_score_card()
50+
51+
collect_score_notification_data(score_notification_data, definition, fresh_score_card)
52+
4953
definition.clear_results()
5054
definition.results = _score_card_to_results(fresh_score_card)
5155
definition.breakdown = _score_definition_to_results_breakdown(definition)
@@ -89,6 +93,8 @@ def run_refresh_score_cards_results(
8993
end_time = time.time()
9094
LOG.info("Refreshing results for %s done after %s seconds", scope, round(end_time - start_time, 2))
9195

96+
send_score_drop_notifications(score_notification_data)
97+
9298

9399
def _score_card_to_results(score_card: ScoreCard) -> list[ScoreDefinitionResult]:
94100
return [

testgen/commands/run_test_execution.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from testgen.common.models.table_group import TableGroup
2626
from testgen.common.models.test_run import TestRun
2727
from testgen.common.models.test_suite import TestSuite
28+
from testgen.common.notifications.test_run import send_test_run_notifications
2829
from testgen.ui.session import session
2930
from testgen.utils import get_exception_message
3031

@@ -128,13 +129,16 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
128129
execute_db_queries(sql_generator.update_test_results())
129130
# Refresh needed because previous query updates the test run too
130131
test_run.refresh()
132+
131133
except Exception as e:
132134
LOG.exception("Test execution encountered an error.")
133135
LOG.info("Setting test run status to Error")
134136
test_run.log_message = get_exception_message(e)
135137
test_run.test_endtime = datetime.now(UTC) + time_delta
136138
test_run.status = "Error"
137139
test_run.save()
140+
141+
send_test_run_notifications(test_run)
138142
else:
139143
LOG.info("Setting test run status to Completed")
140144
test_run.test_endtime = datetime.now(UTC) + time_delta
@@ -145,6 +149,7 @@ def run_test_execution(test_suite_id: str | UUID, username: str | None = None, r
145149
test_suite.last_complete_test_run_id = test_run.id
146150
test_suite.save()
147151

152+
send_test_run_notifications(test_run)
148153
_rollup_test_scores(test_run, table_group)
149154
finally:
150155
MixpanelService().send_event(
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import re
2+
from collections.abc import Iterable
3+
from dataclasses import dataclass
4+
from typing import Self
5+
from uuid import UUID, uuid4
6+
7+
from sqlalchemy import Column, ForeignKey, String, and_, case, null, select
8+
from sqlalchemy.dialects import postgresql
9+
from sqlalchemy.ext.hybrid import hybrid_property
10+
from sqlalchemy.orm import aliased, relationship
11+
from sqlalchemy.sql.functions import func
12+
13+
from testgen.common.models import Base, get_current_session
14+
from testgen.common.models.entity import Entity
15+
16+
PII_RISK_RE = re.compile(r"Risk: (MODERATE|HIGH),")
17+
18+
19+
@dataclass
20+
class IssueCount:
21+
total: int = 0
22+
inactive: int = 0
23+
24+
@property
25+
def active(self):
26+
return self.total - self.inactive
27+
28+
29+
class HygieneIssueType(Base):
30+
__tablename__ = "profile_anomaly_types"
31+
32+
id: str = Column(String, primary_key=True)
33+
likelihood: str = Column("issue_likelihood", String)
34+
name: str = Column("anomaly_name", String)
35+
36+
# Note: not all table columns are implemented by this entity
37+
38+
39+
class HygieneIssue(Entity):
40+
__tablename__ = "profile_anomaly_results"
41+
42+
id: UUID = Column(postgresql.UUID(as_uuid=True), primary_key=True, nullable=False, default=uuid4)
43+
44+
project_code: str = Column(String, ForeignKey("projects.project_code"))
45+
table_groups_id: UUID = Column(postgresql.UUID(as_uuid=True), ForeignKey("table_groups.id"), nullable=False)
46+
profile_run_id: UUID = Column(postgresql.UUID(as_uuid=True), ForeignKey("profiling_runs.id"), nullable=False)
47+
48+
type_id: str = Column("anomaly_id", String, ForeignKey("profile_anomaly_types.id"), nullable=False)
49+
type_ = relationship(HygieneIssueType)
50+
51+
schema_name: str = Column(String, nullable=False)
52+
table_name: str = Column(String, nullable=False)
53+
column_name: str = Column(String, nullable=False)
54+
55+
detail: str = Column(String, nullable=False)
56+
disposition: str = Column(String)
57+
58+
# Note: not all table columns are implemented by this entity
59+
60+
@hybrid_property
61+
def priority(self):
62+
if self.type_.likelihood != "Potential PII":
63+
return self.type_.likelihood
64+
elif self.detail and (match := PII_RISK_RE.search(self.detail)):
65+
return match.group(1).capitalize()
66+
else:
67+
return None
68+
69+
@priority.expression
70+
def priority(cls):
71+
return case(
72+
(
73+
HygieneIssueType.likelihood != "Potential PII",
74+
HygieneIssueType.likelihood,
75+
),
76+
else_=func.initcap(
77+
func.substring(cls.detail, PII_RISK_RE.pattern)
78+
),
79+
)
80+
81+
@classmethod
82+
def select_count_by_priority(cls, profiling_run_id: UUID) -> dict[str, IssueCount]:
83+
count_query = (
84+
select(
85+
cls.priority,
86+
func.count(),
87+
func.count(cls.disposition.in_(("Dismissed", "Inactive"))),
88+
)
89+
.select_from(cls)
90+
.join(HygieneIssueType)
91+
.where(cls.profile_run_id == profiling_run_id)
92+
.group_by(cls.priority)
93+
)
94+
result = {
95+
priority: IssueCount(total, inactive)
96+
for priority, total, inactive in get_current_session().execute(count_query)
97+
}
98+
for p in ("Definite", "Likely", "Possible", "High", "Moderate"):
99+
result.setdefault(p, IssueCount())
100+
return result
101+
102+
@classmethod
103+
def select_with_diff(
104+
cls, profiling_run_id: UUID, other_profiling_run_id: UUID | None, *where_clauses, limit: int | None = None
105+
) -> Iterable[tuple[Self,bool,str]]:
106+
other = aliased(cls)
107+
order_weight = case(
108+
(cls.priority == "Definite", 1),
109+
(cls.priority == "Likely", 2),
110+
(cls.priority == "Possible", 3),
111+
(cls.priority == "High", 4),
112+
(cls.priority == "Moderate", 5),
113+
else_=6,
114+
)
115+
is_new_col = (other.id.is_(None) if other_profiling_run_id else null()).label("is_new")
116+
query = (
117+
select(
118+
cls,
119+
is_new_col,
120+
)
121+
.outerjoin(
122+
other,
123+
and_(
124+
other.table_groups_id == cls.table_groups_id,
125+
other.schema_name == cls.schema_name,
126+
other.table_name == cls.table_name,
127+
other.column_name == cls.column_name,
128+
other.type_id == cls.type_id,
129+
other.profile_run_id == other_profiling_run_id,
130+
),
131+
).join(
132+
HygieneIssueType,
133+
HygieneIssueType.id == cls.type_id,
134+
).where(
135+
cls.profile_run_id == profiling_run_id,
136+
*where_clauses
137+
).order_by(
138+
is_new_col.desc(),
139+
order_weight,
140+
).limit(
141+
limit,
142+
)
143+
)
144+
145+
return get_current_session().execute(query)

0 commit comments

Comments
 (0)