diff --git a/README.md b/README.md index b662cbc..7ecfff9 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,61 @@ sigma convert -t splunk -f savedsearches -p sysmon -o savedsearches.conf sigma/r Outputs a Splunk savedsearches.conf containing the converted searches. +#### Separate File Output + +For scenarios where you need to convert multiple rules into separate files (e.g., for version control or selective deployment), +use the `--output-dir` parameter along with `--output-filename-template`: + +``` +sigma convert -t esql -p ecs_windows --output-dir translated_rules/ rules/ +``` + +This will create a separate file for each converted rule in the `translated_rules/` directory. + +**Template Variables:** + +The `--output-filename-template` parameter accepts the following variables: + +- `{path}`: Relative source directory path (e.g., `windows` for `rules/windows/rule.yml`) +- `{stem}`: Filename without extension (e.g., `rule` for `rule.yml`) +- `{index}`: Query index for rules that generate multiple queries (empty if only one query) + +**Examples:** + +Flat output structure (all files in one directory): +``` +sigma convert -t esql -p ecs_windows --output-dir translated/ --output-filename-template "{stem}.esql" rules/ +``` + +Preserve directory structure: +``` +sigma convert -t esql -p ecs_windows --output-dir translated/ --output-filename-template "{path}/{stem}.esql" rules/ +``` + +Handle rules with multiple queries: +``` +sigma convert -t esql -p ecs_windows --output-dir translated/ --output-filename-template "{stem}-{index}.esql" rules/ +``` + +Given input structure: +``` +rules/ +├── windows/ +│ └── rule_1.yml +├── linux/ +│ └── rule_2.yml +``` + +With `--output-filename-template "{path}/{stem}.esql"`, the output will be: +``` +translated/ +├── windows/ +│ └── rule_1.esql +├── linux/ +│ └── rule_2.esql +``` + + ### Integration of Backends and Pipelines Backends and pipelines can be integrated by adding the corresponding packages as dependency with: diff --git a/sigma/cli/convert.py b/sigma/cli/convert.py index cfe4d51..fbef22f 100644 --- a/sigma/cli/convert.py +++ b/sigma/cli/convert.py @@ -1,4 +1,5 @@ import json +import os import pathlib import textwrap from typing import Sequence @@ -6,6 +7,8 @@ import click from sigma.cli.rules import load_rules, check_rule_errors +from sigma.collection import SigmaCollection +from sigma.correlations import SigmaCorrelationRule from sigma.conversion.base import Backend from sigma.exceptions import ( SigmaError, @@ -67,6 +70,171 @@ def fail(self, message: str, param, ctx): ) +def render_output_filename(template: str, rule_source_path: pathlib.Path, base_dir: pathlib.Path, index: int = None) -> pathlib.Path: + """ + Render output filename template with available variables. + + Args: + template: Template string with variables {path}, {stem}, {index} + rule_source_path: Path to the source rule file + base_dir: Base directory to calculate relative path from + index: Query index for rules that generate multiple queries (optional) + + Returns: + Path object for the output file + """ + # Calculate relative path from base directory + try: + relative_path = rule_source_path.relative_to(base_dir) + except ValueError: + # If rule_source_path is not relative to base_dir, use the rule path as-is + relative_path = rule_source_path + + # Get parent directory path (without filename) + if relative_path.parent != pathlib.Path("."): + path_component = str(relative_path.parent) + else: + path_component = "" + + # Get filename stem (without extension) + stem = rule_source_path.stem + + # Render template + rendered = template.format( + path=path_component, + stem=stem, + index=index if index is not None else "" + ) + + # Clean up any double slashes or empty path components + rendered = rendered.replace("//", "/").strip("/") + + return pathlib.Path(rendered) + + +def write_separate_files( + rule_collection: SigmaCollection, + backend: Backend, + output_dir: pathlib.Path, + filename_template: str, + format: str, + correlation_method: str, + encoding: str, + json_indent: int, + base_dir: pathlib.Path, +): + """ + Convert rules and write each to a separate file. + + Args: + rule_collection: Collection of Sigma rules to convert + backend: Backend instance for conversion + output_dir: Directory to write output files + filename_template: Template for output filenames + format: Output format + correlation_method: Correlation method + encoding: Output encoding + json_indent: JSON indentation + base_dir: Base directory to calculate relative paths from + + Raises: + click.UsageError: If the collection contains correlation rules + """ + output_dir = pathlib.Path(output_dir) + + # Check for correlation rules - they cannot be converted individually + # because they reference other rules in the collection + for rule in rule_collection.rules: + if isinstance(rule, SigmaCorrelationRule): + raise click.UsageError( + f"Cannot use --output-dir with correlation rules. " + f"Correlation rule '{rule.title}' (ID: {rule.id}) references other rules " + f"and must be converted as part of the full collection. " + f"Use --output instead to write all rules to a single file." + ) + + # Track number of files written + files_written = 0 + + # Convert each rule individually + for rule in rule_collection.rules: + # Create a single-rule collection for this rule + single_rule_collection = SigmaCollection([rule]) + + # Convert the rule + try: + result = backend.convert(single_rule_collection, format, correlation_method) + except Exception as e: + # Skip rules that can't be converted - continue with remaining rules + click.echo(f"Warning: Failed to convert {rule.source}: {e}. Skipping rule.", err=True) + continue + + # Get rule source path + if rule.source and hasattr(rule.source, 'path'): + rule_source_path = pathlib.Path(rule.source.path) + else: + # If no source path, use rule ID or title as filename + rule_source_path = pathlib.Path(f"{rule.id or rule.title}.yml") + + # Handle different result types + if isinstance(result, str): + # Single string result - write to one file + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, None) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bytes(result, encoding)) + files_written += 1 + + elif isinstance(result, bytes): + # Binary result - write to one file + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, None) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(result) + files_written += 1 + + elif isinstance(result, list) and all(isinstance(item, str) for item in result): + # List of strings - write each to a separate file with index + if len(result) == 1: + # Single result, no index needed + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, None) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bytes(result[0], encoding)) + files_written += 1 + else: + # Multiple results, add index to filename + for idx, item in enumerate(result, start=1): + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, idx) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bytes(item, encoding)) + files_written += 1 + + elif isinstance(result, list) and all(isinstance(item, dict) for item in result): + # List of dicts - write each to a separate file with index as JSON + if len(result) == 1: + # Single result, no index needed + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, None) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bytes(json.dumps(result[0], indent=json_indent), encoding)) + files_written += 1 + else: + # Multiple results, add index to filename + for idx, item in enumerate(result, start=1): + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, idx) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bytes(json.dumps(item, indent=json_indent), encoding)) + files_written += 1 + + elif isinstance(result, dict): + # Dict result - write as JSON + output_path = output_dir / render_output_filename(filename_template, rule_source_path, base_dir, None) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bytes(json.dumps(result, indent=json_indent), encoding)) + files_written += 1 + else: + click.echo(f"Warning: Backend returned unexpected format {str(type(result))} for {rule.source}. Expected str, bytes, list, or dict. Skipping rule.", err=True) + + click.echo(f"Wrote {files_written} file(s) to {output_dir}", err=True) + + @click.command() @click.option( "--target", @@ -133,7 +301,25 @@ def fail(self, message: str, param, ctx): type=click.File("wb"), default="-", show_default=True, - help="Write result to specified file. '-' writes to standard output.", + help="Write result to specified file. '-' writes to standard output. Mutually exclusive with --output-dir.", +) +@click.option( + "--output-dir", + "-od", + type=click.Path(path_type=pathlib.Path), + default=None, + help="Write individual converted rules to separate files in this directory. Mutually exclusive with --output.", +) +@click.option( + "--output-filename-template", + "-ot", + type=str, + default="{stem}.txt", + show_default=True, + help="Template for output filenames when using --output-dir. " + "Available variables: {path} (relative source path), {stem} (filename without extension), " + "{index} (query index for rules that generate multiple queries). " + "Example: '{path}/{stem}-{index}.txt' or 'converted/{stem}.esql'", ) @click.option( "--encoding", @@ -193,6 +379,8 @@ def convert( filter, skip_unsupported, output, + output_dir, + output_filename_template, encoding, json_indent, backend_option, @@ -207,6 +395,12 @@ def convert( into directories and converts all files matching the pattern in --file-pattern. """ + # Validate mutually exclusive options + if output_dir is not None and hasattr(output, 'name') and output.name != "": + raise click.UsageError( + "--output/-o and --output-dir/-od are mutually exclusive. Use --output for single file output or --output-dir for separate file outputs." + ) + # Check if pipeline is required if backends[target].requires_pipeline and pipeline == () and not without_pipeline: raise click.UsageError( @@ -314,42 +508,69 @@ def convert( try: rule_collection = load_rules(input + filter, file_pattern) check_rule_errors(rule_collection) - result = backend.convert(rule_collection, format, correlation_method) - if isinstance(result, str): # String result - click.echo(bytes(result, encoding), output) - elif isinstance(result, bytes): # Bytes result: only allow to write it to file. - if output.isatty(): - raise click.UsageError( - "Backend returns binary output. Please provide output file with --output/-o." - ) - else: - click.echo(result, output) - elif isinstance(result, list) and all( - ( # List of strings Concatenate with newlines in between. - isinstance(item, str) for item in result - ) - ): - click.echo(bytes("\n\n".join(result), encoding), output) - elif isinstance(result, list) and all( - ( # List of dicts: concatenate with newline and render each result als JSON. - isinstance(item, dict) for item in result - ) - ): - click.echo( - bytes( - "\n".join( - (json.dumps(item, indent=json_indent) for item in result) - ), - encoding, - ), - output, + + # Check if we should write to separate files + if output_dir is not None: + # Determine base directory for relative path calculation + # Use the first input path as base directory + base_dir = pathlib.Path.cwd() + if input and input[0] != pathlib.Path("-"): + first_input = pathlib.Path(input[0]) + if first_input.is_dir(): + base_dir = first_input + else: + base_dir = first_input.parent + + # Write separate files + write_separate_files( + rule_collection=rule_collection, + backend=backend, + output_dir=output_dir, + filename_template=output_filename_template, + format=format, + correlation_method=correlation_method, + encoding=encoding, + json_indent=json_indent, + base_dir=base_dir, ) - elif isinstance(result, dict): - click.echo(bytes(json.dumps(result, indent=json_indent), encoding)) else: - raise click.ClickException( - f"Backend returned unexpected format {str(type(result))}" - ) + # Original behavior: convert entire collection and write to single output + result = backend.convert(rule_collection, format, correlation_method) + if isinstance(result, str): # String result + click.echo(bytes(result, encoding), output) + elif isinstance(result, bytes): # Bytes result: only allow to write it to file. + if output.isatty(): + raise click.UsageError( + "Backend returns binary output. Please provide output file with --output/-o." + ) + else: + click.echo(result, output) + elif isinstance(result, list) and all( + ( # List of strings Concatenate with newlines in between. + isinstance(item, str) for item in result + ) + ): + click.echo(bytes("\n\n".join(result), encoding), output) + elif isinstance(result, list) and all( + ( # List of dicts: concatenate with newline and render each result als JSON. + isinstance(item, dict) for item in result + ) + ): + click.echo( + bytes( + "\n".join( + (json.dumps(item, indent=json_indent) for item in result) + ), + encoding, + ), + output, + ) + elif isinstance(result, dict): + click.echo(bytes(json.dumps(result, indent=json_indent), encoding)) + else: + raise click.ClickException( + f"Backend returned unexpected format {str(type(result))}" + ) except SigmaError as e: if verbose: click.echo('Error while converting') diff --git a/tests/files/multiple_rules/linux/linux_rule.yml b/tests/files/multiple_rules/linux/linux_rule.yml new file mode 100644 index 0000000..9de8bf0 --- /dev/null +++ b/tests/files/multiple_rules/linux/linux_rule.yml @@ -0,0 +1,12 @@ +title: Linux rule +id: 44444444-4444-4444-4444-444444444444 +description: Linux specific rule +status: stable +level: high +logsource: + category: process_creation + product: linux +detection: + selection: + Image|endswith: '/linux' + condition: selection diff --git a/tests/files/multiple_rules/multi_condition.yml b/tests/files/multiple_rules/multi_condition.yml new file mode 100644 index 0000000..a165a9a --- /dev/null +++ b/tests/files/multiple_rules/multi_condition.yml @@ -0,0 +1,19 @@ +title: Multiple condition rule +id: 55555555-5555-5555-5555-555555555555 +description: Rule with multiple conditions +status: stable +level: high +logsource: + category: process_creation + product: windows +detection: + selection1: + Image|endswith: '\first.exe' + selection2: + Image|endswith: '\second.exe' + selection3: + Image|endswith: '\third.exe' + condition: + - selection1 + - selection2 + - selection3 diff --git a/tests/files/multiple_rules/rule_1.yml b/tests/files/multiple_rules/rule_1.yml new file mode 100644 index 0000000..11f9c11 --- /dev/null +++ b/tests/files/multiple_rules/rule_1.yml @@ -0,0 +1,12 @@ +title: Test rule 1 +id: 11111111-1111-1111-1111-111111111111 +description: First test rule +status: stable +level: high +logsource: + category: process_creation + product: windows +detection: + selection: + Image|endswith: '\test1.exe' + condition: selection diff --git a/tests/files/multiple_rules/rule_2.yml b/tests/files/multiple_rules/rule_2.yml new file mode 100644 index 0000000..f26a815 --- /dev/null +++ b/tests/files/multiple_rules/rule_2.yml @@ -0,0 +1,12 @@ +title: Test rule 2 +id: 22222222-2222-2222-2222-222222222222 +description: Second test rule +status: stable +level: medium +logsource: + category: process_creation + product: windows +detection: + selection: + Image|endswith: '\test2.exe' + condition: selection diff --git a/tests/files/multiple_rules/windows/windows_rule.yml b/tests/files/multiple_rules/windows/windows_rule.yml new file mode 100644 index 0000000..fdf12b3 --- /dev/null +++ b/tests/files/multiple_rules/windows/windows_rule.yml @@ -0,0 +1,12 @@ +title: Windows rule +id: 33333333-3333-3333-3333-333333333333 +description: Windows specific rule +status: stable +level: high +logsource: + category: process_creation + product: windows +detection: + selection: + Image|endswith: '\windows.exe' + condition: selection diff --git a/tests/test_convert.py b/tests/test_convert.py index 9aee53f..774582d 100644 --- a/tests/test_convert.py +++ b/tests/test_convert.py @@ -270,4 +270,188 @@ def test_convert_invalid_correlation_method(): convert, ["-t", "text_query_test", "-f", "str", "-c", "invalid", "tests/files/valid"] ) assert result.exit_code != 0 - assert "Correlation method 'invalid' is not supported" in result.stderr \ No newline at end of file + assert "Correlation method 'invalid' is not supported" in result.stderr + + +def test_convert_output_dir_basic(tmp_path): + """Test basic output to separate files in a directory.""" + cli = CliRunner() + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "-p", + "another_test", + "--disable-pipeline-check", + "--output-dir", + str(output_dir), + "tests/files/multiple_rules/rule_1.yml", + "tests/files/multiple_rules/rule_2.yml", + ], + ) + assert result.exit_code == 0 + assert "Wrote 2 file(s)" in result.stderr + + # Check that output files exist + assert (output_dir / "rule_1.txt").exists() + assert (output_dir / "rule_2.txt").exists() + + # Check content + content1 = (output_dir / "rule_1.txt").read_text() + assert 'Image endswith "\\test1.exe"' in content1 + + +def test_convert_output_dir_with_subdirs(tmp_path): + """Test output with directory structure preserved.""" + cli = CliRunner() + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "-p", + "another_test", + "--disable-pipeline-check", + "--output-dir", + str(output_dir), + "--output-filename-template", + "{path}/{stem}.esql", + "tests/files/multiple_rules/", + ], + ) + assert result.exit_code == 0 + + # Check that output files exist with subdirectories + assert (output_dir / "windows" / "windows_rule.esql").exists() + assert (output_dir / "linux" / "linux_rule.esql").exists() + + +def test_convert_output_dir_flat(tmp_path): + """Test output with flat structure (no subdirs).""" + cli = CliRunner() + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "-p", + "another_test", + "--disable-pipeline-check", + "--output-dir", + str(output_dir), + "--output-filename-template", + "{stem}.txt", + "tests/files/multiple_rules/", + ], + ) + assert result.exit_code == 0 + + # Check that all files are in the root output directory + assert (output_dir / "rule_1.txt").exists() + assert (output_dir / "rule_2.txt").exists() + assert (output_dir / "windows_rule.txt").exists() + assert (output_dir / "linux_rule.txt").exists() + + +def test_convert_output_dir_mutually_exclusive(tmp_path): + """Test that --output and --output-dir are mutually exclusive.""" + cli = CliRunner() + output_file = tmp_path / "output.txt" + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "-p", + "another_test", + "--disable-pipeline-check", + "--output", + str(output_file), + "--output-dir", + str(output_dir), + "tests/files/valid", + ], + ) + assert result.exit_code != 0 + assert "mutually exclusive" in result.stderr + + +def test_convert_output_dir_with_index(tmp_path): + """Test output with index for rules that generate multiple queries.""" + cli = CliRunner() + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "-p", + "another_test", + "--disable-pipeline-check", + "--output-dir", + str(output_dir), + "--output-filename-template", + "{stem}-{index}.txt", + "tests/files/multiple_rules/multi_condition.yml", + ], + ) + assert result.exit_code == 0 + + # Check that output files with indexes exist + # The rule has 3 conditions, so it should generate 3 separate queries + assert (output_dir / "multi_condition-1.txt").exists() + assert (output_dir / "multi_condition-2.txt").exists() + assert (output_dir / "multi_condition-3.txt").exists() + + +def test_convert_output_dir_with_correlation_rules(tmp_path): + """Test that correlation rules are not supported with --output-dir.""" + cli = CliRunner() + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "-c", + "test", + "--output-dir", + str(output_dir), + "tests/files/sigma_correlation_rules.yml", + ], + ) + # Should fail with a clear error message + assert result.exit_code != 0 + assert "correlation" in result.stderr.lower() or "collection" in result.stderr.lower() + + +def test_convert_output_dir_with_filter(tmp_path): + """Test that filters are applied correctly with --output-dir.""" + cli = CliRunner() + output_dir = tmp_path / "output" + result = cli.invoke( + convert, + [ + "-t", + "text_query_test", + "--filter", + "tests/files/sigma_filter.yml", + "--output-dir", + str(output_dir), + "tests/files/valid/sigma_rule.yml", + ], + ) + assert result.exit_code == 0 + + # Check that output file exists + assert (output_dir / "sigma_rule.txt").exists() + + # Check that filter was applied + content = (output_dir / "sigma_rule.txt").read_text() + assert 'not User startswith "ADM_"' in content +