Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc
import operator
import zlib
from collections.abc import Callable
from collections.abc import Callable, Iterable
from typing import TYPE_CHECKING, Any

import pyspark.sql.types as T
Expand Down Expand Up @@ -197,6 +197,48 @@ def get_selectors(self) -> list["TimeSeriesSelector"]:
"""
pass

@staticmethod
def collect_selectors(
expressions: Iterable[Any],
uses_alias: bool | None = None,
) -> list["TimeSeriesSelector"]:
"""Collect deduplicated leaf selectors from a list of expressions.

Iterates each item, skips anything that isn't a
:class:`TimeSeriesExpression`, walks ``get_selectors()``, applies
an optional ``uses_alias`` filter, and deduplicates by
``selector_id`` preserving discovery order.

Parameters
----------
expressions : Iterable[Any]
Items to walk; non-``TimeSeriesExpression`` entries are
silently skipped (e.g. the ``selections`` list on a
``QueryBuilder`` may carry other selector kinds).
uses_alias : bool or None, optional
When ``True``, keep only alias selectors; when ``False``,
keep only direct selectors; when ``None`` (default), keep
all.

Returns
-------
list of TimeSeriesSelector
Deduplicated selectors in discovery order.
"""
selectors: list["TimeSeriesSelector"] = []
seen_ids: set = set()
for expression in expressions:
if not isinstance(expression, TimeSeriesExpression):
continue
for selector in expression.get_selectors():
if uses_alias is not None and selector.uses_alias != uses_alias:
continue
if selector.selector_id in seen_ids:
continue
seen_ids.add(selector.selector_id)
selectors.append(selector)
return selectors

@abc.abstractmethod
def __str__(self) -> str:
"""
Expand Down
36 changes: 6 additions & 30 deletions src/impulse_query_engine/analyze/query/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,34 +174,6 @@ def select(self, *args) -> Self:
self.selections = list(args)
return self

def _collect_time_series_selectors(self, uses_alias=None) -> list[TimeSeriesSelector]:
"""Collect deduplicated leaf selectors from this query's selections.

Parameters
----------
uses_alias : bool or None, optional
When ``True``, keep only alias selectors; when ``False``, keep
only direct selectors; when ``None`` (default), keep all.

Returns
-------
list of TimeSeriesSelector
Deduplicated selectors in discovery order.
"""
selectors = []
seen_selector_ids = set()
for expression in self.selections:
if not isinstance(expression, TimeSeriesExpression):
continue
for selector in expression.get_selectors():
if uses_alias is not None and selector.uses_alias != uses_alias:
continue
if selector.selector_id in seen_selector_ids:
continue
seen_selector_ids.add(selector.selector_id)
selectors.append(selector)
return selectors

def _determine_result_objects_dtypes(self, default_dtype: T = T.DoubleType()):
"""
Determine result objects and their data types for the selections.
Expand Down Expand Up @@ -261,8 +233,12 @@ def solve(
) = self._determine_result_objects_dtypes()

# extract selectors upfront
direct_selectors = self._collect_time_series_selectors(uses_alias=False)
aliased_selectors = self._collect_time_series_selectors(uses_alias=True)
direct_selectors = TimeSeriesExpression.collect_selectors(
self.selections, uses_alias=False
)
aliased_selectors = TimeSeriesExpression.collect_selectors(
self.selections, uses_alias=True
)

# create Query
tags_df = solver.filter_container_tags(spark, self)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,15 @@ def filter_aliased_channel_metrics(
Returns
-------
pyspark.sql.DataFrame
DataFrame with ``(container_id, channel_id, selector_ids)``
where ``selector_ids`` is an array column. When unit conversion
is active (see above), also carries ``source_unit`` and
``target_unit`` columns.
DataFrame with
``(container_id, channel_id, <metrics-side join keys>,
channel_alias, alias_priority, selector_ids)`` where
``selector_ids`` is an array column. The metrics-side join key
columns come from ``effective_alias_join_keys`` (default:
``channel_name``, ``data_key``) and are deduplicated in case the
same physical column appears on both sides of a join-key tuple.
When unit conversion is active (see above), also carries
``source_unit`` and ``target_unit`` columns.
"""
container_id_col = self.config.container_id_col
channel_id_col = self.config.channel_id_col
Expand Down Expand Up @@ -488,7 +493,15 @@ def filter_aliased_channel_metrics(
resolved = resolved.withColumn(
"selector_ids", F.array(self._build_selector_id_expr(selectors))
)
out_cols = [container_id_col, channel_id_col, "selector_ids"]
join_key_metrics_cols = list(dict.fromkeys(metrics_col for _, metrics_col in join_keys))
out_cols = [
container_id_col,
channel_id_col,
*join_key_metrics_cols,
channel_alias_col,
alias_priority_col,
"selector_ids",
]
if has_unit_cols:
out_cols.extend([source_unit_col, target_unit_col])
return resolved.select(*out_cols)
Expand Down Expand Up @@ -543,8 +556,22 @@ def resolve_channel_selections(
and target_unit_col in aliased_channel_metrics_df.columns
)

# ``filter_aliased_channel_metrics`` emits extra columns
# (metrics-side join keys, channel_alias, alias_priority) for the
# channel mapping resolution dimension; the solve pipeline only
# consumes (container_id, channel_id, selector_ids[, source_unit,
# target_unit]) and unionByName requires matching schemas.
aliased_solve_cols = [
self.config.container_id_col,
self.config.channel_id_col,
"selector_ids",
]
if has_unit_cols:
aliased_solve_cols.extend([source_unit_col, target_unit_col])
aliased_for_union = aliased_channel_metrics_df.select(*aliased_solve_cols)

merged = channel_metrics_df.unionByName(
aliased_channel_metrics_df, allowMissingColumns=has_unit_cols
aliased_for_union, allowMissingColumns=has_unit_cols
)

agg_exprs = [F.flatten(F.collect_list("selector_ids")).alias("selector_ids")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,12 @@ def filter_aliased_channel_metrics(
Returns
-------
pyspark.sql.DataFrame
DataFrame with ``(container_id, channel_id, selector_ids)``
where ``selector_ids`` is an array column.
DataFrame with
``(container_id, channel_id, <metrics-side join keys>,
channel_alias, alias_priority, selector_ids)`` where
``selector_ids`` is an array column. Implementations that
support unit conversion additionally include ``source_unit``
and ``target_unit`` columns.
"""
raise NotImplementedError(
f"{self.__class__.__name__} does not support aliased channel resolution"
Expand Down
46 changes: 45 additions & 1 deletion src/impulse_reporting/core/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
from impulse_reporting.incremental.definition_hash_comparator import (
DefinitionHashComparator,
)
from impulse_reporting.meta.container_dimensions import ContainerDimension
from impulse_reporting.meta.container_dimensions import (
ChannelMappingResolutionDimension,
ContainerDimension,
)
from impulse_reporting.persist.report_storage import (
ReportEntityTransformer,
Sink,
Expand Down Expand Up @@ -96,6 +99,7 @@ def __init__(
self.aggregation_dfs = {}
self.aggregation_metadata_dfs = {}
self.container_dimension_df = None
self.channel_mapping_resolution_dimension_df = None
self._is_incremental = None

if config:
Expand Down Expand Up @@ -591,6 +595,12 @@ def _persist_full(self):
uri = writer.get_output_uri()
writer.write(self.container_dimension_df, uri=uri)

# persist channel mapping resolution dimension
if self.channel_mapping_resolution_dimension_df is not None:
writer = storage_factory.create_channel_mapping_resolution_dimension_writer()
uri = writer.get_output_uri()
writer.write(self.channel_mapping_resolution_dimension_df, uri=uri)

@telemetry_logger("report", "determine_report")
def _persist_incremental(
self,
Expand Down Expand Up @@ -738,6 +748,25 @@ def _persist_incremental(
df_enriched = self.container_dimension_df.transform(transformer.add_meta_information)
self.sink.upsert(df_enriched, uri, ["container_id"])

# Persist channel mapping resolution dimension
# (upsert by container_id, channel_id, channel_alias)
if self.channel_mapping_resolution_dimension_df is not None:
writer = storage_factory.create_channel_mapping_resolution_dimension_writer()
uri = writer.get_output_uri()
df_enriched = self.channel_mapping_resolution_dimension_df.transform(
transformer.add_meta_information
)
solver_cfg = self.solver.config
self.sink.upsert(
df_enriched,
uri,
[
solver_cfg.container_id_col,
solver_cfg.channel_id_col,
solver_cfg.channel_alias_col,
],
)

def _transform_for_persistence(
self,
df: DataFrame,
Expand Down Expand Up @@ -1001,6 +1030,21 @@ def determine_report(self, is_incremental: bool = None):
pre_filtered_containers_df=pre_filtered_containers_df,
)

# Determine channel mapping resolution dimension
aliased_selectors = TimeSeriesExpression.collect_selectors(
all_changed_expressions + all_unchanged_expressions,
uses_alias=True,
)
self.channel_mapping_resolution_dimension_df = (
ChannelMappingResolutionDimension.get_dimension(
spark=self.spark,
query=self.query,
solver=self.solver,
aliased_selectors=aliased_selectors,
pre_filtered_containers_df=pre_filtered_containers_df,
)
)

def _resolve_is_incremental(self, is_incremental: bool = None) -> bool:
"""
Resolve the processing mode considering signature, config, and gold layer.
Expand Down
83 changes: 83 additions & 0 deletions src/impulse_reporting/meta/container_dimensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

from impulse_query_engine.analyze.metadata.time_series_expression import (
TimeSeriesSelector,
)
from impulse_query_engine.analyze.query.query_builder import QueryBuilder
from impulse_query_engine.analyze.query.solvers.query_solver import QuerySolver
from impulse_reporting.config.config_parser import ImpulseConfig
Expand Down Expand Up @@ -102,3 +105,83 @@ def _(df: DataFrame) -> DataFrame:
return df.withColumn("config_hash", F.hash(F.lit(config_hash)))

return _


class ChannelMappingResolutionDimension:
"""Helper class to handle the channel mapping resolution dimension.

Persists the result of
:meth:`QuerySolver.filter_aliased_channel_metrics` so downstream BI
consumers can join on ``(container_id, channel_id, channel_alias)``
to recover the physical join keys, alias priority, and resolved unit
pair.
"""

@staticmethod
def get_dimension(
spark: SparkSession,
query: QueryBuilder,
solver: QuerySolver,
aliased_selectors: list[TimeSeriesSelector],
pre_filtered_containers_df: DataFrame = None,
) -> DataFrame | None:
"""
Compute the channel mapping resolution dimension for the report.

Returns ``None`` when the report has no aliased selectors — there
is nothing to resolve, and the persist step is a no-op.

Otherwise runs the solver's container-side filter pipeline
(``filter_container_tags`` → ``filter_container_metrics``) so the
result honors ``pre_filtered_containers_df`` for incremental mode,
then calls ``filter_aliased_channel_metrics`` with the aliased
selectors collected from the report's events and aggregations.
The internal ``selector_ids`` column is dropped since it is a
runtime artifact and not part of the dimension contract.

Notes
-----
Solver-capability is *not* checked here. If a report has aliased
selectors, the configured solver must support alias resolution —
otherwise ``QueryBuilder.solve`` upstream of this call has
already raised ``NotImplementedError``. We rely on that invariant
instead of introspecting the solver class.

Parameters
----------
spark : SparkSession
Spark session for data processing.
query : QueryBuilder
The query builder used for the report.
solver : QuerySolver
The solver instance to use for query execution.
aliased_selectors : list[TimeSeriesSelector]
Aliased selectors (``uses_alias=True``) collected from the
report's events and aggregations. May be empty.
pre_filtered_containers_df : DataFrame, optional
Pre-filtered containers for incremental processing.

Returns
-------
DataFrame or None
DataFrame with columns
``(container_id, channel_id, <metrics-side join keys>,
channel_alias, alias_priority[, source_unit, target_unit])``,
or ``None`` if the report has no aliased selectors.
"""
if not aliased_selectors:
return None

container_tags_df = solver.filter_container_tags(spark, query)
container_df = solver.filter_container_metrics(
spark, query, container_tags_df, pre_filtered_containers_df
)

resolved = solver.filter_aliased_channel_metrics(
spark, query.db, container_df, aliased_selectors
)

if "selector_ids" in resolved.columns:
resolved = resolved.drop("selector_ids")

return resolved
Loading
Loading