From 7bedd537989b5f7d503c8af1f43640e933901f5e Mon Sep 17 00:00:00 2001 From: Dan Chaffelson Date: Mon, 22 Jun 2026 09:05:30 +0100 Subject: [PATCH] feat(ci): add port status visibility and consistent component recursion get_status now reports input/output port counts (total, running, stopped, invalid) and derives processor counts from enumeration rather than the PG aggregate, so invalid ports are no longer mis-reported as invalid processors. Adds canvas.list_invalid_ports (parallel to list_invalid_processors) and a verify_ports option to verify_config with a new port_results key. verify_config controller verification now recurses into descendant PGs (descendants=True), consistent with processor and port verification, while still excluding ancestor-inherited services. Also reworks the developer release guide into separate Patch and Full release workflows. .... Generated with [Cortex Code](https://docs.snowflake.com/en/user-guide/cortex-code/cortex-code) Co-Authored-By: Cortex Code --- docs/ci.rst | 2 + docs/devnotes.rst | 116 ++++++++++++++++++++++++++++-------- docs/history.rst | 23 +++++++ nipyapi/canvas.py | 32 ++++++++++ nipyapi/ci/get_status.py | 65 +++++++++++++++----- nipyapi/ci/verify_config.py | 60 ++++++++++++++----- tests/conftest.py | 23 +++++++ tests/test_canvas.py | 20 +++++++ tests/test_ci.py | 60 ++++++++++++++++++- 9 files changed, 345 insertions(+), 56 deletions(-) diff --git a/docs/ci.rst b/docs/ci.rst index 1eb28e3d..d1301983 100644 --- a/docs/ci.rst +++ b/docs/ci.rst @@ -194,6 +194,8 @@ Parameter Description Env - ``process_group_id``, ``process_group_name``, ``state``, ``is_root`` - Processor counts: ``total_processors``, ``running_processors``, ``stopped_processors``, ``invalid_processors``, ``disabled_processors`` +- Input port counts: ``total_input_ports``, ``running_input_ports``, ``stopped_input_ports``, ``invalid_input_ports`` +- Output port counts: ``total_output_ports``, ``running_output_ports``, ``stopped_output_ports``, ``invalid_output_ports`` - Controller counts: ``total_controllers``, ``enabled_controllers``, ``disabled_controllers`` - Queue stats: ``queued_flowfiles``, ``queued_bytes``, ``active_threads`` - Version control: ``versioned``, ``version_id``, ``flow_id``, ``version_state``, ``modified`` diff --git a/docs/devnotes.rst b/docs/devnotes.rst index 9a0cf4b1..a2c8ef49 100644 --- a/docs/devnotes.rst +++ b/docs/devnotes.rst @@ -215,8 +215,69 @@ Release Process Streamlined release workflow using our modern build system. Assumes development environment is set up (``make dev-install`` completed). -Pre-release Preparation -~~~~~~~~~~~~~~~~~~~~~~~ +The key principle is **tag locally, build, verify, then push**. This avoids force pushes if something is wrong with the built distribution. + +Patch Release (bug fixes only) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use this when only maintained code has changed (no client regeneration, no new NiFi version). + +1. **Update Release Notes**: + + Update ``docs/history.rst`` — move the ``Unreleased`` section to a dated version heading. + +2. **Commit Release Preparation**: + + .. code-block:: shell + + git add docs/history.rst + git commit -S -m "Prepare release X.Y.Z: brief summary" + +3. **Tag Locally** (do NOT push yet): + + .. code-block:: shell + + git tag -a -s vX.Y.Z -m "Release X.Y.Z" + +4. **Build and Verify**: + + .. code-block:: shell + + # Clean build artifacts (preserves generated clients) + make clean && make dist + + # Verify clean version string (no .devN+gHASH suffix) + ls dist/ + # Should show: nipyapi-X.Y.Z-py2.py3-none-any.whl and nipyapi-X.Y.Z.tar.gz + + .. warning:: + + Do NOT use ``make clean-all`` for patch releases — it removes generated API clients + (``nipyapi/nifi/``, ``nipyapi/registry/``) and would require ``make gen-clients`` to restore them. + +5. **Push to GitHub** (triggers CI validation): + + .. code-block:: shell + + git push origin main && git push --tags + +6. **Wait for CI to pass** before publishing: + + .. code-block:: shell + + gh run list --limit 3 + # Or check GitHub Actions UI + +7. **Publish to PyPI** (only after CI is green): + + .. code-block:: shell + + twine upload dist/* + +Full Release (new features, client regeneration) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use this for minor/major releases, especially when generated clients have changed. 1. **Update Release Notes**: @@ -237,39 +298,45 @@ Pre-release Preparation .. code-block:: shell git add docs/history.rst - git commit -S -m "Prepare release: update history and documentation" + git commit -S -m "Prepare release X.Y.Z: brief summary" -Build and Quality Assurance -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +4. **Tag Locally** (do NOT push yet): -.. code-block:: shell + .. code-block:: shell - # Build fresh distributions for release (rebuild-all already validated them) - make clean-all - make dist + git tag -a -s vX.Y.Z -m "Release X.Y.Z" -Create Release -~~~~~~~~~~~~~~ +5. **Build and Verify**: -.. code-block:: shell + .. code-block:: shell - # Tag the release (triggers version detection via setuptools-scm) - git tag -a -s v1.0.0 -m "Release 1.0.0" + # Clean build artifacts and rebuild with the tagged version + make clean && make dist - # Push commit and tags to GitHub (triggers CI validation) - git push origin main - git push --tags + # Verify clean version string (no .devN+gHASH suffix) + ls dist/ + # Should show: nipyapi-X.Y.Z-py2.py3-none-any.whl and nipyapi-X.Y.Z.tar.gz -Publish to PyPI -~~~~~~~~~~~~~~~ +6. **Push to GitHub** (triggers CI validation): -.. code-block:: shell + .. code-block:: shell + + git push origin main && git push --tags + +7. **Wait for CI to pass** before publishing: + + .. code-block:: shell + + gh run list --limit 3 + +8. **Publish to PyPI** (only after CI is green): + + .. code-block:: shell - # Upload to PyPI (requires PyPI API token configured) - twine upload dist/* + twine upload dist/* - # Alternative: Upload to TestPyPI first for validation - # twine upload --repository testpypi dist/* + # Alternative: Upload to TestPyPI first for validation + # twine upload --repository testpypi dist/* Post-release Verification ~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -292,3 +359,4 @@ Version Management Notes - **Development Versions**: Commits after tags get ``.devN+gHASH`` suffix automatically - **Release Versions**: Clean git tags (e.g., ``v1.0.0``) produce clean versions (``1.0.0``) - **Pre-releases**: Use tag patterns like ``v1.0.0rc1`` for release candidates +- **If the build is wrong**: Delete the local tag (``git tag -d vX.Y.Z``), fix the issue, re-tag, and retry. No force push needed since nothing was pushed yet. diff --git a/docs/history.rst b/docs/history.rst index e14f9d4d..93e88110 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -2,6 +2,29 @@ History ======= +1.6.0 (2026-06-21) +------------------- + +| Port status visibility and consistent component recursion for CI functions + +**Bug Fixes** + +- **get_status()**: Processor counts (``running_processors``, ``invalid_processors``, etc.) are now derived from actual processor enumeration rather than the Process Group's aggregate component counts. Previously these counts included all component types (ports, etc.), so invalid output ports were mis-reported as invalid processors. +- **verify_config()**: Controller service verification now recurses into descendant Process Groups (``descendants=True``), consistent with processor and port verification. Ancestor-inherited services remain excluded. Previously controllers in child PGs were not verified, so a broken controller in a sub-flow could pass CI. + +**CI Module** + +- **get_status()**: Now reports input and output port counts: ``total_input_ports``, ``running_input_ports``, ``stopped_input_ports``, ``invalid_input_ports`` and the equivalent ``_output_ports`` fields. Invalid ports (e.g. an output port with no outgoing connection) are a real operational problem that was previously invisible to status checks. These are additive fields; existing output is unchanged. +- **verify_config()**: New ``verify_ports`` parameter (default ``True``) adds input/output port validation. Ports with validation errors now appear in a new ``port_results`` key and cause verification to fail. + +**Canvas Module** + +- **list_invalid_ports()**: New function, parallel to ``list_invalid_processors``, returning input/output ports with validation errors across a Process Group and its descendants. + +**Documentation** + +- **Release process**: Reworked the developer release guide (``docs/devnotes.rst``) into separate Patch and Full release workflows, following a "tag locally, build, verify, then push" principle to avoid force pushes when a built distribution is wrong. + 1.5.1 (2026-06-02) ------------------- diff --git a/nipyapi/canvas.py b/nipyapi/canvas.py index 4d70ab92..4c42d422 100644 --- a/nipyapi/canvas.py +++ b/nipyapi/canvas.py @@ -273,6 +273,38 @@ def list_invalid_processors(pg_id="root", summary=False): return out +def list_invalid_ports(pg_id="root", summary=False): + """ + Returns a flattened list of all Ports with Invalid Statuses + + Args: + pg_id (str): The UUID of the Process Group to start from, defaults to + the Canvas root + summary (bool): True to return just the list of relevant + properties per Port, False for the full listing + + Returns: + list[PortEntity] + """ + assert isinstance(pg_id, str), "pg_id should be a string" + assert isinstance(summary, bool) + all_ports = list_all_input_ports(pg_id) + list_all_output_ports(pg_id) + port_list = [x for x in all_ports if x.component.validation_errors] + if summary: + out = [ + { + "id": x.id, + "name": x.component.name, + "type": x.component.type, + "summary": x.component.validation_errors, + } + for x in port_list + ] + else: + out = port_list + return out + + def list_sensitive_processors(pg_id="root", summary=False): """ Returns a flattened list of all Processors on the canvas which have diff --git a/nipyapi/ci/get_status.py b/nipyapi/ci/get_status.py index 84522bab..59e06740 100644 --- a/nipyapi/ci/get_status.py +++ b/nipyapi/ci/get_status.py @@ -26,6 +26,7 @@ def get_status( # pylint: disable=too-many-locals,too-many-branches,too-many-st dict with status information including: - process_group_id, process_group_name, state, is_root - Processor counts (total, running, stopped, invalid, disabled) + - Port counts (total, running, stopped, invalid for input and output) - Controller counts (total, enabled, disabled) - Version control info (versioned, version_id, version_state, etc.) - Parameter context info @@ -63,27 +64,63 @@ def get_status( # pylint: disable=too-many-locals,too-many-branches,too-many-st "is_root": str(is_root).lower(), } - # Processor counts - running = pg.running_count or 0 - stopped = pg.stopped_count or 0 - invalid = pg.invalid_count or 0 - disabled = pg.disabled_count or 0 + # Processor counts (enumerated for accuracy — pg.*_count includes all component types) + processors = nipyapi.canvas.list_all_processors(process_group_id) - if running > 0: + def _proc_status(p): + return (p.status.run_status or "").upper() if p.status else "" + + proc_running = sum(1 for p in processors if _proc_status(p) == "RUNNING") + proc_stopped = sum(1 for p in processors if _proc_status(p) == "STOPPED") + proc_invalid = sum(1 for p in processors if _proc_status(p) == "INVALID") + proc_disabled = sum(1 for p in processors if _proc_status(p) == "DISABLED") + + # State reflects processor activity only (processor-centric: ports and + # controllers do not influence the PG state field) + if proc_running > 0: state = "RUNNING" - elif stopped > 0: + elif proc_stopped > 0: state = "STOPPED" else: state = "EMPTY" result["state"] = state - result["total_processors"] = str(running + stopped + invalid + disabled) - result["running_processors"] = str(running) - result["stopped_processors"] = str(stopped) - result["invalid_processors"] = str(invalid) - result["disabled_processors"] = str(disabled) - - log.debug("State: %s (%d running, %d stopped)", state, running, stopped) + result["total_processors"] = str(len(processors)) + result["running_processors"] = str(proc_running) + result["stopped_processors"] = str(proc_stopped) + result["invalid_processors"] = str(proc_invalid) + result["disabled_processors"] = str(proc_disabled) + + log.debug("State: %s (%d running, %d stopped)", state, proc_running, proc_stopped) + + # Port counts (enumerated from all descendant PGs) + input_ports = nipyapi.canvas.list_all_input_ports(process_group_id) + output_ports = nipyapi.canvas.list_all_output_ports(process_group_id) + + def _port_run_status(p): + return (p.status.run_status or "").upper() if p.status else "" + + result["total_input_ports"] = str(len(input_ports)) + result["running_input_ports"] = str( + sum(1 for p in input_ports if _port_run_status(p) == "RUNNING") + ) + result["stopped_input_ports"] = str( + sum(1 for p in input_ports if _port_run_status(p) == "STOPPED") + ) + result["invalid_input_ports"] = str( + sum(1 for p in input_ports if _port_run_status(p) == "INVALID") + ) + + result["total_output_ports"] = str(len(output_ports)) + result["running_output_ports"] = str( + sum(1 for p in output_ports if _port_run_status(p) == "RUNNING") + ) + result["stopped_output_ports"] = str( + sum(1 for p in output_ports if _port_run_status(p) == "STOPPED") + ) + result["invalid_output_ports"] = str( + sum(1 for p in output_ports if _port_run_status(p) == "INVALID") + ) # Queue stats, active threads, and throughput if pg.status and pg.status.aggregate_snapshot: diff --git a/nipyapi/ci/verify_config.py b/nipyapi/ci/verify_config.py index ccbdb455..47157260 100644 --- a/nipyapi/ci/verify_config.py +++ b/nipyapi/ci/verify_config.py @@ -80,15 +80,14 @@ def _verify_single_processor(processor) -> Dict[str, Any]: def _verify_controllers(process_group_id: str) -> List[Dict[str, Any]]: - """Verify controller services owned by ``process_group_id``. + """Verify controller services in ``process_group_id`` and descendants. - Scoped strictly to the given Process Group: excludes controller services - inherited from ancestor/parent PGs and does not recurse into descendants. - This keeps CI verification verdicts dependent only on the flow being - deployed. + Includes controllers owned by the target PG and all descendant PGs. + Excludes controllers inherited from ancestor/parent PGs so a broken + controller on a sibling/parent flow cannot fail this flow's CI. """ controllers = nipyapi.canvas.list_all_controllers( - process_group_id, descendants=False, include_ancestors=False + process_group_id, descendants=True, include_ancestors=False ) log.debug("Found %d controller services in PG", len(controllers)) return [_verify_single_controller(c) for c in controllers] @@ -101,10 +100,32 @@ def _verify_processors(process_group_id: str) -> List[Dict[str, Any]]: return [_verify_single_processor(p) for p in processors] +def _verify_ports(process_group_id: str) -> List[Dict[str, Any]]: + """Verify all ports in a process group and its descendants.""" + all_ports = nipyapi.canvas.list_all_input_ports( + process_group_id + ) + nipyapi.canvas.list_all_output_ports(process_group_id) + log.debug("Found %d ports in PG and descendants", len(all_ports)) + results = [] + for port in all_ports: + base_result = { + "id": port.id, + "name": port.component.name, + "type": port.component.type, + } + errors = port.component.validation_errors + if errors: + results.append({**base_result, "success": False, "failures": errors}) + else: + results.append({**base_result, "success": True, "failures": []}) + return results + + def verify_config( process_group_id: Optional[str] = None, verify_controllers: bool = True, verify_processors: bool = True, + verify_ports: bool = True, only_failures: bool = False, ) -> dict: """ @@ -113,28 +134,30 @@ def verify_config( Validates that all required properties are set and property values meet their defined constraints. Does NOT test actual connectivity or credentials. Designed for CI/CD pipelines to catch configuration errors before starting - a flow. Verifies controller services and processors that are in a - stopped/disabled state. + a flow. Verifies controller services, processors, and ports that are in a + stopped/disabled/invalid state. Scope: - - Controller services: only those owned by ``process_group_id`` - (ancestor-inherited services are intentionally excluded so a broken - controller on a sibling/parent flow cannot fail this flow's CI). + - Controller services: ``process_group_id`` and all descendant PGs + (ancestor-inherited services are excluded so a broken controller on a + parent flow cannot fail this flow's CI). - Processors: ``process_group_id`` and all descendant Process Groups. + - Ports: ``process_group_id`` and all descendant Process Groups. Args: process_group_id: ID of the process group. Env: NIFI_PROCESS_GROUP_ID verify_controllers: Verify controller services (default: True) verify_processors: Verify processors (default: True) + verify_ports: Verify input/output ports (default: True) only_failures: Only include failed components in results (default: False). When True, controller_results and processor_results contain only items with success=False. Reduces output size for large process groups. Returns: dict with keys: verified ("true"/"false"), failed_count, - controller_results, processor_results, summary, and process_group_name. - When only_failures=True, also includes controllers_checked and - processors_checked counts. + controller_results, processor_results, port_results, summary, and + process_group_name. When only_failures=True, also includes + controllers_checked, processors_checked, and ports_checked counts. Caller should check verified or failed_count to determine next steps. Raises: @@ -169,7 +192,8 @@ def verify_config( # Verify components controller_results = _verify_controllers(process_group_id) if verify_controllers else [] processor_results = _verify_processors(process_group_id) if verify_processors else [] - all_results = controller_results + processor_results + port_results = _verify_ports(process_group_id) if verify_ports else [] + all_results = controller_results + processor_results + port_results # Count and log failures (results with success=False, excluding skipped) failed_count = sum(1 for r in all_results if r.get("success") is False) @@ -200,6 +224,11 @@ def verify_config( if only_failures else processor_results ), + "port_results": ( + [r for r in port_results if r.get("success") is False] + if only_failures + else port_results + ), "summary": summary, "process_group_name": process_group.component.name, } @@ -208,6 +237,7 @@ def verify_config( if only_failures: result["controllers_checked"] = len(controller_results) result["processors_checked"] = len(processor_results) + result["ports_checked"] = len(port_results) # Add error key for CLI exit code detection when verification fails if failed_count > 0: diff --git a/tests/conftest.py b/tests/conftest.py index f07de49c..72ec301a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -48,6 +48,7 @@ def _flag(name: str, default: bool = False) -> bool: test_another_pg_name = test_basename + "_AnotherProcessGroup" test_registry_client_name = test_basename + "_reg_client" test_processor_name = test_basename + "_proc" +test_port_name = test_basename + "_port" test_bucket_name = test_basename + "_bucket" test_versioned_flow_name = test_basename + "_ver_flow" test_cloned_ver_flow_name = test_basename + '_cloned_ver_flow' @@ -459,6 +460,28 @@ def generate(self, parent_pg=None, suffix='', kind=None, config=None): return Dummy() +@pytest.fixture(name='fix_port') +def fixture_port(request): + class Dummy: + def __init__(self): + pass + + def generate(self, parent_pg=None, suffix='', port_type='OUTPUT_PORT', state='STOPPED'): + if parent_pg is None: + target_pg_id = nipyapi.canvas.get_root_pg_id() + else: + target_pg_id = parent_pg.id + return nipyapi.canvas.create_port( + pg_id=target_pg_id, + port_type=port_type, + name=test_port_name + suffix, + state=state, + ) + + request.addfinalizer(remove_test_ports) + return Dummy() + + @pytest.fixture(name='fix_context') def fixture_context(request): class Dummy: diff --git a/tests/test_canvas.py b/tests/test_canvas.py index 7e8d5452..9cd0100d 100644 --- a/tests/test_canvas.py +++ b/tests/test_canvas.py @@ -586,6 +586,26 @@ def test_list_invalid_processors(): pass +def test_list_invalid_ports(fix_pg, fix_port): + """Test list_invalid_ports returns ports with validation errors (R3).""" + f_pg = fix_pg.generate() + # Port with no connection — NiFi marks it invalid + f_port = fix_port.generate(parent_pg=f_pg) + + # Full entity list + invalid = nipyapi.canvas.list_invalid_ports(pg_id=f_pg.id) + assert len(invalid) == 1 + assert invalid[0].id == f_port.id + assert invalid[0].component.validation_errors + + # Summary mode + summary = nipyapi.canvas.list_invalid_ports(pg_id=f_pg.id, summary=True) + assert len(summary) == 1 + assert summary[0]["id"] == f_port.id + assert summary[0]["type"] == "OUTPUT_PORT" + assert len(summary[0]["summary"]) > 0 + + def test_list_sensitive_processors(): # TODO: write test for new feature pass diff --git a/tests/test_ci.py b/tests/test_ci.py index 36ff78fa..a3753e93 100644 --- a/tests/test_ci.py +++ b/tests/test_ci.py @@ -297,6 +297,13 @@ def test_get_status_root(fix_pg): assert "total_processors" in result assert "running_processors" in result assert "stopped_processors" in result + # Port fields (R2, R7) + assert "total_input_ports" in result + assert "total_output_ports" in result + assert "running_input_ports" in result + assert "invalid_input_ports" in result + assert "running_output_ports" in result + assert "invalid_output_ports" in result def test_get_status_specific_pg(fix_pg): @@ -1734,9 +1741,11 @@ def test_process_group_id_from_env(self): with patch("nipyapi.canvas.get_process_group", return_value=mock_pg): with patch("nipyapi.canvas.list_all_controllers", return_value=[]): with patch("nipyapi.canvas.list_all_processors", return_value=[]): - result = ci.verify_config() - assert result["verified"] == "true" - assert result["process_group_name"] == "TestPG" + with patch("nipyapi.canvas.list_all_input_ports", return_value=[]): + with patch("nipyapi.canvas.list_all_output_ports", return_value=[]): + result = ci.verify_config() + assert result["verified"] == "true" + assert result["process_group_name"] == "TestPG" def test_verify_config_empty_pg(fix_pg): @@ -1840,6 +1849,51 @@ def test_verify_config_ignores_ancestor_controllers(fix_pg, fix_cont): assert result["process_group_name"] == child.component.name +def test_verify_config_includes_descendant_controllers(fix_pg, fix_cont): + """verify_config must recurse into descendant PGs for controllers (R5).""" + parent = fix_pg.generate() + child = fix_pg.generate(parent_pg=parent) + f_c_descendant = fix_cont(parent_pg=child) + + result = ci.verify_config(process_group_id=parent.id) + + # The descendant PG's controller service must appear in the results + result_ids = {r["id"] for r in result["controller_results"]} + assert f_c_descendant.id in result_ids + + +def test_verify_config_with_invalid_port(fix_pg, fix_port): + """Test verify_config detects invalid ports (R4).""" + f_pg = fix_pg.generate() + # Port with no connection — NiFi marks it invalid + f_port = fix_port.generate(parent_pg=f_pg) + + result = ci.verify_config(process_group_id=f_pg.id) + + assert result["verified"] == "false" + assert result["failed_count"] > 0 + assert "port_results" in result + failed_ports = [r for r in result["port_results"] if r.get("success") is False] + assert len(failed_ports) == 1 + assert failed_ports[0]["id"] == f_port.id + assert len(failed_ports[0]["failures"]) > 0 + + +def test_get_status_port_counts(fix_pg, fix_port): + """Test get_status reports accurate port counts (R1, R2).""" + f_pg = fix_pg.generate() + # Stopped output port (invalid due to no connection) + fix_port.generate(parent_pg=f_pg) + + result = ci.get_status(process_group_id=f_pg.id) + + assert result["total_output_ports"] == "1" + assert result["invalid_output_ports"] == "1" + assert result["total_input_ports"] == "0" + # Processors should be accurate (not conflated with ports) + assert result["invalid_processors"] == "0" + + # ============================================================================= # Export Parameters Tests # =============================================================================