Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,22 @@ pip install -r requirements.txt

## Usage

### Direct scan with anon key (Recommended)

If you already have the Supabase anon key and URL, use direct scan mode:

```bash
python supabase-exposure-check.py \
--anon-key "the_anon_key" \
--supabase-url "https://your-project.supabase.co"
```

This mode skips JavaScript scanning and directly enumerates tables using the provided credentials.

### Scan a single website

Automatically detect Supabase credentials from JavaScript files:

```bash
python supabase-exposure-check.py --url https://example.com
```
Expand Down Expand Up @@ -54,9 +68,13 @@ Output is saved to the `output/` directory (by default), organized by domain, wi

## Command-line Options

- `--url`: Single website URL to scan
- `--file`, `-f`: File containing list of URLs to scan (one per line)
- `--output`, `-o`: Output directory (default: `output`)
| Option | Description |
|--------|-------------|
| `--anon-key` | Supabase anon key (JWT) for direct scan |
| `--supabase-url` | Supabase URL for direct scan (e.g., `https://xxx.supabase.co`) |
| `--url` | Single website URL to scan |
| `--file`, `-f` | File containing list of URLs to scan (one per line) |
| `--output`, `-o` | Output directory (default: `output`) |

## Example Output

Expand Down
203 changes: 188 additions & 15 deletions supabase-exposure-check.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@

# ================== HTTP HELPERS ==================

def safe_get(url, headers=None, **kwargs):
def safe_get(url, headers=None, verbose=True, **kwargs):
final_headers = COMMON_HEADERS.copy()
if headers:
final_headers.update(headers)
Expand All @@ -128,8 +128,9 @@ def safe_get(url, headers=None, **kwargs):
timeout=TIMEOUT,
**kwargs
)
except requests.exceptions.SSLError:
print(f" [!] SSL error → retrying insecure: {url}")
except requests.exceptions.SSLError as e:
if verbose:
print(f" [!] SSL error → retrying insecure: {url}")
try:
return requests.get(
url,
Expand All @@ -138,18 +139,28 @@ def safe_get(url, headers=None, **kwargs):
verify=False,
**kwargs
)
except Exception as e:
print(f" [-] Failed (SSL bypass): {e}")
except Exception as e2:
if verbose:
print(f" [-] Failed (SSL bypass): {e2}")
return None
except requests.exceptions.ConnectionError as e:
if verbose:
print(f" [-] Connection error: {e}")
return None
except requests.exceptions.Timeout as e:
if verbose:
print(f" [-] Timeout: {e}")
return None
except requests.exceptions.RequestException as e:
print(f" [-] Request failed: {e}")
if verbose:
print(f" [-] Request failed: {e}")
return None

# ================== JS DISCOVERY ==================

def get_js_files(site_url):
r = safe_get(site_url)
if not r:
if r is None or r.status_code != 200:
return []

soup = BeautifulSoup(r.text, "html.parser")
Expand Down Expand Up @@ -195,7 +206,7 @@ def extract_supabase_urls(content):

def scan_js(js_url):
r = safe_get(js_url)
if not r:
if r is None or r.status_code != 200:
return [], []

content = r.text
Expand Down Expand Up @@ -343,13 +354,43 @@ def analyze_table_for_sensitive_data(rows, max_samples=100):
# ================== SUPABASE ENUM / DUMP ==================

def get_tables(base_url, headers):
r = safe_get(f"{base_url}/rest/v1/", headers=headers)
if not r or r.status_code != 200:
raise Exception("Cannot enumerate tables")
url = f"{base_url}/rest/v1/"
r = safe_get(url, headers=headers)

if r is None:
raise Exception(f"Cannot connect to {url}")

# Handle authentication errors
if r.status_code == 401:
try:
msg = r.json().get("message", "Unauthorized")
except:
msg = "Unauthorized"
raise Exception(f"Authentication failed: {msg}")

if r.status_code != 200:
try:
msg = r.json().get("message", r.text[:100])
except:
msg = r.text[:100]
raise Exception(f"HTTP {r.status_code}: {msg}")

# Check if response is JSON
content_type = r.headers.get('Content-Type', '')
if 'application/json' not in content_type and 'application/openapi+json' not in content_type:
# Show first 200 chars of response for debugging
preview = r.text[:200].strip()
raise Exception(f"Invalid Supabase URL (got {content_type}). Response: {preview}...")

try:
data = r.json()
except Exception as e:
preview = r.text[:200].strip()
raise Exception(f"Invalid JSON response. Response: {preview}...")

return [
p.strip("/")
for p in r.json().get("paths", {})
for p in data.get("paths", {})
if not p.startswith("/rpc") and p != "/"
]

Expand All @@ -361,8 +402,8 @@ def dump_table(base_url, table, headers):
url = f"{base_url}/rest/v1/{table}?limit={PAGE_SIZE}&offset={offset}"
r = safe_get(url, headers=headers)

if not r or r.status_code != 200:
return None, r.status_code if r else "ERR"
if r is None or r.status_code != 200:
return None, r.status_code if r is not None else "ERR"

chunk = r.json()
rows.extend(chunk)
Expand All @@ -374,6 +415,119 @@ def dump_table(base_url, table, headers):

return rows, 200

# ================== DIRECT SCAN (with provided anon key & url) ==================

def scan_direct(supabase_url, anon_key, label=None):
"""Scan Supabase directly with provided anon key and URL."""
# Use domain from supabase_url or label for output directory
if label:
domain = label
else:
domain = urlparse(supabase_url).netloc.replace("www.", "")

site_dir = os.path.join(OUTPUT_DIR, domain)
tables_dir = os.path.join(site_dir, "tables")
os.makedirs(tables_dir, exist_ok=True)

print(f"\n🔑 Direct scan: {supabase_url}")

findings = {
"supabase_url": supabase_url,
"mode": "direct",
"vulnerable": True,
"supabase_urls": [supabase_url],
"jwts": [anon_key]
}

supabase_headers = {
"apikey": anon_key,
"Authorization": f"Bearer {anon_key}"
}

summary = []

try:
tables = get_tables(supabase_url, supabase_headers)
print(f" [+] Found {len(tables)} tables")

for table in tables:
rows, status = dump_table(supabase_url, table, supabase_headers)

if status == 200:
path = os.path.join(tables_dir, f"{table}.json")
with open(path, "w") as f:
json.dump(rows, f, indent=2)

# Analyze for sensitive data
analysis = analyze_table_for_sensitive_data(rows)

is_vulnerable = analysis["has_sensitive_data"]
vuln_level = analysis["vulnerability_level"]
sensitive_fields = analysis["sensitive_fields"]

if is_vulnerable:
print(f" 🚨 {table}: {len(rows)} rows - VULNERABLE ({vuln_level}) - Sensitive fields: {', '.join(sensitive_fields)}")
else:
print(f" [+] {table}: {len(rows)} rows - Public data (no sensitive fields detected)")

summary.append({
"table": table,
"rows": len(rows),
"dumped": True,
"vulnerable": is_vulnerable,
"vulnerability_level": vuln_level,
"sensitive_fields": sensitive_fields,
"analysis": analysis
})
else:
print(f" [-] {table}: blocked")
summary.append({
"table": table,
"dumped": False,
"status": status,
"vulnerable": False
})

except Exception as e:
print(f" [-] Supabase error: {e}")

# Calculate overall vulnerability assessment
vulnerable_tables = [s for s in summary if s.get("vulnerable", False)]
critical_tables = [s for s in vulnerable_tables if s.get("vulnerability_level") == "critical"]
high_tables = [s for s in vulnerable_tables if s.get("vulnerability_level") == "high"]
medium_tables = [s for s in vulnerable_tables if s.get("vulnerability_level") == "medium"]

if vulnerable_tables:
print(f"\n ⚠️ VULNERABILITY SUMMARY:")
print(f" - Critical: {len(critical_tables)} table(s)")
print(f" - High: {len(high_tables)} table(s)")
print(f" - Medium: {len(medium_tables)} table(s)")
print(f" - Total vulnerable: {len(vulnerable_tables)}/{len([s for s in summary if s.get('dumped')])} accessible tables")

if critical_tables:
print(f"\n Critical tables:")
for t in critical_tables:
print(f" • {t['table']} - Fields: {', '.join(t.get('sensitive_fields', []))}")

findings["vulnerability_summary"] = {
"total_tables_accessible": len([s for s in summary if s.get("dumped")]),
"vulnerable_tables_count": len(vulnerable_tables),
"critical_count": len(critical_tables),
"high_count": len(high_tables),
"medium_count": len(medium_tables),
"vulnerable_tables": [
{
"table": t["table"],
"level": t.get("vulnerability_level"),
"sensitive_fields": t.get("sensitive_fields", [])
}
for t in vulnerable_tables
]
}

write_json(site_dir, "summary.json", summary)
write_json(site_dir, "findings.json", findings)

# ================== SITE SCANNER ==================

def scan_site(site_url):
Expand Down Expand Up @@ -529,6 +683,14 @@ def parse_args():
default=OUTPUT_DIR,
help=f"Output directory (default: {OUTPUT_DIR})"
)
parser.add_argument(
"--anon-key",
help="Supabase anon key (JWT) to use directly"
)
parser.add_argument(
"--supabase-url",
help="Supabase URL to scan directly (e.g., https://xxx.supabase.co)"
)
return parser.parse_args()

def get_sites_from_file(file_path):
Expand All @@ -547,6 +709,17 @@ def main():
OUTPUT_DIR = args.output
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Direct scan mode: user provides anon key and supabase URL
if args.anon_key and args.supabase_url:
print("[*] Direct scan mode")
scan_direct(args.supabase_url.rstrip('/'), args.anon_key)
return

# If only one of them is provided, show error
if args.anon_key or args.supabase_url:
print("[-] Both --anon-key and --supabase-url are required for direct scan mode.")
return

sites = []

# Determine which sites to scan
Expand All @@ -567,7 +740,7 @@ def main():
return
else:
print("[-] No input provided. Use --url to scan a single site or --file to scan from a file.")
print(" Alternatively, create a sites.txt file with URLs (one per line).")
print(" Or use --anon-key and --supabase-url for direct scan mode.")
return

print(f"[*] Scanning {len(sites)} site(s)\n")
Expand Down