diff --git a/server/secops/secops_mcp/tools/security_rules.py b/server/secops/secops_mcp/tools/security_rules.py index c3ffe23..31d2f1c 100644 --- a/server/secops/secops_mcp/tools/security_rules.py +++ b/server/secops/secops_mcp/tools/security_rules.py @@ -514,6 +514,121 @@ async def create_rule( return f"Error creating rule: {str(e)}" +@server.tool() +async def update_rule( + rule_id: str, + rule_text: str, + project_id: Optional[str] = None, + customer_id: Optional[str] = None, + region: Optional[str] = None, +) -> str: + """Update an existing detection rule in Chronicle SIEM. + + Replaces the text of an existing YARA-L 2.0 detection rule in place. Chronicle + preserves the full version history of the rule, so the previous version remains + accessible for audit purposes. The rule's deployment state (enabled/disabled, + alerting on/off) is not affected by this operation. + + Use this tool instead of `create_rule` when iterating on an existing rule, to + avoid accumulating orphaned rule versions that must be manually cleaned up. + + **Workflow Integration:** + - Preferred over `create_rule` for rule edits — same rule ID, same deployment state. + - Use after `test_rule` confirms the revised rule text behaves as expected. + - Follows the same development lifecycle as `create_rule`: edit → test → update → monitor. + + **Use Cases:** + - Fix a false-positive condition in a deployed rule without disabling it first. + - Tighten detection logic based on alert review findings. + - Add new event types or conditions to an existing rule. + - Update rule metadata (description, severity, MITRE mappings) without changing detection logic. + - Apply YARA-L syntax corrections identified during rule validation. + + **Rule Update Best Practices:** + - Validate the updated rule text with `validate_rule` or `test_rule` before applying. + - Retrieve the current rule text with `get_detection_rule` to use as a baseline. + - Supply the complete rule text — the API replaces the entire rule body, not a diff. + - Update the `last_modified` date in the rule's metadata section to reflect the change. + + Args: + rule_id (str): Unique ID of the rule to update, in the format "ru_". + Obtain this from `list_security_rules` or `get_detection_rule`. + rule_text (str): Complete, updated YARA-L 2.0 rule definition. This replaces + the existing rule text in full. + project_id (str): Google Cloud project ID (required). + customer_id (str): Chronicle customer ID (required). + region (str): Chronicle region (e.g., "us", "europe") (required). + + Returns: + str: Success message with the rule ID, rule name, and new version information. + Returns an error message if the update fails. + + Example Usage: + update_rule( + rule_id="ru_12345678-1234-1234-1234-123456789012", + rule_text=''' + rule suspicious_powershell_download { + meta: + description = "Detects PowerShell downloading files" + author = "Security Team" + severity = "High" + yara_version = "YL2.0" + rule_version = "1.1" + mitre_attack_tactic = "TA0011" + mitre_attack_technique = "T1059.001" + events: + $process.metadata.event_type = "PROCESS_LAUNCH" + $process.principal.process.command_line = /powershell.*downloadfile/i + $process.principal.hostname != "" + condition: + $process + } + ''', + project_id="my-project", + customer_id="my-customer", + region="us" + ) + + Next Steps (using MCP-enabled tools): + - Verify the update with `get_detection_rule` to confirm the new rule text is live. + - Run `test_rule` with the same rule text to validate detection behavior post-update. + - Monitor alerts with `get_security_alerts` to confirm the rule is performing as expected. + """ + try: + logger.info(f"Updating detection rule {rule_id}") + + chronicle = get_chronicle_client(project_id, customer_id, region) + + rule = chronicle.update_rule(rule_id, rule_text) + + # Chronicle returns the versioned resource name: .../rules/ru_@v__ + rule_id_out = rule.get("name", "").split("/")[-1] + + result = "Successfully updated detection rule.\n" + result += f"Rule ID: {rule_id}\n" + if "@" in rule_id_out: + result += f"New version: {rule_id_out}\n" + + for line in rule_text.strip().split("\n"): + if line.strip().startswith("rule "): + rule_name = ( + line.strip().replace("rule ", "").replace(" {", "").strip() + ) + result += f"Rule Name: {rule_name}\n" + break + + result += ( + "Rule updated successfully. Deployment state (enabled/alerting) is unchanged. " + "Use test_rule to validate the updated rule before relying on new alerts." + ) + + return result + + except Exception as e: + logger.error(f"Error updating rule {rule_id}: {str(e)}", exc_info=True) + return f"Error updating rule: {str(e)}" + + @server.tool() async def test_rule( rule_text: str, diff --git a/server/secops/tests/test_secops_rules_unit.py b/server/secops/tests/test_secops_rules_unit.py new file mode 100644 index 0000000..25eb9af --- /dev/null +++ b/server/secops/tests/test_secops_rules_unit.py @@ -0,0 +1,263 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for security rule management tools.""" + +import sys +import os +import pytest +from unittest.mock import MagicMock, patch + +# Ensure server/secops is in path to import secops_mcp +current_dir = os.path.dirname(os.path.abspath(__file__)) +server_secops_dir = os.path.dirname(current_dir) +if server_secops_dir not in sys.path: + sys.path.append(server_secops_dir) + +# Mock secops if not installed (for unit testing without dependencies) +try: + import secops +except ImportError: + mock_secops = MagicMock() + sys.modules["secops"] = mock_secops + sys.modules["secops.chronicle"] = MagicMock() + sys.modules["secops.exceptions"] = MagicMock() + +# Mock mcp if not installed +try: + import mcp +except ImportError: + mock_mcp = MagicMock() + sys.modules["mcp"] = mock_mcp + sys.modules["mcp.server"] = MagicMock() + sys.modules["mcp.server.fastmcp"] = MagicMock() + + def tool_decorator(*args, **kwargs): + def wrapper(func): + return func + return wrapper + + mock_fastmcp_instance = MagicMock() + mock_fastmcp_instance.tool.side_effect = tool_decorator + sys.modules["mcp.server.fastmcp"].FastMCP.return_value = mock_fastmcp_instance + +from secops_mcp.tools.security_rules import create_rule, update_rule + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +SAMPLE_RULE_TEXT = """rule suspicious_powershell_download { + meta: + description = "Detects PowerShell downloading files" + author = "Security Team" + severity = "Medium" + events: + $process.metadata.event_type = "PROCESS_LAUNCH" + $process.principal.process.command_line = /powershell.*downloadfile/i + $process.principal.hostname != "" + condition: + $process +}""" + +SAMPLE_RULE_ID = "ru_12345678-1234-1234-1234-123456789012" + +VERSIONED_RULE_NAME = ( + "projects/my-project/locations/us/instances/my-customer" + f"/rules/{SAMPLE_RULE_ID}@v_1234567890_123456789" +) + + +@pytest.fixture +def mock_chronicle_client(): + client = MagicMock() + client.update_rule.return_value = {"name": VERSIONED_RULE_NAME} + client.create_rule.return_value = { + "name": f"projects/my-project/locations/us/instances/my-customer/rules/{SAMPLE_RULE_ID}" + } + return client + + +@pytest.fixture +def mock_get_client(mock_chronicle_client): + with patch( + "secops_mcp.tools.security_rules.get_chronicle_client", + return_value=mock_chronicle_client, + ): + yield mock_chronicle_client + + +# --------------------------------------------------------------------------- +# update_rule tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_update_rule_success(mock_get_client): + """update_rule returns a success message containing the rule ID and name.""" + result = await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + assert isinstance(result, str) + assert SAMPLE_RULE_ID in result + assert "suspicious_powershell_download" in result + assert "successfully" in result.lower() + + +@pytest.mark.asyncio +async def test_update_rule_calls_chronicle_client(mock_get_client): + """update_rule passes the correct rule_id and rule_text to the Chronicle client.""" + await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + mock_get_client.update_rule.assert_called_once_with(SAMPLE_RULE_ID, SAMPLE_RULE_TEXT) + + +@pytest.mark.asyncio +async def test_update_rule_passes_connection_params(): + """update_rule forwards project_id, customer_id, and region to get_chronicle_client.""" + mock_client = MagicMock() + mock_client.update_rule.return_value = {"name": VERSIONED_RULE_NAME} + + with patch( + "secops_mcp.tools.security_rules.get_chronicle_client", + return_value=mock_client, + ) as mock_get_client_fn: + await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="proj-123", + customer_id="cust-456", + region="europe", + ) + + mock_get_client_fn.assert_called_once_with("proj-123", "cust-456", "europe") + + +@pytest.mark.asyncio +async def test_update_rule_includes_version_info(mock_get_client): + """update_rule surfaces the new version string when the API returns it.""" + result = await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + # The versioned ID (@v_...) should appear in the output. + assert f"{SAMPLE_RULE_ID}@v_" in result + + +@pytest.mark.asyncio +async def test_update_rule_no_version_in_response(mock_get_client): + """update_rule handles a response without a version suffix gracefully.""" + mock_get_client.update_rule.return_value = { + "name": f"projects/my-project/locations/us/instances/my-customer/rules/{SAMPLE_RULE_ID}" + } + + result = await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + assert isinstance(result, str) + assert "successfully" in result.lower() + # No version line emitted — the output should not contain "@v_". + assert "@v_" not in result + + +@pytest.mark.asyncio +async def test_update_rule_extracts_rule_name(mock_get_client): + """update_rule correctly parses the rule name from the rule text header.""" + rule_text = "rule detect_lateral_movement {\n condition: true\n}" + + result = await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=rule_text, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + assert "detect_lateral_movement" in result + + +@pytest.mark.asyncio +async def test_update_rule_api_error(mock_get_client): + """update_rule returns an error message when the Chronicle API raises an exception.""" + mock_get_client.update_rule.side_effect = Exception("API request failed: 404 Not Found") + + result = await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + assert isinstance(result, str) + assert "Error" in result + assert "API request failed" in result + + +@pytest.mark.asyncio +async def test_update_rule_empty_name_response(mock_get_client): + """update_rule handles an empty name field in the API response without crashing.""" + mock_get_client.update_rule.return_value = {} + + result = await update_rule( + rule_id=SAMPLE_RULE_ID, + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + assert isinstance(result, str) + assert SAMPLE_RULE_ID in result + + +# --------------------------------------------------------------------------- +# create_rule smoke test (establishes baseline parity with update_rule) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_create_rule_success(mock_get_client): + """create_rule returns a success message with the rule ID and name.""" + result = await create_rule( + rule_text=SAMPLE_RULE_TEXT, + project_id="my-project", + customer_id="my-customer", + region="us", + ) + + assert isinstance(result, str) + assert SAMPLE_RULE_ID in result + assert "suspicious_powershell_download" in result + assert "successfully" in result.lower()