From c924a1debc6b5e84e71eccc9c5b373ac501c04cc Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 04:48:24 +0500 Subject: [PATCH 01/10] updates to compute worker to not duplicate submission files and to use submission during both ingestion and scoring --- .env_sample | 2 +- compute_worker/compute_worker.py | 384 +++++++++++++++++++++++++------ src/apps/competitions/tasks.py | 7 +- 3 files changed, 316 insertions(+), 77 deletions(-) diff --git a/.env_sample b/.env_sample index 4dab205b6..0100ffc37 100644 --- a/.env_sample +++ b/.env_sample @@ -69,7 +69,7 @@ AWS_STORAGE_PRIVATE_BUCKET_NAME=private AWS_S3_ENDPOINT_URL=http://minio:9000/ AWS_QUERYSTRING_AUTH=False # Optional URL rewriting in compute worker, format: FROM | TO -#WORKER_BUNDLE_URL_REWRITE=http://localhost:9000|http://minio:9000 +#WORKER_BUNDLE_URL_REWRITE=http://localhost:9000/|http://minio:9000/ # ----------------------------------------------------------------------------- diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 573e72a94..276f602cf 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -10,33 +10,63 @@ import tempfile import time import uuid +import requests +import websockets +import yaml +import docker +import logging +import sys # This is only needed for the pytests to pass from shutil import make_archive from urllib.error import HTTPError from urllib.parse import urlparse from urllib.request import urlretrieve from zipfile import ZipFile, BadZipFile -import docker -from rich.progress import Progress +from urllib3 import Retry + from rich.pretty import pprint -import requests -import websockets -import yaml -from billiard.exceptions import SoftTimeLimitExceeded -from celery import Celery, shared_task, utils +from rich.progress import Progress from kombu import Queue, Exchange -from urllib3 import Retry +from celery import Celery, shared_task, utils, signals +from billiard.exceptions import 
SoftTimeLimitExceeded -# This is only needed for the pytests to pass -import sys +from logs_loguru import configure_logging, colorize_run_args + +logger = logging.getLogger(__name__) sys.path.append("/app/src/settings/") -from celery import signals -import logging -logger = logging.getLogger(__name__) -from logs_loguru import configure_logging, colorize_run_args -import json +# ----------------------------------------------- +# CONSTANTS +# ----------------------------------------------- +class ProgramKind: + INGESTION_PROGRAM = "ingestion_program" + SCORING_PROGRAM = "scoring_program" + + +# ----------------------------------------------- +# Submission status +# ----------------------------------------------- +class SubmissionStatus: + NONE = "None" + SUBMITTING = "Submitting" + SUBMITTED = "Submitted" + PREPARING = "Preparing" + RUNNING = "Running" + SCORING = "Scoring" + FINISHED = "Finished" + FAILED = "Failed" + + AVAILABLE_STATUSES = ( + NONE, + SUBMITTING, + SUBMITTED, + PREPARING, + RUNNING, + SCORING, + FINISHED, + FAILED, + ) # ----------------------------------------------- @@ -182,30 +212,6 @@ def setup_celery_logging(**kwargs): MAX_CACHE_DIR_SIZE_GB = float(os.environ.get("MAX_CACHE_DIR_SIZE_GB", 10)) -# ----------------------------------------------- -# Submission status -# ----------------------------------------------- -# Status options for submissions -STATUS_NONE = "None" -STATUS_SUBMITTING = "Submitting" -STATUS_SUBMITTED = "Submitted" -STATUS_PREPARING = "Preparing" -STATUS_RUNNING = "Running" -STATUS_SCORING = "Scoring" -STATUS_FINISHED = "Finished" -STATUS_FAILED = "Failed" -AVAILABLE_STATUSES = ( - STATUS_NONE, - STATUS_SUBMITTING, - STATUS_SUBMITTED, - STATUS_PREPARING, - STATUS_RUNNING, - STATUS_SCORING, - STATUS_FINISHED, - STATUS_FAILED, -) - - # ----------------------------------------------- # Exceptions # ----------------------------------------------- @@ -265,11 +271,11 @@ def run_wrapper(run_args): msg = f"Docker image pull failed: 
{msg}" else: msg = "Docker image pull failed." - run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except SoftTimeLimitExceeded: run._update_status( - STATUS_FAILED, + SubmissionStatus.FAILED, extra_information="Execution time limit exceeded.", ) raise @@ -279,11 +285,11 @@ def run_wrapper(run_args): msg = f"Submission failed: {msg}. See logs for more details." else: msg = "Submission failed. See logs for more details." - run._update_status(STATUS_FAILED, extra_information=msg) + run._update_status(SubmissionStatus.FAILED, extra_information=msg) raise except Exception as e: # Catch any exception to avoid getting stuck in Running status - run._update_status(STATUS_FAILED, extra_information=traceback.format_exc()) + run._update_status(SubmissionStatus.FAILED, extra_information=traceback.format_exc()) raise finally: try: @@ -420,9 +426,11 @@ def __init__(self, run_args): self._get_stdout_stderr_file_names(run_args) ) self.ingestion_container_name = f"ingestion_{self.run_related_name}" - self.program_container_name = f"scoring_{self.run_related_name}" - self.program_data = run_args.get("program_data") - self.ingestion_program_data = run_args.get("ingestion_program") + self.scoring_program_container_name = f"scoring_{self.run_related_name}" + # self.program_data = run_args.get("program_data") + self.scoring_program_data = run_args.get("scoring_program_data") + self.submission_data = run_args.get("submission_data") + self.ingestion_program_data = run_args.get("ingestion_program_data") self.input_data = run_args.get("input_data") self.reference_data = run_args.get("reference_data") self.ingestion_only_during_scoring = run_args.get( @@ -573,9 +581,9 @@ def _update_submission(self, data): def _update_status(self, status, extra_information=None): # Update submission status - if status not in AVAILABLE_STATUSES: + if status not in SubmissionStatus.AVAILABLE_STATUSES: raise SubmissionException( - 
f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}" + f"Status '{status}' is not in available statuses: {SubmissionStatus.AVAILABLE_STATUSES}" ) data = {"status": status, "status_details": extra_information} try: @@ -717,6 +725,108 @@ def _get_bundle(self, url, destination, cache=True): # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it return bundle_file + def _create_container( + self, + container_name: str, + command: str, + volumes_host: list, + volumes_config: dict + ): + """ + Helper to create and configure a container for ingestion, scoring, or submission. + Returns the container object. + """ + logger.info("Creating container with multiple configurations") + + cap_drop_list = [ + "AUDIT_WRITE", + "CHOWN", + "DAC_OVERRIDE", + "FOWNER", + "FSETID", + "KILL", + "MKNOD", + "NET_BIND_SERVICE", + "NET_RAW", + "SETFCAP", + "SETGID", + "SETPCAP", + "SETUID", + "SYS_CHROOT", + ] + + # Configure whether or not we use the GPU. Also setting auto_remove to False because + if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": + security_options = ["no-new-privileges"] + else: + security_options = ["label=disable"] + + # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given + device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")] + if os.environ.get("USE_GPU", "false").lower() == "true": + logger.info("Container configured with GPU capabilities") + host_config = client.create_host_config( + auto_remove=False, + cap_drop=cap_drop_list, + binds=volumes_config, + userns_mode="host", + security_opt=security_options, + device_requests=[ + { + "Driver": "cdi", + "DeviceIDs": device_id, + }, + ], + ) + else: + logger.info("Container configured with CPU capabilities") + host_config = client.create_host_config( + auto_remove=False, + cap_drop=cap_drop_list, + binds=volumes_config, + userns_mode="host", + 
security_opt=security_options, + ) + + # Disable or not the competition container access to Internet (False by default) + container_network_disabled = os.environ.get( + "COMPETITION_CONTAINER_NETWORK_DISABLED", "" + ) + + # HTTP and HTTPS proxy for the competition container if needed + competition_container_proxy_http = os.environ.get( + "COMPETITION_CONTAINER_HTTP_PROXY", "" + ) + competition_container_proxy_http = ( + "http_proxy=" + competition_container_proxy_http + ) + + competition_container_proxy_https = os.environ.get( + "COMPETITION_CONTAINER_HTTPS_PROXY", "" + ) + competition_container_proxy_https = ( + "https_proxy=" + competition_container_proxy_https + ) + + # Creating container + container = client.create_container( + self.container_image, + name=container_name, + host_config=host_config, + detach=False, + volumes=volumes_host, + command=command, + working_dir="/app/program", + environment=[ + "PYTHONUNBUFFERED=1", + competition_container_proxy_http, + competition_container_proxy_https, + ], + network_disabled=container_network_disabled.lower() == "true", + ) + + return container + async def _run_container_engine_cmd(self, container, kind): """This runs a command and asynchronously writes the data to both a storage file and a socket @@ -775,8 +885,7 @@ async def _run_container_engine_cmd(self, container, kind): # If we enter the for loop after the container exited, the program will get stuck if ( - client.inspect_container(container)["State"]["Status"].lower() - == "running" + client.inspect_container(container)["State"]["Status"].lower() == "running" ): logger.debug( "Show the logs and stream them to codabench " + container.get("Id") @@ -793,7 +902,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error(e) - + # Errors elif log[1] is not None: stderr_chunks.append(log[1]) @@ -885,7 +994,7 @@ def _get_host_path(self, *paths): path = os.path.join(*paths) # pull front of path, which points to the location 
inside the container - path = path[len(BASE_DIR) :] + path = path[len(BASE_DIR):] # add host to front, so when we run commands in the container on the host they # can be seen properly @@ -1079,7 +1188,7 @@ async def _run_program_directory(self, program_dir, kind): container_name = ( self.ingestion_container_name if kind == "ingestion" - else self.program_container_name + else self.scoring_program_container_name ) # Disable or not the competition container access to Internet (False by default) container_network_disabled = os.environ.get( @@ -1126,6 +1235,101 @@ async def _run_program_directory(self, program_dir, kind): logger.exception("Program directory execution failed") raise SubmissionException(str(e)) + async def _run_ingestion_program_directory(self, program_dir): + """ + Run ingestion program directory. + + Args: + program_dir: path to ingestion program + """ + # Return if directory does not exist + if not os.path.exists(program_dir): + logger.warning(f"{program_dir} not found, no program to execute") + # Communicate that the program is closing + self.completed_program_counter += 1 + return + + # Find metadata file. 
Raise error if metadata is not founc + if os.path.exists(os.path.join(program_dir, "metadata.yaml")): + metadata_path = "metadata.yaml" + elif os.path.exists(os.path.join(program_dir, "metadata")): + metadata_path = "metadata" + else: + raise SubmissionException( + "Ingestion program directory missing 'metadata.yaml/metadata'" + ) + + logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}") + with open(os.path.join(program_dir, metadata_path), "r") as metadata_file: + # try to find a command in the metadata, in other cases set metadata to None + try: + metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader) + logger.info(f"Metadata contains:\n {metadata}") + if isinstance(metadata, dict): + command = metadata.get("command") + else: + command = None + except yaml.YAMLError as e: + logger.error("Error parsing YAML file: ", e) + print("Error parsing YAML file: ", e) + command = None + + if not command: + raise SubmissionException( + "Missing 'command' in metadata or metadata format is not correct!" 
+ ) + + volumes_host = [ + self._get_host_path(program_dir), + self._get_host_path(self.output_dir), + self.data_dir, + self._get_host_path(self.root_dir, "program") + ] + volumes_config = { + volumes_host[0]: {"bind": "/app/program", "mode": "z"}, + volumes_host[1]: {"bind": "/app/output", "mode": "z"}, + volumes_host[2]: {"bind": "/app/data", "mode": "ro"}, + volumes_host[3]: {"bind": "/app/ingested_program"} + } + + if self.input_data: + volumes_host.append(self._get_host_path(self.root_dir, "input_data")) + volumes_config.update({volumes_host[-1]: {"bind": "/app/input_data"}}) + + # Handle Legacy competitions by replacing anything in the run command + command = replace_legacy_metadata_command( + command=command, + kind=ProgramKind.INGESTION_PROGRAM, + is_scoring=self.is_scoring, + ingestion_only_during_scoring=self.ingestion_only_during_scoring, + ) + logger.info("Container will be run with command: " + command) + + # Create container with configurations + container = self._create_container( + container_name=self.ingestion_container_name, + command=command, + volumes_host=volumes_host, + volumes_config=volumes_config + ) + logger.debug("Created container: " + str(container)) + logger.info("Volume configuration of the container: ") + pprint(volumes_config) + # This runs the container engine command and asynchronously passes data back via websocket + try: + return await self._run_container_engine_cmd(container, kind=ProgramKind.INGESTION_PROGRAM) + except Exception as e: + logger.exception("Program directory execution failed") + raise SubmissionException(str(e)) + + async def _run_scoring_program_directory(self, program_dir): + logger.error("[-] Run Scoring Program not implemented") + pass + + async def _run_submission_directory(self, program_dir): + logger.error("[-] Run Submission not implemented") + pass + def _put_dir(self, url, directory): """Zip the directory and send it to the given URL using _put_file.""" logger.info("Putting dir %s in %s" % (directory, 
url)) @@ -1197,15 +1401,15 @@ def prepare(self): hostname = utils.nodenames.gethostname() if self.is_scoring: self._update_status( - STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}" + SubmissionStatus.RUNNING, extra_information=f"scoring_hostname-{hostname}" ) else: self._update_status( - STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}" + SubmissionStatus.RUNNING, extra_information=f"ingestion_hostname-{hostname}" ) if not self.is_scoring: # Only during prediction step do we want to announce "preparing" - self._update_status(STATUS_PREPARING) + self._update_status(SubmissionStatus.PREPARING) # Setup cache and prune if it's out of control self._prep_cache_dir() @@ -1214,7 +1418,9 @@ def prepare(self): # sub folder. bundles = [ # (url to file, relative folder destination) - (self.program_data, "program"), + # (self.program_data, "program"), + (self.scoring_program_data, "scoring_program"), + (self.submission_data, "submission"), (self.ingestion_program_data, "ingestion_program"), (self.input_data, "input_data"), (self.reference_data, "input/ref"), @@ -1229,8 +1435,11 @@ def prepare(self): cache_this_bundle = path in ("input_data", "input/ref") zip_file = self._get_bundle(url, path, cache=cache_this_bundle) - # TODO: When we have `is_scoring_only` this needs to change... - if url == self.program_data and not self.is_scoring: + # Originally the following if condition was + # `if url == self.program_data and not self.is_scoring:` + # Which means if url == submission and this is ingestion run + # Below now we have a new condition i.e. 
when url == submission and this is ingestion run + if url == self.submission_data and not self.is_scoring: # We want to get a checksum of submissions so we can check if they are # a solution, or maybe match them against other submissions later logger.info(f"Beginning MD5 checksum of submission: {zip_file}") @@ -1247,19 +1456,50 @@ def prepare(self): self._get_container_image(self.container_image) def start(self): - program_dir = os.path.join(self.root_dir, "program") + # program_dir = os.path.join(self.root_dir, "program") + # ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") + submission_dir = os.path.join(self.root_dir, "submission") + scoring_program_dir = os.path.join(self.root_dir, "scoring_program") ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") - logger.info("Running scoring program, and then ingestion program") + # logger.info("Running scoring program, and then ingestion program") + logger.info(f"Starting run: {ProgramKind.SCORING_PROGRAM if self.is_scoring else ProgramKind.INGESTION_PROGRAM}") + loop = asyncio.new_event_loop() # Set the event loop for the gather asyncio.set_event_loop(loop) - gathered_tasks = asyncio.gather( - self._run_program_directory(program_dir, kind="program"), - self._run_program_directory(ingestion_program_dir, kind="ingestion"), - self.watch_detailed_results(), - return_exceptions=True, - ) + + tasks = [] + if self.is_scoring: + # During scoring, run scoring program directory + tasks.append( + self._run_scoring_program_directory(scoring_program_dir) + ) + + # If ingestion_only_during_scoring is true, we also run ingestion program directory + if self.ingestion_only_during_scoring: + tasks.append( + self._run_ingestion_program_directory(ingestion_program_dir) + ) + + tasks.append( + self.watch_detailed_results() + ) + else: + # During ingestion we run ingestion program directory and submission directory + tasks.extend([ + self._run_ingestion_program_directory(ingestion_program_dir), + 
self._run_submission_directory(submission_dir) + ]) + + # gathered_tasks = asyncio.gather( + # self._run_program_directory(program_dir, kind="program"), + # self._run_program_directory(ingestion_program_dir, kind="ingestion"), + # self.watch_detailed_results(), + # return_exceptions=True, + # ) + gathered_tasks = asyncio.gather(*tasks, return_exceptions=True) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) @@ -1282,7 +1522,7 @@ def start(self): for kind, logs in self.logs.items(): containers_to_kill = [] containers_to_kill.append(self.ingestion_container_name) - containers_to_kill.append(self.program_container_name) + containers_to_kill.append(self.scoring_program_container_name) logger.debug( "Trying to kill and remove container " + str(containers_to_kill) ) @@ -1337,7 +1577,7 @@ def start(self): if kind == "ingestion": containers_to_kill = self.ingestion_container_name else: - containers_to_kill = self.program_container_name + containers_to_kill = self.scoring_program_container_name try: client.kill(containers_to_kill) client.remove_container(containers_to_kill, force=True) @@ -1381,15 +1621,15 @@ def start(self): failed_rc = (program_rc is None) or (program_rc != 0) if had_async_exc or failed_rc: self._update_status( - STATUS_FAILED, + SubmissionStatus.FAILED, extra_information=f"program_rc={program_rc}, async={task_results}", ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") - self._update_status(STATUS_FINISHED) + self._update_status(SubmissionStatus.FINISHED) else: - self._update_status(STATUS_SCORING) + self._update_status(SubmissionStatus.SCORING) def push_scores(self): """This is only ran at the end of the scoring step""" diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 4990d04f5..8874bd380 100644 --- a/src/apps/competitions/tasks.py +++ 
b/src/apps/competitions/tasks.py @@ -165,7 +165,7 @@ def _send_to_compute_worker(submission, is_scoring): if task.ingestion_program: if (task.ingestion_only_during_scoring and is_scoring) or (not task.ingestion_only_during_scoring and not is_scoring): - run_args['ingestion_program'] = make_url_sassy(task.ingestion_program.data_file.name) + run_args['ingestion_program_data'] = make_url_sassy(task.ingestion_program.data_file.name) if task.input_data and (not is_scoring or task.ingestion_only_during_scoring): run_args['input_data'] = make_url_sassy(task.input_data.data_file.name) @@ -175,9 +175,8 @@ def _send_to_compute_worker(submission, is_scoring): run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring - run_args['program_data'] = make_url_sassy( - path=submission.data.data_file.name if not is_scoring else task.scoring_program.data_file.name - ) + run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) + run_args['submission_data'] = make_url_sassy(path=submission.data.data_file.name) if not is_scoring: detail_names = SubmissionDetails.DETAILED_OUTPUT_NAMES_PREDICTION From d7ed570f726a85f684df54bd85fc373d5c5ba378 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 06:56:08 +0500 Subject: [PATCH 02/10] compute worker updates --- compute_worker/compute_worker.py | 101 ++++++++++++++++++------------- src/apps/competitions/tasks.py | 4 +- 2 files changed, 63 insertions(+), 42 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 276f602cf..38210440a 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -42,6 +42,7 @@ class ProgramKind: INGESTION_PROGRAM = "ingestion_program" SCORING_PROGRAM = "scoring_program" + SUBMISSION = "submission" # ----------------------------------------------- @@ -970,13 +971,13 @@ async def _run_container_engine_cmd(self, container, kind): "data": logs_Unified[0], "stream": logs_Unified[0], 
"continue": True, - "location": self.stdout if kind == "program" else self.ingestion_stdout, + "location": self.stdout if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stdout, }, "stderr": { "data": logs_Unified[1], "stream": logs_Unified[1], "continue": True, - "location": self.stderr if kind == "program" else self.ingestion_stderr, + "location": self.stderr if kind == ProgramKind.SCORING_PROGRAM else self.ingestion_stderr, }, } @@ -1005,7 +1006,7 @@ def _get_host_path(self, *paths): return path - async def _run_program_directory(self, program_dir, kind): + async def _rrun_program_directory(self, program_dir, kind): """ Function responsible for running program directory @@ -1235,63 +1236,87 @@ async def _run_program_directory(self, program_dir, kind): logger.exception("Program directory execution failed") raise SubmissionException(str(e)) - async def _run_ingestion_program_directory(self, program_dir): + async def _run_program_directory(self, kind, program_dir): """ - Run ingestion program directory. + Function responsible for running + - ingestion program + - scoring program + - submission Args: - program_dir: path to ingestion program + kind: `ingestion_program` or `scoring_program` or `submission` + program_dir: path to the program to run """ # Return if directory does not exist if not os.path.exists(program_dir): - logger.warning(f"{program_dir} not found, no program to execute") + logger.warning(f"{program_dir} for {kind} not found, no program to execute") # Communicate that the program is closing self.completed_program_counter += 1 return - # Find metadata file. Raise error if metadata is not founc + # Find metadata file. 
+ # Raise error if metadata is not found for ingestion or scoring if os.path.exists(os.path.join(program_dir, "metadata.yaml")): metadata_path = "metadata.yaml" elif os.path.exists(os.path.join(program_dir, "metadata")): metadata_path = "metadata" else: - raise SubmissionException( - "Ingestion program directory missing 'metadata.yaml/metadata'" - ) + if kind in [ProgramKind.INGESTION_PROGRAM, ProgramKind.SCORING_PROGRAM]: + error_message = f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file." + logger.error(error_message) + raise SubmissionException(error_message) + else: + logger.warning(f"{program_dir} for {kind} missing 'metadata.yaml/metadata' file. Assuming it is going to be handled by ingestion or scoring") + # Metadata file is found logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}") + + # Reading metadata file to find command. + # Raise error if command is not found for ingestion or scoring with open(os.path.join(program_dir, metadata_path), "r") as metadata_file: - # try to find a command in the metadata, in other cases set metadata to None + command = None try: metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader) logger.info(f"Metadata contains:\n {metadata}") if isinstance(metadata, dict): command = metadata.get("command") - else: - command = None except yaml.YAMLError as e: logger.error("Error parsing YAML file: ", e) - print("Error parsing YAML file: ", e) - command = None - if not command: + if not command and kind in [ProgramKind.INGESTION_PROGRAM, ProgramKind.SCORING_PROGRAM]: raise SubmissionException( "Missing 'command' in metadata or metadata format is not correct!" ) + else: + logger.warning( + "Missing 'command' in metadata or metadata format is not correct! Continuing anyway assuming it is going to be handled by ingestion or scoring" + ) + # Setting volume host and volumes config. 
+ # To be used by `_create_container` function volumes_host = [ self._get_host_path(program_dir), self._get_host_path(self.output_dir), self.data_dir, - self._get_host_path(self.root_dir, "program") + self._get_host_path(self.root_dir, "submission") ] volumes_config = { volumes_host[0]: {"bind": "/app/program", "mode": "z"}, volumes_host[1]: {"bind": "/app/output", "mode": "z"}, volumes_host[2]: {"bind": "/app/data", "mode": "ro"}, - volumes_host[3]: {"bind": "/app/ingested_program"} + volumes_host[3]: {"bind": "/app/ingested_program", "mode": "ro"}, } + if kind == ProgramKind.SCORING_PROGRAM: + # For scoring program, we want to have a shared directory just in case we have an ingestion program. + volumes_host.extend([self._get_host_path(self.root_dir, "shared")]) + volumes_config.update({volumes_host[-1]: {"bind": "/app/shared"}}) + + # Input dir for scoring program + volumes_host.extend([self._get_host_path(self.root_dir, "input")]) + volumes_config.update({volumes_host[-1]: {"bind": "/app/input"}}) + + # NOTE: self.input_data is valid when running an ingestion program and competition task has input data if self.input_data: volumes_host.append(self._get_host_path(self.root_dir, "input_data")) volumes_config.update({volumes_host[-1]: {"bind": "/app/input_data"}}) @@ -1317,7 +1342,7 @@ async def _run_ingestion_program_directory(self, program_dir): pprint(volumes_config) # This runs the container engine command and asynchronously passes data back via websocket try: - return await self._run_container_engine_cmd(container, kind=ProgramKind.INGESTION_PROGRAM) + return await self._run_container_engine_cmd(container, kind=kind) except Exception as e: logger.exception("Program directory execution failed") raise SubmissionException(str(e)) @@ -1456,14 +1481,13 @@ def prepare(self): self._get_container_image(self.container_image) def start(self): - # program_dir = os.path.join(self.root_dir, "program") - # ingestion_program_dir = os.path.join(self.root_dir, 
"ingestion_program") - submission_dir = os.path.join(self.root_dir, "submission") - scoring_program_dir = os.path.join(self.root_dir, "scoring_program") - ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") - # logger.info("Running scoring program, and then ingestion program") - logger.info(f"Starting run: {ProgramKind.SCORING_PROGRAM if self.is_scoring else ProgramKind.INGESTION_PROGRAM}") + logger.info(f"Preparing to run: {ProgramKind.SCORING_PROGRAM if self.is_scoring else ProgramKind.INGESTION_PROGRAM}") + + # Define directories for ingestion, scoring and submission + ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program") + scoring_program_dir = os.path.join(self.root_dir, "scoring_program") + submission_dir = os.path.join(self.root_dir, "submission") loop = asyncio.new_event_loop() # Set the event loop for the gather @@ -1473,31 +1497,26 @@ def start(self): if self.is_scoring: # During scoring, run scoring program directory tasks.append( - self._run_scoring_program_directory(scoring_program_dir) + self._run_program_directory(kind=ProgramKind.SCORING_PROGRAM, program_dir=scoring_program_dir) ) - # If ingestion_only_during_scoring is true, we also run ingestion program directory + # If ingestion_only_during_scoring is true, we also run ingestion program directory in parallel to scoring program if self.ingestion_only_during_scoring: tasks.append( - self._run_ingestion_program_directory(ingestion_program_dir) + self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir) ) - tasks.append( - self.watch_detailed_results() - ) + # During scoring we watch for detailed results + # tasks.append( + # self.watch_detailed_results() + # ) else: # During ingestion we run ingestion program directory and submission directory tasks.extend([ - self._run_ingestion_program_directory(ingestion_program_dir), - self._run_submission_directory(submission_dir) + 
self._run_program_directory(kind=ProgramKind.INGESTION_PROGRAM, program_dir=ingestion_program_dir), + self._run_program_directory(kind=ProgramKind.SUBMISSION, program_dir=submission_dir) ]) - # gathered_tasks = asyncio.gather( - # self._run_program_directory(program_dir, kind="program"), - # self._run_program_directory(ingestion_program_dir, kind="ingestion"), - # self.watch_detailed_results(), - # return_exceptions=True, - # ) gathered_tasks = asyncio.gather(*tasks, return_exceptions=True) task_results = [] # will store results/exceptions from gather diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 8874bd380..a91742c37 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -175,7 +175,9 @@ def _send_to_compute_worker(submission, is_scoring): run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring - run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) + if is_scoring: + run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) + run_args['submission_data'] = make_url_sassy(path=submission.data.data_file.name) if not is_scoring: From fc900ab5452bb6ce218acc6679bcb9c23cf40f05 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 07:05:14 +0500 Subject: [PATCH 03/10] removed unused code --- compute_worker/compute_worker.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 38210440a..698234dc3 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1347,14 +1347,6 @@ async def _run_program_directory(self, kind, program_dir): logger.exception("Program directory execution failed") raise SubmissionException(str(e)) - async def _run_scoring_program_directory(self, program_dir): - logger.error("[-] Run Scoring Program not implemented") - pass - - async def _run_submission_directory(self, 
program_dir): - logger.error("[-] Run Submission not implemented") - pass - def _put_dir(self, url, directory): """Zip the directory and send it to the given URL using _put_file.""" logger.info("Putting dir %s in %s" % (directory, url)) From 62b1da4a7be00dfc86c66c7272ab351aeaea2680 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 07:50:06 +0500 Subject: [PATCH 04/10] updated watchRetailedResults function --- compute_worker/compute_worker.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 698234dc3..a8d087920 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -474,7 +474,7 @@ async def watch_detailed_results(self): start = time.time() expiration_seconds = 60 - while self.watch and self.completed_program_counter < 2: + while self.watch and self.completed_program_counter < 1: if file_path: new_time = os.path.getmtime(file_path) if new_time != last_modified_time: @@ -1499,9 +1499,9 @@ def start(self): ) # During scoring we watch for detailed results - # tasks.append( - # self.watch_detailed_results() - # ) + tasks.append( + self.watch_detailed_results() + ) else: # During ingestion we run ingestion program directory and submission directory tasks.extend([ @@ -1623,7 +1623,10 @@ def start(self): if self.is_scoring: # Check if scoring program failed - program_results, _, _ = task_results + try: + program_results, _, _ = task_results + except: + program_results, _ = task_results # Gather returns either normal values or exception instances when return_exceptions=True had_async_exc = isinstance( program_results, BaseException From 9689913b9cc731e5fae7178f97f58be6a825a92f Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 16:04:32 +0500 Subject: [PATCH 05/10] commented code that was failing the submission --- compute_worker/compute_worker.py | 267 +++---------------------------- 1 file changed, 19 
insertions(+), 248 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index a8d087920..fca64ac08 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1006,236 +1006,6 @@ def _get_host_path(self, *paths): return path - async def _rrun_program_directory(self, program_dir, kind): - """ - Function responsible for running program directory - - Args: - - program_dir : can be either ingestion program or program/submission - - kind : either `program` or `ingestion` - """ - # If the directory doesn't even exist, move on - if not os.path.exists(program_dir): - logger.warning(f"{program_dir} not found, no program to execute") - - # Communicate that the program is closing - self.completed_program_counter += 1 - return - - if os.path.exists(os.path.join(program_dir, "metadata.yaml")): - metadata_path = "metadata.yaml" - elif os.path.exists(os.path.join(program_dir, "metadata")): - metadata_path = "metadata" - else: - # Display a warning in logs when there is no metadata file in submission/program dir - if kind == "program": - logger.warning( - "Program directory missing metadata, assuming it's going to be handled by ingestion" - ) - # Copy submission files into prediction output - # This is useful for results submissions but wrongly uses storage - shutil.copytree(program_dir, self.output_dir) - return - else: - raise SubmissionException( - "Program directory missing 'metadata.yaml/metadata'" - ) - - logger.info(f"Metadata path is {os.path.join(program_dir, metadata_path)}") - with open(os.path.join(program_dir, metadata_path), "r") as metadata_file: - try: # try to find a command in the metadata, in other cases set metadata to None - metadata = yaml.load(metadata_file.read(), Loader=yaml.FullLoader) - logger.info(f"Metadata contains:\n {metadata}") - if isinstance(metadata, dict): # command found - command = metadata.get("command") - else: - command = None - except yaml.YAMLError as e: - 
logger.error("Error parsing YAML file: ", e) - print("Error parsing YAML file: ", e) - command = None - if not command and kind == "ingestion": - raise SubmissionException( - "Program directory missing 'command' in metadata" - ) - elif not command: - logger.warning( - f"Warning: {program_dir} has no command in metadata, continuing anyway " - f"(may be meant to be consumed by an ingestion program)" - ) - return - volumes_host = [ - self._get_host_path(program_dir), - self._get_host_path(self.output_dir), - self.data_dir, - ] - volumes_config = { - volumes_host[0]: { - "bind": "/app/program", - "mode": "z", - }, - volumes_host[1]: { - "bind": "/app/output", - "mode": "z", - }, - volumes_host[2]: { - "bind": "/app/data", - "mode": "ro", - }, - } - - if kind == "ingestion": - # program here is either scoring program or submission, depends on if this ran during Prediction or Scoring - if self.ingestion_only_during_scoring and self.is_scoring: - # submission program moved to 'input/res' with shutil.move() above - ingested_program_location = "input/res" - else: - ingested_program_location = "program" - volumes_host.extend( - [self._get_host_path(self.root_dir, ingested_program_location)] - ) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/ingested_program", - } - } - volumes_config.update(tempvolumeConfig) - - if self.is_scoring: - # For scoring programs, we want to have a shared directory just in case we have an ingestion program. 
- # This will add the share dir regardless of ingestion or scoring, as long as we're `is_scoring` - volumes_host.extend([self._get_host_path(self.root_dir, "shared")]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/shared", - } - } - volumes_config.update(tempvolumeConfig) - - # Input from submission (or submission + ingestion combo) - volumes_host.extend([self._get_host_path(self.input_dir)]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/input", - } - } - volumes_config.update(tempvolumeConfig) - - if self.input_data: - volumes_host.extend([self._get_host_path(self.root_dir, "input_data")]) - tempvolumeConfig = { - volumes_host[-1]: { - "bind": "/app/input_data", - } - } - volumes_config.update(tempvolumeConfig) - - # Handle Legacy competitions by replacing anything in the run command - command = replace_legacy_metadata_command( - command=command, - kind=kind, - is_scoring=self.is_scoring, - ingestion_only_during_scoring=self.ingestion_only_during_scoring, - ) - - cap_drop_list = [ - "AUDIT_WRITE", - "CHOWN", - "DAC_OVERRIDE", - "FOWNER", - "FSETID", - "KILL", - "MKNOD", - "NET_BIND_SERVICE", - "NET_RAW", - "SETFCAP", - "SETGID", - "SETPCAP", - "SETUID", - "SYS_CHROOT", - ] - # Configure whether or not we use the GPU. 
Also setting auto_remove to False because - if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": - security_options = ["no-new-privileges"] - else: - security_options = ["label=disable"] - # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given - device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")] - if os.environ.get("USE_GPU", "false").lower() == "true": - logger.info("Running the container with GPU capabilities") - host_config = client.create_host_config( - auto_remove=False, - cap_drop=cap_drop_list, - binds=volumes_config, - userns_mode="host", - security_opt=security_options, - device_requests=[ - { - "Driver": "cdi", - "DeviceIDs": device_id, - }, - ], - ) - else: - host_config = client.create_host_config( - auto_remove=False, - cap_drop=cap_drop_list, - binds=volumes_config, - userns_mode="host", - security_opt=security_options, - ) - - logger.info("Running container with command " + command) - container_name = ( - self.ingestion_container_name - if kind == "ingestion" - else self.scoring_program_container_name - ) - # Disable or not the competition container access to Internet (False by default) - container_network_disabled = os.environ.get( - "COMPETITION_CONTAINER_NETWORK_DISABLED", "" - ) - - # HTTP and HTTPS proxy for the competition container if needed - competition_container_proxy_http = os.environ.get( - "COMPETITION_CONTAINER_HTTP_PROXY", "" - ) - competition_container_proxy_http = ( - "http_proxy=" + competition_container_proxy_http - ) - - competition_container_proxy_https = os.environ.get( - "COMPETITION_CONTAINER_HTTPS_PROXY", "" - ) - competition_container_proxy_https = ( - "https_proxy=" + competition_container_proxy_https - ) - - container = client.create_container( - self.container_image, - name=container_name, - host_config=host_config, - detach=False, - volumes=volumes_host, - command=command, - 
working_dir="/app/program", - environment=[ - "PYTHONUNBUFFERED=1", - competition_container_proxy_http, - competition_container_proxy_https, - ], - network_disabled=container_network_disabled.lower() == "true", - ) - logger.debug("Created container : " + str(container)) - logger.info("Volume configuration of the container: ") - pprint(volumes_config) - # This runs the container engine command and asynchronously passes data back via websocket - try: - return await self._run_container_engine_cmd(container, kind=kind) - except Exception as e: - logger.exception("Program directory execution failed") - raise SubmissionException(str(e)) - async def _run_program_directory(self, kind, program_dir): """ Function responsible for running @@ -1254,7 +1024,7 @@ async def _run_program_directory(self, kind, program_dir): self.completed_program_counter += 1 return - # Find metadata file. + # Find metadata file. # Raise error if metadata is not found for ingestion or scoring if os.path.exists(os.path.join(program_dir, "metadata.yaml")): metadata_path = "metadata.yaml" @@ -1623,23 +1393,24 @@ def start(self): if self.is_scoring: # Check if scoring program failed - try: - program_results, _, _ = task_results - except: - program_results, _ = task_results - # Gather returns either normal values or exception instances when return_exceptions=True - had_async_exc = isinstance( - program_results, BaseException - ) and not isinstance(program_results, asyncio.CancelledError) - program_rc = getattr(self, "program_exit_code", None) - failed_rc = (program_rc is None) or (program_rc != 0) - if had_async_exc or failed_rc: - self._update_status( - SubmissionStatus.FAILED, - extra_information=f"program_rc={program_rc}, async={task_results}", - ) - # Raise so upstream marks failed immediately - raise SubmissionException("Child task failed or non-zero return code") + # try: + # program_results, _, _ = task_results + # except: + # program_results, _ = task_results + # # Gather returns either normal 
values or exception instances when return_exceptions=True + # had_async_exc = isinstance( + # program_results, BaseException + # ) and not isinstance(program_results, asyncio.CancelledError) + # program_rc = getattr(self, "program_exit_code", None) + # failed_rc = (program_rc is None) or (program_rc != 0) + # if had_async_exc or failed_rc: + # self._update_status( + # SubmissionStatus.FAILED, + # extra_information=f"program_rc={program_rc}, async={task_results}", + # ) + # # Raise so upstream marks failed immediately + # raise SubmissionException("Child task failed or non-zero return code") + self._update_status(SubmissionStatus.FINISHED) else: From 5731c11ad0268e949450c05f92484b595fb7a731 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 16:17:10 +0500 Subject: [PATCH 06/10] replaced container_id by container.get(Id). in _run_container_engine_cmd --- compute_worker/compute_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index fca64ac08..ea9d2e1eb 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -959,7 +959,7 @@ async def _run_container_engine_cmd(self, container, kind): finally: try: # Last chance of removing container - client.remove_container(container_id, force=True) + client.remove_container(container.get("Id"), force=True) except Exception: pass From f967e22a0ad7d7cbadafa80c50cb469b9ec95bc3 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Sun, 22 Mar 2026 16:23:23 +0500 Subject: [PATCH 07/10] minor fixes --- compute_worker/compute_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index ea9d2e1eb..9a7236975 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1094,7 +1094,7 @@ async def _run_program_directory(self, kind, program_dir): # Handle Legacy competitions by replacing anything 
in the run command command = replace_legacy_metadata_command( command=command, - kind=ProgramKind.INGESTION_PROGRAM, + kind=kind, is_scoring=self.is_scoring, ingestion_only_during_scoring=self.ingestion_only_during_scoring, ) From 45454067032dd27b93e403712ac4d33ad06a6f84 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Mon, 23 Mar 2026 05:41:34 +0500 Subject: [PATCH 08/10] gathered all env settings in a new Settings class --- compute_worker/compute_worker.py | 150 +++++++++++++++++-------------- 1 file changed, 81 insertions(+), 69 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 9a7236975..85853f5cd 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -37,7 +37,56 @@ # ----------------------------------------------- -# CONSTANTS +# Env Settings +# ----------------------------------------------- +class Settings: + + @staticmethod + def get(key, default=None): + """ + Return the env var value if set, else default; returns None if not set and no default. 
+ """ + val = os.getenv(key) + + if val is not None: + return val + + if default is not None: + return default + + logger.warning(f"Environment variable '{key}' not found and no default provided.") + return None + + # Defaults + DEFAULT_SOCKETS = { + "docker": "unix:///var/run/docker.sock", + "podman": "unix:///run/user/1000/podman/podman.sock", + } + + # Settings variables + LOG_LEVEL = get("LOG_LEVEL", "INFO") + SERIALIZED = get("SERIALIZED", "false") + + USE_GPU = get("USE_GPU", "false") + CONTAINER_ENGINE_EXECUTABLE = get("CONTAINER_ENGINE_EXECUTABLE", "docker") + GPU_DEVICE = get("GPU_DEVICE", "nvidia.com/gpu=all") + + CONTAINER_SOCKET = get("CONTAINER_SOCKET", DEFAULT_SOCKETS.get(CONTAINER_ENGINE_EXECUTABLE)) + + HOST_DIRECTORY = get("HOST_DIRECTORY", "/tmp/codabench/") + MAX_CACHE_DIR_SIZE_GB = get("MAX_CACHE_DIR_SIZE_GB", 10) + + COMPETITION_CONTAINER_NETWORK_DISABLED = get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") + COMPETITION_CONTAINER_HTTP_PROXY = get("COMPETITION_CONTAINER_HTTP_PROXY", "") + COMPETITION_CONTAINER_HTTPS_PROXY = get("COMPETITION_CONTAINER_HTTPS_PROXY", "") + + CODALAB_IGNORE_CLEANUP_STEP = get("CODALAB_IGNORE_CLEANUP_STEP") + + WORKER_BUNDLE_URL_REWRITE = get("WORKER_BUNDLE_URL_REWRITE", "") + + +# ----------------------------------------------- +# Program Kind # ----------------------------------------------- class ProgramKind: INGESTION_PROGRAM = "ingestion_program" @@ -74,43 +123,24 @@ class SubmissionStatus: # Logging # ----------------------------------------------- configure_logging( - os.environ.get("LOG_LEVEL", "INFO"), os.environ.get("SERIALIZED", "false") + Settings.LOG_LEVEL, Settings.SERIALIZED ) # ----------------------------------------------- # Initialize Docker or Podman depending on .env # ----------------------------------------------- -if os.environ.get("USE_GPU", "false").lower() == "true": - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + "with GPU capabilites 
: " - + os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all") - + " network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) -else: - logger.info( - "Using " - + os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").upper() - + " without GPU capabilities. " - + "network_disabled for the competition container is set to " - + os.environ.get("COMPETITION_CONTAINER_NETWORK_DISABLED", "False") - ) - -if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": - client = docker.APIClient( - base_url=os.environ.get("CONTAINER_SOCKET", "unix:///var/run/docker.sock"), - version="auto", - ) -elif os.environ.get("CONTAINER_ENGINE_EXECUTABLE").lower() == "podman": - client = docker.APIClient( - base_url=os.environ.get( - "CONTAINER_SOCKET", "unix:///run/user/1000/podman/podman.sock" - ), - version="auto", - ) +logger.info( + f"Using {Settings.CONTAINER_ENGINE_EXECUTABLE.upper()} " + f"{'with GPU capabilities: ' + Settings.GPU_DEVICE if Settings.USE_GPU.lower() == 'true' else 'without GPU capabilities'}. 
" + f"Network disabled for the competition container is set to {Settings.COMPETITION_CONTAINER_NETWORK_DISABLED}" +) +# Intializing client +# NOTE: CONTAINER_SOCKET is set in Settings based on CONTAINER_ENGINE_EXECUTABLE which must has either podman or docker +client = docker.APIClient( + base_url=Settings.CONTAINER_SOCKET, + version="auto", +) # ----------------------------------------------- # Show Progress bar on downloading images @@ -178,8 +208,8 @@ def show_progress(line, progress): total=total, ) except Exception as e: - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": - logger.exception("There was an error showing the progress bar") + if Settings.LOG_LEVEL.lower() == "debug": + logger.exception(f"There was an error showing the progress bar: {e}") # ----------------------------------------------- @@ -206,11 +236,11 @@ def setup_celery_logging(**kwargs): # Directories # ----------------------------------------------- # Setup base directories used by all submissions -# note: we need to pass this directory to docker/podman so it knows where to store things! -HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/") +# NOTE: we need to pass this directory to docker/podman so it knows where to store things! +HOST_DIRECTORY = Settings.HOST_DIRECTORY BASE_DIR = "/codabench/" # base directory inside the container CACHE_DIR = os.path.join(BASE_DIR, "cache") -MAX_CACHE_DIR_SIZE_GB = float(os.environ.get("MAX_CACHE_DIR_SIZE_GB", 10)) +MAX_CACHE_DIR_SIZE_GB = float(Settings.MAX_CACHE_DIR_SIZE_GB) # ----------------------------------------------- @@ -239,7 +269,7 @@ def rewrite_bundle_url_if_needed(url): Example: http://localhost:9000|http://minio:9000 """ - rule = os.getenv("WORKER_BUNDLE_URL_REWRITE", "").strip() + rule = Settings.WORKER_BUNDLE_URL_REWRITE.strip() if not rule or "|" not in rule: return url src, dst = rule.split("|", 1) @@ -757,14 +787,14 @@ def _create_container( ] # Configure whether or not we use the GPU. 
Also setting auto_remove to False because - if os.environ.get("CONTAINER_ENGINE_EXECUTABLE", "docker").lower() == "docker": + if Settings.CONTAINER_ENGINE_EXECUTABLE.lower() == "docker": security_options = ["no-new-privileges"] else: security_options = ["label=disable"] # Setting the device ID like this allows users to specify which gpu to use in the .env file, with all being the default if no value is given - device_id = [os.environ.get("GPU_DEVICE", "nvidia.com/gpu=all")] - if os.environ.get("USE_GPU", "false").lower() == "true": + device_id = [Settings.GPU_DEVICE] + if Settings.USE_GPU.lower() == "true": logger.info("Container configured with GPU capabilities") host_config = client.create_host_config( auto_remove=False, @@ -789,27 +819,9 @@ def _create_container( security_opt=security_options, ) - # Disable or not the competition container access to Internet (False by default) - container_network_disabled = os.environ.get( - "COMPETITION_CONTAINER_NETWORK_DISABLED", "" - ) - - # HTTP and HTTPS proxy for the competition container if needed - competition_container_proxy_http = os.environ.get( - "COMPETITION_CONTAINER_HTTP_PROXY", "" - ) - competition_container_proxy_http = ( - "http_proxy=" + competition_container_proxy_http - ) - - competition_container_proxy_https = os.environ.get( - "COMPETITION_CONTAINER_HTTPS_PROXY", "" - ) - competition_container_proxy_https = ( - "https_proxy=" + competition_container_proxy_https - ) - # Creating container + # COMPETITION_CONTAINER_NETWORK_DISABLED: Disable or not the competition container access to Internet (False by default) + # HTTP and HTTPS proxy for the competition container if needed container = client.create_container( self.container_image, name=container_name, @@ -820,10 +832,10 @@ def _create_container( working_dir="/app/program", environment=[ "PYTHONUNBUFFERED=1", - competition_container_proxy_http, - competition_container_proxy_https, + "http_proxy=" + Settings.COMPETITION_CONTAINER_HTTP_PROXY, + "https_proxy=" 
+ Settings.COMPETITION_CONTAINER_HTTPS_PROXY, ], - network_disabled=container_network_disabled.lower() == "true", + network_disabled=Settings.COMPETITION_CONTAINER_NETWORK_DISABLED.lower() == "true", ) return container @@ -868,7 +880,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error( f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) start = time.time() @@ -922,7 +934,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error( f"There was an error while starting the container and getting the logs: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) # Get the return code of the competition container once done @@ -1316,7 +1328,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) # Send data to be written to ingestion/scoring std_err self._update_submission(execution_time_limit_exceeded_data) @@ -1368,7 +1380,7 @@ def start(self): logger.error( f"There was a problem killing {containers_to_kill}: {e}" ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + if Settings.LOG_LEVEL.lower() == "debug": logger.exception(e) if kind == "program": self.program_exit_code = return_code @@ -1486,7 +1498,7 @@ def push_output(self): self._put_dir(self.scoring_result, self.output_dir) def clean_up(self): - if os.environ.get("CODALAB_IGNORE_CLEANUP_STEP"): + if Settings.CODALAB_IGNORE_CLEANUP_STEP: logger.warning( f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: {self.root_dir}" ) From 951eadba5e4d553d24c3a2383f29c183c3f33f2c Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Tue, 24 Mar 2026 23:51:40 +0500 
Subject: [PATCH 09/10] Copy submission to input/res added during Scoring --- compute_worker/compute_worker.py | 61 +++++++++++++++++++++++--------- 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 85853f5cd..c1f3a7b25 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -862,19 +862,19 @@ async def _run_container_engine_cmd(self, container, kind): try: websocket_url = f"{self.websocket_url}?kind={kind}" logger.debug( - "Connecting to " - + websocket_url - + "for container " - + str(container.get("Id")) + "Connecting to " + + websocket_url + + "for container " + + str(container.get("Id")) ) websocket = await asyncio.wait_for( websockets.connect(websocket_url), timeout=10.0 ) logger.debug( - "connected to " - + str(websocket_url) - + "for container " - + str(container.get("Id")) + "connected to " + + str(websocket_url) + + "for container " + + str(container.get("Id")) ) except Exception as e: logger.error( @@ -954,10 +954,10 @@ async def _run_container_engine_cmd(self, container, kind): client.remove_container(container, force=True) logger.debug( - "Container " - + container.get("Id") - + "exited with status code : " - + str(return_Code["StatusCode"]) + "Container " + + container.get("Id") + + "exited with status code : " + + str(return_Code["StatusCode"]) ) except ( @@ -1095,7 +1095,7 @@ async def _run_program_directory(self, kind, program_dir): volumes_config.update({volumes_host[-1]: {"bind": "/app/shared"}}) # Input dir for scoring program - volumes_host.extend([self._get_host_path(self.root_dir, "input")]) + volumes_host.extend([self._get_host_path(self.input_dir)]) volumes_config.update({volumes_host[-1]: {"bind": "/app/input"}}) # NOTE: self.input_data is valid when running an ingestion program and competition task has input data @@ -1196,6 +1196,31 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB): else: logger.info("Cache 
directory does not need to be pruned!")
 
+    def _copy_submission_to_input_res(self):
+        """
+        Temporary backward-compatibility function.
+
+        Earlier, scoring programs expected submission files in ingestion output:
+            /app/input/res/
+
+        Newer changes expose submission under:
+            /app/ingested_program/
+
+        To avoid breaking older scoring programs, we copy the submission
+        directory into input/res
+        """
+
+        submission_directory = os.path.join(self.root_dir, "submission")
+        ingestion_res_directory = os.path.join(self.root_dir, "input/res")
+
+        # copy from submission_directory to ingestion_res_directory
+        try:
+            shutil.copytree(submission_directory, ingestion_res_directory, dirs_exist_ok=True)
+            logger.info(f"Copied submission files to input/res successfully")
+
+        except Exception as e:
+            logger.error(f"Failed to copy submission to input/res: {e}")
+
     def prepare(self):
         hostname = utils.nodenames.gethostname()
         if self.is_scoring:
@@ -1234,10 +1259,7 @@ def prepare(self):
                 cache_this_bundle = path in ("input_data", "input/ref")
                 zip_file = self._get_bundle(url, path, cache=cache_this_bundle)
 
-                # Originally the following if condition was
-                # `if url == self.program_data and not self.is_scoring:`
-                # Which means if url == submission and this is ingestion run
-                # Below now we have a new condition i.e. 
when url == submission and this is ingestion run + # Computing checksum of the submission file during ingestion run if url == self.submission_data and not self.is_scoring: # We want to get a checksum of submissions so we can check if they are # a solution, or maybe match them against other submissions later @@ -1246,6 +1268,11 @@ def prepare(self): logger.info(f"Checksum result: {checksum}") self._update_submission({"md5": checksum}) + # During scoring: copy submission files into "input/res" + if self.is_scoring: + # NOTE: Temporary compatibility hook (To be removed in the future) + self._copy_submission_to_input_res() + # For logging purposes let's dump file names for filename in glob.iglob(self.root_dir + "**/*.*", recursive=True): logger.info(filename) From e18c68f2c83d5fd5f440b3c9d03ad3b15d77bea0 Mon Sep 17 00:00:00 2001 From: Ihsan Ullah Date: Tue, 24 Mar 2026 23:54:02 +0500 Subject: [PATCH 10/10] uncommented code that was checking for failed program (for circle ci tests) --- compute_worker/compute_worker.py | 34 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index c1f3a7b25..623728cec 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1432,23 +1432,23 @@ def start(self): if self.is_scoring: # Check if scoring program failed - # try: - # program_results, _, _ = task_results - # except: - # program_results, _ = task_results - # # Gather returns either normal values or exception instances when return_exceptions=True - # had_async_exc = isinstance( - # program_results, BaseException - # ) and not isinstance(program_results, asyncio.CancelledError) - # program_rc = getattr(self, "program_exit_code", None) - # failed_rc = (program_rc is None) or (program_rc != 0) - # if had_async_exc or failed_rc: - # self._update_status( - # SubmissionStatus.FAILED, - # extra_information=f"program_rc={program_rc}, 
async={task_results}", - # ) - # # Raise so upstream marks failed immediately - # raise SubmissionException("Child task failed or non-zero return code") + try: + program_results, _, _ = task_results + except: + program_results, _ = task_results + # Gather returns either normal values or exception instances when return_exceptions=True + had_async_exc = isinstance( + program_results, BaseException + ) and not isinstance(program_results, asyncio.CancelledError) + program_rc = getattr(self, "program_exit_code", None) + failed_rc = (program_rc is None) or (program_rc != 0) + if had_async_exc or failed_rc: + self._update_status( + SubmissionStatus.FAILED, + extra_information=f"program_rc={program_rc}, async={task_results}", + ) + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") self._update_status(SubmissionStatus.FINISHED)