|
| 1 | +from typing import Optional, Dict, Any |
| 2 | +import json |
| 3 | +import sys |
| 4 | +import logging |
| 5 | +from pathlib import Path |
| 6 | +from .core.classes import Diff, Issue |
| 7 | + |
| 8 | + |
| 9 | +class OutputHandler: |
| 10 | + blocking_disabled: bool |
| 11 | + logger: logging.Logger |
| 12 | + |
| 13 | + def __init__(self, blocking_disabled: bool): |
| 14 | + self.blocking_disabled = blocking_disabled |
| 15 | + self.logger = logging.getLogger("socketcli") |
| 16 | + |
| 17 | + def handle_output(self, diff_report: Diff, sbom_file_name: Optional[str] = None, json_output: bool = False) -> int: |
| 18 | + """Main output handler that determines output format and returns exit code""" |
| 19 | + if json_output: |
| 20 | + self.output_console_json(diff_report, sbom_file_name) |
| 21 | + else: |
| 22 | + self.output_console_comments(diff_report, sbom_file_name) |
| 23 | + |
| 24 | + self.save_sbom_file(diff_report, sbom_file_name) |
| 25 | + return 0 if self.report_pass(diff_report) else 1 |
| 26 | + |
| 27 | + def output_console_comments(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None: |
| 28 | + """Outputs formatted console comments""" |
| 29 | + if not diff_report.issues: |
| 30 | + self.logger.info("No issues found") |
| 31 | + return |
| 32 | + |
| 33 | + for issue in diff_report.issues: |
| 34 | + self._output_issue(issue) |
| 35 | + |
| 36 | + def output_console_json(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None: |
| 37 | + """Outputs JSON formatted results""" |
| 38 | + output = { |
| 39 | + "issues": [self._format_issue(issue) for issue in diff_report.issues], |
| 40 | + "pass": self.report_pass(diff_report) |
| 41 | + } |
| 42 | + if sbom_file_name: |
| 43 | + output["sbom_file"] = sbom_file_name |
| 44 | + |
| 45 | + json.dump(output, sys.stdout, indent=2) |
| 46 | + sys.stdout.write("\n") |
| 47 | + |
| 48 | + def report_pass(self, diff_report: Diff) -> bool: |
| 49 | + """Determines if the report passes security checks""" |
| 50 | + if not diff_report.issues: |
| 51 | + return True |
| 52 | + |
| 53 | + if self.blocking_disabled: |
| 54 | + return True |
| 55 | + |
| 56 | + return not any(issue.blocking for issue in diff_report.issues) |
| 57 | + |
| 58 | + def save_sbom_file(self, diff_report: Diff, sbom_file_name: Optional[str] = None) -> None: |
| 59 | + """Saves SBOM file if filename is provided""" |
| 60 | + if not sbom_file_name or not diff_report.sbom: |
| 61 | + return |
| 62 | + |
| 63 | + sbom_path = Path(sbom_file_name) |
| 64 | + sbom_path.parent.mkdir(parents=True, exist_ok=True) |
| 65 | + |
| 66 | + with open(sbom_path, "w") as f: |
| 67 | + json.dump(diff_report.sbom, f, indent=2) |
| 68 | + |
| 69 | + def _output_issue(self, issue: Issue) -> None: |
| 70 | + """Helper method to format and output a single issue""" |
| 71 | + severity = issue.severity.upper() if issue.severity else "UNKNOWN" |
| 72 | + status = "🚫 Blocking" if issue.blocking else "⚠️ Warning" |
| 73 | + |
| 74 | + self.logger.warning(f"\n{status} - Severity: {severity}") |
| 75 | + self.logger.warning(f"Title: {issue.title}") |
| 76 | + if issue.description: |
| 77 | + self.logger.warning(f"Description: {issue.description}") |
| 78 | + if issue.recommendation: |
| 79 | + self.logger.warning(f"Recommendation: {issue.recommendation}") |
| 80 | + |
| 81 | + def _format_issue(self, issue: Issue) -> Dict[str, Any]: |
| 82 | + """Helper method to format an issue for JSON output""" |
| 83 | + return { |
| 84 | + "title": issue.title, |
| 85 | + "description": issue.description, |
| 86 | + "severity": issue.severity, |
| 87 | + "blocking": issue.blocking, |
| 88 | + "recommendation": issue.recommendation |
| 89 | + } |
0 commit comments