|
2 | 2 | import subprocess |
3 | 3 | import threading |
4 | 4 | import uuid |
| 5 | +from datetime import UTC, datetime |
5 | 6 |
|
6 | 7 | import pandas as pd |
7 | 8 |
|
|
20 | 21 | date_service, |
21 | 22 | ) |
22 | 23 | from testgen.common.database.database_service import empty_cache |
| 24 | +from testgen.common.mixpanel_service import MixpanelService |
23 | 25 |
|
24 | 26 | booClean = True |
25 | 27 | LOG = logging.getLogger("testgen") |
@@ -234,15 +236,19 @@ def run_profiling_in_background(table_group_id): |
234 | 236 | if settings.IS_DEBUG: |
235 | 237 | LOG.info(msg + ". Running in debug mode (new thread instead of new process).") |
236 | 238 | empty_cache() |
237 | | - background_thread = threading.Thread(target=run_profiling_queries, args=(table_group_id,)) |
| 239 | + background_thread = threading.Thread( |
| 240 | + target=run_profiling_queries, |
| 241 | + args=(table_group_id), |
| 242 | + kwargs={"source": "ui"}, |
| 243 | + ) |
238 | 244 | background_thread.start() |
239 | 245 | else: |
240 | 246 | LOG.info(msg) |
241 | 247 | script = ["testgen", "run-profile", "-tg", table_group_id] |
242 | 248 | subprocess.Popen(script) # NOQA S603 |
243 | 249 |
|
244 | 250 |
|
245 | | -def run_profiling_queries(strTableGroupsID, spinner=None): |
| 251 | +def run_profiling_queries(strTableGroupsID, spinner=None, source=None): |
246 | 252 | if strTableGroupsID is None: |
247 | 253 | raise ValueError("Table Group ID was not specified") |
248 | 254 |
|
@@ -308,28 +314,29 @@ def run_profiling_queries(strTableGroupsID, spinner=None): |
308 | 314 | if spinner: |
309 | 315 | spinner.next() |
310 | 316 |
|
| 317 | + table_count = 0 |
| 318 | + column_count = 0 |
311 | 319 | try: |
312 | 320 | # Retrieve Column Metadata |
313 | 321 | LOG.info("CurrentStep: Getting DDF from project") |
314 | 322 |
|
315 | 323 | strQuery = clsProfiling.GetDDFQuery() |
316 | 324 | lstResult = RetrieveDBResultsToDictList("PROJECT", strQuery) |
317 | | - |
318 | | - if len(lstResult) == 0: |
319 | | - LOG.warning("SQL retrieved 0 records") |
| 325 | + column_count = len(lstResult) |
320 | 326 |
|
321 | 327 | if lstResult: |
322 | | - if clsProfiling.profile_use_sampling == "Y": |
323 | | - # Get distinct tables |
324 | | - distinct_tables = set() |
325 | | - for item in lstResult: |
326 | | - schema_name = item["table_schema"] |
327 | | - table_name = item["table_name"] |
328 | | - distinct_tables.add(f"{schema_name}.{table_name}") |
| 328 | + # Get distinct tables |
| 329 | + distinct_tables = set() |
| 330 | + for item in lstResult: |
| 331 | + schema_name = item["table_schema"] |
| 332 | + table_name = item["table_name"] |
| 333 | + distinct_tables.add(f"{schema_name}.{table_name}") |
329 | 334 |
|
330 | | - # Convert the set to a list |
331 | | - distinct_tables_list = list(distinct_tables) |
| 335 | + # Convert the set to a list |
| 336 | + distinct_tables_list = list(distinct_tables) |
| 337 | + table_count = len(distinct_tables_list) |
332 | 338 |
|
| 339 | + if clsProfiling.profile_use_sampling == "Y": |
333 | 340 | # Sampling tables |
334 | 341 | lstQueries = [] |
335 | 342 | for parm_sampling_table in distinct_tables_list: |
@@ -494,12 +501,24 @@ def run_profiling_queries(strTableGroupsID, spinner=None): |
494 | 501 | raise |
495 | 502 | finally: |
496 | 503 | LOG.info("Updating the profiling run record") |
497 | | - lstProfileRunQuery = [ |
| 504 | + RunActionQueryList("DKTG", [ |
498 | 505 | clsProfiling.GetProfileRunInfoRecordUpdateQuery(), |
| 506 | + ]) |
| 507 | + |
| 508 | + MixpanelService().send_event( |
| 509 | + "run-profiling", |
| 510 | + source=source, |
| 511 | + sql_flavor=clsProfiling.flavor, |
| 512 | + sampling=clsProfiling.profile_use_sampling == "Y", |
| 513 | + table_count=table_count, |
| 514 | + column_count=column_count, |
| 515 | + duration=(datetime.now(UTC) - date_service.parse_now(clsProfiling.run_date)).total_seconds(), |
| 516 | + ) |
| 517 | + |
| 518 | + RunActionQueryList("DKTG", [ |
499 | 519 | clsProfiling.GetAnomalyScoringRollupRunQuery(), |
500 | 520 | clsProfiling.GetAnomalyScoringRollupTableGroupQuery(), |
501 | | - ] |
502 | | - RunActionQueryList("DKTG", lstProfileRunQuery) |
| 521 | + ]) |
503 | 522 | run_refresh_score_cards_results( |
504 | 523 | project_code=dctParms["project_code"], |
505 | 524 | add_history_entry=True, |
|
0 commit comments