Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions archize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import json
import time
from datetime import datetime, timezone

import requests

# ── CONFIG ────────────────────────────────────────────────────────────────────
# URLs to submit to the Wayback Machine, one per entry.
URLS = [
    "https://github.com/aboutcode-org/vulnerablecode/issues/17",
]

DELAY_SECONDS = 5  # pause between requests to avoid rate-limiting
# Where the JSON run log is written (see save_log()).
LOG_FILE = "archive_log.json"
# ─────────────────────────────────────────────────────────────────────────────

# Wayback Machine "Save Page Now" submission endpoint.
SPN_ENDPOINT = "https://web.archive.org/save/"


def save_url(url: str) -> dict:
    """Submit a single URL to the Wayback Machine.

    Returns a result dict that always contains ``url`` and ``status``
    ("success", "failed", or "error") plus detail keys for each outcome.
    """
    try:
        resp = requests.post(
            SPN_ENDPOINT,
            data={"url": url},
            headers={"User-Agent": "ArchiveBot/1.0"},
            timeout=30,
        )
    except requests.exceptions.Timeout:
        return {"url": url, "status": "error", "reason": "Request timed out"}
    except requests.exceptions.RequestException as exc:
        return {"url": url, "status": "error", "reason": str(exc)}

    if resp.status_code != 200:
        return {
            "url": url,
            "status": "failed",
            "http_code": resp.status_code,
            "reason": resp.text[:200],
        }

    # Archive.org returns the archived URL in the Content-Location header
    location = resp.headers.get("Content-Location", "")
    archived_url = f"https://web.archive.org{location}" if location else "check manually"
    return {"url": url, "status": "success", "archived_url": archived_url}


def archive_all(urls: list[str]) -> list[dict]:
    """Submit each URL in *urls* to the Wayback Machine.

    Prints per-URL progress, sleeps DELAY_SECONDS between submissions to
    avoid rate-limiting, and returns one result dict per URL, each stamped
    with an ISO-8601 UTC timestamp.
    """
    results = []
    total = len(urls)

    print(f"Starting archive of {total} URL(s)...\n")

    for i, url in enumerate(urls, start=1):
        print(f"[{i}/{total}] Submitting: {url}")
        result = save_url(url)
        # fix: datetime.utcnow() is deprecated (Python 3.12) and returns a
        # naive datetime; use an explicitly timezone-aware UTC timestamp.
        result["timestamp"] = datetime.now(timezone.utc).isoformat()
        results.append(result)

        if result["status"] == "success":
            print(f" ✓ Archived → {result['archived_url']}")
        else:
            print(f" ✗ {result.get('reason') or result.get('http_code')}")

        # Rate-limit between submissions, but do not sleep after the last one.
        if i < total:
            time.sleep(DELAY_SECONDS)

    return results


def save_log(results: list[dict], path: str) -> None:
    """Write *results* to *path* as pretty-printed JSON and report the path."""
    # fix: explicit encoding — the platform default is not UTF-8 everywhere.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\nLog saved to {path}")


def print_summary(results: list[dict]) -> None:
    """Print a success/failed tally for a run of archive result dicts."""
    success = sum(1 for r in results if r["status"] == "success")
    # Everything that is not "success" (failed or error) counts as failed.
    failed = len(results) - success
    # fix: dropped extraneous f-prefixes on strings with no placeholders (F541).
    print("\n── Summary ──────────────────────")
    print(f" Total : {len(results)}")
    print(f" Success : {success}")
    print(f" Failed : {failed}")
    print("─────────────────────────────────")


if __name__ == "__main__":
    # Full run: archive every configured URL, show the tally, persist the log.
    run_results = archive_all(URLS)
    print_summary(run_results)
    save_log(run_results, LOG_FILE)
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from vulnerabilities.pipelines import flag_ghost_packages
from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline
from vulnerabilities.pipelines import remove_duplicate_advisories
from vulnerabilities.pipelines.v2_improvers import archive_urls
from vulnerabilities.pipelines.v2_improvers import collect_ssvc_trees
from vulnerabilities.pipelines.v2_improvers import compute_advisory_content_hash
from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2
Expand Down Expand Up @@ -76,5 +77,6 @@
collect_ssvc_trees.CollectSSVCPipeline,
relate_severities.RelateSeveritiesPipeline,
compute_advisory_content_hash.ComputeAdvisoryContentHash,
archive_urls.ArchiveImproverPipeline,
]
)
20 changes: 20 additions & 0 deletions vulnerabilities/migrations/0117_advisoryreference_archive_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 5.2.11 on 2026-03-31 11:41

from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds AdvisoryReference.archive_url (backup/Wayback reference URL).
    # null=True means existing rows are untouched at migration time; the
    # column appears to be backfilled later by the archive improver pipeline.

    dependencies = [
        ("vulnerabilities", "0116_advisoryv2_advisory_content_hash"),
    ]

    operations = [
        migrations.AddField(
            model_name="advisoryreference",
            name="archive_url",
            field=models.URLField(
                help_text="URL to the backup vulnerability reference", max_length=1024, null=True
            ),
        ),
    ]
6 changes: 6 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2672,6 +2672,12 @@ class AdvisoryReference(models.Model):
help_text="URL to the vulnerability reference",
)

archive_url = models.URLField(
max_length=1024,
null=True,
help_text="URL to the backup vulnerability reference",
)

ADVISORY = "advisory"
EXPLOIT = "exploit"
COMMIT = "commit"
Expand Down
63 changes: 63 additions & 0 deletions vulnerabilities/pipelines/v2_improvers/archive_urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import time

import requests

from vulnerabilities.models import AdvisoryReference
from vulnerabilities.pipelines import VulnerableCodePipeline


class ArchiveImproverPipeline(VulnerableCodePipeline):
    """
    Improver pipeline that looks up Wayback Machine snapshots for
    AdvisoryReference URLs and stores them on ``archive_url``.
    """

    pipeline_id = "archive_improver_pipeline"

    @classmethod
    def steps(cls):
        # Single-step pipeline.
        return (cls.archive_urls,)

    def archive_urls(self):
        """Get and store archive URLs for AdvisoryReferences, flagging missing ones as NO_ARCHIVE."""
        # NOTE(review): the .exclude() is a no-op — rows with
        # archive_url="NO_ARCHIVE" are already excluded by
        # archive_url__isnull=True; kept for clarity of intent.
        advisory_refs = (
            AdvisoryReference.objects.filter(archive_url__isnull=True)
            .exclude(archive_url="NO_ARCHIVE")
            .only("id", "url")
        )

        for advisory_ref in advisory_refs:
            url = advisory_ref.url
            # Skip empty and non-HTTP(S) reference URLs.
            if not url or not url.startswith("http"):
                continue

            archive_url = self.get_archival(url)
            if not archive_url:
                # Sentinel value so this URL is never re-queried on later runs.
                AdvisoryReference.objects.filter(id=advisory_ref.id).update(
                    archive_url="NO_ARCHIVE"
                )
                self.log(f"URL unreachable or returned no archive url: {url}")
                continue
            self.log(f"Found Archived Reference URL: {archive_url}")
            AdvisoryReference.objects.filter(id=advisory_ref.id).update(archive_url=archive_url)

    def get_archival(self, url):
        """Return the Wayback Machine snapshot URL for *url*, or None if unavailable."""
        self.log(f"Searching for archive URL for this Reference URL: {url}")
        try:
            archive_response = requests.get(
                url=f"https://web.archive.org/web/{url}",
                allow_redirects=True,
                # fix: without a timeout, requests.get can block forever and
                # stall the whole pipeline on one unresponsive URL.
                timeout=30,
            )
            # Crude rate-limit between Wayback Machine lookups.
            time.sleep(30)
            if archive_response.status_code == 200:
                # Redirects were followed, so .url is the canonical snapshot URL.
                return archive_response.url
            elif archive_response.status_code == 403:
                self.log(f"Wayback Machine permission denied for '{url}'.")
        except requests.RequestException as e:
            self.log(f"Error checking existing archival: {e}")
        # Implicitly returns None for non-200/403 statuses and request errors.
37 changes: 37 additions & 0 deletions vulnerabilities/tests/pipelines/v2_improvers/test_archive_urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from unittest.mock import MagicMock

import pytest

from vulnerabilities.models import AdvisoryReference
from vulnerabilities.pipelines.v2_improvers.archive_urls import ArchiveImproverPipeline


@pytest.mark.django_db
def test_archive_urls_pipeline(monkeypatch):
    """The pipeline stores the resolved Wayback URL on the reference row."""
    advisory = AdvisoryReference.objects.create(url="https://example.com", archive_url=None)

    mock_response = MagicMock()
    mock_response.status_code = 200
    mock_response.url = "https://web.archive.org/web/20250519082420/https://example.com"

    # Patch out the 30s rate-limit sleep and the outbound HTTP call.
    # fix: these target strings were f-strings with no placeholders (F541).
    monkeypatch.setattr(
        "vulnerabilities.pipelines.v2_improvers.archive_urls.time.sleep", MagicMock()
    )
    monkeypatch.setattr(
        "vulnerabilities.pipelines.v2_improvers.archive_urls.requests.get",
        MagicMock(return_value=mock_response),
    )

    pipeline = ArchiveImproverPipeline()
    pipeline.archive_urls()

    advisory.refresh_from_db()
    assert advisory.archive_url == "https://web.archive.org/web/20250519082420/https://example.com"