diff --git a/TM1py/Services/ElementService.py b/TM1py/Services/ElementService.py index 701cc47a..5b31970b 100644 --- a/TM1py/Services/ElementService.py +++ b/TM1py/Services/ElementService.py @@ -321,10 +321,36 @@ def _build_unwind_hierarchy_edges_from_blob_process( hierarchyupdate_process.metadata_procedure = metadata_statement return hierarchyupdate_process - def get_elements(self, dimension_name: str, hierarchy_name: str, **kwargs) -> List[Element]: + def get_elements( + self, + dimension_name: str, + hierarchy_name: str, + element_type: Optional[Union[int, str, "Element.Types", Iterable]] = None, + name_pattern: Optional[str] = None, + level: Optional[int] = None, + **kwargs, + ) -> List[Element]: + """Get all elements as Element objects, optionally filtered. + + :param dimension_name: Name of the dimension. + :param hierarchy_name: Name of the hierarchy. + :param element_type: Restrict to elements of the given type(s). Accepts an + ``Element.Types`` enum value, a string ('numeric'/'string'/'consolidated', + case-insensitive), an int (1/2/3), or an iterable of any of those (OR-combined). + :param name_pattern: Restrict to elements whose name matches the glob pattern. + Supports ``*`` wildcard (use ``foo*``, ``*foo``, ``*foo*``, or exact ``foo``). + ``?`` is not supported. Matching is case- and space-insensitive. + :param level: Restrict to elements at the given hierarchy level (0 = leaf). + :return: List of Element objects. + """ url = format_url( - "/Dimensions('{}')/Hierarchies('{}')/Elements?select=Name,Type", dimension_name, hierarchy_name + "/Dimensions('{}')/Hierarchies('{}')/Elements?$select=Name,Type", + dimension_name, + hierarchy_name, ) + filter_clause = _build_elements_filter(element_type, name_pattern, level) + if filter_clause: + url += "&$filter=" + filter_clause response = self._rest.GET(url, **kwargs) return [Element.from_dict(element) for element in response.json()["value"]] @@ -345,6 +371,9 @@ def get_elements_dataframe( allow_empty_alias: bool = True, attribute_suffix: bool = False, element_type_column: str = "Type", + element_type: Optional[Union[int, str, "Element.Types", Iterable]] = None, + name_pattern: Optional[str] = None, + level: Optional[int] = None, **kwargs, ) -> "pd.DataFrame": """ @@ -363,6 +392,16 @@ def get_elements_dataframe( :param allow_empty_alias: False if empty alias values should be substituted with element names instead :param attribute_suffix: True if attribute columns should have ':a', ':s' or ':n' suffix :param element_type_column: The column name in the df which specifies which element is which type. + :param element_type: Restrict to elements of the given type(s). Accepts an + ``Element.Types`` enum value, a string ('numeric'/'string'/'consolidated', + case-insensitive), an int (1/2/3), or an iterable of any of those. + Only applied when ``elements`` is None. When explicitly set, overrides + ``skip_consolidations``. + :param name_pattern: Restrict to elements whose name matches the glob pattern + (``*`` wildcard, case- and space-insensitive). Only applied when ``elements`` + is None. + :param level: Restrict to elements at the given hierarchy level (0 = leaf). + Only applied when ``elements`` is None. :return: pandas DataFrame """ @@ -381,10 +420,39 @@ def get_elements_dataframe( unique_name = record[0][0]["UniqueName"] dimension_name, hierarchy_name, _ = dimension_hierarchy_element_tuple_from_unique_name(unique_name) + trio_filter_active = element_type is not None or name_pattern is not None or level is not None if elements is None or not any(elements): - elements = f"{{ [{dimension_name}].[{hierarchy_name}].Members }}" - if skip_consolidations: - elements = f"{{ Tm1FilterByLevel({elements}, 0) }}" + if trio_filter_active: + # Trio filter explicitly set. Resolve to a concrete element list via the + # filtered get_element_names path. The trio is authoritative and overrides + # skip_consolidations. + resolved = self.get_element_names( + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + element_type=element_type, + name_pattern=name_pattern, + level=level, + ) + if resolved: + elements = ( + "{" + ",".join(f"[{dimension_name}].[{hierarchy_name}].[{member}]" for member in resolved) + "}" + ) + else: + # Empty match. Filter the full Members set against an + # unreachably high level so the MDX produces zero rows but the + # downstream pipeline still emits the full column schema + # (dimension name, attributes, levels, parents). A bare "{}" + # axis would lose the dimension column and break the final + # pd.merge on dimension_name. + empty_set_level = 9999 + elements = ( + f"{{ Tm1FilterByLevel({{ [{dimension_name}].[{hierarchy_name}].Members }}, " + f"{empty_set_level}) }}" + ) + else: + elements = f"{{ [{dimension_name}].[{hierarchy_name}].Members }}" + if skip_consolidations: + elements = f"{{ Tm1FilterByLevel({elements}, 0) }}" if not isinstance(elements, str): if isinstance(elements, Iterable): @@ -404,8 +472,13 @@ def get_elements_dataframe( ) ] + # When the trio filter is active, the resolved element list is authoritative. + # Fetch the full type lookup so consolidated members survive the inner-join below. + element_types_skip_consolidations = False if trio_filter_active else skip_consolidations element_types = self.get_element_types( - dimension_name=dimension_name, hierarchy_name=hierarchy_name, skip_consolidations=skip_consolidations + dimension_name=dimension_name, + hierarchy_name=hierarchy_name, + skip_consolidations=element_types_skip_consolidations, ) df = pd.DataFrame( @@ -702,14 +775,36 @@ def get_string_element_names(self, dimension_name: str, hierarchy_name: str, **k response = self._rest.GET(url, **kwargs) return [e["Name"] for e in response.json()["value"]] - def get_element_names(self, dimension_name: str, hierarchy_name: str, **kwargs) -> List[str]: - """Get all element names - - :param dimension_name: - :param hierarchy_name: - :return: Generator of element-names + def get_element_names( + self, + dimension_name: str, + hierarchy_name: str, + element_type: Optional[Union[int, str, "Element.Types", Iterable]] = None, + name_pattern: Optional[str] = None, + level: Optional[int] = None, + **kwargs, + ) -> List[str]: + """Get all element names, optionally filtered. + + :param dimension_name: Name of the dimension. + :param hierarchy_name: Name of the hierarchy. + :param element_type: Restrict to elements of the given type(s). Accepts an + ``Element.Types`` enum value, a string ('numeric'/'string'/'consolidated', + case-insensitive), an int (1/2/3), or an iterable of any of those (OR-combined). + :param name_pattern: Restrict to elements whose name matches the glob pattern. + Supports ``*`` wildcard (use ``foo*``, ``*foo``, ``*foo*``, or exact ``foo``). + ``?`` is not supported. Matching is case- and space-insensitive. + :param level: Restrict to elements at the given hierarchy level (0 = leaf). + :return: List of element names. """ - url = format_url("/Dimensions('{}')/Hierarchies('{}')/Elements?$select=Name", dimension_name, hierarchy_name) + url = format_url( + "/Dimensions('{}')/Hierarchies('{}')/Elements?$select=Name", + dimension_name, + hierarchy_name, + ) + filter_clause = _build_elements_filter(element_type, name_pattern, level) + if filter_clause: + url += "&$filter=" + filter_clause response = self._rest.GET(url, **kwargs) return [e["Name"] for e in response.json()["value"]] @@ -759,43 +854,33 @@ def get_all_leaf_element_identifiers( return self.get_element_identifiers(dimension_name, hierarchy_name, mdx_elements, **kwargs) def get_elements_by_level(self, dimension_name: str, hierarchy_name: str, level: int, **kwargs) -> List[str]: - """Get all element names by level in a hierarchy + """Get all element names by level in a hierarchy. :param dimension_name: Name of the dimension :param hierarchy_name: Name of the hierarchy :param level: Level to filter :return: List of element names """ - url = format_url( - "/Dimensions('{}')/Hierarchies('{}')/Elements?$select=Name&$filter=Level eq {}", - dimension_name, - hierarchy_name, - str(level), - ) - response = self._rest.GET(url, **kwargs) - return [e["Name"] for e in response.json()["value"]] + return self.get_element_names(dimension_name, hierarchy_name, level=level, **kwargs) def get_elements_filtered_by_wildcard( self, dimension_name: str, hierarchy_name: str, wildcard: str, level: int = None, **kwargs ) -> List[str]: - """Get all element names filtered by wildcard (CaseAndSpaceInsensitive) and level in a hierarchy + """Get all element names filtered by wildcard (case- and space-insensitive contains) and optional level. :param dimension_name: Name of the dimension :param hierarchy_name: Name of the hierarchy - :param wildcard: wildcard to filter - :param level: Level to filter + :param wildcard: substring to match (case- and space-insensitive contains) + :param level: Optional level to filter :return: List of element names """ - filter_elements = format_url("contains(tolower(replace(Name,' ','')),tolower(replace('{}',' ', '')))", wildcard) - if level is not None: - filter_elements = filter_elements + f" and Level eq {level}" - url = format_url( - "/Dimensions('{}')/Hierarchies('{}')/Elements?$select=Name&$filter=" + filter_elements, + return self.get_element_names( dimension_name, hierarchy_name, + name_pattern=f"*{wildcard}*", + level=level, + **kwargs, ) - response = self._rest.GET(url, **kwargs) - return [e["Name"] for e in response.json()["value"]] def get_all_element_identifiers( self, dimension_name: str, hierarchy_name: str, **kwargs @@ -1553,3 +1638,160 @@ def _get_cell_service(self): from TM1py import CellService return CellService(self._rest) + + +# --------------------------------------------------------------------------- +# Filtering helpers (private, module-level) +# --------------------------------------------------------------------------- + +_TYPE_NAME_TO_CODE = {"numeric": 1, "string": 2, "consolidated": 3} +_VALID_TYPE_CODES = {1, 2, 3} +_INVALID_ELEMENT_TYPE_MSG = ( + "Invalid element_type {value!r}: expected 'numeric', 'string', 'consolidated', Element.Types enum, or int 1/2/3" +) + + +def _coerce_one_element_type(value) -> int: + """Coerce a single element_type input to its OData type code (1, 2, or 3). + + Accepts Element.Types enum, str (case-insensitive), or int in {1, 2, 3}. + Raises ValueError on anything else. + """ + # Reject bool first: bool is a subclass of int in Python, but True/False + # are not valid type codes. + if isinstance(value, bool): + raise ValueError(_INVALID_ELEMENT_TYPE_MSG.format(value=value)) + if isinstance(value, Element.Types): + return int(value.value) + if isinstance(value, str): + code = _TYPE_NAME_TO_CODE.get(value.lower()) + if code is None: + raise ValueError(_INVALID_ELEMENT_TYPE_MSG.format(value=value)) + return code + if isinstance(value, int) and value in _VALID_TYPE_CODES: + return value + raise ValueError(_INVALID_ELEMENT_TYPE_MSG.format(value=value)) + + +def _coerce_element_types(value) -> List[int]: + """Normalize element_type input to a deduplicated list of OData type codes. + + :param value: Element.Types enum, str ('numeric'/'string'/'consolidated', case-insensitive), + int in {1, 2, 3}, an iterable of any of those, or None. + :return: Deduplicated list of OData type codes (e.g. [1, 3]), preserving first-seen order. + Returns [] when value is None. + :raises ValueError: on unknown values or empty iterables. + """ + if value is None: + return [] + + # Single-value path (these come BEFORE the iterable check because str is iterable) + if isinstance(value, (Element.Types, str, int)): + return [_coerce_one_element_type(value)] + + # Iterable path + try: + items = list(value) + except TypeError: + raise ValueError(_INVALID_ELEMENT_TYPE_MSG.format(value=value)) + if not items: + raise ValueError("element_type list cannot be empty (pass None to skip the filter)") + + seen, out = set(), [] + for item in items: + code = _coerce_one_element_type(item) + if code not in seen: + seen.add(code) + out.append(code) + return out + + +def _odata_str_literal(s: str) -> str: + """Wrap a string in single quotes, doubling embedded single quotes per OData spec.""" + return "'" + s.replace("'", "''") + "'" + + +def _normalize_for_match(s: str) -> str: + """Lowercase and strip spaces; mirrors what we apply to the Name property server-side.""" + return s.replace(" ", "").lower() + + +def _pattern_to_odata(pattern: str) -> str: + """Translate a glob pattern (with '*' wildcards) to an OData filter fragment + that matches case- and space-insensitively against the Name property. + + Supports '*' only. '?' raises ValueError (caller responsibility to validate).""" + name_expr = "tolower(replace(Name,' ',''))" + + leading_anchor = not pattern.startswith("*") + trailing_anchor = not pattern.endswith("*") + inner = [s for s in pattern.split("*") if s != ""] + + if not inner: + # Pattern was '*' or '**': matches everything. Emit a tautology. + return f"{name_expr} eq {name_expr}" + + inner_lits = [_odata_str_literal(_normalize_for_match(s)) for s in inner] + + # Single inner segment, both anchored: exact match + if len(inner) == 1 and leading_anchor and trailing_anchor: + return f"{name_expr} eq {inner_lits[0]}" + + parts = [] + for i, lit in enumerate(inner_lits): + is_first = i == 0 + is_last = i == len(inner_lits) - 1 + if is_first and leading_anchor: + parts.append(f"startswith({name_expr},{lit})") + elif is_last and trailing_anchor: + parts.append(f"endswith({name_expr},{lit})") + else: + parts.append(f"contains({name_expr},{lit})") + return " and ".join(parts) + + +def _build_elements_filter( + element_type: Optional[Union[int, str, Element.Types, Iterable]], + name_pattern: Optional[str], + level: Optional[int], +) -> str: + """Build an OData $filter clause (without the leading '$filter=') from the optional + type / pattern / level filters. + + :param element_type: see _coerce_element_types + :param name_pattern: glob with '*' wildcard, case- and space-insensitive. Supports exact, + 'foo*', '*foo', '*foo*', and multi-segment (e.g. '*eu*region*'). '?' is not supported. + :param level: hierarchy level (>= 0). 0 is the leaf level. + :return: OData filter clause string, or '' if all three args are None. + :raises ValueError: on invalid type/pattern/level values. + :raises TypeError: when name_pattern is not str or level is not int. + """ + clauses: List[str] = [] + + # Type clause + type_codes = _coerce_element_types(element_type) + if type_codes: + if len(type_codes) == 1: + clauses.append(f"Type eq {type_codes[0]}") + else: + clauses.append("(" + " or ".join(f"Type eq {c}" for c in type_codes) + ")") + + # Pattern clause + if name_pattern is not None: + if not isinstance(name_pattern, str): + raise TypeError(f"name_pattern must be str, got {type(name_pattern).__name__}") + if name_pattern == "": + raise ValueError("name_pattern cannot be empty (pass None to skip the filter)") + if "?" in name_pattern: + raise ValueError("'?' wildcard not supported in name_pattern, only '*'") + clauses.append(_pattern_to_odata(name_pattern)) + + # Level clause + if level is not None: + if isinstance(level, bool) or not isinstance(level, int): + raise TypeError(f"level must be int, got {type(level).__name__}") + if level < 0: + raise ValueError("level must be >= 0") + clauses.append(f"Level eq {level}") + + return " and ".join(clauses) diff --git a/Tests/ElementService_filtering_helpers_test.py b/Tests/ElementService_filtering_helpers_test.py new file mode 100644 index 00000000..b43b5188 --- /dev/null +++ b/Tests/ElementService_filtering_helpers_test.py @@ -0,0 +1,231 @@ +import unittest + +from TM1py.Objects import Element +from TM1py.Services.ElementService import _build_elements_filter, _coerce_element_types + + +class TestCoerceElementTypes(unittest.TestCase): + def test_none_returns_empty_list(self): + self.assertEqual(_coerce_element_types(None), []) + + def test_enum_numeric(self): + self.assertEqual(_coerce_element_types(Element.Types.NUMERIC), [1]) + + def test_enum_string(self): + self.assertEqual(_coerce_element_types(Element.Types.STRING), [2]) + + def test_enum_consolidated(self): + self.assertEqual(_coerce_element_types(Element.Types.CONSOLIDATED), [3]) + + def test_str_lowercase(self): + self.assertEqual(_coerce_element_types("numeric"), [1]) + + def test_str_mixed_case(self): + self.assertEqual(_coerce_element_types("Numeric"), [1]) + + def test_str_uppercase(self): + self.assertEqual(_coerce_element_types("NUMERIC"), [1]) + + def test_str_string(self): + self.assertEqual(_coerce_element_types("string"), [2]) + + def test_str_consolidated(self): + self.assertEqual(_coerce_element_types("consolidated"), [3]) + + def test_int_codes(self): + self.assertEqual(_coerce_element_types(1), [1]) + self.assertEqual(_coerce_element_types(2), [2]) + self.assertEqual(_coerce_element_types(3), [3]) + + def test_list_of_ints(self): + self.assertEqual(_coerce_element_types([1, 3]), [1, 3]) + + def test_list_mixed_input_shapes(self): + self.assertEqual( + _coerce_element_types([1, "string", Element.Types.CONSOLIDATED]), + [1, 2, 3], + ) + + def test_list_dedupes_preserving_order(self): + self.assertEqual(_coerce_element_types([3, 1, 1, "consolidated", "Numeric"]), [3, 1]) + + def test_tuple_works(self): + self.assertEqual(_coerce_element_types((1, 2)), [1, 2]) + + def test_invalid_string_raises(self): + with self.assertRaisesRegex(ValueError, "Invalid element_type 'bogus'"): + _coerce_element_types("bogus") + + def test_invalid_int_too_low_raises(self): + with self.assertRaisesRegex(ValueError, "Invalid element_type 0"): + _coerce_element_types(0) + + def test_invalid_int_too_high_raises(self): + with self.assertRaisesRegex(ValueError, "Invalid element_type 4"): + _coerce_element_types(4) + + def test_empty_list_raises(self): + with self.assertRaisesRegex(ValueError, "cannot be empty"): + _coerce_element_types([]) + + def test_list_with_invalid_entry_raises(self): + with self.assertRaisesRegex(ValueError, "Invalid element_type 'bogus'"): + _coerce_element_types([1, "bogus"]) + + def test_bool_not_treated_as_int(self): + # True/False are technically ints in Python but should not coerce to type 1 + with self.assertRaisesRegex(ValueError, "Invalid element_type"): + _coerce_element_types(True) + + +class TestBuildElementsFilter(unittest.TestCase): + NAME_EXPR = "tolower(replace(Name,' ',''))" + + # --- empty / no-op --- + def test_all_none_returns_empty(self): + self.assertEqual(_build_elements_filter(None, None, None), "") + + # --- type only --- + def test_type_single(self): + self.assertEqual(_build_elements_filter(1, None, None), "Type eq 1") + + def test_type_list_two(self): + self.assertEqual(_build_elements_filter([1, 3], None, None), "(Type eq 1 or Type eq 3)") + + def test_type_list_three(self): + self.assertEqual( + _build_elements_filter([1, 2, 3], None, None), + "(Type eq 1 or Type eq 2 or Type eq 3)", + ) + + def test_type_via_enum(self): + self.assertEqual(_build_elements_filter(Element.Types.NUMERIC, None, None), "Type eq 1") + + def test_type_via_string(self): + self.assertEqual(_build_elements_filter("Consolidated", None, None), "Type eq 3") + + # --- pattern only --- + def test_pattern_exact(self): + self.assertEqual( + _build_elements_filter(None, "Region", None), + f"{self.NAME_EXPR} eq 'region'", + ) + + def test_pattern_startswith(self): + self.assertEqual( + _build_elements_filter(None, "Region*", None), + f"startswith({self.NAME_EXPR},'region')", + ) + + def test_pattern_endswith(self): + self.assertEqual( + _build_elements_filter(None, "*Region", None), + f"endswith({self.NAME_EXPR},'region')", + ) + + def test_pattern_contains(self): + self.assertEqual( + _build_elements_filter(None, "*Region*", None), + f"contains({self.NAME_EXPR},'region')", + ) + + def test_pattern_strips_spaces_in_literal(self): + self.assertEqual( + _build_elements_filter(None, "Region 1", None), + f"{self.NAME_EXPR} eq 'region1'", + ) + + def test_pattern_lowercases_literal(self): + self.assertEqual( + _build_elements_filter(None, "REGION*", None), + f"startswith({self.NAME_EXPR},'region')", + ) + + def test_pattern_multi_contains_bare(self): + # *foo*bar* -> contains(foo) and contains(bar) + self.assertEqual( + _build_elements_filter(None, "*foo*bar*", None), + f"contains({self.NAME_EXPR},'foo') and contains({self.NAME_EXPR},'bar')", + ) + + def test_pattern_startswith_with_middle_contains(self): + # foo*mid*bar* -> startswith(foo) and contains(mid) and contains(bar) + self.assertEqual( + _build_elements_filter(None, "foo*mid*bar*", None), + f"startswith({self.NAME_EXPR},'foo') and contains({self.NAME_EXPR},'mid') and contains({self.NAME_EXPR},'bar')", + ) + + def test_pattern_endswith_with_middle_contains(self): + # *foo*mid*bar -> contains(foo) and contains(mid) and endswith(bar) + self.assertEqual( + _build_elements_filter(None, "*foo*mid*bar", None), + f"contains({self.NAME_EXPR},'foo') and contains({self.NAME_EXPR},'mid') and endswith({self.NAME_EXPR},'bar')", + ) + + def test_pattern_startswith_endswith(self): + # foo*bar -> startswith(foo) and endswith(bar) + self.assertEqual( + _build_elements_filter(None, "foo*bar", None), + f"startswith({self.NAME_EXPR},'foo') and endswith({self.NAME_EXPR},'bar')", + ) + + def test_pattern_quote_escaping(self): + self.assertEqual( + _build_elements_filter(None, "*O'Brien*", None), + f"contains({self.NAME_EXPR},'o''brien')", + ) + + def test_pattern_only_asterisks_matches_all(self): + # '*' alone matches everything; emit a tautology + result = _build_elements_filter(None, "*", None) + self.assertEqual(result, f"{self.NAME_EXPR} eq {self.NAME_EXPR}") + + # --- level only --- + def test_level_zero(self): + self.assertEqual(_build_elements_filter(None, None, 0), "Level eq 0") + + def test_level_nonzero(self): + self.assertEqual(_build_elements_filter(None, None, 2), "Level eq 2") + + # --- composed --- + def test_all_three_composed(self): + self.assertEqual( + _build_elements_filter(1, "*foo*", 0), + f"Type eq 1 and contains({self.NAME_EXPR},'foo') and Level eq 0", + ) + + def test_type_list_with_pattern_and_level(self): + self.assertEqual( + _build_elements_filter([1, 3], "Region*", 1), + f"(Type eq 1 or Type eq 3) and startswith({self.NAME_EXPR},'region') and Level eq 1", + ) + + # --- validation errors --- + def test_pattern_question_mark_raises(self): + with self.assertRaisesRegex(ValueError, r"'\?' wildcard not supported"): + _build_elements_filter(None, "foo?", None) + + def test_pattern_empty_raises(self): + with self.assertRaisesRegex(ValueError, "cannot be empty"): + _build_elements_filter(None, "", None) + + def test_pattern_non_string_raises(self): + with self.assertRaisesRegex(TypeError, "name_pattern must be str"): + _build_elements_filter(None, 123, None) + + def test_level_negative_raises(self): + with self.assertRaisesRegex(ValueError, "must be >= 0"): + _build_elements_filter(None, None, -1) + + def test_level_non_int_raises(self): + with self.assertRaisesRegex(TypeError, "level must be int"): + _build_elements_filter(None, None, "0") + + def test_level_bool_raises(self): + # bool is a subclass of int in Python; reject anyway since it's meaningless here + with self.assertRaisesRegex(TypeError, "level must be int"): + _build_elements_filter(None, None, True) + + +if __name__ == "__main__": + unittest.main() diff --git a/Tests/ElementService_test.py b/Tests/ElementService_test.py index 92efdb52..e6a5cac2 100644 --- a/Tests/ElementService_test.py +++ b/Tests/ElementService_test.py @@ -1,5 +1,6 @@ import configparser import copy +import json import unittest from pathlib import Path @@ -1611,5 +1612,438 @@ def tearDownClass(cls): cls.tm1.logout() +class TestElementFiltering(unittest.TestCase): + """Tests for the element_type / name_pattern / level kwargs on + get_elements and get_element_names. + + Fixture cleanup is registered via self.addCleanup(...) inside setUp, + so there is no tearDown method. + """ + + tm1: TM1Service + + @classmethod + def setUpClass(cls): + cls.config = configparser.ConfigParser() + cls.config.read(Path(__file__).parent.joinpath("config.ini")) + cls.tm1 = TM1Service(**cls.config["tm1srv01"]) + + @classmethod + def tearDownClass(cls): + cls.tm1.logout() + + def setUp(self): + """Create a fixture dimension with a known, predictable element set. + + Level 0 (leaves): + Numeric: 'Numeric A', 'Numeric B', 'Numeric C', "O'Brien" + String: 'String A', 'String B' + Level 1 (consolidations): + 'Region North' (parent of Numeric A, Numeric B) + 'Region South' (parent of Numeric C, "O'Brien") + Level 2 (top consolidation): + 'Total Regions' (parent of both regions) + """ + dimension_uuid = generate_test_uuid() + self.dimension_name = f"TM1py_unittest_filter_{dimension_uuid}" + self.hierarchy_name = self.dimension_name + + d = Dimension(self.dimension_name) + h = Hierarchy(self.dimension_name, self.hierarchy_name) + + h.add_element("Numeric A", "Numeric") + h.add_element("Numeric B", "Numeric") + h.add_element("Numeric C", "Numeric") + h.add_element("O'Brien", "Numeric") + h.add_element("String A", "String") + h.add_element("String B", "String") + + h.add_element("Region North", "Consolidated") + h.add_element("Region South", "Consolidated") + h.add_element("Total Regions", "Consolidated") + + h.add_edge("Region North", "Numeric A", 1) + h.add_edge("Region North", "Numeric B", 1) + h.add_edge("Region South", "Numeric C", 1) + h.add_edge("Region South", "O'Brien", 1) + h.add_edge("Total Regions", "Region North", 1) + h.add_edge("Total Regions", "Region South", 1) + + # Add a placeholder attribute so the }ElementAttributes_ cube is + # created. get_elements_dataframe requires this cube to exist. + h.add_element_attribute("Description", "String") + + d.add_hierarchy(h) + self.tm1.dimensions.update_or_create(d) + self.addCleanup(self._cleanup_dimension) + + def _cleanup_dimension(self): + if self.tm1.dimensions.exists(self.dimension_name): + self.tm1.dimensions.delete(self.dimension_name) + + def test_fixture_creates(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name) + self.assertEqual( + set(names), + { + "Numeric A", + "Numeric B", + "Numeric C", + "O'Brien", + "String A", + "String B", + "Region North", + "Region South", + "Total Regions", + }, + ) + + # ------------------------------------------------------------------ + # get_element_names: element_type filter + # ------------------------------------------------------------------ + + def test_names_element_type_numeric(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, element_type="numeric") + self.assertEqual(set(names), {"Numeric A", "Numeric B", "Numeric C", "O'Brien"}) + + def test_names_element_type_string(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, element_type="string") + self.assertEqual(set(names), {"String A", "String B"}) + + def test_names_element_type_consolidated(self): + names = self.tm1.elements.get_element_names( + self.dimension_name, self.hierarchy_name, element_type="consolidated" + ) + self.assertEqual(set(names), {"Region North", "Region South", "Total Regions"}) + + def test_names_element_type_enum(self): + names = self.tm1.elements.get_element_names( + self.dimension_name, self.hierarchy_name, element_type=Element.Types.NUMERIC + ) + self.assertEqual(set(names), {"Numeric A", "Numeric B", "Numeric C", "O'Brien"}) + + def test_names_element_type_list_numeric_and_consolidated(self): + """Stated use case: 'all non-string elements'.""" + names = self.tm1.elements.get_element_names( + self.dimension_name, + self.hierarchy_name, + element_type=["numeric", "consolidated"], + ) + self.assertEqual( + set(names), + { + "Numeric A", + "Numeric B", + "Numeric C", + "O'Brien", + "Region North", + "Region South", + "Total Regions", + }, + ) + + # ------------------------------------------------------------------ + # get_element_names: name_pattern filter + # ------------------------------------------------------------------ + + def test_names_pattern_exact(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="Numeric A") + self.assertEqual(names, ["Numeric A"]) + + def test_names_pattern_startswith(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="Numeric*") + self.assertEqual(set(names), {"Numeric A", "Numeric B", "Numeric C"}) + + def test_names_pattern_endswith(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="*A") + # Numeric A, String A. Region North contains 'a' but does not endswith. + self.assertEqual(set(names), {"Numeric A", "String A"}) + + def test_names_pattern_contains(self): + # 'Total Regions' matches because after space-stripping the normalized name 'totalregions' contains 'region'. + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="*Region*") + self.assertEqual(set(names), {"Region North", "Region South", "Total Regions"}) + + def test_names_pattern_case_insensitive(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="numeric*") + self.assertEqual(set(names), {"Numeric A", "Numeric B", "Numeric C"}) + + def test_names_pattern_space_insensitive(self): + # 'NumericA' (no space) should match 'Numeric A' + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="NumericA") + self.assertEqual(names, ["Numeric A"]) + + def test_names_pattern_with_quote(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="*O'Brien*") + self.assertEqual(names, ["O'Brien"]) + + # ------------------------------------------------------------------ + # get_element_names: level filter + # ------------------------------------------------------------------ + + def test_names_level_zero(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, level=0) + self.assertEqual( + set(names), + {"Numeric A", "Numeric B", "Numeric C", "O'Brien", "String A", "String B"}, + ) + + def test_names_level_one(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, level=1) + self.assertEqual(set(names), {"Region North", "Region South"}) + + def test_names_level_two(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, level=2) + self.assertEqual(names, ["Total Regions"]) + + # ------------------------------------------------------------------ + # get_element_names: composed (AND) + # ------------------------------------------------------------------ + + def test_names_type_and_pattern(self): + # Numeric elements containing 'A' + names = self.tm1.elements.get_element_names( + self.dimension_name, + self.hierarchy_name, + element_type="numeric", + name_pattern="*A*", + ) + self.assertEqual(set(names), {"Numeric A"}) + + def test_names_type_and_level(self): + # Consolidated at level 1 + names = self.tm1.elements.get_element_names( + self.dimension_name, + self.hierarchy_name, + element_type="consolidated", + level=1, + ) + self.assertEqual(set(names), {"Region North", "Region South"}) + + def test_names_all_three_composed(self): + # Numeric leaves whose normalized name ends in 'c'. After space-stripping + # and lowercasing, only "Numeric C" qualifies. ("Numeric A"/"Numeric B" + # also contain the letter 'c' from "numeric" so a *C* pattern would + # match all three; use endswith to single out "Numeric C".) + names = self.tm1.elements.get_element_names( + self.dimension_name, + self.hierarchy_name, + element_type="numeric", + name_pattern="*C", + level=0, + ) + self.assertEqual(names, ["Numeric C"]) + + # ------------------------------------------------------------------ + # get_element_names: no filter sanity check + # ------------------------------------------------------------------ + + def test_names_no_filter_returns_all(self): + names = self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name) + # 4 numeric + 2 string + 3 consolidated + self.assertEqual(len(names), 9) + + # ------------------------------------------------------------------ + # get_element_names: validation passthrough + # ------------------------------------------------------------------ + + def test_names_invalid_type_raises(self): + with self.assertRaises(ValueError): + self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, element_type="bogus") + + def test_names_question_mark_raises(self): + with self.assertRaises(ValueError): + self.tm1.elements.get_element_names(self.dimension_name, self.hierarchy_name, name_pattern="foo?bar") + + # ------------------------------------------------------------------ + # get_elements (returns List[Element]) + # ------------------------------------------------------------------ + + def test_elements_type_numeric(self): + elements = self.tm1.elements.get_elements(self.dimension_name, self.hierarchy_name, element_type="numeric") + self.assertEqual( + {e.name for e in elements}, + {"Numeric A", "Numeric B", "Numeric C", "O'Brien"}, + ) + self.assertTrue(all(e.element_type == Element.Types.NUMERIC for e in elements)) + + def test_elements_pattern_and_level(self): + elements = self.tm1.elements.get_elements( + self.dimension_name, + self.hierarchy_name, + name_pattern="Region*", + level=1, + ) + self.assertEqual( + {e.name for e in elements}, + {"Region North", "Region South"}, + ) + + def test_elements_no_filter_returns_all_with_types(self): + elements = self.tm1.elements.get_elements(self.dimension_name, self.hierarchy_name) + self.assertEqual(len(elements), 9) + # Confirm we get the Type attribute populated (existing behavior preserved) + types = {e.element_type for e in elements} + self.assertEqual( + types, + {Element.Types.NUMERIC, Element.Types.STRING, Element.Types.CONSOLIDATED}, + ) + + def test_elements_quote_escape(self): + elements = self.tm1.elements.get_elements(self.dimension_name, self.hierarchy_name, name_pattern="*O'Brien*") + self.assertEqual([e.name for e in elements], ["O'Brien"]) + self.assertEqual(elements[0].element_type, Element.Types.NUMERIC) + + # ------------------------------------------------------------------ + # Regression: verify behavior of typed methods is preserved after they + # are refactored to delegate to get_element_names. Snapshots in + # Tests/fixtures/element_filtering_snapshots/ were generated against + # master before the refactor. + # ------------------------------------------------------------------ + + SNAPSHOT_DIR = Path(__file__).parent / "fixtures" / "element_filtering_snapshots" + + def _load_snapshot(self, name): + path = self.SNAPSHOT_DIR / name + if not path.exists(): + self.fail( + f"Snapshot '{name}' not found at {self.SNAPSHOT_DIR}. " + f"Regenerate by re-running the snapshot generator from the plan's " + f"Phase 3 / Task 3.1." + ) + with open(path) as f: + return json.load(f) + + def test_regression_by_level_0(self): + actual = self.tm1.elements.get_elements_by_level(self.dimension_name, self.hierarchy_name, level=0) + expected = self._load_snapshot("by_level_0.json") + self.assertEqual(sorted(actual), expected) + + def test_regression_by_level_1(self): + actual = self.tm1.elements.get_elements_by_level(self.dimension_name, self.hierarchy_name, level=1) + expected = self._load_snapshot("by_level_1.json") + self.assertEqual(sorted(actual), expected) + + def test_regression_by_level_2(self): + actual = self.tm1.elements.get_elements_by_level(self.dimension_name, self.hierarchy_name, level=2) + expected = self._load_snapshot("by_level_2.json") + self.assertEqual(sorted(actual), expected) + + def test_regression_wildcard_cases(self): + """Verify get_elements_filtered_by_wildcard preserves case+space-insensitive contains.""" + for i in range(5): + snap = self._load_snapshot(f"wildcard_{i}.json") + actual = self.tm1.elements.get_elements_filtered_by_wildcard( + self.dimension_name, + self.hierarchy_name, + wildcard=snap["wildcard"], + level=snap["level"], + ) + self.assertEqual( + sorted(actual), + snap["result"], + msg=( + f"wildcard_{i}: wildcard={snap['wildcard']!r} level={snap['level']}, " + f"got {sorted(actual)!r}, expected {snap['result']!r}" + ), + ) + + # ------------------------------------------------------------------ + # get_elements_dataframe with trio kwargs + # ------------------------------------------------------------------ + + @skip_if_no_pandas + def test_dataframe_element_type_numeric(self): + df = self.tm1.elements.get_elements_dataframe( + self.dimension_name, + self.hierarchy_name, + element_type="numeric", + skip_consolidations=False, + ) + names = set(df[self.dimension_name].tolist()) + self.assertEqual(names, {"Numeric A", "Numeric B", "Numeric C", "O'Brien"}) + + @skip_if_no_pandas + def test_dataframe_pattern(self): + df = self.tm1.elements.get_elements_dataframe( + self.dimension_name, + self.hierarchy_name, + name_pattern="Region*", + ) + names = set(df[self.dimension_name].tolist()) + self.assertEqual(names, {"Region North", "Region South"}) + + @skip_if_no_pandas + def test_dataframe_level(self): + df = self.tm1.elements.get_elements_dataframe( + self.dimension_name, + self.hierarchy_name, + level=0, + skip_consolidations=False, + ) + names = set(df[self.dimension_name].tolist()) + self.assertEqual( + names, + {"Numeric A", "Numeric B", "Numeric C", "O'Brien", "String A", "String B"}, + ) + + @skip_if_no_pandas + def test_dataframe_trio_composed(self): + df = self.tm1.elements.get_elements_dataframe( + self.dimension_name, + self.hierarchy_name, + element_type="numeric", + name_pattern="*A*", + level=0, + ) + names = set(df[self.dimension_name].tolist()) + self.assertEqual(names, {"Numeric A"}) + + @skip_if_no_pandas + def test_dataframe_element_type_overrides_skip_consolidations(self): + """When element_type is explicitly set, skip_consolidations is ignored + (documented in docstring).""" + df = self.tm1.elements.get_elements_dataframe( + self.dimension_name, + self.hierarchy_name, + element_type=["numeric", "consolidated"], + skip_consolidations=True, # would normally drop consolidations + ) + names = set(df[self.dimension_name].tolist()) + # Consolidations should be present despite skip_consolidations=True + self.assertIn("Region North", names) + self.assertIn("Region South", names) + self.assertIn("Total Regions", names) + + @skip_if_no_pandas + def test_dataframe_regression_no_filter(self): + """Without trio kwargs, get_elements_dataframe matches the snapshot from master.""" + import pandas as pd + + snapshot = pd.read_csv(self.SNAPSHOT_DIR / "dataframe_default.csv") + df = self.tm1.elements.get_elements_dataframe(self.dimension_name, self.hierarchy_name) + # Snapshot's first column is the snapshot's dimension name; the test's + # df uses a different dimension name. Compare row sets on element name + type. + snap_first = snapshot.columns[0] + df_first = df.columns[0] + snap_rows = sorted(zip(snapshot[snap_first].tolist(), snapshot["Type"].tolist())) + df_rows = sorted(zip(df[df_first].tolist(), df["Type"].tolist())) + self.assertEqual(snap_rows, df_rows) + + @skip_if_no_pandas + def test_dataframe_trio_empty_match_preserves_schema(self): + """When the trio filter matches zero elements, the returned DataFrame must + still carry the full column schema (attributes, levels, parents) so callers + relying on df[''] don't see KeyError.""" + df_full = self.tm1.elements.get_elements_dataframe(self.dimension_name, self.hierarchy_name) + df_empty = self.tm1.elements.get_elements_dataframe( + self.dimension_name, + self.hierarchy_name, + name_pattern="NonExistentNameThatMatchesNothing*", + ) + self.assertEqual(list(df_full.columns), list(df_empty.columns)) + self.assertEqual(len(df_empty), 0) + + if __name__ == "__main__": unittest.main() diff --git a/Tests/fixtures/element_filtering_snapshots/by_level_0.json b/Tests/fixtures/element_filtering_snapshots/by_level_0.json new file mode 100644 index 00000000..15b4e805 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/by_level_0.json @@ -0,0 +1,8 @@ +[ + "Numeric A", + "Numeric B", + "Numeric C", + "O'Brien", + "String A", + "String B" +] \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/by_level_1.json b/Tests/fixtures/element_filtering_snapshots/by_level_1.json new file mode 100644 index 00000000..daa52dc9 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/by_level_1.json @@ -0,0 +1,4 @@ +[ + "Region North", + "Region South" +] \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/by_level_2.json b/Tests/fixtures/element_filtering_snapshots/by_level_2.json new file mode 100644 index 00000000..cbbd1f87 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/by_level_2.json @@ -0,0 +1,3 @@ +[ + "Total Regions" +] \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/dataframe_default.csv b/Tests/fixtures/element_filtering_snapshots/dataframe_default.csv new file mode 100644 index 00000000..a17d8768 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/dataframe_default.csv @@ -0,0 +1,7 @@ +TM1py_snapshot_fixture,Type,Description,level001_Weight,level000_Weight,level001,level000 +String A,String,,0.000000,0.000000,, +String B,String,,0.000000,0.000000,, +Numeric A,Numeric,,1.000000,1.000000,Region North,Total Regions +Numeric B,Numeric,,1.000000,1.000000,Region North,Total Regions +Numeric C,Numeric,,1.000000,1.000000,Region South,Total Regions +O'Brien,Numeric,,1.000000,1.000000,Region South,Total Regions diff --git a/Tests/fixtures/element_filtering_snapshots/wildcard_0.json b/Tests/fixtures/element_filtering_snapshots/wildcard_0.json new file mode 100644 index 00000000..b49a9b3f --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/wildcard_0.json @@ -0,0 +1,9 @@ +{ + "wildcard": "region", + "level": null, + "result": [ + "Region North", + "Region South", + "Total Regions" + ] +} \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/wildcard_1.json b/Tests/fixtures/element_filtering_snapshots/wildcard_1.json new file mode 100644 index 00000000..2af069a5 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/wildcard_1.json @@ -0,0 +1,9 @@ +{ + "wildcard": "REGION", + "level": null, + "result": [ + "Region North", + "Region South", + "Total Regions" + ] +} \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/wildcard_2.json b/Tests/fixtures/element_filtering_snapshots/wildcard_2.json new file mode 100644 index 00000000..7ef42e93 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/wildcard_2.json @@ -0,0 +1,7 @@ +{ + "wildcard": "Region North", + "level": null, + "result": [ + "Region North" + ] +} \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/wildcard_3.json b/Tests/fixtures/element_filtering_snapshots/wildcard_3.json new file mode 100644 index 00000000..d902b656 --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/wildcard_3.json @@ -0,0 +1,9 @@ +{ + "wildcard": "numeric", + "level": 0, + "result": [ + "Numeric A", + "Numeric B", + "Numeric C" + ] +} \ No newline at end of file diff --git a/Tests/fixtures/element_filtering_snapshots/wildcard_4.json b/Tests/fixtures/element_filtering_snapshots/wildcard_4.json new file mode 100644 index 00000000..8756b72e --- /dev/null +++ b/Tests/fixtures/element_filtering_snapshots/wildcard_4.json @@ -0,0 +1,5 @@ +{ + "wildcard": "numeric", + "level": 1, + "result": [] +} \ No newline at end of file