Skip to content

Commit a2c62bd

Browse files
authored
Merge pull request #35 from ftschirpke/feature/nextflow_translator
Nextflow Translator
2 parents 73ef349 + c2489ec commit a2c62bd

5 files changed

Lines changed: 345 additions & 0 deletions

File tree

docs/source/dev_api_wfbench.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,16 @@ wfcommons.wfbench.bench
1313
:show-inheritance:
1414
:private-members:
1515
:noindex:
16+
17+
wfcommons.wfbench.translator.nextflow
18+
-------------------------------------
19+
20+
.. automodule:: wfcommons.wfbench.translator.nextflow
21+
:members:
22+
:undoc-members:
23+
:show-inheritance:
24+
:private-members:
25+
:noindex:
1626

1727
wfcommons.wfbench.translator.pegasus
1828
------------------------------------

docs/source/generating_workflow_benchmarks.rst

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,35 @@ description in :ref:`json-format-label`.
6767
to execute memory-intensive threads. Therefore, it is crucial to ensure that
6868
:code:`stress-ng` is installed on all worker nodes.
6969

70+
Nextflow
71+
++++++++
72+
`Nextflow <https://www.nextflow.io/>`_ is a workflow management system that enables
73+
the development of portable and reproducible workflows. It supports deploying workflows
74+
on a variety of execution platforms including local, HPC schedulers, and cloud-based
75+
and container-based environments. Below, we provide an example on how to generate
76+
a workflow benchmark for running with Nextflow:
77+
78+
import pathlib
79+
80+
from wfcommons import BlastRecipe
81+
from wfcommons.wfbench import WorkflowBenchmark, NextflowTranslator
82+
83+
# create a workflow benchmark object to generate specifications based on a recipe
84+
benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500)
85+
86+
# generate a specification based on performance characteristics
87+
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)
88+
89+
# generate a Nextflow workflow
90+
translator = NextflowTranslator(benchmark.workflow)
91+
translator.translate(output_file_path=pathlib.Path("/tmp/benchmark-workflow.nf"))
92+
93+
.. warning::
94+
95+
Nextflow's way of defining workflows does not support tasks with iterations, i.e., tasks
96+
that depend on another instance of the same abstract task. Thus, the translator
97+
fails when you try to translate a workflow with iterations.
98+
7099
Pegasus
71100
+++++++
72101

docs/source/user_api_wfbench.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ wfcommons.wfbench.bench
1414
:undoc-members:
1515
:show-inheritance:
1616

17+
wfcommons.wfbench.translator.nextflow
18+
-------------------------------------
19+
20+
.. automodule:: wfcommons.wfbench.translator.nextflow
21+
:members:
22+
:undoc-members:
23+
:show-inheritance:
1724

1825
wfcommons.wfbench.translator.pegasus
1926
------------------------------------

wfcommons/wfbench/translator/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@
1111
from .dask import DaskTranslator
1212
from .pegasus import PegasusTranslator
1313
from .swift_t import SwiftTTranslator
14+
from .nextflow import NextflowTranslator
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2021-2022 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import pathlib
12+
import json
13+
14+
from collections import defaultdict
15+
from math import ceil
16+
from logging import Logger
17+
from typing import Dict, List, Optional, Union, MutableSet
18+
19+
from .abstract_translator import Translator
20+
from ...common import File, FileLink, Workflow
21+
from ...common.task import Task
22+
23+
24+
class NextflowTranslator(Translator):
    """
    A WfFormat parser for creating Nextflow workflow applications.

    Physical tasks that share the same ``category`` are collapsed into one
    abstract Nextflow ``process``; per-physical-task arguments and the
    file-to-consumer mapping are written to JSON side files that the
    generated workflow parses at runtime with ``JsonSlurper``.

    :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
    :type workflow: Union[Workflow, pathlib.Path]
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Logger
    """

    def __init__(self,
                 workflow: Union[Workflow, pathlib.Path],
                 logger: Optional[Logger] = None) -> None:
        """Create an object of the translator."""
        super().__init__(workflow, logger)

        # Preamble of the generated Nextflow script: a JSON parser plus a
        # Groovy helper that, given an input file, returns the IDs of the
        # physical tasks (of a given abstract task) that consume it.
        # NOTE(review): `file_inputs` is a script-level variable emitted later
        # by _create_file_task_mappings() via _write_map_file().
        self.script: str = """
import groovy.json.JsonSlurper
def jsonSlurper = new JsonSlurper()

List<String> extractTaskIDforFile(Path filepath, String task_name) {
    String filename = filepath as String
    filename = filename[filename.lastIndexOf('/')+1..-1]

    List<String> ids_for_file = new ArrayList<String>()
    for (destination : file_inputs[filename]) {
        def destination_task_name = destination[0]
        def destination_task_id = destination[1]
        if (destination_task_name == task_name)
            ids_for_file.add(destination_task_id)
    }
    return ids_for_file
}

"""

    def translate(self, output_file_path: pathlib.Path) -> None:
        """
        Translate a workflow benchmark description (WfFormat) into a Nextflow
        workflow application.

        Also writes the JSON side files (``file_inputs.json`` and one
        ``<task>_args.json`` per abstract task) next to the output file.

        :param output_file_path: The path of the output file (e.g., workflow.nf).
        :type output_file_path: pathlib.Path

        :raises RuntimeError: If the workflow contains iterations (an abstract
            task that is its own parent), which Nextflow cannot express.
        """

        # determine the abstract tasks and their abstract parents and children
        self.abstract_tasks = defaultdict(list)
        self.abstract_parents: Dict[str, MutableSet[str]] = defaultdict(set)
        self._determine_abstract_relations()

        self.task_inputs: Dict[str, List[File]] = {}
        self.task_outputs: Dict[str, List[File]] = {}
        # Per abstract task: the (constant) number of input files, or None
        # when it varies between physical tasks (see _determine_input_output).
        self.task_input_amounts: Dict[str, Optional[int]] = {}
        self._determine_input_output()

        # Files consumed by some task but produced by none: the workflow's
        # external inputs, staged from ${params.indir}.
        self.workflow_inputs = set().union(*self.task_inputs.values()).difference(*self.task_outputs.values())

        self._create_file_task_mappings(output_file_path)

        for abstract_task_name, physical_tasks in self.abstract_tasks.items():
            self._create_task_args_map(output_file_path, abstract_task_name, physical_tasks)
        self.script += "\n\n"

        # Emit one `process` definition per abstract task; task_written tracks
        # which tasks already have their *call* emitted in the workflow body.
        self.task_written: Dict[str, bool] = {}
        for abstract_task_name, physical_tasks in self.abstract_tasks.items():
            self.task_written[abstract_task_name] = False
            self._add_abstract_task_definition(abstract_task_name, physical_tasks)

        self.script += "workflow {\n"
        self.script += "    workflow_inputs = Channel.fromPath(\"${params.indir}/*\")\n"
        self.script += "\n"

        # Calls recurse into parents first, so each task is emitted after
        # every channel it consumes exists.
        for abstract_task_name, physical_tasks in self.abstract_tasks.items():
            if not self.task_written[abstract_task_name]:
                self._add_call_to_abstract_task(abstract_task_name, physical_tasks)
        self.script += "}\n"

        self._write_output_file(self.script, output_file_path)

    def _is_resource_arg(self, arg: str) -> bool:
        """Return True for wfbench CLI args that describe resource usage."""
        return arg.startswith("--percent-cpu") or arg.startswith("--mem") \
            or arg.startswith("--cpu-work") or arg.startswith("--gpu-work")

    def _determine_abstract_relations(self) -> None:
        """
        Determines the abstract tasks that will be used for the Nextflow
        definition of the workflow.

        Groups physical tasks by category into ``self.abstract_tasks`` and
        records category-level parent relations in ``self.abstract_parents``.

        :raises RuntimeError: If any abstract task is its own parent
            (an iteration), which has no Nextflow translation.
        """

        for task in self.tasks.values():
            abstract_task: str = task.category
            self.abstract_tasks[abstract_task].append(task)

            for parent in self.task_parents[task.name]:
                abstract_parent: str = self.tasks[parent].category
                self.abstract_parents[abstract_task].add(abstract_parent)

        # A self-parent at the abstract level means one instance of a task
        # depends on another instance of the same task: unsupported.
        tasks_with_iterations = set()
        for abstract_task in self.abstract_tasks:
            for abstract_parent in self.abstract_parents[abstract_task]:
                if abstract_parent == abstract_task:
                    tasks_with_iterations.add(abstract_task)
        if tasks_with_iterations:
            error_msg: str = "Iterations are not supported by Nextflow. "\
                "Thus, this workflow has no Nextflow translation since the following "\
                f"{'task uses' if len(tasks_with_iterations) == 1 else 'tasks use'} "\
                "iterations:\n    "
            error_msg += ", ".join(tasks_with_iterations)
            raise RuntimeError(error_msg)

    def _determine_input_output(self) -> None:
        """
        Determines the inputs and outputs for the physical and abstract tasks.

        When all physical tasks of an abstract task have the same number of
        input files, that count goes into ``self.task_input_amounts``;
        otherwise the count is ``None`` and a per-task-ID Groovy map literal
        is appended to the script instead (used with ``groupKey`` later).
        """
        for task in self.tasks.values():
            self.task_inputs[task.name] = [file for file in task.files if file.link == FileLink.INPUT]
            self.task_outputs[task.name] = [file for file in task.files if file.link == FileLink.OUTPUT]

        self.script += "// define amount of input files for abstracts tasks where the amount is not constant\n"
        for abstract_task_name, physical_tasks in self.abstract_tasks.items():
            input_amounts = {task.task_id: len(self.task_inputs[task.name]) for task in physical_tasks}
            if (max(input_amounts.values()) == min(input_amounts.values())):
                # all physical tasks have the same amount of inputs
                self.task_input_amounts[abstract_task_name] = max(input_amounts.values())
            else:
                # define amount of inputs for each physical task
                self.script += f"def {self.valid_task_name(abstract_task_name)}_input_amounts = [\n"
                for k, v in input_amounts.items():
                    self.script += f"    \"{k}\": {v},\n"
                self.script += "]\n"
                self.task_input_amounts[abstract_task_name] = None
        self.script += "\n"

    def _create_file_task_mappings(self, output_file_path: pathlib.Path) -> None:
        """Write file_inputs.json: file name -> [[category, task_id], ...] of its consumers."""
        file_task_map = defaultdict(list)
        for task_name, task_input_files in self.task_inputs.items():
            task = self.tasks[task_name]
            for file in task_input_files:
                file_task_map[file.name].append([task.category, task.task_id])
        self._write_map_file(file_task_map, "file_inputs", output_file_path)

    def _write_map_file(self, map_dict: Dict, map_name: str, output_file_path: pathlib.Path) -> None:
        """
        Dump `map_dict` as <map_name>.json next to the output file and append
        a JsonSlurper load of it (relative to ${projectDir}) to the script.
        """
        path = output_file_path.parent.joinpath(f"{map_name}.json")
        with open(path, "w") as f:
            f.write(json.dumps(map_dict, indent=4))
        self.script += f"{map_name} = jsonSlurper.parseText(file(\"${{projectDir}}/{map_name}.json\").text)\n"

    def _create_task_args_map(self, output_file_path: pathlib.Path, abstract_task_name: str, physical_tasks: List[Task]) -> None:
        """
        Write <task>_args.json holding, per physical task ID, its serialized
        --out argument (output file name -> size) and its resource arguments.
        """
        map_name = f"{self.valid_task_name(abstract_task_name)}_args"
        task_args_map = {}
        for ptask in physical_tasks:
            out_file_sizes = {file.name: file.size for file in self.task_outputs[ptask.name]}
            # Re-shape the dict repr into the escaped form wfbench expects on
            # the command line (escaped double quotes, no space after colon).
            out_arg = str(out_file_sizes).replace("{", "").replace("}", "").replace("'", "\\\"").replace(": ", ":")
            task_args_map[ptask.task_id] = {
                "out": out_arg,
                "resources": " ".join((arg for arg in ptask.args if self._is_resource_arg(arg)))
            }
        self._write_map_file(task_args_map, map_name, output_file_path)

    def valid_task_name(self, original_task_name: str) -> str:
        """Return a task name usable as a Nextflow/Groovy identifier ('-' -> '_')."""
        return original_task_name.replace('-', '_')

    def _add_abstract_task_definition(self, abstract_task_name: str, physical_tasks: List[Task]) -> None:
        """
        Add an abstract task to the workflow considering its physical tasks.

        Emits a Nextflow ``process`` whose cpus/memory are the maxima over the
        physical tasks (memory padded by 5%), and whose command is built from
        the first physical task's args with the per-task-ID parts looked up in
        the ``<task>_args`` map at runtime.

        :param abstract_task_name: the name of the abstract task
        :type abstract_task_name: str
        :param physical_tasks: a list of physical tasks for this abstract task
        :type physical_tasks: List[Task]
        """

        cores_values = [task.cores for task in physical_tasks if task.cores is not None]
        if len(cores_values) == 0:
            cores = None
        else:
            cores = int(max(cores_values))
        memory_values = [task.memory for task in physical_tasks if task.memory is not None]
        if len(memory_values) == 0:
            memory = None
        else:
            # 5% head room so the scheduler limit is an upper bound.
            memory = max(memory_values) * 1.05

        # creating the command for the abstract task using the first physical task as a template
        example_task = physical_tasks[0]
        cmd = pathlib.Path(example_task.program).name
        resource_args_done = False
        for a in example_task.args:
            if a == f"{abstract_task_name}_{example_task.task_id}":
                # Replace the example task's own name by a runtime-templated one.
                cmd += f" {abstract_task_name}_${{id}}"
                continue
            if self._is_resource_arg(a):
                # All resource args collapse into one runtime lookup; emit it once.
                if resource_args_done:
                    continue
                cmd += " ${" + self.valid_task_name(abstract_task_name) + "_args.get(id).get(\"resources\")}"
                resource_args_done = True
            elif a.startswith("--out"):
                # --out is the last templated arg; append the staged inputs and stop.
                cmd += " --out \"{${" + self.valid_task_name(abstract_task_name) + "_args.get(id).get(\"out\")}}\""
                cmd += " \\$inputs"
                break
            else:
                a = a.replace(f"{abstract_task_name}_{example_task.task_id}", f"{abstract_task_name}_${{id}}")
                cmd += " " + a.replace("'", "\"")

        # creating the abstract task
        self.script += f"process task_{self.valid_task_name(abstract_task_name)}" + " {\n"
        if cores:
            self.script += f"    cpus {cores}\n"
        if memory:
            self.script += f"    memory '{human_readable_memory(memory)}'\n"
        self.script += "    input:\n"
        self.script += "        tuple val( id ), path( \"*\" )\n"
        self.script += f"    output:\n        path( \"{self.valid_task_name(abstract_task_name)}_????????_outfile_????*\" )\n"
        self.script += "    script:\n"
        self.script += "        \"\"\"\n"
        # Collect staged workflow inputs and parent outputs as $inputs for the command.
        self.script += "        inputs=\\$(find . -maxdepth 1 -name \\\"workflow_infile_*\\\" -or -name \\\"*_outfile_0*\\\")\n"
        self.script += f"        {cmd}\n"
        self.script += "        \"\"\"\n"
        self.script += "}\n"

    def _add_call_to_abstract_task(self, abstract_task_name: str, physical_tasks: List[Task]) -> None:
        """
        Emit the workflow-body call for an abstract task, recursing into its
        parents first so their output channels already exist.

        :param abstract_task_name: the name of the abstract task
        :type abstract_task_name: str
        :param physical_tasks: a list of physical tasks for this abstract task
        :type physical_tasks: List[Task]

        :raises RuntimeError: If the task is its own parent, or has no inputs.
        """
        parents = self.abstract_parents[abstract_task_name]
        for parent in parents:
            if parent == abstract_task_name:
                raise RuntimeError("Iterations are not supported by Nextflow.")
            if not self.task_written[parent]:
                self._add_call_to_abstract_task(parent, self.abstract_tasks[parent])

        # determining the channel of all raw inputs (outputs of other tasks)
        input_channels = [
            f"{self.valid_task_name(parent)}_out" for parent in parents]
        example_task = physical_tasks[0]

        # If any input of a representative physical task is a workflow-level
        # input, this task also reads from the workflow_inputs channel.
        for input_file in self.task_inputs[example_task.name]:
            if input_file in self.workflow_inputs:
                input_channels.append("workflow_inputs")
                break

        if len(input_channels) == 1:
            inputs_channel = input_channels[0]
        elif len(input_channels) != 0:
            # concatenating all the input channels into one big channel
            one_channel = input_channels.pop()
            inputs_channel = f"concatenated_FOR_{self.valid_task_name(abstract_task_name)}"
            self.script += f"    {inputs_channel} = {one_channel}.concat({', '.join(input_channels)})\n"
        else:
            raise RuntimeError(f"The abstract task {abstract_task_name} has no inputs.")

        # creating the input channel for this abstract task by grouping the outputs from the parents by id
        self.script += f"    {self.valid_task_name(abstract_task_name)}_in = {inputs_channel}.flatten().flatMap{{\n"
        self.script += f"        List<String> ids = extractTaskIDforFile(it, \"{abstract_task_name}\")\n"
        self.script += "        def pairs = new ArrayList()\n"
        self.script += "        for (id : ids) pairs.add([id, it])\n"
        self.script += "        return pairs\n"
        self.script += "    }"

        if self.task_input_amounts[abstract_task_name]:
            # Constant input count: a fixed-size groupTuple suffices.
            self.script += f".groupTuple(size: {self.task_input_amounts[abstract_task_name]})\n"
        else:
            # Variable input count: look the size up per task id via groupKey.
            self.script += f".map {{ id, file -> tuple( groupKey(id, {self.valid_task_name(abstract_task_name)}_input_amounts[id]), file ) }}\n"
            self.script += "        .groupTuple()\n"

        self.script += f"    {self.valid_task_name(abstract_task_name)}_out = task_"
        self.script += f"{self.valid_task_name(abstract_task_name)}({self.valid_task_name(abstract_task_name)}_in)\n\n"

        self.task_written[abstract_task_name] = True
289+
290+
def human_readable_memory(mem_bytes: int) -> str:
    """Format a byte count as a human-readable memory string (e.g. '4.89 KB').

    The value is scaled by factors of 1024 while it exceeds 4096 and a larger
    unit is available, then rounded *up* at two decimals so the result is
    always an upper bound of the requirement.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = mem_bytes
    chosen_unit = units[0]
    for position, unit in enumerate(units):
        chosen_unit = unit
        # Stop once the value is small enough or no larger unit exists.
        if value <= 4096 or position == len(units) - 1:
            break
        value /= 1024
    # Round up at two decimal places to keep the estimate an upper bound.
    value = ceil(value * 100) / 100
    return f"{value:.2f} {chosen_unit}"

0 commit comments

Comments
 (0)