Skip to content

Commit b29c371

Browse files
committed
Add BQ support and track bytes processed
1 parent dcbb1d4 commit b29c371

19 files changed

Lines changed: 191 additions & 115 deletions

File tree

sqlmesh/core/console.py

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from sqlmesh.core.environment import EnvironmentNamingInfo, EnvironmentSummary
3232
from sqlmesh.core.linter.rule import RuleViolation
3333
from sqlmesh.core.model import Model
34+
from sqlmesh.core.execution_tracker import QueryExecutionStats
3435
from sqlmesh.core.snapshot import (
3536
Snapshot,
3637
SnapshotChangeCategory,
@@ -428,7 +429,7 @@ def update_snapshot_evaluation_progress(
428429
num_audits_passed: int,
429430
num_audits_failed: int,
430431
audit_only: bool = False,
431-
rows_processed: t.Optional[int] = None,
432+
execution_stats: t.Optional[QueryExecutionStats] = None,
432433
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
433434
) -> None:
434435
"""Updates the snapshot evaluation progress."""
@@ -577,7 +578,7 @@ def update_snapshot_evaluation_progress(
577578
num_audits_passed: int,
578579
num_audits_failed: int,
579580
audit_only: bool = False,
580-
rows_processed: t.Optional[int] = None,
581+
execution_stats: t.Optional[QueryExecutionStats] = None,
581582
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
582583
) -> None:
583584
pass
@@ -1016,7 +1017,7 @@ def start_evaluation_progress(
10161017
# determine column widths
10171018
self.evaluation_column_widths["annotation"] = (
10181019
_calculate_annotation_str_len(
1019-
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1020+
batched_intervals, self.AUDIT_PADDING, len(" (123.4m rows, 123.4 KiB)")
10201021
)
10211022
+ 3 # brackets and opening escape backslash
10221023
)
@@ -1062,7 +1063,7 @@ def update_snapshot_evaluation_progress(
10621063
num_audits_passed: int,
10631064
num_audits_failed: int,
10641065
audit_only: bool = False,
1065-
rows_processed: t.Optional[int] = None,
1066+
execution_stats: t.Optional[QueryExecutionStats] = None,
10661067
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10671068
) -> None:
10681069
"""Update the snapshot evaluation progress."""
@@ -1083,7 +1084,7 @@ def update_snapshot_evaluation_progress(
10831084
).ljust(self.evaluation_column_widths["name"])
10841085

10851086
annotation = _create_evaluation_model_annotation(
1086-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
1087+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
10871088
)
10881089
audits_str = ""
10891090
if num_audits_passed:
@@ -3647,7 +3648,7 @@ def update_snapshot_evaluation_progress(
36473648
num_audits_passed: int,
36483649
num_audits_failed: int,
36493650
audit_only: bool = False,
3650-
rows_processed: t.Optional[int] = None,
3651+
execution_stats: t.Optional[QueryExecutionStats] = None,
36513652
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36523653
) -> None:
36533654
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3818,7 +3819,7 @@ def update_snapshot_evaluation_progress(
38183819
num_audits_passed: int,
38193820
num_audits_failed: int,
38203821
audit_only: bool = False,
3821-
rows_processed: t.Optional[int] = None,
3822+
execution_stats: t.Optional[QueryExecutionStats] = None,
38223823
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38233824
) -> None:
38243825
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -4153,11 +4154,27 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41534154

41544155

41554156
def _create_evaluation_model_annotation(
4156-
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4157+
snapshot: Snapshot,
4158+
interval_info: t.Optional[str],
4159+
execution_stats: t.Optional[QueryExecutionStats],
41574160
) -> str:
41584161
annotation = None
4159-
num_rows_processed = str(rows_processed) if rows_processed else ""
4160-
rows_processed_str = f" ({num_rows_processed} rows)" if num_rows_processed else ""
4162+
execution_stats_str = ""
4163+
if execution_stats:
4164+
rows_processed = execution_stats.total_rows_processed
4165+
execution_stats_str += (
4166+
f"{_abbreviate_integer_count(rows_processed)} row{'s' if rows_processed > 1 else ''}"
4167+
if rows_processed
4168+
else ""
4169+
)
4170+
4171+
bytes_processed = execution_stats.total_bytes_processed
4172+
execution_stats_str += (
4173+
f"{', ' if execution_stats_str else ''}{_format_bytes(bytes_processed)}"
4174+
if bytes_processed
4175+
else ""
4176+
)
4177+
execution_stats_str = f" ({execution_stats_str})" if execution_stats_str else ""
41614178

41624179
if snapshot.is_audit:
41634180
annotation = "run standalone audit"
@@ -4167,30 +4184,32 @@ def _create_evaluation_model_annotation(
41674184
if snapshot.model.kind.is_view:
41684185
annotation = "recreate view"
41694186
if snapshot.model.kind.is_seed:
4170-
annotation = f"insert seed file{rows_processed_str}"
4187+
annotation = f"insert seed file{execution_stats_str}"
41714188
if snapshot.model.kind.is_full:
4172-
annotation = f"full refresh{rows_processed_str}"
4189+
annotation = f"full refresh{execution_stats_str}"
41734190
if snapshot.model.kind.is_incremental_by_unique_key:
4174-
annotation = f"insert/update rows{rows_processed_str}"
4191+
annotation = f"insert/update rows{execution_stats_str}"
41754192
if snapshot.model.kind.is_incremental_by_partition:
4176-
annotation = f"insert partitions{rows_processed_str}"
4193+
annotation = f"insert partitions{execution_stats_str}"
41774194

41784195
if annotation:
41794196
return annotation
41804197

4181-
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4198+
return f"{interval_info}{execution_stats_str}" if interval_info else ""
41824199

41834200

41844201
def _calculate_interval_str_len(
4185-
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4202+
snapshot: Snapshot,
4203+
intervals: t.List[Interval],
4204+
execution_stats: t.Optional[QueryExecutionStats] = None,
41864205
) -> int:
41874206
interval_str_len = 0
41884207
for interval in intervals:
41894208
interval_str_len = max(
41904209
interval_str_len,
41914210
len(
41924211
_create_evaluation_model_annotation(
4193-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
4212+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
41944213
)
41954214
),
41964215
)
@@ -4245,14 +4264,50 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42454264
def _calculate_annotation_str_len(
    batched_intervals: t.Dict[Snapshot, t.List[Interval]],
    audit_padding: int = 0,
    execution_stats_len: int = 0,
) -> int:
    """Returns the widest annotation width needed across all snapshots' intervals.

    Each candidate width is the snapshot's interval annotation plus its audit
    suffix plus the room reserved for the execution-stats suffix.
    """
    # max() over a generator with default=0 mirrors the accumulate-with-max loop,
    # including the empty-dict case.
    return max(
        (
            _calculate_interval_str_len(snapshot, intervals)
            + _calculate_audit_str_len(snapshot, audit_padding)
            + execution_stats_len
            for snapshot, intervals in batched_intervals.items()
        ),
        default=0,
    )
4278+
4279+
4280+
# Convert a number of bytes to a human-readable string using binary (1024-based) units.
# Adapted from:
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L165
def _format_bytes(num_bytes: t.Optional[int]) -> str:
    """Formats a byte count for display, e.g. 1536 -> '1.5 KiB'.

    Returns an empty string for ``None``, zero, or negative counts. PiB is the
    largest unit, so anything at or above a pebibyte is reported in PiB.
    """
    if not num_bytes or num_bytes <= 0:
        return ""
    if num_bytes < 1024:
        return f"{num_bytes} Bytes"

    scaled = float(num_bytes)
    for unit in ("KiB", "MiB", "GiB", "TiB"):
        scaled /= 1024.0
        if scaled < 1024.0:
            return f"{scaled:3.1f} {unit}"
    # No unit beyond PiB: report everything else in PiB, even if >= 1024.0 PiB.
    return f"{scaled / 1024.0:3.1f} PiB"
4296+
4297+
4298+
# Abbreviate an integer count with metric-style suffixes. Example: 1,000,000,000 -> '1.0b'.
# Adapted from:
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L178
def _abbreviate_integer_count(count: t.Optional[int]) -> str:
    """Abbreviates a count for display, e.g. 1500 -> '1.5k'.

    Returns an empty string for ``None``, zero, or negative counts; counts
    below 1000 are returned unabbreviated. 't' (trillions) is the largest
    suffix, so larger magnitudes are still reported in 't'.
    """
    if not count or count <= 0:
        return ""
    if count < 1000:
        return str(count)

    scaled = float(count)
    for suffix in ("k", "m", "b"):
        scaled /= 1000.0
        if scaled < 1000.0:
            return f"{scaled:3.1f}{suffix}".strip()
    # No suffix beyond 't': report everything else in trillions.
    return f"{scaled / 1000.0:3.1f}t".strip()

sqlmesh/core/engine_adapter/base.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,7 @@ def _create_table_from_source_queries(
837837
table_description: t.Optional[str] = None,
838838
column_descriptions: t.Optional[t.Dict[str, str]] = None,
839839
table_kind: t.Optional[str] = None,
840-
track_row_count: bool = True,
840+
track_execution_stats: bool = True,
841841
**kwargs: t.Any,
842842
) -> None:
843843
table = exp.to_table(table_name)
@@ -883,15 +883,15 @@ def _create_table_from_source_queries(
883883
replace=replace,
884884
table_description=table_description,
885885
table_kind=table_kind,
886-
track_row_count=track_row_count,
886+
track_execution_stats=track_execution_stats,
887887
**kwargs,
888888
)
889889
else:
890890
self._insert_append_query(
891891
table_name,
892892
query,
893893
target_columns_to_types or self.columns(table),
894-
track_row_count=track_row_count,
894+
track_execution_stats=track_execution_stats,
895895
)
896896

897897
# Register comments with commands if the engine supports comments and we weren't able to
@@ -915,7 +915,7 @@ def _create_table(
915915
table_description: t.Optional[str] = None,
916916
column_descriptions: t.Optional[t.Dict[str, str]] = None,
917917
table_kind: t.Optional[str] = None,
918-
track_row_count: bool = True,
918+
track_execution_stats: bool = True,
919919
**kwargs: t.Any,
920920
) -> None:
921921
self.execute(
@@ -933,7 +933,7 @@ def _create_table(
933933
table_kind=table_kind,
934934
**kwargs,
935935
),
936-
track_row_count=track_row_count,
936+
track_execution_stats=track_execution_stats,
937937
)
938938

939939
def _build_create_table_exp(
@@ -1408,7 +1408,7 @@ def insert_append(
14081408
table_name: TableName,
14091409
query_or_df: QueryOrDF,
14101410
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1411-
track_row_count: bool = True,
1411+
track_execution_stats: bool = True,
14121412
source_columns: t.Optional[t.List[str]] = None,
14131413
) -> None:
14141414
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1418,22 +1418,25 @@ def insert_append(
14181418
source_columns=source_columns,
14191419
)
14201420
self._insert_append_source_queries(
1421-
table_name, source_queries, target_columns_to_types, track_row_count
1421+
table_name, source_queries, target_columns_to_types, track_execution_stats
14221422
)
14231423

14241424
def _insert_append_source_queries(
    self,
    table_name: TableName,
    source_queries: t.List[SourceQuery],
    target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    track_execution_stats: bool = True,
) -> None:
    """Appends the result of each source query into the target table.

    Runs all inserts inside a single transaction (opened only when there is
    at least one query). Falls back to introspecting the target table's
    columns when ``target_columns_to_types`` is not provided.
    """
    with self.transaction(condition=len(source_queries) > 0):
        columns_to_types = target_columns_to_types or self.columns(table_name)
        for source_query in source_queries:
            # Each SourceQuery is a context manager that yields the rendered query.
            with source_query as rendered_query:
                self._insert_append_query(
                    table_name,
                    rendered_query,
                    columns_to_types,
                    track_execution_stats=track_execution_stats,
                )

14391442
def _insert_append_query(
@@ -1442,13 +1445,13 @@ def _insert_append_query(
14421445
query: Query,
14431446
target_columns_to_types: t.Dict[str, exp.DataType],
14441447
order_projections: bool = True,
1445-
track_row_count: bool = True,
1448+
track_execution_stats: bool = True,
14461449
) -> None:
14471450
if order_projections:
14481451
query = self._order_projections_and_filter(query, target_columns_to_types)
14491452
self.execute(
14501453
exp.insert(query, table_name, columns=list(target_columns_to_types)),
1451-
track_row_count=track_row_count,
1454+
track_execution_stats=track_execution_stats,
14521455
)
14531456

14541457
def insert_overwrite_by_partition(
@@ -1591,7 +1594,7 @@ def _insert_overwrite_by_condition(
15911594
)
15921595
if insert_overwrite_strategy.is_replace_where:
15931596
insert_exp.set("where", where or exp.true())
1594-
self.execute(insert_exp, track_row_count=True)
1597+
self.execute(insert_exp, track_execution_stats=True)
15951598

15961599
def update_table(
15971600
self,
@@ -1612,7 +1615,9 @@ def _merge(
16121615
using = exp.alias_(
16131616
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
16141617
)
1615-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
1618+
self.execute(
1619+
exp.Merge(this=this, using=using, on=on, whens=whens), track_execution_stats=True
1620+
)
16161621

16171622
def scd_type_2_by_time(
16181623
self,
@@ -2361,7 +2366,7 @@ def execute(
23612366
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23622367
ignore_unsupported_errors: bool = False,
23632368
quote_identifiers: bool = True,
2364-
track_row_count: bool = False,
2369+
track_execution_stats: bool = False,
23652370
**kwargs: t.Any,
23662371
) -> None:
23672372
"""Execute a sql query."""
@@ -2383,7 +2388,7 @@ def execute(
23832388
expression=e if isinstance(e, exp.Expression) else None,
23842389
quote_identifiers=quote_identifiers,
23852390
)
2386-
self._execute(sql, track_row_count, **kwargs)
2391+
self._execute(sql, track_execution_stats, **kwargs)
23872392

23882393
def _attach_correlation_id(self, sql: str) -> str:
23892394
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2408,12 +2413,12 @@ def _log_sql(
24082413

24092414
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
24102415

2411-
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
2416+
def _execute(self, sql: str, track_execution_stats: bool = False, **kwargs: t.Any) -> None:
24122417
self.cursor.execute(sql, **kwargs)
24132418

24142419
if (
24152420
self.SUPPORTS_QUERY_EXECUTION_TRACKING
2416-
and track_row_count
2421+
and track_execution_stats
24172422
and QueryExecutionTracker.is_tracking()
24182423
):
24192424
rowcount_raw = getattr(self.cursor, "rowcount", None)
@@ -2424,7 +2429,7 @@ def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) ->
24242429
except (TypeError, ValueError):
24252430
pass
24262431

2427-
QueryExecutionTracker.record_execution(sql, rowcount)
2432+
QueryExecutionTracker.record_execution(sql, rowcount, None)
24282433

24292434
@contextlib.contextmanager
24302435
def temp_table(
@@ -2470,7 +2475,7 @@ def temp_table(
24702475
exists=True,
24712476
table_description=None,
24722477
column_descriptions=None,
2473-
track_row_count=False,
2478+
track_execution_stats=False,
24742479
**kwargs,
24752480
)
24762481

@@ -2722,7 +2727,7 @@ def _replace_by_key(
27222727
insert_statement.set("where", delete_filter)
27232728
insert_statement.set("this", exp.to_table(target_table))
27242729

2725-
self.execute(insert_statement, track_row_count=True)
2730+
self.execute(insert_statement, track_execution_stats=True)
27262731
finally:
27272732
self.drop_table(temp_table)
27282733

0 commit comments

Comments
 (0)