diff --git a/MANIFEST.in b/MANIFEST.in index fc35676..5624db9 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,7 +1,7 @@ include LICENSE include README.md include pyproject.toml -recursive-include src/iac_code *.yml *.yaml *.json *.md *.mo *.po +recursive-include src/iac_code *.yml *.yaml *.json *.md *.rego *.mo *.po prune tests prune htmlcov diff --git a/pyproject.toml b/pyproject.toml index 1173f8b..9ec4e8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,6 +91,7 @@ version = {attr = "iac_code.__version__"} "**/*.yaml", "**/*.json", "**/*.md", + "**/*.rego", "**/*.mo", "**/*.po", ] diff --git a/src/iac_code/skills/auto_trigger.py b/src/iac_code/skills/auto_trigger.py index ecbe981..feda2cb 100644 --- a/src/iac_code/skills/auto_trigger.py +++ b/src/iac_code/skills/auto_trigger.py @@ -45,13 +45,17 @@ def find_auto_triggered_skills( return [] matches: list[PromptCommand] = [] + suppressed_skill_names: set[str] = set() context_messages = context_messages or [] for command in skills: skill = command.skill - if skill is None or command.name in loaded_skill_names: + if skill is None: continue - if context_has_skill_tag(context_messages, command.name): + is_loaded = command.name in loaded_skill_names + if not is_loaded and context_has_skill_tag(context_messages, command.name): loaded_skill_names.add(command.name) + is_loaded = True + if is_loaded and not skill.auto_trigger.get("supersedes"): continue script = skill.auto_trigger.get("script") if not script or command.source != SkillSource.BUNDLED or skill.source != SkillSource.BUNDLED: @@ -63,10 +67,15 @@ def find_auto_triggered_skills( if not callable(should_trigger): continue try: - if should_trigger(prompt): + if not should_trigger(prompt): + continue + suppressed_skill_names.update(_split_supersedes(skill.auto_trigger.get("supersedes", ""))) + if not is_loaded: matches.append(command) except Exception as exc: logger.warning("Skill auto-trigger failed for {}: {}", command.name, exc) + if suppressed_skill_names: + return [command for command in matches if command.name not in suppressed_skill_names] return matches @@ -113,3 +122,7 @@ def _load_trigger_module(skill_root: str, script: str, skill_name: str) -> Modul logger.warning("Failed to load skill auto-trigger script for {}: {}", skill_name, exc) return None return module + + +def _split_supersedes(value: str) -> set[str]: + return {name for name in value.replace(",", " ").split() if name} diff --git a/src/iac_code/skills/bundled/__init__.py b/src/iac_code/skills/bundled/__init__.py index d17678a..d5814a2 100644 --- a/src/iac_code/skills/bundled/__init__.py +++ b/src/iac_code/skills/bundled/__init__.py @@ -85,6 +85,10 @@ def init_bundled_skills() -> None: register_iac_aliyun_skill() + from iac_code.skills.bundled.pac_aliyun import register_pac_aliyun_skill + + register_pac_aliyun_skill() + class _FunctionPromptProvider: """Prompt provider that delegates to an async function.""" diff --git a/src/iac_code/skills/bundled/iac_aliyun/SKILL.md b/src/iac_code/skills/bundled/iac_aliyun/SKILL.md index 0d217dd..1a71e97 100644 --- a/src/iac_code/skills/bundled/iac_aliyun/SKILL.md +++ b/src/iac_code/skills/bundled/iac_aliyun/SKILL.md @@ -52,6 +52,10 @@ auto_trigger: ### 询价 - 查询部署的预估价格 +### Policy as Code / InfraGuard +- 用户要求“生成合规策略”“写 InfraGuard 规则”“用 Rego 检查模板”“策略校验”“策略库查询”等 PAC 工作时,改用 `pac-aliyun` skill。 +- 不在 `iac-aliyun` 内维护 InfraGuard 策略副本;`iac-aliyun` 只在 PAC 结果要求修改模板时负责 ROS/Terraform 模板改动。 + ## 参数化规则 生成模板时,以下属性**必须**定义为 Parameters(部署前通过 API 查询确定实际值): diff --git a/src/iac_code/skills/bundled/iac_aliyun/auto_trigger.py b/src/iac_code/skills/bundled/iac_aliyun/auto_trigger.py index a5cffba..6b96911 100644 --- a/src/iac_code/skills/bundled/iac_aliyun/auto_trigger.py +++ b/src/iac_code/skills/bundled/iac_aliyun/auto_trigger.py @@ -19,6 +19,16 @@ r"alicloud\s+provider", r'provider\s+"alicloud"', r'resource\s+"alicloud_', + r"\becs\b", + r"\brds\b", + r"\boss\b", + r"\bvpc\b", + r"\bslb\b", + r"\balb\b", + r"\bnlb\b", + r"安全组", + r"负载均衡", + r"云资源", ] _ES_TEMPLATE_ACTIONS = r"genera|generar|crea|crear|despliega|desplegar|explica|explicar|valida|validar|mejora|mejorar" diff --git a/src/iac_code/skills/bundled/pac_aliyun/SKILL.md b/src/iac_code/skills/bundled/pac_aliyun/SKILL.md new file mode 100644 index 0000000..68564c4 --- /dev/null +++ b/src/iac_code/skills/bundled/pac_aliyun/SKILL.md @@ -0,0 +1,65 @@ +--- +name: pac-aliyun +description: 阿里云 Alibaba Cloud Policy as Code / InfraGuard 合规策略生成、校验与策略库查询 +when_to_use: 当用户请求阿里云/Alibaba Cloud/Alicloud 的 Policy as Code、PAC、InfraGuard、Rego 合规策略生成、策略查询、策略更新或模板合规校验时,必须先调用 skill 工具加载 pac-aliyun。 +user_invocable: false +auto_trigger: + script: auto_trigger.py + supersedes: iac-aliyun +--- + +# 阿里云 PAC 技能 + +面向阿里云 ROS 模板的 Policy as Code 能力,使用 InfraGuard 进行策略查询、策略更新、模板扫描、合规策略生成与自定义 Rego 校验。 + +## PAC 边界 + +- 该技能拥有 InfraGuard、Rego、策略库、策略包、策略生成和模板合规扫描相关流程。 +- `iac-aliyun` 只负责 ROS/Terraform 模板生成、解释、参数推荐、询价、部署和资源栈操作。 +- 不在 iac-code 内维护 InfraGuard 官方策略副本;策略内容以 InfraGuard 官方工具和其策略更新机制为准。 + +## InfraGuard 懒加载 + +执行任何 PAC 后续能力前,先按 [references/infraguard-policy-generation.md](references/infraguard-policy-generation.md) 的 Lazy InfraGuard Sync 流程检查 InfraGuard 是否可用,并检查策略更新。 + +核心命令: +```bash +infraguard version +go install github.com/aliyun/infraguard/cmd/infraguard@latest +infraguard policy update +infraguard policy list +``` + +若用户只是咨询概念,可先简短回答;一旦需要生成、查询、校验或扫描策略,必须先完成懒加载检查。 + +## 常见流程 + +### 查询已有策略 + +1. 完成 Lazy InfraGuard Sync。 +2. 使用 `infraguard policy list` 查看官方可用策略。 +3. 必要时使用 `infraguard policy get ` 查看规则详情。 +4. 将策略 ID 以 `rule:aliyun:` 或 `pack:aliyun:` 的形式用于扫描。 + +### 扫描 ROS 模板 + +1. 完成 Lazy InfraGuard Sync。 +2. 确认模板文件是 ROS YAML/JSON。 +3. 使用 `infraguard scan -p ` 扫描;需要机器可读结果时加 `--format json`。 +4. 对违规结果给出资源名、属性路径、风险原因和修复建议。 + +### 生成或调整自定义策略 + +1. 完成 Lazy InfraGuard Sync。 +2. 优先查询官方策略是否已经覆盖需求。 +3. 只有官方策略无法满足时才生成自定义 Rego,并保持规则聚焦在 ROS 模板可静态证明的信息上。 +4. 写入用户指定文件或临时工作文件后,使用 `infraguard policy validate ` 校验。 +5. 若用户提供模板样例,使用该策略扫描样例模板,确认命中和不命中场景。 + +## 策略设计原则 + +- 只检查 ROS 模板中可静态读取的资源、属性、引用关系和条件。 +- 不把运行时指标、账单历史、审批记录、人工例外或账号侧状态写进 Rego。 +- 优先复用官方策略、官方策略包和 InfraGuard 的策略更新结果。 +- 自定义策略要有稳定 ID、清晰元数据、可定位的违规路径和可执行修复建议。 +- 生成策略时同步给出最小违规模板和最小合规模板,便于用户验证。 diff --git a/src/iac_code/skills/bundled/pac_aliyun/__init__.py b/src/iac_code/skills/bundled/pac_aliyun/__init__.py new file mode 100644 index 0000000..12af2ee --- /dev/null +++ b/src/iac_code/skills/bundled/pac_aliyun/__init__.py @@ -0,0 +1,20 @@ +from pathlib import Path + +from iac_code.skills.bundled import register_bundled_skill + +SKILL_DIR = Path(__file__).parent + + +def register_pac_aliyun_skill() -> None: + register_bundled_skill( + name="pac-aliyun", + description="阿里云 Alibaba Cloud Policy as Code / InfraGuard 合规策略生成、校验与策略库查询", + prompt=(SKILL_DIR / "SKILL.md").read_text(encoding="utf-8"), + when_to_use=( + "当用户请求阿里云/Alibaba Cloud/Alicloud 的 Policy as Code、PAC、InfraGuard、Rego " + "合规策略生成、策略查询、策略更新或模板合规校验时,必须先调用 skill 工具加载 pac-aliyun。" + ), + user_invocable=False, + skill_root=str(SKILL_DIR), + auto_trigger={"script": "auto_trigger.py", "supersedes": "iac-aliyun"}, + ) diff --git a/src/iac_code/skills/bundled/pac_aliyun/auto_trigger.py b/src/iac_code/skills/bundled/pac_aliyun/auto_trigger.py new file mode 100644 index 0000000..cfe2fdc --- /dev/null +++ b/src/iac_code/skills/bundled/pac_aliyun/auto_trigger.py @@ -0,0 +1,87 @@ +"""Auto-trigger rules for the bundled pac-aliyun skill.""" + +from __future__ import annotations + +import re + +ENABLE_AUTO_TRIGGER = True + +_ALIYUN_SCOPE_PATTERNS = [ + r"阿里云", + r"\baliyun\b", + r"\balicloud\b", + r"\balibaba\s+cloud\b", + r"\bros\b", + r"rostemplateformatversion", + r"aliyun::", + r"\becs\b", + r"\brds\b", + r"\boss\b", + r"\bvpc\b", + r"\bslb\b", + r"\balb\b", + r"\bnlb\b", + r"安全组", + r"云资源", +] + +_PAC_SCOPE_PATTERNS = [ + r"\binfraguard\b", + r"\brego\b", + r"\bpolicy\s+as\s+code\b", + r"\bpac\b", + r"\bpack:aliyun:", + r"\brule:aliyun:", + r"合规策略", + r"策略库", + r"策略包", +] + +_POLICY_WORKFLOW_PATTERNS = [ + r"\binfraguard\b", + r"\brego\b", + r"\bpolicy\s+as\s+code\b", + r"\bpac\b", + r"\bpack:aliyun:", + r"\brule:aliyun:", + r"\binfraguard\s+(scan|policy)\b", + r"\bpolicy\s+(list|get|update|validate)\b", + r"\b(scan|validate|check)\b.*\bcompliance\s+polic(y|ies)\b", + r"\b(generat|writ|creat)e?\b.*\bcompliance\s+polic(y|ies)\b", + r"\bcompliance\s+polic(y|ies)\b.*\b(generat|writ|creat|validat|check)e?\b", + r"合规策略", + r"策略生成", + r"生成.*策略", + r"编写.*策略", + r"写.*策略", + r"校验.*策略", + r"验证.*策略", + r"检查.*策略", + r"策略.*校验", + r"策略.*验证", + r"策略.*检查", + r"高可用.*策略", + r"成本优化.*策略", + r"合规性.*策略", + r"最佳实践.*策略", + r"可运维.*策略", + r"网络架构.*策略", + r"弹性.*策略", +] + + +def should_trigger(prompt: str) -> bool: + text = prompt.casefold() + return has_policy_workflow(text) and (has_pac_scope(text) or has_aliyun_scope(text)) + + +def has_aliyun_scope(text: str) -> bool: + return any(re.search(pattern, text, re.IGNORECASE) for pattern in _ALIYUN_SCOPE_PATTERNS) + + +def has_pac_scope(text: str) -> bool: + return any(re.search(pattern, text, re.IGNORECASE) for pattern in _PAC_SCOPE_PATTERNS) + + +def has_policy_workflow(text: str) -> bool: + return any(re.search(pattern, text, re.IGNORECASE) for pattern in _POLICY_WORKFLOW_PATTERNS) diff --git a/src/iac_code/skills/bundled/pac_aliyun/references/infraguard-policy-generation.md b/src/iac_code/skills/bundled/pac_aliyun/references/infraguard-policy-generation.md new file mode 100644 index 0000000..64c2ec2 --- /dev/null +++ b/src/iac_code/skills/bundled/pac_aliyun/references/infraguard-policy-generation.md @@ -0,0 +1,75 @@ +# InfraGuard Policy Generation + +This reference keeps PAC work aligned with InfraGuard without vendoring the InfraGuard policy catalog into iac-code. + +## Lazy InfraGuard Sync + +Run this sync before any PAC implementation, generation, validation, or catalog lookup. It is intentionally lazy: do it only when the PAC skill is triggered and the user needs InfraGuard-backed work. + +1. Check whether InfraGuard is available: + ```bash + infraguard version + ``` +2. If the command is missing and the user wants the agent to prepare the local toolchain, install the official CLI: + ```bash + go install github.com/aliyun/infraguard/cmd/infraguard@latest + ``` +3. Check for policy updates before relying on policy names or behavior: + ```bash + infraguard policy update + ``` +4. Inspect the current policy catalog from the refreshed tool: + ```bash + infraguard policy list + ``` +5. When generating or editing custom policies, validate the file: + ```bash + infraguard policy validate path/to/policy.rego + ``` + +If a command cannot run because InfraGuard or Go is not installed, explain the missing prerequisite and continue only with user-approved installation or with static guidance. + +## Policy Lookup + +- Prefer official policy IDs and packs from `infraguard policy list`. +- Use `infraguard policy get ` when the user needs details for an existing rule. +- Use policy references in scan commands as `rule:aliyun:` or `pack:aliyun:`. +- Do not infer that a previously known policy still exists; refresh first with `infraguard policy update`. + +## Template Scanning + +Use InfraGuard scan for ROS templates: + +```bash +infraguard scan template.yaml -p pack:aliyun:quick-start-compliance-pack +``` + +For automation or downstream analysis, request JSON output: + +```bash +infraguard scan template.yaml -p rule:aliyun:ecs-instance-no-public-ip --format json +``` + +Summaries should include the violating resource, property path, severity, reason, and concrete ROS template change. + +## Custom Policy Generation + +Generate custom Rego only when official policies do not cover the user requirement. Keep each rule focused on one static ROS-template assertion. + +Recommended output bundle: + +- The custom policy file. +- A minimal violating ROS template. +- A minimal passing ROS template. +- The validation command and scan commands used. + +Design constraints: + +- Read only from template input, resource definitions, properties, references, mappings, conditions, and parameters. +- Keep cloud account state, billing history, runtime metrics, and manual approval evidence outside the policy unless the user supplies them as explicit input data. +- Prefer actionable violation paths pointing to the ROS property the user should edit. +- Validate syntax with `infraguard policy validate` before presenting the policy as ready. + +## Handoff To IaC Workflows + +When a policy finding requires editing or regenerating a ROS/Terraform template, use the IaC template workflow after the PAC result is clear. Keep the PAC source of truth in InfraGuard; do not copy official policy bodies into iac-code. diff --git a/tests/skills/bundled/test_iac_skill.py b/tests/skills/bundled/test_iac_skill.py index 955a967..9df4ae4 100644 --- a/tests/skills/bundled/test_iac_skill.py +++ b/tests/skills/bundled/test_iac_skill.py @@ -2,6 +2,16 @@ from iac_code.skills.bundled import _bundled_skills, get_bundled_skills, init_bundled_skills +IAC_SKILL_ROOT = Path("src/iac_code/skills/bundled/iac_aliyun") + + +def _iac_aliyun_asset_text() -> str: + parts = [] + for path in sorted(IAC_SKILL_ROOT.rglob("*")): + if path.is_file() and path.suffix in {".md", ".py", ".rego"}: + parts.append(path.read_text(encoding="utf-8")) + return "\n".join(parts) + class TestIacSkill: def setup_method(self): @@ -38,8 +48,28 @@ def test_iac_skill_mentions_parameter_recommendation_reference(self): assert "references/template-parameter-recommendation.md" in iac_skill.content assert "已有模板参数推荐" in iac_skill.content + def test_iac_skill_delegates_infraguard_work_to_pac_skill(self): + init_bundled_skills() + skills = get_bundled_skills() + iac_skill = next(s for s in skills if s.name == "iac-aliyun") + assert "pac-aliyun" in iac_skill.content + assert "references/infraguard-policy-generation.md" not in iac_skill.content + assert "references/infraguard-policies/" not in iac_skill.content + assert "package infraguard.rules" not in iac_skill.content + assert "helpers.resources_by_type" not in iac_skill.content + + def test_iac_aliyun_assets_do_not_embed_infraguard_policy_catalog(self): + assets = _iac_aliyun_asset_text() + assert "references/infraguard-policies/" not in assets + assert "package infraguard.rules" not in assets + assert "rule_meta :=" not in assets + assert "deny contains result if" not in assets + assert "generate_infraguard_policies" not in assets + assert not (IAC_SKILL_ROOT / "references" / "infraguard-policies").exists() + assert not (IAC_SKILL_ROOT / "scripts" / "generate_infraguard_policies.py").exists() + def test_parameter_recommendation_reference_exists(self): - reference = Path("src/iac_code/skills/bundled/iac_aliyun/references/template-parameter-recommendation.md") + reference = IAC_SKILL_ROOT / "references" / "template-parameter-recommendation.md" assert reference.exists() content = reference.read_text(encoding="utf-8") assert "GetTemplateParameterConstraints" in content diff --git a/tests/skills/bundled/test_pac_aliyun_skill.py b/tests/skills/bundled/test_pac_aliyun_skill.py new file mode 100644 index 0000000..9ec5bae --- /dev/null +++ b/tests/skills/bundled/test_pac_aliyun_skill.py @@ -0,0 +1,64 @@ +from pathlib import Path + +from iac_code.skills.bundled import _bundled_skills, get_bundled_skills, init_bundled_skills + +PAC_SKILL_ROOT = Path("src/iac_code/skills/bundled/pac_aliyun") + + +def _pac_aliyun_asset_text() -> str: + parts = [] + for path in sorted(PAC_SKILL_ROOT.rglob("*")): + if path.is_file() and path.suffix in {".md", ".py", ".rego"}: + parts.append(path.read_text(encoding="utf-8")) + return "\n".join(parts) + + +class TestPacAliyunSkill: + def setup_method(self): + _bundled_skills.clear() + + def test_pac_aliyun_skill_registered(self): + init_bundled_skills() + skills = get_bundled_skills() + pac_skills = [s for s in skills if s.name == "pac-aliyun"] + assert len(pac_skills) == 1 + + def test_pac_aliyun_skill_not_user_invocable(self): + init_bundled_skills() + skills = get_bundled_skills() + pac_skill = next(s for s in skills if s.name == "pac-aliyun") + assert pac_skill.is_user_invocable is False + + def test_pac_aliyun_skill_has_auto_trigger_metadata(self): + init_bundled_skills() + skills = get_bundled_skills() + pac_skill = next(s for s in skills if s.name == "pac-aliyun") + assert pac_skill.auto_trigger == {"script": "auto_trigger.py", "supersedes": "iac-aliyun"} + + def test_pac_aliyun_skill_hosts_infraguard_policy_generation(self): + init_bundled_skills() + skills = get_bundled_skills() + pac_skill = next(s for s in skills if s.name == "pac-aliyun") + assert "InfraGuard" in pac_skill.content + assert "references/infraguard-policy-generation.md" in pac_skill.content + assert "infraguard policy update" in pac_skill.content + assert "go install github.com/aliyun/infraguard/cmd/infraguard@latest" in pac_skill.content + + def test_pac_aliyun_reference_requires_lazy_update_before_pac_work(self): + reference = PAC_SKILL_ROOT / "references" / "infraguard-policy-generation.md" + assert reference.exists() + content = reference.read_text(encoding="utf-8") + assert "Lazy InfraGuard Sync" in content + assert "Run this sync before any PAC implementation, generation, validation, or catalog lookup" in content + assert "infraguard policy update" in content + assert "infraguard policy list" in content + assert "infraguard policy validate" in content + + def test_pac_aliyun_assets_do_not_embed_infraguard_policy_catalog(self): + assets = _pac_aliyun_asset_text() + assert "package infraguard.rules" not in assets + assert "package infraguard.packs" not in assets + assert "rule_meta :=" not in assets + assert "pack_meta :=" not in assets + assert "helpers.resources_by_type" not in assets + assert not list(PAC_SKILL_ROOT.rglob("*.rego")) diff --git a/tests/skills/test_auto_trigger.py b/tests/skills/test_auto_trigger.py index 129542b..b8e649f 100644 --- a/tests/skills/test_auto_trigger.py +++ b/tests/skills/test_auto_trigger.py @@ -172,6 +172,86 @@ def test_iac_aliyun_trigger_matches_alicloud_provider_prompt(): assert should_trigger('用 provider "alicloud" 写一个 ECS 安全组模板') +def test_iac_aliyun_trigger_rejects_infraguard_policy_generation(): + from iac_code.skills.bundled.iac_aliyun.auto_trigger import should_trigger + + assert not should_trigger("生成一个 InfraGuard 合规策略,检查 ECS 不允许公网 IP") + + +@pytest.mark.parametrize( + "prompt", + [ + "用 InfraGuard 生成高可用合规策略,检查 RDS 必须多可用区", + "生成成本优化策略,检查 ECS 规格不能超过指定系列", + "写一个网络架构最佳实践策略,限制安全组不能开放全部端口", + ], +) +def test_iac_aliyun_trigger_rejects_infraguard_policy_dimensions(prompt): + from iac_code.skills.bundled.iac_aliyun.auto_trigger import should_trigger + + assert not should_trigger(prompt) + + +def test_iac_aliyun_auto_trigger_does_not_own_pac_classifier(): + source = Path("src/iac_code/skills/bundled/iac_aliyun/auto_trigger.py").read_text(encoding="utf-8") + + assert "_PAC_WORKFLOW_PATTERNS" not in source + assert "has_pac_workflow" not in source + + +def test_pac_aliyun_trigger_matches_infraguard_policy_generation(): + from iac_code.skills.bundled.pac_aliyun.auto_trigger import should_trigger + + assert should_trigger("生成一个 InfraGuard 合规策略,检查 ECS 不允许公网 IP") + + +@pytest.mark.parametrize( + "prompt", + [ + "用 InfraGuard 生成高可用合规策略,检查 RDS 必须多可用区", + "生成成本优化策略,检查 ECS 规格不能超过指定系列", + "写一个网络架构最佳实践策略,限制安全组不能开放全部端口", + "Validate this ROS template with InfraGuard policies", + ], +) +def test_pac_aliyun_trigger_matches_policy_dimensions(prompt): + from iac_code.skills.bundled.pac_aliyun.auto_trigger import should_trigger + + assert should_trigger(prompt) + + +def test_pac_aliyun_trigger_rejects_aliyun_ram_policy_template_prompt(): + from iac_code.skills.bundled.pac_aliyun.auto_trigger import should_trigger + + assert not should_trigger("Create an Alibaba Cloud ROS template for a RAM policy") + + +@pytest.mark.parametrize( + "prompt", + [ + "生成一个 InfraGuard 合规策略,检查 ECS 不允许公网 IP", + "用 Rego 检查这个阿里云 ROS 模板", + ], +) +def test_pac_prompt_auto_triggers_only_pac_skill(prompt): + from iac_code.skills.bundled import _bundled_skills, get_bundled_skills, init_bundled_skills + + _bundled_skills.clear() + init_bundled_skills() + commands = [ + PromptCommand(name=skill.name, description=skill.description, skill=skill, source=SkillSource.BUNDLED) + for skill in get_bundled_skills() + ] + + matches = find_auto_triggered_skills( + prompt, + commands, + loaded_skill_names=set(), + ) + + assert [match.name for match in matches] == ["pac-aliyun"] + + @pytest.mark.parametrize( "prompt", [