diff --git a/activerecon/cli.py b/activerecon/cli.py new file mode 100644 index 0000000..00894f6 --- /dev/null +++ b/activerecon/cli.py @@ -0,0 +1,93 @@ +import argparse +import logging + +from .models import ReconOptions +from .modules.doctor import run_doctor +from .output_paths import DEFAULT_REPORT_DIR +from .runner import ReconValidationError, run_recon + + +LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" + + +def configure_logging(verbose=False, quiet=False): + if verbose: + level = logging.DEBUG + elif quiet: + level = logging.WARNING + else: + level = logging.INFO + + root_logger = logging.getLogger() + if root_logger.handlers: + root_logger.setLevel(level) + for handler in root_logger.handlers: + handler.setLevel(level) + else: + logging.basicConfig(level=level, format=LOG_FORMAT) + + +def build_parser(): + parser = argparse.ArgumentParser(description="Active Recon Tool") + parser.add_argument("--target", help="Target IP or domain") + parser.add_argument("--doctor", action="store_true", help="Check local dependencies and exit without scanning") + parser.add_argument("--verbose", action="store_true", help="Show debug logging") + parser.add_argument("--quiet", action="store_true", help="Only show warnings and errors") + parser.add_argument( + "--output", + default=None, + help="Report name or path. Bare names are saved as reports/_..", + ) + parser.add_argument( + "--output-format", + choices=("md", "json", "both"), + default="both", + help="Report output format. Defaults to both Markdown and JSON.", + ) + parser.add_argument("--scope", help="Optional scope file with allowed domains, IPs, or CIDRs") + parser.add_argument( + "--dry-run", + action="store_true", + help="Validate options and show planned outputs without scanning", + ) + parser.add_argument( + "--scan-profile", + default="fast", + help="Choose a pre-defined Nmap profile from config.yaml", + ) + return parser + + +def options_from_args(args): + return ReconOptions( + target=args.target, + scan_profile=args.scan_profile, + output=args.output, + output_format=args.output_format, + scope=args.scope, + dry_run=args.dry_run, + verbose=args.verbose, + quiet=args.quiet, + ) + + +def main(argv=None): + parser = build_parser() + args = parser.parse_args(argv) + if args.verbose and args.quiet: + parser.error("--verbose and --quiet cannot be used together") + + configure_logging(args.verbose, args.quiet) + + if args.doctor: + run_doctor(DEFAULT_REPORT_DIR) + return None + + if not args.target: + parser.error("--target is required unless --doctor is used") + + try: + return run_recon(options_from_args(args)) + except ReconValidationError as e: + parser.error(str(e)) + return None diff --git a/activerecon/main.py b/activerecon/main.py index 85f36ac..2f05ddc 100644 --- a/activerecon/main.py +++ b/activerecon/main.py @@ -1,269 +1,4 @@ -import argparse -import ipaddress -import logging -import re -from datetime import datetime -from pathlib import Path - -from .modules.nmap_scan import run_nmap_scan -from .modules.http_enum import analyze_http -from .modules.dns_analysis import analyze_dns -from .modules.report_generator import generate_report -from .modules.json_report import generate_json_report -from .modules.config_loader import load_config -from .modules.doctor import run_doctor -from .modules.endpoint_discovery import discover_endpoints -from .modules.risk_analysis import generate_attention_findings -from .modules.scope_guard import is_target_in_scope -from .modules.tls_analysis import analyze_tls - - -CONFIG = None -LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" - - -DEFAULT_REPORT_DIR = "reports" -COMMON_HTTP_PORTS = {"80", "443", "3000", "5000", "8000", "8080", "8443", "9000", "9443"} -DNS_IP_SKIP_REASON = "DNS analysis skipped for IP address target" - - -def configure_logging(verbose=False, quiet=False): - if verbose: - level = logging.DEBUG - elif quiet: - level = logging.WARNING - else: - level = logging.INFO - - root_logger = logging.getLogger() - if root_logger.handlers: - root_logger.setLevel(level) - for handler in root_logger.handlers: - handler.setLevel(level) - else: - logging.basicConfig(level=level, format=LOG_FORMAT) - - -def get_config(): - return CONFIG if CONFIG is not None else load_config() - - -def _is_http_service(port): - service = str(port.get("service", "")).lower() - state = str(port.get("state", "")).lower() - portid = str(port.get("portid", "")) - - if state and state != "open": - return False - - return "http" in service or portid in COMMON_HTTP_PORTS - - -def _get_http_ports(nmap_results): - ports = nmap_results.get("ports", []) if isinstance(nmap_results, dict) else [] - return [port for port in ports if _is_http_service(port)] - - -def _is_ip_target(target): - try: - ipaddress.ip_address(target) - return True - except ValueError: - return False - - -def _dns_skip_result(): - return { - "skipped": True, - "reason": DNS_IP_SKIP_REASON, - "A": [], - "MX": [], - "TXT": [], - } - - -def _web_recon_enabled(config, scan_profile): - web_recon = config.get("web_recon", {}) if isinstance(config, dict) else {} - enabled_profiles = web_recon.get("enabled_profiles", []) - return scan_profile in enabled_profiles - - -def _safe_report_name(target): - safe_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", target).strip("._-") - return safe_name or "target" - - -def build_report_path(target, output=None, now=None, suffix=".md"): - timestamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S") - - if output: - output_path = Path(output) - output_dir = output_path.parent if output_path.parent != Path(".") else Path(DEFAULT_REPORT_DIR) - stem = _safe_report_name(output_path.stem) - else: - output_dir = Path(DEFAULT_REPORT_DIR) - stem = _safe_report_name(target) - - filename = f"{stem}_{timestamp}{suffix}" - return str(output_dir / filename) - - -def build_output_paths(target, output=None, output_format="both", now=None): - now = now or datetime.now() - markdown_path = build_report_path(target, output, now, ".md") - markdown = markdown_path if output_format in {"md", "both"} else None - - json_path = build_report_path(target, output, now, ".json") - json_output = json_path if output_format in {"json", "both"} else None - return markdown, json_output - - -def main(): - parser = argparse.ArgumentParser(description="Active Recon Tool") - parser.add_argument("--target", help="Target IP or domain") - parser.add_argument("--doctor", action="store_true", help="Check local dependencies and exit without scanning") - parser.add_argument("--verbose", action="store_true", help="Show debug logging") - parser.add_argument("--quiet", action="store_true", help="Only show warnings and errors") - parser.add_argument( - "--output", - default=None, - help="Report name or path. Bare names are saved as reports/_..", - ) - parser.add_argument( - "--output-format", - choices=("md", "json", "both"), - default="both", - help="Report output format. Defaults to both Markdown and JSON.", - ) - parser.add_argument("--scope", help="Optional scope file with allowed domains, IPs, or CIDRs") - parser.add_argument("--dry-run", action="store_true", help="Validate options and show planned outputs without scanning") - - parser.add_argument( - "--scan-profile", - default="fast", - help="Choose a pre-defined Nmap profile from config.yaml", - ) - - args = parser.parse_args() - if args.verbose and args.quiet: - parser.error("--verbose and --quiet cannot be used together") - - configure_logging(args.verbose, args.quiet) - - if args.doctor: - run_doctor(DEFAULT_REPORT_DIR) - return - - if not args.target: - parser.error("--target is required unless --doctor is used") - - try: - config = get_config() - except Exception as e: - parser.error(f"Could not load config: {e}") - - scan_profiles = config.get("scan_profiles", {}) - if args.scan_profile not in scan_profiles: - choices = ", ".join(sorted(scan_profiles)) or "none configured" - parser.error(f"Unknown scan profile '{args.scan_profile}'. Available profiles: {choices}") - - chosen_profile = args.scan_profile - scan_command = scan_profiles[chosen_profile] - - target = args.target - markdown_output, json_output = build_output_paths(target, args.output, args.output_format) - results = {} - - if args.scope: - try: - in_scope = is_target_in_scope(target, args.scope) - except OSError as e: - parser.error(f"Could not read scope file {args.scope}: {e}") - if not in_scope: - parser.error(f"Target is outside the allowed scope file: {args.scope}") - - logging.info(f"Starting automated recon on target: {target}") - logging.info(f"Using scan profile: {chosen_profile} ({scan_command})") - if markdown_output: - logging.info(f"Markdown report path: {markdown_output}") - if json_output: - logging.info(f"JSON report path: {json_output}") - - if args.dry_run: - logging.info("Dry run requested. No Nmap, HTTP, TLS, or DNS checks were executed.") - return - - try: - nmap_results = run_nmap_scan(target, scan_command, config) - if not isinstance(nmap_results, dict): - nmap_results = {"target": target, "ports": [], "error": "Nmap scan returned invalid results"} - results["Nmap Scan"] = nmap_results - if nmap_results.get("error"): - logging.error(f"Nmap scan completed with errors: {nmap_results['error']}") - else: - logging.info(f"Nmap scan completed successfully. Found {len(nmap_results.get('ports', []))} ports.") - except Exception as e: - logging.error(f"Error during Nmap scan: {e}") - nmap_results = {"target": target, "ports": [], "error": f"Nmap scan failed: {e}"} - results["Nmap Scan"] = nmap_results - - http_ports = _get_http_ports(nmap_results) - if http_ports: - try: - logging.info(f"HTTP services found: {http_ports}. Running HTTP analysis.") - results["HTTP Analysis"] = analyze_http(target, config, http_ports) - except Exception as e: - logging.error(f"Error during HTTP analysis: {e}") - results["HTTP Analysis"] = {"error": f"HTTP analysis failed: {e}"} - else: - logging.info("No HTTP ports found. Skipping HTTP analysis.") - results["HTTP Analysis"] = [] - - try: - logging.info("Running TLS analysis.") - results["TLS Analysis"] = analyze_tls(results["HTTP Analysis"], config.get("http_timeout", 5)) - except Exception as e: - logging.error(f"Error during TLS analysis: {e}") - results["TLS Analysis"] = {"error": f"TLS analysis failed: {e}"} - - if _web_recon_enabled(config, chosen_profile): - try: - logging.info("Running endpoint discovery.") - results["Endpoint Discovery"] = discover_endpoints(results["HTTP Analysis"], config) - except Exception as e: - logging.error(f"Error during endpoint discovery: {e}") - results["Endpoint Discovery"] = {"error": f"Endpoint discovery failed: {e}"} - - if _is_ip_target(target): - logging.info(DNS_IP_SKIP_REASON) - results["DNS Analysis"] = _dns_skip_result() - else: - try: - logging.info("Running DNS analysis.") - results["DNS Analysis"] = analyze_dns(target) - except Exception as e: - logging.error(f"Error during DNS analysis: {e}") - results["DNS Analysis"] = {"error": f"DNS analysis failed: {e}"} - - interesting_signals = generate_attention_findings(results) - results["Attention"] = interesting_signals - results["Interesting Signals"] = interesting_signals - - if markdown_output: - try: - generate_report(target, results, markdown_output) - logging.info(f"Markdown report saved to {markdown_output}") - except Exception as e: - logging.error(f"Error during Markdown report generation: {e}") - - if json_output: - try: - generate_json_report(target, results, json_output, scan_profile=chosen_profile) - logging.info(f"JSON report saved to {json_output}") - except Exception as e: - logging.error(f"Error during JSON report generation: {e}") - - logging.info("Recon completed.") +from .cli import main if __name__ == "__main__": diff --git a/activerecon/models.py b/activerecon/models.py new file mode 100644 index 0000000..e71903c --- /dev/null +++ b/activerecon/models.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + + +@dataclass +class TargetSpec: + raw: str + host: str + scheme: Optional[str] = None + port: Optional[int] = None + path: str = "" + is_ip: bool = False + is_private: bool = False + is_loopback: bool = False + + +@dataclass +class ReconOptions: + target: str + scan_profile: str = "fast" + output: Optional[str] = None + output_format: str = "both" + scope: Optional[str] = None + dry_run: bool = False + verbose: bool = False + quiet: bool = False + + +@dataclass +class ReconResult: + target: str + target_spec: TargetSpec + scan_profile: str + markdown_output: Optional[str] = None + json_output: Optional[str] = None + results: Dict[str, Any] = field(default_factory=dict) + dry_run: bool = False diff --git a/activerecon/modules/endpoint_categories.py b/activerecon/modules/endpoint_categories.py new file mode 100644 index 0000000..9d8af2d --- /dev/null +++ b/activerecon/modules/endpoint_categories.py @@ -0,0 +1,119 @@ +from pathlib import PurePosixPath + + +STATIC_ASSET_EXTENSIONS = { + ".css", + ".eot", + ".gif", + ".ico", + ".jpeg", + ".jpg", + ".js", + ".map", + ".png", + ".svg", + ".ttf", + ".webp", + ".woff", + ".woff2", +} +WELL_KNOWN_PATHS = { + "/robots.txt", + "/sitemap.xml", + "/.well-known/security.txt", + "/swagger", + "/api-docs", + "/ftp", +} +CATEGORY_KEYS = ( + "api_like", + "frontend_routes", + "static_assets", + "well_known", + "header_discovered", + "realtime_services", +) +MARKDOWN_CATEGORY_TITLES = { + "api_like": "API-like Endpoints", + "frontend_routes": "Frontend Routes", + "realtime_services": "Realtime / Service Endpoints", + "well_known": "Well-known / Probed Paths", + "static_assets": "Static Assets", +} +MARKDOWN_CATEGORY_ORDER = ( + "api_like", + "frontend_routes", + "realtime_services", + "well_known", + "static_assets", +) + + +def path_without_query(path): + return str(path or "/").split("?", 1)[0].split("#", 1)[0] + + +def is_api_like(path): + lower_path = str(path or "").lower() + return ( + lower_path == "/api" + or lower_path == "/rest" + or lower_path.startswith("/api/") + or lower_path.startswith("/rest/") + ) + + +def is_static_asset(path): + clean_path = path_without_query(path).lower() + filename = PurePosixPath(clean_path).name + return PurePosixPath(clean_path).suffix in STATIC_ASSET_EXTENSIONS or "chunk" in filename + + +def is_realtime_service(path): + clean_path = path_without_query(path).lower().rstrip("/") + return ( + clean_path == "/socket.io" + or clean_path == "/engine.io" + or clean_path.startswith(("/socket.io/", "/engine.io/")) + ) + + +def unique_endpoint_paths(endpoints): + return { + endpoint.get("path") + for endpoint in endpoints + if isinstance(endpoint, dict) and endpoint.get("path") + } + + +def primary_endpoint_category(endpoint): + path = endpoint.get("path", "") + lower_path = path_without_query(path).lower() + if is_realtime_service(path): + return "realtime_services" + if is_static_asset(path): + return "static_assets" + if is_api_like(path): + return "api_like" + if lower_path in WELL_KNOWN_PATHS: + return "well_known" + return "frontend_routes" + + +def categorize_endpoints(endpoints): + categories = {key: [] for key in CATEGORY_KEYS} + for endpoint in endpoints: + if not isinstance(endpoint, dict): + continue + categories[primary_endpoint_category(endpoint)].append(endpoint) + if str(endpoint.get("source", "")).startswith("response-header"): + categories["header_discovered"].append(endpoint) + return categories + + +def endpoint_category_summary(endpoints, categories=None): + categories = categories or categorize_endpoints(endpoints) + summary = {"endpoint_count": len(unique_endpoint_paths(endpoints))} + for key in CATEGORY_KEYS: + summary[key] = len(unique_endpoint_paths(categories[key])) + return summary diff --git a/activerecon/modules/json_report.py b/activerecon/modules/json_report.py index 9cfe6f0..9785e7e 100644 --- a/activerecon/modules/json_report.py +++ b/activerecon/modules/json_report.py @@ -2,7 +2,13 @@ import ipaddress import json from datetime import datetime -from pathlib import Path, PurePosixPath +from pathlib import Path + +from .endpoint_categories import ( + categorize_endpoints, + endpoint_category_summary, + unique_endpoint_paths, +) SCHEMA_VERSION = "1.1" @@ -13,40 +19,6 @@ "Target appears to be local or private. Results may include local system, " "development, Docker, virtualization, or lab services." ) -STATIC_ASSET_EXTENSIONS = { - ".css", - ".eot", - ".gif", - ".ico", - ".jpeg", - ".jpg", - ".js", - ".map", - ".png", - ".svg", - ".ttf", - ".webp", - ".woff", - ".woff2", -} -WELL_KNOWN_PATHS = { - "/robots.txt", - "/sitemap.xml", - "/.well-known/security.txt", - "/swagger", - "/api-docs", - "/ftp", -} -CATEGORY_KEYS = ( - "api_like", - "frontend_routes", - "static_assets", - "well_known", - "header_discovered", - "realtime_services", -) - - def _as_list(value): return value if isinstance(value, list) else [] @@ -97,7 +69,7 @@ def _endpoint_count(results): paths = set() for group in _endpoint_groups(results): if isinstance(group, dict): - paths.update(_unique_endpoint_paths(_as_list(group.get("endpoints", [])))) + paths.update(unique_endpoint_paths(_as_list(group.get("endpoints", [])))) return len(paths) @@ -156,74 +128,13 @@ def build_json_metadata(target, results, scan_profile=None, scan_context=None): return metadata -def _path_without_query(path): - return str(path or "/").split("?", 1)[0].split("#", 1)[0] - - -def _is_api_like(path): - lower_path = str(path or "").lower() - return lower_path == "/api" or lower_path == "/rest" or lower_path.startswith("/api/") or lower_path.startswith("/rest/") - - -def _is_static_asset(path): - clean_path = _path_without_query(path).lower() - filename = PurePosixPath(clean_path).name - return PurePosixPath(clean_path).suffix in STATIC_ASSET_EXTENSIONS or "chunk" in filename - - -def _is_realtime_service(path): - clean_path = _path_without_query(path).lower().rstrip("/") - return ( - clean_path == "/socket.io" - or clean_path == "/engine.io" - or clean_path.startswith(("/socket.io/", "/engine.io/")) - ) - - -def _unique_endpoint_paths(endpoints): - return { - endpoint.get("path") - for endpoint in endpoints - if isinstance(endpoint, dict) and endpoint.get("path") - } - - -def _primary_endpoint_category(endpoint): - path = endpoint.get("path", "") - lower_path = _path_without_query(path).lower() - if _is_realtime_service(path): - return "realtime_services" - if _is_static_asset(path): - return "static_assets" - if _is_api_like(path): - return "api_like" - if lower_path in WELL_KNOWN_PATHS: - return "well_known" - return "frontend_routes" - - -def _endpoint_categories(endpoints): - categories = {key: [] for key in CATEGORY_KEYS} - for endpoint in endpoints: - if not isinstance(endpoint, dict): - continue - categories[_primary_endpoint_category(endpoint)].append(endpoint) - if str(endpoint.get("source", "")).startswith("response-header"): - categories["header_discovered"].append(endpoint) - - summary = {"endpoint_count": len(_unique_endpoint_paths(endpoints))} - for key in CATEGORY_KEYS: - summary[key] = len(_unique_endpoint_paths(categories[key])) - return summary, categories - - def _enrich_endpoint_discovery(results): for group in _endpoint_groups(results): if not isinstance(group, dict): continue endpoints = _as_list(group.get("endpoints", [])) - summary, categories = _endpoint_categories(endpoints) - group["summary"] = summary + categories = categorize_endpoints(endpoints) + group["summary"] = endpoint_category_summary(endpoints, categories) group["categories"] = categories diff --git a/activerecon/modules/report_generator.py b/activerecon/modules/report_generator.py index 7fecf17..23e13aa 100644 --- a/activerecon/modules/report_generator.py +++ b/activerecon/modules/report_generator.py @@ -1,8 +1,13 @@ import ipaddress import logging -from pathlib import PurePosixPath from pathlib import Path +from .endpoint_categories import ( + MARKDOWN_CATEGORY_ORDER, + MARKDOWN_CATEGORY_TITLES, + categorize_endpoints, +) + def _format_error(error): return f"\n**Error:** {error}\n" if error else "" @@ -74,59 +79,6 @@ def _write_http_result(f, item): f.write(f" - `{key}`: {value}\n") -STATIC_ASSET_EXTENSIONS = { - ".css", - ".eot", - ".gif", - ".ico", - ".jpeg", - ".jpg", - ".js", - ".map", - ".png", - ".svg", - ".ttf", - ".webp", - ".woff", - ".woff2", -} -WELL_KNOWN_REPORT_PATHS = { - "/robots.txt", - "/sitemap.xml", - "/.well-known/security.txt", - "/swagger", - "/api-docs", - "/ftp", -} - - -def _path_without_query(path): - return str(path or "/").split("?", 1)[0].split("#", 1)[0] - - -def _is_api_like_endpoint(path): - lower_path = str(path or "").lower() - return lower_path == "/api" or lower_path == "/rest" or lower_path.startswith("/api/") or lower_path.startswith("/rest/") - - -def _is_static_asset(path): - clean_path = _path_without_query(path).lower() - filename = PurePosixPath(clean_path).name - return PurePosixPath(clean_path).suffix in STATIC_ASSET_EXTENSIONS or "chunk" in filename - - -def _endpoint_category(endpoint): - path = endpoint.get("path", "") - lower_path = str(path).lower() - if _is_static_asset(path): - return "Static Assets" - if _is_api_like_endpoint(path): - return "API-like Endpoints" - if lower_path in WELL_KNOWN_REPORT_PATHS: - return "Well-known / Probed Paths" - return "Frontend Routes" - - def _endpoint_line(endpoint): line = ( f"- `{endpoint.get('path', '/')}` " @@ -182,17 +134,10 @@ def _write_endpoint_discovery(f, endpoint_results): f.write("- No endpoints discovered.\n\n") continue - categorized = { - "API-like Endpoints": [], - "Frontend Routes": [], - "Well-known / Probed Paths": [], - "Static Assets": [], - } - for endpoint in endpoints: - categorized[_endpoint_category(endpoint)].append(endpoint) - - for title in ("API-like Endpoints", "Frontend Routes", "Well-known / Probed Paths", "Static Assets"): - _write_endpoint_category(f, title, categorized[title]) + categorized = categorize_endpoints(endpoints) + + for key in MARKDOWN_CATEGORY_ORDER: + _write_endpoint_category(f, MARKDOWN_CATEGORY_TITLES[key], categorized[key]) f.write("---\n\n") diff --git a/activerecon/modules/scope_guard.py b/activerecon/modules/scope_guard.py index 57f03e9..76cbefc 100644 --- a/activerecon/modules/scope_guard.py +++ b/activerecon/modules/scope_guard.py @@ -1,47 +1,5 @@ -import ipaddress -from pathlib import Path -from urllib.parse import urlparse - - -def _clean_target(value): - value = value.strip() - parsed = urlparse(value if "://" in value else f"//{value}") - host = parsed.hostname or value - return host.strip().rstrip(".").lower() - - -def _read_scope(scope_file): - entries = [] - for line in Path(scope_file).read_text(encoding="utf-8").splitlines(): - line = line.split("#", 1)[0].strip() - if line: - entries.append(line) - return entries - - -def _domain_matches(target, entry): - entry = _clean_target(entry) - return target == entry or target.endswith(f".{entry}") - - -def _ip_matches(target, entry): - try: - target_ip = ipaddress.ip_address(target) - except ValueError: - return False - - try: - return target_ip in ipaddress.ip_network(entry, strict=False) - except ValueError: - try: - return target_ip == ipaddress.ip_address(entry) - except ValueError: - return False +from ..policies.scope_policy import ScopePolicy def is_target_in_scope(target, scope_file): - normalized_target = _clean_target(target) - return any( - _ip_matches(normalized_target, entry) or _domain_matches(normalized_target, entry) - for entry in _read_scope(scope_file) - ) + return ScopePolicy.from_file(scope_file).allows(target) diff --git a/activerecon/output_paths.py b/activerecon/output_paths.py new file mode 100644 index 0000000..9425432 --- /dev/null +++ b/activerecon/output_paths.py @@ -0,0 +1,36 @@ +import re +from datetime import datetime +from pathlib import Path + + +DEFAULT_REPORT_DIR = "reports" + + +def _safe_report_name(target): + safe_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", str(target)).strip("._-") + return safe_name or "target" + + +def build_report_path(target, output=None, now=None, suffix=".md"): + timestamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S") + + if output: + output_path = Path(output) + output_dir = output_path.parent if output_path.parent != Path(".") else Path(DEFAULT_REPORT_DIR) + stem = _safe_report_name(output_path.stem) + else: + output_dir = Path(DEFAULT_REPORT_DIR) + stem = _safe_report_name(target) + + filename = f"{stem}_{timestamp}{suffix}" + return str(output_dir / filename) + + +def build_output_paths(target, output=None, output_format="both", now=None): + now = now or datetime.now() + markdown_path = build_report_path(target, output, now, ".md") + markdown = markdown_path if output_format in {"md", "both"} else None + + json_path = build_report_path(target, output, now, ".json") + json_output = json_path if output_format in {"json", "both"} else None + return markdown, json_output diff --git a/activerecon/policies/__init__.py b/activerecon/policies/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/activerecon/policies/__init__.py @@ -0,0 +1 @@ + diff --git a/activerecon/policies/scope_policy.py b/activerecon/policies/scope_policy.py new file mode 100644 index 0000000..2616ab8 --- /dev/null +++ b/activerecon/policies/scope_policy.py @@ -0,0 +1,49 @@ +import ipaddress +from pathlib import Path + +from ..targets.parser import parse_target + + +def _read_scope_file(scope_file): + entries = [] + for line in Path(scope_file).read_text(encoding="utf-8").splitlines(): + line = line.split("#", 1)[0].strip() + if line: + entries.append(line) + return entries + + +def _domain_matches(target_host, entry): + entry_host = parse_target(entry).host + return target_host == entry_host or target_host.endswith(f".{entry_host}") + + +def _ip_matches(target_host, entry): + try: + target_ip = ipaddress.ip_address(target_host) + except ValueError: + return False + + try: + return target_ip in ipaddress.ip_network(entry, strict=False) + except ValueError: + try: + return target_ip == ipaddress.ip_address(entry) + except ValueError: + return False + + +class ScopePolicy: + def __init__(self, entries): + self.entries = list(entries or []) + + @classmethod + def from_file(cls, scope_file): + return cls(_read_scope_file(scope_file)) + + def allows(self, target): + target_host = parse_target(target).host + return any( + _ip_matches(target_host, entry) or _domain_matches(target_host, entry) + for entry in self.entries + ) diff --git a/activerecon/runner.py b/activerecon/runner.py new file mode 100644 index 0000000..618c870 --- /dev/null +++ b/activerecon/runner.py @@ -0,0 +1,177 @@ +import logging + +from .models import ReconOptions, ReconResult +from .modules.config_loader import load_config +from .modules.dns_analysis import analyze_dns +from .modules.endpoint_discovery import discover_endpoints +from .modules.http_enum import analyze_http +from .modules.json_report import generate_json_report +from .modules.nmap_scan import run_nmap_scan +from .modules.report_generator import generate_report +from .modules.risk_analysis import generate_attention_findings +from .modules.tls_analysis import analyze_tls +from .output_paths import build_output_paths +from .policies.scope_policy import ScopePolicy +from .targets.parser import parse_target +from .workflows import get_http_ports, web_recon_enabled + + +CONFIG = None +DNS_IP_SKIP_REASON = "DNS analysis skipped for IP address target" + + +class ReconValidationError(ValueError): + pass + + +def get_config(): + return CONFIG if CONFIG is not None else load_config() + + +def _dns_skip_result(): + return { + "skipped": True, + "reason": DNS_IP_SKIP_REASON, + "A": [], + "MX": [], + "TXT": [], + } + + +def _load_config(): + try: + return get_config() + except Exception as e: + raise ReconValidationError(f"Could not load config: {e}") from e + + +def _scan_command(config, scan_profile): + scan_profiles = config.get("scan_profiles", {}) + if scan_profile not in scan_profiles: + choices = ", ".join(sorted(scan_profiles)) or "none configured" + raise ReconValidationError( + f"Unknown scan profile '{scan_profile}'. Available profiles: {choices}" + ) + return scan_profiles[scan_profile] + + +def _validate_scope(target, scope_file): + if not scope_file: + return + try: + in_scope = ScopePolicy.from_file(scope_file).allows(target) + except OSError as e: + raise ReconValidationError(f"Could not read scope file {scope_file}: {e}") from e + if not in_scope: + raise ReconValidationError(f"Target is outside the allowed scope file: {scope_file}") + + +def run_recon(options: ReconOptions) -> ReconResult: + config = _load_config() + scan_command = _scan_command(config, options.scan_profile) + target_spec = parse_target(options.target) + markdown_output, json_output = build_output_paths( + options.target, + options.output, + options.output_format, + ) + recon_result = ReconResult( + target=options.target, + target_spec=target_spec, + scan_profile=options.scan_profile, + markdown_output=markdown_output, + json_output=json_output, + dry_run=options.dry_run, + ) + + _validate_scope(options.target, options.scope) + + logging.info(f"Starting automated recon on target: {options.target}") + logging.info(f"Using scan profile: {options.scan_profile} ({scan_command})") + if markdown_output: + logging.info(f"Markdown report path: {markdown_output}") + if json_output: + logging.info(f"JSON report path: {json_output}") + + if options.dry_run: + logging.info("Dry run requested. No Nmap, HTTP, TLS, or DNS checks were executed.") + return recon_result + + results = recon_result.results + + try: + nmap_results = run_nmap_scan(options.target, scan_command, config) + if not isinstance(nmap_results, dict): + nmap_results = { + "target": options.target, + "ports": [], + "error": "Nmap scan returned invalid results", + } + results["Nmap Scan"] = nmap_results + if nmap_results.get("error"): + logging.error(f"Nmap scan completed with errors: {nmap_results['error']}") + else: + logging.info(f"Nmap scan completed successfully. Found {len(nmap_results.get('ports', []))} ports.") + except Exception as e: + logging.error(f"Error during Nmap scan: {e}") + nmap_results = {"target": options.target, "ports": [], "error": f"Nmap scan failed: {e}"} + results["Nmap Scan"] = nmap_results + + http_ports = get_http_ports(nmap_results) + if http_ports: + try: + logging.info(f"HTTP services found: {http_ports}. Running HTTP analysis.") + results["HTTP Analysis"] = analyze_http(options.target, config, http_ports) + except Exception as e: + logging.error(f"Error during HTTP analysis: {e}") + results["HTTP Analysis"] = {"error": f"HTTP analysis failed: {e}"} + else: + logging.info("No HTTP ports found. Skipping HTTP analysis.") + results["HTTP Analysis"] = [] + + try: + logging.info("Running TLS analysis.") + results["TLS Analysis"] = analyze_tls(results["HTTP Analysis"], config.get("http_timeout", 5)) + except Exception as e: + logging.error(f"Error during TLS analysis: {e}") + results["TLS Analysis"] = {"error": f"TLS analysis failed: {e}"} + + if web_recon_enabled(config, options.scan_profile): + try: + logging.info("Running endpoint discovery.") + results["Endpoint Discovery"] = discover_endpoints(results["HTTP Analysis"], config) + except Exception as e: + logging.error(f"Error during endpoint discovery: {e}") + results["Endpoint Discovery"] = {"error": f"Endpoint discovery failed: {e}"} + + if target_spec.is_ip: + logging.info(DNS_IP_SKIP_REASON) + results["DNS Analysis"] = _dns_skip_result() + else: + try: + logging.info("Running DNS analysis.") + results["DNS Analysis"] = analyze_dns(options.target) + except Exception as e: + logging.error(f"Error during DNS analysis: {e}") + results["DNS Analysis"] = {"error": f"DNS analysis failed: {e}"} + + interesting_signals = generate_attention_findings(results) + results["Attention"] = interesting_signals + results["Interesting Signals"] = interesting_signals + + if markdown_output: + try: + generate_report(options.target, results, markdown_output) + logging.info(f"Markdown report saved to {markdown_output}") + except Exception as e: + logging.error(f"Error during Markdown report generation: {e}") + + if json_output: + try: + generate_json_report(options.target, results, json_output, scan_profile=options.scan_profile) + logging.info(f"JSON report saved to {json_output}") + except Exception as e: + logging.error(f"Error during JSON report generation: {e}") + + logging.info("Recon completed.") + return recon_result diff --git a/activerecon/targets/__init__.py b/activerecon/targets/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/activerecon/targets/__init__.py @@ -0,0 +1 @@ + diff --git a/activerecon/targets/parser.py b/activerecon/targets/parser.py new file mode 100644 index 0000000..e957a10 --- /dev/null +++ b/activerecon/targets/parser.py @@ -0,0 +1,42 @@ +import ipaddress +from urllib.parse import urlparse + +from ..models import TargetSpec + + +def parse_target(value): + raw = str(value or "").strip() + parsed = urlparse(raw if "://" in raw else f"//{raw}") + host = (parsed.hostname or raw).strip().rstrip(".").lower() + path = parsed.path or "" + + port = None + try: + port = parsed.port + except ValueError: + port = None + + is_ip = False + is_private = False + is_loopback = False + if host == "localhost": + is_private = True + is_loopback = True + try: + address = ipaddress.ip_address(host) + is_ip = True + is_private = address.is_private + is_loopback = address.is_loopback + except ValueError: + pass + + return TargetSpec( + raw=raw, + host=host, + scheme=parsed.scheme or None, + port=port, + path=path, + is_ip=is_ip, + is_private=is_private, + is_loopback=is_loopback, + ) diff --git a/activerecon/workflows.py b/activerecon/workflows.py new file mode 100644 index 0000000..2fddf79 --- /dev/null +++ b/activerecon/workflows.py @@ -0,0 +1,23 @@ +COMMON_HTTP_PORTS = {"80", "443", "3000", "5000", "8000", "8080", "8443", "9000", "9443"} + + +def is_http_service(port): + service = str(port.get("service", "")).lower() + state = str(port.get("state", "")).lower() + portid = str(port.get("portid", "")) + + if state and state != "open": + return False + + return "http" in service or portid in COMMON_HTTP_PORTS + + +def get_http_ports(nmap_results): + ports = nmap_results.get("ports", []) if isinstance(nmap_results, dict) else [] + return [port for port in ports if is_http_service(port)] + + +def web_recon_enabled(config, scan_profile): + web_recon = config.get("web_recon", {}) if isinstance(config, dict) else {} + enabled_profiles = web_recon.get("enabled_profiles", []) + return scan_profile in enabled_profiles diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..edbb027 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,69 @@ +import pytest + +from activerecon import cli +from activerecon.models import ReconOptions, ReconResult, TargetSpec + + +def test_options_from_args_preserves_cli_flags(): + parser = cli.build_parser() + args = parser.parse_args([ + "--target", + "example.com", + "--scan-profile", + "web", + "--output", + "report", + "--output-format", + "json", + "--scope", + "scope.txt", + "--dry-run", + "--verbose", + ]) + + assert cli.options_from_args(args) == ReconOptions( + target="example.com", + scan_profile="web", + output="report", + output_format="json", + scope="scope.txt", + dry_run=True, + verbose=True, + quiet=False, + ) + + +def test_cli_doctor_skips_recon(monkeypatch): + called = [] + + monkeypatch.setattr(cli, "run_doctor", lambda reports_dir: called.append(reports_dir)) + monkeypatch.setattr(cli, "run_recon", lambda options: (_ for _ in ()).throw(AssertionError())) + + assert cli.main(["--doctor"]) is None + assert called == [cli.DEFAULT_REPORT_DIR] + + +def test_cli_requires_target_without_doctor(): + with pytest.raises(SystemExit) as exc: + cli.main([]) + + assert exc.value.code == 2 + + +def test_cli_calls_runner(monkeypatch): + captured = {} + + def fake_run_recon(options): + captured["options"] = options + return ReconResult( + target=options.target, + target_spec=TargetSpec(raw=options.target, host=options.target), + scan_profile=options.scan_profile, + ) + + monkeypatch.setattr(cli, "run_recon", fake_run_recon) + + result = cli.main(["--target", "example.com", "--scan-profile", "fast", "--quiet"]) + + assert result.target == "example.com" + assert captured["options"].quiet is True diff --git a/tests/test_endpoint_categories.py b/tests/test_endpoint_categories.py new file mode 100644 index 0000000..18f7bab --- /dev/null +++ b/tests/test_endpoint_categories.py @@ -0,0 +1,25 @@ +from activerecon.modules.endpoint_categories import categorize_endpoints, endpoint_category_summary + + +def test_endpoint_categories_group_common_endpoint_types(): + endpoints = [ + {"path": "/api", "source": "well-known"}, + {"path": "/login", "source": "html:href"}, + {"path": "/app.js", "source": "html:script-src"}, + {"path": "/robots.txt", "source": "well-known"}, + {"path": "/#/jobs", "source": "response-header:X-Recruiting"}, + {"path": "/socket.io/?EIO=4", "source": "javascript"}, + {"path": "/api", "source": "javascript"}, + ] + + categories = categorize_endpoints(endpoints) + summary = endpoint_category_summary(endpoints, categories) + + assert [item["path"] for item in categories["api_like"]] == ["/api", "/api"] + assert categories["frontend_routes"][0]["path"] == "/login" + assert categories["static_assets"][0]["path"] == "/app.js" + assert categories["well_known"][0]["path"] == "/robots.txt" + assert categories["header_discovered"][0]["path"] == "/#/jobs" + assert categories["realtime_services"][0]["path"] == "/socket.io/?EIO=4" + assert summary["endpoint_count"] == 6 + assert summary["api_like"] == 1 diff --git a/tests/test_main.py b/tests/test_main.py index 565e851..1cc5af3 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,379 +1,6 @@ -import sys -from datetime import datetime -from pathlib import Path - +import activerecon.cli as cli_module import activerecon.main as main_module -def test_build_report_path_defaults_to_timestamped_reports_dir(): - report_path = main_module.build_report_path( - "https://example.com:443/path", - now=datetime(2026, 6, 17, 9, 8, 7), - ) - - assert report_path == str(Path("reports") / "https_example.com_443_path_20260617_090807.md") - - -def test_build_report_path_respects_explicit_output(): - report_path = main_module.build_report_path( - "example.com", - "custom.md", - now=datetime(2026, 6, 17, 9, 8, 7), - ) - - assert report_path == str(Path("reports") / "custom_20260617_090807.md") - - -def test_build_report_path_respects_explicit_output_directory(tmp_path): - report_path = main_module.build_report_path( - "example.com", - str(tmp_path / "custom.md"), - now=datetime(2026, 6, 17, 9, 8, 7), - ) - - assert report_path == str(tmp_path / "custom_20260617_090807.md") - - -def test_is_http_service_detects_open_dev_port(): - assert main_module._is_http_service({"portid": "3000", "state": "open", "service": "ppp"}) - assert not main_module._is_http_service({"portid": "3000", "state": "closed", "service": "ppp"}) - assert not main_module._is_http_service({"portid": "8080", "state": "filtered", "service": "http"}) - - -def test_main_smoke_with_mocked_modules(monkeypatch, tmp_path): - output = tmp_path / "report.md" - captured = {} - - def fake_nmap(target, scan_command, config): - assert target == "example.com" - assert scan_command == "-Pn" - assert config["http_timeout"] == 5 - return { - "target": target, - "ports": [{"portid": "80", "protocol": "tcp", "state": "open", "service": "http"}], - "status": {"state": "up"}, - "scan_info": {}, - "host": "example.com", - } - - def fake_http(target, config, http_ports): - captured["http_ports"] = http_ports - return [{"url": "http://example.com:80", "status": 200, "headers": {}}] - - def fake_dns(target): - return {"A": ["192.0.2.10"], "MX": [], "TXT": []} - - def fake_report(target, results, output_file): - captured["results"] = results - captured["output_file"] = output_file - - def fake_json_report(target, results, output_file, **kwargs): - captured["json_output_file"] = output_file - captured["json_kwargs"] = kwargs - - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr(main_module, "run_nmap_scan", fake_nmap) - monkeypatch.setattr(main_module, "analyze_http", fake_http) - monkeypatch.setattr(main_module, "analyze_tls", lambda http_results, timeout: []) - monkeypatch.setattr(main_module, "analyze_dns", fake_dns) - monkeypatch.setattr(main_module, "generate_attention_findings", lambda results: []) - monkeypatch.setattr(main_module, "generate_report", fake_report) - monkeypatch.setattr(main_module, "generate_json_report", fake_json_report) - monkeypatch.setattr( - main_module, - "datetime", - type("FixedDatetime", (), {"now": staticmethod(lambda: datetime(2026, 6, 17, 9, 8, 7))}), - ) - monkeypatch.setattr( - sys, - "argv", - ["activerecon", "--target", "example.com", "--scan-profile", "fast", "--output", str(output)], - ) - - main_module.main() - - assert captured["http_ports"] == [{"portid": "80", "protocol": "tcp", "state": "open", "service": "http"}] - assert captured["results"]["Nmap Scan"]["status"]["state"] == "up" - assert captured["results"]["Interesting Signals"] == captured["results"]["Attention"] - assert captured["output_file"] == str(tmp_path / "report_20260617_090807.md") - assert captured["json_output_file"] == str(tmp_path / "report_20260617_090807.json") - assert captured["json_kwargs"]["scan_profile"] == "fast" - - -def test_main_handles_failed_nmap_without_http(monkeypatch, tmp_path): - output = tmp_path / "report.md" - captured = {} - - def fake_http(target, config, http_ports): - raise AssertionError("HTTP analysis should not run without HTTP ports") - - def fake_report(target, results, output_file): - captured["results"] = results - - def fake_json_report(target, results, output_file, **kwargs): - captured["json_output_file"] = output_file - - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr( - main_module, - "run_nmap_scan", - lambda target, scan_command, config: {"target": target, "ports": [], "error": "nmap failed"}, - ) - monkeypatch.setattr(main_module, "analyze_http", fake_http) - monkeypatch.setattr(main_module, "analyze_tls", lambda http_results, timeout: []) - monkeypatch.setattr(main_module, "analyze_dns", lambda target: {"A": [], "MX": [], "TXT": []}) - monkeypatch.setattr(main_module, "generate_attention_findings", lambda results: []) - monkeypatch.setattr(main_module, "generate_report", fake_report) - monkeypatch.setattr(main_module, "generate_json_report", fake_json_report) - monkeypatch.setattr( - main_module, - "datetime", - type("FixedDatetime", (), {"now": staticmethod(lambda: datetime(2026, 6, 17, 9, 8, 7))}), - ) - monkeypatch.setattr( - sys, - "argv", - ["activerecon", "--target", "example.com", "--scan-profile", "fast", "--output", str(output)], - ) - - main_module.main() - - assert captured["results"]["Nmap Scan"]["error"] == "nmap failed" - assert captured["results"]["HTTP Analysis"] == [] - assert captured["results"]["TLS Analysis"] == [] - - -def test_build_output_paths_defaults_to_markdown_and_json(): - markdown, json_output = main_module.build_output_paths( - "example.com", - output_format="both", - now=datetime(2026, 6, 17, 9, 8, 7), - ) - - assert markdown == str(Path("reports") / "example.com_20260617_090807.md") - assert json_output == str(Path("reports") / "example.com_20260617_090807.json") - - -def test_build_output_paths_json_only_uses_explicit_output(): - markdown, json_output = main_module.build_output_paths( - "example.com", - "custom.json", - "json", - now=datetime(2026, 6, 17, 9, 8, 7), - ) - - assert markdown is None - assert json_output == str(Path("reports") / "custom_20260617_090807.json") - - -def test_build_output_paths_bare_output_stays_in_reports_with_timestamp(): - markdown, json_output = main_module.build_output_paths( - "scanme.nmap.org", - "report.md", - "both", - now=datetime(2026, 6, 17, 10, 28, 5), - ) - - assert markdown == str(Path("reports") / "report_20260617_102805.md") - assert json_output == str(Path("reports") / "report_20260617_102805.json") - - -def test_main_uses_timestamped_default_output(monkeypatch): - captured = {} - - def fake_report(target, results, output_file): - captured["output_file"] = output_file - - def fake_json_report(target, results, output_file, **kwargs): - captured["json_output_file"] = output_file - - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr( - main_module, - "run_nmap_scan", - lambda target, scan_command, config: {"target": target, "ports": [], "status": {}, "scan_info": {}}, - ) - monkeypatch.setattr(main_module, "analyze_tls", lambda http_results, timeout: []) - monkeypatch.setattr(main_module, "analyze_dns", lambda target: {"A": [], "MX": [], "TXT": []}) - monkeypatch.setattr(main_module, "generate_attention_findings", lambda results: []) - monkeypatch.setattr(main_module, "generate_report", fake_report) - monkeypatch.setattr(main_module, "generate_json_report", fake_json_report) - monkeypatch.setattr( - main_module, - "datetime", - type("FixedDatetime", (), {"now": staticmethod(lambda: datetime(2026, 6, 17, 9, 8, 7))}), - ) - monkeypatch.setattr(sys, "argv", ["activerecon", "--target", "example.com", "--scan-profile", "fast"]) - - main_module.main() - - assert captured["output_file"] == str(Path("reports") / "example.com_20260617_090807.md") - assert captured["json_output_file"] == str(Path("reports") / "example.com_20260617_090807.json") - - -def test_main_dry_run_skips_scanning(monkeypatch): - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr( - main_module, - "run_nmap_scan", - lambda target, scan_command, config: (_ for _ in ()).throw(AssertionError()), - ) - monkeypatch.setattr(sys, "argv", ["activerecon", "--target", "example.com", "--dry-run"]) - - main_module.main() - - -def test_main_doctor_skips_scanning(monkeypatch): - called = [] - - monkeypatch.setattr(main_module, "run_doctor", lambda reports_dir: called.append(reports_dir)) - monkeypatch.setattr( - main_module, - "run_nmap_scan", - lambda target, scan_command, config: (_ for _ in ()).throw(AssertionError()), - ) - monkeypatch.setattr(sys, "argv", ["activerecon", "--doctor"]) - - main_module.main() - - assert called == [main_module.DEFAULT_REPORT_DIR] - - -def test_main_skips_dns_for_ip_target(monkeypatch, tmp_path): - output = tmp_path / "report.md" - captured = {} - - def fake_nmap(target, scan_command, config): - return { - "target": target, - "ports": [{"portid": "3000", "protocol": "tcp", "state": "open", "service": "ppp"}], - "status": {"state": "up"}, - "scan_info": {}, - "host": target, - } - - def fake_http(target, config, http_ports): - captured["http_ports"] = http_ports - return [{"url": "http://127.0.0.1:3000", "status": 200, "headers": {}}] - - def fake_report(target, results, output_file): - captured["results"] = results - - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr(main_module, "run_nmap_scan", fake_nmap) - monkeypatch.setattr(main_module, "analyze_http", fake_http) - monkeypatch.setattr(main_module, "analyze_tls", lambda http_results, timeout: []) - monkeypatch.setattr( - main_module, - "analyze_dns", - lambda target: (_ for _ in ()).throw(AssertionError("DNS should be skipped for IP targets")), - ) - monkeypatch.setattr(main_module, "generate_attention_findings", lambda results: []) - monkeypatch.setattr(main_module, "generate_report", fake_report) - monkeypatch.setattr(main_module, "generate_json_report", lambda target, results, output_file, **kwargs: None) - monkeypatch.setattr( - sys, - "argv", - ["activerecon", "--target", "127.0.0.1", "--scan-profile", "fast", "--output", str(output)], - ) - - main_module.main() - - assert captured["http_ports"] == [{"portid": "3000", "protocol": "tcp", "state": "open", "service": "ppp"}] - assert captured["results"]["DNS Analysis"] == { - "skipped": True, - "reason": "DNS analysis skipped for IP address target", - "A": [], - "MX": [], - "TXT": [], - } - - -def test_main_web_profile_runs_endpoint_discovery(monkeypatch, tmp_path): - output = tmp_path / "report.md" - captured = {} - - def fake_nmap(target, scan_command, config): - assert scan_command == "-web" - return { - "target": target, - "ports": [{"portid": "3000", "protocol": "tcp", "state": "open", "service": "ppp"}], - "status": {"state": "up"}, - "scan_info": {}, - "host": target, - } - - def fake_http(target, config, http_ports): - return [{"url": "http://example.com:3000", "status": 200, "headers": {}}] - - def fake_endpoints(http_results, config): - captured["endpoint_http_results"] = http_results - return [{"base_url": "http://example.com:3000", "endpoints": [{"path": "/api", "source": "well-known"}]}] - - def fake_report(target, results, output_file): - captured["results"] = results - - monkeypatch.setattr( - main_module, - "CONFIG", - { - "scan_profiles": {"web": "-web"}, - "http_timeout": 5, - "web_recon": {"enabled_profiles": ["web"]}, - }, - ) - monkeypatch.setattr(main_module, "run_nmap_scan", fake_nmap) - monkeypatch.setattr(main_module, "analyze_http", fake_http) - monkeypatch.setattr(main_module, "analyze_tls", lambda http_results, timeout: []) - monkeypatch.setattr(main_module, "discover_endpoints", fake_endpoints) - monkeypatch.setattr(main_module, "analyze_dns", lambda target: {"A": [], "MX": [], "TXT": []}) - monkeypatch.setattr(main_module, "generate_attention_findings", lambda results: []) - monkeypatch.setattr(main_module, "generate_report", fake_report) - monkeypatch.setattr(main_module, "generate_json_report", lambda target, results, output_file, **kwargs: None) - monkeypatch.setattr( - sys, - "argv", - ["activerecon", "--target", "example.com", "--scan-profile", "web", "--output", str(output)], - ) - - main_module.main() - - assert captured["endpoint_http_results"] == [{"url": "http://example.com:3000", "status": 200, "headers": {}}] - assert captured["results"]["Endpoint Discovery"][0]["endpoints"][0]["path"] == "/api" - - -def test_main_rejects_target_outside_scope(monkeypatch, tmp_path): - scope = tmp_path / "scope.txt" - scope.write_text("allowed.example.com\n", encoding="utf-8") - - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr( - sys, - "argv", - ["activerecon", "--target", "example.com", "--scope", str(scope), "--dry-run"], - ) - - try: - main_module.main() - except SystemExit as e: - assert e.code == 2 - else: - raise AssertionError("Expected scope validation to reject target") - - -def test_main_rejects_missing_scope_file(monkeypatch, tmp_path): - missing_scope = tmp_path / "missing.txt" - - monkeypatch.setattr(main_module, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) - monkeypatch.setattr( - sys, - "argv", - ["activerecon", "--target", "example.com", "--scope", str(missing_scope), "--dry-run"], - ) - - try: - main_module.main() - except SystemExit as e: - assert e.code == 2 - else: - raise AssertionError("Expected missing scope file to reject target") +def test_main_module_is_thin_cli_wrapper(): + assert main_module.main is cli_module.main diff --git a/tests/test_output_paths.py b/tests/test_output_paths.py new file mode 100644 index 0000000..addd71f --- /dev/null +++ b/tests/test_output_paths.py @@ -0,0 +1,68 @@ +from datetime import datetime +from pathlib import Path + +from activerecon.output_paths import build_output_paths, build_report_path + + +def test_build_report_path_defaults_to_timestamped_reports_dir(): + report_path = build_report_path( + "https://example.com:443/path", + now=datetime(2026, 6, 17, 9, 8, 7), + ) + + assert report_path == str(Path("reports") / "https_example.com_443_path_20260617_090807.md") + + +def test_build_report_path_respects_explicit_output(): + report_path = build_report_path( + "example.com", + "custom.md", + now=datetime(2026, 6, 17, 9, 8, 7), + ) + + assert report_path == str(Path("reports") / "custom_20260617_090807.md") + + +def test_build_report_path_respects_explicit_output_directory(tmp_path): + report_path = build_report_path( + "example.com", + str(tmp_path / "custom.md"), + now=datetime(2026, 6, 17, 9, 8, 7), + ) + + assert report_path == str(tmp_path / "custom_20260617_090807.md") + + +def test_build_output_paths_defaults_to_markdown_and_json(): + markdown, json_output = build_output_paths( + "example.com", + output_format="both", + now=datetime(2026, 6, 17, 9, 8, 7), + ) + + assert markdown == str(Path("reports") / "example.com_20260617_090807.md") + assert json_output == str(Path("reports") / "example.com_20260617_090807.json") + + +def test_build_output_paths_json_only_uses_explicit_output(): + markdown, json_output = build_output_paths( + "example.com", + "custom.json", + "json", + now=datetime(2026, 6, 17, 9, 8, 7), + ) + + assert markdown is None + assert json_output == str(Path("reports") / "custom_20260617_090807.json") + + +def test_build_output_paths_bare_output_stays_in_reports_with_timestamp(): + markdown, json_output = build_output_paths( + "scanme.nmap.org", + "report.md", + "both", + now=datetime(2026, 6, 17, 10, 28, 5), + ) + + assert markdown == str(Path("reports") / "report_20260617_102805.md") + assert json_output == str(Path("reports") / "report_20260617_102805.json") diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..85d64f7 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,190 @@ +from activerecon import runner +from activerecon.models import ReconOptions + + +def _fixed_output_paths(target, output, output_format): + markdown = "reports/report.md" if output_format in {"md", "both"} else None + json_output = "reports/report.json" if output_format in {"json", "both"} else None + return markdown, json_output + + +def test_runner_smoke_with_mocked_modules(monkeypatch): + captured = {} + + def fake_nmap(target, scan_command, config): + assert target == "example.com" + assert scan_command == "-Pn" + return { + "target": target, + "ports": [{"portid": "80", "protocol": "tcp", "state": "open", "service": "http"}], + "status": {"state": "up"}, + "scan_info": {}, + "host": "example.com", + } + + def fake_http(target, config, http_ports): + captured["http_ports"] = http_ports + return [{"url": "http://example.com:80", "status": 200, "headers": {}}] + + def fake_report(target, results, output_file): + captured["results"] = results + captured["markdown_output"] = output_file + + def fake_json_report(target, results, output_file, **kwargs): + captured["json_output"] = output_file + captured["json_kwargs"] = kwargs + + monkeypatch.setattr(runner, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) + monkeypatch.setattr(runner, "build_output_paths", _fixed_output_paths) + monkeypatch.setattr(runner, "run_nmap_scan", fake_nmap) + monkeypatch.setattr(runner, "analyze_http", fake_http) + monkeypatch.setattr(runner, "analyze_tls", lambda http_results, timeout: []) + monkeypatch.setattr(runner, "analyze_dns", lambda target: {"A": ["192.0.2.10"], "MX": [], "TXT": []}) + monkeypatch.setattr(runner, "generate_attention_findings", lambda results: []) + monkeypatch.setattr(runner, "generate_report", fake_report) + monkeypatch.setattr(runner, "generate_json_report", fake_json_report) + + result = runner.run_recon(ReconOptions(target="example.com", scan_profile="fast")) + + assert captured["http_ports"] == [{"portid": "80", "protocol": "tcp", "state": "open", "service": "http"}] + assert result.results["Nmap Scan"]["status"]["state"] == "up" + assert result.results["Interesting Signals"] == result.results["Attention"] + assert captured["markdown_output"] == "reports/report.md" + assert captured["json_output"] == "reports/report.json" + assert captured["json_kwargs"]["scan_profile"] == "fast" + + +def test_runner_handles_failed_nmap_without_http(monkeypatch): + captured = {} + + monkeypatch.setattr(runner, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) + monkeypatch.setattr(runner, "build_output_paths", _fixed_output_paths) + monkeypatch.setattr( + runner, + "run_nmap_scan", + lambda target, scan_command, config: {"target": target, "ports": [], "error": "nmap failed"}, + ) + monkeypatch.setattr( + runner, + "analyze_http", + lambda target, config, http_ports: (_ for _ in ()).throw(AssertionError()), + ) + monkeypatch.setattr(runner, "analyze_tls", lambda http_results, timeout: []) + monkeypatch.setattr(runner, "analyze_dns", lambda target: {"A": [], "MX": [], "TXT": []}) + monkeypatch.setattr(runner, "generate_attention_findings", lambda results: []) + monkeypatch.setattr( + runner, + "generate_report", + lambda target, results, output_file: captured.update(results=results), + ) + monkeypatch.setattr(runner, "generate_json_report", lambda target, results, output_file, **kwargs: None) + + result = runner.run_recon(ReconOptions(target="example.com", scan_profile="fast")) + + assert result.results["Nmap Scan"]["error"] == "nmap failed" + assert captured["results"]["HTTP Analysis"] == [] + assert captured["results"]["TLS Analysis"] == [] + + +def test_runner_dry_run_skips_scanning(monkeypatch): + monkeypatch.setattr(runner, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) + monkeypatch.setattr(runner, "build_output_paths", _fixed_output_paths) + monkeypatch.setattr( + runner, + "run_nmap_scan", + lambda target, scan_command, config: (_ for _ in ()).throw(AssertionError()), + ) + + result = runner.run_recon(ReconOptions(target="example.com", scan_profile="fast", dry_run=True)) + + assert result.dry_run + assert result.results == {} + + +def test_runner_skips_dns_for_ip_target(monkeypatch): + captured = {} + + monkeypatch.setattr(runner, "CONFIG", {"scan_profiles": {"fast": "-Pn"}, "http_timeout": 5}) + monkeypatch.setattr(runner, "build_output_paths", _fixed_output_paths) + monkeypatch.setattr( + runner, + "run_nmap_scan", + lambda target, scan_command, config: { + "target": target, + "ports": [{"portid": "3000", "protocol": "tcp", "state": "open", "service": "ppp"}], + "status": {"state": "up"}, + "scan_info": {}, + "host": target, + }, + ) + monkeypatch.setattr( + runner, + "analyze_dns", + lambda target: (_ for _ in ()).throw(AssertionError("DNS should be skipped for IP targets")), + ) + monkeypatch.setattr( + runner, + "analyze_http", + lambda target, config, http_ports: captured.setdefault("http_ports", http_ports) or [], + ) + monkeypatch.setattr(runner, "analyze_tls", lambda http_results, timeout: []) + monkeypatch.setattr(runner, "generate_attention_findings", lambda results: []) + monkeypatch.setattr(runner, "generate_report", lambda target, results, output_file: None) + monkeypatch.setattr(runner, "generate_json_report", lambda target, results, output_file, **kwargs: None) + + result = runner.run_recon(ReconOptions(target="127.0.0.1", scan_profile="fast")) + + assert captured["http_ports"] == [{"portid": "3000", "protocol": "tcp", "state": "open", "service": "ppp"}] + assert result.results["DNS Analysis"] == { + "skipped": True, + "reason": "DNS analysis skipped for IP address target", + "A": [], + "MX": [], + "TXT": [], + } + + +def test_runner_web_profile_runs_endpoint_discovery(monkeypatch): + captured = {} + + monkeypatch.setattr( + runner, + "CONFIG", + { + "scan_profiles": {"web": "-web"}, + "http_timeout": 5, + "web_recon": {"enabled_profiles": ["web"]}, + }, + ) + monkeypatch.setattr(runner, "build_output_paths", _fixed_output_paths) + monkeypatch.setattr( + runner, + "run_nmap_scan", + lambda target, scan_command, config: { + "target": target, + "ports": [{"portid": "3000", "protocol": "tcp", "state": "open", "service": "ppp"}], + "status": {"state": "up"}, + "scan_info": {}, + "host": target, + }, + ) + monkeypatch.setattr( + runner, + "analyze_http", + lambda target, config, http_ports: [{"url": "http://example.com:3000", "status": 200, "headers": {}}], + ) + monkeypatch.setattr(runner, "analyze_tls", lambda http_results, timeout: []) + def fake_endpoints(http_results, config): + captured["endpoint_http_results"] = http_results + return [{"base_url": "http://example.com:3000", "endpoints": [{"path": "/api"}]}] + + monkeypatch.setattr(runner, "discover_endpoints", fake_endpoints) + monkeypatch.setattr(runner, "analyze_dns", lambda target: {"A": [], "MX": [], "TXT": []}) + monkeypatch.setattr(runner, "generate_attention_findings", lambda results: []) + monkeypatch.setattr(runner, "generate_report", lambda target, results, output_file: None) + monkeypatch.setattr(runner, "generate_json_report", lambda target, results, output_file, **kwargs: None) + + result = runner.run_recon(ReconOptions(target="example.com", scan_profile="web")) + + assert captured["endpoint_http_results"] == [{"url": "http://example.com:3000", "status": 200, "headers": {}}] + assert result.results["Endpoint Discovery"][0]["endpoints"][0]["path"] == "/api" diff --git a/tests/test_scope_policy.py b/tests/test_scope_policy.py new file mode 100644 index 0000000..919f52c --- /dev/null +++ b/tests/test_scope_policy.py @@ -0,0 +1,22 @@ +from activerecon.policies.scope_policy import ScopePolicy + + +def test_scope_policy_allows_domain_subdomain_ip_and_cidr(): + policy = ScopePolicy(["example.com", "192.0.2.0/24", "198.51.100.10"]) + + assert policy.allows("example.com") + assert policy.allows("api.example.com") + assert policy.allows("192.0.2.5") + assert policy.allows("198.51.100.10") + assert not policy.allows("other.example.net") + assert not policy.allows("203.0.113.10") + + +def test_scope_policy_loads_scope_files_with_comments(tmp_path): + scope = tmp_path / "scope.txt" + scope.write_text("\n# comment\nallowed.example.com # inline\n", encoding="utf-8") + + policy = ScopePolicy.from_file(scope) + + assert policy.allows("allowed.example.com") + assert not policy.allows("example.com") diff --git a/tests/test_target_parser.py b/tests/test_target_parser.py new file mode 100644 index 0000000..40bb083 --- /dev/null +++ b/tests/test_target_parser.py @@ -0,0 +1,31 @@ +from activerecon.targets.parser import parse_target + + +def test_parse_target_splits_url_parts(): + target = parse_target("https://example.com:8443/admin") + + assert target.raw == "https://example.com:8443/admin" + assert target.host == "example.com" + assert target.scheme == "https" + assert target.port == 8443 + assert target.path == "/admin" + assert not target.is_ip + + +def test_parse_target_marks_ip_private_and_loopback(): + target = parse_target("127.0.0.1") + + assert target.host == "127.0.0.1" + assert target.is_ip + assert target.is_private + assert target.is_loopback + + +def test_parse_target_marks_localhost_as_loopback_foundation(): + target = parse_target("localhost:3000") + + assert target.host == "localhost" + assert target.port == 3000 + assert not target.is_ip + assert target.is_private + assert target.is_loopback diff --git a/tests/test_workflows.py b/tests/test_workflows.py new file mode 100644 index 0000000..165813f --- /dev/null +++ b/tests/test_workflows.py @@ -0,0 +1,7 @@ +from activerecon.workflows import is_http_service + + +def test_is_http_service_detects_open_dev_port(): + assert is_http_service({"portid": "3000", "state": "open", "service": "ppp"}) + assert not is_http_service({"portid": "3000", "state": "closed", "service": "ppp"}) + assert not is_http_service({"portid": "8080", "state": "filtered", "service": "http"})