Skip to content
Merged
21 changes: 21 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,26 @@
# Changes

## Branch: enable_augustustimeout_busco

### Fix: Augustus jobs in BUSCO training can now be killed if they hang

Individual Augustus jobs called during BUSCO-based ab-initio training had no timeout,
so a single stuck job would block its worker thread indefinitely and hang the entire run.

**`funannotate/aux_scripts/funannotate-BUSCO2.py`**
- Added `--augustus_timeout` argument (default 300 s). Each per-gene Augustus job is now
killed with SIGKILL (via `os.killpg` on its own process group) if it exceeds this limit,
and the run continues with that gene model skipped.
- Per-job elapsed times are collected across all worker threads and logged as a summary
after the Augustus pass (`mean`, `median`, `max`, timeout count) so the threshold can
be calibrated empirically.
- `_extract` calls are now wrapped in try/except so a truncated output file left by a
killed job does not abort the protein-extraction step.

**`funannotate/predict.py`**
- Added `--augustus_BUSCO_timeout SECONDS` argument (default 300) that is forwarded to
`funannotate-BUSCO2.py` as `--augustus_timeout`.

## Branch: auto_skip_genemark

### Feature: `--auto-skip-genemark` flag for `funannotate predict`
Expand Down
95 changes: 83 additions & 12 deletions funannotate/aux_scripts/funannotate-BUSCO2.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import os
import sys
import signal
import subprocess
import argparse
from argparse import RawTextHelpFormatter
Expand Down Expand Up @@ -320,7 +321,7 @@ def cmd_exists(cmd):
return subprocess.call('type %s' % cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) == 0

@staticmethod
def p_open(cmd, name, shell=False):
def p_open(cmd, name, shell=False, timeout=None):
"""
This function call subprocess.Popen for the provided command and log the results with the provided name
:param cmd: the command to execute
Expand All @@ -329,13 +330,31 @@ def p_open(cmd, name, shell=False):
:type name: str
:param shell: whether to use the shell parameter to Popen. Needed if wildcard charcter used (*?). See on web
:type shell: bool
:param timeout: seconds before killing the process; None means no timeout
:type timeout: int or None
"""
# note, all augustus related commands do not write to the stdout and stderr and therefore get nothing here
# start_new_session=True puts the process in its own process group so we can
# kill the entire group (shell + augustus child) on timeout without orphaning it.
# preexec_fn is not used because it is not thread-safe.
process = subprocess.Popen(
cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=shell)
process_out = process.stderr.readlines() + process.stdout.readlines()
for line in process_out:
_logger.info_external_tool(name, line.decode("utf-8").strip())
cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=shell,
start_new_session=True)
try:
stdout_data, stderr_data = process.communicate(timeout=timeout)
process_out = (stderr_data or b'').splitlines() + (stdout_data or b'').splitlines()
for line in process_out:
_logger.info_external_tool(name, line.decode("utf-8").strip())
return True
except subprocess.TimeoutExpired:
try:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
except ProcessLookupError:
pass
process.communicate()
_logger.warning(
'%s timed out after %s seconds, skipping this gene model' % (name, timeout)
)
return False

@staticmethod
def check_fasta_header(header):
Expand Down Expand Up @@ -873,6 +892,10 @@ def __init__(self, params):
self._target_species = params['target_species']
self._augustus_parameters = params['augustus_parameters']
self._augustus_config_path = params['augustus_config_path']
self._augustus_timeout = params.get('augustus_timeout', 300)
self._augustus_timeout_count = 0
self._augustus_durations = []
self._augustus_stats_lock = threading.Lock()
self._tarzip = params['tarzip']
self._dataset_creation_date = params['dataset_creation_date']
self._dataset_nb_species = params['dataset_nb_species']
Expand Down Expand Up @@ -1221,6 +1244,23 @@ def _augustus(self):

self._run_threads(augustus_first_run_strings, self._AugustusThreads)

# Log Augustus timing summary to help users calibrate --augustus_timeout
if self._augustus_durations:
durations = sorted(self._augustus_durations)
n = len(durations)
median = durations[n // 2]
mean = sum(durations) / n
_logger.info(
'Augustus timing: %d jobs, mean=%.1fs, median=%.1fs, max=%.1fs, timeout_limit=%ds, timed_out=%d'
% (n, mean, median, durations[-1], self._augustus_timeout, self._augustus_timeout_count)
)
if self._augustus_timeout_count > 0:
_logger.warning(
'%d Augustus job(s) exceeded the %ds timeout and were skipped. '
'If this is too many, increase --augustus_timeout.'
% (self._augustus_timeout_count, self._augustus_timeout)
)

# Preparation of sequences for use with HMMer

# Parse Augustus output files ('run_XXXX/augustus') and extract protein sequences
Expand All @@ -1238,8 +1278,14 @@ def _augustus(self):

self._no_predictions = []
for entry in files:
self._extract(self.mainout, entry)
self._extract(self.mainout, entry, aa=False)
try:
self._extract(self.mainout, entry)
self._extract(self.mainout, entry, aa=False)
except Exception as e:
_logger.warning(
'Failed to extract proteins from %s (possibly truncated by timeout), skipping: %s'
% (entry, e)
)
Comment thread
hyphaltip marked this conversation as resolved.

Analysis.p_open(['find %saugustus_output/extracted_proteins -size 0 -delete' % self.mainout], 'bash',
shell=True)
Expand Down Expand Up @@ -1453,8 +1499,14 @@ def _augustus_rerun(self):
location) + 1
# when extract gets reworked to not need MAINOUT, change to OUT_NAME
plain_name = '%s.out.%s' % (entry, output_index)
self._extract(self.mainout, plain_name)
self._extract(self.mainout, plain_name, aa=False)
try:
self._extract(self.mainout, plain_name)
self._extract(self.mainout, plain_name, aa=False)
except Exception as e:
_logger.warning(
'Failed to extract proteins from %s (possibly truncated by timeout), skipping: %s'
% (plain_name, e)
)
else:
pass

Expand Down Expand Up @@ -2030,7 +2082,14 @@ def _process_augustus_tasks(self):
if state > self.slate[-1]:
_logger.info('%s =>\t%s%% of predictions performed (%i/%i candidate regions)'
% (time.strftime("%m/%d/%Y %H:%M:%S"), self.slate.pop(), check, self._total))
Analysis.p_open([data], 'augustus', shell=True)
_t0 = time.time()
completed = Analysis.p_open([data], 'augustus', shell=True, timeout=self._augustus_timeout)
elapsed = time.time() - _t0
with self._augustus_stats_lock:
self._augustus_durations.append(elapsed)
if not completed:
self._augustus_timeout_count += 1
_logger.debug('augustus job elapsed %.1fs%s' % (elapsed, ' [TIMED OUT]' if not completed else ''))
else:
self._queue_lock.release()

Expand Down Expand Up @@ -2971,6 +3030,17 @@ def _parse_args():
optional.add_argument('--local_augustus', required=False, dest='local_augustus',
help='local augustus folder')

def _positive_seconds(value):
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError('SECONDS must be a positive integer')
return ivalue

optional.add_argument('--augustus_timeout', required=False, dest='augustus_timeout',
metavar='SECONDS', type=_positive_seconds, default=300,
help='Kill individual Augustus jobs that exceed this many seconds (default: 300). '
'Timed-out jobs are skipped so the run continues rather than hanging.')
Comment thread
hyphaltip marked this conversation as resolved.

optional.add_argument('-a', '--augustus_parameters', required=False, default='', dest='augustus_parameters',
help='Additional parameters for the fine-tuning of Augustus run. '
'For the species, do not use this option.\n'
Expand Down Expand Up @@ -3202,7 +3272,8 @@ def _define_parameters(args):
"augustus_config_path": augustus_config_path, "tarzip": args['tarzip'],
"region_limit": region_limit, "flank": flank, "long": args['long'],
"dataset_creation_date": dataset_creation_date, "dataset_nb_species": dataset_nb_species,
"dataset_nb_buscos": dataset_nb_buscos, "augustus_parameters": args['augustus_parameters']
"dataset_nb_buscos": dataset_nb_buscos, "augustus_parameters": args['augustus_parameters'],
"augustus_timeout": args['augustus_timeout']
}


Expand Down
3 changes: 2 additions & 1 deletion funannotate/aux_scripts/tbl2asn_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ def tbl2asn_runner(cmd, dir, dialect='tbl2asn'):
indir_flag = '-indir' if dialect == 'table2asn' else '-p'
if dialect == 'table2asn':
# table2asn: -Z is a boolean flag; discrepancy written as <input>.dr
cmd = cmd + ['-Z', indir_flag, dir]
# -outdir ensures .dr/.stats/etc. land in dir rather than the process CWD
cmd = cmd + ['-Z', indir_flag, dir, '-outdir', dir]
else:
cmd = cmd + ['-Z', os.path.join(dir, 'discrepency.report.txt'), indir_flag, dir]
print("DEBUG tbl2asn_runner cmd: %s" % " ".join(cmd), flush=True)
Expand Down
8 changes: 8 additions & 0 deletions funannotate/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ def __init__(self, prog):
help="Annotated if genome not masked and skip bad contigs",
)
parser.add_argument("--busco_db", default="dikarya", help="BUSCO model database")
parser.add_argument(
"--augustus_BUSCO_timeout",
default=300,
type=int,
metavar="SECONDS",
help="Per-job timeout in seconds for Augustus during BUSCO training; hung jobs are killed and skipped (default: 300)",
)
parser.add_argument(
"-t",
"--tbl2asn",
Expand Down Expand Up @@ -1967,6 +1974,7 @@ def __init__(self, prog):
]
if int(args.table) != 1:
busco_cmd += ["-a", "--translation_table={}".format(int(args.table))]
busco_cmd += ["--augustus_timeout", str(args.augustus_BUSCO_timeout)]
lib.log.debug(" ".join(busco_cmd))

with open(busco_log, "w") as logfile:
Expand Down