diff --git a/README.md b/README.md index 5851de1..29276ea 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,22 @@ pip install -r requirements.txt ## Usage +### Direct scan with anon key (Recommended) + +If you already have the Supabase anon key and URL, use direct scan mode: + +```bash +python supabase-exposure-check.py \ + --anon-key "the_anon_key" \ + --supabase-url "https://your-project.supabase.co" +``` + +This mode skips JavaScript scanning and directly enumerates tables using the provided credentials. + ### Scan a single website +Automatically detect Supabase credentials from JavaScript files: + ```bash python supabase-exposure-check.py --url https://example.com ``` @@ -54,9 +68,13 @@ Output is saved to the `output/` directory (by default), organized by domain, wi ## Command-line Options -- `--url`: Single website URL to scan -- `--file`, `-f`: File containing list of URLs to scan (one per line) -- `--output`, `-o`: Output directory (default: `output`) +| Option | Description | +|--------|-------------| +| `--anon-key` | Supabase anon key (JWT) for direct scan | +| `--supabase-url` | Supabase URL for direct scan (e.g., `https://xxx.supabase.co`) | +| `--url` | Single website URL to scan | +| `--file`, `-f` | File containing list of URLs to scan (one per line) | +| `--output`, `-o` | Output directory (default: `output`) | ## Example Output diff --git a/supabase-exposure-check.py b/supabase-exposure-check.py index 81a4934..7bc468d 100644 --- a/supabase-exposure-check.py +++ b/supabase-exposure-check.py @@ -116,7 +116,7 @@ # ================== HTTP HELPERS ================== -def safe_get(url, headers=None, **kwargs): +def safe_get(url, headers=None, verbose=True, **kwargs): final_headers = COMMON_HEADERS.copy() if headers: final_headers.update(headers) @@ -128,8 +128,9 @@ def safe_get(url, headers=None, **kwargs): timeout=TIMEOUT, **kwargs ) - except requests.exceptions.SSLError: - print(f" [!] SSL error → retrying insecure: {url}") + except requests.exceptions.SSLError as e: + if verbose: + print(f" [!] SSL error → retrying insecure: {url}") try: return requests.get( url, @@ -138,18 +139,28 @@ def safe_get(url, headers=None, **kwargs): verify=False, **kwargs ) - except Exception as e: - print(f" [-] Failed (SSL bypass): {e}") + except Exception as e2: + if verbose: + print(f" [-] Failed (SSL bypass): {e2}") return None + except requests.exceptions.ConnectionError as e: + if verbose: + print(f" [-] Connection error: {e}") + return None + except requests.exceptions.Timeout as e: + if verbose: + print(f" [-] Timeout: {e}") + return None except requests.exceptions.RequestException as e: - print(f" [-] Request failed: {e}") + if verbose: + print(f" [-] Request failed: {e}") return None # ================== JS DISCOVERY ================== def get_js_files(site_url): r = safe_get(site_url) - if not r: + if r is None or r.status_code != 200: return [] soup = BeautifulSoup(r.text, "html.parser") @@ -195,7 +206,7 @@ def extract_supabase_urls(content): def scan_js(js_url): r = safe_get(js_url) - if not r: + if r is None or r.status_code != 200: return [], [] content = r.text @@ -343,13 +354,43 @@ def analyze_table_for_sensitive_data(rows, max_samples=100): # ================== SUPABASE ENUM / DUMP ================== def get_tables(base_url, headers): - r = safe_get(f"{base_url}/rest/v1/", headers=headers) - if not r or r.status_code != 200: - raise Exception("Cannot enumerate tables") + url = f"{base_url}/rest/v1/" + r = safe_get(url, headers=headers) + + if r is None: + raise Exception(f"Cannot connect to {url}") + + # Handle authentication errors + if r.status_code == 401: + try: + msg = r.json().get("message", "Unauthorized") + except: + msg = "Unauthorized" + raise Exception(f"Authentication failed: {msg}") + + if r.status_code != 200: + try: + msg = r.json().get("message", r.text[:100]) + except: + msg = r.text[:100] + raise Exception(f"HTTP {r.status_code}: {msg}") + + # Check if response is JSON + content_type = r.headers.get('Content-Type', '') + if 'application/json' not in content_type and 'application/openapi+json' not in content_type: + # Show first 200 chars of response for debugging + preview = r.text[:200].strip() + raise Exception(f"Invalid Supabase URL (got {content_type}). Response: {preview}...") + + try: + data = r.json() + except Exception as e: + preview = r.text[:200].strip() + raise Exception(f"Invalid JSON response. Response: {preview}...") return [ p.strip("/") - for p in r.json().get("paths", {}) + for p in data.get("paths", {}) if not p.startswith("/rpc") and p != "/" ] @@ -361,8 +402,8 @@ def dump_table(base_url, table, headers): url = f"{base_url}/rest/v1/{table}?limit={PAGE_SIZE}&offset={offset}" r = safe_get(url, headers=headers) - if not r or r.status_code != 200: - return None, r.status_code if r else "ERR" + if r is None or r.status_code != 200: + return None, r.status_code if r is not None else "ERR" chunk = r.json() rows.extend(chunk) @@ -374,6 +415,119 @@ def dump_table(base_url, table, headers): return rows, 200 +# ================== DIRECT SCAN (with provided anon key & url) ================== + +def scan_direct(supabase_url, anon_key, label=None): + """Scan Supabase directly with provided anon key and URL.""" + # Use domain from supabase_url or label for output directory + if label: + domain = label + else: + domain = urlparse(supabase_url).netloc.replace("www.", "") + + site_dir = os.path.join(OUTPUT_DIR, domain) + tables_dir = os.path.join(site_dir, "tables") + os.makedirs(tables_dir, exist_ok=True) + + print(f"\nšŸ”‘ Direct scan: {supabase_url}") + + findings = { + "supabase_url": supabase_url, + "mode": "direct", + "vulnerable": True, + "supabase_urls": [supabase_url], + "jwts": [anon_key] + } + + supabase_headers = { + "apikey": anon_key, + "Authorization": f"Bearer {anon_key}" + } + + summary = [] + + try: + tables = get_tables(supabase_url, supabase_headers) + print(f" [+] Found {len(tables)} tables") + + for table in tables: + rows, status = dump_table(supabase_url, table, supabase_headers) + + if status == 200: + path = os.path.join(tables_dir, f"{table}.json") + with open(path, "w") as f: + json.dump(rows, f, indent=2) + + # Analyze for sensitive data + analysis = analyze_table_for_sensitive_data(rows) + + is_vulnerable = analysis["has_sensitive_data"] + vuln_level = analysis["vulnerability_level"] + sensitive_fields = analysis["sensitive_fields"] + + if is_vulnerable: + print(f" 🚨 {table}: {len(rows)} rows - VULNERABLE ({vuln_level}) - Sensitive fields: {', '.join(sensitive_fields)}") + else: + print(f" [+] {table}: {len(rows)} rows - Public data (no sensitive fields detected)") + + summary.append({ + "table": table, + "rows": len(rows), + "dumped": True, + "vulnerable": is_vulnerable, + "vulnerability_level": vuln_level, + "sensitive_fields": sensitive_fields, + "analysis": analysis + }) + else: + print(f" [-] {table}: blocked") + summary.append({ + "table": table, + "dumped": False, + "status": status, + "vulnerable": False + }) + + except Exception as e: + print(f" [-] Supabase error: {e}") + + # Calculate overall vulnerability assessment + vulnerable_tables = [s for s in summary if s.get("vulnerable", False)] + critical_tables = [s for s in vulnerable_tables if s.get("vulnerability_level") == "critical"] + high_tables = [s for s in vulnerable_tables if s.get("vulnerability_level") == "high"] + medium_tables = [s for s in vulnerable_tables if s.get("vulnerability_level") == "medium"] + + if vulnerable_tables: + print(f"\n āš ļø VULNERABILITY SUMMARY:") + print(f" - Critical: {len(critical_tables)} table(s)") + print(f" - High: {len(high_tables)} table(s)") + print(f" - Medium: {len(medium_tables)} table(s)") + print(f" - Total vulnerable: {len(vulnerable_tables)}/{len([s for s in summary if s.get('dumped')])} accessible tables") + + if critical_tables: + print(f"\n Critical tables:") + for t in critical_tables: + print(f" • {t['table']} - Fields: {', '.join(t.get('sensitive_fields', []))}") + + findings["vulnerability_summary"] = { + "total_tables_accessible": len([s for s in summary if s.get("dumped")]), + "vulnerable_tables_count": len(vulnerable_tables), + "critical_count": len(critical_tables), + "high_count": len(high_tables), + "medium_count": len(medium_tables), + "vulnerable_tables": [ + { + "table": t["table"], + "level": t.get("vulnerability_level"), + "sensitive_fields": t.get("sensitive_fields", []) + } + for t in vulnerable_tables + ] + } + + write_json(site_dir, "summary.json", summary) + write_json(site_dir, "findings.json", findings) + # ================== SITE SCANNER ================== def scan_site(site_url): @@ -529,6 +683,14 @@ def parse_args(): default=OUTPUT_DIR, help=f"Output directory (default: {OUTPUT_DIR})" ) + parser.add_argument( + "--anon-key", + help="Supabase anon key (JWT) to use directly" + ) + parser.add_argument( + "--supabase-url", + help="Supabase URL to scan directly (e.g., https://xxx.supabase.co)" + ) return parser.parse_args() def get_sites_from_file(file_path): @@ -547,6 +709,17 @@ def main(): OUTPUT_DIR = args.output os.makedirs(OUTPUT_DIR, exist_ok=True) + # Direct scan mode: user provides anon key and supabase URL + if args.anon_key and args.supabase_url: + print("[*] Direct scan mode") + scan_direct(args.supabase_url.rstrip('/'), args.anon_key) + return + + # If only one of them is provided, show error + if args.anon_key or args.supabase_url: + print("[-] Both --anon-key and --supabase-url are required for direct scan mode.") + return + sites = [] # Determine which sites to scan @@ -567,7 +740,7 @@ def main(): return else: print("[-] No input provided. Use --url to scan a single site or --file to scan from a file.") - print(" Alternatively, create a sites.txt file with URLs (one per line).") + print(" Or use --anon-key and --supabase-url for direct scan mode.") return print(f"[*] Scanning {len(sites)} site(s)\n")