From 71ac4e78d409770bc4abcd623bfb2d5020b0c017 Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Wed, 20 May 2026 19:08:21 -0300 Subject: [PATCH 1/2] feat: TM1 chore as a first-class task kind (closes #156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces a polymorphic task abstraction where each task carries exactly one of `process` (TI process) or `chore` (TM1 chore). The field name is the discriminator — no meta-`kind` field. Mutual exclusion is enforced at parse-time validation and as a class invariant on `Task.__init__`, so downstream code can rely on exactly one being set. Chores are intentionally narrower than processes: no parameters, no minor-error tier, no native timeout. `safe_retry` is honoured only for SINGLE_COMMIT chores so partial state cannot leak on failure. The existence + execution-mode checks are added to `validate_tasks` and deduplicated by (instance, chore_name) so each chore is fetched at most once per run. JSON, TXT, and cube-source readers all accept chore tasks. TXT now runs through `validate_taskfile` on read, closing a pre-existing gap that silently accepted malformed input. Cube reader raises on rows with both `process` and `chore` populated. Schema changes are additive: a new `chore` measure element on the results cube (auto-merged by existing `_merge_measure_dimension_elements` path), a new `vchore` TI variable + `CellPutS` line in both v11 and v12 `}rushti.load.results` bodies, and a `chore TEXT` column on the SQLite `task_results` table with PRAGMA-driven migration. Dashboard replaces the per-task "Process" column with a unified "Task target" column carrying a `[P]` / `[C]` kind indicator in both the runtime dashboard and the DAG visualization templates. CHANGELOG records the two behavioural changes (TI schema, TXT validation tightening). Co-Authored-By: Claude Opus 4.7 (1M context) --- CHANGELOG.md | 28 ++ CONTEXT.md | 33 +- src/rushti/dashboard.py | 78 +++- src/rushti/execution.py | 232 +++++++++++- src/rushti/logging.py | 23 +- src/rushti/messages.py | 25 ++ src/rushti/optimizer.py | 6 +- src/rushti/parsing.py | 27 +- src/rushti/stats/dynamodb.py | 11 +- src/rushti/stats/repository.py | 1 + src/rushti/stats/signature.py | 33 +- src/rushti/stats/sqlite.py | 40 ++- src/rushti/task.py | 80 ++++- src/rushti/taskfile.py | 97 ++++- src/rushti/taskfile_ops.py | 30 +- src/rushti/templates/visualization.html | 25 +- src/rushti/tm1_integration.py | 33 +- src/rushti/tm1_objects.py | 10 + src/rushti/visualization_template.py | 25 +- tests/unit/test_chore_kind.py | 460 ++++++++++++++++++++++++ tests/unit/test_taskfile.py | 12 +- 21 files changed, 1185 insertions(+), 124 deletions(-) create mode 100644 tests/unit/test_chore_kind.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 027088e..746da60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,34 @@ All notable changes to RushTI are documented in this file. +## Unreleased — `feat/issue-156-chore-task-kind` + +- **Added: TM1 chore execution as a first-class task kind** (closes #156). + Mixed process + chore taskfiles are now supported across JSON, TXT, and + cube sources. A task carries exactly one of `process` or `chore` — the + field name is the discriminator. Chores are intentionally narrower than + processes: no parameters, no minor-error tier, no native timeout; only + `safe_retry` is honoured (and restricted to `SINGLE_COMMIT` chores so + partial state cannot leak on failure). See `docs/adr/0002-polymorphic-task-kinds.md` + for the full rationale. +- Added: `chore TEXT` column on the `task_results` SQLite table. + Auto-migrated on first connection — no manual step required. +- **Behavioural change:** `}rushti.load.results` TI schema changed. A new + `chore` measure element is added to the cube and a matching `vchore` + variable to the TI variable list. CSVs written by the previous version + cannot be loaded by the new TI. Mitigation: flush any pending CSVs + before upgrade (default `pDeleteSourceFile=1` makes CSVs transient in + practice), then re-run `rushti build --tm1-instance X` so the additive + merge adds the new measure element to the existing cube. +- **Behavioural change:** TXT taskfiles now run through `validate_taskfile` + on read. Pre-existing malformed TXT files (missing fields, wrong types, + chore tasks carrying process-only fields) that previously parsed + silently will now fail with explicit error messages. This closes a + long-standing gap; the fix surfaces bugs that were always present. +- Dashboard: the per-task "Process" column is replaced by a unified + "Task target" column with a `[P]` / `[C]` kind indicator so process + and chore rows render side-by-side. + ## Unreleased — `feat/issue-154-v12-load-results` - Fix: `rushti build` now installs a TM1-version-aware `}rushti.load.results` diff --git a/CONTEXT.md b/CONTEXT.md index a5ac4dd..6930f29 100644 --- a/CONTEXT.md +++ b/CONTEXT.md @@ -35,12 +35,39 @@ Use **"workflow"** as an adjective for scope ("workflow-level setting", "per-workflow override") rather than as a synonym for "taskfile". ### Task -A single TI process execution inside a taskfile. Each task has: +A single execution unit inside a taskfile. Each task has: - `id` — a positive integer string, used as an element name in `rushti_task_id`. - `instance` — **the TM1 instance where this task executes** (task-level). -- `process` — the TI process name on that instance. -- `parameters`, `predecessors`, `stage`, `timeout`, etc. +- exactly one **kind field** — either `process` (TI process) or `chore` + (TM1 chore). The field name *is* the discriminator; there is no + meta-`kind` field. See [[adr/0002-polymorphic-task-kinds]]. +- kind-specific and shared optional fields — see **Task kind** below. + +### Task kind +RushTI supports two task kinds, identified by which field names the +execution target: + +| Kind | Field | Applicable optional fields | +|---|---|---| +| **process** (TI process) | `process` | `parameters`, `succeed_on_minor_errors`, `timeout`, `cancel_at_timeout`, `safe_retry`, plus shared (below) | +| **chore** (TM1 chore) | `chore` | `safe_retry` (only when chore is `SINGLE_COMMIT`), plus shared | + +**Shared optional fields** (apply to both kinds): `predecessors`, `stage`, +`require_predecessor_success`. + +A task with both `process` and `chore` set is invalid; a task with +neither is invalid. The invariant is enforced at parse-time validation +*and* as a class invariant in `Task.__init__`. + +Chores are intentionally narrower than processes: +- **No parameters** — TM1 chores have no invocation parameters. +- **No timeout / cancel** — TM1 chore execution has no native timeout. +- **No minor-error tier** — chore execution is binary at the API + boundary (HTTP 204 = success, 500 = failure). +- **Retry is whole-chore.** When `safe_retry: true` on a chore task, + retry re-executes the entire chore. Restricted to SINGLE_COMMIT + chores so partial state never leaks on failure. ### TM1 instance A named TM1 server defined in `config.ini`. Three roles can apply diff --git a/src/rushti/dashboard.py b/src/rushti/dashboard.py index 8a04360..089ea0a 100644 --- a/src/rushti/dashboard.py +++ b/src/rushti/dashboard.py @@ -234,16 +234,26 @@ def _prepare_dashboard_data( } ) - # Build per-task aggregate data (across all runs) + # Build per-task aggregate data (across all runs). Each row carries + # both ``process`` (may be empty for chore rows) and ``chore`` so the + # dashboard can render a unified "Task target" column with a kind + # indicator. ``task_target``/``task_kind`` are the canonical fields + # for display; the raw kind fields stay populated for backward + # compatibility with downstream consumers. task_data: Dict[str, Dict[str, Any]] = {} for tr in task_results: sig = tr["task_signature"] + process_val = tr.get("process") or "" + chore_val = tr.get("chore") or "" if sig not in task_data: task_data[sig] = { "task_signature": sig, "task_id": tr["task_id"], "instance": tr["instance"], - "process": tr["process"], + "process": process_val, + "chore": chore_val, + "task_target": chore_val or process_val, + "task_kind": "chore" if chore_val else "process", "durations": [], "successes": 0, "total": 0, @@ -263,6 +273,9 @@ def _prepare_dashboard_data( "task_id": data["task_id"], "instance": data["instance"], "process": data["process"], + "chore": data["chore"], + "task_target": data["task_target"], + "task_kind": data["task_kind"], "executions": data["total"], "success_rate": ( round(data["successes"] / data["total"] * 100, 1) if data["total"] > 0 else 0 @@ -290,10 +303,15 @@ def _prepare_dashboard_data( outliers = [] for tr in task_results: if tr["duration_seconds"] is not None: + process_val = tr.get("process") or "" + chore_val = tr.get("chore") or "" outliers.append( { "task_id": tr["task_id"], - "process": tr["process"], + "process": process_val, + "chore": chore_val, + "task_target": chore_val or process_val, + "task_kind": "chore" if chore_val else "process", "instance": tr["instance"], "run_id": tr["run_id"], "duration": round(tr["duration_seconds"], 2), @@ -308,10 +326,15 @@ def _prepare_dashboard_data( failures = [] for tr in task_results: if tr["status"] != "Success": + process_val = tr.get("process") or "" + chore_val = tr.get("chore") or "" failures.append( { "task_id": tr["task_id"], - "process": tr["process"], + "process": process_val, + "chore": chore_val, + "task_target": chore_val or process_val, + "task_kind": "chore" if chore_val else "process", "instance": tr["instance"], "run_id": tr["run_id"], "duration": round(tr["duration_seconds"], 2) if tr["duration_seconds"] else 0, @@ -322,13 +345,18 @@ def _prepare_dashboard_data( # Slim task_results for JS (only fields needed for interactive filtering) slim_task_results = [] for tr in task_results: + process_val = tr.get("process") or "" + chore_val = tr.get("chore") or "" slim_task_results.append( { "run_id": tr["run_id"], "task_id": tr["task_id"], "task_signature": tr["task_signature"], "instance": tr["instance"], - "process": tr["process"], + "process": process_val, + "chore": chore_val, + "task_target": chore_val or process_val, + "task_kind": "chore" if chore_val else "process", "status": tr["status"], "duration_seconds": tr["duration_seconds"], "error_message": tr.get("error_message"), @@ -578,6 +606,11 @@ def generate_dashboard( .status-partial {{ color: #D97706; font-weight: 600; }} .high-cv {{ background: #FFF7ED; }} .duration-slow {{ color: #DC2626; font-weight: 600; }} + .kind-tag {{ + display: inline-block; padding: 1px 5px; border-radius: 4px; + font-size: 0.7rem; font-weight: 700; color: #475569; + background: #E2E8F0; margin-right: 4px; + }} /* Accordion */ .accordion {{ margin-bottom: 20px; }} @@ -752,7 +785,7 @@ def generate_dashboard( Task ID Instance - Process + Task target Avg (s) Min (s) Max (s) @@ -768,7 +801,7 @@ def generate_dashboard(

Top 10 Slowest Executions ?How to read: These are the 10 longest-running individual task executions across the selected runs. The "vs Median" column tells you how many times slower each one was compared to the typical task — a high multiplier (e.g., 5×) means that execution was significantly slower than usual and may be worth investigating. Check whether the same task appears multiple times — if so, it is a consistent bottleneck.

- + @@ -779,7 +812,7 @@ def generate_dashboard(

Failed Processes ?How to read: Every task execution that did not succeed is listed here. Look for patterns — does the same task fail repeatedly across runs? That points to a systemic issue. If failures are scattered across different tasks, the problem may be environmental (server load, connectivity). Hover over the Error column to see the full error message for each failure.

Task IDProcessInstanceTask IDTask targetInstance RunDuration (s)vs MedianStatus
- + @@ -1260,14 +1293,14 @@ def generate_dashboard( // Also store a lookup for tooltip labels const sigLabel = {{}}; relevant.forEach(tr => {{ - if (!sigLabel[tr.task_signature]) sigLabel[tr.task_signature] = tr.process; + if (!sigLabel[tr.task_signature]) sigLabel[tr.task_signature] = tr.task_target || tr.process || tr.chore; }}); const datasets = runs.map((run, idx) => {{ const runTasks = relevant.filter(tr => tr.run_id === run.run_id); return {{ label: formatDate(run.start_time), - data: runTasks.map(t => ({{ x: sigIndex[t.task_signature], y: t.duration_seconds || 0, _sig: t.task_signature, _task: t.task_id, _proc: t.process }})), + data: runTasks.map(t => ({{ x: sigIndex[t.task_signature], y: t.duration_seconds || 0, _sig: t.task_signature, _task: t.task_id, _proc: t.task_target || t.process || t.chore }})), backgroundColor: RUN_COLORS[idx % RUN_COLORS.length] + '80', borderColor: RUN_COLORS[idx % RUN_COLORS.length], pointRadius: 3, @@ -1470,7 +1503,10 @@ def generate_dashboard( const sig = tr.task_signature; if (!taskData[sig]) {{ taskData[sig] = {{ - task_id: tr.task_id, instance: tr.instance, process: tr.process, + task_id: tr.task_id, instance: tr.instance, + process: tr.process, chore: tr.chore, + task_target: tr.task_target || tr.process || tr.chore, + task_kind: tr.task_kind || (tr.chore ? 'chore' : 'process'), durations: [], successes: 0, total: 0 }}; }} @@ -1492,7 +1528,7 @@ def generate_dashboard( }} function applyTaskSort(rows) {{ - const fields = ['task_id', 'instance', 'process', 'avg', 'mn', 'mx', 'stdDev', 'successRate', 'total']; + const fields = ['task_id', 'instance', 'task_target', 'avg', 'mn', 'mx', 'stdDev', 'successRate', 'total']; const field = fields[taskTableSortCol]; rows.sort((a, b) => {{ const av = a[field], bv = b[field]; @@ -1556,10 +1592,12 @@ def generate_dashboard( tbody.innerHTML = pageRows.map(r => {{ const cvClass = r.cv > 0.5 ? 'high-cv' : ''; const statusCls = r.successRate < 100 ? 'status-fail' : 'status-success'; + const kindTag = r.task_kind === 'chore' ? '[C]' : '[P]'; + const target = r.task_target || r.process || r.chore || ''; return ` - + @@ -1585,9 +1623,11 @@ def generate_dashboard( tbody.innerHTML = sorted.map(o => {{ const statusCls = o.status === 'Success' ? 'status-success' : 'status-fail'; const vsMedian = o.duration_seconds - median; + const kindTag = o.task_kind === 'chore' || o.chore ? '[C]' : '[P]'; + const target = o.task_target || o.process || o.chore || ''; return ` - + @@ -1613,15 +1653,19 @@ def generate_dashboard( }} panel.style.display = 'block'; - tbody.innerHTML = filtered.map(f => ` + tbody.innerHTML = filtered.map(f => {{ + const kindTag = f.task_kind === 'chore' || f.chore ? '[C]' : '[P]'; + const target = f.task_target || f.process || f.chore || ''; + return ` - + - `).join(''); + `; + }}).join(''); }} function updateConfigDetails(runs) {{ diff --git a/src/rushti/execution.py b/src/rushti/execution.py index db6f058..fd80ac5 100644 --- a/src/rushti/execution.py +++ b/src/rushti/execution.py @@ -24,6 +24,7 @@ from TM1py import TM1Service from TM1py.Exceptions import TM1pyTimeout +from TM1py.Objects import Chore from rushti.task import Task, OptimizedTask from rushti.dag import DAG @@ -40,6 +41,11 @@ MSG_PROCESS_NOT_EXISTS, MSG_PROCESS_PARAMS_INCORRECT, MSG_PROCESS_TIMEOUT, + MSG_CHORE_EXECUTE, + MSG_CHORE_SUCCESS, + MSG_CHORE_FAIL, + MSG_CHORE_NOT_EXISTS, + MSG_CHORE_REQUIRES_SINGLE_COMMIT, ) from rushti.parsing import get_instances_from_tasks_file from rushti.exclusive import build_session_context @@ -99,12 +105,17 @@ def _collect_task_stats( return with ctx.stats_data_lock: + # ``process`` is NOT NULL in SQLite; chore tasks write the empty + # string so the column constraint stays satisfied without making + # it nullable. The kind is unambiguous: rows with a non-empty + # ``chore`` are chore tasks, rows without are processes. stats_entry = { "run_id": ctx.execution_logger.run_id if ctx.execution_logger else "", "workflow": ctx.execution_logger.workflow if ctx.execution_logger else None, "task_id": task.id, "instance": task.instance_name, - "process": task.process_name, + "process": task.process_name or "", + "chore": getattr(task, "chore_name", None), "parameters": task.parameters if isinstance(task.parameters, dict) else {}, "success": success, "start_time": start_time, @@ -217,6 +228,39 @@ def setup_tm1_services( return tm1_services, tm1_preserve_connections +def execute_chore_with_retries(tm1: TM1Service, task: Task, retries: int): + """Execute a TM1 chore with retry support. + + Mirrors :func:`execute_process_with_retries` for the chore path. + Chore execution at the TM1 API boundary is binary: HTTP 204 = success + (no body to parse), any exception (typically HTTP 500 with a + "Chore execution failed" body) = failure. + + Returns a 4-tuple shaped like the process variant for symmetry — the + ``status`` slot carries a short human-readable summary, ``error_log_file`` + is always empty (chores write no per-execution error file), and + ``attempts`` is the 0-based attempt index of the result. + + :raises Exception: re-raised on the final attempt when retries + exhausted, matching the process path's error propagation. + """ + # Only retry when ``safe_retry`` is True. ``validate_tasks`` has + # already asserted the chore is SINGLE_COMMIT in that case so + # whole-chore retry cannot leak partial state. + effective_retries = retries if task.safe_retry else 0 + + for attempt in range(effective_retries + 1): + try: + tm1.chores.execute_chore(task.chore_name) + return True, "Completed", "", attempt + except Exception: + if attempt == effective_retries: + raise + + # Defensive — loop body always returns or raises. + return False, "", "", effective_retries + + def execute_process_with_retries(tm1: TM1Service, task: Task, retries: int): for attempt in range(retries + 1): try: @@ -292,12 +336,21 @@ def execute_task( return False if task.instance_name not in tm1_services: + # Reuse the process-flavoured "instance not in config" message — + # the chore name slots in as the task identity for context. msg = MSG_PROCESS_FAIL_INSTANCE_NOT_IN_CONFIG_FILE.format( - process_name=task.process_name, instance_name=task.instance_name + process_name=task.process_name or task.chore_name, + instance_name=task.instance_name, ) logger.error(msg) return False + # Dispatch to chore branch for chore-kind tasks. Mutual exclusion + # guarantees that ``chore_name`` is set iff ``process_name`` is not, + # so the dispatch is unambiguous. + if getattr(task, "chore_name", None): + return _execute_chore_task(ctx, task, retries, tm1_services) + tm1 = tm1_services[task.instance_name] # Execute it - include stage in log if present stage_info = f" [stage: {task.stage}]" if task.stage else "" @@ -465,13 +518,128 @@ def execute_task( return False +def _execute_chore_task( + ctx: ExecutionContext, + task: Task, + retries: int, + tm1_services: Dict[str, TM1Service], +) -> bool: + """Execute a chore-kind task. + + Parallel to the chore branch of :func:`execute_task` for processes. + Logs success/failure, records stats, and threads the chore identity + into the structured execution logger via the new ``chore=`` parameter. + """ + tm1 = tm1_services[task.instance_name] + stage_info = f" [stage: {task.stage}]" if task.stage else "" + logger.info( + MSG_CHORE_EXECUTE.format( + chore_name=task.chore_name, + instance_name=task.instance_name, + ) + + stage_info + ) + start_time = datetime.now() + + try: + success, status, _, attempts = execute_chore_with_retries( + tm1=tm1, task=task, retries=retries + ) + end_time = datetime.now() + elapsed_time = end_time - start_time + + if success: + logger.info( + MSG_CHORE_SUCCESS.format( + chore=task.chore_name, + instance=task.instance_name, + retries=attempts, + time=elapsed_time, + ) + ) + + if ctx.execution_logger: + ctx.execution_logger.log_task_execution( + task_id=task.id, + instance=task.instance_name, + process="", + chore=task.chore_name, + parameters={}, + success=True, + start_time=start_time, + end_time=end_time, + retry_count=attempts, + ) + + _collect_task_stats( + ctx, + task, + success=True, + start_time=start_time, + end_time=end_time, + retry_count=attempts, + ) + return True + + # Unreachable in practice: chore execution returns True or raises. + # Guarded for symmetry with the process path. + return False + + except Exception as e: + end_time = datetime.now() + elapsed_time = end_time - start_time + # ``attempts`` is the global ``retries`` ceiling when safe_retry + # is enabled and we exhausted the loop, else 0. + attempts = retries if task.safe_retry else 0 + error = str(e) + logger.error( + MSG_CHORE_FAIL.format( + chore=task.chore_name, + instance=task.instance_name, + retries=attempts, + time=elapsed_time, + error=error, + ) + ) + + if ctx.execution_logger: + ctx.execution_logger.log_task_execution( + task_id=task.id, + instance=task.instance_name, + process="", + chore=task.chore_name, + parameters={}, + success=False, + start_time=start_time, + end_time=end_time, + retry_count=attempts, + error_message=error, + ) + + _collect_task_stats( + ctx, + task, + success=False, + start_time=start_time, + end_time=end_time, + retry_count=attempts, + error_message=error, + ) + return False + + def verify_predecessors_ok(ctx: ExecutionContext, task: OptimizedTask) -> bool: + # Use the kind-appropriate identity for log messages — chore tasks + # have no process name and no parameters to render. + target = task.process_name or task.chore_name + parameters = task.parameters if task.process_name else {} + for predecessor_id in task.predecessors: if predecessor_id not in ctx.task_execution_results: msg = MSG_PROCESS_ABORTED_UNCOMPLETE_PREDECESSOR.format( instance=task.instance_name, - process=task.process_name, - parameters=task.parameters, + process=target, + parameters=parameters, predecessor=predecessor_id, ) logger.error(msg) @@ -480,8 +648,8 @@ def verify_predecessors_ok(ctx: ExecutionContext, task: OptimizedTask) -> bool: if not ctx.task_execution_results[predecessor_id]: msg = MSG_PROCESS_ABORTED_FAILED_PREDECESSOR.format( instance=task.instance_name, - process=task.process_name, - parameters=task.parameters, + process=target, + parameters=parameters, predecessor=predecessor_id, ) logger.error(msg) @@ -491,21 +659,61 @@ def verify_predecessors_ok(ctx: ExecutionContext, task: OptimizedTask) -> bool: def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bool: - validated_tasks = [] + validated_processes: List[str] = [] + # Chore validation is keyed by (instance, chore_name) — chore names + # are scoped to a server and existence must be checked per-instance. + # safe_retry tasks additionally trigger a SINGLE_COMMIT check. + validated_chores: set = set() + safe_retry_chore_checked: set = set() validation_ok = True tasks = [task for task in tasks if isinstance(task, Task)] # --> ignore Wait(s) for task in tasks: + tm1 = tm1_services[task.instance_name] + + if getattr(task, "chore_name", None): + chore_key = (task.instance_name, task.chore_name) + + # Existence check — dedup'd by (instance, chore_name). + if chore_key not in validated_chores: + validated_chores.add(chore_key) + if not tm1.chores.exists(task.chore_name): + logger.error( + MSG_CHORE_NOT_EXISTS.format( + chore=task.chore_name, instance=task.instance_name + ) + ) + validation_ok = False + # Skip the safe_retry check — chore doesn't exist. + continue + + # SINGLE_COMMIT check — only fetched when safe_retry is True + # so we avoid the extra round-trip otherwise. Dedup'd + # independently so a mixed taskfile (some safe, some not) + # still triggers the fetch exactly once per chore. + if task.safe_retry and chore_key not in safe_retry_chore_checked: + safe_retry_chore_checked.add(chore_key) + chore = tm1.chores.get(task.chore_name) + if chore.execution_mode != Chore.SINGLE_COMMIT: + logger.error( + MSG_CHORE_REQUIRES_SINGLE_COMMIT.format( + chore=task.chore_name, + instance=task.instance_name, + execution_mode=chore.execution_mode, + ) + ) + validation_ok = False + continue + + # Process branch (unchanged behaviour, slightly tidied). current_task = { "instance": task.instance_name, "process": task.process_name, "parameters": task.parameters.keys(), } - tm1 = tm1_services[task.instance_name] - # avoid repeated validations - if current_task["process"] in validated_tasks: + if current_task["process"] in validated_processes: continue # check for process existence @@ -514,7 +722,7 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo process=task.process_name, instance=task.instance_name ) logger.error(msg) - validated_tasks.append(current_task["process"]) + validated_processes.append(current_task["process"]) validation_ok = False continue @@ -536,7 +744,7 @@ def validate_tasks(tasks: List[Task], tm1_services: Dict[str, TM1Service]) -> bo logger.error(msg) validation_ok = False - validated_tasks.append(current_task["process"]) + validated_processes.append(current_task["process"]) return validation_ok diff --git a/src/rushti/logging.py b/src/rushti/logging.py index 509bda7..102aa99 100644 --- a/src/rushti/logging.py +++ b/src/rushti/logging.py @@ -22,7 +22,8 @@ class TaskExecutionLog: """Structured log entry for a single task execution. Contains all standardized fields for task execution logging as per - the logging specification. + the logging specification. Exactly one of ``process`` or ``chore`` + carries the kind-specific identity; the unused field is empty. """ workflow: str @@ -36,6 +37,7 @@ class TaskExecutionLog: duration_seconds: float retry_count: int = 0 error_message: Optional[str] = None + chore: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary with serializable values. @@ -67,19 +69,22 @@ def from_execution_result( end_time: datetime, retry_count: int = 0, error_message: Optional[str] = None, + chore: Optional[str] = None, ) -> "TaskExecutionLog": """Create a log entry from execution result data. :param workflow: Workflow name :param task_id: Task identifier within the file :param instance: TM1 instance name - :param process: TI process name + :param process: TI process name (empty for chore tasks) :param parameters: Process parameters as dictionary :param success: Whether execution succeeded :param start_time: Execution start time :param end_time: Execution end time :param retry_count: Number of retries attempted :param error_message: Error message if failed + :param chore: TM1 chore name (set for chore tasks; mutually + exclusive with a meaningful ``process``) :return: TaskExecutionLog instance """ duration = (end_time - start_time).total_seconds() @@ -95,6 +100,7 @@ def from_execution_result( duration_seconds=round(duration, 3), retry_count=retry_count, error_message=error_message if not success else None, + chore=chore, ) @@ -203,16 +209,16 @@ def write_logs(self, run: ExecutionRun) -> bool: # Use DEBUG level for individual task logs since execution status # is already logged as tasks complete. Only failures at ERROR. log_level = logging.DEBUG if log.status == "Success" else logging.ERROR + target = log.chore or log.process # Plain text format: concise, human-readable if log.status == "Success": msg = ( - f"Task completed: {log.instance}:{log.process} " - f"[{log.duration_seconds:.3f}s]" + f"Task completed: {log.instance}:{target} " f"[{log.duration_seconds:.3f}s]" ) else: error_info = f" - {log.error_message}" if log.error_message else "" msg = ( - f"Task failed: {log.instance}:{log.process} " + f"Task failed: {log.instance}:{target} " f"[{log.duration_seconds:.3f}s]{error_info}" ) self.exec_logger.log(log_level, msg) @@ -279,18 +285,22 @@ def log_task_execution( end_time: datetime, retry_count: int = 0, error_message: Optional[str] = None, + chore: Optional[str] = None, ) -> TaskExecutionLog: """Log a single task execution result. :param task_id: Task identifier :param instance: TM1 instance name - :param process: TI process name + :param process: TI process name (empty when ``chore`` is set) :param parameters: Process parameters :param success: Whether execution succeeded :param start_time: Execution start time :param end_time: Execution end time :param retry_count: Number of retries attempted :param error_message: Error message if failed + :param chore: TM1 chore name. Pass this for chore-kind tasks so + the structured log stays queryable by kind; do not overload + the ``process`` parameter. :return: The created TaskExecutionLog entry """ log = TaskExecutionLog.from_execution_result( @@ -304,6 +314,7 @@ def log_task_execution( end_time=end_time, retry_count=retry_count, error_message=error_message, + chore=chore, ) self.current_run.add_log(log) return log diff --git a/src/rushti/messages.py b/src/rushti/messages.py index 03d55da..ec09753 100644 --- a/src/rushti/messages.py +++ b/src/rushti/messages.py @@ -65,6 +65,31 @@ "{timeout} seconds on instance: '{instance}'. Elapsed time: {time}" ) +# --------------------------------------------------------------------------- +# Chore message templates +# --------------------------------------------------------------------------- +# Mirrors the MSG_PROCESS_* set above. Chore execution has no +# parameters, no minor-error tier, no native timeout, and no error log +# file — the message templates are intentionally narrower. + +MSG_CHORE_EXECUTE = "Executing chore: '{chore_name}' on instance: '{instance_name}'" +MSG_CHORE_SUCCESS = ( + "Execution successful: Chore '{chore}' with {retries} retries on instance: " + "'{instance}'. Elapsed time: {time}" +) +MSG_CHORE_FAIL = ( + "Execution failed. Chore: '{chore}' with {retries} retries on instance: '{instance}'. " + "Elapsed time: {time}. Error: {error}" +) +MSG_CHORE_NOT_EXISTS = ( + "Task validation failed. Chore: '{chore}' does not exist on instance: '{instance}'" +) +MSG_CHORE_REQUIRES_SINGLE_COMMIT = ( + "Task validation failed. Chore: '{chore}' on instance: '{instance}' has " + "execution_mode={execution_mode}; 'safe_retry: true' requires SINGLE_COMMIT " + "(MULTIPLE_COMMIT chores leak partial state on failure)" +) + # --------------------------------------------------------------------------- # CLI configuration values # --------------------------------------------------------------------------- diff --git a/src/rushti/optimizer.py b/src/rushti/optimizer.py index ff5c919..2d69638 100644 --- a/src/rushti/optimizer.py +++ b/src/rushti/optimizer.py @@ -126,8 +126,9 @@ def build_cache(self, tasks: List[Any]) -> None: for task in tasks: signature = calculate_task_signature( instance=getattr(task, "instance_name", ""), - process=getattr(task, "process_name", ""), + process=getattr(task, "process_name", None), parameters=getattr(task, "parameters", {}), + chore=getattr(task, "chore_name", None), ) if signature not in signatures: signatures[signature] = task @@ -172,8 +173,9 @@ def get_estimate(self, task: Any) -> RuntimeEstimate: """ signature = calculate_task_signature( instance=getattr(task, "instance_name", ""), - process=getattr(task, "process_name", ""), + process=getattr(task, "process_name", None), parameters=getattr(task, "parameters", {}), + chore=getattr(task, "chore_name", None), ) if signature in self._cache: diff --git a/src/rushti/parsing.py b/src/rushti/parsing.py index c91080e..0bf1539 100644 --- a/src/rushti/parsing.py +++ b/src/rushti/parsing.py @@ -64,7 +64,8 @@ def extract_task_from_line( return OptimizedTask( task_id=task_id, instance_name=line_arguments.pop("instance"), - process_name=line_arguments.pop("process"), + process_name=line_arguments.pop("process", None), + chore_name=line_arguments.pop("chore", None), predecessors=predecessors, require_predecessor_success=require_predecessor_success, succeed_on_minor_errors=succeed_on_minor_errors, @@ -78,7 +79,8 @@ def extract_task_from_line( return Task( instance_name=line_arguments.pop("instance"), succeed_on_minor_errors=line_arguments.pop("succeed_on_minor_errors", False), - process_name=line_arguments.pop("process"), + process_name=line_arguments.pop("process", None), + chore_name=line_arguments.pop("chore", None), safe_retry=safe_retry, stage=stage, timeout=timeout, @@ -104,6 +106,10 @@ def expand_task( :param task: Task to expand :return: List of expanded tasks (or single-element list with original task) """ + # Chore tasks have no parameters and nothing to expand. + if getattr(task, "chore_name", None): + return [task] + # Handle None or empty parameters - no expansion needed if not task.parameters: return [task] @@ -136,9 +142,10 @@ def expand_task( if isinstance(task, OptimizedTask): result.append( OptimizedTask( - task.id, - task.instance_name, - task.process_name, + task_id=task.id, + instance_name=task.instance_name, + process_name=task.process_name, + chore_name=task.chore_name, parameters=expanded_params, predecessors=task.predecessors, require_predecessor_success=task.require_predecessor_success, @@ -152,8 +159,9 @@ def expand_task( elif isinstance(task, Task): result.append( Task( - task.instance_name, - task.process_name, + instance_name=task.instance_name, + process_name=task.process_name, + chore_name=task.chore_name, parameters=expanded_params, succeed_on_minor_errors=task.succeed_on_minor_errors, safe_retry=task.safe_retry, @@ -314,11 +322,14 @@ def convert_json_to_dag( dag = DAG() for task_def in taskfile.tasks: - # Create OptimizedTask from TaskDefinition + # Create OptimizedTask from TaskDefinition. Mutual exclusion of + # process/chore was enforced at parse time and re-asserted by + # Task.__init__. task = OptimizedTask( task_id=task_def.id, instance_name=task_def.instance, process_name=task_def.process, + chore_name=task_def.chore, parameters=task_def.parameters.copy(), predecessors=task_def.predecessors.copy(), require_predecessor_success=task_def.require_predecessor_success, diff --git a/src/rushti/stats/dynamodb.py b/src/rushti/stats/dynamodb.py index 750bbdd..4c910ca 100644 --- a/src/rushti/stats/dynamodb.py +++ b/src/rushti/stats/dynamodb.py @@ -109,6 +109,7 @@ def _normalize_task_item(self, item: Dict[str, Any]) -> Dict[str, Any]: "task_signature": item.get("task_signature"), "instance": item.get("instance"), "process": item.get("process"), + "chore": item.get("chore"), "parameters": item.get("parameters", "{}"), "status": item.get("status"), "start_time": item.get("start_time"), @@ -198,12 +199,13 @@ def record_task( require_predecessor_success: Optional[bool] = None, succeed_on_minor_errors: Optional[bool] = None, workflow: Optional[str] = None, + chore: Optional[str] = None, ) -> None: if not self.enabled: return duration = (end_time - start_time).total_seconds() - task_signature = calculate_task_signature(instance, process, parameters) + task_signature = calculate_task_signature(instance, process, parameters, chore=chore) item = { "run_id": run_id, @@ -213,6 +215,7 @@ def record_task( "task_signature": task_signature, "instance": instance, "process": process, + "chore": chore, "parameters": json.dumps(parameters) if parameters else "{}", "status": "Success" if success else "Fail", "start_time": start_time.isoformat(), @@ -241,8 +244,11 @@ def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: duration = (end_time - start_time).total_seconds() instance = task["instance"] process = task["process"] + chore = task.get("chore") parameters = task.get("parameters") - task_signature = calculate_task_signature(instance, process, parameters) + task_signature = calculate_task_signature( + instance, process, parameters, chore=chore + ) writer.put_item( Item={ @@ -253,6 +259,7 @@ def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: "task_signature": task_signature, "instance": instance, "process": process, + "chore": chore, "parameters": json.dumps(parameters) if parameters else "{}", "status": "Success" if task["success"] else "Fail", "start_time": start_time.isoformat(), diff --git a/src/rushti/stats/repository.py b/src/rushti/stats/repository.py index 17b4f09..ba52e99 100644 --- a/src/rushti/stats/repository.py +++ b/src/rushti/stats/repository.py @@ -84,6 +84,7 @@ def record_task( require_predecessor_success: Optional[bool] = ..., succeed_on_minor_errors: Optional[bool] = ..., workflow: Optional[str] = ..., + chore: Optional[str] = ..., ) -> None: ... def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: ... diff --git a/src/rushti/stats/signature.py b/src/rushti/stats/signature.py index 30826d7..a3c129b 100644 --- a/src/rushti/stats/signature.py +++ b/src/rushti/stats/signature.py @@ -6,24 +6,37 @@ def calculate_task_signature( - instance: str, process: str, parameters: Optional[Dict[str, Any]] + instance: str, + process: Optional[str], + parameters: Optional[Dict[str, Any]], + chore: Optional[str] = None, ) -> str: """Calculate deterministic signature for task identity. The signature uniquely identifies a task configuration for runtime estimation across multiple runs. Tasks with the same - instance, process, and parameters will have the same signature. + instance + kind + name (+ parameters, for processes) will share a + signature. + + Chore signatures use a dedicated ``chore`` prefix so they occupy a + disjoint hash space from existing process signatures. This means a + chore named ``daily_etl`` and a process named ``daily_etl`` on the + same instance never collide in the optimizer's history cache, even + if they share a name by accident. :param instance: TM1 instance name - :param process: TI process name - :param parameters: Process parameters dictionary + :param process: TI process name (for process-kind tasks) + :param parameters: Process parameters dictionary (ignored for chores) + :param chore: TM1 chore name (for chore-kind tasks) :return: 16-character hex signature """ - # Sort parameters for deterministic hash - sorted_params = json.dumps(parameters, sort_keys=True) if parameters else "{}" - - # Combine components - signature_input = f"{instance}|{process}|{sorted_params}" + if chore: + # Chores have no invocation parameters — the dimension is just + # (instance, chore_name). Prefix keeps the signature space + # disjoint from process signatures. + signature_input = f"chore|{instance}|{chore}" + else: + sorted_params = json.dumps(parameters, sort_keys=True) if parameters else "{}" + signature_input = f"{instance}|{process or ''}|{sorted_params}" - # SHA256 hash (truncated for readability) return hashlib.sha256(signature_input.encode()).hexdigest()[:16] diff --git a/src/rushti/stats/sqlite.py b/src/rushti/stats/sqlite.py index 99fe871..4e82597 100644 --- a/src/rushti/stats/sqlite.py +++ b/src/rushti/stats/sqlite.py @@ -96,7 +96,10 @@ def _create_tables(self) -> None: ) """) - # Task results table - includes task configuration for analysis + # Task results table - includes task configuration for analysis. + # ``process`` stays NOT NULL for backward compatibility — chore + # tasks write the empty string. Rows with a non-empty ``chore`` + # are chore-kind tasks; mutual exclusion is enforced upstream. cursor.execute(""" CREATE TABLE IF NOT EXISTS task_results ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -106,6 +109,7 @@ def _create_tables(self) -> None: task_signature TEXT NOT NULL, instance TEXT NOT NULL, process TEXT NOT NULL, + chore TEXT, parameters TEXT, status TEXT NOT NULL, start_time TEXT NOT NULL, @@ -156,6 +160,12 @@ def _create_tables(self) -> None: cursor.execute("ALTER TABLE runs ADD COLUMN optimization_algorithm TEXT") logger.info("Added optimization_algorithm column to runs table") + cursor.execute("PRAGMA table_info(task_results)") + task_result_columns = [row[1] for row in cursor.fetchall()] + if "chore" not in task_result_columns: + cursor.execute("ALTER TABLE task_results ADD COLUMN chore TEXT") + logger.info("Added chore column to task_results table") + self._conn.commit() def start_run( @@ -248,6 +258,8 @@ def record_task( succeed_on_minor_errors: Optional[bool] = None, # Workflow context for TM1 cube alignment workflow: Optional[str] = None, + # Kind-discriminating field for chore-kind tasks + chore: Optional[str] = None, ) -> None: """Record a task execution result. @@ -274,7 +286,7 @@ def record_task( return duration = (end_time - start_time).total_seconds() - task_signature = calculate_task_signature(instance, process, parameters) + task_signature = calculate_task_signature(instance, process, parameters, chore=chore) params_json = json.dumps(parameters) if parameters else "{}" predecessors_json = json.dumps(predecessors) if predecessors else None status = "Success" if success else "Fail" @@ -283,11 +295,11 @@ def record_task( cursor.execute( """ INSERT INTO task_results ( - run_id, workflow, task_id, task_signature, instance, process, parameters, + run_id, workflow, task_id, task_signature, instance, process, chore, parameters, status, start_time, end_time, duration_seconds, retry_count, error_message, predecessors, stage, safe_retry, timeout, cancel_at_timeout, require_predecessor_success, succeed_on_minor_errors - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( run_id, @@ -296,6 +308,7 @@ def record_task( task_signature, instance, process, + chore, params_json, status, start_time.isoformat(), @@ -339,6 +352,7 @@ def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: task_id = task["task_id"] instance = task["instance"] process = task["process"] + chore = task.get("chore") parameters = task.get("parameters") success = task["success"] start_time = task["start_time"] @@ -354,7 +368,9 @@ def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: succeed_on_minor_errors = task.get("succeed_on_minor_errors") duration = (end_time - start_time).total_seconds() - task_signature = calculate_task_signature(instance, process, parameters) + task_signature = calculate_task_signature( + instance, process, parameters, chore=chore + ) params_json = json.dumps(parameters) if parameters else "{}" predecessors_json = json.dumps(predecessors) if predecessors else None status = "Success" if success else "Fail" @@ -362,11 +378,12 @@ def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: cursor.execute( """ INSERT INTO task_results ( - run_id, workflow, task_id, task_signature, instance, process, parameters, - status, start_time, end_time, duration_seconds, retry_count, error_message, - predecessors, stage, safe_retry, timeout, cancel_at_timeout, - require_predecessor_success, succeed_on_minor_errors - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + run_id, workflow, task_id, task_signature, instance, process, chore, + parameters, status, start_time, end_time, duration_seconds, + retry_count, error_message, predecessors, stage, safe_retry, + timeout, cancel_at_timeout, require_predecessor_success, + succeed_on_minor_errors + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( run_id, @@ -375,6 +392,7 @@ def batch_record_tasks(self, tasks: List[Dict[str, Any]]) -> None: task_signature, instance, process, + chore, params_json, status, start_time.isoformat(), @@ -602,7 +620,7 @@ def get_run_results(self, run_id: str) -> List[Dict[str, Any]]: cursor.execute( """ SELECT - workflow, task_id, task_signature, instance, process, parameters, + workflow, task_id, task_signature, instance, process, chore, parameters, status, start_time, end_time, duration_seconds, retry_count, error_message, predecessors, stage, safe_retry, timeout, cancel_at_timeout, require_predecessor_success, succeed_on_minor_errors diff --git a/src/rushti/task.py b/src/rushti/task.py index ac15fdc..96374d1 100644 --- a/src/rushti/task.py +++ b/src/rushti/task.py @@ -6,6 +6,13 @@ - Task: Base task for norm-mode execution - OptimizedTask: Extended task with explicit predecessors for opt-mode - ExecutionMode: Enum for execution mode selection (norm vs opt) + +A task is polymorphic on its **kind**: it either executes a TI +``process`` (with parameters) or a TM1 ``chore`` (no parameters). The +field name is the discriminator; there is no separate ``kind`` field. +Mutual exclusion of ``process_name`` and ``chore_name`` is enforced as +a class invariant on ``Task.__init__`` so downstream code can rely on +exactly one being set without re-checking. See ADR 0002. """ from enum import Enum @@ -39,17 +46,31 @@ def reset_id_counter(cls): def __init__( self, instance_name: str, - process_name: str, + process_name: Optional[str] = None, parameters: Dict[str, Any] = None, succeed_on_minor_errors: bool = False, safe_retry: bool = False, stage: Optional[str] = None, timeout: Optional[int] = None, cancel_at_timeout: bool = False, + chore_name: Optional[str] = None, ): + # Class invariant: exactly one of process_name / chore_name is set. + # The five Task(...) construction sites in parsing.py plus internal + # paths in execution.py make a class-level check cheaper than + # auditing each site individually — see ADR 0002 §2. + if process_name and chore_name: + raise ValueError( + "Task: 'process_name' and 'chore_name' are mutually exclusive — " + "exactly one must be set" + ) + if not process_name and not chore_name: + raise ValueError("Task: exactly one of 'process_name' or 'chore_name' must be set") + self.id = Task.id self.instance_name = instance_name self.process_name = process_name + self.chore_name = chore_name self.parameters = parameters self.succeed_on_minor_errors = succeed_on_minor_errors self.safe_retry = safe_retry @@ -60,6 +81,21 @@ def __init__( Task.id = Task.id + 1 def translate_to_line(self): + if self.chore_name: + # Chores are intentionally narrower than processes — no + # parameters, no minor-error tier, no timeout, no + # cancel_at_timeout. Only safe_retry and stage are meaningful + # alongside the kind-specific identity. + parts = [ + f'instance="{self.instance_name}"', + f'chore="{self.chore_name}"', + ] + if self.safe_retry: + parts.append(f'safe_retry="{self.safe_retry}"') + if self.stage: + parts.append(f'stage="{self.stage}"') + return " ".join(parts) + "\n" + parts = [ f'instance="{self.instance_name}"', f'process="{self.process_name}"', @@ -83,28 +119,30 @@ def __init__( self, task_id: str, instance_name: str, - process_name: str, - parameters: Dict[str, Any], - predecessors: List, - require_predecessor_success: bool, + process_name: Optional[str] = None, + parameters: Dict[str, Any] = None, + predecessors: List = None, + require_predecessor_success: bool = False, succeed_on_minor_errors: bool = False, safe_retry: bool = False, stage: Optional[str] = None, timeout: Optional[int] = None, cancel_at_timeout: bool = False, + chore_name: Optional[str] = None, ): super().__init__( - instance_name, - process_name, - parameters, - succeed_on_minor_errors, - safe_retry, - stage, - timeout, - cancel_at_timeout, + instance_name=instance_name, + process_name=process_name, + parameters=parameters, + succeed_on_minor_errors=succeed_on_minor_errors, + safe_retry=safe_retry, + stage=stage, + timeout=timeout, + cancel_at_timeout=cancel_at_timeout, + chore_name=chore_name, ) self.id = task_id - self.predecessors = predecessors + self.predecessors = predecessors if predecessors is not None else [] self.require_predecessor_success = require_predecessor_success self.successors = list() @@ -117,6 +155,20 @@ def has_successors(self): return len(self.successors) > 0 def translate_to_line(self): + if self.chore_name: + parts = [ + f'id="{self.id}"', + f'predecessors="{",".join(map(str, self.predecessors))}"', + f'require_predecessor_success="{self.require_predecessor_success}"', + ] + if self.safe_retry: + parts.append(f'safe_retry="{self.safe_retry}"') + if self.stage: + parts.append(f'stage="{self.stage}"') + parts.append(f'instance="{self.instance_name}"') + parts.append(f'chore="{self.chore_name}"') + return " ".join(parts) + "\n" + parts = [ f'id="{self.id}"', f'predecessors="{",".join(map(str, self.predecessors))}"', diff --git a/src/rushti/taskfile.py b/src/rushti/taskfile.py index c952f78..145dd4f 100644 --- a/src/rushti/taskfile.py +++ b/src/rushti/taskfile.py @@ -20,8 +20,20 @@ # JSON Schema version SCHEMA_VERSION = "2.0" -# Required task properties -REQUIRED_TASK_PROPERTIES = {"id", "instance", "process"} +# Always-required task properties. The kind-specific field (`process` or +# `chore`) is enforced separately as a mutual-exclusion check in +# `validate_task`. +REQUIRED_TASK_PROPERTIES = {"id", "instance"} + +# Fields that are not allowed on chore tasks. The chore execution surface +# at the TM1 API boundary is intentionally narrower than processes: +# no parameters, no minor-error tier, no native timeout. +CHORE_FORBIDDEN_FIELDS = ( + "parameters", + "succeed_on_minor_errors", + "timeout", + "cancel_at_timeout", +) # Default values for optional task properties TASK_DEFAULTS = { @@ -112,11 +124,19 @@ def from_dict(cls, data: Dict[str, Any]) -> "TaskfileSettings": @dataclass class TaskDefinition: - """Unified task definition from JSON.""" + """Unified task definition from JSON. + + A task carries exactly one kind-discriminating field: either + ``process`` (TI process) or ``chore`` (TM1 chore). The field name is + the kind discriminator; there is no separate ``kind`` field. Mutual + exclusion is enforced at parse-time validation and as a class + invariant on ``Task.__init__``. See ADR 0002. + """ id: str instance: str - process: str + process: Optional[str] = None + chore: Optional[str] = None parameters: Dict[str, Any] = field(default_factory=dict) predecessors: List[str] = field(default_factory=list) stage: Optional[str] = None @@ -127,11 +147,14 @@ class TaskDefinition: succeed_on_minor_errors: bool = False def to_dict(self) -> Dict[str, Any]: - result = { + result: Dict[str, Any] = { "id": self.id, "instance": self.instance, - "process": self.process, } + if self.process: + result["process"] = self.process + if self.chore: + result["chore"] = self.chore if self.parameters: result["parameters"] = self.parameters if self.predecessors: @@ -155,7 +178,8 @@ def from_dict(cls, data: Dict[str, Any]) -> "TaskDefinition": return cls( id=str(data["id"]), instance=data["instance"], - process=data["process"], + process=data.get("process") or None, + chore=data.get("chore") or None, parameters=data.get("parameters", {}), predecessors=[str(p) for p in data.get("predecessors", [])], stage=data.get("stage"), @@ -374,11 +398,50 @@ def validate_task(task_data: Dict[str, Any], index: int) -> List[str]: """ errors = [] - # Check required properties + # Check always-required properties (id, instance). for prop in REQUIRED_TASK_PROPERTIES: if prop not in task_data or not task_data[prop]: errors.append(f"Task {index}: Missing required property '{prop}'") + # Mutual exclusion: exactly one of `process` or `chore` must be set. + has_process = bool(task_data.get("process")) + has_chore = bool(task_data.get("chore")) + if has_process and has_chore: + errors.append( + f"Task {index}: 'process' and 'chore' are mutually exclusive — " + f"exactly one must be set" + ) + elif not has_process and not has_chore: + errors.append(f"Task {index}: exactly one of 'process' or 'chore' must be set") + + # Forbidden-field rejection for chore tasks. Each carries its own + # explanation so users learn why the field doesn't apply. + if has_chore and not has_process: + chore_forbidden_reasons = { + "parameters": "chores have no invocation parameters", + "succeed_on_minor_errors": "chores have no minor-error tier", + "timeout": "TM1 chore execution has no timeout", + "cancel_at_timeout": "no timeout to cancel", + } + for field_name in CHORE_FORBIDDEN_FIELDS: + if field_name not in task_data: + continue + value = task_data[field_name] + # An empty parameters dict / falsy default in the JSON is + # silently accepted; only explicit non-default values are + # rejected, so round-tripping a dict-only-default payload stays + # idempotent. + if field_name == "parameters" and not value: + continue + if field_name in ("succeed_on_minor_errors", "cancel_at_timeout") and not value: + continue + if field_name == "timeout" and value is None: + continue + errors.append( + f"Task {index}: '{field_name}' is not allowed on chore tasks " + f"({chore_forbidden_reasons[field_name]})" + ) + if "id" in task_data and not _is_positive_integer_id(task_data["id"]): errors.append( f"Task {index}: 'id' must be a positive integer. " @@ -576,7 +639,7 @@ def parse_line_arguments(line: str) -> Dict[str, Any]: # Handle specific keys with logic key_lower = argument.lower() - if key_lower in ["process", "instance", "id"]: + if key_lower in ["process", "chore", "instance", "id"]: line_arguments[key_lower] = value elif key_lower == "require_predecessor_success": line_arguments[key_lower] = value.lower() in TRUE_VALUES @@ -660,7 +723,8 @@ def convert_txt_to_json( task_def = TaskDefinition( id=tid, instance=parsed.get("instance", ""), - process=parsed.get("process", ""), + process=parsed.get("process") or None, + chore=parsed.get("chore") or None, parameters={}, predecessors=parsed.get("predecessors", []), stage=parsed.get("stage"), @@ -676,6 +740,7 @@ def convert_txt_to_json( "id", "instance", "process", + "chore", "predecessors", "stage", "safe_retry", @@ -713,6 +778,18 @@ def convert_txt_to_json( if not taskfile.metadata.workflow: taskfile.metadata.workflow = txt_path.stem + # Validate the converted taskfile — closes a pre-existing gap where + # malformed TXT files (missing fields, mutually-exclusive kinds, chore + # tasks carrying process-only fields) were silently accepted because + # `convert_txt_to_json` bypassed the structural validator. Aligns TXT + # input with JSON / cube sources so chore validation fires uniformly. + validation_errors = validate_taskfile(taskfile.to_dict()) + if validation_errors: + error_msg = "TXT task file validation failed:\n" + "\n".join( + f" - {e}" for e in validation_errors + ) + raise TaskfileValidationError(error_msg) + # Save if output path provided if output_path: taskfile.save(output_path) diff --git a/src/rushti/taskfile_ops.py b/src/rushti/taskfile_ops.py index c6ce8af..91bd211 100644 --- a/src/rushti/taskfile_ops.py +++ b/src/rushti/taskfile_ops.py @@ -520,20 +520,26 @@ def _visualize_dag_html( if in_degree[child] == 0: queue.append(child) - # Build nodes list for vis.js with all data needed for all views + # Build nodes list for vis.js with all data needed for all views. + # Chore tasks render the chore name as the task target with a short + # kind tag so the same column can show either kind without ambiguity. nodes = [] for task_id, task in tasks_by_id.items(): stage_name = task.stage if task.stage else "NoStage" color = stage_colors.get(stage_name, stage_colors["default"]) + is_chore = bool(task.chore) + task_target = task.chore if is_chore else (task.process or "") + kind_label = "Chore" if is_chore else "Process" + # Compact label (just ID) compact_label = f"{task_id}" - # Detailed label (ID, process, instance, optionally parameters) - detailed_parts = [f"ID: {task_id}", f"Process: {task.process}"] + # Detailed label (ID, target, instance, optionally parameters) + detailed_parts = [f"ID: {task_id}", f"{kind_label}: {task_target}"] if task.instance: detailed_parts.append(f"Instance: {task.instance}") - if show_parameters and task.parameters: + if show_parameters and not is_chore and task.parameters: params_str = ", ".join(f"{k}={v}" for k, v in task.parameters.items()) if len(params_str) > 40: params_str = params_str[:37] + "..." @@ -541,20 +547,20 @@ def _visualize_dag_html( detailed_label = "\n".join(detailed_parts) # Build tooltip with full details (plain text for proper rendering) - tooltip_parts = [f"ID: {task_id}", f"Process: {task.process}"] + tooltip_parts = [f"ID: {task_id}", f"{kind_label}: {task_target}"] if task.instance: tooltip_parts.append(f"Instance: {task.instance}") if task.stage: tooltip_parts.append(f"Stage: {task.stage}") if task.predecessors: tooltip_parts.append(f"Predecessors: {', '.join(task.predecessors)}") - if task.parameters: + if not is_chore and task.parameters: params_str = ", ".join(f"{k}={v}" for k, v in task.parameters.items()) tooltip_parts.append(f"Parameters: {params_str}") tooltip = "\n".join(tooltip_parts) - # Parameters as object for table/details view - params_obj = task.parameters if task.parameters else {} + # Parameters as object for table/details view (always empty for chores). + params_obj = task.parameters if (not is_chore and task.parameters) else {} nodes.append( { @@ -571,7 +577,13 @@ def _visualize_dag_html( "hover": {"background": color, "border": "#64748B"}, }, "stage": stage_name, - "process": task.process, + # Kind discriminator and unified task target. ``process`` + # and ``chore`` carry the raw kind-specific name; the JS + # renders ``task_target`` with a [P]/[C] tag. + "process": task.process or "", + "chore": task.chore or "", + "task_target": task_target, + "task_kind": "chore" if is_chore else "process", "instance": task.instance or "", "predecessors": task.predecessors if task.predecessors else [], "parameters": params_obj, diff --git a/src/rushti/templates/visualization.html b/src/rushti/templates/visualization.html index 8b547ff..1e8f6f6 100644 --- a/src/rushti/templates/visualization.html +++ b/src/rushti/templates/visualization.html @@ -255,6 +255,17 @@ color: white; } + .kind-tag { + display: inline-block; + padding: 1px 5px; + border-radius: 4px; + font-size: 0.7em; + font-weight: 700; + color: #475569; + background: #E2E8F0; + margin-right: 4px; + } + #taskTable .params-cell { max-width: 300px; font-family: 'SF Mono', 'Fira Code', monospace; @@ -534,7 +545,7 @@

DAG Visualization

- + @@ -572,7 +583,7 @@

Task ID

-

Process

+

Task target

@@ -771,7 +782,7 @@

Execution Options

} // Check search (includes parameters) if (searchTerm) { - var searchIn = (node.id + ' ' + node.process + ' ' + node.instance + ' ' + node.stage).toLowerCase(); + var searchIn = (node.id + ' ' + (node.task_target || node.process || node.chore || '') + ' ' + node.instance + ' ' + node.stage).toLowerCase(); // Also search in parameter names and values if (node.parameters) { Object.entries(node.parameters).forEach(function(p) { @@ -886,10 +897,12 @@

Execution Options

} var optionsHtml = optionsList.length > 0 ? optionsList.join('\\n') : '-'; + var kindTag = node.task_kind === 'chore' ? '[C]' : '[P]'; + var targetText = node.task_target || node.process || node.chore || ''; rows.push( '
' + '' + - '' + + '' + '' + '' + '' + @@ -954,7 +967,9 @@

Execution Options

document.getElementById('task-details').classList.remove('hidden'); document.getElementById('detail-id').textContent = node.id; - document.getElementById('detail-process').textContent = node.process; + var detailKindTag = node.task_kind === 'chore' ? '[C]' : '[P]'; + document.getElementById('detail-process').textContent = + detailKindTag + ' ' + (node.task_target || node.process || node.chore || ''); document.getElementById('detail-instance').textContent = node.instance || '-'; var stageEl = document.getElementById('detail-stage'); diff --git a/src/rushti/tm1_integration.py b/src/rushti/tm1_integration.py index 5ac420f..e05b9bd 100644 --- a/src/rushti/tm1_integration.py +++ b/src/rushti/tm1_integration.py @@ -38,6 +38,7 @@ INPUT_MEASURES = [ "instance", "process", + "chore", "parameters", "predecessors", "stage", @@ -53,6 +54,7 @@ ALL_MEASURES = [ "instance", "process", + "chore", "parameters", "status", "start_time", @@ -263,16 +265,29 @@ def _dataframe_to_task_definitions( if not task_id: task_id = str(len(tasks) + 1) - # Skip if no instance or process defined (empty row) + # Skip empty rows: a row is skipped iff it has no instance OR + # neither a process nor a chore. A row with both is a user error + # — raise immediately with the offending task ID so the cube + # editor can find and fix it. instance = str(row.get("instance", "")).strip() process = str(row.get("process", "")).strip() + chore = str(row.get("chore", "")).strip() - if not instance or not process: + if not instance or (not process and not chore): continue - # Parse parameters (supports JSON or space-separated key=value) + if process and chore: + raise ValueError( + f"Task '{task_id}': both 'process' and 'chore' are set in the " + f"cube — exactly one must be populated (the field name is the " + f"kind discriminator)" + ) + + # Parse parameters (supports JSON or space-separated key=value). + # Chore tasks have no invocation parameters; the cube field is + # ignored even if a user accidentally populated it. parameters_str = str(row.get("parameters", "")).strip() - parameters = _parse_parameters_string(parameters_str) + parameters = _parse_parameters_string(parameters_str) if process else {} # Parse predecessors based on mode predecessors: List[str] = [] @@ -306,7 +321,8 @@ def _dataframe_to_task_definitions( task = TaskDefinition( id=task_id, instance=instance, - process=process, + process=process or None, + chore=chore or None, parameters=parameters, predecessors=predecessors, stage=stage, @@ -470,7 +486,9 @@ def build_results_dataframe( logger.warning(f"No results found for run_id '{run_id}'") return pd.DataFrame() - # Build rows for DataFrame + # Build rows for DataFrame. Column order here is load-bearing: the + # ``}rushti.load.results`` TI declares its CSV variables positionally + # in ``tm1_objects.PROCESS_VARIABLES``, so the two must stay aligned. rows = [] for result in results: task_id = result.get("task_id", "") @@ -478,7 +496,8 @@ def build_results_dataframe( "task_id": task_id, "original_task_id": task_id, "instance": result.get("instance", ""), - "process": result.get("process", ""), + "process": result.get("process", "") or "", + "chore": result.get("chore", "") or "", "parameters": result.get("parameters", "{}"), "status": result.get("status", ""), "start_time": result.get("start_time", ""), diff --git a/src/rushti/tm1_objects.py b/src/rushti/tm1_objects.py index a88e089..98262d7 100644 --- a/src/rushti/tm1_objects.py +++ b/src/rushti/tm1_objects.py @@ -27,6 +27,7 @@ MEASURE_ELEMENTS = [ "instance", "process", + "chore", "parameters", "status", "start_time", @@ -50,6 +51,7 @@ MEASURE_ATTRIBUTES = { "instance": {"inputs": "Y", "results": "Y"}, "process": {"inputs": "Y", "results": "Y"}, + "chore": {"inputs": "Y", "results": "Y"}, "parameters": {"inputs": "Y", "results": "Y"}, "status": {"inputs": "", "results": "Y"}, "start_time": {"inputs": "", "results": "Y"}, @@ -395,6 +397,7 @@ CellPutS(vinstance, pTargetCube, vworkflow, vrun_id, vtask_id, 'instance'); CellPutS(vprocess, pTargetCube, vworkflow, vrun_id, vtask_id, 'process'); +CellPutS(vchore, pTargetCube, vworkflow, vrun_id, vtask_id, 'chore'); CellPutS(vparameters, pTargetCube, vworkflow, vrun_id, vtask_id, 'parameters'); CellPutS(vstatus, pTargetCube, vworkflow, vrun_id, vtask_id, 'status'); CellPutS(vstart_time, pTargetCube, vworkflow, vrun_id, vtask_id, 'start_time'); @@ -537,6 +540,12 @@ }, ] +# Order is load-bearing: TI reads CSV columns positionally against this +# declaration list. Must match the column order produced by +# ``rushti.tm1_integration.upload_results_to_tm1`` (which inserts +# ``workflow`` and ``run_id`` ahead of the columns built by +# ``build_results_dataframe``). Mismatched ordering silently scrambles +# the loaded payload. PROCESS_VARIABLES = [ "vworkflow", "vrun_id", @@ -544,6 +553,7 @@ "voriginal_task_id", "vinstance", "vprocess", + "vchore", "vparameters", "vstatus", "vstart_time", diff --git a/src/rushti/visualization_template.py b/src/rushti/visualization_template.py index c62d2f5..18ee0df 100644 --- a/src/rushti/visualization_template.py +++ b/src/rushti/visualization_template.py @@ -267,6 +267,17 @@ color: white; } + .kind-tag { + display: inline-block; + padding: 1px 5px; + border-radius: 4px; + font-size: 0.7em; + font-weight: 700; + color: #475569; + background: #E2E8F0; + margin-right: 4px; + } + #taskTable .params-cell { max-width: 300px; font-family: 'SF Mono', 'Fira Code', monospace; @@ -546,7 +557,7 @@ - + @@ -584,7 +595,7 @@
-

Process

+

Task target

@@ -783,7 +794,7 @@ } // Check search (includes parameters) if (searchTerm) { - var searchIn = (node.id + ' ' + node.process + ' ' + node.instance + ' ' + node.stage).toLowerCase(); + var searchIn = (node.id + ' ' + (node.task_target || node.process || node.chore || '') + ' ' + node.instance + ' ' + node.stage).toLowerCase(); // Also search in parameter names and values if (node.parameters) { Object.entries(node.parameters).forEach(function(p) { @@ -898,10 +909,12 @@ } var optionsHtml = optionsList.length > 0 ? optionsList.join('\\n') : '-'; + var kindTag = node.task_kind === 'chore' ? '[C]' : '[P]'; + var targetText = node.task_target || node.process || node.chore || ''; rows.push( '
' + '' + - '' + + '' + '' + '' + '' + @@ -966,7 +979,9 @@ document.getElementById('task-details').classList.remove('hidden'); document.getElementById('detail-id').textContent = node.id; - document.getElementById('detail-process').textContent = node.process; + var detailKindTag = node.task_kind === 'chore' ? '[C]' : '[P]'; + document.getElementById('detail-process').textContent = + detailKindTag + ' ' + (node.task_target || node.process || node.chore || ''); document.getElementById('detail-instance').textContent = node.instance || '-'; var stageEl = document.getElementById('detail-stage'); diff --git a/tests/unit/test_chore_kind.py b/tests/unit/test_chore_kind.py new file mode 100644 index 0000000..7e3491d --- /dev/null +++ b/tests/unit/test_chore_kind.py @@ -0,0 +1,460 @@ +"""Tests for the TM1 chore task kind (issue #156). + +Covers the polymorphic Task abstraction: +- ``TaskDefinition`` mutual-exclusion + per-kind forbidden-field validation +- ``Task`` / ``OptimizedTask`` class-level mutual-exclusion invariant +- TXT → JSON conversion (chore keyword, validate_taskfile invocation, + pre-existing malformed inputs now failing) +- Parsing: chore_name threaded through five construction sites; + expand_task early-returns for chores +- Execution: dispatch to chore branch, retry on safe_retry, validation + against a mocked TM1Service +- Cube reader: chore acceptance, both-populated rejection +- Signature: disjoint signature space, no collision with processes +""" + +import os +import tempfile +import unittest +import unittest.mock +from unittest.mock import MagicMock + +import pandas as pd + +from rushti.execution import ( + ExecutionContext, + execute_chore_with_retries, + execute_task, + validate_tasks, +) +from rushti.parsing import expand_task, extract_task_from_line, convert_json_to_dag +from rushti.stats.signature import calculate_task_signature +from rushti.task import OptimizedTask, Task +from rushti.taskfile import ( + TaskDefinition, + Taskfile, + TaskfileMetadata, + TaskfileSettings, + TaskfileValidationError, + convert_txt_to_json, + validate_taskfile, +) +from rushti.tm1_integration import _dataframe_to_task_definitions + + +def _base_chore_task(**overrides): + task = {"id": "1", "instance": "tm1srv01", "chore": "daily_etl"} + task.update(overrides) + return {"version": "2.0", "tasks": [task]} + + +def _base_process_task(**overrides): + task = {"id": "1", "instance": "tm1srv01", "process": "load.cube"} + task.update(overrides) + return {"version": "2.0", "tasks": [task]} + + +class TestKindValidation(unittest.TestCase): + """validate_taskfile / validate_task with mixed kinds.""" + + def test_valid_chore_task_passes(self): + errors = validate_taskfile(_base_chore_task()) + self.assertEqual(errors, []) + + def test_chore_with_safe_retry_passes(self): + errors = validate_taskfile(_base_chore_task(safe_retry=True)) + self.assertEqual(errors, []) + + def test_rejects_both_process_and_chore(self): + errors = validate_taskfile(_base_chore_task(process="load.cube")) + self.assertTrue(any("mutually exclusive" in e for e in errors), f"Got: {errors}") + + def test_rejects_neither_process_nor_chore(self): + data = { + "version": "2.0", + "tasks": [{"id": "1", "instance": "tm1srv01"}], + } + errors = validate_taskfile(data) + self.assertTrue( + any("exactly one of 'process' or 'chore'" in e for e in errors), + f"Got: {errors}", + ) + + def test_rejects_parameters_on_chore(self): + errors = validate_taskfile(_base_chore_task(parameters={"x": "1"})) + self.assertTrue( + any("'parameters' is not allowed on chore tasks" in e for e in errors), + f"Got: {errors}", + ) + + def test_rejects_succeed_on_minor_errors_on_chore(self): + errors = validate_taskfile(_base_chore_task(succeed_on_minor_errors=True)) + self.assertTrue( + any("'succeed_on_minor_errors' is not allowed on chore tasks" in e for e in errors), + f"Got: {errors}", + ) + + def test_rejects_timeout_on_chore(self): + errors = validate_taskfile(_base_chore_task(timeout=60)) + self.assertTrue( + any("'timeout' is not allowed on chore tasks" in e for e in errors), + f"Got: {errors}", + ) + + def test_rejects_cancel_at_timeout_on_chore(self): + errors = validate_taskfile(_base_chore_task(cancel_at_timeout=True)) + self.assertTrue( + any("'cancel_at_timeout' is not allowed on chore tasks" in e for e in errors), + f"Got: {errors}", + ) + + def test_allows_default_empty_parameters_on_chore(self): + # An explicit empty dict is silently accepted so round-tripping + # a TaskDefinition's `to_dict()` output stays idempotent. + errors = validate_taskfile(_base_chore_task(parameters={})) + self.assertEqual(errors, []) + + +class TestTaskInvariant(unittest.TestCase): + """Class-level mutual exclusion on Task.__init__.""" + + def test_chore_only_task_constructs(self): + Task.reset_id_counter() + t = Task(instance_name="tm1srv01", chore_name="daily_etl") + self.assertEqual(t.chore_name, "daily_etl") + self.assertIsNone(t.process_name) + + def test_process_only_task_constructs(self): + Task.reset_id_counter() + t = Task(instance_name="tm1srv01", process_name="load.cube") + self.assertEqual(t.process_name, "load.cube") + self.assertIsNone(t.chore_name) + + def test_both_set_raises(self): + Task.reset_id_counter() + with self.assertRaises(ValueError): + Task( + instance_name="tm1srv01", + process_name="load.cube", + chore_name="daily_etl", + ) + + def test_neither_set_raises(self): + Task.reset_id_counter() + with self.assertRaises(ValueError): + Task(instance_name="tm1srv01") + + def test_optimized_task_both_set_raises(self): + with self.assertRaises(ValueError): + OptimizedTask( + task_id="1", + instance_name="tm1srv01", + process_name="load.cube", + chore_name="daily_etl", + ) + + def test_translate_to_line_chore(self): + Task.reset_id_counter() + t = Task(instance_name="tm1srv01", chore_name="daily_etl", safe_retry=True) + line = t.translate_to_line() + self.assertIn('chore="daily_etl"', line) + self.assertIn('instance="tm1srv01"', line) + self.assertIn('safe_retry="True"', line) + # Process-only fields must NOT appear. + self.assertNotIn("succeed_on_minor_errors", line) + self.assertNotIn("timeout", line) + self.assertNotIn("cancel_at_timeout", line) + + def test_optimized_task_translate_to_line_chore(self): + t = OptimizedTask( + task_id="2", + instance_name="tm1srv01", + chore_name="daily_etl", + predecessors=["1"], + require_predecessor_success=True, + ) + line = t.translate_to_line() + self.assertIn('chore="daily_etl"', line) + self.assertIn('predecessors="1"', line) + self.assertNotIn("succeed_on_minor_errors", line) + + +class TestTxtConversion(unittest.TestCase): + """convert_txt_to_json now invokes validate_taskfile.""" + + def _write(self, content: str) -> str: + fd, path = tempfile.mkstemp(suffix=".txt") + with os.fdopen(fd, "w") as f: + f.write(content) + self.addCleanup(os.unlink, path) + return path + + def test_chore_line_parses(self): + path = self._write('instance="tm1srv01" chore="daily_etl" safe_retry="true"\n') + taskfile = convert_txt_to_json(path) + self.assertEqual(len(taskfile.tasks), 1) + self.assertEqual(taskfile.tasks[0].chore, "daily_etl") + self.assertIsNone(taskfile.tasks[0].process) + self.assertTrue(taskfile.tasks[0].safe_retry) + + def test_chore_with_forbidden_parameters_rejected(self): + # The new validate_taskfile call exposes a previously-silent gap. + path = self._write('instance="tm1srv01" chore="daily_etl" pX="1"\n') + with self.assertRaises(TaskfileValidationError): + convert_txt_to_json(path) + + def test_malformed_txt_now_raises(self): + # Previously: silently produced a Task with empty instance/process. + # Now: validate_taskfile rejects it explicitly. + path = self._write('process="orphan.process"\n') + with self.assertRaises(TaskfileValidationError): + convert_txt_to_json(path) + + +class TestExpandTaskChore(unittest.TestCase): + """expand_task early-returns for chore tasks.""" + + def test_chore_task_short_circuits(self): + Task.reset_id_counter() + t = Task(instance_name="tm1srv01", chore_name="daily_etl") + # tm1_services is empty — if expand_task tried to do anything + # real with it for a chore, this would KeyError. + result = expand_task(tm1_services={}, task=t) + self.assertEqual(result, [t]) + + +class TestParsingChoreThreading(unittest.TestCase): + """All five Task construction sites thread chore_name.""" + + def test_extract_task_from_line_chore(self): + Task.reset_id_counter() + task = extract_task_from_line('instance="tm1srv01" chore="daily_etl"', task_class=Task) + self.assertEqual(task.chore_name, "daily_etl") + self.assertIsNone(task.process_name) + + def test_extract_optimized_task_from_line_chore(self): + task = extract_task_from_line( + 'id="1" instance="tm1srv01" chore="daily_etl"', + task_class=OptimizedTask, + ) + self.assertEqual(task.chore_name, "daily_etl") + self.assertEqual(task.id, "1") + + def test_convert_json_to_dag_chore(self): + taskfile = Taskfile( + metadata=TaskfileMetadata(workflow="mixed"), + settings=TaskfileSettings(), + tasks=[ + TaskDefinition(id="1", instance="tm1srv01", chore="daily_etl"), + TaskDefinition( + id="2", + instance="tm1srv01", + process="load.cube", + predecessors=["1"], + ), + ], + ) + dag = convert_json_to_dag(taskfile) + all_tasks = dag.get_all_tasks() + chore_tasks = [t for t in all_tasks if t.chore_name] + process_tasks = [t for t in all_tasks if t.process_name] + self.assertEqual(len(chore_tasks), 1) + self.assertEqual(len(process_tasks), 1) + self.assertEqual(chore_tasks[0].chore_name, "daily_etl") + + +class TestSignatureChore(unittest.TestCase): + """Chore signatures are disjoint from process signatures.""" + + def test_chore_signature_ignores_parameters(self): + sig_no_params = calculate_task_signature("tm1srv01", None, None, chore="daily_etl") + sig_with_params = calculate_task_signature( + "tm1srv01", None, {"ignored": "1"}, chore="daily_etl" + ) + self.assertEqual(sig_no_params, sig_with_params) + + def test_chore_signature_disjoint_from_process(self): + # Same instance, same name — but different kinds must hash differently. + process_sig = calculate_task_signature("tm1srv01", "daily_etl", None) + chore_sig = calculate_task_signature("tm1srv01", None, None, chore="daily_etl") + self.assertNotEqual(process_sig, chore_sig) + + def test_different_chore_names_differ(self): + sig_a = calculate_task_signature("tm1srv01", None, None, chore="etl_a") + sig_b = calculate_task_signature("tm1srv01", None, None, chore="etl_b") + self.assertNotEqual(sig_a, sig_b) + + +class TestExecuteChoreRetries(unittest.TestCase): + """execute_chore_with_retries success + retry semantics.""" + + def _chore_task(self, safe_retry=False): + Task.reset_id_counter() + return Task( + instance_name="tm1srv01", + chore_name="daily_etl", + safe_retry=safe_retry, + ) + + def test_success_first_attempt(self): + tm1 = MagicMock() + tm1.chores.execute_chore = MagicMock(return_value=None) + ok, status, log, attempt = execute_chore_with_retries(tm1, self._chore_task(), retries=3) + self.assertTrue(ok) + self.assertEqual(status, "Completed") + self.assertEqual(log, "") + self.assertEqual(attempt, 0) + tm1.chores.execute_chore.assert_called_once_with("daily_etl") + + def test_failure_without_safe_retry_does_not_retry(self): + tm1 = MagicMock() + tm1.chores.execute_chore = MagicMock(side_effect=RuntimeError("boom")) + with self.assertRaises(RuntimeError): + execute_chore_with_retries(tm1, self._chore_task(safe_retry=False), retries=3) + # safe_retry=False → single attempt regardless of global retries. + self.assertEqual(tm1.chores.execute_chore.call_count, 1) + + def test_failure_with_safe_retry_retries(self): + tm1 = MagicMock() + tm1.chores.execute_chore = MagicMock(side_effect=RuntimeError("boom")) + with self.assertRaises(RuntimeError): + execute_chore_with_retries(tm1, self._chore_task(safe_retry=True), retries=2) + # safe_retry=True → initial attempt + 2 retries = 3 calls. + self.assertEqual(tm1.chores.execute_chore.call_count, 3) + + def test_safe_retry_recovers_on_second_attempt(self): + tm1 = MagicMock() + tm1.chores.execute_chore = MagicMock(side_effect=[RuntimeError("flake"), None]) + ok, _, _, attempt = execute_chore_with_retries( + tm1, self._chore_task(safe_retry=True), retries=3 + ) + self.assertTrue(ok) + self.assertEqual(attempt, 1) + + +class TestExecuteTaskDispatch(unittest.TestCase): + """execute_task routes chore-kind tasks to the chore branch.""" + + def test_chore_dispatch(self): + Task.reset_id_counter() + task = Task(instance_name="tm1srv01", chore_name="daily_etl") + tm1 = MagicMock() + tm1.chores.execute_chore = MagicMock(return_value=None) + tm1_services = {"tm1srv01": tm1} + ctx = ExecutionContext() + + result = execute_task(ctx, task, retries=0, tm1_services=tm1_services) + self.assertTrue(result) + tm1.chores.execute_chore.assert_called_once_with("daily_etl") + # The process path must not have been touched. + tm1.processes.execute_with_return.assert_not_called() + + +class TestValidateTasksChore(unittest.TestCase): + """validate_tasks chore branch: existence + SINGLE_COMMIT check.""" + + def _chore_task(self, safe_retry=False): + Task.reset_id_counter() + return Task( + instance_name="tm1srv01", + chore_name="daily_etl", + safe_retry=safe_retry, + ) + + def test_existence_check(self): + tm1 = MagicMock() + tm1.chores.exists = MagicMock(return_value=True) + tm1_services = {"tm1srv01": tm1} + + ok = validate_tasks([self._chore_task()], tm1_services) + self.assertTrue(ok) + tm1.chores.exists.assert_called_once_with("daily_etl") + # No safe_retry → never fetched the chore object. + tm1.chores.get.assert_not_called() + + def test_missing_chore_fails(self): + tm1 = MagicMock() + tm1.chores.exists = MagicMock(return_value=False) + tm1_services = {"tm1srv01": tm1} + + ok = validate_tasks([self._chore_task()], tm1_services) + self.assertFalse(ok) + + def test_safe_retry_requires_single_commit(self): + from TM1py.Objects import Chore + + tm1 = MagicMock() + tm1.chores.exists = MagicMock(return_value=True) + bad_chore = MagicMock() + bad_chore.execution_mode = Chore.MULTIPLE_COMMIT + tm1.chores.get = MagicMock(return_value=bad_chore) + + ok = validate_tasks( + [self._chore_task(safe_retry=True)], + {"tm1srv01": tm1}, + ) + self.assertFalse(ok) + tm1.chores.get.assert_called_once_with("daily_etl") + + def test_safe_retry_single_commit_ok(self): + from TM1py.Objects import Chore + + tm1 = MagicMock() + tm1.chores.exists = MagicMock(return_value=True) + good_chore = MagicMock() + good_chore.execution_mode = Chore.SINGLE_COMMIT + tm1.chores.get = MagicMock(return_value=good_chore) + + ok = validate_tasks( + [self._chore_task(safe_retry=True)], + {"tm1srv01": tm1}, + ) + self.assertTrue(ok) + + +class TestCubeReaderChore(unittest.TestCase): + """_dataframe_to_task_definitions accepts chore rows.""" + + def _row(self, **kwargs): + row = { + "rushti_task_id": "1", + "instance": "", + "process": "", + "chore": "", + "parameters": "", + "wait": "", + "predecessors": "", + "stage": "", + "safe_retry": "", + "timeout": "", + "cancel_at_timeout": "", + "require_predecessor_success": "", + "succeed_on_minor_errors": "", + } + row.update(kwargs) + return row + + def test_reads_chore_row(self): + df = pd.DataFrame([self._row(rushti_task_id="1", instance="tm1srv01", chore="daily_etl")]) + tasks = _dataframe_to_task_definitions(df, mode="opt") + self.assertEqual(len(tasks), 1) + self.assertEqual(tasks[0].chore, "daily_etl") + self.assertIsNone(tasks[0].process) + + def test_rejects_both_populated(self): + df = pd.DataFrame( + [ + self._row( + rushti_task_id="1", + instance="tm1srv01", + process="load.cube", + chore="daily_etl", + ) + ] + ) + with self.assertRaises(ValueError): + _dataframe_to_task_definitions(df, mode="opt") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/unit/test_taskfile.py b/tests/unit/test_taskfile.py index 6870c82..a0afa5a 100644 --- a/tests/unit/test_taskfile.py +++ b/tests/unit/test_taskfile.py @@ -57,11 +57,17 @@ def test_validate_empty_tasks(self): self.assertIn("'tasks' array cannot be empty", errors) def test_validate_missing_required_properties(self): - """Test validation catches missing required task properties""" - data = {"version": "2.0", "tasks": [{"id": "1"}]} # Missing instance and process + """Test validation catches missing required task properties. + + ``process`` is no longer a flat required property — it's part of + the process-xor-chore kind discriminator. The validator surfaces + a single "exactly one of 'process' or 'chore' must be set" + message instead. + """ + data = {"version": "2.0", "tasks": [{"id": "1"}]} # Missing instance and kind errors = validate_taskfile(data) self.assertTrue(any("Missing required property 'instance'" in e for e in errors)) - self.assertTrue(any("Missing required property 'process'" in e for e in errors)) + self.assertTrue(any("exactly one of 'process' or 'chore' must be set" in e for e in errors)) def test_validate_duplicate_task_ids(self): """Test validation catches duplicate task IDs""" From 833d6bb82fc919712489a97bec036edddec1b22f Mon Sep 17 00:00:00 2001 From: nicolasbisurgi Date: Wed, 20 May 2026 19:25:59 -0300 Subject: [PATCH 2/2] test: end-to-end chore execution against TM1 v11 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 13 integration tests against tm1srv01 (v11.8) using the preconfigured `test_chore_success` and `test_chore_error` chores. Covers: - execute_chore_with_retries: success, failure without safe_retry (single attempt), failure with safe_retry (raises after budget). - validate_tasks chore branch: existence pass/fail, SingleCommit check on safe_retry. - execute_task dispatch routes chore-kind tasks to the chore path on success and on failure (HTTP 500 → False). - Mixed process + chore DAG with predecessors crossing kinds — forward path executes in order, a failing chore aborts dependent tasks via require_predecessor_success. - build_logging_objects idempotently adds the `chore` measure element to an existing rushti cube. - Stats DB records process and chore rows with disjoint signatures (no collision when they share a name). - End-to-end results push: chore execution row → stats DB → upload_results_to_tm1 → }rushti.load.results TI → cube cell under the `chore` measure. Validates the v11 TI body, the CSV column ordering against PROCESS_VARIABLES, and the additive cube schema in a single round-trip. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/integration/test_v11_chore_kind.py | 459 +++++++++++++++++++++++ 1 file changed, 459 insertions(+) create mode 100644 tests/integration/test_v11_chore_kind.py diff --git a/tests/integration/test_v11_chore_kind.py b/tests/integration/test_v11_chore_kind.py new file mode 100644 index 0000000..de312fe --- /dev/null +++ b/tests/integration/test_v11_chore_kind.py @@ -0,0 +1,459 @@ +"""Integration tests for the TM1 chore task kind (issue #156) on v11. + +Targets a live TM1 v11 instance (``tm1srv01``) that has two preconfigured +chores: +- ``test_chore_success`` — succeeds (HTTP 204). +- ``test_chore_error`` — fails (HTTP 500, "Chore execution failed"). + +Both must be SingleCommit so the ``safe_retry`` path is exercisable. + +Run with: ``pytest tests/integration/test_v11_chore_kind.py -v`` +""" + +import asyncio +import os +import shutil +import sys +import tempfile +import unittest +from datetime import datetime + +import pytest + +_src_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "src" +) +if _src_path not in sys.path: + sys.path.insert(0, _src_path) +_integration_path = os.path.dirname(os.path.abspath(__file__)) +if _integration_path not in sys.path: + sys.path.insert(0, _integration_path) + +from rushti.execution import ( + ExecutionContext, + execute_chore_with_retries, + execute_task, + logout, + setup_tm1_services, + validate_tasks, + work_through_tasks_dag, +) +from rushti.parsing import convert_json_to_dag +from rushti.task import Task +from rushti.taskfile import ( + Taskfile, + TaskfileMetadata, + TaskfileSettings, + TaskDefinition, +) +from rushti.tm1_build import build_logging_objects, verify_logging_objects +from rushti.tm1_integration import ( + build_results_dataframe, + upload_results_to_tm1, +) +from rushti.stats import StatsDatabase +from conftest import get_all_test_tm1_configs, get_test_tm1_names +from tm1_setup import setup_tm1_test_objects + +CHORE_SUCCESS = "test_chore_success" +CHORE_ERROR = "test_chore_error" + + +@pytest.mark.requires_tm1 +@pytest.mark.integration +class TestV11ChoreKind(unittest.TestCase): + """End-to-end chore execution against a live TM1 v11 instance.""" + + INSTANCE = "tm1srv01" + + @classmethod + def setUpClass(cls): + configs, source = get_all_test_tm1_configs() + if not configs or cls.INSTANCE not in configs: + cls._tm1_available = False + return + + cls._config_path = source + cls._test_dir = tempfile.mkdtemp() + + # Bootstrap the connection pool via a tiny dummy taskfile so we + # reuse the production setup_tm1_services code path. We do NOT + # execute this taskfile. + bootstrap = os.path.join(cls._test_dir, "bootstrap.txt") + with open(bootstrap, "w") as f: + f.write(f'instance="{cls.INSTANCE}" process="}}bedrock.server.wait" pWaitSec="0"\n') + + try: + cls._tm1_services, cls._preserve_connections = setup_tm1_services( + max_workers=4, + tasks_file_path=bootstrap, + config_path=cls._config_path, + ) + tm1_names = get_test_tm1_names() + for tm1 in cls._tm1_services.values(): + setup_tm1_test_objects(tm1, **tm1_names) + cls._tm1_available = cls.INSTANCE in cls._tm1_services + cls._tm1_names = tm1_names + except Exception as e: + print(f"TM1 setup failed: {e}") + cls._tm1_available = False + cls._tm1_services = {} + cls._preserve_connections = {} + + @classmethod + def tearDownClass(cls): + if getattr(cls, "_tm1_services", None): + logout(cls._tm1_services, cls._preserve_connections or {}) + if hasattr(cls, "_test_dir"): + shutil.rmtree(cls._test_dir, ignore_errors=True) + + def setUp(self): + if not getattr(self, "_tm1_available", False): + self.skipTest(f"{self.INSTANCE} not available") + Task.reset_id_counter() + # Verify the two test chores are present — if a future operator + # tears them down, the suite should skip rather than spuriously + # fail. + tm1 = self._tm1_services[self.INSTANCE] + for chore in (CHORE_SUCCESS, CHORE_ERROR): + if not tm1.chores.exists(chore): + self.skipTest(f"chore '{chore}' not present on {self.INSTANCE}") + + # ------------------------------------------------------------------ + # Direct retry helper + # ------------------------------------------------------------------ + + def test_execute_chore_with_retries_success(self): + tm1 = self._tm1_services[self.INSTANCE] + task = Task(instance_name=self.INSTANCE, chore_name=CHORE_SUCCESS) + ok, status, log_file, attempt = execute_chore_with_retries(tm1, task, retries=2) + self.assertTrue(ok) + self.assertEqual(status, "Completed") + self.assertEqual(log_file, "") + self.assertEqual(attempt, 0) + + def test_execute_chore_with_retries_failure_no_safe_retry(self): + tm1 = self._tm1_services[self.INSTANCE] + task = Task(instance_name=self.INSTANCE, chore_name=CHORE_ERROR, safe_retry=False) + with self.assertRaises(Exception) as cm: + execute_chore_with_retries(tm1, task, retries=3) + # safe_retry=False → single attempt regardless of the global cap. + self.assertIn("Chore execution failed", str(cm.exception)) + + def test_execute_chore_with_retries_failure_with_safe_retry(self): + # safe_retry exhausts the retry budget and then raises. We don't + # assert exact call counts here (TM1 doesn't expose them), but we + # do assert the raised exception's text to be sure we went via + # the chore path. + tm1 = self._tm1_services[self.INSTANCE] + task = Task(instance_name=self.INSTANCE, chore_name=CHORE_ERROR, safe_retry=True) + with self.assertRaises(Exception) as cm: + execute_chore_with_retries(tm1, task, retries=1) + self.assertIn("Chore execution failed", str(cm.exception)) + + # ------------------------------------------------------------------ + # validate_tasks chore branch + # ------------------------------------------------------------------ + + def test_validate_tasks_chore_exists(self): + task = Task(instance_name=self.INSTANCE, chore_name=CHORE_SUCCESS) + ok = validate_tasks([task], self._tm1_services) + self.assertTrue(ok) + + def test_validate_tasks_chore_missing(self): + task = Task( + instance_name=self.INSTANCE, + chore_name="this_chore_definitely_does_not_exist", + ) + ok = validate_tasks([task], self._tm1_services) + self.assertFalse(ok) + + def test_validate_tasks_single_commit_check_passes(self): + # Both test chores are SingleCommit on this instance. + task = Task( + instance_name=self.INSTANCE, + chore_name=CHORE_SUCCESS, + safe_retry=True, + ) + ok = validate_tasks([task], self._tm1_services) + self.assertTrue(ok) + + # ------------------------------------------------------------------ + # execute_task dispatch + # ------------------------------------------------------------------ + + def test_execute_task_dispatches_chore_success(self): + task = Task(instance_name=self.INSTANCE, chore_name=CHORE_SUCCESS) + ctx = ExecutionContext() + ok = execute_task(ctx, task, retries=0, tm1_services=self._tm1_services) + self.assertTrue(ok) + + def test_execute_task_dispatches_chore_failure(self): + task = Task(instance_name=self.INSTANCE, chore_name=CHORE_ERROR) + ctx = ExecutionContext() + ok = execute_task(ctx, task, retries=0, tm1_services=self._tm1_services) + self.assertFalse(ok) + + # ------------------------------------------------------------------ + # Mixed process + chore DAG + # ------------------------------------------------------------------ + + def test_mixed_dag_executes_in_order(self): + """Mixed process + chore taskfile end-to-end, predecessors crossing kinds. + + DAG: process (id=1) → chore_success (id=2) → process (id=3) + """ + Task.reset_id_counter() + taskfile = Taskfile( + metadata=TaskfileMetadata(workflow="chore-mixed-v11"), + settings=TaskfileSettings(), + tasks=[ + TaskDefinition( + id="1", + instance=self.INSTANCE, + process="}bedrock.server.wait", + parameters={"pWaitSec": "0"}, + ), + TaskDefinition( + id="2", + instance=self.INSTANCE, + chore=CHORE_SUCCESS, + predecessors=["1"], + require_predecessor_success=True, + ), + TaskDefinition( + id="3", + instance=self.INSTANCE, + process="}bedrock.server.wait", + parameters={"pWaitSec": "0"}, + predecessors=["2"], + require_predecessor_success=True, + ), + ], + ) + dag = convert_json_to_dag(taskfile) + loop = asyncio.new_event_loop() + try: + outcomes = loop.run_until_complete( + work_through_tasks_dag( + ExecutionContext(), + dag, + max_workers=2, + retries=0, + tm1_services=self._tm1_services, + ) + ) + finally: + loop.close() + + self.assertEqual(len(outcomes), 3) + self.assertTrue(all(outcomes), f"Not all tasks succeeded: {outcomes}") + + def test_mixed_dag_failed_chore_aborts_dependent(self): + """A failing chore must skip dependents that require predecessor success.""" + Task.reset_id_counter() + taskfile = Taskfile( + metadata=TaskfileMetadata(workflow="chore-mixed-fail-v11"), + settings=TaskfileSettings(), + tasks=[ + TaskDefinition( + id="1", + instance=self.INSTANCE, + chore=CHORE_ERROR, + ), + TaskDefinition( + id="2", + instance=self.INSTANCE, + process="}bedrock.server.wait", + parameters={"pWaitSec": "0"}, + predecessors=["1"], + require_predecessor_success=True, + ), + ], + ) + dag = convert_json_to_dag(taskfile) + loop = asyncio.new_event_loop() + try: + outcomes = loop.run_until_complete( + work_through_tasks_dag( + ExecutionContext(), + dag, + max_workers=2, + retries=0, + tm1_services=self._tm1_services, + ) + ) + finally: + loop.close() + + # Both tasks reported failure: chore_error explicitly, and the + # dependent process aborts on require_predecessor_success. + self.assertEqual(len(outcomes), 2) + self.assertFalse(any(outcomes), f"Expected all failures, got {outcomes}") + + # ------------------------------------------------------------------ + # rushti.build idempotently adds the `chore` measure + # ------------------------------------------------------------------ + + def test_build_logging_objects_adds_chore_measure(self): + tm1 = self._tm1_services[self.INSTANCE] + names = self._tm1_names + # Run idempotently; the rushti cube already exists from the + # test environment setup. The additive merge must include + # ``chore`` after this PR. + build_logging_objects(tm1, force=False, **names) + status = verify_logging_objects(tm1, **names) + # The cube + dimensions all exist. + self.assertTrue(status.get(names["cube_name"], False)) + + # Verify the chore element landed in the measure dimension. + elements = tm1.elements.get_element_names( + dimension_name=names["dim_measure"], + hierarchy_name=names["dim_measure"], + ) + self.assertIn("chore", elements) + self.assertIn("process", elements) + + # ------------------------------------------------------------------ + # Stats DB round-trip with chore tasks + # ------------------------------------------------------------------ + + def test_stats_db_records_chore_with_disjoint_signature(self): + Task.reset_id_counter() + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = f.name + try: + stats_db = StatsDatabase(db_path=db_path, enabled=True) + run_id = "chore_stats_" + datetime.now().strftime("%Y%m%d_%H%M%S") + workflow = "test-chore-stats-v11" + stats_db.start_run(run_id=run_id, workflow=workflow) + + # Record one process row and one chore row sharing a name. + start = datetime.now() + stats_db.record_task( + run_id=run_id, + task_id="1", + instance=self.INSTANCE, + process="some.process", + chore=None, + parameters={}, + success=True, + start_time=start, + end_time=start, + workflow=workflow, + ) + stats_db.record_task( + run_id=run_id, + task_id="2", + instance=self.INSTANCE, + process="", + chore="some.process", + parameters=None, + success=True, + start_time=start, + end_time=start, + workflow=workflow, + ) + + rows = stats_db.get_run_results(run_id) + self.assertEqual(len(rows), 2) + sigs = {r["task_signature"] for r in rows} + # Disjoint signature space — process and chore named the same + # must NOT collide. + self.assertEqual(len(sigs), 2) + + kinds = {("chore" if r.get("chore") else "process") for r in rows} + self.assertEqual(kinds, {"process", "chore"}) + + stats_db.complete_run(run_id=run_id, success_count=2, failure_count=0) + stats_db.close() + finally: + os.unlink(db_path) + + # ------------------------------------------------------------------ + # Results-push round-trip: chore row lands under `chore` measure + # ------------------------------------------------------------------ + + def test_results_push_populates_chore_measure(self): + """End-to-end: chore success → stats DB → CSV upload → load TI → cube cell. + + Exercises the full results pipeline that broke in the issue body: + a chore-row's ``vchore`` variable must land in the ``chore`` + measure element under the right (workflow, run_id, task_id) tuple. + """ + tm1 = self._tm1_services[self.INSTANCE] + names = self._tm1_names + workflow = "chore-resultspush-v11" + # Use a fresh run_id so the cube cell doesn't collide with any + # historical data and so we can assert exact-match retrieval. + run_id = "chorerp_" + datetime.now().strftime("%Y%m%d_%H%M%S") + task_id = "1" + + # 1. Record a chore execution in the stats DB. + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = f.name + try: + stats_db = StatsDatabase(db_path=db_path, enabled=True) + stats_db.start_run(run_id=run_id, workflow=workflow) + now = datetime.now() + stats_db.record_task( + run_id=run_id, + task_id=task_id, + instance=self.INSTANCE, + process="", + chore=CHORE_SUCCESS, + parameters=None, + success=True, + start_time=now, + end_time=now, + workflow=workflow, + ) + stats_db.complete_run(run_id=run_id, success_count=1, failure_count=0) + + # 2. Build the results DataFrame and upload it as the CSV that + # the ``}rushti.load.results`` TI consumes. + df = build_results_dataframe(stats_db, workflow, run_id) + self.assertFalse(df.empty) + self.assertIn("chore", df.columns) + self.assertEqual(df.iloc[0]["chore"], CHORE_SUCCESS) + file_name = upload_results_to_tm1(tm1, workflow, run_id, df) + + # 3. Run the load process. It must (a) compile against the new + # vchore variable list, and (b) populate the ``chore`` + # measure cell for this row. TM1 v11 stores uploaded + # files with a ``.blb`` extension internally; v12 keeps + # the original name. + from TM1py.Utils import integerize_version + + version = integerize_version(tm1.version, 2) + load_file_name = file_name + ".blb" if version < 12 else file_name + success, status, error_file = tm1.processes.execute_with_return( + process_name="}rushti.load.results", + pSourceFile=load_file_name, + pTargetCube=names["cube_name"], + pWorkflow_Dim=names["dim_workflow"], + pTaskId_Dim=names["dim_task"], + pRunId_Dim=names["dim_run"], + ) + self.assertTrue( + success, + f"}}rushti.load.results failed (status={status}); error file={error_file}", + ) + + # 4. Read the cube cell back and assert the chore name landed + # under the ``chore`` measure. TM1py's get_value() expects + # elements separated by ``,`` (positional, matched against + # the cube's dimension order). + value = tm1.cells.get_value( + cube_name=names["cube_name"], + elements=",".join([workflow, run_id, task_id, "chore"]), + ) + self.assertEqual(value, CHORE_SUCCESS) + + stats_db.close() + finally: + os.unlink(db_path) + + +if __name__ == "__main__": + unittest.main()
Task IDProcessInstanceTask IDTask targetInstance RunDuration (s)Error
${{r.task_id}} ${{r.instance}}${{r.process}}${{kindTag}} ${{target}} ${{r.avg.toFixed(2)}} ${{r.mn.toFixed(2)}} ${{r.mx.toFixed(2)}}
${{o.task_id}}${{o.process}}${{kindTag}} ${{target}} ${{o.instance}} ${{o.run_id}} ${{o.duration_seconds.toFixed(2)}}
${{f.task_id}}${{f.process}}${{kindTag}} ${{target}} ${{f.instance}} ${{f.run_id}} ${{f.duration_seconds != null ? f.duration_seconds.toFixed(2) : '-'}} ${{f.error_message || '-'}}
ID Process Task target Instance Stage Predecessors
' + node.id + '' + node.process + '' + kindTag + ' ' + targetText + '' + (node.instance || '-') + '' + node.stage + '' + predsHtml + '
ID Process Task target Instance Stage Predecessors
' + node.id + '' + node.process + '' + kindTag + ' ' + targetText + '' + (node.instance || '-') + '' + node.stage + '' + predsHtml + '