diff --git a/README.md b/README.md index 9eaaf84..6f0bdca 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,12 @@ The JSON report uses a simple stable wrapper: "schema_version": "1.0", "generated_at": "2026-06-17T09:08:07Z", "target": "example.com", + "metadata": { + "tool": "ActiveRecon", + "scan_profile": "web", + "authorized_use_notice": true + }, + "summary": {}, "results": {} } ``` @@ -328,11 +334,12 @@ HTTP Analysis TLS Analysis DNS Analysis Attention +Interesting Signals ``` -Markdown reports use the heading `Interesting Signals`. JSON output keeps the `Attention` key for compatibility. +Markdown reports use the heading `Interesting Signals`. JSON output keeps the `Attention` key for compatibility and also includes `Interesting Signals` as an alias. -When the `web` profile is used, reports also include `Endpoint Discovery`. +When the `web` profile is used, reports also include `Endpoint Discovery` with the original endpoint list plus summary counts and categorized endpoint groups. 
--- diff --git a/activerecon/main.py b/activerecon/main.py index 6ae571b..85f36ac 100644 --- a/activerecon/main.py +++ b/activerecon/main.py @@ -245,7 +245,9 @@ def main(): logging.error(f"Error during DNS analysis: {e}") results["DNS Analysis"] = {"error": f"DNS analysis failed: {e}"} - results["Attention"] = generate_attention_findings(results) + interesting_signals = generate_attention_findings(results) + results["Attention"] = interesting_signals + results["Interesting Signals"] = interesting_signals if markdown_output: try: @@ -256,7 +258,7 @@ def main(): if json_output: try: - generate_json_report(target, results, json_output) + generate_json_report(target, results, json_output, scan_profile=chosen_profile) logging.info(f"JSON report saved to {json_output}") except Exception as e: logging.error(f"Error during JSON report generation: {e}") diff --git a/activerecon/modules/http_enum.py b/activerecon/modules/http_enum.py index a659b44..cd09272 100644 --- a/activerecon/modules/http_enum.py +++ b/activerecon/modules/http_enum.py @@ -32,6 +32,12 @@ def _security_headers(headers): return present, missing +def _filter_missing_headers_for_url(missing_headers, url): + if str(url or "").lower().startswith("https://"): + return missing_headers + return [header for header in missing_headers if header != "strict-transport-security"] + + def _technology_hints(headers): hints = [] server = headers.get("Server") or headers.get("server") @@ -81,9 +87,11 @@ def analyze_http(target, config, http_ports): response = requests.get(url, timeout=timeout) headers = dict(response.headers) present_headers, missing_headers = _security_headers(headers) + final_url = getattr(response, "url", url) + missing_headers = _filter_missing_headers_for_url(missing_headers, final_url) results.append({ "url": url, - "final_url": getattr(response, "url", url), + "final_url": final_url, "port": portid, "service": service or scheme, "status": response.status_code, diff --git 
a/activerecon/modules/json_report.py b/activerecon/modules/json_report.py
index 5f8ca00..5007b85 100644
--- a/activerecon/modules/json_report.py
+++ b/activerecon/modules/json_report.py
@@ -1,24 +1,308 @@
+import copy
+import ipaddress
 import json
-from datetime import datetime
-from pathlib import Path
+from datetime import datetime, timezone
+from pathlib import Path, PurePosixPath
 
 
 SCHEMA_VERSION = "1.0"
+TOOL_NAME = "ActiveRecon"
+AUTHORIZED_USE_NOTICE = True
+HSTS_HEADER = "strict-transport-security"
+SCAN_CONTEXT_NOTE = (
+    "Target appears to be local or private. Results may include local system, "
+    "development, Docker, virtualization, or lab services."
+)
+STATIC_ASSET_EXTENSIONS = {
+    ".css",
+    ".eot",
+    ".gif",
+    ".ico",
+    ".jpeg",
+    ".jpg",
+    ".js",
+    ".map",
+    ".png",
+    ".svg",
+    ".ttf",
+    ".webp",
+    ".woff",
+    ".woff2",
+}
+WELL_KNOWN_PATHS = {
+    "/robots.txt",
+    "/sitemap.xml",
+    "/.well-known/security.txt",
+    "/swagger",
+    "/api-docs",
+    "/ftp",
+}
+CATEGORY_KEYS = ("api_like", "frontend_routes", "static_assets", "well_known", "header_discovered")
 
 
-def build_json_payload(target, results, generated_at=None):
+def _as_list(value):
+    """Return *value* if it is a list, otherwise an empty list."""
+    return value if isinstance(value, list) else []
+
+
+def _as_dict(value):
+    """Return *value* if it is a dict, otherwise an empty dict."""
+    return value if isinstance(value, dict) else {}
+
+
+def _nmap_results(results):
+    """Return the nmap section of *results* as a dict.
+
+    Falls back to *results* itself when there is no "Nmap Scan" key, and to
+    an empty dict when the chosen value is not a dict.
+    """
+    if not isinstance(results, dict):
+        return {}
+    return _as_dict(results.get("Nmap Scan", results))
+
+
+def _is_https_http_item(item):
+    """Return True when the item's final (or requested) URL is HTTPS."""
+    url = str(item.get("final_url") or item.get("url") or "").lower()
+    return url.startswith("https://")
+
+
+def _filter_http_security_headers(results):
+    """Remove HSTS from the missing-header list of non-HTTPS HTTP results.
+
+    Mutates *results* in place.
+    """
+    http_results = results.get("HTTP Analysis", [])
+    if not isinstance(http_results, list):
+        return
+
+    for item in http_results:
+        if not isinstance(item, dict) or _is_https_http_item(item):
+            continue
+        missing_headers = item.get("missing_security_headers", [])
+        if isinstance(missing_headers, list):
+            item["missing_security_headers"] = [
+                header
+                for header in missing_headers
+                if str(header).lower() != HSTS_HEADER
+            ]
+
+
+def _dns_record_count(results):
+    """Count DNS records over every list-valued key except "errors"."""
+    dns_results = results.get("DNS Analysis", {})
+    if not isinstance(dns_results, dict):
+        return 0
+    return sum(
+        len(records)
+        for record_type, records in dns_results.items()
+        if record_type != "errors" and isinstance(records, list)
+    )
+
+
+def _endpoint_groups(results):
+    """Return the "Endpoint Discovery" list, or [] when it is not a list."""
+    endpoint_results = results.get("Endpoint Discovery", [])
+    return endpoint_results if isinstance(endpoint_results, list) else []
+
+
+def _endpoint_count(results):
+    """Return the number of endpoints summed over all discovery groups."""
+    total = 0
+    for group in _endpoint_groups(results):
+        if isinstance(group, dict):
+            total += len(_as_list(group.get("endpoints", [])))
+    return total
+
+
+def build_json_summary(results):
+    """Build the "summary" counters of the JSON report from *results*.
+
+    Sections that are missing or of an unexpected type count as empty.
+    """
+    nmap_results = _nmap_results(results)
+    # "status" may be absent or not a dict; never call .get on a non-dict.
+    status = _as_dict(nmap_results.get("status"))
+    ports = _as_list(nmap_results.get("ports", []))
+    http_results = _as_list(results.get("HTTP Analysis", []))
+    tls_results = _as_list(results.get("TLS Analysis", []))
+    signals = _as_list(results.get("Attention", results.get("Interesting Signals", [])))
+
+    return {
+        "host_status": status.get("state", "Unknown"),
+        "total_ports_listed": len(ports),
+        "open_ports": len([
+            port
+            for port in ports
+            if isinstance(port, dict) and port.get("state") == "open"
+        ]),
+        "http_services": len(http_results),
+        "tls_results": len(tls_results),
+        "dns_records": _dns_record_count(results),
+        "interesting_signals": len(signals),
+        "endpoint_count": _endpoint_count(results),
+    }
+
+
+def _looks_local_or_private(value):
+    """Return True for "localhost" or a loopback/private IP address literal."""
+    text = str(value or "").strip()
+    if text.lower() == "localhost":
+        return True
+    try:
+        address = ipaddress.ip_address(text)
+        return address.is_loopback or address.is_private
+    except ValueError:
+        return False
+
+
+def _scan_context(target, results):
+    """Return the local/private note when the target or nmap host is local."""
+    nmap_results = _nmap_results(results)
+    if _looks_local_or_private(target) or _looks_local_or_private(nmap_results.get("host")):
+        return SCAN_CONTEXT_NOTE
+    return None
+
+
+def build_json_metadata(target, results, scan_profile=None, scan_context=None):
+    """Build the "metadata" object of the JSON report.
+
+    "scan_profile" and "scan_context" are included only when they have a
+    value; an explicit *scan_context* overrides the automatic detection.
+    """
+    metadata = {
+        "tool": TOOL_NAME,
+        "authorized_use_notice": AUTHORIZED_USE_NOTICE,
+    }
+    if scan_profile:
+        metadata["scan_profile"] = scan_profile
+
+    context = scan_context or _scan_context(target, results)
+    if context:
+        metadata["scan_context"] = context
+    return metadata
+
+
+def _path_without_query(path):
+    """Return *path* without its query string and fragment ("/" if empty)."""
+    return str(path or "/").split("?", 1)[0].split("#", 1)[0]
+
+
+def _is_api_like(path):
+    """Return True for ``/api`` or ``/rest`` and anything nested under them.
+
+    The query string and fragment are ignored, so ``/api?page=2`` matches.
+    """
+    lower_path = _path_without_query(path).lower()
+    return lower_path in ("/api", "/rest") or lower_path.startswith(("/api/", "/rest/"))
+
+
+def _is_static_asset(path):
+    """Return True for a static-asset extension or a "chunk" file name."""
+    clean_path = _path_without_query(path).lower()
+    filename = PurePosixPath(clean_path).name
+    return PurePosixPath(clean_path).suffix in STATIC_ASSET_EXTENSIONS or "chunk" in filename
+
+
+def _primary_endpoint_category(endpoint):
+    """Return the single primary category key for *endpoint*.
+
+    Checked in order: static asset, API-like, well-known path; anything
+    else is a frontend route.
+    """
+    path = endpoint.get("path", "")
+    lower_path = _path_without_query(path).lower()
+    if _is_static_asset(path):
+        return "static_assets"
+    if _is_api_like(path):
+        return "api_like"
+    if lower_path in WELL_KNOWN_PATHS:
+        return "well_known"
+    return "frontend_routes"
+
+
+def _endpoint_categories(endpoints):
+    """Group *endpoints* by category and return ``(summary, categories)``.
+
+    Every dict endpoint gets one primary category; endpoints whose "source"
+    starts with "response-header" are also listed under "header_discovered".
+    """
+    categories = {key: [] for key in CATEGORY_KEYS}
+    for endpoint in endpoints:
+        if not isinstance(endpoint, dict):
+            continue
+        categories[_primary_endpoint_category(endpoint)].append(endpoint)
+        if str(endpoint.get("source", "")).startswith("response-header"):
+            categories["header_discovered"].append(endpoint)
+
+    summary = {"endpoint_count": len(endpoints)}
+    for key in CATEGORY_KEYS:
+        summary[key] = len(categories[key])
+    return summary, categories
+
+
+def _enrich_endpoint_discovery(results):
+    """Attach "summary" and "categories" to each endpoint group in place."""
+    for group in _endpoint_groups(results):
+        if not isinstance(group, dict):
+            continue
+        endpoints = _as_list(group.get("endpoints", []))
+        summary, categories = _endpoint_categories(endpoints)
+        group["summary"] = summary
+        group["categories"] = categories
+
+
+def normalize_results_for_json(results):
+    """Return a deep copy of *results* prepared for JSON output.
+
+    The copy has "Attention" and its alias "Interesting Signals" set to the
+    same list, HSTS removed from missing headers of non-HTTPS results, and
+    endpoint groups enriched. The caller's *results* is not modified.
+    """
+    normalized = copy.deepcopy(results if isinstance(results, dict) else {})
+    # Prefer "Attention"; use the alias when it is missing or not a list.
+    signals = normalized.get("Attention")
+    if not isinstance(signals, list):
+        signals = normalized.get("Interesting Signals")
+    if not isinstance(signals, list):
+        signals = []
+    normalized["Attention"] = signals
+    normalized["Interesting Signals"] = signals
+    _filter_http_security_headers(normalized)
+    _enrich_endpoint_discovery(normalized)
+    return normalized
+
+
+def build_json_payload(target, results, generated_at=None, scan_profile=None, scan_context=None):
+    """Return the full JSON report payload as a dict.
+
+    *generated_at* defaults to the current UTC time formatted as
+    ``YYYY-MM-DDTHH:MM:SSZ``.
+    """
+    normalized_results = normalize_results_for_json(results)
     return {
         "schema_version": SCHEMA_VERSION,
-        "generated_at": generated_at or datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
+        "generated_at": generated_at or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
         "target": target,
-        "results": results,
+        "metadata": build_json_metadata(target, normalized_results, scan_profile, scan_context),
+        "summary": build_json_summary(normalized_results),
+        "results": normalized_results,
     }
 
 
-def generate_json_report(target, results, output_file, generated_at=None):
+def generate_json_report(target, results, output_file, generated_at=None, scan_profile=None, scan_context=None):
+    """Write the JSON report for *target* to *output_file*.
+
+    Parent directories are created when missing.
+    """
     output_path = Path(output_file)
     output_path.parent.mkdir(parents=True, exist_ok=True)
 
     with output_path.open("w", encoding="utf-8") as f:
-        json.dump(build_json_payload(target, results, generated_at), f, indent=2, sort_keys=True)
+        json.dump(
+            build_json_payload(target, results, generated_at, scan_profile, scan_context),
+            f,
+            indent=2,
+            sort_keys=True,
+        )
         f.write("\n")
diff --git a/tests/test_http_enum.py b/tests/test_http_enum.py index 30f624f..7ed3276 100644 --- a/tests/test_http_enum.py +++ b/tests/test_http_enum.py @@ -58,3 +58,39 @@ def fake_get(url, timeout): assert results[0]["url"] == "http://example.com:8080" assert "timed out" in results[0]["error"] + + +def test_analyze_http_filters_hsts_missing_header_for_plain_http(monkeypatch): + class PlainResponse: + status_code = 200 + url = "http://example.com:80" + history = [] + headers = {"Content-Type": "text/html"} + text = "