diff --git a/app.py b/app.py index 4b9cb7d..8c9b72e 100644 --- a/app.py +++ b/app.py @@ -1,5 +1,5 @@ -import asyncio -import logging +import asyncio +import logging import signal import sys import os @@ -11,180 +11,182 @@ load_dotenv() -DEFAULT_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +def _configure_structured_logging(log_level: str = "INFO") -> None: + """ + Replace the default plain-text formatter with StructuredJsonFormatter. + Called once at startup — after this, every logger.info() / logger.error() + across the entire codebase emits JSON automatically. No other files need changing. + """ + from monitoring import StructuredJsonFormatter + handler = logging.StreamHandler() + handler.setFormatter(StructuredJsonFormatter(app_name="tap_plg")) + root = logging.getLogger() + root.setLevel(getattr(logging, log_level.upper(), logging.INFO)) + root.handlers.clear() + root.addHandler(handler) -def get_log_format() -> str: - log_format = os.getenv("LOG_FORMAT", DEFAULT_LOG_FORMAT) - if log_format.lower() == "json": - return DEFAULT_LOG_FORMAT - return log_format +# Configure structured JSON logging at startup +_configure_structured_logging(log_level=os.getenv("LOG_LEVEL", "INFO")) +logger = logging.getLogger(__name__) -# Configure logging -logging.basicConfig( - level=getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO), - format=get_log_format(), -) -logger = logging.getLogger(__name__) - - -def validate_configuration(): - """Validate required environment variables and configuration.""" - required_env_vars = [ - "POSTGRES_USER", - "POSTGRES_PASSWORD", - "POSTGRES_DB", - "POSTGRES_HOST", - "RABBITMQ_HOST", - "RABBITMQ_USER", - "RABBITMQ_PASS", - ] - - missing = [var for var in required_env_vars if not os.getenv(var)] - #print the required env vars and their values for debugging - # for var in required_env_vars: - # logger.info("###################") - # logger.info(f"{var}={os.getenv(var)}") - if missing: - logger.error(f"Missing required environment variables: {missing}") - raise ValueError(f"Missing required environment variables: {missing}") - - logger.info("Configuration validation passed") - - -async def health_check(mq_client, db_manager): - """Perform health checks on critical dependencies.""" - from datetime import datetime - - health_status = { - "status": "healthy", - "checks": {}, - "timestamp": datetime.utcnow().isoformat(), - } - - try: - # Check database connection - await db_manager.init_pool() - health_status["checks"]["database"] = "healthy" - except Exception as e: - health_status["checks"]["database"] = f"unhealthy: {str(e)}" - health_status["status"] = "unhealthy" - - try: - # Check message queue connection - await mq_client.connect() - health_status["checks"]["message_queue"] = "healthy" - except Exception as e: - health_status["checks"]["message_queue"] = f"unhealthy: {str(e)}" - health_status["status"] = "unhealthy" - - return health_status - - -def setup_signal_handlers(shutdown_evt): - """ - Setup signal handlers for graceful shutdown. - - Handles: - - SIGINT (Ctrl+C) - - SIGTERM (docker stop, systemctl stop) - - SIGBREAK (Windows Ctrl+Break) - """ - - def signal_handler(signum, frame): - sig_name = ( - signal.Signals(signum).name if hasattr(signal, "Signals") else str(signum) - ) - logger.info(f"Received signal {sig_name}, initiating graceful shutdown...") - shutdown_evt.set() - - # Register signal handlers - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Windows-specific signal - if sys.platform == "win32": - signal.signal(signal.SIGBREAK, signal_handler) - - logger.info("Signal handlers registered (SIGINT, SIGTERM)") - - -async def main(): - """ - Main application entry point with proper resource management. - - Lifecycle: - 1. Initialize shared database connection pool (ONCE) - 2. Initialize RabbitMQ client and submission checker with shared pool - 3. Connect to database and message queue - 4. Start consuming messages - 5. Wait for shutdown signal - 6. Gracefully cleanup all resources - """ - global consumer_task - import os - - os.environ["KMP_DUPLICATE_LIB_OK"] = "True" - logger.info(f"Starting plagiarism checker application v{__version__}") - - # Validate configuration - validate_configuration() - - # Initialize shared database connection pool ONCE - from database.db_manager import DatabaseManager - - db_manager = DatabaseManager() - await db_manager.init_pool() - logger.info("Shared database connection pool initialized") - - mq_client = RabbitMQClient() - # Pass shared database manager to checker - checker = SubmissionChecker(mq_client, db_manager=db_manager) - shutdown_evt = asyncio.Event() - - # Setup signal handlers - setup_signal_handlers(shutdown_evt) - - try: - # Initialize message queue (database already initialized) - logger.info("Initializing resources...") - await checker.initialize() - - logger.info("Application ready, processing submissions") - - # Wait for shutdown signal with periodic checks - while not shutdown_evt.is_set(): - try: - await asyncio.wait_for(shutdown_evt.wait(), timeout=1.0) - except asyncio.TimeoutError: - # Timeout is expected, continue checking - continue - - logger.info("Shutdown signal received, cleaning up resources...") - - except KeyboardInterrupt: - logger.info("Keyboard interrupt received") - except Exception as e: - logger.error(f"Application error: {e}", exc_info=True) - finally: - # Ensure resources are cleaned up with timeout - logger.info("Closing database and message queue connections...") - try: - # Give 30 seconds for graceful shutdown - await asyncio.wait_for(checker.close(), timeout=30.0) - # Close shared database pool last - await db_manager.close() - logger.info("Application shutdown complete") - except asyncio.TimeoutError: - logger.warning("Shutdown timeout exceeded, forcing exit") - except Exception as e: - logger.error(f"Error during cleanup: {e}", exc_info=True) - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.info("Application terminated") - sys.exit(0) +def validate_configuration(): + """Validate required environment variables and configuration.""" + required_env_vars = [ + "POSTGRES_USER", + "POSTGRES_PASSWORD", + "POSTGRES_DB", + "POSTGRES_HOST", + "RABBITMQ_HOST", + "RABBITMQ_USER", + "RABBITMQ_PASS", + ] + + missing = [var for var in required_env_vars if not os.getenv(var)] + #print the required env vars and their values for debugging + # for var in required_env_vars: + # logger.info("###################") + # logger.info(f"{var}={os.getenv(var)}") + if missing: + logger.error(f"Missing required environment variables: {missing}") + raise ValueError(f"Missing required environment variables: {missing}") + + logger.info("Configuration validation passed") + + +async def health_check(mq_client, db_manager): + """Perform health checks on critical dependencies.""" + from datetime import datetime + + health_status = { + "status": "healthy", + "checks": {}, + "timestamp": datetime.utcnow().isoformat(), + } + + try: + # Check database connection + await db_manager.init_pool() + health_status["checks"]["database"] = "healthy" + except Exception as e: + health_status["checks"]["database"] = f"unhealthy: {str(e)}" + health_status["status"] = "unhealthy" + + try: + # Check message queue connection + await mq_client.connect() + health_status["checks"]["message_queue"] = "healthy" + except Exception as e: + health_status["checks"]["message_queue"] = f"unhealthy: {str(e)}" + health_status["status"] = "unhealthy" + + return health_status + + +def setup_signal_handlers(shutdown_evt): + """ + Setup signal handlers for graceful shutdown. + + Handles: + - SIGINT (Ctrl+C) + - SIGTERM (docker stop, systemctl stop) + - SIGBREAK (Windows Ctrl+Break) + """ + + def signal_handler(signum, frame): + sig_name = ( + signal.Signals(signum).name if hasattr(signal, "Signals") else str(signum) + ) + logger.info(f"Received signal {sig_name}, initiating graceful shutdown...") + shutdown_evt.set() + + # Register signal handlers + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Windows-specific signal + if sys.platform == "win32": + signal.signal(signal.SIGBREAK, signal_handler) + + logger.info("Signal handlers registered (SIGINT, SIGTERM)") + + +async def main(): + """ + Main application entry point with proper resource management. + + Lifecycle: + 1. Initialize shared database connection pool (ONCE) + 2. Initialize RabbitMQ client and submission checker with shared pool + 3. Connect to database and message queue + 4. Start consuming messages + 5. Wait for shutdown signal + 6. Gracefully cleanup all resources + """ + global consumer_task + import os + + os.environ["KMP_DUPLICATE_LIB_OK"] = "True" + logger.info(f"Starting plagiarism checker application v{__version__}") + + # Validate configuration + validate_configuration() + + # Initialize shared database connection pool ONCE + from database.db_manager import DatabaseManager + + db_manager = DatabaseManager() + await db_manager.init_pool() + logger.info("Shared database connection pool initialized") + + mq_client = RabbitMQClient() + # Pass shared database manager to checker + checker = SubmissionChecker(mq_client, db_manager=db_manager) + shutdown_evt = asyncio.Event() + + # Setup signal handlers + setup_signal_handlers(shutdown_evt) + + try: + # Initialize message queue (database already initialized) + logger.info("Initializing resources...") + await checker.initialize() + + logger.info("Application ready, processing submissions") + + # Wait for shutdown signal with periodic checks + while not shutdown_evt.is_set(): + try: + await asyncio.wait_for(shutdown_evt.wait(), timeout=1.0) + except asyncio.TimeoutError: + # Timeout is expected, continue checking + continue + + logger.info("Shutdown signal received, cleaning up resources...") + + except KeyboardInterrupt: + logger.info("Keyboard interrupt received") + except Exception as e: + logger.error(f"Application error: {e}", exc_info=True) + finally: + # Ensure resources are cleaned up with timeout + logger.info("Closing database and message queue connections...") + try: + # Give 30 seconds for graceful shutdown + await asyncio.wait_for(checker.close(), timeout=30.0) + # Close shared database pool last + await db_manager.close() + logger.info("Application shutdown complete") + except asyncio.TimeoutError: + logger.warning("Shutdown timeout exceeded, forcing exit") + except Exception as e: + logger.error(f"Error during cleanup: {e}", exc_info=True) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + logger.info("Application terminated") + sys.exit(0) diff --git a/docker/local/Dockerfile b/docker/local/Dockerfile new file mode 100644 index 0000000..4d65823 --- /dev/null +++ b/docker/local/Dockerfile @@ -0,0 +1,83 @@ +# tap_plg/docker/Dockerfile +# +# Multi-stage build for tap_plg plagiarism detection service. +# +# ── Build notes ─────────────────────────────────────────────────────────────── +# +# The CLIP model (ViT-L/14, ~3.5 GB) is NOT baked into this image. +# It is loaded from a host-mounted volume at runtime via CLIP_LOCAL_MODEL_PATH. +# +# Before running containers, download the model once on your machine: +# +# mkdir -p ./models/clip +# python3 tap_plg/scripts/download_clip_model.py +# +# Then mount it in docker-compose.yml: +# volumes: +# - ./models:/app/models:ro +# +# This keeps the image small (~2 GB instead of ~5.5 GB) and avoids +# re-downloading the model on every image rebuild. +# +# ── Stages ──────────────────────────────────────────────────────────────────── +# base — Python + system deps +# deps — pip install (cached unless requirements.txt changes) +# runtime — final slim image + +FROM python:3.11-slim AS base + +# System dependencies needed by: +# - Pillow (libpng, libjpeg, libwebp) +# - torch / open_clip (libgomp for OpenMP) +# - psycopg2 / asyncpg (libpq) +# - curl (health checks in compose) +RUN apt-get update && apt-get install -y --no-install-recommends \ + libpng-dev \ + libjpeg-dev \ + libwebp-dev \ + libgomp1 \ + libpq-dev \ + curl \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# ── Dependency stage ────────────────────────────────────────────────────────── +FROM base AS deps + +# Copy requirements first — Docker layer cache means pip install only reruns +# when requirements.txt changes, not on every code change. +COPY requirements.txt . + +# Install all dependencies. +# torch + open_clip are large — this layer will be cached after first build. +RUN pip install --no-cache-dir -r requirements.txt + +# ── Runtime stage ───────────────────────────────────────────────────────────── +FROM deps AS runtime + +# Copy application source +COPY . . + +# Create directories for runtime data +# These are overridden by volume mounts in compose but need to exist in image +RUN mkdir -p /app/models/clip \ + /app/data/faiss_index \ + /app/data/temp_images \ + /app/data/reference_images \ + /app/logs + +# Default environment — overridden by compose env_file / environment blocks +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + LOG_LEVEL=INFO \ + LOG_FORMAT=json \ + ENVIRONMENT=development + +# ── Entrypoints ─────────────────────────────────────────────────────────────── +# The compose file overrides the command per service: +# tap_plg_worker: python app.py (background consumer) +# tap_plg_api: uvicorn api.api:app (FastAPI HTTP server) +# +# Default runs the worker. +CMD ["python", "app.py"] diff --git a/docker/local/docker-compose.yml b/docker/local/docker-compose.yml new file mode 100644 index 0000000..2663393 --- /dev/null +++ b/docker/local/docker-compose.yml @@ -0,0 +1,118 @@ +name: tap_plg_local + +# tap_plg/docker-compose.yml +# +# Standalone compose file for running tap_plg on its own. +# For the full TAP LMS testbed, use tap_lms/docker/local/docker-compose.yml +# instead — this file is kept here for tap_plg-only development and testing. +# +# Usage: +# cp .env.example .env +# # Edit .env with your values +# docker compose up -d +# +# ── CLIP model setup (required before first run) ────────────────────────────── +# The CLIP model is mounted from your host machine to avoid a 3.5 GB download +# on every image rebuild. Download it once: +# +# mkdir -p ./models/clip +# python3 scripts/download_clip_model.py +# +# ── Queue names ──────────────────────────────────────────────────────────────── +# IMPORTANT: The queue names here must match what tap_lms publishes to. +# tap_lms RabbitMQ Settings DocType controls the names on that side. +# Update SUBMISSION_QUEUE and FEEDBACK_QUEUE in .env to match. +# Default values in .env.example are plagiarism_submissions / plagiarism_feedback. + +services: + + # ── tap_plg background worker ─────────────────────────────────────────────── + # Consumes from SUBMISSION_QUEUE, runs plagiarism detection, + # publishes results to FEEDBACK_QUEUE. + tap_plg_worker: + build: + context: . + dockerfile: docker/Dockerfile + container_name: tap_plg_worker + env_file: + - .env + environment: + # Explicitly set service hostnames so containers find each other + RABBITMQ_HOST: rabbitmq + POSTGRES_HOST: postgres + volumes: + # Mount CLIP model from host — avoids re-downloading on rebuild + - ./models:/app/models:ro + # Mount data dirs as named volumes for persistence + - faiss_data:/app/data/faiss_index + - temp_images:/app/data/temp_images + - reference_images:/app/data/reference_images + depends_on: + postgres: + condition: service_healthy + rabbitmq: + condition: service_started + restart: on-failure + # CLIP model load takes ~60 seconds on first start + + # ── tap_plg FastAPI ───────────────────────────────────────────────────────── + # Health check endpoint + REST API for direct submission queries. + tap_plg_api: + build: + context: . + dockerfile: docker/Dockerfile + container_name: tap_plg_api + command: ["uvicorn", "api.api:app", "--host", "0.0.0.0", "--port", "8080", "--log-level", "info"] + env_file: + - .env + environment: + POSTGRES_HOST: postgres + ports: + - "${TAP_PLG_API_PORT:-8080}:8080" + depends_on: + postgres: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"] + interval: 15s + timeout: 5s + retries: 5 + + # ── PostgreSQL with pgvector ──────────────────────────────────────────────── + # tap_plg's dedicated database for storing CLIP embeddings and submission + # hashes. Uses the pgvector image which includes the vector extension. + postgres: + image: pgvector/pgvector:pg15 + container_name: tap_plg_postgres + environment: + POSTGRES_USER: "${POSTGRES_USER:-postgres}" + POSTGRES_PASSWORD: "${POSTGRES_PASSWORD:-postgres}" + POSTGRES_DB: "${POSTGRES_DB:-plagiarism_db}" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-postgres}"] + interval: 5s + timeout: 5s + retries: 20 + ports: + - "${POSTGRES_PORT:-5432}:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + + # ── RabbitMQ ───────────────────────────────────────────────────────────────── + # Only included here for standalone tap_plg development. + # In the full TAP LMS testbed this is provided by tap_lms's compose file. + rabbitmq: + image: rabbitmq:4-management + container_name: tap_plg_rabbitmq + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_DEFAULT_USER: "${RABBITMQ_USER:-guest}" + RABBITMQ_DEFAULT_PASS: "${RABBITMQ_PASS:-guest}" + +volumes: + postgres_data: + faiss_data: + temp_images: + reference_images: diff --git a/image_worker/assigment_ref_images.py b/image_worker/assigment_ref_images.py index 10617f4..c6c03f3 100644 --- a/image_worker/assigment_ref_images.py +++ b/image_worker/assigment_ref_images.py @@ -1,6 +1,7 @@ import base64 import io import os +import time from PIL import Image import requests from typing import List, Dict, Optional @@ -8,6 +9,7 @@ import asyncpg from datetime import datetime import logging +from monitoring import emit load_dotenv() @@ -286,14 +288,35 @@ async def fetch_from_api( try: # Use synchronous requests in async context (consider aiohttp for true async) - response = requests.post( - api_url, - headers=headers, - json={"assignment_id": assignment_id}, - timeout=30 - ) - - response.raise_for_status() + _api_t0 = time.monotonic() + _api_status = None + try: + response = requests.post( + api_url, + headers=headers, + json={"assignment_id": assignment_id}, + timeout=30 + ) + _api_status = response.status_code + response.raise_for_status() + emit( + logger, "info", "tap_lms_api_call", + endpoint="get_assignment_context", + assignment_id=assignment_id, + http_status=_api_status, + duration_ms=round((time.monotonic() - _api_t0) * 1000, 2), + cached=False, + ) + except Exception as _api_err: + emit( + logger, "error", "tap_lms_api_call", + endpoint="get_assignment_context", + assignment_id=assignment_id, + http_status=_api_status, + duration_ms=round((time.monotonic() - _api_t0) * 1000, 2), + error=str(_api_err), + ) + raise data = response.json() reference_images = data.get("message", {}).get("assignment", {}).get("reference_images", []) diff --git a/image_worker/worker.py b/image_worker/worker.py index 4bc0899..2c03323 100644 --- a/image_worker/worker.py +++ b/image_worker/worker.py @@ -1,1462 +1,1488 @@ -import json -from datetime import datetime -import logging -from PIL import Image -import time -import numpy as np -import asyncio -from typing import Dict, Optional, Tuple, Any -from dotenv import load_dotenv -from image_worker.assigment_ref_images import get_reference_images -from image_worker.gcs_client import is_gcs_url, download_from_gcs, load_gcp_credentials - -from config.config import config -from database.db_manager import DatabaseManager -from image_worker.hash_handler import HashHandler -from image_worker.clip_handler import CLIPHandler -from image_worker.faiss_handler import FAISSHandler -from image_worker.pgvector_handler import PgVectorHandler -from image_worker.ai_generated_detector import AIGeneratedDetector -from image_worker.image_validator import ImageValidator -from utils.exceptions import ( - WorkerNotInitializedError, - ValidationError, - InvalidImageURLError, - GCSDownloadError, -) - -load_dotenv() - -logging.basicConfig( - level=getattr(logging, config.logging.log_level), - format=config.logging.log_format, -) -logger = logging.getLogger(__name__) - - -class ImageWorker: - def __init__(self, db_manager=None): - self.db_manager = db_manager if db_manager else DatabaseManager() - self._db_initialized = False - self._owns_db_manager = db_manager is None - - self.exact_dup_threshold = config.detection.exact_dup_threshold - self.near_dup_threshold = config.detection.near_dup_threshold - self.semantic_threshold = config.detection.semantic_threshold - self.hash_threshold = config.detection.hash_threshold - self.peer_hash_threshold = config.detection.peer_hash_threshold - self.self_hash_threshold = config.detection.self_hash_threshold - self.enable_peer_check = config.detection.enable_peer_check - self.enable_self_check = config.detection.enable_self_check - - self.MAX_IMAGE_SIZE = ( - config.image_processing.max_image_width, - config.image_processing.max_image_height, - ) - self.DOWNLOAD_TIMEOUT = config.image_processing.download_timeout - - self.use_pgvector = config.vector_search.use_pgvector - self.faiss_top_k = config.vector_search.faiss_top_k - self.faiss_index_path = config.vector_search.faiss_index_path - self.faiss_metadata_path = config.vector_search.faiss_metadata_path - - window_minutes = config.detection.resubmission_window_minutes - if window_minutes: - self.resubmission_window_days = float(window_minutes) / (24 * 60) - else: - self.resubmission_window_days = config.detection.resubmission_window_days - - self.hash_handler = HashHandler(hash_size=config.image_processing.hash_size) - self.clip_handler = CLIPHandler( - model_name=config.vector_search.clip_model, - device=config.vector_search.clip_device, - pretrained=config.vector_search.clip_pretrained, - local_model_path=config.vector_search.clip_local_model_path, - ) - self.ai_detector = AIGeneratedDetector() - self.image_validator = ImageValidator( - min_variance_threshold=config.image_processing.min_variance_threshold, - min_unique_colors=config.image_processing.min_unique_colors, - max_solid_color_ratio=config.image_processing.max_solid_color_ratio, - ) - - self.vector_handler = None - self.gcp_credentials = None - - # Initialize GCP credentials if enabled - if config.gcp.gcp_enabled: - try: - self.gcp_credentials = load_gcp_credentials(config.gcp.gcp_key_path) - logger.info(f"GCP credentials loaded from: {config.gcp.gcp_key_path}") - except Exception as e: - logger.error(f"Failed to load GCP credentials: {e}") - if config.environment == "production": - raise GCSDownloadError( - f"GCP authentication failed: {e}", - details={"key_path": config.gcp.gcp_key_path}, - ) - - if not self.use_pgvector: - self.vector_handler = FAISSHandler( - dimension=768, - index_path=self.faiss_index_path, - metadata_path=self.faiss_metadata_path, - ) - - async def initialize(self): - """ - Initialize async resources (database connection pool and pgvector if enabled). - - Must be called before processing submissions. - """ - if not self._db_initialized: - await self.db_manager.init_pool() - - if self.use_pgvector: - self.vector_handler = PgVectorHandler( - self.db_manager, - dimension=768, - ) - await self.vector_handler.get_stats() - - self._db_initialized = True - - async def close(self): - if self._db_initialized: - if self._owns_db_manager: - await self.db_manager.close() - self._db_initialized = False - - async def download_image(self, image_url: str) -> Image.Image: - """ - Download image from a Google Cloud Storage URL. - - Supports gs:// and storage.googleapis.com URLs via configured GCP - credentials. - - Args: - image_url: GCS URL (gs://bucket-name/path/to/image or - https://storage.googleapis.com/bucket-name/path/to/image) - - Returns: - PIL Image object - - Raises: - InvalidImageURLError: If URL is malformed or invalid - GCSDownloadError: For GCS authentication, bucket, blob, or download failures - """ - if not is_gcs_url(image_url): - raise InvalidImageURLError( - f"Invalid GCS URL: {image_url}", - details={"url": image_url}, - ) - - if not self.gcp_credentials: - raise GCSDownloadError( - "GCP credentials not initialized. Set GCP_ENABLED=true and GCP_KEY_PATH in .env", - details={"url": image_url}, - ) - - logger.debug(f"Downloading image from GCS: url={image_url}") - try: - return await download_from_gcs( - image_url, - self.gcp_credentials, - timeout=self.DOWNLOAD_TIMEOUT, - ) - except FileNotFoundError as e: - raise GCSDownloadError( - f"GCS bucket or blob not found: {str(e)}", - details={"url": image_url}, - ) - except ValueError as e: - raise GCSDownloadError( - f"Invalid or corrupted image from GCS: {str(e)}", - details={"url": image_url}, - ) - except Exception as e: - raise GCSDownloadError( - f"GCS download failed: {str(e)}", - details={"url": image_url, "error": str(e)}, - ) - - def _validate_input(self, data: Dict[str, Any]) -> Tuple[str, str, str, str, str]: - """ - Validate input data and extract required fields. - - Args: - data: Input submission data dictionary - - Returns: - Tuple of (submission_id, student_id, assign_id, image_url, db_record_id) - - Raises: - ValidationError: If required fields are missing or invalid - """ - required_fields = ["submission_id", "student_id", "submission_url", "db_record_id"] - for field in required_fields: - if field not in data or not data[field]: - raise ValidationError( - f"Missing required field: {field}", - details={"field": field, "data": data}, - ) - - submission_id = data["submission_id"] - student_id = data["student_id"] - assign_id = data.get("assign_id", "N/A") - image_url = data["submission_url"] - db_record_id = data["db_record_id"] - - return submission_id, student_id, assign_id, image_url, db_record_id - - def _sync_compare_hashes(self, hashes1, hashes2, threshold): - return self.hash_handler.compare_all_hashes(hashes1, hashes2, threshold) - - async def _async_compare_ref(self, hashes, ref): - loop = asyncio.get_event_loop() - comparison = await loop.run_in_executor( - None, - self._sync_compare_hashes, - hashes, - { - "phash": ref["phash"], - "dhash": ref["dhash"], - "ahash": ref["ahash"], - }, - self.hash_threshold, - ) - return comparison, ref - - async def _async_compare_peer(self, hashes, peer): - loop = asyncio.get_event_loop() - comparison = await loop.run_in_executor( - None, - self._sync_compare_hashes, - hashes, - { - "phash": peer["phash"], - "dhash": peer["dhash"], - "ahash": peer["ahash"], - }, - self.peer_hash_threshold, - ) - return comparison, peer - - async def _async_compare_self(self, hashes, prev): - loop = asyncio.get_event_loop() - comparison = await loop.run_in_executor( - None, - self._sync_compare_hashes, - hashes, - { - "phash": prev["phash"], - "dhash": prev["dhash"], - "ahash": prev["ahash"], - }, - self.self_hash_threshold, - ) - return comparison, prev - - async def check_assignment_reference_hash_match( - self, hashes: dict, assignment_id: str - ) -> Tuple[bool, Optional[str], Optional[float], Optional[str]]: - """ - Check if submission matches any reference image via perceptual hash comparison. - - Uses three hash types (pHash, dHash, aHash) for robust duplicate detection. - - Args: - hashes: Dict containing 'phash', 'dhash', 'ahash' hex strings - - Returns: - Tuple of (is_match, reference_id, similarity_score, image_url) - - is_match: True if hash match found - - reference_id: UUID of matched reference (or None) - - similarity_score: 0.0-1.0 similarity score (or None) - - image_url: URL of matched reference image (or None) - - Raises: - Exception: If database query fails - """ - try: - - references = await get_reference_images(assignment_id, self.clip_handler, self.hash_handler) - if not references: - return False, None, None, None - - # for ref_image in references: - # if ref_image["content"] is not None: - # hashes = self.hash_handler.compute_hashes(ref_image["content"]) - # ref_image['phash'] = hashes['phash'] - # ref_image['dhash'] = hashes['dhash'] - # ref_image['ahash'] = hashes['ahash'] - - tasks = [self._async_compare_ref(hashes, ref) for ref in references] - results = await asyncio.gather(*tasks) - - - best_match = None - best_score = 999 - best_comparison = None - for comparison, ref in results: - if comparison["is_match"] and comparison["avg_distance"] < best_score: - best_score = comparison["avg_distance"] - best_match = ref - best_comparison = comparison - - if best_match and best_comparison: - logger.info("Assignment reference match found") - similarity = 1 - (best_score / 64.0) - return ( - True, - str(best_match["name"]), - similarity, - str(best_match["name"]), - ) - else: - logger.info("No assignment reference match found") - return False, None, None, None - - except Exception as e: - logger.error(f"Hash check failed: {e}", exc_info=True) - raise - - async def check_db_reference_hash_match( - self, hashes: dict - ) -> Tuple[bool, Optional[str], Optional[float], Optional[str]]: - """ - Check if submission matches any reference image via perceptual hash comparison. - - Uses three hash types (pHash, dHash, aHash) for robust duplicate detection. - - Args: - hashes: Dict containing 'phash', 'dhash', 'ahash' hex strings - - Returns: - Tuple of (is_match, reference_id, similarity_score, image_url) - - is_match: True if hash match found - - reference_id: UUID of matched reference (or None) - - similarity_score: 0.0-1.0 similarity score (or None) - - image_url: URL of matched reference image (or None) - - Raises: - Exception: If database query fails - """ - try: - references = await self.db_manager.fetch_all_reference_images() - - if not references: - return False, None, None, None - - tasks = [self._async_compare_ref(hashes, ref) for ref in references] - results = await asyncio.gather(*tasks) - - best_match = None - best_score = 999 - best_comparison = None - for comparison, ref in results: - if comparison["is_match"] and comparison["avg_distance"] < best_score: - best_score = comparison["avg_distance"] - best_match = ref - best_comparison = comparison - - if best_match and best_comparison: - similarity = 1 - (best_score / 64.0) - return ( - True, - str(best_match["reference_id"]), - similarity, - best_match.get("image_path"), - ) - else: - return False, None, None, None - - except Exception as e: - logger.error(f"Hash check failed: {e}", exc_info=True) - raise - - async def check_clip_match( - self, image: Image.Image - ) -> Tuple[Optional[str], float, Optional[np.ndarray]]: - """ - Check similarity using CLIP + vector search (FAISS or pgvector) - Retrieves top-K candidates and picks best match above threshold - - Returns: - Tuple of (matched_ref_id, similarity_score, embedding): - - matched_ref_id: Reference ID if match found, None otherwise - - similarity_score: Float similarity score (0.0-1.0) - - embedding: CLIP embedding vector or None on error - """ - try: - embedding = self.clip_handler.generate_embedding(image) - - if self.vector_handler is None: - logger.error("Vector handler not initialized") - return None, 0.0, None - - results = None - if self.use_pgvector: - results = await self.vector_handler.search( - embedding, k=self.faiss_top_k - ) # type: ignore - else: - results = self.vector_handler.search(embedding, k=self.faiss_top_k) - - if not results: - return None, 0.0, None - - # print("#"*70) - # for ref_id, sim, meta in results: - # print(f" Ref ID: {ref_id}, Similarity: {sim:.4f}, Meta: {meta}") - # print("#"*70) - - matches = [ - (ref_id, sim, meta) - for ref_id, sim, meta in results # type: ignore - if sim >= self.semantic_threshold - ] - - if matches: - ref_id, similarity, metadata = matches[0] - return ref_id, float(similarity), embedding - else: - return None, 0.0, embedding - - except Exception as e: - logger.error(f"CLIP search failed: {e}", exc_info=True) - return None, 0.0, None - - def determine_match_type(self, similarity: float) -> str: - """Determine plagiarism category based on similarity score""" - if similarity >= self.exact_dup_threshold: - return "exact_duplicate" - elif similarity >= self.near_dup_threshold: - return "near_duplicate" - elif similarity >= self.semantic_threshold: - return "semantic_match" - else: - return "original" - - async def update_submission( - self, - db_record_id: str, - hashes: dict, - plagiarism_status: dict, - processing_time: int, - clip_embedding: Optional[np.ndarray] = None, - ): - """ - Update submission record in PostgreSQL with comprehensive plagiarism details - - Args: - db_record_id: UUID of the submission record - hashes: dict with 'phash', 'dhash', 'ahash' - plagiarism_status: dict from determine_plagiarism_status method - processing_time: processing time in milliseconds - clip_embedding: CLIP embedding vector (optional, for pgvector storage) - """ - - updated_record = await self.db_manager.update_submission( - db_record_id, - hashes, - plagiarism_status, - processing_time, - ) - - if ( - self.use_pgvector - and clip_embedding is not None - and self.vector_handler is not None - ): - submission_id = plagiarism_status.get("submission_id") - if submission_id and hasattr( - self.vector_handler, "add_submission_embedding" - ): - success = await self.vector_handler.add_submission_embedding( # type: ignore - submission_id, clip_embedding - ) - if success: - logger.debug( - f"Stored CLIP embedding for submission {submission_id}" - ) - else: - logger.warning( - f"Failed to store CLIP embedding for submission {submission_id}" - ) - - logger.info( - f"Updated submission in worker {db_record_id}: {plagiarism_status['match_type']} " - f"(similarity: {plagiarism_status['similarity_score']:.4f})" - ) - return updated_record - - async def process_submission(self, data: Dict[str, Any]) -> Optional[str]: - """ - Main processing callback for image plagiarism detection. - - Args: - data: Submission data containing submission_id, student_id, image_url, db_record_id - - Returns: - JSON string with processing results and plagiarism status, or None on error - - Raises: - RuntimeError: If ImageWorker not initialized - """ - if self.vector_handler is None: - raise WorkerNotInitializedError( - "ImageWorker not initialized. Call initialize() before processing submissions." - ) - - start_time = time.time() - image = None - - try: - extracted = self._validate_input(data) - submission_id, student_id, assign_id, submission_url, db_record_id = extracted - - logger.info(f"Processing submission: {submission_id}") - - image_url = submission_url - - # Check for stock image URLs before downloading - is_stock, stock_site = self.image_validator.check_stock_image_url(image_url) - if is_stock and stock_site: - logger.warning( - f"Stock image rejected: submission={submission_id}, " - f"source={stock_site}, url={image_url}" - ) - stock_result = self._create_stock_image_result( - submission_id, student_id, assign_id, image_url, stock_site - ) - processing_time_ms = int((time.time() - start_time) * 1000) - return json.dumps(stock_result) - - try: - image = await self.download_image(image_url) - except (GCSDownloadError, InvalidImageURLError) as download_error: - logger.warning( - f"Image download failed; marking submission as original: " - f"submission={submission_id}, url={image_url}, error={download_error}", - exc_info=True, - ) - download_failure_result = self._create_download_failure_original_result( - submission_id, - student_id, - assign_id, - image_url, - download_error, - ) - return json.dumps(download_failure_result) - - is_ai_generated, ai_source, ai_confidence = ( - self.ai_detector.check_ai_generated(image) - ) - - if is_ai_generated and ai_confidence >= 0.70: - logger.warning( - f"AI-generated image rejected: submission={submission_id}, " - f"source={ai_source}, confidence={ai_confidence:.2f}" - ) - - ai_detection_result = self._create_ai_detection_result( - submission_id, - student_id, - assign_id, - image_url, - ai_source or "unknown", - ai_confidence, - ) - - processing_time_ms = int((time.time() - start_time) * 1000) - logger.info( - f"Database updated: submission={submission_id}, " - f"processing_time={processing_time_ms}ms" - ) - return json.dumps(ai_detection_result) - - elif is_ai_generated: - logger.info( - f"Low-confidence AI detection: submission={submission_id}, " - f"source={ai_source}, confidence={ai_confidence:.2f}, threshold=0.70" - ) - - hashes = self.hash_handler.compute_hashes(image) - - self_result = await self.check_self_submissions( - hashes, student_id, assign_id, datetime.utcnow() - ) - peer_result = await self.check_peer_submissions( - hashes, - student_id, - self_result.get("first_submission_date_for_image", None), - ) - # hash_check_result = await self.check_db_reference_hash_match(hashes) - hash_check_result = await self.check_assignment_reference_hash_match(hashes,assign_id) - - ( - hash_match, - matched_ref, - hash_similarity, - matched_ref_image_url, - ) = hash_check_result - - skip_clip = ( - hash_match - and hash_similarity is not None - and hash_similarity >= 0.80 - and not peer_result.get("is_match") - and not self_result.get("is_match") - ) - - matched_ref_clip = None - clip_similarity = 0.0 - clip_embedding = None - - if skip_clip: - logger.info( - f"Skipping CLIP: strong hash match (similarity={hash_similarity:.4f} >= 0.80)" - ) - else: - ( - matched_ref_clip, - clip_similarity, - clip_embedding, - ) = await self.check_clip_match(image) - - ref_result = await self._build_reference_result( - hash_match, - matched_ref, - hash_similarity, - matched_ref_image_url, - matched_ref_clip, - clip_similarity, - ) - - plagiarism_status = self.determine_plagiarism_status( - peer_result, self_result, ref_result - ) - plagiarism_status["submission_id"] = submission_id - - plagiarism_status["is_ai_generated"] = is_ai_generated - plagiarism_status["ai_detection_source"] = ( - ai_source if is_ai_generated else None - ) - plagiarism_status["ai_confidence"] = ( - float(ai_confidence) if is_ai_generated else 0.0 - ) - - processing_time = int((time.time() - start_time) * 1000) - - await self.update_submission( - db_record_id, - hashes, - plagiarism_status, - processing_time, - clip_embedding, - ) - - logger.info( - f"Processing complete: submission={submission_id}, " - f"plagiarized={plagiarism_status.get('is_plagiarized', False)}, " - f"match_type={plagiarism_status.get('match_type', 'none')}, " - f"similarity={plagiarism_status.get('similarity_score', 0.0):.4f}, " - f"time={processing_time}ms" - ) - - return self.format_results( - submission_id, assign_id, student_id, image_url, plagiarism_status - ) - - except Exception as e: - logger.error( - f"Failed to process submission: submission_id={data.get('submission_id', 'unknown')}, " - f"error={e}", - exc_info=True, - ) - return None - - finally: - if image: - try: - image.close() - logger.debug("Image resources freed successfully") - except Exception as e: - logger.error(f"Failed to free image resources: error={e}") - - async def check_peer_submissions( - self, - hashes: dict, - current_student_id: str, - first_submission_date_for_image=None, - ) -> dict: - """ - Check if submission matches any peer's submission (hash-based, async). - - Args: - hashes: dict with 'phash', 'dhash', 'ahash' - current_student_id: hashed student_id of current submission - - Returns: - dict with keys: - - is_match (bool) - - matched_submission_ids (list) - - matched_student_ids (list) - - matched_image_urls (list) - - matched_similarities (list) - - best_similarity (float) - - best_match_student_id (str or None) - """ - if not self.enable_peer_check: - return self._create_empty_peer_result() - - try: - peer_submissions = await self.db_manager.fetch_peer_submissions( - current_student_id, first_submission_date_for_image - ) - - if not peer_submissions: - logger.info("No peer submissions for comparison") - return self._create_empty_peer_result() - - tasks = [ - self._async_compare_peer(hashes, peer) for peer in peer_submissions - ] - results = await asyncio.gather(*tasks) - - return self._process_peer_matches(results, peer_submissions) - - except Exception as e: - logger.error(f"Peer check failed: {e}") - return self._create_empty_peer_result() - - async def check_self_submissions( - self, - hashes: dict, - current_student_id: str, - current_assign_id: str, - current_submission_date, - ) -> dict: - """ - Check if submission matches student's own previous submissions (async). - - Logic: - - Within resubmission window: Allowed (not flagged as plagiarism) - - Outside window: Flagged as self-plagiarism - - Args: - hashes: dict with 'phash', 'dhash', 'ahash' - current_student_id: hashed student_id - current_submission_date: datetime of current submission - - Returns: - dict with keys: - - is_match (bool) - - matched_submission_ids (list) - - best_similarity (float) - - days_since_last (int or None) - - within_resubmission_window (bool) - - previous_submission_date (datetime or None) - :param current_student_id: - :param current_assign_id: - """ - if not self.enable_self_check: - return { - "is_match": False, - "matched_submission_ids": [], - "best_similarity": 0.0, - "days_since_last": None, - "within_resubmission_window": False, - "previous_submission_date": None, - } - - try: - self_submissions = await self.db_manager.fetch_self_submissions( - current_student_id - ) - - if not self_submissions: - logger.info("First-time submission for this student") - return { - "is_match": False, - "matched_submission_ids": [], - "matched_assign_ids": [], - "matched_image_urls": [], - "best_similarity": 0.0, - "days_since_last": None, - "same_assignment": False, - "within_resubmission_window": False, - "previous_submission_date": None, - "previous_assign_id": None, - } - - best_match = None - best_score = 999 - matched_ids = [] - matched_assign_ids = [] - matched_image_urls = [] - - first_submission_date_for_image = None - for prev_sub in self_submissions: - comparison = self.hash_handler.compare_all_hashes( - hashes, - { - "phash": prev_sub["phash"], - "dhash": prev_sub["dhash"], - "ahash": prev_sub["ahash"], - }, - threshold=self.self_hash_threshold, - ) - - if comparison["is_match"] and comparison["avg_distance"] < best_score: - best_score = comparison["avg_distance"] - best_match = prev_sub - - if comparison["is_match"]: - matched_ids.append(str(prev_sub["id"])) - matched_assign_ids.append(prev_sub.get("assign_id", "N/A")) - matched_image_urls.append(prev_sub.get("image_url", "")) - - submission_date = prev_sub.get("created_at") - if ( - first_submission_date_for_image is None - or submission_date < first_submission_date_for_image - ): - first_submission_date_for_image = submission_date - - if best_match: - return self._analyze_self_plagiarism( - best_match, - current_assign_id, - current_submission_date, - best_score, - matched_ids, - matched_assign_ids, - matched_image_urls, - first_submission_date_for_image, - ) - else: - logger.info( - f"No self-plagiarism detected: checked {len(self_submissions)} previous submissions" - ) - return self._create_empty_self_result() - - except Exception as e: - logger.error(f"Self-check failed: {e}") - return self._create_empty_self_result() - - def determine_plagiarism_status( - self, peer_result: dict, self_result: dict, ref_result: dict - ) -> dict: - """ - Determine final plagiarism status with priority: Self → Peer → Reference. - - Priority Order: - 1. Self-plagiarism (cross-assignment or late resubmission) - 2. Resubmission within window (same assignment) → Continue checks - 3. Peer plagiarism - 4. Reference database match - 5. Original (no matches) - """ - result = self._init_plagiarism_result() - - if self_result["is_match"]: - self_status = self._handle_self_plagiarism(self_result) - if self_status: - return self_status - result["resubmission_within_window"] = True - result["matched_self_submission_ids"] = self_result[ - "matched_submission_ids" - ] - result["matched_peer_assign_ids"] = self_result["matched_assign_ids"] - result["matched_self_image_urls"] = self_result["matched_image_urls"] - result["days_since_last_submission"] = self_result["days_since_last"] - - if peer_result["is_match"]: - return self._handle_peer_plagiarism(peer_result) - - if ref_result["is_match"]: - ref_status = self._handle_reference_match(ref_result) - if ref_status["is_plagiarized"]: - return ref_status - if result["resubmission_within_window"]: - result["similarity_score"] = self_result["best_similarity"] - result["match_type"] = "resubmission_allowed" - logger.info( - f"Final Status: RESUBMISSION ALLOWED (within window, " - f"{result['days_since_last_submission']} days, passed all checks)" - ) - return result - return ref_status - - if result["resubmission_within_window"]: - result["similarity_score"] = self_result["best_similarity"] - result["match_type"] = "resubmission_allowed" - logger.info( - f"Final Status: RESUBMISSION ALLOWED (within window, " - f"{result['days_since_last_submission']} days, passed all checks)" - ) - - return result - - def _init_plagiarism_result(self) -> dict: - """Initialize empty plagiarism result structure.""" - return { - "is_plagiarized": False, - "match_type": "original", - "plagiarism_source": "none", - "similarity_score": 0.0, - "peer_plagiarism_detected": False, - "self_plagiarism_detected": False, - "resubmission_within_window": False, - "matched_peer_submission_ids": [], - "matched_peer_student_ids": [], - "matched_peer_image_urls": [], - "matched_peer_similarity_scores": [], - "matched_peer_assign_ids": [], - "matched_self_submission_ids": [], - "matched_self_image_urls": [], - "matched_reference_ids": [], - "matched_reference_image_urls": [], - "days_since_last_submission": None, - } - - def _handle_self_plagiarism(self, self_result: dict) -> Optional[dict]: - """ - Handle self-plagiarism detection. - - Returns plagiarism result if actual plagiarism, None if allowed resubmission. - """ - if not self_result["same_assignment"]: - result = self._init_plagiarism_result() - result.update( - { - "is_plagiarized": True, - "self_plagiarism_detected": True, - "plagiarism_source": "self_cross_assignment", - "similarity_score": self_result["best_similarity"], - "match_type": self.determine_match_type( - self_result["best_similarity"] - ), - "matched_self_submission_ids": self_result[ - "matched_submission_ids" - ], - "matched_peer_assign_ids": self_result["matched_assign_ids"], - "matched_self_image_urls": self_result["matched_image_urls"], - "days_since_last_submission": self_result["days_since_last"], - } - ) - logger.info( - f"Final Status: SELF-PLAGIARISM (cross-assignment) - {result['match_type']} " - f"(prev: {self_result['previous_assign_id']})" - ) - return result - - if ( - self_result["same_assignment"] - and not self_result["within_resubmission_window"] - ): - result = self._init_plagiarism_result() - result.update( - { - "is_plagiarized": True, - "self_plagiarism_detected": True, - "plagiarism_source": "self_late_resubmission", - "similarity_score": self_result["best_similarity"], - "match_type": self.determine_match_type( - self_result["best_similarity"] - ), - "matched_self_submission_ids": self_result[ - "matched_submission_ids" - ], - "matched_peer_assign_ids": self_result["matched_assign_ids"], - "matched_self_image_urls": self_result["matched_image_urls"], - "days_since_last_submission": self_result["days_since_last"], - } - ) - logger.info( - f"Final Status: SELF-PLAGIARISM (late resubmission) - {result['match_type']} " - f"({result['days_since_last_submission']} days)" - ) - return result - - return None - - def _handle_peer_plagiarism(self, peer_result: dict) -> dict: - """Handle peer plagiarism detection.""" - result = self._init_plagiarism_result() - result.update( - { - "is_plagiarized": True, - "peer_plagiarism_detected": True, - "plagiarism_source": "peer" - if len(peer_result["matched_submission_ids"]) == 1 - else "peer_collusion", - "similarity_score": peer_result["best_similarity"], - "match_type": self.determine_match_type(peer_result["best_similarity"]), - "matched_peer_submission_ids": peer_result["matched_submission_ids"], - "matched_peer_student_ids": peer_result["matched_student_ids"], - "matched_peer_image_urls": peer_result["matched_image_urls"], - "matched_peer_similarity_scores": peer_result["matched_similarities"], - "matched_peer_assign_ids": peer_result["matched_assign_ids"], - } - ) - logger.info( - f"Final Status: PEER PLAGIARISM ({result['plagiarism_source']}) - {result['match_type']}" - ) - return result - - def _handle_reference_match(self, ref_result: dict) -> dict: - """Handle reference database match.""" - result = self._init_plagiarism_result() - - match_type = self.determine_match_type(ref_result["similarity"]) - - is_plagiarized = match_type != "original" - plagiarism_source = "reference" if is_plagiarized else "none" - - result.update( - { - "is_plagiarized": is_plagiarized, - "plagiarism_source": plagiarism_source, - "similarity_score": ref_result["similarity"], - "match_type": match_type, - } - ) - - if is_plagiarized: - if ref_result.get("matched_ref_id"): - result["matched_reference_ids"] = [ref_result["matched_ref_id"]] - if ref_result.get("image_url"): - result["matched_reference_image_urls"] = [ref_result["image_url"]] - - logger.info(f"Final Status: REFERENCE MATCH - {result['match_type']}") - return result - - def _create_ai_detection_result( - self, - submission_id: str, - student_id: str, - assign_id: str, - image_url: str, - ai_source: str, - ai_confidence: float, - ) -> dict: - """Create AI detection result dictionary.""" - return { - "submission_id": submission_id, - "student_id": student_id, - "assignment_id": assign_id, - "image_url": image_url, - "is_ai_generated": True, - "ai_detection_source": ai_source, - "ai_confidence": float(ai_confidence), - "is_plagiarized": True, - "similarity_score": float(ai_confidence), - "match_type": "ai_generated", - "plagiarism_source": "ai_generated", - "similar_sources": [], - } - - def _create_stock_image_result( - self, - submission_id: str, - student_id: str, - assign_id: str, - image_url: str, - stock_site: str, - ) -> dict: - """Create stock image detection result dictionary.""" - return { - "submission_id": submission_id, - "student_id": student_id, - "assignment_id": assign_id, - "image_url": image_url, - "is_ai_generated": False, - "ai_detection_source": "None", - "ai_confidence": 0.0, - "is_plagiarized": True, - "similarity_score": 1.0, - "match_type": "stock_image", - "plagiarism_source": f"stock_image_{stock_site}", - "similar_sources": [{"source": stock_site, "url": image_url}], - } - - def _create_download_failure_original_result( - self, - submission_id: str, - student_id: str, - assign_id: str, - image_url: str, - error: Exception, - ) -> dict: - """Create an original result when the image cannot be downloaded.""" - return { - "submission_id": submission_id, - "student_id": student_id, - "assignment_id": assign_id, - "image_url": image_url, - "is_ai_generated": False, - "ai_detection_source": "", - "ai_confidence": 0.0, - "is_plagiarized": False, - "similarity_score": 0.0, - "match_type": "original", - "plagiarism_source": "none", - "similar_sources": [], - "download_failed": True, - "download_error": str(error), - } - - async def _build_reference_result( - self, - hash_match: bool, - matched_ref: Optional[str], - hash_similarity: Optional[float], - matched_ref_image_url: Optional[str], - matched_ref_clip: Optional[str], - clip_similarity: Optional[float], - ) -> dict: - """Build reference result from hash or CLIP match.""" - if hash_match: - logger.debug( - f"Using perceptual hash match: reference_id={matched_ref}, similarity={hash_similarity:.4f}" - ) - return { - "is_match": True, - "matched_ref_id": matched_ref, - "similarity": hash_similarity, - "image_url": matched_ref_image_url, - } - - ref_image_url = None - if matched_ref_clip: - logger.debug( - f"Fetching reference image URL: reference_id={matched_ref_clip}" - ) - try: - image_path_val = await self.db_manager.fetch_reference_images_by_id( - matched_ref_clip - ) - if image_path_val: - ref_image_url = image_path_val - logger.debug( - f"Reference image URL retrieved: url={ref_image_url[:80]}..." - ) - else: - logger.warning( - f"Reference image URL not found in database: reference_id={matched_ref_clip}" - ) - except Exception as e: - logger.error( - f"Failed to fetch reference image URL: reference_id={matched_ref_clip}, error={e}" - ) - else: - logger.debug("No CLIP match found, skipping reference image URL fetch") - - is_match = ( - clip_similarity is not None and clip_similarity >= self.semantic_threshold - ) - - return { - "is_match": is_match, - "matched_ref_id": matched_ref_clip, - "similarity": clip_similarity if clip_similarity is not None else 0.0, - "image_url": ref_image_url, - } - - def _create_empty_peer_result(self) -> dict: - """Create empty peer result (no matches found).""" - return { - "is_match": False, - "matched_submission_ids": [], - "matched_student_ids": [], - "matched_assign_ids": [], - "matched_image_urls": [], - "matched_similarities": [], - "best_similarity": 0.0, - "best_match_student_id": None, - } - - def _create_empty_self_result(self) -> dict: - """Create empty self result (no matches found).""" - return { - "is_match": False, - "matched_submission_ids": [], - "matched_assign_ids": [], - "matched_image_urls": [], - "best_similarity": 0.0, - "days_since_last": None, - "same_assignment": False, - "within_resubmission_window": False, - "previous_submission_date": None, - "previous_assign_id": None, - } - - def _process_peer_matches(self, results: list, peer_submissions: list) -> dict: - """Process peer comparison results and build match dictionary.""" - best_match = None - best_score = 999 - matches = [] - - for comparison, peer in results: - if comparison["is_match"]: - similarity = 1 - (comparison["avg_distance"] / 64.0) - matches.append( - { - "submission_id": str(peer["id"]), - "student_id": peer["student_id"], - "assign_id": peer.get("assign_id", "N/A"), - "img_url": peer.get("image_url", ""), - "similarity": similarity, - "avg_distance": comparison["avg_distance"], - } - ) - - if comparison["avg_distance"] < best_score: - best_score = comparison["avg_distance"] - best_match = peer - - if not matches: - logger.info( - f"No peer plagiarism detected (checked {len(peer_submissions)} peers)" - ) - return self._create_empty_peer_result() - - matches.sort(key=lambda x: x["similarity"], reverse=True) - - logger.info( - f"PEER MATCH found: {len(matches)} peer submission(s) " - f"(similarity: {matches[0]['similarity']:.4f})" - ) - - return { - "is_match": True, - "matched_submission_ids": [m["submission_id"] for m in matches], - "matched_student_ids": [m["student_id"] for m in matches], - "matched_assign_ids": [m["assign_id"] for m in matches], - "matched_image_urls": [m["img_url"] for m in matches], - "matched_similarities": [m["similarity"] for m in matches], - "best_similarity": matches[0]["similarity"], - "best_match_student_id": best_match["student_id"] if best_match else None, - } - - def _analyze_self_plagiarism( - self, - best_match: dict, - current_assign_id: str, - current_submission_date, - best_score: float, - matched_ids: list, - matched_assign_ids: list, - matched_image_urls: list, - first_submission_date_for_image: Optional[datetime], - ) -> dict: - """Analyze self-plagiarism match and determine if it's within resubmission window.""" - similarity = 1 - (best_score / 64.0) - days_diff = (current_submission_date - best_match["created_at"]).days - same_assignment = best_match["assign_id"] == current_assign_id - - within_window = False - if same_assignment: - within_window = days_diff <= self.resubmission_window_days - - if not same_assignment: - logger.info( - f" SELF-PLAGIARISM (cross-assignment): Reused from assignment '{best_match['assign_id']}' → '{current_assign_id}' (similarity: {similarity:.4f})" - ) - elif same_assignment and not within_window: - logger.info( - f" SELF-PLAGIARISM (late resubmission): Same assignment '{current_assign_id}' after {days_diff} days (window: {self.resubmission_window_days} days, similarity: {similarity:.4f})" - ) - elif same_assignment and within_window: - logger.info( - f" RESUBMISSION within window: Same assignment '{current_assign_id}', {days_diff} days ago (similarity: {similarity:.4f}) - will check peer/reference" - ) - - return { - "is_match": True, - "matched_submission_ids": matched_ids, - "matched_assign_ids": matched_assign_ids, - "matched_image_urls": matched_image_urls, - "best_similarity": similarity, - "days_since_last": days_diff, - "same_assignment": same_assignment, - "within_resubmission_window": within_window, - "previous_submission_date": best_match["created_at"], - "previous_assign_id": best_match["assign_id"], - "first_submission_date_for_image": first_submission_date_for_image, - } - - def determine_role(self, similarity: float, match_type: str) -> str: - """ - Determine role based on individual match similarity score - - Role classification per match (not submission-level): - - peer_exact_match: >= 0.95 (exact duplicate) - - peer_near_duplicate: >= 0.90 (near duplicate) - - peer_semantic_match: >= 0.80 (semantic match) - - self_copy: self-plagiarism - - reference_copy: reference database match - - Args: - similarity: Individual match similarity score (0.0 to 1.0) - match_type: Type of match ('peer', 'self', 'reference') - - Returns: - Role string for similar_sources array - """ - if match_type == "self": - return "self_copy" - elif match_type == "reference": - return "reference_copy" - else: # peer matches - if similarity >= self.exact_dup_threshold: # >= 0.95 - return "peer_exact_match" - elif similarity >= self.near_dup_threshold: # >= 0.90 - return "peer_near_duplicate" - else: # >= 0.80 - return "peer_semantic_match" - - def format_results( - self, - submission_id: str, - assign_id: str, - student_id: str, - image_url: str, - plagiarism_status: dict, - ) -> str: - """ - Format plagiarism results as JSON for feedback queue. - - Returns JSON string with submission results and similar sources. - """ - try: - similar_sources = [] - - if plagiarism_status.get("is_plagiarized", False): - if plagiarism_status.get("peer_plagiarism_detected"): - similar_sources.extend(self._format_peer_matches(plagiarism_status)) - - if plagiarism_status.get("self_plagiarism_detected"): - similar_sources.extend( - self._format_self_matches(plagiarism_status, student_id) - ) - - if plagiarism_status.get("matched_reference_ids"): - similar_sources.extend( - self._format_reference_matches(plagiarism_status) - ) - - is_plagiarized = plagiarism_status.get("is_plagiarized", False) - - message = { - "submission_id": submission_id, - "student_id": student_id, - "assignment_id": assign_id, - "image_url": image_url, - "is_plagiarized": is_plagiarized, - "match_type": plagiarism_status["match_type"], - } - - if is_plagiarized: - message["similarity_score"] = plagiarism_status["similarity_score"] - message["plagiarism_source"] = plagiarism_status["plagiarism_source"] - message["similar_sources"] = similar_sources - - if plagiarism_status.get("is_ai_generated", False): - message["is_ai_generated"] = True - message["ai_detection_source"] = plagiarism_status.get( - "ai_detection_source" - ) - message["ai_confidence"] = plagiarism_status.get("ai_confidence", 0.0) - - # payload_preview = json.dumps(message, indent=2)[:2000] - # logger.info(f"Result payload preview (2000 chars):\n{payload_preview}...") - - - return json.dumps(message) - - except Exception as e: - logger.error(f"Failed to format results: {e}", exc_info=True) - return json.dumps({"error": str(e)}) - - def _format_peer_matches(self, plagiarism_status: dict) -> list: - """Format peer plagiarism matches for similar_sources.""" - matches = [] - matched_ids = plagiarism_status.get("matched_peer_submission_ids", []) - matched_students = plagiarism_status.get("matched_peer_student_ids", []) - matched_assigns = plagiarism_status.get("matched_peer_assign_ids", []) - matched_urls = plagiarism_status.get("matched_peer_image_urls", []) - matched_scores = plagiarism_status.get("matched_peer_similarity_scores", []) - - for i in range(len(matched_ids)): - similarity = matched_scores[i] if i < len(matched_scores) else 0.0 - matches.append( - { - "submission_id": str(matched_ids[i]) - if i < len(matched_ids) - else None, - "student_id": str(matched_students[i]) - if i < len(matched_students) - else None, - "assignment_id": str(matched_assigns[i]) - if i < len(matched_assigns) - else "N/A", - "image_url": matched_urls[i] if i < len(matched_urls) else "", - "similarity_score": similarity, - "role": self.determine_role(similarity, "peer"), - } - ) - - return matches - - def _format_self_matches(self, plagiarism_status: dict, student_id: str) -> list: - """Format self-plagiarism matches for similar_sources.""" - matches = [] - matched_self_ids = plagiarism_status.get("matched_self_submission_ids", []) - matched_self_assigns = plagiarism_status.get("matched_peer_assign_ids", []) - matched_self_urls = plagiarism_status.get("matched_self_image_urls", []) - similarity = plagiarism_status["similarity_score"] - - for i, self_id in enumerate(matched_self_ids): - matches.append( - { - "submission_id": str(self_id), - "student_id": str(student_id), - "assignment_id": str(matched_self_assigns[i]) - if i < len(matched_self_assigns) - else "N/A", - "image_url": matched_self_urls[i] - if i < len(matched_self_urls) - else "", - "similarity_score": similarity, - "role": "self_copy", - } - ) - - return matches - - def _format_reference_matches(self, plagiarism_status: dict) -> list: - """Format reference database matches for similar_sources.""" - matches = [] - matched_ref_ids = plagiarism_status.get("matched_reference_ids", []) - matched_ref_urls = plagiarism_status.get("matched_reference_image_urls", []) - similarity = plagiarism_status["similarity_score"] - - for i, ref_id in enumerate(matched_ref_ids): - matches.append( - { - "reference_id": str( - ref_id - ), # REF-V2-00211 format from reference_images table - "image_url": matched_ref_urls[i] - if i < len(matched_ref_urls) - else "", - "similarity_score": similarity, - "role": "reference_copy", - } - ) - - return matches +import json +from datetime import datetime +import logging +from PIL import Image +import time +import numpy as np +import asyncio +from typing import Dict, Optional, Tuple, Any +from dotenv import load_dotenv +from image_worker.assigment_ref_images import get_reference_images +from image_worker.gcs_client import is_gcs_url, download_from_gcs, load_gcp_credentials + +from config.config import config +from database.db_manager import DatabaseManager +from image_worker.hash_handler import HashHandler +from image_worker.clip_handler import CLIPHandler +from image_worker.faiss_handler import FAISSHandler +from image_worker.pgvector_handler import PgVectorHandler +from image_worker.ai_generated_detector import AIGeneratedDetector +from image_worker.image_validator import ImageValidator +from utils.exceptions import ( + WorkerNotInitializedError, + ValidationError, + InvalidImageURLError, + GCSDownloadError, +) + +load_dotenv() + +logging.basicConfig( + level=getattr(logging, config.logging.log_level), + format=config.logging.log_format, +) +logger = logging.getLogger(__name__) + + +class ImageWorker: + def __init__(self, db_manager=None): + self.db_manager = db_manager if db_manager else DatabaseManager() + self._db_initialized = False + self._owns_db_manager = db_manager is None + + self.exact_dup_threshold = config.detection.exact_dup_threshold + self.near_dup_threshold = config.detection.near_dup_threshold + self.semantic_threshold = config.detection.semantic_threshold + self.hash_threshold = config.detection.hash_threshold + self.peer_hash_threshold = config.detection.peer_hash_threshold + self.self_hash_threshold = config.detection.self_hash_threshold + self.enable_peer_check = config.detection.enable_peer_check + self.enable_self_check = config.detection.enable_self_check + + self.MAX_IMAGE_SIZE = ( + config.image_processing.max_image_width, + config.image_processing.max_image_height, + ) + self.DOWNLOAD_TIMEOUT = config.image_processing.download_timeout + + self.use_pgvector = config.vector_search.use_pgvector + self.faiss_top_k = config.vector_search.faiss_top_k + self.faiss_index_path = config.vector_search.faiss_index_path + self.faiss_metadata_path = config.vector_search.faiss_metadata_path + + window_minutes = config.detection.resubmission_window_minutes + if window_minutes: + self.resubmission_window_days = float(window_minutes) / (24 * 60) + else: + self.resubmission_window_days = config.detection.resubmission_window_days + + self.hash_handler = HashHandler(hash_size=config.image_processing.hash_size) + self.clip_handler = CLIPHandler( + model_name=config.vector_search.clip_model, + device=config.vector_search.clip_device, + pretrained=config.vector_search.clip_pretrained, + local_model_path=config.vector_search.clip_local_model_path, + ) + self.ai_detector = AIGeneratedDetector() + self.image_validator = ImageValidator( + min_variance_threshold=config.image_processing.min_variance_threshold, + min_unique_colors=config.image_processing.min_unique_colors, + max_solid_color_ratio=config.image_processing.max_solid_color_ratio, + ) + + self.vector_handler = None + self.gcp_credentials = None + + # Initialize GCP credentials if enabled + if config.gcp.gcp_enabled: + try: + self.gcp_credentials = load_gcp_credentials(config.gcp.gcp_key_path) + logger.info(f"GCP credentials loaded from: {config.gcp.gcp_key_path}") + except Exception as e: + logger.error(f"Failed to load GCP credentials: {e}") + if config.environment == "production": + raise GCSDownloadError( + f"GCP authentication failed: {e}", + details={"key_path": config.gcp.gcp_key_path}, + ) + + if not self.use_pgvector: + self.vector_handler = FAISSHandler( + dimension=768, + index_path=self.faiss_index_path, + metadata_path=self.faiss_metadata_path, + ) + + async def initialize(self): + """ + Initialize async resources (database connection pool and pgvector if enabled). + + Must be called before processing submissions. + """ + if not self._db_initialized: + await self.db_manager.init_pool() + + if self.use_pgvector: + self.vector_handler = PgVectorHandler( + self.db_manager, + dimension=768, + ) + await self.vector_handler.get_stats() + + self._db_initialized = True + + async def close(self): + if self._db_initialized: + if self._owns_db_manager: + await self.db_manager.close() + self._db_initialized = False + + async def download_image(self, image_url: str) -> Image.Image: + """ + Download image from a Google Cloud Storage URL. + + Supports gs:// and storage.googleapis.com URLs via configured GCP + credentials. + + Args: + image_url: GCS URL (gs://bucket-name/path/to/image or + https://storage.googleapis.com/bucket-name/path/to/image) + + Returns: + PIL Image object + + Raises: + InvalidImageURLError: If URL is malformed or invalid + GCSDownloadError: For GCS authentication, bucket, blob, or download failures + """ + if not is_gcs_url(image_url): + raise InvalidImageURLError( + f"Invalid GCS URL: {image_url}", + details={"url": image_url}, + ) + + if not self.gcp_credentials: + raise GCSDownloadError( + "GCP credentials not initialized. Set GCP_ENABLED=true and GCP_KEY_PATH in .env", + details={"url": image_url}, + ) + + logger.debug(f"Downloading image from GCS: url={image_url}") + try: + return await download_from_gcs( + image_url, + self.gcp_credentials, + timeout=self.DOWNLOAD_TIMEOUT, + ) + except FileNotFoundError as e: + raise GCSDownloadError( + f"GCS bucket or blob not found: {str(e)}", + details={"url": image_url}, + ) + except ValueError as e: + raise GCSDownloadError( + f"Invalid or corrupted image from GCS: {str(e)}", + details={"url": image_url}, + ) + except Exception as e: + raise GCSDownloadError( + f"GCS download failed: {str(e)}", + details={"url": image_url, "error": str(e)}, + ) + + def _validate_input(self, data: Dict[str, Any]) -> Tuple[str, str, str, str, str]: + """ + Validate input data and extract required fields. + + Args: + data: Input submission data dictionary + + Returns: + Tuple of (submission_id, student_id, assign_id, image_url, db_record_id) + + Raises: + ValidationError: If required fields are missing or invalid + """ + required_fields = ["submission_id", "student_id", "submission_url", "db_record_id"] + for field in required_fields: + if field not in data or not data[field]: + raise ValidationError( + f"Missing required field: {field}", + details={"field": field, "data": data}, + ) + + submission_id = data["submission_id"] + student_id = data["student_id"] + assign_id = data.get("assign_id", "N/A") + image_url = data["submission_url"] + db_record_id = data["db_record_id"] + + return submission_id, student_id, assign_id, image_url, db_record_id + + def _sync_compare_hashes(self, hashes1, hashes2, threshold): + return self.hash_handler.compare_all_hashes(hashes1, hashes2, threshold) + + async def _async_compare_ref(self, hashes, ref): + loop = asyncio.get_event_loop() + comparison = await loop.run_in_executor( + None, + self._sync_compare_hashes, + hashes, + { + "phash": ref["phash"], + "dhash": ref["dhash"], + "ahash": ref["ahash"], + }, + self.hash_threshold, + ) + return comparison, ref + + async def _async_compare_peer(self, hashes, peer): + loop = asyncio.get_event_loop() + comparison = await loop.run_in_executor( + None, + self._sync_compare_hashes, + hashes, + { + "phash": peer["phash"], + "dhash": peer["dhash"], + "ahash": peer["ahash"], + }, + self.peer_hash_threshold, + ) + return comparison, peer + + async def _async_compare_self(self, hashes, prev): + loop = asyncio.get_event_loop() + comparison = await loop.run_in_executor( + None, + self._sync_compare_hashes, + hashes, + { + "phash": prev["phash"], + "dhash": prev["dhash"], + "ahash": prev["ahash"], + }, + self.self_hash_threshold, + ) + return comparison, prev + + async def check_assignment_reference_hash_match( + self, hashes: dict, assignment_id: str + ) -> Tuple[bool, Optional[str], Optional[float], Optional[str]]: + """ + Check if submission matches any reference image via perceptual hash comparison. + + Uses three hash types (pHash, dHash, aHash) for robust duplicate detection. + + Args: + hashes: Dict containing 'phash', 'dhash', 'ahash' hex strings + + Returns: + Tuple of (is_match, reference_id, similarity_score, image_url) + - is_match: True if hash match found + - reference_id: UUID of matched reference (or None) + - similarity_score: 0.0-1.0 similarity score (or None) + - image_url: URL of matched reference image (or None) + + Raises: + Exception: If database query fails + """ + try: + + references = await get_reference_images(assignment_id, self.clip_handler, self.hash_handler) + if not references: + return False, None, None, None + + # for ref_image in references: + # if ref_image["content"] is not None: + # hashes = self.hash_handler.compute_hashes(ref_image["content"]) + # ref_image['phash'] = hashes['phash'] + # ref_image['dhash'] = hashes['dhash'] + # ref_image['ahash'] = hashes['ahash'] + + tasks = [self._async_compare_ref(hashes, ref) for ref in references] + results = await asyncio.gather(*tasks) + + + best_match = None + best_score = 999 + best_comparison = None + for comparison, ref in results: + if comparison["is_match"] and comparison["avg_distance"] < best_score: + best_score = comparison["avg_distance"] + best_match = ref + best_comparison = comparison + + if best_match and best_comparison: + logger.info("Assignment reference match found") + similarity = 1 - (best_score / 64.0) + return ( + True, + str(best_match["name"]), + similarity, + str(best_match["name"]), + ) + else: + logger.info("No assignment reference match found") + return False, None, None, None + + except Exception as e: + logger.error(f"Hash check failed: {e}", exc_info=True) + raise + + async def check_db_reference_hash_match( + self, hashes: dict + ) -> Tuple[bool, Optional[str], Optional[float], Optional[str]]: + """ + Check if submission matches any reference image via perceptual hash comparison. + + Uses three hash types (pHash, dHash, aHash) for robust duplicate detection. + + Args: + hashes: Dict containing 'phash', 'dhash', 'ahash' hex strings + + Returns: + Tuple of (is_match, reference_id, similarity_score, image_url) + - is_match: True if hash match found + - reference_id: UUID of matched reference (or None) + - similarity_score: 0.0-1.0 similarity score (or None) + - image_url: URL of matched reference image (or None) + + Raises: + Exception: If database query fails + """ + try: + references = await self.db_manager.fetch_all_reference_images() + + if not references: + return False, None, None, None + + tasks = [self._async_compare_ref(hashes, ref) for ref in references] + results = await asyncio.gather(*tasks) + + best_match = None + best_score = 999 + best_comparison = None + for comparison, ref in results: + if comparison["is_match"] and comparison["avg_distance"] < best_score: + best_score = comparison["avg_distance"] + best_match = ref + best_comparison = comparison + + if best_match and best_comparison: + similarity = 1 - (best_score / 64.0) + return ( + True, + str(best_match["reference_id"]), + similarity, + best_match.get("image_path"), + ) + else: + return False, None, None, None + + except Exception as e: + logger.error(f"Hash check failed: {e}", exc_info=True) + raise + + async def check_clip_match( + self, image: Image.Image + ) -> Tuple[Optional[str], float, Optional[np.ndarray]]: + """ + Check similarity using CLIP + vector search (FAISS or pgvector) + Retrieves top-K candidates and picks best match above threshold + + Returns: + Tuple of (matched_ref_id, similarity_score, embedding): + - matched_ref_id: Reference ID if match found, None otherwise + - similarity_score: Float similarity score (0.0-1.0) + - embedding: CLIP embedding vector or None on error + """ + try: + embedding = self.clip_handler.generate_embedding(image) + + if self.vector_handler is None: + logger.error("Vector handler not initialized") + return None, 0.0, None + + results = None + if self.use_pgvector: + results = await self.vector_handler.search( + embedding, k=self.faiss_top_k + ) # type: ignore + else: + results = self.vector_handler.search(embedding, k=self.faiss_top_k) + + if not results: + return None, 0.0, None + + # print("#"*70) + # for ref_id, sim, meta in results: + # print(f" Ref ID: {ref_id}, Similarity: {sim:.4f}, Meta: {meta}") + # print("#"*70) + + matches = [ + (ref_id, sim, meta) + for ref_id, sim, meta in results # type: ignore + if sim >= self.semantic_threshold + ] + + if matches: + ref_id, similarity, metadata = matches[0] + return ref_id, float(similarity), embedding + else: + return None, 0.0, embedding + + except Exception as e: + logger.error(f"CLIP search failed: {e}", exc_info=True) + return None, 0.0, None + + def determine_match_type(self, similarity: float) -> str: + """Determine plagiarism category based on similarity score""" + if similarity >= self.exact_dup_threshold: + return "exact_duplicate" + elif similarity >= self.near_dup_threshold: + return "near_duplicate" + elif similarity >= self.semantic_threshold: + return "semantic_match" + else: + return "original" + + async def update_submission( + self, + db_record_id: str, + hashes: dict, + plagiarism_status: dict, + processing_time: int, + clip_embedding: Optional[np.ndarray] = None, + ): + """ + Update submission record in PostgreSQL with comprehensive plagiarism details + + Args: + db_record_id: UUID of the submission record + hashes: dict with 'phash', 'dhash', 'ahash' + plagiarism_status: dict from determine_plagiarism_status method + processing_time: processing time in milliseconds + clip_embedding: CLIP embedding vector (optional, for pgvector storage) + """ + + updated_record = await self.db_manager.update_submission( + db_record_id, + hashes, + plagiarism_status, + processing_time, + ) + + if ( + self.use_pgvector + and clip_embedding is not None + and self.vector_handler is not None + ): + submission_id = plagiarism_status.get("submission_id") + if submission_id and hasattr( + self.vector_handler, "add_submission_embedding" + ): + success = await self.vector_handler.add_submission_embedding( # type: ignore + submission_id, clip_embedding + ) + if success: + logger.debug( + f"Stored CLIP embedding for submission {submission_id}" + ) + else: + logger.warning( + f"Failed to store CLIP embedding for submission {submission_id}" + ) + + logger.info( + f"Updated submission in worker {db_record_id}: {plagiarism_status['match_type']} " + f"(similarity: {plagiarism_status['similarity_score']:.4f})" + ) + return updated_record + + async def process_submission(self, data: Dict[str, Any]) -> Optional[str]: + """ + Main processing callback for image plagiarism detection. + + Args: + data: Submission data containing submission_id, student_id, image_url, db_record_id + + Returns: + JSON string with processing results and plagiarism status, or None on error + + Raises: + RuntimeError: If ImageWorker not initialized + """ + if self.vector_handler is None: + raise WorkerNotInitializedError( + "ImageWorker not initialized. Call initialize() before processing submissions." + ) + + start_time = time.time() + image = None + + try: + extracted = self._validate_input(data) + submission_id, student_id, assign_id, submission_url, db_record_id = extracted + + logger.info(f"Processing submission: {submission_id}") + + image_url = submission_url + + # Check for stock image URLs before downloading + is_stock, stock_site = self.image_validator.check_stock_image_url(image_url) + if is_stock and stock_site: + logger.warning( + f"Stock image rejected: submission={submission_id}, " + f"source={stock_site}, url={image_url}" + ) + stock_result = self._create_stock_image_result( + submission_id, student_id, assign_id, image_url, stock_site + ) + processing_time_ms = int((time.time() - start_time) * 1000) + return json.dumps(stock_result) + + try: + image = await self.download_image(image_url) + except (GCSDownloadError, InvalidImageURLError) as download_error: + logger.warning( + f"Image download failed; marking submission as original: " + f"submission={submission_id}, url={image_url}, error={download_error}", + exc_info=True, + ) + download_failure_result = self._create_download_failure_original_result( + submission_id, + student_id, + assign_id, + image_url, + download_error, + ) + return json.dumps(download_failure_result) + + _step_t0 = time.monotonic() + is_ai_generated, ai_source, ai_confidence = ( + self.ai_detector.check_ai_generated(image) + ) + record_detection_step( + logger=logger, + submission_id=submission_id, + step="ai_detection", + status="complete", + duration_ms=(time.monotonic() - _step_t0) * 1000, + result="ai_generated" if is_ai_generated else "original", + ) + + if is_ai_generated and ai_confidence >= 0.70: + logger.warning( + f"AI-generated image rejected: submission={submission_id}, " + f"source={ai_source}, confidence={ai_confidence:.2f}" + ) + + ai_detection_result = self._create_ai_detection_result( + submission_id, + student_id, + assign_id, + image_url, + ai_source or "unknown", + ai_confidence, + ) + + processing_time_ms = int((time.time() - start_time) * 1000) + logger.info( + f"Database updated: submission={submission_id}, " + f"processing_time={processing_time_ms}ms" + ) + return json.dumps(ai_detection_result) + + elif is_ai_generated: + logger.info( + f"Low-confidence AI detection: submission={submission_id}, " + f"source={ai_source}, confidence={ai_confidence:.2f}, threshold=0.70" + ) + + _step_t0 = time.monotonic() + hashes = self.hash_handler.compute_hashes(image) + record_detection_step( + logger=logger, + submission_id=submission_id, + step="hash_check", + status="complete", + duration_ms=(time.monotonic() - _step_t0) * 1000, + ) + + self_result = await self.check_self_submissions( + hashes, student_id, assign_id, datetime.utcnow() + ) + peer_result = await self.check_peer_submissions( + hashes, + student_id, + self_result.get("first_submission_date_for_image", None), + ) + # hash_check_result = await self.check_db_reference_hash_match(hashes) + hash_check_result = await self.check_assignment_reference_hash_match(hashes,assign_id) + + ( + hash_match, + matched_ref, + hash_similarity, + matched_ref_image_url, + ) = hash_check_result + + skip_clip = ( + hash_match + and hash_similarity is not None + and hash_similarity >= 0.80 + and not peer_result.get("is_match") + and not self_result.get("is_match") + ) + + matched_ref_clip = None + clip_similarity = 0.0 + clip_embedding = None + + if skip_clip: + logger.info( + f"Skipping CLIP: strong hash match (similarity={hash_similarity:.4f} >= 0.80)" + ) + else: + _step_t0 = time.monotonic() + ( + matched_ref_clip, + clip_similarity, + clip_embedding, + ) = await self.check_clip_match(image) + record_detection_step( + logger=logger, + submission_id=submission_id, + step="clip_similarity", + status="complete", + duration_ms=(time.monotonic() - _step_t0) * 1000, + result="match" if matched_ref_clip else "no_match", + ) + + ref_result = await self._build_reference_result( + hash_match, + matched_ref, + hash_similarity, + matched_ref_image_url, + matched_ref_clip, + clip_similarity, + ) + + plagiarism_status = self.determine_plagiarism_status( + peer_result, self_result, ref_result + ) + plagiarism_status["submission_id"] = submission_id + + plagiarism_status["is_ai_generated"] = is_ai_generated + plagiarism_status["ai_detection_source"] = ( + ai_source if is_ai_generated else None + ) + plagiarism_status["ai_confidence"] = ( + float(ai_confidence) if is_ai_generated else 0.0 + ) + + processing_time = int((time.time() - start_time) * 1000) + + await self.update_submission( + db_record_id, + hashes, + plagiarism_status, + processing_time, + clip_embedding, + ) + + logger.info( + f"Processing complete: submission={submission_id}, " + f"plagiarized={plagiarism_status.get('is_plagiarized', False)}, " + f"match_type={plagiarism_status.get('match_type', 'none')}, " + f"similarity={plagiarism_status.get('similarity_score', 0.0):.4f}, " + f"time={processing_time}ms" + ) + + return self.format_results( + submission_id, assign_id, student_id, image_url, plagiarism_status + ) + + except Exception as e: + logger.error( + f"Failed to process submission: submission_id={data.get('submission_id', 'unknown')}, " + f"error={e}", + exc_info=True, + ) + return None + + finally: + if image: + try: + image.close() + logger.debug("Image resources freed successfully") + except Exception as e: + logger.error(f"Failed to free image resources: error={e}") + + async def check_peer_submissions( + self, + hashes: dict, + current_student_id: str, + first_submission_date_for_image=None, + ) -> dict: + """ + Check if submission matches any peer's submission (hash-based, async). + + Args: + hashes: dict with 'phash', 'dhash', 'ahash' + current_student_id: hashed student_id of current submission + + Returns: + dict with keys: + - is_match (bool) + - matched_submission_ids (list) + - matched_student_ids (list) + - matched_image_urls (list) + - matched_similarities (list) + - best_similarity (float) + - best_match_student_id (str or None) + """ + if not self.enable_peer_check: + return self._create_empty_peer_result() + + try: + peer_submissions = await self.db_manager.fetch_peer_submissions( + current_student_id, first_submission_date_for_image + ) + + if not peer_submissions: + logger.info("No peer submissions for comparison") + return self._create_empty_peer_result() + + tasks = [ + self._async_compare_peer(hashes, peer) for peer in peer_submissions + ] + results = await asyncio.gather(*tasks) + + return self._process_peer_matches(results, peer_submissions) + + except Exception as e: + logger.error(f"Peer check failed: {e}") + return self._create_empty_peer_result() + + async def check_self_submissions( + self, + hashes: dict, + current_student_id: str, + current_assign_id: str, + current_submission_date, + ) -> dict: + """ + Check if submission matches student's own previous submissions (async). + + Logic: + - Within resubmission window: Allowed (not flagged as plagiarism) + - Outside window: Flagged as self-plagiarism + + Args: + hashes: dict with 'phash', 'dhash', 'ahash' + current_student_id: hashed student_id + current_submission_date: datetime of current submission + + Returns: + dict with keys: + - is_match (bool) + - matched_submission_ids (list) + - best_similarity (float) + - days_since_last (int or None) + - within_resubmission_window (bool) + - previous_submission_date (datetime or None) + :param current_student_id: + :param current_assign_id: + """ + if not self.enable_self_check: + return { + "is_match": False, + "matched_submission_ids": [], + "best_similarity": 0.0, + "days_since_last": None, + "within_resubmission_window": False, + "previous_submission_date": None, + } + + try: + self_submissions = await self.db_manager.fetch_self_submissions( + current_student_id + ) + + if not self_submissions: + logger.info("First-time submission for this student") + return { + "is_match": False, + "matched_submission_ids": [], + "matched_assign_ids": [], + "matched_image_urls": [], + "best_similarity": 0.0, + "days_since_last": None, + "same_assignment": False, + "within_resubmission_window": False, + "previous_submission_date": None, + "previous_assign_id": None, + } + + best_match = None + best_score = 999 + matched_ids = [] + matched_assign_ids = [] + matched_image_urls = [] + + first_submission_date_for_image = None + for prev_sub in self_submissions: + comparison = self.hash_handler.compare_all_hashes( + hashes, + { + "phash": prev_sub["phash"], + "dhash": prev_sub["dhash"], + "ahash": prev_sub["ahash"], + }, + threshold=self.self_hash_threshold, + ) + + if comparison["is_match"] and comparison["avg_distance"] < best_score: + best_score = comparison["avg_distance"] + best_match = prev_sub + + if comparison["is_match"]: + matched_ids.append(str(prev_sub["id"])) + matched_assign_ids.append(prev_sub.get("assign_id", "N/A")) + matched_image_urls.append(prev_sub.get("image_url", "")) + + submission_date = prev_sub.get("created_at") + if ( + first_submission_date_for_image is None + or submission_date < first_submission_date_for_image + ): + first_submission_date_for_image = submission_date + + if best_match: + return self._analyze_self_plagiarism( + best_match, + current_assign_id, + current_submission_date, + best_score, + matched_ids, + matched_assign_ids, + matched_image_urls, + first_submission_date_for_image, + ) + else: + logger.info( + f"No self-plagiarism detected: checked {len(self_submissions)} previous submissions" + ) + return self._create_empty_self_result() + + except Exception as e: + logger.error(f"Self-check failed: {e}") + return self._create_empty_self_result() + + def determine_plagiarism_status( + self, peer_result: dict, self_result: dict, ref_result: dict + ) -> dict: + """ + Determine final plagiarism status with priority: Self → Peer → Reference. + + Priority Order: + 1. Self-plagiarism (cross-assignment or late resubmission) + 2. Resubmission within window (same assignment) → Continue checks + 3. Peer plagiarism + 4. Reference database match + 5. Original (no matches) + """ + result = self._init_plagiarism_result() + + if self_result["is_match"]: + self_status = self._handle_self_plagiarism(self_result) + if self_status: + return self_status + result["resubmission_within_window"] = True + result["matched_self_submission_ids"] = self_result[ + "matched_submission_ids" + ] + result["matched_peer_assign_ids"] = self_result["matched_assign_ids"] + result["matched_self_image_urls"] = self_result["matched_image_urls"] + result["days_since_last_submission"] = self_result["days_since_last"] + + if peer_result["is_match"]: + return self._handle_peer_plagiarism(peer_result) + + if ref_result["is_match"]: + ref_status = self._handle_reference_match(ref_result) + if ref_status["is_plagiarized"]: + return ref_status + if result["resubmission_within_window"]: + result["similarity_score"] = self_result["best_similarity"] + result["match_type"] = "resubmission_allowed" + logger.info( + f"Final Status: RESUBMISSION ALLOWED (within window, " + f"{result['days_since_last_submission']} days, passed all checks)" + ) + return result + return ref_status + + if result["resubmission_within_window"]: + result["similarity_score"] = self_result["best_similarity"] + result["match_type"] = "resubmission_allowed" + logger.info( + f"Final Status: RESUBMISSION ALLOWED (within window, " + f"{result['days_since_last_submission']} days, passed all checks)" + ) + + return result + + def _init_plagiarism_result(self) -> dict: + """Initialize empty plagiarism result structure.""" + return { + "is_plagiarized": False, + "match_type": "original", + "plagiarism_source": "none", + "similarity_score": 0.0, + "peer_plagiarism_detected": False, + "self_plagiarism_detected": False, + "resubmission_within_window": False, + "matched_peer_submission_ids": [], + "matched_peer_student_ids": [], + "matched_peer_image_urls": [], + "matched_peer_similarity_scores": [], + "matched_peer_assign_ids": [], + "matched_self_submission_ids": [], + "matched_self_image_urls": [], + "matched_reference_ids": [], + "matched_reference_image_urls": [], + "days_since_last_submission": None, + } + + def _handle_self_plagiarism(self, self_result: dict) -> Optional[dict]: + """ + Handle self-plagiarism detection. + + Returns plagiarism result if actual plagiarism, None if allowed resubmission. + """ + if not self_result["same_assignment"]: + result = self._init_plagiarism_result() + result.update( + { + "is_plagiarized": True, + "self_plagiarism_detected": True, + "plagiarism_source": "self_cross_assignment", + "similarity_score": self_result["best_similarity"], + "match_type": self.determine_match_type( + self_result["best_similarity"] + ), + "matched_self_submission_ids": self_result[ + "matched_submission_ids" + ], + "matched_peer_assign_ids": self_result["matched_assign_ids"], + "matched_self_image_urls": self_result["matched_image_urls"], + "days_since_last_submission": self_result["days_since_last"], + } + ) + logger.info( + f"Final Status: SELF-PLAGIARISM (cross-assignment) - {result['match_type']} " + f"(prev: {self_result['previous_assign_id']})" + ) + return result + + if ( + self_result["same_assignment"] + and not self_result["within_resubmission_window"] + ): + result = self._init_plagiarism_result() + result.update( + { + "is_plagiarized": True, + "self_plagiarism_detected": True, + "plagiarism_source": "self_late_resubmission", + "similarity_score": self_result["best_similarity"], + "match_type": self.determine_match_type( + self_result["best_similarity"] + ), + "matched_self_submission_ids": self_result[ + "matched_submission_ids" + ], + "matched_peer_assign_ids": self_result["matched_assign_ids"], + "matched_self_image_urls": self_result["matched_image_urls"], + "days_since_last_submission": self_result["days_since_last"], + } + ) + logger.info( + f"Final Status: SELF-PLAGIARISM (late resubmission) - {result['match_type']} " + f"({result['days_since_last_submission']} days)" + ) + return result + + return None + + def _handle_peer_plagiarism(self, peer_result: dict) -> dict: + """Handle peer plagiarism detection.""" + result = self._init_plagiarism_result() + result.update( + { + "is_plagiarized": True, + "peer_plagiarism_detected": True, + "plagiarism_source": "peer" + if len(peer_result["matched_submission_ids"]) == 1 + else "peer_collusion", + "similarity_score": peer_result["best_similarity"], + "match_type": self.determine_match_type(peer_result["best_similarity"]), + "matched_peer_submission_ids": peer_result["matched_submission_ids"], + "matched_peer_student_ids": peer_result["matched_student_ids"], + "matched_peer_image_urls": peer_result["matched_image_urls"], + "matched_peer_similarity_scores": peer_result["matched_similarities"], + "matched_peer_assign_ids": peer_result["matched_assign_ids"], + } + ) + logger.info( + f"Final Status: PEER PLAGIARISM ({result['plagiarism_source']}) - {result['match_type']}" + ) + return result + + def _handle_reference_match(self, ref_result: dict) -> dict: + """Handle reference database match.""" + result = self._init_plagiarism_result() + + match_type = self.determine_match_type(ref_result["similarity"]) + + is_plagiarized = match_type != "original" + plagiarism_source = "reference" if is_plagiarized else "none" + + result.update( + { + "is_plagiarized": is_plagiarized, + "plagiarism_source": plagiarism_source, + "similarity_score": ref_result["similarity"], + "match_type": match_type, + } + ) + + if is_plagiarized: + if ref_result.get("matched_ref_id"): + result["matched_reference_ids"] = [ref_result["matched_ref_id"]] + if ref_result.get("image_url"): + result["matched_reference_image_urls"] = [ref_result["image_url"]] + + logger.info(f"Final Status: REFERENCE MATCH - {result['match_type']}") + return result + + def _create_ai_detection_result( + self, + submission_id: str, + student_id: str, + assign_id: str, + image_url: str, + ai_source: str, + ai_confidence: float, + ) -> dict: + """Create AI detection result dictionary.""" + return { + "submission_id": submission_id, + "student_id": student_id, + "assignment_id": assign_id, + "image_url": image_url, + "is_ai_generated": True, + "ai_detection_source": ai_source, + "ai_confidence": float(ai_confidence), + "is_plagiarized": True, + "similarity_score": float(ai_confidence), + "match_type": "ai_generated", + "plagiarism_source": "ai_generated", + "similar_sources": [], + } + + def _create_stock_image_result( + self, + submission_id: str, + student_id: str, + assign_id: str, + image_url: str, + stock_site: str, + ) -> dict: + """Create stock image detection result dictionary.""" + return { + "submission_id": submission_id, + "student_id": student_id, + "assignment_id": assign_id, + "image_url": image_url, + "is_ai_generated": False, + "ai_detection_source": "None", + "ai_confidence": 0.0, + "is_plagiarized": True, + "similarity_score": 1.0, + "match_type": "stock_image", + "plagiarism_source": f"stock_image_{stock_site}", + "similar_sources": [{"source": stock_site, "url": image_url}], + } + + def _create_download_failure_original_result( + self, + submission_id: str, + student_id: str, + assign_id: str, + image_url: str, + error: Exception, + ) -> dict: + """Create an original result when the image cannot be downloaded.""" + return { + "submission_id": submission_id, + "student_id": student_id, + "assignment_id": assign_id, + "image_url": image_url, + "is_ai_generated": False, + "ai_detection_source": "", + "ai_confidence": 0.0, + "is_plagiarized": False, + "similarity_score": 0.0, + "match_type": "original", + "plagiarism_source": "none", + "similar_sources": [], + "download_failed": True, + "download_error": str(error), + } + + async def _build_reference_result( + self, + hash_match: bool, + matched_ref: Optional[str], + hash_similarity: Optional[float], + matched_ref_image_url: Optional[str], + matched_ref_clip: Optional[str], + clip_similarity: Optional[float], + ) -> dict: + """Build reference result from hash or CLIP match.""" + if hash_match: + logger.debug( + f"Using perceptual hash match: reference_id={matched_ref}, similarity={hash_similarity:.4f}" + ) + return { + "is_match": True, + "matched_ref_id": matched_ref, + "similarity": hash_similarity, + "image_url": matched_ref_image_url, + } + + ref_image_url = None + if matched_ref_clip: + logger.debug( + f"Fetching reference image URL: reference_id={matched_ref_clip}" + ) + try: + image_path_val = await self.db_manager.fetch_reference_images_by_id( + matched_ref_clip + ) + if image_path_val: + ref_image_url = image_path_val + logger.debug( + f"Reference image URL retrieved: url={ref_image_url[:80]}..." + ) + else: + logger.warning( + f"Reference image URL not found in database: reference_id={matched_ref_clip}" + ) + except Exception as e: + logger.error( + f"Failed to fetch reference image URL: reference_id={matched_ref_clip}, error={e}" + ) + else: + logger.debug("No CLIP match found, skipping reference image URL fetch") + + is_match = ( + clip_similarity is not None and clip_similarity >= self.semantic_threshold + ) + + return { + "is_match": is_match, + "matched_ref_id": matched_ref_clip, + "similarity": clip_similarity if clip_similarity is not None else 0.0, + "image_url": ref_image_url, + } + + def _create_empty_peer_result(self) -> dict: + """Create empty peer result (no matches found).""" + return { + "is_match": False, + "matched_submission_ids": [], + "matched_student_ids": [], + "matched_assign_ids": [], + "matched_image_urls": [], + "matched_similarities": [], + "best_similarity": 0.0, + "best_match_student_id": None, + } + + def _create_empty_self_result(self) -> dict: + """Create empty self result (no matches found).""" + return { + "is_match": False, + "matched_submission_ids": [], + "matched_assign_ids": [], + "matched_image_urls": [], + "best_similarity": 0.0, + "days_since_last": None, + "same_assignment": False, + "within_resubmission_window": False, + "previous_submission_date": None, + "previous_assign_id": None, + } + + def _process_peer_matches(self, results: list, peer_submissions: list) -> dict: + """Process peer comparison results and build match dictionary.""" + best_match = None + best_score = 999 + matches = [] + + for comparison, peer in results: + if comparison["is_match"]: + similarity = 1 - (comparison["avg_distance"] / 64.0) + matches.append( + { + "submission_id": str(peer["id"]), + "student_id": peer["student_id"], + "assign_id": peer.get("assign_id", "N/A"), + "img_url": peer.get("image_url", ""), + "similarity": similarity, + "avg_distance": comparison["avg_distance"], + } + ) + + if comparison["avg_distance"] < best_score: + best_score = comparison["avg_distance"] + best_match = peer + + if not matches: + logger.info( + f"No peer plagiarism detected (checked {len(peer_submissions)} peers)" + ) + return self._create_empty_peer_result() + + matches.sort(key=lambda x: x["similarity"], reverse=True) + + logger.info( + f"PEER MATCH found: {len(matches)} peer submission(s) " + f"(similarity: {matches[0]['similarity']:.4f})" + ) + + return { + "is_match": True, + "matched_submission_ids": [m["submission_id"] for m in matches], + "matched_student_ids": [m["student_id"] for m in matches], + "matched_assign_ids": [m["assign_id"] for m in matches], + "matched_image_urls": [m["img_url"] for m in matches], + "matched_similarities": [m["similarity"] for m in matches], + "best_similarity": matches[0]["similarity"], + "best_match_student_id": best_match["student_id"] if best_match else None, + } + + def _analyze_self_plagiarism( + self, + best_match: dict, + current_assign_id: str, + current_submission_date, + best_score: float, + matched_ids: list, + matched_assign_ids: list, + matched_image_urls: list, + first_submission_date_for_image: Optional[datetime], + ) -> dict: + """Analyze self-plagiarism match and determine if it's within resubmission window.""" + similarity = 1 - (best_score / 64.0) + days_diff = (current_submission_date - best_match["created_at"]).days + same_assignment = best_match["assign_id"] == current_assign_id + + within_window = False + if same_assignment: + within_window = days_diff <= self.resubmission_window_days + + if not same_assignment: + logger.info( + f" SELF-PLAGIARISM (cross-assignment): Reused from assignment '{best_match['assign_id']}' → '{current_assign_id}' (similarity: {similarity:.4f})" + ) + elif same_assignment and not within_window: + logger.info( + f" SELF-PLAGIARISM (late resubmission): Same assignment '{current_assign_id}' after {days_diff} days (window: {self.resubmission_window_days} days, similarity: {similarity:.4f})" + ) + elif same_assignment and within_window: + logger.info( + f" RESUBMISSION within window: Same assignment '{current_assign_id}', {days_diff} days ago (similarity: {similarity:.4f}) - will check peer/reference" + ) + + return { + "is_match": True, + "matched_submission_ids": matched_ids, + "matched_assign_ids": matched_assign_ids, + "matched_image_urls": matched_image_urls, + "best_similarity": similarity, + "days_since_last": days_diff, + "same_assignment": same_assignment, + "within_resubmission_window": within_window, + "previous_submission_date": best_match["created_at"], + "previous_assign_id": best_match["assign_id"], + "first_submission_date_for_image": first_submission_date_for_image, + } + + def determine_role(self, similarity: float, match_type: str) -> str: + """ + Determine role based on individual match similarity score + + Role classification per match (not submission-level): + - peer_exact_match: >= 0.95 (exact duplicate) + - peer_near_duplicate: >= 0.90 (near duplicate) + - peer_semantic_match: >= 0.80 (semantic match) + - self_copy: self-plagiarism + - reference_copy: reference database match + + Args: + similarity: Individual match similarity score (0.0 to 1.0) + match_type: Type of match ('peer', 'self', 'reference') + + Returns: + Role string for similar_sources array + """ + if match_type == "self": + return "self_copy" + elif match_type == "reference": + return "reference_copy" + else: # peer matches + if similarity >= self.exact_dup_threshold: # >= 0.95 + return "peer_exact_match" + elif similarity >= self.near_dup_threshold: # >= 0.90 + return "peer_near_duplicate" + else: # >= 0.80 + return "peer_semantic_match" + + def format_results( + self, + submission_id: str, + assign_id: str, + student_id: str, + image_url: str, + plagiarism_status: dict, + ) -> str: + """ + Format plagiarism results as JSON for feedback queue. + + Returns JSON string with submission results and similar sources. + """ + try: + similar_sources = [] + + if plagiarism_status.get("is_plagiarized", False): + if plagiarism_status.get("peer_plagiarism_detected"): + similar_sources.extend(self._format_peer_matches(plagiarism_status)) + + if plagiarism_status.get("self_plagiarism_detected"): + similar_sources.extend( + self._format_self_matches(plagiarism_status, student_id) + ) + + if plagiarism_status.get("matched_reference_ids"): + similar_sources.extend( + self._format_reference_matches(plagiarism_status) + ) + + is_plagiarized = plagiarism_status.get("is_plagiarized", False) + + message = { + "submission_id": submission_id, + "student_id": student_id, + "assignment_id": assign_id, + "image_url": image_url, + "is_plagiarized": is_plagiarized, + "match_type": plagiarism_status["match_type"], + } + + if is_plagiarized: + message["similarity_score"] = plagiarism_status["similarity_score"] + message["plagiarism_source"] = plagiarism_status["plagiarism_source"] + message["similar_sources"] = similar_sources + + if plagiarism_status.get("is_ai_generated", False): + message["is_ai_generated"] = True + message["ai_detection_source"] = plagiarism_status.get( + "ai_detection_source" + ) + message["ai_confidence"] = plagiarism_status.get("ai_confidence", 0.0) + + # payload_preview = json.dumps(message, indent=2)[:2000] + # logger.info(f"Result payload preview (2000 chars):\n{payload_preview}...") + + + return json.dumps(message) + + except Exception as e: + logger.error(f"Failed to format results: {e}", exc_info=True) + return json.dumps({"error": str(e)}) + + def _format_peer_matches(self, plagiarism_status: dict) -> list: + """Format peer plagiarism matches for similar_sources.""" + matches = [] + matched_ids = plagiarism_status.get("matched_peer_submission_ids", []) + matched_students = plagiarism_status.get("matched_peer_student_ids", []) + matched_assigns = plagiarism_status.get("matched_peer_assign_ids", []) + matched_urls = plagiarism_status.get("matched_peer_image_urls", []) + matched_scores = plagiarism_status.get("matched_peer_similarity_scores", []) + + for i in range(len(matched_ids)): + similarity = matched_scores[i] if i < len(matched_scores) else 0.0 + matches.append( + { + "submission_id": str(matched_ids[i]) + if i < len(matched_ids) + else None, + "student_id": str(matched_students[i]) + if i < len(matched_students) + else None, + "assignment_id": str(matched_assigns[i]) + if i < len(matched_assigns) + else "N/A", + "image_url": matched_urls[i] if i < len(matched_urls) else "", + "similarity_score": similarity, + "role": self.determine_role(similarity, "peer"), + } + ) + + return matches + + def _format_self_matches(self, plagiarism_status: dict, student_id: str) -> list: + """Format self-plagiarism matches for similar_sources.""" + matches = [] + matched_self_ids = plagiarism_status.get("matched_self_submission_ids", []) + matched_self_assigns = plagiarism_status.get("matched_peer_assign_ids", []) + matched_self_urls = plagiarism_status.get("matched_self_image_urls", []) + similarity = plagiarism_status["similarity_score"] + + for i, self_id in enumerate(matched_self_ids): + matches.append( + { + "submission_id": str(self_id), + "student_id": str(student_id), + "assignment_id": str(matched_self_assigns[i]) + if i < len(matched_self_assigns) + else "N/A", + "image_url": matched_self_urls[i] + if i < len(matched_self_urls) + else "", + "similarity_score": similarity, + "role": "self_copy", + } + ) + + return matches + + def _format_reference_matches(self, plagiarism_status: dict) -> list: + """Format reference database matches for similar_sources.""" + matches = [] + matched_ref_ids = plagiarism_status.get("matched_reference_ids", []) + matched_ref_urls = plagiarism_status.get("matched_reference_image_urls", []) + similarity = plagiarism_status["similarity_score"] + + for i, ref_id in enumerate(matched_ref_ids): + matches.append( + { + "reference_id": str( + ref_id + ), # REF-V2-00211 format from reference_images table + "image_url": matched_ref_urls[i] + if i < len(matched_ref_urls) + else "", + "similarity_score": similarity, + "role": "reference_copy", + } + ) + + return matches diff --git a/monitoring.py b/monitoring.py new file mode 100644 index 0000000..ab1dcaf --- /dev/null +++ b/monitoring.py @@ -0,0 +1,163 @@ +# tap_plg/monitoring.py +# +# Structured JSON logging for tap_plg. +# +# Unlike tap_lms and rag_service (Frappe apps), tap_plg uses Python's +# standard logging module. This module provides: +# +# 1. StructuredJsonFormatter — replaces the default plain-text formatter. +# Swap it in once in app.py and EVERY existing logger.* call across +# the entire codebase emits JSON automatically. Zero changes elsewhere. +# +# 2. emit() — for explicit structured log lines with extra fields +# (submission_id, step timings, etc.) where the standard logger +# doesn't carry enough context. + +import json +import logging +import time +import traceback as tb +from datetime import datetime, timezone +from typing import Any, Dict, Optional + + +# Fields that are part of LogRecord internals — excluded from the JSON output +# to avoid noise. +_EXCLUDED_LOG_RECORD_FIELDS = frozenset({ + "msg", "args", "levelname", "name", "exc_info", "exc_text", + "stack_info", "lineno", "funcName", "created", "msecs", + "relativeCreated", "thread", "threadName", "processName", + "process", "message", "asctime", "filename", "module", "pathname", +}) + + +class StructuredJsonFormatter(logging.Formatter): + """ + Replaces the default log formatter with structured JSON output. + + GCP Cloud Logging parses the `severity` and `message` fields automatically. + All extra fields passed via logger.info("msg", extra={...}) are included + as top-level JSON fields and become queryable in Log Explorer. + + Usage in app.py: + from tap_plg.monitoring import StructuredJsonFormatter + + handler = logging.StreamHandler() + handler.setFormatter(StructuredJsonFormatter(app_name="tap_plg")) + logging.root.setLevel(logging.INFO) + logging.root.addHandler(handler) + """ + + # Map Python log level names to GCP severity labels + _SEVERITY_MAP = { + "DEBUG": "DEBUG", + "INFO": "INFO", + "WARNING": "WARNING", + "ERROR": "ERROR", + "CRITICAL": "CRITICAL", + } + + def __init__(self, app_name: str = "tap_plg"): + super().__init__() + self.app_name = app_name + # Read once at startup — APP_ENV is stable for the process lifetime + import os + self.app_env = os.environ.get("APP_ENV", "unknown") + + def format(self, record: logging.LogRecord) -> str: + payload: Dict[str, Any] = { + "severity": self._SEVERITY_MAP.get(record.levelname, record.levelname), + "message": record.getMessage(), + "logger": record.name, + "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(), + "app": self.app_name, + "app_env": self.app_env, + } + + # Include exception traceback if present + if record.exc_info: + payload["traceback"] = self.formatException(record.exc_info) + + # Include any extra fields passed via logger.info(..., extra={...}) + for key, val in record.__dict__.items(): + if key not in _EXCLUDED_LOG_RECORD_FIELDS: + payload[key] = val + + try: + return json.dumps(payload, ensure_ascii=False, default=str) + except Exception: + # Fallback to plain text if JSON serialisation fails + return f'{{"severity":"ERROR","message":"log serialisation failed","raw":"{record.getMessage()}"}}' + + +def emit(logger: logging.Logger, severity: str, message: str, **kwargs) -> None: + """ + Emit a structured log line with explicit extra fields. + + Args: + logger: the module-level logger (logging.getLogger(__name__)) + severity: "info" | "warning" | "error" + message: short machine-readable event name + **kwargs: extra fields (submission_id, step, duration_ms, etc.) + """ + try: + level = getattr(logging, severity.upper(), logging.INFO) + logger.log(level, message, extra={"message_event": message, **kwargs}) + except Exception: + pass + + +# ── Convenience wrappers for pipeline events ────────────────────────────────── + +def record_submission_received(logger: logging.Logger, submission_id: str, student_id: str = None) -> None: + emit(logger, "info", "plg_submission_received", + submission_id=submission_id, student_id=student_id) + + +def record_detection_step( + logger: logging.Logger, + submission_id: str, + step: str, + status: str, + duration_ms: float, + result: str = None, + error: str = None, +) -> None: + """ + Emit one log line per detection step. + + step: "image_validation" | "hash_check" | "ai_detection" | + "clip_similarity" | "pgvector_search" + status: "complete" | "failed" | "skip" + result: e.g. "original", "near_duplicate", "ai_generated" + """ + emit( + logger, + "info" if status != "failed" else "error", + f"detection_step_{status}", + submission_id=submission_id, + step=step, + duration_ms=round(duration_ms, 2), + result=result, + error=error, + ) + + +def record_result_published( + logger: logging.Logger, + submission_id: str, + plagiarism_status: str, + is_plagiarized: bool, + is_ai_generated: bool, + total_duration_ms: float, +) -> None: + emit( + logger, + "info", + "plg_result_published", + submission_id=submission_id, + plagiarism_status=plagiarism_status, + is_plagiarized=is_plagiarized, + is_ai_generated=is_ai_generated, + total_duration_ms=round(total_duration_ms, 2), + ) diff --git a/plag_checker/submissions_checker.py b/plag_checker/submissions_checker.py index 84f33f3..8b5cab8 100644 --- a/plag_checker/submissions_checker.py +++ b/plag_checker/submissions_checker.py @@ -1,409 +1,427 @@ -import json -import logging -import os - -from plag_checker.submission_status import SubmissionStatus -from mq.mq_client import MQClient -from database.db_manager import DatabaseManager -from processors.image_processor import ImageProcessor - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from image_worker.worker import ImageWorker - -logger = logging.getLogger(__name__) - - -class MessageAckManager: - """ - Context manager to ensure message acknowledgment happens exactly once. - Prevents race conditions and message loss. - """ - - def __init__(self, message): - self.message = message - self.acked = False - self.action = None # Will be 'ack', 'nack', or 'reject' - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - """Ensure message is acknowledged exactly once on exit.""" - if not self.acked: - # Default behavior: requeue unhandled messages to avoid message loss. - logger.warning("Message not explicitly acknowledged, requeueing") - try: - await self.message.nack(requeue=True) - self.acked = True - self.action = "nack(cleanup, requeue=True)" - except Exception as e: - logger.critical(f"CRITICAL: Failed to nack message in cleanup: {e}") - # DO NOT set self.acked = True here - let finally block handle it - return False # Don't suppress exceptions - - async def ack(self): - """Acknowledge successful processing.""" - if self.acked: - logger.warning("Attempted to ack already-acknowledged message") - return - await self.message.ack() - self.acked = True - self.action = "ack" - - async def nack(self, requeue: bool = True): - """Negative acknowledgment (for retry).""" - if self.acked: - logger.warning("Attempted to nack already-acknowledged message") - return - await self.message.nack(requeue=requeue) - self.acked = True - self.action = f"nack(requeue={requeue})" - - async def reject(self, requeue: bool = False): - """Reject message (typically for poison messages).""" - if self.acked: - logger.warning("Attempted to reject already-acknowledged message") - return - await self.message.reject(requeue=requeue) - self.acked = True - self.action = f"reject(requeue={requeue})" - - -class SubmissionChecker: - def __init__( - self, - mq_client: MQClient, - db_manager: "DatabaseManager | None" = None, - image_worker: "ImageWorker | None" = None, - startup_retry_delay: int = 30, - ): - self.client = mq_client - self.db = db_manager if db_manager else DatabaseManager() - self._owns_db_manager = db_manager is None - - # Initialize processors with shared resources - # Image worker will be initialized in initialize() if not provided - self.image_worker = image_worker - self._owns_image_worker = image_worker is None - - self.image_processor = None # Will be set after image_worker initialization - - self.STARTUP_RETRY_DELAY = startup_retry_delay - self.MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) - self._shutdown = False - - async def initialize(self): - await self.start_db() - - # Initialize ImageWorker if not provided (load models once) - if self._owns_image_worker: - from image_worker.worker import ImageWorker - - self.image_worker = ImageWorker(db_manager=self.db) - await self.image_worker.initialize() - - # Now initialize ImageProcessor with the shared worker - self.image_processor = ImageProcessor( - db_manager=self.db, image_worker=self.image_worker - ) - - # Validate system is ready before starting consumer - if self.image_processor is None: - raise RuntimeError("ImageProcessor failed to initialize") - if self.image_worker is None: - raise RuntimeError("ImageWorker failed to initialize") - - logger.info("Submission Checker initialized successfully") - - await self.start_consumer() - - async def process_submission(self, submission): - if self._shutdown: - logger.warning("Shutdown in progress, requeueing message") - try: - await submission.nack(requeue=True) - except Exception as e: - logger.error(f"Failed to nack message during shutdown: {e}") - return - - retry_count = 0 - status = SubmissionStatus.SUBMITTED - submission_id = "unknown" - data = {} - message_acked = False - - try: - body = submission.body - - if isinstance(body, (bytes, bytearray)): - text = body.decode("utf-8", errors="replace") - data = json.loads(text) - elif isinstance(body, str): - data = json.loads(body) - elif isinstance(body, dict): - data = body - else: - try: - data = json.loads(str(body)) - except Exception: - data = {"payload": body} - - submission_id = data.get("submission_id", "unknown") - - redelivered = getattr(submission, "redelivered", False) - delivery_count = 0 - - from collections.abc import Mapping - - if ( - hasattr(submission, "headers") - and isinstance(submission.headers, Mapping) - and submission.headers - ): - delivery_count = submission.headers.get("x-delivery-count", 0) - - if redelivered and delivery_count == 0: - delivery_count = 1 - - if delivery_count > self.MAX_RETRIES: - logger.error( - f"Poison message detected for submission {submission_id}: " - f"delivery_count={delivery_count} exceeds MAX_RETRIES={self.MAX_RETRIES}" - ) - await self.db.update_status( - submission_id, - SubmissionStatus.FAILED, - delivery_count, - f"Poison message: exceeded {self.MAX_RETRIES} redelivery attempts", - ) - - dlq_success = await self.client.publish_to_dlq( - data, - reason=f"Poison message: {delivery_count} redeliveries exceeded MAX_RETRIES={self.MAX_RETRIES}", - ) - if not dlq_success: - logger.error( - f"Failed to send poison message to DLQ for submission {submission_id}; requeueing" - ) +import json +import logging +import os + +from plag_checker.submission_status import SubmissionStatus +from mq.mq_client import MQClient +from database.db_manager import DatabaseManager +from processors.image_processor import ImageProcessor + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from image_worker.worker import ImageWorker + +logger = logging.getLogger(__name__) + + +class MessageAckManager: + """ + Context manager to ensure message acknowledgment happens exactly once. + Prevents race conditions and message loss. + """ + + def __init__(self, message): + self.message = message + self.acked = False + self.action = None # Will be 'ack', 'nack', or 'reject' + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Ensure message is acknowledged exactly once on exit.""" + if not self.acked: + # Default behavior: requeue unhandled messages to avoid message loss. + logger.warning("Message not explicitly acknowledged, requeueing") + try: + await self.message.nack(requeue=True) + self.acked = True + self.action = "nack(cleanup, requeue=True)" + except Exception as e: + logger.critical(f"CRITICAL: Failed to nack message in cleanup: {e}") + # DO NOT set self.acked = True here - let finally block handle it + return False # Don't suppress exceptions + + async def ack(self): + """Acknowledge successful processing.""" + if self.acked: + logger.warning("Attempted to ack already-acknowledged message") + return + await self.message.ack() + self.acked = True + self.action = "ack" + + async def nack(self, requeue: bool = True): + """Negative acknowledgment (for retry).""" + if self.acked: + logger.warning("Attempted to nack already-acknowledged message") + return + await self.message.nack(requeue=requeue) + self.acked = True + self.action = f"nack(requeue={requeue})" + + async def reject(self, requeue: bool = False): + """Reject message (typically for poison messages).""" + if self.acked: + logger.warning("Attempted to reject already-acknowledged message") + return + await self.message.reject(requeue=requeue) + self.acked = True + self.action = f"reject(requeue={requeue})" + + +class SubmissionChecker: + def __init__( + self, + mq_client: MQClient, + db_manager: "DatabaseManager | None" = None, + image_worker: "ImageWorker | None" = None, + startup_retry_delay: int = 30, + ): + self.client = mq_client + self.db = db_manager if db_manager else DatabaseManager() + self._owns_db_manager = db_manager is None + + # Initialize processors with shared resources + # Image worker will be initialized in initialize() if not provided + self.image_worker = image_worker + self._owns_image_worker = image_worker is None + + self.image_processor = None # Will be set after image_worker initialization + + self.STARTUP_RETRY_DELAY = startup_retry_delay + self.MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) + self._shutdown = False + + async def initialize(self): + await self.start_db() + + # Initialize ImageWorker if not provided (load models once) + if self._owns_image_worker: + from image_worker.worker import ImageWorker + + self.image_worker = ImageWorker(db_manager=self.db) + await self.image_worker.initialize() + + # Now initialize ImageProcessor with the shared worker + self.image_processor = ImageProcessor( + db_manager=self.db, image_worker=self.image_worker + ) + + # Validate system is ready before starting consumer + if self.image_processor is None: + raise RuntimeError("ImageProcessor failed to initialize") + if self.image_worker is None: + raise RuntimeError("ImageWorker failed to initialize") + + logger.info("Submission Checker initialized successfully") + + await self.start_consumer() + + async def process_submission(self, submission): + if self._shutdown: + logger.warning("Shutdown in progress, requeueing message") + try: + await submission.nack(requeue=True) + except Exception as e: + logger.error(f"Failed to nack message during shutdown: {e}") + return + + retry_count = 0 + status = SubmissionStatus.SUBMITTED + submission_id = "unknown" + data = {} + message_acked = False + + try: + body = submission.body + + if isinstance(body, (bytes, bytearray)): + text = body.decode("utf-8", errors="replace") + data = json.loads(text) + elif isinstance(body, str): + data = json.loads(body) + elif isinstance(body, dict): + data = body + else: + try: + data = json.loads(str(body)) + except Exception: + data = {"payload": body} + + submission_id = data.get("submission_id", "unknown") + + # SRE: pipeline trace — step 1 in tap_plg + _plg_t0 = time.monotonic() + record_submission_received( + logger=logger, + submission_id=submission_id, + student_id=data.get("student_id"), + ) + + redelivered = getattr(submission, "redelivered", False) + delivery_count = 0 + + from collections.abc import Mapping + + if ( + hasattr(submission, "headers") + and isinstance(submission.headers, Mapping) + and submission.headers + ): + delivery_count = submission.headers.get("x-delivery-count", 0) + + if redelivered and delivery_count == 0: + delivery_count = 1 + + if delivery_count > self.MAX_RETRIES: + logger.error( + f"Poison message detected for submission {submission_id}: " + f"delivery_count={delivery_count} exceeds MAX_RETRIES={self.MAX_RETRIES}" + ) + await self.db.update_status( + submission_id, + SubmissionStatus.FAILED, + delivery_count, + f"Poison message: exceeded {self.MAX_RETRIES} redelivery attempts", + ) + + dlq_success = await self.client.publish_to_dlq( + data, + reason=f"Poison message: {delivery_count} redeliveries exceeded MAX_RETRIES={self.MAX_RETRIES}", + ) + if not dlq_success: + logger.error( + f"Failed to send poison message to DLQ for submission {submission_id}; requeueing" + ) await submission.nack(requeue=True) - message_acked = True - return - + message_acked = True + return + await submission.nack(requeue=False) - message_acked = True - return - - submission_url = data.get("submission_url") - submission_type = data.get("submission_type") - - record_id = await self.db.insert_submission_if_not_exists( - data, submission_url or "", status - ) - if submission_type != "image": - logger.info( - f"Skipping plagiarism processing for non-image submission {submission_id}" - ) - elif not submission_url: - logger.error(f"No submission_url in image submission {submission_id}") - await self.db.update_status( - submission_id, - SubmissionStatus.FAILED, - 0, - "No submission_url in image submission", - ) + message_acked = True + return + + submission_url = data.get("submission_url") + submission_type = data.get("submission_type") + + record_id = await self.db.insert_submission_if_not_exists( + data, submission_url or "", status + ) + if submission_type != "image": + logger.info( + f"Skipping plagiarism processing for non-image submission {submission_id}" + ) + elif not submission_url: + logger.error(f"No submission_url in image submission {submission_id}") + await self.db.update_status( + submission_id, + SubmissionStatus.FAILED, + 0, + "No submission_url in image submission", + ) await submission.nack(requeue=False) - message_acked = True - return - - retry_count = await self.db.get_retry_count(record_id) - if not isinstance(retry_count, int): - retry_count = 0 - - data["db_record_id"] = str(record_id) # Convert UUID to string - - submission_type = data.get("submission_type") - if submission_type == "image": - try: - processor = self.get_processor(data) - except RuntimeError as init_error: - logger.error(f"System not ready: {init_error}") - await self.db.update_status( - submission_id, - SubmissionStatus.FAILED, - 0, - f"System initialization error: {str(init_error)}", - ) + message_acked = True + return + + retry_count = await self.db.get_retry_count(record_id) + if not isinstance(retry_count, int): + retry_count = 0 + + data["db_record_id"] = str(record_id) # Convert UUID to string + + submission_type = data.get("submission_type") + if submission_type == "image": + try: + processor = self.get_processor(data) + except RuntimeError as init_error: + logger.error(f"System not ready: {init_error}") + await self.db.update_status( + submission_id, + SubmissionStatus.FAILED, + 0, + f"System initialization error: {str(init_error)}", + ) await submission.nack(requeue=True) - message_acked = True - return - result_text = await processor.process(data) - else: - result_text = { - "similar_sources": [], - "similarity_score": 0.0, - "is_plagiarized": False, - "match_type": "original", - "is_ai_generated": False, - "ai_detection_source": "", - "ai_confidence": 0.0, - "plagiarism_source": "", - } - - try: - if isinstance(result_text, (str, bytes, bytearray)): - result_text = json.loads(result_text) - - if not isinstance(result_text, dict): - raise ValueError("Processing result is not a JSON object") - - if result_text.get("error") is not None: - error_msg = result_text.get("error", "Unknown error") - logger.warning( - f"Invalid result for submission {submission_id}: {error_msg}" - ) - raise ValueError(f"Invalid result: {error_msg}") - except (json.JSONDecodeError, ValueError) as parse_error: - logger.warning( - f"Invalid result for submission {submission_id}: {parse_error}" - ) - raise RuntimeError(f"Invalid processing result: {parse_error}") from parse_error - - data["similar_sources"] = result_text.get("similar_sources") - data["similarity_score"] = result_text.get("similarity_score") - data["is_plagiarized"] = result_text.get("is_plagiarized") - data["match_type"] = result_text.get("match_type") - data["assignment_id"] = data.pop("assign_id") - data["is_ai_generated"] = result_text.get("is_ai_generated", False) - data["ai_detection_source"] = result_text.get("ai_detection_source", "") - data["ai_confidence"] = result_text.get("ai_confidence", 0.0) - data["plagiarism_source"] = result_text.get("plagiarism_source", "") - - publish_data = {k: v for k, v in data.items() if k != "db_record_id"} - - message_to_update_db = { - "similar_sources": data["similar_sources"], - "similarity_score": data["similarity_score"], - } - - await self.db.update_result( - submission_id, message_to_update_db, SubmissionStatus.EVALUATED - ) - status = SubmissionStatus.EVALUATED - await self.client.publish_message(publish_data) - - await submission.ack() - message_acked = True - - except json.JSONDecodeError as e: - logger.error(f"Invalid JSON in message: {e}") + message_acked = True + return + result_text = await processor.process(data) + else: + result_text = { + "similar_sources": [], + "similarity_score": 0.0, + "is_plagiarized": False, + "match_type": "original", + "is_ai_generated": False, + "ai_detection_source": "", + "ai_confidence": 0.0, + "plagiarism_source": "", + } + + try: + if isinstance(result_text, (str, bytes, bytearray)): + result_text = json.loads(result_text) + + if not isinstance(result_text, dict): + raise ValueError("Processing result is not a JSON object") + + if result_text.get("error") is not None: + error_msg = result_text.get("error", "Unknown error") + logger.warning( + f"Invalid result for submission {submission_id}: {error_msg}" + ) + raise ValueError(f"Invalid result: {error_msg}") + except (json.JSONDecodeError, ValueError) as parse_error: + logger.warning( + f"Invalid result for submission {submission_id}: {parse_error}" + ) + raise RuntimeError(f"Invalid processing result: {parse_error}") from parse_error + + data["similar_sources"] = result_text.get("similar_sources") + data["similarity_score"] = result_text.get("similarity_score") + data["is_plagiarized"] = result_text.get("is_plagiarized") + data["match_type"] = result_text.get("match_type") + data["assignment_id"] = data.pop("assign_id") + data["is_ai_generated"] = result_text.get("is_ai_generated", False) + data["ai_detection_source"] = result_text.get("ai_detection_source", "") + data["ai_confidence"] = result_text.get("ai_confidence", 0.0) + data["plagiarism_source"] = result_text.get("plagiarism_source", "") + + publish_data = {k: v for k, v in data.items() if k != "db_record_id"} + + message_to_update_db = { + "similar_sources": data["similar_sources"], + "similarity_score": data["similarity_score"], + } + + await self.db.update_result( + submission_id, message_to_update_db, SubmissionStatus.EVALUATED + ) + status = SubmissionStatus.EVALUATED + await self.client.publish_message(publish_data) + + # SRE: pipeline trace — result published + record_result_published( + logger=logger, + submission_id=submission_id, + plagiarism_status=data.get("match_type", "original"), + is_plagiarized=data.get("is_plagiarized", False), + is_ai_generated=data.get("is_ai_generated", False), + total_duration_ms=(time.monotonic() - _plg_t0) * 1000, + ) + + await submission.ack() + message_acked = True + + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in message: {e}") await submission.nack(requeue=False) - message_acked = True - - except RuntimeError as e: - if "pool is closed" in str(e).lower(): - logger.warning( - f"Database pool closed during processing of {submission_id}, " - f"rejecting message (shutdown={self._shutdown})" - ) + message_acked = True + + except RuntimeError as e: + if "pool is closed" in str(e).lower(): + logger.warning( + f"Database pool closed during processing of {submission_id}, " + f"rejecting message (shutdown={self._shutdown})" + ) await submission.nack(requeue=True) - message_acked = True - else: - raise - - except Exception as e: - logger.error( - f"Error processing submission {submission_id}: type={type(e).__name__}, message={str(e)}", - exc_info=True, - ) - try: - rc = int(retry_count or 0) - - if rc < self.MAX_RETRIES: - await self.db.update_status( - submission_id or "unknown", - status, - rc + 1, - f"Retry {rc + 1}/{self.MAX_RETRIES}: {str(e)[:200]}", - ) + message_acked = True + else: + raise + + except Exception as e: + logger.error( + f"Error processing submission {submission_id}: type={type(e).__name__}, message={str(e)}", + exc_info=True, + ) + try: + rc = int(retry_count or 0) + + if rc < self.MAX_RETRIES: + await self.db.update_status( + submission_id or "unknown", + status, + rc + 1, + f"Retry {rc + 1}/{self.MAX_RETRIES}: {str(e)[:200]}", + ) await submission.nack(requeue=True) - message_acked = True - else: - await self.db.update_status( - submission_id or "unknown", - SubmissionStatus.FAILED, - rc, - f"Max retries ({self.MAX_RETRIES}) exceeded: {str(e)[:200]}", - ) - - dlq_success = await self.client.publish_to_dlq( - data or {}, - reason=f"Max retries ({rc}) exceeded: {str(e)[:100]}", - ) - if not dlq_success: - logger.error( - f"Failed to send max-retry message to DLQ for submission {submission_id}; requeueing" - ) + message_acked = True + else: + await self.db.update_status( + submission_id or "unknown", + SubmissionStatus.FAILED, + rc, + f"Max retries ({self.MAX_RETRIES}) exceeded: {str(e)[:200]}", + ) + + dlq_success = await self.client.publish_to_dlq( + data or {}, + reason=f"Max retries ({rc}) exceeded: {str(e)[:100]}", + ) + if not dlq_success: + logger.error( + f"Failed to send max-retry message to DLQ for submission {submission_id}; requeueing" + ) await submission.nack(requeue=True) - message_acked = True - return - + message_acked = True + return + await submission.nack(requeue=False) - message_acked = True - logger.warning( - f"Message discarded after {rc} retries and sent to DLQ" - ) - except Exception as nack_error: - logger.error( - f"Failed to handle error for {submission_id}: {nack_error}" - ) - # Don't re-raise - message will be handled in finally block - - finally: - # Final safety net: ensure message is acknowledged - if not message_acked: - logger.warning( - f"Emergency requeue for un-acknowledged message {submission_id}" - ) + message_acked = True + logger.warning( + f"Message discarded after {rc} retries and sent to DLQ" + ) + except Exception as nack_error: + logger.error( + f"Failed to handle error for {submission_id}: {nack_error}" + ) + # Don't re-raise - message will be handled in finally block + + finally: + # Final safety net: ensure message is acknowledged + if not message_acked: + logger.warning( + f"Emergency requeue for un-acknowledged message {submission_id}" + ) await submission.nack(requeue=True) - message_acked = True - - def get_processor(self, data: dict): - """ - Get the appropriate processor for the submission data. - - Raises: - RuntimeError: If processor is not initialized - ValueError: If data format is invalid - """ - submission_type = data.get("submission_type") - if submission_type == "image": - if self.image_processor is None: - raise RuntimeError( - "ImageProcessor not initialized - initialize() must be called before processing messages" - ) - return self.image_processor - if submission_type is None: - raise ValueError("Invalid submission format: missing submission_type") - raise ValueError( - f"Unsupported submission_type for processing: {data.get('submission_type')}" - ) - - async def start_consumer(self): - await self.client.start_consumer(self.process_submission) - - async def start_db(self): - await self.db.init_pool() - - async def close(self): - self._shutdown = True - - await self.client.close() - - # Close ImageWorker if we own it - if self._owns_image_worker and self.image_worker is not None: - await self.image_worker.close() - - if self._owns_db_manager: - await self.db.close() + message_acked = True + + def get_processor(self, data: dict): + """ + Get the appropriate processor for the submission data. + + Raises: + RuntimeError: If processor is not initialized + ValueError: If data format is invalid + """ + submission_type = data.get("submission_type") + if submission_type == "image": + if self.image_processor is None: + raise RuntimeError( + "ImageProcessor not initialized - initialize() must be called before processing messages" + ) + return self.image_processor + if submission_type is None: + raise ValueError("Invalid submission format: missing submission_type") + raise ValueError( + f"Unsupported submission_type for processing: {data.get('submission_type')}" + ) + + async def start_consumer(self): + await self.client.start_consumer(self.process_submission) + + async def start_db(self): + await self.db.init_pool() + + async def close(self): + self._shutdown = True + + await self.client.close() + + # Close ImageWorker if we own it + if self._owns_image_worker and self.image_worker is not None: + await self.image_worker.close() + + if self._owns_db_manager: + await self.db.close()