Skip to content

Commit 97d1e95

Browse files
Merge pull request #128 from wfcommons/makeflow_translator
Implemented a Makeflow translator
2 parents f1091e5 + baa9b9b commit 97d1e95

7 files changed

Lines changed: 201 additions & 7 deletions

File tree

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# docker build --platform amd64 -t wfcommons-dev -f Dockerfile.parsl .
# docker run -it --rm -v `pwd`:/home/wfcommons wfcommons-dev /bin/bash
# NOTE(review): the build command above names Dockerfile.parsl, but this image
# installs ndcctools via conda; it looks copied from another backend's
# Dockerfile -- confirm the file name and update the command if needed.

FROM amd64/ubuntu:noble

# Standard OCI annotation key for the image authors.
LABEL org.opencontainers.image.authors="henric@hawaii.edu"

# Set the timezone, refresh the package index and install all system packages
# in a single layer. Keeping `apt-get update` in the same RUN as the installs
# prevents a cached (stale) index layer from being reused on rebuilds.
# DEBIAN_FRONTEND=noninteractive keeps tzdata from prompting during the build.
RUN echo "America/Los_Angeles" > /etc/timezone && \
    export DEBIAN_FRONTEND=noninteractive && \
    apt-get update && \
    apt-get -y install --fix-missing \
        tzdata \
        pkg-config \
        git \
        wget \
        curl \
        make \
        cmake \
        cmake-data \
        sudo \
        vim \
        gcc \
        gcc-multilib \
        python3 \
        python3-pip \
        stress-ng

# Python stuff
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1
RUN python3 -m pip install --break-system-packages \
        pathos pandas filelock \
        networkx scipy matplotlib \
        pyyaml jsonschema requests
# Upgraded last, in its own step, exactly as in the original ordering.
RUN python3 -m pip install --break-system-packages --upgrade setuptools

# Add wfcommons user (member of the sudo group, password-less sudo)
RUN useradd -ms /bin/bash wfcommons && \
    adduser wfcommons sudo && \
    echo '%sudo ALL=(ALL) NOPASSWD:ALL' >> /etc/sudoers
ENV PATH="$PATH:/home/wfcommons/.local/bin/"

USER wfcommons
WORKDIR /home/wfcommons

# Install Miniforge
RUN wget -O Miniforge3.sh "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" && \
    bash Miniforge3.sh -b -p "${HOME}/conda" && \
    rm Miniforge3.sh

# Make sure conda is available in each shell session
RUN echo "source ${HOME}/conda/etc/profile.d/conda.sh" >> ${HOME}/.bashrc && \
    echo "conda activate" >> ${HOME}/.bashrc

# Install necessary packages
# `-y` makes the install explicitly non-interactive (there is no TTY during
# `docker build` to answer conda's confirmation prompt).
RUN . ${HOME}/conda/etc/profile.d/conda.sh && \
    conda activate base && \
    conda install -y -c conda-forge ndcctools && \
    conda clean --all -f -y
61+

tests/translators_loggers/build_docker_docker_images.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
set -e
44

5-
for backend in "dask" "parsl" "nextflow" "airflow" "bash" "taskvine" "cwl" "pegasus" "swiftt"; do
5+
# Build one testing image per backend, each from its own Dockerfile.<backend>.
for backend in dask parsl nextflow airflow bash taskvine makeflow cwl pegasus swiftt
do
    echo "Building ${backend} Docker image..."
    docker build \
        --platform linux/amd64 \
        -t "wfcommons/wfcommons-testing-${backend}" \
        -f "Dockerfile.${backend}" \
        .
done

tests/translators_loggers/test_translators_loggers.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import sys
1515
import json
1616
import time
17-
import networkx
17+
import re
1818

1919
from tests.test_helpers import _create_fresh_local_dir
2020
from tests.test_helpers import _remove_local_dir_if_it_exists
@@ -31,6 +31,7 @@
3131
from wfcommons.wfbench import AirflowTranslator
3232
from wfcommons.wfbench import BashTranslator
3333
from wfcommons.wfbench import TaskVineTranslator
34+
from wfcommons.wfbench import MakeflowTranslator
3435
from wfcommons.wfbench import CWLTranslator
3536
from wfcommons.wfbench import PegasusTranslator
3637
from wfcommons.wfbench import SwiftTTranslator
@@ -105,6 +106,7 @@ def _additional_setup_swiftt(container):
105106
"airflow": noop,
106107
"bash": noop,
107108
"taskvine": _additional_setup_taskvine,
109+
"makeflow": noop,
108110
"cwl": noop,
109111
"pegasus": _additional_setup_pegasus,
110112
"swiftt": _additional_setup_swiftt,
@@ -161,6 +163,14 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath):
161163
assert (exit_code == 0)
162164
assert (output.decode().count("completed") == num_tasks)
163165

166+
def run_workflow_makeflow(container, num_tasks, str_dirpath):
    """
    Run the translated Makeflow workflow inside the container and sanity-check it.

    :param container: Object exposing ``exec_run(cmd=..., stdout=..., stderr=...)``
        that returns an ``(exit_code, output_bytes)`` pair.
    :param num_tasks: Number of ``job <N> completed`` lines expected in the output.
    :param str_dirpath: Unused by this runner; kept so that all ``run_workflow_*``
        functions share the same signature.
    """
    # The conda environment has to be activated in the same shell that runs
    # makeflow, hence a single `bash -c` invocation chaining all the steps.
    shell_steps = (
        "source ~/conda/etc/profile.d/conda.sh",
        "conda activate",
        "makeflow --log-verbose ./workflow.makeflow",
    )
    status, raw_log = container.exec_run(cmd=["bash", "-c", " && ".join(shell_steps)], stdout=True, stderr=True)

    # Sanity checks: clean exit, and one completion message per task
    assert (status == 0)
    completed = sum(1 for _ in re.finditer(r'job \d+ completed', raw_log.decode()))
    assert (completed == num_tasks)
173+
164174
def run_workflow_cwl(container, num_tasks, str_dirpath):
165175
# Run the workflow!
166176
# Note that the input file is hardcoded and Blast-specific
@@ -194,6 +204,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
194204
"airflow": run_workflow_airflow,
195205
"bash": run_workflow_bash,
196206
"taskvine": run_workflow_taskvine,
207+
"makeflow": run_workflow_makeflow,
197208
"cwl": run_workflow_cwl,
198209
"pegasus": run_workflow_pegasus,
199210
"swiftt": run_workflow_swiftt,
@@ -206,6 +217,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
206217
"airflow": AirflowTranslator,
207218
"bash": BashTranslator,
208219
"taskvine": TaskVineTranslator,
220+
"makeflow": MakeflowTranslator,
209221
"cwl": CWLTranslator,
210222
"pegasus": PegasusTranslator,
211223
"swiftt": SwiftTTranslator,
@@ -224,6 +236,7 @@ class TestTranslators:
224236
"airflow",
225237
"bash",
226238
"taskvine",
239+
"makeflow",
227240
"cwl",
228241
"pegasus",
229242
])

wfcommons/wfbench/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@
99
# (at your option) any later version.
1010

1111
from .bench import WorkflowBenchmark
12-
from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator, BashTranslator, PyCompssTranslator
12+
from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, MakeflowTranslator, CWLTranslator, BashTranslator, PyCompssTranslator

wfcommons/wfbench/translator/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@
1818
from .pycompss import PyCompssTranslator
1919
from .swift_t import SwiftTTranslator
2020
from .taskvine import TaskVineTranslator
21+
from .makeflow import MakeflowTranslator
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2024-2025 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import pathlib
12+
import shutil
13+
14+
from logging import Logger
15+
from typing import Optional, Union
16+
17+
from .abstract_translator import Translator
18+
from ...common import Workflow
19+
20+
this_dir = pathlib.Path(__file__).resolve().parent
21+
22+
class MakeflowTranslator(Translator):
    """
    A WfFormat parser for creating Makeflow workflow applications.

    :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
    :type workflow: Union[Workflow, pathlib.Path]
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Logger
    """
    def __init__(self,
                 workflow: Union[Workflow, pathlib.Path],
                 logger: Optional[Logger] = None) -> None:
        """Create an object of the translator."""
        super().__init__(workflow, logger)
        # Text of the generated Makeflow file; filled in by _generate_code().
        self._script = ""

    def translate(self, output_folder: pathlib.Path) -> None:
        """
        Translate a workflow benchmark description (WfFormat) into an actual workflow application.

        :param output_folder: The path to the folder in which the workflow benchmark will be generated.
        :type output_folder: pathlib.Path
        """
        # Generate code
        self._generate_code()

        # write benchmark files
        # (mkdir raises FileExistsError if output_folder already exists)
        output_folder.mkdir(parents=True)
        with open(output_folder.joinpath("workflow.makeflow"), "w") as fp:
            fp.write(self._script)

        # additional files
        self._copy_binary_files(output_folder)
        self._generate_input_files(output_folder)

        # README file
        self._write_readme_file(output_folder)

    def _generate_code(self) -> None:
        """
        Generate the Makeflow code and store it in ``self._script``.

        One rule is emitted per task: the task's output files are the rule
        targets, its input files are the rule sources, and the tab-indented
        command is the task program followed by its arguments. Any argument
        containing ``--input-files`` / ``--output-files`` is replaced by a
        spec listing the ``data/<file_id>`` paths used in the rule.
        """
        # Written to the Makeflow file literally as \\" (two backslashes, then
        # a double quote) on each side of every file name in the specs.
        quote = "\\\\\""

        clauses = []
        for task in self.workflow.tasks.values():
            # Rule header: "<targets>: <sources>" (each name is followed by a space)
            targets = "".join(f"data/{file.file_id} " for file in task.output_files)
            sources = "".join(f"data/{file.file_id} " for file in task.input_files)

            # Joining (instead of appending "," and slicing off the last
            # character) keeps the specs well-formed when a task has no
            # input or no output files: "[]" / "{}" rather than "]" / "}".
            input_spec = "\"[" + ",".join(
                f"{quote}data/{file.file_id}{quote}" for file in task.input_files
            ) + "]\""
            output_spec = "\"{" + ",".join(
                f"{quote}data/{file.file_id}{quote}:{str(file.size)}" for file in task.output_files
            ) + "}\""

            args = []
            for a in task.args:
                if "--output-files" in a:
                    args.append(f"--output-files {output_spec}")
                elif "--input-files" in a:
                    args.append(f"--input-files {input_spec}")
                else:
                    args.append(a)

            # Command line: Makeflow (like make) expects it to be tab-indented
            command = task.program + " " + " ".join(f"{a}" for a in args)
            clauses.append(f"{targets}: {sources}\n\t{command}\n\n\n")

        self._script = "# Makeflow workflow specification\n\n" + "".join(clauses)

    def _write_readme_file(self, output_folder: pathlib.Path) -> None:
        """
        Write the README file.

        :param output_folder: The path of the output folder.
        :type output_folder: pathlib.Path
        """
        readme_file_path = output_folder.joinpath("README")
        with open(readme_file_path, "w") as out:
            out.write(f"In directory {str(output_folder)}:\n")
            out.write("  - The Makeflow input file: workflow.makeflow\n")
            out.write("  - Run the workflow: makeflow workflow.makeflow\n")

wfcommons/wfbench/translator/taskvine.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def __init__(self,
3434
logger: Optional[Logger] = None) -> None:
3535
"""Create an object of the translator."""
3636
super().__init__(workflow, logger)
37+
self._script = ""
3738
self.parsed_tasks = []
3839
self.task_counter = 1
3940
self.output_files_map = {}
@@ -45,16 +46,16 @@ def translate(self, output_folder: pathlib.Path) -> None:
4546
:param output_folder: The path to the folder in which the workflow benchmark will be generated.
4647
:type output_folder: pathlib.Path
4748
"""
48-
self.script = "# workflow tasks\n"
49+
self._script = "# workflow tasks\n"
4950

5051
# add tasks per level
5152
self.next_level = self.root_task_names.copy()
5253
while self.next_level:
5354
self.next_level = self._add_level_tasks(self.next_level)
54-
self.script += "wait_for_tasks_completion()\n"
55+
self._script += "wait_for_tasks_completion()\n"
5556

5657
# generate code
57-
run_workflow_code = self._merge_codelines("templates/taskvine_template.py", self.script)
58+
run_workflow_code = self._merge_codelines("templates/taskvine_template.py", self._script)
5859

5960
# generate Flowcept code
6061
if self.workflow.workflow_id is not None:
@@ -156,7 +157,7 @@ def _add_task(self, task_name: str, parent_task: Optional[str] = None) -> list[s
156157
args = " ".join(f"{a}" for a in args)
157158

158159
# write task
159-
self.script += f"t_{self.task_counter} = vine.Task('{task.program} {args}')\n" \
160+
self._script += f"t_{self.task_counter} = vine.Task('{task.program} {args}')\n" \
160161
f"t_{self.task_counter}.set_cores(1)\n{task_script}"
161162

162163
self.task_counter += 1

0 commit comments

Comments
 (0)