Skip to content

Commit b5db320

Browse files
committed
Add rows tracking
1 parent eb17e24 commit b5db320

20 files changed

Lines changed: 460 additions & 94 deletions

File tree

sqlmesh/core/console.py

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431+
rows_processed: t.Optional[int] = None,
431432
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
432433
) -> None:
433434
"""Updates the snapshot evaluation progress."""
@@ -576,6 +577,7 @@ def update_snapshot_evaluation_progress(
576577
num_audits_passed: int,
577578
num_audits_failed: int,
578579
audit_only: bool = False,
580+
rows_processed: t.Optional[int] = None,
579581
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
580582
) -> None:
581583
pass
@@ -1013,7 +1015,9 @@ def start_evaluation_progress(
10131015

10141016
# determine column widths
10151017
self.evaluation_column_widths["annotation"] = (
1016-
_calculate_annotation_str_len(batched_intervals, self.AUDIT_PADDING)
1018+
_calculate_annotation_str_len(
1019+
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1020+
)
10171021
+ 3 # brackets and opening escape backslash
10181022
)
10191023
self.evaluation_column_widths["name"] = max(
@@ -1058,6 +1062,7 @@ def update_snapshot_evaluation_progress(
10581062
num_audits_passed: int,
10591063
num_audits_failed: int,
10601064
audit_only: bool = False,
1065+
rows_processed: t.Optional[int] = None,
10611066
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10621067
) -> None:
10631068
"""Update the snapshot evaluation progress."""
@@ -1078,7 +1083,7 @@ def update_snapshot_evaluation_progress(
10781083
).ljust(self.evaluation_column_widths["name"])
10791084

10801085
annotation = _create_evaluation_model_annotation(
1081-
snapshot, _format_evaluation_model_interval(snapshot, interval)
1086+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
10821087
)
10831088
audits_str = ""
10841089
if num_audits_passed:
@@ -3642,6 +3647,7 @@ def update_snapshot_evaluation_progress(
36423647
num_audits_passed: int,
36433648
num_audits_failed: int,
36443649
audit_only: bool = False,
3650+
rows_processed: t.Optional[int] = None,
36453651
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36463652
) -> None:
36473653
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3812,6 +3818,7 @@ def update_snapshot_evaluation_progress(
38123818
num_audits_passed: int,
38133819
num_audits_failed: int,
38143820
audit_only: bool = False,
3821+
rows_processed: t.Optional[int] = None,
38153822
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38163823
) -> None:
38173824
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -3996,7 +4003,8 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
39964003
self._write(f"Join On: {keys}")
39974004

39984005

3999-
_CONSOLE: Console = NoopConsole()
4006+
# _CONSOLE: Console = NoopConsole()
4007+
_CONSOLE: Console = TerminalConsole()
40004008

40014009

40024010
def set_console(console: Console) -> None:
@@ -4143,33 +4151,49 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41434151
return ""
41444152

41454153

4146-
def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str:
4154+
def _create_evaluation_model_annotation(
4155+
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4156+
) -> str:
4157+
annotation = None
4158+
num_rows_processed = str(rows_processed) if rows_processed else ""
4159+
rows_processed_str = f" ({num_rows_processed} rows processed)" if num_rows_processed else ""
4160+
41474161
if snapshot.is_audit:
4148-
return "run standalone audit"
4149-
if snapshot.is_model and snapshot.model.kind.is_external:
4150-
return "run external audits"
4151-
if snapshot.model.kind.is_seed:
4152-
return "insert seed file"
4153-
if snapshot.model.kind.is_full:
4154-
return "full refresh"
4155-
if snapshot.model.kind.is_view:
4156-
return "recreate view"
4157-
if snapshot.model.kind.is_incremental_by_unique_key:
4158-
return "insert/update rows"
4159-
if snapshot.model.kind.is_incremental_by_partition:
4160-
return "insert partitions"
4161-
4162-
return interval_info if interval_info else ""
4163-
4164-
4165-
def _calculate_interval_str_len(snapshot: Snapshot, intervals: t.List[Interval]) -> int:
4162+
annotation = "run standalone audit"
4163+
if snapshot.is_model:
4164+
if snapshot.model.kind.is_external:
4165+
annotation = "run external audits"
4166+
if snapshot.model.kind.is_view:
4167+
annotation = "recreate view"
4168+
if snapshot.model.kind.is_seed:
4169+
# no "processed" for seeds
4170+
seed_num_rows_inserted = (
4171+
f" ({num_rows_processed} rows inserted)" if num_rows_processed else ""
4172+
)
4173+
annotation = f"insert seed file{seed_num_rows_inserted}"
4174+
if snapshot.model.kind.is_full:
4175+
annotation = f"full refresh{rows_processed_str}"
4176+
if snapshot.model.kind.is_incremental_by_unique_key:
4177+
annotation = f"insert/update rows{rows_processed_str}"
4178+
if snapshot.model.kind.is_incremental_by_partition:
4179+
annotation = f"insert partitions{rows_processed_str}"
4180+
4181+
if annotation:
4182+
return annotation
4183+
4184+
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4185+
4186+
4187+
def _calculate_interval_str_len(
4188+
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4189+
) -> int:
41664190
interval_str_len = 0
41674191
for interval in intervals:
41684192
interval_str_len = max(
41694193
interval_str_len,
41704194
len(
41714195
_create_evaluation_model_annotation(
4172-
snapshot, _format_evaluation_model_interval(snapshot, interval)
4196+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
41734197
)
41744198
),
41754199
)
@@ -4222,13 +4246,16 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42224246

42234247

42244248
def _calculate_annotation_str_len(
4225-
batched_intervals: t.Dict[Snapshot, t.List[Interval]], audit_padding: int = 0
4249+
batched_intervals: t.Dict[Snapshot, t.List[Interval]],
4250+
audit_padding: int = 0,
4251+
rows_processed_len: int = 0,
42264252
) -> int:
42274253
annotation_str_len = 0
42284254
for snapshot, intervals in batched_intervals.items():
42294255
annotation_str_len = max(
42304256
annotation_str_len,
42314257
_calculate_interval_str_len(snapshot, intervals)
4232-
+ _calculate_audit_str_len(snapshot, audit_padding),
4258+
+ _calculate_audit_str_len(snapshot, audit_padding)
4259+
+ rows_processed_len,
42334260
)
42344261
return annotation_str_len

sqlmesh/core/engine_adapter/base.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141
from sqlmesh.core.model.kind import TimeColumn
4242
from sqlmesh.core.schema_diff import SchemaDiffer
43+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
4344
from sqlmesh.utils import (
4445
CorrelationId,
4546
columns_to_types_all_known,
@@ -835,6 +836,7 @@ def _create_table_from_source_queries(
835836
table_description: t.Optional[str] = None,
836837
column_descriptions: t.Optional[t.Dict[str, str]] = None,
837838
table_kind: t.Optional[str] = None,
839+
track_row_count: bool = True,
838840
**kwargs: t.Any,
839841
) -> None:
840842
table = exp.to_table(table_name)
@@ -880,11 +882,15 @@ def _create_table_from_source_queries(
880882
replace=replace,
881883
table_description=table_description,
882884
table_kind=table_kind,
885+
track_row_count=track_row_count,
883886
**kwargs,
884887
)
885888
else:
886889
self._insert_append_query(
887-
table_name, query, target_columns_to_types or self.columns(table)
890+
table_name,
891+
query,
892+
target_columns_to_types or self.columns(table),
893+
track_row_count=track_row_count,
888894
)
889895

890896
# Register comments with commands if the engine supports comments and we weren't able to
@@ -908,6 +914,7 @@ def _create_table(
908914
table_description: t.Optional[str] = None,
909915
column_descriptions: t.Optional[t.Dict[str, str]] = None,
910916
table_kind: t.Optional[str] = None,
917+
track_row_count: bool = True,
911918
**kwargs: t.Any,
912919
) -> None:
913920
self.execute(
@@ -924,7 +931,8 @@ def _create_table(
924931
),
925932
table_kind=table_kind,
926933
**kwargs,
927-
)
934+
),
935+
track_row_count=track_row_count,
928936
)
929937

930938
def _build_create_table_exp(
@@ -1399,6 +1407,7 @@ def insert_append(
13991407
table_name: TableName,
14001408
query_or_df: QueryOrDF,
14011409
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1410+
track_row_count: bool = True,
14021411
source_columns: t.Optional[t.List[str]] = None,
14031412
) -> None:
14041413
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1407,30 +1416,39 @@ def insert_append(
14071416
target_table=table_name,
14081417
source_columns=source_columns,
14091418
)
1410-
self._insert_append_source_queries(table_name, source_queries, target_columns_to_types)
1419+
self._insert_append_source_queries(
1420+
table_name, source_queries, target_columns_to_types, track_row_count
1421+
)
14111422

14121423
def _insert_append_source_queries(
14131424
self,
14141425
table_name: TableName,
14151426
source_queries: t.List[SourceQuery],
14161427
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1428+
track_row_count: bool = True,
14171429
) -> None:
14181430
with self.transaction(condition=len(source_queries) > 0):
14191431
target_columns_to_types = target_columns_to_types or self.columns(table_name)
14201432
for source_query in source_queries:
14211433
with source_query as query:
1422-
self._insert_append_query(table_name, query, target_columns_to_types)
1434+
self._insert_append_query(
1435+
table_name, query, target_columns_to_types, track_row_count=track_row_count
1436+
)
14231437

14241438
def _insert_append_query(
14251439
self,
14261440
table_name: TableName,
14271441
query: Query,
14281442
target_columns_to_types: t.Dict[str, exp.DataType],
14291443
order_projections: bool = True,
1444+
track_row_count: bool = True,
14301445
) -> None:
14311446
if order_projections:
14321447
query = self._order_projections_and_filter(query, target_columns_to_types)
1433-
self.execute(exp.insert(query, table_name, columns=list(target_columns_to_types)))
1448+
self.execute(
1449+
exp.insert(query, table_name, columns=list(target_columns_to_types)),
1450+
track_row_count=track_row_count,
1451+
)
14341452

14351453
def insert_overwrite_by_partition(
14361454
self,
@@ -1572,7 +1590,7 @@ def _insert_overwrite_by_condition(
15721590
)
15731591
if insert_overwrite_strategy.is_replace_where:
15741592
insert_exp.set("where", where or exp.true())
1575-
self.execute(insert_exp)
1593+
self.execute(insert_exp, track_row_count=True)
15761594

15771595
def update_table(
15781596
self,
@@ -1593,7 +1611,7 @@ def _merge(
15931611
using = exp.alias_(
15941612
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
15951613
)
1596-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens))
1614+
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
15971615

15981616
def scd_type_2_by_time(
15991617
self,
@@ -2342,6 +2360,7 @@ def execute(
23422360
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23432361
ignore_unsupported_errors: bool = False,
23442362
quote_identifiers: bool = True,
2363+
track_row_count: bool = False,
23452364
**kwargs: t.Any,
23462365
) -> None:
23472366
"""Execute a sql query."""
@@ -2363,7 +2382,7 @@ def execute(
23632382
expression=e if isinstance(e, exp.Expression) else None,
23642383
quote_identifiers=quote_identifiers,
23652384
)
2366-
self._execute(sql, **kwargs)
2385+
self._execute(sql, track_row_count, **kwargs)
23672386

23682387
def _attach_correlation_id(self, sql: str) -> str:
23692388
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2388,9 +2407,20 @@ def _log_sql(
23882407

23892408
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
23902409

2391-
def _execute(self, sql: str, **kwargs: t.Any) -> None:
2410+
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
23922411
self.cursor.execute(sql, **kwargs)
23932412

2413+
if track_row_count:
2414+
rowcount_raw = getattr(self.cursor, "rowcount", None)
2415+
rowcount = None
2416+
if rowcount_raw is not None:
2417+
try:
2418+
rowcount = int(rowcount_raw)
2419+
except (TypeError, ValueError):
2420+
pass
2421+
2422+
track_execution_record(sql, rowcount)
2423+
23942424
@contextlib.contextmanager
23952425
def temp_table(
23962426
self,
@@ -2435,6 +2465,7 @@ def temp_table(
24352465
exists=True,
24362466
table_description=None,
24372467
column_descriptions=None,
2468+
track_row_count=False,
24382469
**kwargs,
24392470
)
24402471

@@ -2686,7 +2717,7 @@ def _replace_by_key(
26862717
insert_statement.set("where", delete_filter)
26872718
insert_statement.set("this", exp.to_table(target_table))
26882719

2689-
self.execute(insert_statement)
2720+
self.execute(insert_statement, track_row_count=True)
26902721
finally:
26912722
self.drop_table(temp_table)
26922723

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
SourceQuery,
2121
set_catalog,
2222
)
23+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
2324
from sqlmesh.core.node import IntervalUnit
2425
from sqlmesh.core.schema_diff import SchemaDiffer
2526
from sqlmesh.utils import optional_import, get_source_columns_to_types
@@ -1049,6 +1050,7 @@ def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any)
10491050
def _execute(
10501051
self,
10511052
sql: str,
1053+
track_row_count: bool = False,
10521054
**kwargs: t.Any,
10531055
) -> None:
10541056
"""Execute a sql query."""
@@ -1094,6 +1096,9 @@ def _execute(
10941096
self.cursor._set_rowcount(query_results)
10951097
self.cursor._set_description(query_results.schema)
10961098

1099+
if track_row_count:
1100+
track_execution_record(sql, query_results.total_rows)
1101+
10971102
def _get_data_objects(
10981103
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
10991104
) -> t.List[DataObject]:

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ def _insert_overwrite_by_condition(
294294
)
295295

296296
try:
297-
self.execute(existing_records_insert_exp)
297+
self.execute(existing_records_insert_exp, track_row_count=True)
298298
finally:
299299
if table_partition_exp:
300300
self.drop_table(partitions_temp_table_name)
@@ -489,6 +489,7 @@ def _create_table(
489489
table_description: t.Optional[str] = None,
490490
column_descriptions: t.Optional[t.Dict[str, str]] = None,
491491
table_kind: t.Optional[str] = None,
492+
track_row_count: bool = True,
492493
**kwargs: t.Any,
493494
) -> None:
494495
"""Creates a table in the database.
@@ -525,6 +526,7 @@ def _create_table(
525526
column_descriptions,
526527
table_kind,
527528
empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
529+
track_row_count=track_row_count,
528530
**kwargs,
529531
)
530532

0 commit comments

Comments
 (0)