diff --git a/setup.py b/setup.py index a340fd735..d53dea026 100644 --- a/setup.py +++ b/setup.py @@ -51,6 +51,7 @@ 'cactus-align = cactus.setup.cactus_align:main', 'cactus-align-batch = cactus.setup.cactus_align:main_batch', 'cactus-update-prepare = cactus.update.cactus_update_prepare:main', + 'cactus-hal2seqfile = cactus.update.cactus_hal2seqfile:main', 'cactus-terra-helper = cactus.progressive.cactus_terra_helper:main', 'cactus-hal2maf = cactus.maf.cactus_hal2maf:main', 'cactus-hal2chains = cactus.maf.cactus_hal2chains:main', diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 936a7352a..64fa80d2a 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -83,6 +83,9 @@ + diff --git a/src/cactus/paf/local_alignment.py b/src/cactus/paf/local_alignment.py index f79c41c9a..193df4bd3 100755 --- a/src/cactus/paf/local_alignment.py +++ b/src/cactus/paf/local_alignment.py @@ -760,9 +760,20 @@ def make_paf_alignments(job, event_tree_string, event_names_to_sequences, ancest ingroup_events = get_leaves(ancestor_event) # Get the set of ingroup events # to be consistent with pre-refactor (and work with updating tests), we include the root when its id is input # and just treat it as an ingroup - if ancestor_event.iD in event_names_to_sequences and event_names_to_sequences[ancestor_event.iD]: + root_provided = ancestor_event.iD in event_names_to_sequences and event_names_to_sequences[ancestor_event.iD] + # When the root's sequence is supplied (cactus-blast/cactus-align --includeRoot), treat it as an + # outgroup rather than an ingroup, controlled by in the config (on by + # default). As an ingroup the root joins the flat all-against-all chaining and can be starved of + # primary alignments by ingroups that align better to each other than to the (often deeper) root, + # collapsing the root's coverage to its descendants. As an outgroup it is instead covered via the + # ingroup->outgroup scheme and cannot be starved (see chain_alignments_splitting_ingroups_and_outgroups). + pin_root_as_outgroup = root_provided and getOptionalAttrib( + params.find("blast"), "includeRootAsOutgroup", typeFn=bool, default=True) + if root_provided and not pin_root_as_outgroup: ingroup_events.append(ancestor_event) outgroup_events = [event for event in get_leaves(event_tree) if event not in ingroup_events] # Set of outgroups + if pin_root_as_outgroup: + outgroup_events.append(ancestor_event) logger.info("Got ingroup events: {} for ancestor event: {}".format(" ".join([i.iD for i in ingroup_events]), ancestor_event_string)) diff --git a/src/cactus/setup/cactus_align.py b/src/cactus/setup/cactus_align.py index 21f8fade0..41a0f4c63 100644 --- a/src/cactus/setup/cactus_align.py +++ b/src/cactus/setup/cactus_align.py @@ -91,7 +91,7 @@ def main(): " in the output. If no root is specifed then the root" " of the tree is used. ", default=None) parser.add_argument("--includeRoot", action="store_true", help="Include the root's sequence in the alignment" - " (used only when running alignment update recipes)") + " (used only when running alignment update recipes)") parser.add_argument("--latest", dest="latest", action="store_true", help="Use the latest version of the docker container " "rather than pulling one matching this version of cactus") @@ -292,6 +292,11 @@ def make_align_job(options, toil, config_wrapper=None, chrom_name=None): if options.root not in input_seq_map: raise RuntimeError("--includeRoot specified but root, {}, not found in input Seqfile".format(options.root)) event_set.add(options.root) + # NOTE: we deliberately do NOT add the root to og_map here. Marking the reference event as an + # outgroup of itself flows into cactus_consolidated's --outgroupEvents AND export_hal's + # --outgroups, which breaks the c2h/HAL build (the reference is the root whose bottom segments + # define the alignment; outgroups have no bottom segments). The root-as-outgroup treatment is + # applied only on the cactus-blast side (see make_paf_alignments / ). if options.reference and options.pangenome: # validate the sample names diff --git a/src/cactus/update/cactus_hal2seqfile.py b/src/cactus/update/cactus_hal2seqfile.py new file mode 100644 index 000000000..f443a4175 --- /dev/null +++ b/src/cactus/update/cactus_hal2seqfile.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python3 + +# Released under the MIT license, see LICENSE.txt + +"""Export a Cactus seqfile (plus gzipped per-genome FASTAs) for the subtree rooted at a +given ancestor of a HAL alignment. + +For the chosen --root ancestor and every genome below it in the tree, this runs +``hal2fasta | bgzip`` (one Toil job per genome, in parallel) to write +``/.fa.gz``, and writes a seqfile (newick subtree on the first line, +then ``\\t`` lines) to . The ancestor's own sequence is included, +so the seqfile is ready for re-aligning the subclade and pinning it back with e.g. +``cactus-blast``/``cactus-align --includeRoot``. + +The input HAL is symlinked (not copied) into the jobstore and read by each per-genome +job via symlink, so re-running this on a large HAL doesn't replicate it once per genome. + +Example: + cactus-hal2seqfile ./jobstore mammals.hal ./anc1-fastas ./anc1.seqfile --root Anc1 +""" + +import os +import sys +import timeit + +from sonLib.nxnewick import NXNewick + +from toil.job import Job +from toil.common import Toil +from toil.statsAndLogging import logger +from toil.statsAndLogging import set_logging_from_options + +from cactus.shared.common import setupBinaries, importSingularityImage +from cactus.shared.common import enableDumpStack +from cactus.shared.common import cactus_override_toil_options +from cactus.shared.common import makeURL, cactus_call, cactus_clamp_memory +from cactus.shared.version import cactus_commit + + +def hal_tree_and_lengths(hal_path): + """Return (newick tree string, {genome: sequence length}) from a single `halStats` call.""" + out = cactus_call(parameters=['halStats', hal_path], check_output=True) + tree, lengths, in_table = None, {}, False + for line in out.splitlines(): + line = line.strip() + if not line: + continue + if line.startswith('(') and line.endswith(';'): + tree = line + elif line.startswith('GenomeName'): + in_table = True + elif in_table: + toks = [t.strip() for t in line.split(',')] + if len(toks) >= 3: + lengths[toks[0]] = int(toks[2]) + if tree is None: + tree = cactus_call(parameters=['halStats', hal_path, '--tree'], check_output=True).strip() + return tree, lengths + + +def get_subtree(tree_string, root): + """Parse the newick tree and return (subtree_newick, [genome names]) for the subtree + rooted at ``root`` (the ancestor itself plus everything below it, in pre-order).""" + tree = NXNewick().parseString(tree_string) + name_to_id = {tree.getName(node): node for node in tree.breadthFirstTraversal()} + if root not in name_to_id: + raise RuntimeError('--root "{}" not found in HAL tree. Available genomes: {}'.format( + root, ' '.join(sorted(n for n in name_to_id if n)))) + root_id = name_to_id[root] + + def genome_names(node_id): + names = [tree.getName(node_id)] + for child in tree.getChildren(node_id): + names += genome_names(child) + return names + + def to_newick(node_id): + children = list(tree.getChildren(node_id)) + if not children: + return tree.getName(node_id) + parts = [] + for child in children: + sub = to_newick(child) + weight = tree.getWeight(node_id, child, None) + if weight is not None: + sub += ':{:g}'.format(weight) + parts.append(sub) + return '({}){}'.format(','.join(parts), tree.getName(node_id)) + + return to_newick(root_id) + ';', genome_names(root_id) + + +def export_subtree_fastas(job, hal_id, hal_name, genomes, lengths): + """Fan out one hal2fasta|bgzip job per genome (run in parallel by Toil). + Returns a {genome: fasta file id} map.""" + fa_ids = {} + for genome in genomes: + # the HAL is read by symlink (below), so each job only needs disk for its own FASTA + length = lengths.get(genome, hal_id.size) + fa_ids[genome] = job.addChildJobFn(hal2fasta_gz, hal_id, hal_name, genome, + memory=cactus_clamp_memory(3000000000), + disk=max(int(length * 2), 2**20)).rv() + return fa_ids + + +def hal2fasta_gz(job, hal_id, hal_name, genome): + """hal2fasta | bgzip, reading the HAL by symlink so that parallel jobs share a + single copy of the (possibly very large) HAL rather than each copying it out.""" + work_dir = job.fileStore.getLocalTempDir() + hal_path = os.path.join(work_dir, hal_name) + job.fileStore.readGlobalFile(hal_id, hal_path, mutable=False, symlink=True) + fa_path = os.path.join(work_dir, '{}.fa.gz'.format(genome)) + cactus_call(parameters=[['hal2fasta', hal_path, genome], ['bgzip', '--threads', str(job.cores)]], + outfile=fa_path) + return job.fileStore.writeGlobalFile(fa_path) + + +def main(): + parser = Job.Runner.getDefaultArgumentParser() + + parser.add_argument("halFile", help="input HAL alignment") + parser.add_argument("outDir", help="output directory for the gzipped per-genome FASTAs (created if needed)") + parser.add_argument("seqFile", help="output seqfile to write (newick subtree followed by \\t lines)") + parser.add_argument("--root", default=None, + help="ancestor (internal node) whose subtree -- the node and everything below it -- is " + "exported. Its own sequence is included so the seqfile works with --includeRoot. " + "[default: the root of the HAL tree]") + + parser.add_argument("--latest", dest="latest", action="store_true", + help="Use the latest version of the docker container " + "rather than pulling one matching this version of cactus") + parser.add_argument("--containerImage", dest="containerImage", default=None, + help="Use the the specified pre-built containter image " + "rather than pulling one from quay.io") + parser.add_argument("--binariesMode", choices=["docker", "local", "singularity"], + help="The way to run the Cactus binaries", default=None) + + options = parser.parse_args() + + setupBinaries(options) + set_logging_from_options(options) + enableDumpStack() + + # Mess with some toil options to create useful defaults. + cactus_override_toil_options(options) + + logger.info('Cactus Command: {}'.format(' '.join(sys.argv))) + logger.info('Cactus Commit: {}'.format(cactus_commit)) + start_time = timeit.default_timer() + + # Work out the subtree (genomes + newick) and per-genome sizes up front: halStats only reads + # tree metadata, so this is cheap even for big HALs, and we need the genome list to fan out the + # per-genome export jobs and the sizes to right-size their disk requirements. + tree_string, lengths = hal_tree_and_lengths(options.halFile) + root = options.root if options.root else cactus_call( + parameters=['halStats', options.halFile, '--root'], check_output=True).strip() + subtree_newick, genomes = get_subtree(tree_string, root) + + if not os.path.isdir(options.outDir): + os.makedirs(options.outDir) + out_dir = os.path.abspath(options.outDir) + + paths = {} + with Toil(options) as toil: + importSingularityImage(options) + if options.restart: + fa_ids = toil.restart() + else: + # symlink the HAL into the jobstore (rather than copying it) -- import_file already + # defaults to symlink=True in Toil, but we set it explicitly so this holds within the tool + hal_id = toil.importFile(makeURL(options.halFile), symlink=True) + fa_ids = toil.start(Job.wrapJobFn(export_subtree_fastas, hal_id, + os.path.basename(options.halFile), genomes, lengths)) + + # export each gzipped fasta to /.fa.gz + for genome, fa_id in fa_ids.items(): + out_path = os.path.join(out_dir, '{}.fa.gz'.format(genome)) + toil.exportFile(fa_id, makeURL(out_path)) + paths[genome] = out_path + + # write the seqfile: newick subtree, a blank line, then one \t per line + with open(options.seqFile, 'w') as seqfile_handle: + seqfile_handle.write(subtree_newick + '\n\n') + for genome in genomes: + seqfile_handle.write('{}\t{}\n'.format(genome, paths[genome])) + logger.info("Wrote seqfile for subtree rooted at {} ({} genomes) to {}".format( + root, len(genomes), options.seqFile)) + + run_time = timeit.default_timer() - start_time + logger.info("cactus-hal2seqfile has finished after {} seconds".format(run_time)) + + +if __name__ == '__main__': + main()