diff --git a/archize.py b/archize.py
new file mode 100644
index 000000000..3bb0ff7c9
--- /dev/null
+++ b/archize.py
@@ -0,0 +1,91 @@
+import json
+import time
+from datetime import datetime, timezone
+
+import requests
+
+# ── CONFIG ────────────────────────────────────────────────────────────────────
+URLS = [
+    "https://github.com/aboutcode-org/vulnerablecode/issues/17",
+]
+
+DELAY_SECONDS = 5  # pause between requests to avoid rate-limiting
+LOG_FILE = "archive_log.json"
+# ─────────────────────────────────────────────────────────────────────────────
+
+SPN_ENDPOINT = "https://web.archive.org/save/"
+
+
+def save_url(url: str) -> dict:
+    """Submit a single URL to the Wayback Machine."""
+    try:
+        response = requests.post(
+            SPN_ENDPOINT,
+            data={"url": url},
+            headers={"User-Agent": "ArchiveBot/1.0"},
+            timeout=30,
+        )
+
+        if response.status_code == 200:
+            # Archive.org returns the archived URL in the Content-Location header
+            location = response.headers.get("Content-Location", "")
+            archived_url = f"https://web.archive.org{location}" if location else "check manually"
+            return {"url": url, "status": "success", "archived_url": archived_url}
+
+        else:
+            return {
+                "url": url,
+                "status": "failed",
+                "http_code": response.status_code,
+                "reason": response.text[:200],
+            }
+
+    except requests.exceptions.Timeout:
+        return {"url": url, "status": "error", "reason": "Request timed out"}
+    except requests.exceptions.RequestException as e:
+        return {"url": url, "status": "error", "reason": str(e)}
+
+
+def archive_all(urls: list[str]) -> list[dict]:
+    results = []
+    total = len(urls)
+
+    print(f"Starting archive of {total} URL(s)...\n")
+
+    for i, url in enumerate(urls, start=1):
+        print(f"[{i}/{total}] Submitting: {url}")
+        result = save_url(url)
+        result["timestamp"] = datetime.now(timezone.utc).isoformat()  # utcnow() is deprecated; use aware UTC
+        results.append(result)
+
+        if result["status"] == "success":
+            print(f"    ✓ Archived → {result['archived_url']}")
+        else:
+            print(f"    ✗ {result.get('reason') or result.get('http_code')}")
+
+        if i < total:
+            time.sleep(DELAY_SECONDS)
+
+    return results
+
+
+def save_log(results: list[dict], path: str) -> None:
+    with open(path, "w", encoding="utf-8") as f:  # explicit encoding for portable output
+        json.dump(results, f, indent=2)
+    print(f"\nLog saved to {path}")
+
+
+def print_summary(results: list[dict]) -> None:
+    success = sum(1 for r in results if r["status"] == "success")
+    failed = len(results) - success
+    print("\n── Summary ──────────────────────")
+    print(f"  Total   : {len(results)}")
+    print(f"  Success : {success}")
+    print(f"  Failed  : {failed}")
+    print("─────────────────────────────────")
+
+
+if __name__ == "__main__":
+    results = archive_all(URLS)
+    print_summary(results)
+    save_log(results, LOG_FILE)
diff --git a/vulnerabilities/improvers/__init__.py b/vulnerabilities/improvers/__init__.py
index 982b4bbd8..903ec523f 100644
--- a/vulnerabilities/improvers/__init__.py
+++ b/vulnerabilities/improvers/__init__.py
@@ -19,6 +19,7 @@
 from vulnerabilities.pipelines import flag_ghost_packages
 from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline
 from vulnerabilities.pipelines import remove_duplicate_advisories
+from vulnerabilities.pipelines.v2_improvers import archive_urls
 from vulnerabilities.pipelines.v2_improvers import collect_ssvc_trees
 from vulnerabilities.pipelines.v2_improvers import compute_advisory_content_hash
 from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2
@@ -76,5 +77,6 @@
     collect_ssvc_trees.CollectSSVCPipeline,
     relate_severities.RelateSeveritiesPipeline,
     compute_advisory_content_hash.ComputeAdvisoryContentHash,
+    archive_urls.ArchiveImproverPipeline,
 ]
 )
diff --git a/vulnerabilities/migrations/0117_advisoryreference_archive_url.py b/vulnerabilities/migrations/0117_advisoryreference_archive_url.py
new file mode 100644
index 000000000..659113802
--- /dev/null
+++ b/vulnerabilities/migrations/0117_advisoryreference_archive_url.py
@@ -0,0 +1,20 @@
+# Generated by Django 5.2.11 on 2026-03-31 11:41
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("vulnerabilities", "0116_advisoryv2_advisory_content_hash"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="advisoryreference",
+            name="archive_url",
+            field=models.URLField(
+                help_text="URL to the backup vulnerability reference", max_length=1024, null=True
+            ),
+        ),
+    ]
diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py
index 2e69be49a..98b1af38f 100644
--- a/vulnerabilities/models.py
+++ b/vulnerabilities/models.py
@@ -2672,6 +2672,12 @@ class AdvisoryReference(models.Model):
         help_text="URL to the vulnerability reference",
     )
 
+    archive_url = models.URLField(
+        max_length=1024,
+        null=True,
+        help_text="URL to the backup vulnerability reference",
+    )
+
     ADVISORY = "advisory"
     EXPLOIT = "exploit"
     COMMIT = "commit"
diff --git a/vulnerabilities/pipelines/v2_improvers/archive_urls.py b/vulnerabilities/pipelines/v2_improvers/archive_urls.py
new file mode 100644
index 000000000..6337a18ca
--- /dev/null
+++ b/vulnerabilities/pipelines/v2_improvers/archive_urls.py
@@ -0,0 +1,63 @@
+# Copyright (c) nexB Inc. and others. All rights reserved.
+# VulnerableCode is a trademark of nexB Inc.
+# SPDX-License-Identifier: Apache-2.0
+# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
+# See https://github.com/aboutcode-org/vulnerablecode for support or download.
+# See https://aboutcode.org for more information about nexB OSS projects.
+#
+
+import time
+
+import requests
+
+from vulnerabilities.models import AdvisoryReference
+from vulnerabilities.pipelines import VulnerableCodePipeline
+
+
+class ArchiveImproverPipeline(VulnerableCodePipeline):
+    """
+    Improver pipeline that stores Wayback Machine archive URLs on AdvisoryReference rows.
+    """
+
+    pipeline_id = "archive_improver_pipeline"
+
+    @classmethod
+    def steps(cls):
+        return (cls.archive_urls,)
+
+    def archive_urls(self):
+        """Fetch and store archive URLs for AdvisoryReferences, flagging missing ones as NO_ARCHIVE."""
+        advisory_refs = (
+            AdvisoryReference.objects.filter(archive_url__isnull=True)
+            # "NO_ARCHIVE"-flagged rows are non-null, so the isnull filter already skips them
+            .only("id", "url").iterator()  # stream rows; avoid caching a potentially huge queryset
+        )
+
+        for advisory_ref in advisory_refs:
+            url = advisory_ref.url
+            if not url or not url.startswith("http"):
+                continue
+
+            archive_url = self.get_archival(url)
+            if not archive_url:
+                AdvisoryReference.objects.filter(id=advisory_ref.id).update(
+                    archive_url="NO_ARCHIVE"  # sentinel so this reference is not retried
+                )
+                self.log(f"URL unreachable or returned no archive url: {url}")
+                continue
+            self.log(f"Found Archived Reference URL: {archive_url}")
+            AdvisoryReference.objects.filter(id=advisory_ref.id).update(archive_url=archive_url)
+
+    def get_archival(self, url):
+        self.log(f"Searching for archive URL for this Reference URL: {url}")
+        try:
+            archive_response = requests.get(
+                url=f"https://web.archive.org/web/{url}", allow_redirects=True, timeout=30
+            )
+            time.sleep(30)  # throttle to stay within Wayback Machine rate limits
+            if archive_response.status_code == 200:
+                return archive_response.url
+            elif archive_response.status_code == 403:
+                self.log(f"Wayback Machine permission denied for '{url}'.")
+        except requests.RequestException as e:
+            self.log(f"Error checking existing archival: {e}")
diff --git a/vulnerabilities/tests/pipelines/v2_improvers/test_archive_urls.py b/vulnerabilities/tests/pipelines/v2_improvers/test_archive_urls.py
new file mode 100644
index 000000000..e7e38c5d7
--- /dev/null
+++ b/vulnerabilities/tests/pipelines/v2_improvers/test_archive_urls.py
@@ -0,0 +1,37 @@
+# Copyright (c) nexB Inc. and others. All rights reserved.
+# VulnerableCode is a trademark of nexB Inc.
+# SPDX-License-Identifier: Apache-2.0
+# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
+# See https://github.com/aboutcode-org/vulnerablecode for support or download.
+# See https://aboutcode.org for more information about nexB OSS projects.
+#
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from vulnerabilities.models import AdvisoryReference
+from vulnerabilities.pipelines.v2_improvers.archive_urls import ArchiveImproverPipeline
+
+
+@pytest.mark.django_db
+def test_archive_urls_pipeline(monkeypatch):
+    advisory = AdvisoryReference.objects.create(url="https://example.com", archive_url=None)
+
+    mock_response = MagicMock()
+    mock_response.status_code = 200
+    mock_response.url = "https://web.archive.org/web/20250519082420/https://example.com"
+
+    monkeypatch.setattr(
+        "vulnerabilities.pipelines.v2_improvers.archive_urls.time.sleep", MagicMock()
+    )
+    monkeypatch.setattr(
+        "vulnerabilities.pipelines.v2_improvers.archive_urls.requests.get",
+        MagicMock(return_value=mock_response),
+    )
+
+    pipeline = ArchiveImproverPipeline()
+    pipeline.archive_urls()
+
+    advisory.refresh_from_db()
+    assert advisory.archive_url == "https://web.archive.org/web/20250519082420/https://example.com"