Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions server/secops/secops_mcp/tools/security_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,121 @@ async def create_rule(
return f"Error creating rule: {str(e)}"


@server.tool()
async def update_rule(
rule_id: str,
rule_text: str,
project_id: Optional[str] = None,
customer_id: Optional[str] = None,
region: Optional[str] = None,
) -> str:
"""Update an existing detection rule in Chronicle SIEM.

Replaces the text of an existing YARA-L 2.0 detection rule in place. Chronicle
preserves the full version history of the rule, so the previous version remains
accessible for audit purposes. The rule's deployment state (enabled/disabled,
alerting on/off) is not affected by this operation.

Use this tool instead of `create_rule` when iterating on an existing rule, to
avoid accumulating orphaned rule versions that must be manually cleaned up.

**Workflow Integration:**
- Preferred over `create_rule` for rule edits — same rule ID, same deployment state.
- Use after `test_rule` confirms the revised rule text behaves as expected.
- Follows the same development lifecycle as `create_rule`: edit → test → update → monitor.

**Use Cases:**
- Fix a false-positive condition in a deployed rule without disabling it first.
- Tighten detection logic based on alert review findings.
- Add new event types or conditions to an existing rule.
- Update rule metadata (description, severity, MITRE mappings) without changing detection logic.
- Apply YARA-L syntax corrections identified during rule validation.

**Rule Update Best Practices:**
- Validate the updated rule text with `validate_rule` or `test_rule` before applying.
- Retrieve the current rule text with `get_detection_rule` to use as a baseline.
- Supply the complete rule text — the API replaces the entire rule body, not a diff.
- Update the `last_modified` date in the rule's metadata section to reflect the change.

Args:
rule_id (str): Unique ID of the rule to update, in the format "ru_<UUID>".
Obtain this from `list_security_rules` or `get_detection_rule`.
rule_text (str): Complete, updated YARA-L 2.0 rule definition. This replaces
the existing rule text in full.
project_id (str): Google Cloud project ID (required).
customer_id (str): Chronicle customer ID (required).
region (str): Chronicle region (e.g., "us", "europe") (required).

Returns:
str: Success message with the rule ID, rule name, and new version information.
Returns an error message if the update fails.

Example Usage:
update_rule(
rule_id="ru_12345678-1234-1234-1234-123456789012",
rule_text='''
rule suspicious_powershell_download {
meta:
description = "Detects PowerShell downloading files"
author = "Security Team"
severity = "High"
yara_version = "YL2.0"
rule_version = "1.1"
mitre_attack_tactic = "TA0011"
mitre_attack_technique = "T1059.001"
events:
$process.metadata.event_type = "PROCESS_LAUNCH"
$process.principal.process.command_line = /powershell.*downloadfile/i
$process.principal.hostname != ""
condition:
$process
}
''',
project_id="my-project",
customer_id="my-customer",
region="us"
)

Next Steps (using MCP-enabled tools):
- Verify the update with `get_detection_rule` to confirm the new rule text is live.
- Run `test_rule` with the same rule text to validate detection behavior post-update.
- Monitor alerts with `get_security_alerts` to confirm the rule is performing as expected.
"""
try:
logger.info(f"Updating detection rule {rule_id}")

chronicle = get_chronicle_client(project_id, customer_id, region)

rule = chronicle.update_rule(rule_id, rule_text)

# Chronicle returns the versioned resource name: .../rules/ru_<UUID>@v_<sec>_<ns>
rule_id_out = rule.get("name", "").split("/")[-1]

result = "Successfully updated detection rule.\n"
result += f"Rule ID: {rule_id}\n"
if "@" in rule_id_out:
result += f"New version: {rule_id_out}\n"

for line in rule_text.strip().split("\n"):
if line.strip().startswith("rule "):
rule_name = (
line.strip().replace("rule ", "").replace(" {", "").strip()
)
result += f"Rule Name: {rule_name}\n"
break

result += (
"Rule updated successfully. Deployment state (enabled/alerting) is unchanged. "
"Use test_rule to validate the updated rule before relying on new alerts."
)

return result

except Exception as e:
logger.error(f"Error updating rule {rule_id}: {str(e)}", exc_info=True)
return f"Error updating rule: {str(e)}"


@server.tool()
async def test_rule(
rule_text: str,
Expand Down
263 changes: 263 additions & 0 deletions server/secops/tests/test_secops_rules_unit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for security rule management tools."""

import sys
import os
import pytest
from unittest.mock import MagicMock, patch

# Ensure server/secops is in path to import secops_mcp
current_dir = os.path.dirname(os.path.abspath(__file__))
server_secops_dir = os.path.dirname(current_dir)
if server_secops_dir not in sys.path:
sys.path.append(server_secops_dir)

# Mock secops if not installed (for unit testing without dependencies)
try:
import secops
except ImportError:
mock_secops = MagicMock()
sys.modules["secops"] = mock_secops
sys.modules["secops.chronicle"] = MagicMock()
sys.modules["secops.exceptions"] = MagicMock()

# Mock mcp if not installed
try:
import mcp
except ImportError:
mock_mcp = MagicMock()
sys.modules["mcp"] = mock_mcp
sys.modules["mcp.server"] = MagicMock()
sys.modules["mcp.server.fastmcp"] = MagicMock()

def tool_decorator(*args, **kwargs):
def wrapper(func):
return func
return wrapper

mock_fastmcp_instance = MagicMock()
mock_fastmcp_instance.tool.side_effect = tool_decorator
sys.modules["mcp.server.fastmcp"].FastMCP.return_value = mock_fastmcp_instance

from secops_mcp.tools.security_rules import create_rule, update_rule


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

SAMPLE_RULE_TEXT = """rule suspicious_powershell_download {
meta:
description = "Detects PowerShell downloading files"
author = "Security Team"
severity = "Medium"
events:
$process.metadata.event_type = "PROCESS_LAUNCH"
$process.principal.process.command_line = /powershell.*downloadfile/i
$process.principal.hostname != ""
condition:
$process
}"""

SAMPLE_RULE_ID = "ru_12345678-1234-1234-1234-123456789012"

VERSIONED_RULE_NAME = (
"projects/my-project/locations/us/instances/my-customer"
f"/rules/{SAMPLE_RULE_ID}@v_1234567890_123456789"
)


@pytest.fixture
def mock_chronicle_client():
client = MagicMock()
client.update_rule.return_value = {"name": VERSIONED_RULE_NAME}
client.create_rule.return_value = {
"name": f"projects/my-project/locations/us/instances/my-customer/rules/{SAMPLE_RULE_ID}"
}
return client


@pytest.fixture
def mock_get_client(mock_chronicle_client):
with patch(
"secops_mcp.tools.security_rules.get_chronicle_client",
return_value=mock_chronicle_client,
):
yield mock_chronicle_client


# ---------------------------------------------------------------------------
# update_rule tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_update_rule_success(mock_get_client):
"""update_rule returns a success message containing the rule ID and name."""
result = await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

assert isinstance(result, str)
assert SAMPLE_RULE_ID in result
assert "suspicious_powershell_download" in result
assert "successfully" in result.lower()


@pytest.mark.asyncio
async def test_update_rule_calls_chronicle_client(mock_get_client):
"""update_rule passes the correct rule_id and rule_text to the Chronicle client."""
await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

mock_get_client.update_rule.assert_called_once_with(SAMPLE_RULE_ID, SAMPLE_RULE_TEXT)


@pytest.mark.asyncio
async def test_update_rule_passes_connection_params():
"""update_rule forwards project_id, customer_id, and region to get_chronicle_client."""
mock_client = MagicMock()
mock_client.update_rule.return_value = {"name": VERSIONED_RULE_NAME}

with patch(
"secops_mcp.tools.security_rules.get_chronicle_client",
return_value=mock_client,
) as mock_get_client_fn:
await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="proj-123",
customer_id="cust-456",
region="europe",
)

mock_get_client_fn.assert_called_once_with("proj-123", "cust-456", "europe")


@pytest.mark.asyncio
async def test_update_rule_includes_version_info(mock_get_client):
"""update_rule surfaces the new version string when the API returns it."""
result = await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

# The versioned ID (@v_...) should appear in the output.
assert f"{SAMPLE_RULE_ID}@v_" in result


@pytest.mark.asyncio
async def test_update_rule_no_version_in_response(mock_get_client):
"""update_rule handles a response without a version suffix gracefully."""
mock_get_client.update_rule.return_value = {
"name": f"projects/my-project/locations/us/instances/my-customer/rules/{SAMPLE_RULE_ID}"
}

result = await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

assert isinstance(result, str)
assert "successfully" in result.lower()
# No version line emitted — the output should not contain "@v_".
assert "@v_" not in result


@pytest.mark.asyncio
async def test_update_rule_extracts_rule_name(mock_get_client):
"""update_rule correctly parses the rule name from the rule text header."""
rule_text = "rule detect_lateral_movement {\n condition: true\n}"

result = await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=rule_text,
project_id="my-project",
customer_id="my-customer",
region="us",
)

assert "detect_lateral_movement" in result


@pytest.mark.asyncio
async def test_update_rule_api_error(mock_get_client):
"""update_rule returns an error message when the Chronicle API raises an exception."""
mock_get_client.update_rule.side_effect = Exception("API request failed: 404 Not Found")

result = await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

assert isinstance(result, str)
assert "Error" in result
assert "API request failed" in result


@pytest.mark.asyncio
async def test_update_rule_empty_name_response(mock_get_client):
"""update_rule handles an empty name field in the API response without crashing."""
mock_get_client.update_rule.return_value = {}

result = await update_rule(
rule_id=SAMPLE_RULE_ID,
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

assert isinstance(result, str)
assert SAMPLE_RULE_ID in result


# ---------------------------------------------------------------------------
# create_rule smoke test (establishes baseline parity with update_rule)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_create_rule_success(mock_get_client):
"""create_rule returns a success message with the rule ID and name."""
result = await create_rule(
rule_text=SAMPLE_RULE_TEXT,
project_id="my-project",
customer_id="my-customer",
region="us",
)

assert isinstance(result, str)
assert SAMPLE_RULE_ID in result
assert "suspicious_powershell_download" in result
assert "successfully" in result.lower()