Commit 716f8e6

Merge pull request #2030 from codalab/worker-status
Worker status to FAILED instead of SCORING or FINISHED in case of failure
2 parents: dd0223c + fdb5a6b

1 file changed: compute_worker/compute_worker.py (46 additions, 25 deletions)

@@ -449,9 +449,10 @@ async def send_detailed_results(self, file_path):
                 )
             )
         except Exception as e:
-            logger.error("This error might result in a Execution Time Exceeded error" + e)
+            logger.error(f"This error might result in a Execution Time Exceeded error: {e}")
             if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
                 logger.exception(e)
+            raise SubmissionException("Could not connect to instance to update detailed result")
 
     def _get_stdout_stderr_file_names(self, run_args):
         # run_args should be the run_args argument passed to __init__ from the run_wrapper.
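
Note on this hunk: the `+ e` being replaced was itself a bug, because Python refuses to concatenate a str with an exception object, so the old except block raised TypeError instead of logging anything. A minimal repro of the failure mode and the f-string fix:

    try:
        raise ValueError("boom")
    except Exception as e:
        # "message" + e  ->  TypeError: can only concatenate str (not "ValueError") to str
        print(f"message: {e}")  # f-strings call str(e) implicitly, printing "message: boom"
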
@@ -628,10 +629,10 @@ def _get_bundle(self, url, destination, cache=True):
             except BadZipFile:
                 retries += 1
                 if retries >= max_retries:
-                    raise  # Re-raise the last caught BadZipFile exception
+                    raise SubmissionException("Bad or empty zip file")
                 else:
-                    logger.warning("Failed. Retrying in 60 seconds...")
-                    time.sleep(60)  # Wait 60 seconds before retrying
+                    logger.warning("Failed. Retrying in 20 seconds...")
+                    time.sleep(20)  # Wait 20 seconds before retrying
         # Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
         return bundle_file
 
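Note on this hunk: the retry loop now fails with an explicit SubmissionException after max_retries bad downloads, and waits 20 s rather than 60 s between attempts. A self-contained sketch of the same pattern (open_zip_with_retries is a hypothetical name; RuntimeError stands in for SubmissionException):

    import time
    from zipfile import BadZipFile, ZipFile

    def open_zip_with_retries(path, max_retries=3):
        retries = 0
        while True:
            try:
                return ZipFile(path)
            except BadZipFile:
                retries += 1
                if retries >= max_retries:
                    raise RuntimeError("Bad or empty zip file")  # stand-in for SubmissionException
                time.sleep(20)  # wait before the next attempt
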
@@ -668,8 +669,7 @@ async def _run_container_engine_cmd(self, container, kind):
             )
         except Exception as e:
             logger.error(
-                "There was an error trying to connect to the websocket on the codabench instance"
-                + e
+                f"There was an error trying to connect to the websocket on the codabench instance: {e}"
             )
             if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
                 logger.exception(e)
@@ -718,8 +718,7 @@ async def _run_container_engine_cmd(self, container, kind):
             logger.error(e)
         except Exception as e:
             logger.error(
-                "There was an error while starting the container and getting the logs"
-                + e
+                f"There was an error while starting the container and getting the logs: {e}"
             )
             if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
                 logger.exception(e)
@@ -748,7 +747,7 @@ async def _run_container_engine_cmd(self, container, kind):
             Exception,
         ) as e:
             logger.error(e)
-            return_Code = {"StatusCode": e}
+            return_Code = {"StatusCode": 1}
 
         self.logs[kind] = {
             "returncode": return_Code["StatusCode"],
@@ -1066,6 +1065,15 @@ def _prep_cache_dir(self, max_size=MAX_CACHE_DIR_SIZE_GB):
             logger.info("Cache directory does not need to be pruned!")
 
     def prepare(self):
+        hostname = utils.nodenames.gethostname()
+        if self.is_scoring:
+            self._update_status(
+                STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}"
+            )
+        else:
+            self._update_status(
+                STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}"
+            )
         if not self.is_scoring:
             # Only during prediction step do we want to announce "preparing"
             self._update_status(STATUS_PREPARING)
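
Note on this hunk (and its mirror removal in start() below): reporting the worker hostname now happens at the top of prepare() instead of start(), so a submission stuck in preparation (e.g. during bundle download) can already be traced to a machine. A condensed, runnable sketch of the tag being reported (hostname_tag is hypothetical; socket.gethostname is a stdlib stand-in for utils.nodenames.gethostname):

    import socket

    def hostname_tag(is_scoring: bool) -> str:
        kind = "scoring" if is_scoring else "ingestion"
        return f"{kind}_hostname-{socket.gethostname()}"

    print(hostname_tag(True))  # e.g. "scoring_hostname-worker-03"
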
@@ -1110,15 +1118,6 @@ def prepare(self):
         self._get_container_image(self.container_image)
 
     def start(self):
-        hostname = utils.nodenames.gethostname()
-        if self.is_scoring:
-            self._update_status(
-                STATUS_RUNNING, extra_information=f"scoring_hostname-{hostname}"
-            )
-        else:
-            self._update_status(
-                STATUS_RUNNING, extra_information=f"ingestion_hostname-{hostname}"
-            )
         program_dir = os.path.join(self.root_dir, "program")
         ingestion_program_dir = os.path.join(self.root_dir, "ingestion_program")
 
@@ -1129,12 +1128,16 @@ def start(self):
             self._run_program_directory(ingestion_program_dir, kind="ingestion"),
             self.watch_detailed_results(),
             loop=loop,
+            return_exceptions=True,
         )
 
+        task_results = []  # will store results/exceptions from gather
         signal.signal(signal.SIGALRM, alarm_handler)
         signal.alarm(self.execution_time_limit)
         try:
-            loop.run_until_complete(gathered_tasks)
+            # run tasks
+            # keep what gather returned so we can detect async errors later
+            task_results = loop.run_until_complete(gathered_tasks) or []
         except ExecutionTimeLimitExceeded:
             error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
             logger.error(error_message)
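
Note on this hunk: return_exceptions=True changes asyncio.gather so one failing task no longer aborts the batch by re-raising; instead each slot of the returned list holds either that task's result or the exception instance, which is exactly what the later task_results inspection relies on. A runnable illustration (modern asyncio drops the loop= argument used in this older code):

    import asyncio

    async def ok():
        return "done"

    async def boom():
        raise RuntimeError("container crashed")

    async def main():
        results = await asyncio.gather(ok(), boom(), return_exceptions=True)
        print(results)  # ['done', RuntimeError('container crashed')]
        for r in results:
            if isinstance(r, BaseException):
                print("task failed:", r)

    asyncio.run(main())
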
@@ -1159,7 +1162,7 @@ def start(self):
                     logger.error(e)
                 except Exception as e:
                     logger.error(
-                        "There was a problem killing " + str(containers_to_kill) + e
+                        f"There was a problem killing {containers_to_kill}: {e}"
                     )
                     if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
                         logger.exception(e)
@@ -1175,7 +1178,12 @@ def start(self):
                 elapsed_time = logs["end"] - logs["start"]
             else:
                 elapsed_time = self.execution_time_limit
-            return_code = logs["returncode"]
+            # Normalize the return_code
+            return_code = (
+                logs["returncode"]
+                if logs["returncode"] is None or isinstance(logs["returncode"], int)
+                else 1
+            )
             if return_code is None:
                 logger.warning("No return code from Process. Killing it")
                 if kind == "ingestion":
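
Note on this hunk: the normalization keeps only the two shapes the surrounding code handles, None ("no return code, kill the container") and ints, and maps anything else to a generic failure. A quick behavior check (normalize is a hypothetical extraction of the conditional):

    def normalize(rc):
        return rc if rc is None or isinstance(rc, int) else 1

    assert normalize(0) == 0                # success passes through
    assert normalize(137) == 137            # real exit codes pass through
    assert normalize(None) is None          # handled by the kill path just below
    assert normalize(Exception("x")) == 1   # anything odd becomes a generic failure
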
@@ -1189,7 +1197,7 @@ def start(self):
                     logger.error(e)
                 except Exception as e:
                     logger.error(
-                        "There was a problem killing " + str(containers_to_kill) + e
+                        f"There was a problem killing {containers_to_kill}: {e}"
                     )
                     if os.environ.get("LOG_LEVEL", "info").lower() == "debug":
                         logger.exception(e)
@@ -1212,6 +1220,16 @@ def start(self):
         signal.alarm(0)
 
         if self.is_scoring:
+            # Check if scoring program failed
+            program_results, _, _ = task_results
+            # Gather returns either normal values or exception instances when return_exceptions=True
+            had_async_exc = isinstance(program_results, BaseException) and not isinstance(program_results, asyncio.CancelledError)
+            program_rc = getattr(self, "program_exit_code", None)
+            failed_rc = program_rc not in (0, None)
+            if had_async_exc or failed_rc:
+                self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, async={task_results}")
+                # Raise so upstream marks failed immediately
+                raise SubmissionException("Child task failed or non-zero return code")
             self._update_status(STATUS_FINISHED)
         else:
             self._update_status(STATUS_SCORING)
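
Note on this hunk: this is the change the PR title describes. Because gather ran with return_exceptions=True, the first slot of task_results holds either the scoring program's result or the exception it raised; a CancelledError from a cancelled sibling task is expected noise, hence the exclusion. A hedged sketch of the decision, with scoring_failed as a hypothetical helper that also guards against an empty task_results:

    import asyncio

    def scoring_failed(task_results, program_exit_code):
        program_result = task_results[0] if task_results else None
        had_async_exc = isinstance(program_result, BaseException) and not isinstance(
            program_result, asyncio.CancelledError
        )
        return had_async_exc or program_exit_code not in (0, None)

    assert scoring_failed([RuntimeError("crash"), None, None], 0)   # async failure
    assert scoring_failed([None, None, None], 2)                    # non-zero exit code
    assert not scoring_failed([None, None, None], 0)                # clean run
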
@@ -1273,9 +1291,12 @@ def push_output(self):
                 "Error, the output directory already contains a metadata file. This file is used "
                 "to store exitCode and other data, do not write to this file manually."
             )
-
-        with open(metadata_path, "w") as f:
-            f.write(yaml.dump(prog_status, default_flow_style=False))
+        try:
+            with open(metadata_path, "w") as f:
+                f.write(yaml.dump(prog_status, default_flow_style=False))
+        except Exception as e:
+            logger.error(e)
+            raise SubmissionException("Metadata file not found")
 
         if not self.is_scoring:
            self._put_dir(self.prediction_result, self.output_dir)
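
Note on this hunk: the metadata write is now guarded so an I/O or serialization failure surfaces as a SubmissionException rather than an unhandled crash (the message "Metadata file not found" is arguably a misnomer for a write failure). For reference, yaml.dump with default_flow_style=False emits block style and sorts keys; the prog_status contents here are an assumed example:

    import yaml

    prog_status = {"exitCode": 0, "elapsedTime": 12.3}
    print(yaml.dump(prog_status, default_flow_style=False))
    # elapsedTime: 12.3
    # exitCode: 0
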
