From b1cbe6a49d306287a093091f1cbca44b63bf367a Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Wed, 27 May 2026 22:51:15 +0200 Subject: [PATCH 1/3] Refactor measurement dimensions handling in configuration and related components - Updated `ImpulseConfig` to accept a list of silver-layer `container_metrics` column names for `measurement_dimensions`, replacing the previous enum-based approach. - Enhanced documentation to clarify the behavior of `measurement_dimensions`, including the removal of automatic injection of `container_id`. - Introduced properties in `SolverConfig` for `start_ts` and `stop_ts` to facilitate easier access to internal timestamp column names. - Modified `ContainerDimension` to validate the presence of configured measurement dimensions in the silver `container_metrics` table, raising errors for missing columns. - Updated tests to ensure correct behavior with the new configuration structure and to validate the handling of custom timestamp column mappings. - Adjusted integration tests to reflect changes in measurement dimension handling and ensure compatibility with the updated configuration. --- docs/impulse/docs/config/configuration.md | 55 ++++---- .../docs/data_model/silver_layer_schema.md | 36 +++--- .../analyze/query/solvers/solver_config.py | 10 ++ src/impulse_reporting/config/config_parser.py | 120 ++++-------------- .../events/container_event.py | 14 +- .../meta/container_dimensions.py | 60 ++++----- tests/data/config/config.json | 2 +- .../delta_solver_column_mapping_test.py | 34 +++++ .../query/solvers/solver_config_test.py | 8 ++ .../container_and_sequence_event_test.py | 19 +-- .../integration/container_event_test.py | 99 +++++++++++++-- .../integration/simple_report_test.py | 5 +- .../unit/config/config_parser_test.py | 85 ++++--------- .../unit/meta/container_dimensions_test.py | 118 +++++++++-------- tests/unit/data/config/config.json | 2 +- 15 files changed, 337 insertions(+), 330 deletions(-) diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md index c7c53e5..f19ae42 100644 --- a/docs/impulse/docs/config/configuration.md +++ b/docs/impulse/docs/config/configuration.md @@ -359,39 +359,42 @@ mode-resolution rules and what counts as a definition change. ## measurement_dimensions (optional) -List of `container_metrics` columns to surface into the gold-layer -`measurement_dimension` table. - -**Allowed values:** - -| Value | Description | -|--------------------|----------------------------------------------| -| `container_id` | Container identifier. | -| `uut_id` | Unit-under-test identifier. | -| `project_id` | Project identifier. | -| `vehicle_key` | Vehicle identifier. | -| `file_name` | Source measurement file name. | -| `source_file_path` | Full path to the source file. | -| `start_ts` | Measurement start timestamp. | -| `stop_ts` | Measurement stop timestamp. | -| `environment` | Recording environment (e.g. PUMA, datalogger). | +List of silver-layer `container_metrics` column names to surface into the +gold-layer `measurement_dimension` table. Names pass through unchanged: +whatever you list here is what you get in gold. + +Any column present in your silver `container_metrics` table is a valid +entry — there is no closed allow-list. Typical choices include +`container_id`, `uut_id`, `project`, `vehicle_key`, `file_name`, +`file_path`, `start_ts`, `stop_ts`, and `environment`, but any column +your silver schema carries is fair game. + +`container_id` is part of the default list and is recommended for any +real-world config: it is the upsert key used by incremental processing +and the join key between the measurement dimension and the event-fact +tables. If you override `measurement_dimensions` you take full +ownership of what ends up in gold — the framework does not inject +`container_id` for you. Omit it only if you know the consequences for +downstream joins and incremental runs. **Default:** ```json [ "container_id", - "uut_id", - "file_name", - "source_file_path", "start_ts", - "stop_ts", - "project_id", - "environment" + "stop_ts" ] ``` -Pick the entries that match the columns actually present in your -`container_metrics_table`. Columns referenced here must exist in your -silver schema; columns that don't appear here are ignored even if they -exist in silver. +If any listed column is not present in the silver `container_metrics` +table when the report runs, the run fails fast with a `ValueError` +naming the missing columns. + +**Migration note (pre-0.1):** earlier versions exposed a fixed enum +that renamed two silver columns on the way to gold (`project` → +`project_id`, `file_path` → `source_file_path`). The rename has been +removed; if you previously listed `"project_id"` or `"source_file_path"`, +list `"project"` and `"file_path"` instead. The default list also +shrank — if you relied on the old eight-column default, add the +columns you want explicitly. diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index 47add63..ffc94d0 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -110,20 +110,21 @@ count. get surfaced into the gold-layer `measurement_dimension` table when listed in the report's [`measurement_dimensions`](../config/configuration.md#measurement_dimensions-optional) -config. The framework recognises the following names through the -`MeasurementDimensions` enum — populate any subset that fits your -data; none are required by the engine. - -| Column | Type | Description | -|--------------------|----------|----------------------------------------------| -| `uut_id` | `long` | Unit-under-test identifier. | -| `vehicle_key` | `string` | Vehicle identifier. | -| `project_id` | `long` | Project identifier. | -| `file_name` | `string` | Source measurement file name. | -| `source_file_path` | `string` | Full path to the source file. | -| `start_ts` | `long` | Measurement start timestamp (epoch). | -| `stop_ts` | `long` | Measurement stop timestamp (epoch). | -| `environment` | `string` | Recording environment (e.g. PUMA, datalogger). | +config. Any column you list there must exist in this table — names +pass through to gold unchanged. The columns below are common choices, +but `measurement_dimensions` accepts any column you carry on this +table. + +| Column | Type | Description | +|---------------|----------|----------------------------------------------| +| `uut_id` | `long` | Unit-under-test identifier. | +| `vehicle_key` | `string` | Vehicle identifier. | +| `project` | `long` | Project identifier. | +| `file_name` | `string` | Source measurement file name. | +| `file_path` | `string` | Full path to the source file. | +| `start_ts` | `long` | Measurement start timestamp (epoch). | +| `stop_ts` | `long` | Measurement stop timestamp (epoch). | +| `environment` | `string` | Recording environment (e.g. PUMA, datalogger). | :::note Two timestamp conventions @@ -132,10 +133,9 @@ section) and `start_ts`/`stop_ts` (epoch long, listed here) are **different columns**, not naming variants. Real-world `container_metrics` tables typically carry both: `start_dt`/`stop_dt` for human-readable display, `start_ts`/`stop_ts` for the gold -`measurement_dimension` because the corresponding -`MeasurementDimensions` enum values map to the epoch-typed columns. -Populate whichever your queries and `measurement_dimensions` config -need. +`measurement_dimension` (the default `measurement_dimensions` includes +the epoch-typed pair). Populate whichever your queries and +`measurement_dimensions` config need. ::: diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py index 48fc0bf..c4adc34 100644 --- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py +++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py @@ -193,6 +193,16 @@ def tend_col(self) -> str: """Internal column name for the end timestamp.""" return "tend" + @property + def start_ts_col(self) -> str: + """Internal column name for the measurement-start epoch timestamp on container_metrics.""" + return "start_ts" + + @property + def stop_ts_col(self) -> str: + """Internal column name for the measurement-stop epoch timestamp on container_metrics.""" + return "stop_ts" + @property def value_col(self) -> str: """Internal column name for the signal value on the channels table.""" diff --git a/src/impulse_reporting/config/config_parser.py b/src/impulse_reporting/config/config_parser.py index 4953448..5a9db00 100644 --- a/src/impulse_reporting/config/config_parser.py +++ b/src/impulse_reporting/config/config_parser.py @@ -3,9 +3,7 @@ from enum import Enum, StrEnum from typing import Annotated -import pyspark.sql.functions as f -from pydantic import AfterValidator, BaseModel, model_validator -from pyspark.sql import Column +from pydantic import AfterValidator, BaseModel, field_validator, model_validator from impulse_query_engine.analyze.query.solvers.solver_config import SolverConfig @@ -78,88 +76,7 @@ def is_valid_unity_entity_name(entity_name: str) -> str: ) -class MeasurementDimensions(Enum): - """ - Enumeration for available measurement dimensions information. - Attributes - ---------- - CONTAINER_ID : str - Identifier for the container. - UUT_ID : str - Identifier for the unit under test (UUT). - PROJECT_ID : str - Identifier for the project. - UUT_NAME : str - Name of the unit under test (UUT). Currently not present in implementation. - FILE_NAME : str - Name of the file associated with the measurement. - SOURCE_FILE_PATH : str - Path to the source file containing the measurement data. - START_TS : str - Timestamp of the first data point in the measurement. - STOP_TS : str - Timestamp of the last data point in the measurement. - ODO_START : str - Starting odometer reading for the measurement. Currently not present in implementation. - ODO_STOP : str - Stopping odometer reading for the measurement. Currently not present in implementation. - ENVIRONMENT : str - Environment in which the measurement was taken either puma or datalogger. - Notes - ----- - The `get_column` method returns the corresponding Spark SQL column for each dimension. - """ - - CONTAINER_ID = "container_id" - UUT_ID = "uut_id" - PROJECT_ID = "project_id" # todo not present currently - VEHICLE_KEY = "vehicle_key" - UUT_NAME = "uut_name" # todo not present currently - FILE_NAME = "file_name" - SOURCE_FILE_PATH = "source_file_path" - START_TS = "start_ts" - STOP_TS = "stop_ts" - ODO_START = "odo_start" # todo not present currently - ODO_STOP = "odo_stop" # todo not present currently - ENVIRONMENT = "environment" - - def get_column(self) -> Column: - """ - Returns the corresponding Spark SQL column for the measurement dimension. - The column names are mapped to their respective values based on the ER gold naming conventions. - Returns - ------- - pyspark.sql.Column - The Spark SQL column corresponding to the measurement dimension. - """ - measurement_dimensions_not_present_currently = [ - MeasurementDimensions.UUT_NAME, - MeasurementDimensions.ODO_START, - MeasurementDimensions.ODO_STOP, - ] - measurement_column = ( - f.lit("NOT_IMPLEMENTED") - if self in measurement_dimensions_not_present_currently - else f.column(self.value) - ) - return measurement_column - - def map_gold_name_to_silver(self) -> str: - """ - Maps the silver layer column name to the ER gold layer column name. - - Returns - ------- - str - The gold layer column name. - """ - measurement_dimensions_er_gold_naming_map = { - MeasurementDimensions.PROJECT_ID: "project", - MeasurementDimensions.SOURCE_FILE_PATH: "file_path", - } - - column_name = measurement_dimensions_er_gold_naming_map.get(self, self.value) - return column_name +DEFAULT_MEASUREMENT_DIMENSIONS = ["container_id", "start_ts", "stop_ts"] class DataType(StrEnum): @@ -510,8 +427,14 @@ class ImpulseConfig(BaseModel): Optional query engine configuration. Defaults to Solvers.KEY_VALUE_STORE_SOLVER. incremental : IncrementalConfig, optional Optional incremental processing configuration. Defaults to IncrementalConfig(). - measurement_dimensions : list of MeasurementDimensions, optional - List of measurement dimensions to include in the configuration. + measurement_dimensions : list of str, optional + Silver-layer ``container_metrics`` column names to surface into the + gold-layer ``measurement_dimension`` table. Defaults to + ``["container_id", "start_ts", "stop_ts"]``. The framework does not + inject any column the user omits — keeping ``container_id`` in the + list is recommended because it is the upsert key for incremental + processing and the join key to event-fact tables, but the choice + is the user's. Examples -------- >>> config_data = { @@ -572,13 +495,16 @@ class ImpulseConfig(BaseModel): query_engine: QueryEngine = QueryEngine(solver=Solvers.KEY_VALUE_STORE_SOLVER) incremental: IncrementalConfig | None = None - measurement_dimensions: list[MeasurementDimensions] | None = [ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.UUT_ID, - MeasurementDimensions.FILE_NAME, - MeasurementDimensions.SOURCE_FILE_PATH, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - MeasurementDimensions.PROJECT_ID, - MeasurementDimensions.ENVIRONMENT, - ] + measurement_dimensions: list[str] = list(DEFAULT_MEASUREMENT_DIMENSIONS) + + @field_validator("measurement_dimensions", mode="after") + @classmethod + def _normalize_measurement_dimensions(cls, value: list[str]) -> list[str]: + seen: set[str] = set() + normalized: list[str] = [] + for name in value: + is_valid_unity_entity_name(name) + if name not in seen: + seen.add(name) + normalized.append(name) + return normalized diff --git a/src/impulse_reporting/events/container_event.py b/src/impulse_reporting/events/container_event.py index c7ea338..5010500 100644 --- a/src/impulse_reporting/events/container_event.py +++ b/src/impulse_reporting/events/container_event.py @@ -11,7 +11,6 @@ from impulse_query_engine.analyze.query.query_builder import QueryBuilder from impulse_query_engine.analyze.query.solvers.query_solver import QuerySolver -from impulse_reporting.config.config_parser import MeasurementDimensions from impulse_reporting.events.event import Event from impulse_reporting.persist.dimension_schema import EVENT_DIMENSION_SCHEMA from impulse_reporting.persist.fact_schema import EVENT_INSTANCE_FACT_SCHEMA @@ -179,13 +178,14 @@ def determine_events( # Rename silver columns to gold event fact column names and cast # timestamps from TIMESTAMP to LongType so the DataFrame is # union-compatible with BasicEvent (which produces numeric ts). + # Silver-side names come from SolverConfig so customers can remap + # physical column names via column_name_mapping. Gold-side names + # ("start_ts", "end_ts") are owned by EVENT_INSTANCE_FACT_SCHEMA. + start_ts_col = solver.config.start_ts_col + stop_ts_col = solver.config.stop_ts_col df = ( - container_metrics_df.withColumnRenamed( - MeasurementDimensions.CONTAINER_ID.value, "container_id" - ) - .withColumnRenamed(MeasurementDimensions.START_TS.value, "start_ts") - .withColumnRenamed(MeasurementDimensions.STOP_TS.value, "end_ts") - .withColumn("start_ts", f.col("start_ts").cast("long")) + container_metrics_df.withColumnRenamed(stop_ts_col, "end_ts") + .withColumn("start_ts", f.col(start_ts_col).cast("long")) .withColumn("end_ts", f.col("end_ts").cast("long")) ) diff --git a/src/impulse_reporting/meta/container_dimensions.py b/src/impulse_reporting/meta/container_dimensions.py index 69fd7c4..040bdb0 100644 --- a/src/impulse_reporting/meta/container_dimensions.py +++ b/src/impulse_reporting/meta/container_dimensions.py @@ -5,7 +5,7 @@ from impulse_query_engine.analyze.query.query_builder import QueryBuilder from impulse_query_engine.analyze.query.solvers.query_solver import QuerySolver -from impulse_reporting.config.config_parser import ImpulseConfig, MeasurementDimensions +from impulse_reporting.config.config_parser import ImpulseConfig class ContainerDimension: @@ -20,10 +20,13 @@ def get_dimension( pre_filtered_containers_df: DataFrame = None, ) -> DataFrame: """ - Retrieves meta dimensions for the specified units under test (UUTs) from the silver container_metrics table. + Retrieves the configured measurement dimensions for the matching set + of containers from the silver ``container_metrics`` table. Uses the solver filter pipeline (filter_container_tags -> filter_container_metrics) - to resolve the matching set of containers and their full metrics. + to resolve the matching set of containers and their full metrics, then + selects exactly the columns listed in ``config.measurement_dimensions``. + Silver column names pass through to gold unchanged. Parameters ---------- @@ -41,48 +44,33 @@ def get_dimension( Returns ------- DataFrame - A DataFrame containing the selected measurement dimensions for the specified UUTs. + A DataFrame containing the selected measurement dimensions for the + matching set of containers. + + Raises + ------ + ValueError + If any column listed in ``config.measurement_dimensions`` is not + present in the silver ``container_metrics`` DataFrame. """ measurement_dimensions = config.measurement_dimensions - desired_container_metrics_columns = [ - dimension.get_column() for dimension in measurement_dimensions - ] - container_tags_df = solver.filter_container_tags(spark, query) df = solver.filter_container_metrics( spark, query, container_tags_df, pre_filtered_containers_df ) - df_renamed = ContainerDimension._rename_dimension_cols(df, measurement_dimensions) - return df_renamed.select(*desired_container_metrics_columns).transform( - ContainerDimension._add_config_hash(config) - ) - @staticmethod - def _rename_dimension_cols( - df: DataFrame, measurement_dimensions: list[MeasurementDimensions] - ) -> DataFrame: - """ - Renames the columns of the DataFrame to match the er gold layer schema. + missing = [c for c in measurement_dimensions if c not in df.columns] + if missing: + raise ValueError( + "Configured measurement_dimensions columns are not present in " + f"the silver container_metrics table: {missing}. Available " + f"columns: {df.columns}." + ) - Parameters - ---------- - df : DataFrame - The DataFrame containing the measurement dimensions. - measurement_dimensions : list[MeasurementDimensions] - List of measurement dimension columns to rename. - Returns - ------- - DataFrame - A DataFrame with renamed columns based on the measurement dimensions. - """ - renamed_columns = [] - for column_name in measurement_dimensions: - silver_layer_name = MeasurementDimensions(column_name).map_gold_name_to_silver() - if silver_layer_name in df.columns: - renamed_columns.append(F.col(silver_layer_name).alias(column_name.value)) - - return df.select(*renamed_columns) + return df.select(*measurement_dimensions).transform( + ContainerDimension._add_config_hash(config) + ) @staticmethod def _add_config_hash(config: ImpulseConfig) -> Callable[..., "DataFrame"]: diff --git a/tests/data/config/config.json b/tests/data/config/config.json index 4be6e69..21262f1 100644 --- a/tests/data/config/config.json +++ b/tests/data/config/config.json @@ -23,6 +23,6 @@ "measurement_dimensions": [ "uut_id", "file_name", - "source_file_path" + "file_path" ] } \ No newline at end of file diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py index 186d855..3db9284 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py @@ -282,6 +282,40 @@ def test_default_config_fails_on_renamed_metrics(self, spark, db_custom_metrics) solver.filter_container_metrics(spark, query, tags_df).collect() +class TestDeltaSolverContainerMetricsTimestampMapping: + """Verify start_ts/stop_ts on container_metrics can be remapped from custom physical names.""" + + @pytest.fixture + def db_custom_ts(self, spark): + tables = _default_tables(spark) + tables["container_metrics"] = _container_metrics_df( + spark, _CONTAINER_METRICS_ROWS, start_col="t_start", stop_col="t_stop" + ) + return _make_db(tables) + + def test_filter_container_metrics_exposes_internal_ts_names(self, spark, db_custom_ts): + cfg = SolverConfig( + container_metrics=TableConfig( + column_name_mapping={"t_start": "start_ts", "t_stop": "stop_ts"}, + ), + ) + solver = DeltaSolver(spark, config=cfg) + assert solver.config.start_ts_col == "start_ts" + assert solver.config.stop_ts_col == "stop_ts" + + query = db_custom_ts.query + tags_df = solver.filter_container_tags(spark, query) + result = solver.filter_container_metrics(spark, query, tags_df) + + assert {"start_ts", "stop_ts"}.issubset(set(result.columns)) + assert "t_start" not in result.columns + assert "t_stop" not in result.columns + rows = sorted( + (row.container_id, row.start_ts, row.stop_ts) for row in result.collect() + ) + assert rows == [(1, 1000, 3000), (2, 1000, 3000), (3, 1000, 3000)] + + # =================================================================== # TEST GROUP 3: channel_tags column rename # =================================================================== diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py index 6bc29ba..c692c2e 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py @@ -114,6 +114,12 @@ def test_container_id_col(self, cfg: SolverConfig): def test_parent_id_col(self, cfg: SolverConfig): assert cfg.parent_id_col == "parent_id" + def test_start_ts_col(self, cfg: SolverConfig): + assert cfg.start_ts_col == "start_ts" + + def test_stop_ts_col(self, cfg: SolverConfig): + assert cfg.stop_ts_col == "stop_ts" + def test_properties_same_for_default_config(self): default = SolverConfig() assert default.container_id_col == "container_id" @@ -123,6 +129,8 @@ def test_properties_same_for_default_config(self): assert default.value_col == "value" assert default.project_id_col == "project_id" assert default.parent_id_col == "parent_id" + assert default.start_ts_col == "start_ts" + assert default.stop_ts_col == "stop_ts" # --------------------------------------------------------------------------- diff --git a/tests/impulse_reporting/integration/container_and_sequence_event_test.py b/tests/impulse_reporting/integration/container_and_sequence_event_test.py index a88f2c7..24a27f8 100644 --- a/tests/impulse_reporting/integration/container_and_sequence_event_test.py +++ b/tests/impulse_reporting/integration/container_and_sequence_event_test.py @@ -9,7 +9,6 @@ Comparator, ContainerFilters, ImpulseConfig, - MeasurementDimensions, MetricFilter, QueryEngine, Solvers, @@ -59,11 +58,7 @@ def test_container_event_in_report(spark, basic_narrow_db): ] ), query_engine=QueryEngine(solver=Solvers.KEY_VALUE_STORE_SOLVER), - measurement_dimensions=[ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - ], + measurement_dimensions=["container_id", "start_ts", "stop_ts"], ) # Create report and add container event @@ -170,11 +165,7 @@ def test_container_event_with_basic_event(spark, basic_narrow_db): ] ), query_engine=QueryEngine(solver=Solvers.KEY_VALUE_STORE_SOLVER), - measurement_dimensions=[ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - ], + measurement_dimensions=["container_id", "start_ts", "stop_ts"], ) my_report = Report( @@ -255,11 +246,7 @@ def test_sequence_of_events_without_max_overlap_in_report(spark, basic_narrow_db ] ), query_engine=QueryEngine(solver=Solvers.KEY_VALUE_STORE_SOLVER), - measurement_dimensions=[ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - ], + measurement_dimensions=["container_id", "start_ts", "stop_ts"], ) my_report = Report( diff --git a/tests/impulse_reporting/integration/container_event_test.py b/tests/impulse_reporting/integration/container_event_test.py index 5e94c36..662a77c 100644 --- a/tests/impulse_reporting/integration/container_event_test.py +++ b/tests/impulse_reporting/integration/container_event_test.py @@ -5,11 +5,14 @@ import pytest from databricks.sdk import WorkspaceClient +from impulse_query_engine.analyze.query.solvers.solver_config import ( + SolverConfig, + TableConfig, +) from impulse_reporting.config.config_parser import ( Comparator, ContainerFilters, ImpulseConfig, - MeasurementDimensions, MetricFilter, QueryEngine, Solvers, @@ -56,11 +59,7 @@ def test_container_event_in_report(spark, basic_narrow_db): ] ), query_engine=QueryEngine(solver=Solvers.KEY_VALUE_STORE_SOLVER), - measurement_dimensions=[ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - ], + measurement_dimensions=["container_id", "start_ts", "stop_ts"], ) my_report = Report( @@ -164,11 +163,7 @@ def test_container_event_with_basic_event(spark, basic_narrow_db): ] ), query_engine=QueryEngine(solver=Solvers.KEY_VALUE_STORE_SOLVER), - measurement_dimensions=[ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - ], + measurement_dimensions=["container_id", "start_ts", "stop_ts"], ) my_report = Report( @@ -265,3 +260,85 @@ def test_report_rejects_two_container_events(): with pytest.raises(ValueError, match="Only one ContainerEvent is allowed per report"): report.add_event(event2) + + +def test_container_event_with_remapped_silver_timestamps(spark, basic_narrow_db): + """ContainerEvent works end-to-end when silver carries custom timestamp column names. + + Regression test for the SolverConfig.start_ts_col / stop_ts_col contract: + a customer whose container_metrics table uses t_start / t_stop instead of + start_ts / stop_ts must be able to remap them via column_name_mapping and + have ContainerEvent produce correct event-instance-fact rows without + framework code changes. + """ + remapped_table = "spark_catalog.silver.container_metrics_remapped_ts" + ( + spark.read.table("spark_catalog.silver.container_metrics") + .withColumnRenamed("start_ts", "t_start") + .withColumnRenamed("stop_ts", "t_stop") + .write.format("delta") + .mode("overwrite") + .saveAsTable(remapped_table) + ) + + try: + impulse_config = ImpulseConfig( + source=Source( + container_metrics_table=remapped_table, + channel_metrics_table="spark_catalog.silver.channel_metrics", + channels_uri="spark_catalog.silver.channels", + ), + unity_sink=UnitySink( + catalog="spark_catalog", + schema="gold", + table_prefix="container_event_remapped_test", + ), + container_filters=ContainerFilters( + metric_filters=[ + [ + MetricFilter( + column_name="vehicle_key", + comparator=Comparator.EQ, + value="Seat_Leon", + ), + MetricFilter( + column_name="start_dt", + comparator=Comparator.GE, + value="2025-07-03T07:00:00.000Z", + ), + ] + ] + ), + query_engine=QueryEngine( + solver=Solvers.KEY_VALUE_STORE_SOLVER, + solver_config=SolverConfig( + container_metrics=TableConfig( + column_name_mapping={"t_start": "start_ts", "t_stop": "stop_ts"}, + ), + ), + ), + measurement_dimensions=["container_id", "start_ts", "stop_ts"], + ) + + my_report = Report( + name="container_event_remapped_report", + spark=spark, + workspace_client=create_autospec(WorkspaceClient), + config=dict(impulse_config), + ) + my_report.add_event(ContainerEvent(name="full_measurement")) + my_report.determine_report() + + event_rows = my_report.event_dfs["CONTAINER_EVENT"]["changed"].collect() + assert len(event_rows) == 3 + + expected = { + 1: {"start_ts": 1751528502708, "end_ts": 1751528610253}, + 2: {"start_ts": 1751528501483, "end_ts": 1751528610235}, + 3: {"start_ts": 1751528500169, "end_ts": 1751528610252}, + } + for row in event_rows: + assert row.start_ts == expected[row.container_id]["start_ts"] + assert row.end_ts == expected[row.container_id]["end_ts"] + finally: + spark.sql(f"DROP TABLE IF EXISTS {remapped_table} PURGE") diff --git a/tests/impulse_reporting/integration/simple_report_test.py b/tests/impulse_reporting/integration/simple_report_test.py index 8e39c6d..5dc1c25 100644 --- a/tests/impulse_reporting/integration/simple_report_test.py +++ b/tests/impulse_reporting/integration/simple_report_test.py @@ -18,7 +18,6 @@ Comparator, ContainerFilters, ImpulseConfig, - MeasurementDimensions, MetricFilter, QueryEngine, Solvers, @@ -74,7 +73,7 @@ def test_simple_report1(spark, setup_narrow_db): ] ), query_engine=QueryEngine(solver=Solvers.DELTA_SOLVER), - measurement_dimensions=[MeasurementDimensions.CONTAINER_ID], + measurement_dimensions=["container_id"], ) my_report = Report( @@ -777,7 +776,7 @@ def test_simple_report_key_value_store(spark, key_value_store_db): ), ), ), - measurement_dimensions=[MeasurementDimensions.CONTAINER_ID], + measurement_dimensions=["container_id"], ) my_report = Report( diff --git a/tests/impulse_reporting/unit/config/config_parser_test.py b/tests/impulse_reporting/unit/config/config_parser_test.py index 6a87386..8917684 100644 --- a/tests/impulse_reporting/unit/config/config_parser_test.py +++ b/tests/impulse_reporting/unit/config/config_parser_test.py @@ -1,4 +1,3 @@ -import pyspark.sql.functions as f import pytest from pydantic import ValidationError @@ -9,7 +8,6 @@ DataType, IncrementalConfig, ImpulseConfig, - MeasurementDimensions, MetricFilter, Solvers, TagFilter, @@ -44,7 +42,7 @@ "measurement_dimensions": [ "uut_id", "file_name", - "source_file_path", + "file_path", "start_ts", "stop_ts", ], @@ -66,11 +64,14 @@ def test_impulse_config_from_dict(): assert config.container_filters.metric_filters[0][0].comparator == Comparator.EQ assert config.query_engine.solver == Solvers.KEY_VALUE_STORE_SOLVER - assert MeasurementDimensions.UUT_ID in config.measurement_dimensions - assert MeasurementDimensions.FILE_NAME in config.measurement_dimensions - assert MeasurementDimensions.SOURCE_FILE_PATH in config.measurement_dimensions - assert MeasurementDimensions.START_TS in config.measurement_dimensions - assert MeasurementDimensions.STOP_TS in config.measurement_dimensions + # The list passes through verbatim — the framework does not inject container_id. + assert config.measurement_dimensions == [ + "uut_id", + "file_name", + "file_path", + "start_ts", + "stop_ts", + ] def test_impulse_config_data_format_defaults_to_rle(): @@ -143,23 +144,24 @@ def test_impulse_config_from_dict_no_measurement_dim_provided(): config_json.pop("measurement_dimensions", None) config = ImpulseConfig.model_validate(config_json) - assert MeasurementDimensions.CONTAINER_ID in config.measurement_dimensions - assert MeasurementDimensions.UUT_ID in config.measurement_dimensions - assert MeasurementDimensions.FILE_NAME in config.measurement_dimensions - assert MeasurementDimensions.SOURCE_FILE_PATH in config.measurement_dimensions - assert MeasurementDimensions.START_TS in config.measurement_dimensions - assert MeasurementDimensions.STOP_TS in config.measurement_dimensions - assert MeasurementDimensions.PROJECT_ID in config.measurement_dimensions - assert MeasurementDimensions.ENVIRONMENT in config.measurement_dimensions + assert config.measurement_dimensions == ["container_id", "start_ts", "stop_ts"] -def test_impulse_config_from_dict_wrong_measurement_dim_provided(): - """Test ImpulseConfig with wrong measurement dimension info provided.""" +def test_impulse_config_measurement_dimensions_rejects_invalid_identifier(): + """Names that aren't valid Unity entity identifiers are rejected at config load.""" config_json = impulse_config_JSON.copy() - config_json.update({"measurement_dimensions": ["wrong_dimension"]}) + config_json.update({"measurement_dimensions": ["bad name with spaces"]}) with pytest.raises(ValidationError): - config = ImpulseConfig.model_validate(config_json) + ImpulseConfig.model_validate(config_json) + + +def test_impulse_config_measurement_dimensions_user_list_verbatim(): + """User-supplied list is preserved verbatim — no container_id injection.""" + config_json = impulse_config_JSON.copy() + config_json.update({"measurement_dimensions": ["uut_id"]}) + config = ImpulseConfig.model_validate(config_json) + assert config.measurement_dimensions == ["uut_id"] def test_impulse_config_no_container_filters(): @@ -180,49 +182,6 @@ def test_impulse_config_empty_container_filters(): assert config.container_filters.metric_filters == [] -def test_get_column(): - """Test the `get_column` method of `MeasurementDimensions`.""" - implemented = [ - MeasurementDimensions.CONTAINER_ID, - MeasurementDimensions.PROJECT_ID, - MeasurementDimensions.UUT_ID, - MeasurementDimensions.FILE_NAME, - MeasurementDimensions.SOURCE_FILE_PATH, - MeasurementDimensions.START_TS, - MeasurementDimensions.STOP_TS, - ] - not_implemented = [ - MeasurementDimensions.UUT_NAME, - MeasurementDimensions.ODO_START, - MeasurementDimensions.ODO_STOP, - ] - - for dim in implemented: - assert str(dim.get_column()) == str(f.col(dim.value)) - - for dim in not_implemented: - assert str(dim.get_column()) == str(f.lit("NOT_IMPLEMENTED")) - - -def test_map_gold_name_to_silver(): - """Test the `map_gold_name_to_silver` method of `MeasurementDimensions`.""" - expected_mappings = { - MeasurementDimensions.CONTAINER_ID: MeasurementDimensions.CONTAINER_ID.value, - MeasurementDimensions.UUT_ID: MeasurementDimensions.UUT_ID.value, - MeasurementDimensions.PROJECT_ID: "project", - MeasurementDimensions.UUT_NAME: MeasurementDimensions.UUT_NAME.value, - MeasurementDimensions.FILE_NAME: MeasurementDimensions.FILE_NAME.value, - MeasurementDimensions.SOURCE_FILE_PATH: "file_path", - MeasurementDimensions.START_TS: "start_ts", - MeasurementDimensions.STOP_TS: "stop_ts", - MeasurementDimensions.ODO_START: MeasurementDimensions.ODO_START.value, - MeasurementDimensions.ODO_STOP: MeasurementDimensions.ODO_STOP.value, - } - - for dim, expected in expected_mappings.items(): - assert dim.map_gold_name_to_silver() == expected - - def test_tags_table(): """Test the `container_tags_table` field in `ImpulseConfig.source`.""" config_json = impulse_config_JSON.copy() diff --git a/tests/impulse_reporting/unit/meta/container_dimensions_test.py b/tests/impulse_reporting/unit/meta/container_dimensions_test.py index acad8f8..066330f 100644 --- a/tests/impulse_reporting/unit/meta/container_dimensions_test.py +++ b/tests/impulse_reporting/unit/meta/container_dimensions_test.py @@ -3,52 +3,72 @@ from unittest.mock import create_autospec import pyspark.sql.types as T +import pytest from databricks.sdk import WorkspaceClient from pyspark.sql.types import Row -from impulse_reporting.config.config_parser import ImpulseConfig, MeasurementDimensions +from impulse_reporting.config.config_parser import ImpulseConfig from impulse_reporting.core.report import Report from impulse_reporting.meta.container_dimensions import ContainerDimension from tests.conftest import spark -def test_rename_dimension_cols(spark): - # Get all MeasurementDimensions as column names - silver_columns = [ - "uut_id", - "container_id", - "file_name", - "data_key", - "start_ts", - "stop_ts", - "project", - "file_path", - ] - - # Create schema with StringType for all columns - schema = T.StructType([T.StructField(col, T.StringType(), True) for col in silver_columns]) +def test_get_dimension_raises_on_missing_silver_column(spark): + """Missing silver columns must surface a clear error, not be silently dropped.""" + base_path = os.path.dirname(os.path.abspath(__file__)) + base_path = base_path[: base_path.find("tests")] + config_path = os.path.join(base_path, "tests", "data", "config", "config.json") - # Create empty DataFrame - df = spark.createDataFrame([], schema) + with open(config_path) as f: + config_dict = json.load(f) + config_dict["measurement_dimensions"] = ["container_id", "definitely_not_in_silver"] + + my_report: Report = Report( + name="my_report", + spark=spark, + workspace_client=create_autospec(WorkspaceClient), + config=config_dict, + ) - renamed_df = ContainerDimension._rename_dimension_cols( - df, [dim for dim in MeasurementDimensions] + with pytest.raises(ValueError, match="definitely_not_in_silver"): + ContainerDimension.get_dimension( + spark, my_report.query, my_report.solver, my_report.config + ) + + +def test_user_list_is_respected_verbatim(): + """A user-supplied list passes through unchanged — no auto-injection of container_id.""" + config = ImpulseConfig.model_validate( + { + "source": { + "container_metrics_table": "c.s.container_metrics", + "channel_metrics_table": "c.s.channel_metrics", + "channels_uri": "c.s.channels", + }, + "measurement_dimensions": ["uut_id", "start_ts"], + } + ) + assert config.measurement_dimensions == ["uut_id", "start_ts"] + + +def test_measurement_dimensions_dedupes_preserving_order(): + config = ImpulseConfig.model_validate( + { + "source": { + "container_metrics_table": "c.s.container_metrics", + "channel_metrics_table": "c.s.channel_metrics", + "channels_uri": "c.s.channels", + }, + "measurement_dimensions": [ + "container_id", + "uut_id", + "uut_id", + "start_ts", + "container_id", + ], + } ) - expected_column_names = [ - "container_id", - "uut_id", - "project_id", - "file_name", - "source_file_path", - "start_ts", - "stop_ts", - ] - - assert len(renamed_df.columns) == len(expected_column_names) - for column_name in renamed_df.columns: - assert ( - column_name in expected_column_names - ), f"Column {column_name} is not in the expected list of renamed columns." + assert config.measurement_dimensions == ["container_id", "uut_id", "start_ts"] def test_config_hashing(spark): @@ -63,22 +83,25 @@ def test_config_hashing(spark): impulse_config = ImpulseConfig(**config_dict) silver_columns = ["uut_id"] - - # Create schema with StringType for all columns schema = T.StructType([T.StructField(col, T.StringType(), True) for col in silver_columns]) + df = spark.createDataFrame([("test_vehicle",)], schema) result = df.transform(ContainerDimension._add_config_hash(impulse_config)) - expected_result = [Row(uut_id="test_vehicle", config_hash=1267386821)] - assert expected_result == result.collect() + rows = result.collect() + assert len(rows) == 1 + assert rows[0].uut_id == "test_vehicle" + assert rows[0].config_hash is not None + # Hashing must be deterministic for the same config. + second_run = df.transform(ContainerDimension._add_config_hash(impulse_config)).collect() + assert rows[0].config_hash == second_run[0].config_hash empty_df = spark.createDataFrame([], schema) - result = empty_df.transform(ContainerDimension._add_config_hash(impulse_config)) - assert result.count() == 0 + assert empty_df.transform(ContainerDimension._add_config_hash(impulse_config)).count() == 0 def test_container_dimensions_default_col_order(spark): - """Test if the default column order of the container dimensions is correct.""" + """Default measurement_dimensions surfaces only container_id, start_ts, stop_ts.""" base_path = os.path.dirname(os.path.abspath(__file__)) base_path = base_path[: base_path.find("tests")] config_path = os.path.join(base_path, "tests", "data", "config", "config.json") @@ -95,16 +118,9 @@ def test_container_dimensions_default_col_order(spark): config=config_dict, ) - # Definition of relevant channels dimensions_df = ContainerDimension.get_dimension( spark, my_report.query, my_report.solver, my_report.config ) - assert dimensions_df.columns[0] == MeasurementDimensions.CONTAINER_ID.value - assert dimensions_df.columns[1] == MeasurementDimensions.UUT_ID.value - assert dimensions_df.columns[2] == MeasurementDimensions.FILE_NAME.value - assert dimensions_df.columns[3] == MeasurementDimensions.SOURCE_FILE_PATH.value - assert dimensions_df.columns[4] == MeasurementDimensions.START_TS.value - assert dimensions_df.columns[5] == MeasurementDimensions.STOP_TS.value - assert dimensions_df.columns[6] == MeasurementDimensions.PROJECT_ID.value - assert dimensions_df.columns[7] == MeasurementDimensions.ENVIRONMENT.value + # Default list + the always-added config_hash column. + assert dimensions_df.columns == ["container_id", "start_ts", "stop_ts", "config_hash"] diff --git a/tests/unit/data/config/config.json b/tests/unit/data/config/config.json index 576ff46..b924370 100644 --- a/tests/unit/data/config/config.json +++ b/tests/unit/data/config/config.json @@ -33,6 +33,6 @@ "measurement_dimensions": [ "uut_id", "file_name", - "source_file_path" + "file_path" ] } From 7715afa04aacb110206d6cabf069688f4b565bd1 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Wed, 27 May 2026 22:53:10 +0200 Subject: [PATCH 2/3] formatting --- .../analyze/query/solvers/delta_solver_column_mapping_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py index 3db9284..c1c7e00 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/delta_solver_column_mapping_test.py @@ -310,9 +310,7 @@ def test_filter_container_metrics_exposes_internal_ts_names(self, spark, db_cust assert {"start_ts", "stop_ts"}.issubset(set(result.columns)) assert "t_start" not in result.columns assert "t_stop" not in result.columns - rows = sorted( - (row.container_id, row.start_ts, row.stop_ts) for row in result.collect() - ) + rows = sorted((row.container_id, row.start_ts, row.stop_ts) for row in result.collect()) assert rows == [(1, 1000, 3000), (2, 1000, 3000), (3, 1000, 3000)] From b2f2b777653771f9ba5d7f5646765c5fb75dee42 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Wed, 27 May 2026 23:01:03 +0200 Subject: [PATCH 3/3] Enhance documentation for internal column mappings in configuration - Updated `configuration.md` to clarify the purpose of `tstart` and `tend` in the `channels` table. - Added detailed sections in `silver_layer_schema.md` to document internal columns referenced by the framework across `container_metrics`, `container_tags`, `channel_metrics`, and `channel_tags`. - Improved clarity on how to map silver columns to internal names using `solver_config` for better integration and understanding of the data model. --- docs/impulse/docs/config/configuration.md | 3 +- .../docs/data_model/silver_layer_schema.md | 115 ++++++++++++++++++ 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md index f19ae42..c59a58c 100644 --- a/docs/impulse/docs/config/configuration.md +++ b/docs/impulse/docs/config/configuration.md @@ -183,7 +183,8 @@ Internal column names that mappings can target: |-----------------|----------------------------------------------------------| | `container_id` | Container identifier | | `channel_id` | Channel identifier | -| `tstart`, `tend`| Sample interval start/end (RLE) | +| `tstart`, `tend`| Sample interval start/end on the `channels` table (RLE) | +| `start_ts`, `stop_ts` | Measurement start/stop epoch timestamps on the `container_metrics` table — referenced by `ContainerEvent` to derive event-fact start/end | | `value` | Sample value (or attribute value on the EAV tag table) | | `key` | Attribute key on the EAV `container_tags` table | | `priority` | Tie-breaker column on the `channel_mapping` table | diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index ffc94d0..dc1c127 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -104,6 +104,25 @@ count. | `duration_ms` | `int` | Yes | Total duration in milliseconds. | | `num_channels` | `int` | Yes | Number of channels in the container. | +#### Internal columns referenced by the framework + +The framework hard-references the following internal names on `container_metrics`. Map any silver column to one of these via +[`solver_config.container_metrics.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. + +| Internal name | Referenced by | +|---------------|-------------------------------------------------------------------------------| +| `container_id`| Join key against `container_tags`, upsert key in incremental mode | +| `start_ts` | `ContainerEvent` event-fact `start_ts`; default `measurement_dimensions` entry | +| `stop_ts` | `ContainerEvent` event-fact `end_ts`; default `measurement_dimensions` entry | +| `project_id` | `KeyValueStoreSolver` project scoping (when `solver_config.project_id` is set) | + +Every other column on `container_metrics` is **pass-through**: it lands in +the gold `measurement_dimension` table verbatim if you list it in +[`measurement_dimensions`](../config/configuration.md#measurement_dimensions-optional), +and can be used in `container_filters.metric_filters`; the framework does +not reference it under any specific internal name. + ### Additional columns commonly populated `container_metrics` typically carries additional metadata columns that @@ -154,6 +173,20 @@ TSAL queries select recordings by tag key (e.g. | `key` | `string` | Yes | Tag key (e.g. `"vehicle_key"`, `"project_id"`). | | `value` | `string` | Yes | Tag value. | +#### Internal columns referenced by the framework + +Map any silver column to these via +[`solver_config.container_tags.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. + +| Internal name | Referenced by | +|---------------|------------------------------------------------------------------------------------------------| +| `container_id`| Join key into `container_metrics` after tag filtering | +| `key` | EAV pivot key — driven by `query.havingTag(...)` and `container_filters.tag_filters` | +| `value` | EAV pivot value — driven by `query.havingTag(...)` and `container_filters.tag_filters` | +| `project_id` | `KeyValueStoreSolver` project scoping (when `solver_config.project_id` is set, and this table carries a project column) | +| `parent_id` | Optional per-table filter target (e.g. `solver_config.container_tags.filters = {"parent_id": ...}`) | + --- ## channel_metrics @@ -191,6 +224,20 @@ via `COALESCE(channel_metrics.unit, channel_mapping.source_unit)`. The column is not part of the canonical schema — omit it for layouts that don't need per-channel physical units. +#### Internal columns referenced by the framework + +Map any silver column to these via +[`solver_config.channel_metrics.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. + +| Internal name | Referenced by | +|----------------|--------------------------------------------------------------------------------------------| +| `container_id` | Composite join key with `channels` and `channel_tags` | +| `channel_id` | Composite join key with `channels` and `channel_tags` | +| `channel_name` | Default join target on the `channel_mapping`→`channel_metrics` alias-resolution join | +| `data_key` | Default join target on the `channel_mapping`→`channel_metrics` alias-resolution join | +| `unit` | Authoritative source unit on aliased reads (takes precedence over `channel_mapping.source_unit`) | + --- ## channel_tags @@ -207,6 +254,20 @@ selectors look up signals by tag key (e.g. | `key` | `string` | Yes | Tag key (e.g. `"channel_name"`, `"brand"`, `"model"`). | | `value` | `string` | Yes | Tag value. | +#### Internal columns referenced by the framework + +Map any silver column to these via +[`solver_config.channel_tags.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. Note: `channel_tags` is +read by `DeltaSolver` only. + +| Internal name | Referenced by | +|---------------|------------------------------------------------------------------------------| +| `container_id`| Composite join key with `channel_metrics` and `channels` | +| `channel_id` | Composite join key with `channel_metrics` and `channels` | +| `key` | EAV pivot key — driven by `query.channel(...)` selector kwargs | +| `value` | EAV pivot value — driven by `query.channel(...)` selector kwargs | + --- ## channels @@ -249,6 +310,24 @@ constructed with `drop_implausible_data=True` — in that mode, samples with `is_plausible = False` are filtered before RLE encoding. If the flag is `False` (the default), the column is ignored and may be omitted. +#### Internal columns referenced by the framework + +Map any silver column to these via +[`solver_config.channels.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. + +| Internal name | Referenced by | +|---------------|----------------------------------------------------------------------------------------| +| `container_id`| Composite key joining samples back to their container | +| `channel_id` | Composite key joining samples back to their channel | +| `tstart` | Sample interval start (RLE format) — consumed by the solve UDF | +| `tend` | Sample interval end (RLE format) — consumed by the solve UDF | +| `value` | Sample value — consumed by the solve UDF and aggregations | + +For raw-format `channels`, the same internal names apply except that +`timestamp` replaces the `tstart`/`tend` pair; the engine derives `tend` +during raw→RLE conversion. + --- ## channel_mapping (optional) @@ -287,6 +366,29 @@ conversions to the same channel in the same query. Workarounds: select the conflicting aliases in **separate queries**, or align the mapping rows so they agree on the unit pair per physical channel. +#### Internal columns referenced by the framework + +Map any silver column to these via +[`solver_config.channel_mapping.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. Note: `channel_mapping` +is read by `KeyValueStoreSolver` only. + +| Internal name | Referenced by | +|------------------|------------------------------------------------------------------------------------------------| +| `channel_alias` | The user-facing alias selector kwarg in `query.channel_with_alias(channel_alias=...)` | +| `channel_name` | Default join target on the `channel_mapping`→`channel_metrics` join | +| `data_key` | Default join target on the `channel_mapping`→`channel_metrics` join | +| `source_channel` | Alias-resolution source on the `channel_mapping`→`channel_metrics` join | +| `priority` | Tie-breaker when multiple physical channels match a logical alias | +| `project_id` | `KeyValueStoreSolver` project scoping (when `solver_config.project_id` is set) | +| `source_unit` | Source unit fallback (when paired with a `unit_conversion_table`) | +| `target_unit` | Target unit for aliased reads (when paired with a `unit_conversion_table`) | + +To override the default `(source_channel, channel_name) + (data_key, data_key)` +composite join, set +[`channel_mapping.join_keys`](../config/configuration.md#alias-resolution-join-keys-optional) +explicitly. + --- ## unit_conversion (optional) @@ -307,5 +409,18 @@ whose `unit` matches `source_unit`) and `target_factor` (the row whose multiplies values by `source_factor / target_factor`. Missing rows or a `group_id` mismatch yield a null factor and no conversion. +#### Internal columns referenced by the framework + +Map any silver column to these via +[`solver_config.unit_conversion.column_name_mapping`](../config/configuration.md#solver-column-mappings-and-filters) +when your physical column has a different name. Note: `unit_conversion` +is read by `KeyValueStoreSolver` only. + +| Internal name | Referenced by | +|----------------------|--------------------------------------------------------------------------------------------| +| `group_id` | Constraint that source and target units live in the same family | +| `unit` | Lookup key matched against `channel_mapping.source_unit` / `target_unit` | +| `conversion_factor` | Multiplier to the family's base unit; combined into the per-channel `source/target` factor | + Configured via `source.unit_conversion_table` (see [Configuration](../config/configuration.md)).