Skip to content

Commit ab3d87c

Browse files
committed
Merge branch 'main' into flowcept_improvements
2 parents a9f40dd + 958eeed commit ab3d87c

6 files changed

Lines changed: 309 additions & 2 deletions

File tree

tests/translators/README

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
For now these are individual Dockerfiles for each workflow system, which can be used for local testing/development. The idea is to eventually merge them all (if possible), and great a Github action for testing all translators.
1+
For now these are individual Dockerfiles for each workflow system, which can be used for local testing/development. The idea is to eventually merge them all (if possible), and create a Github action for testing all translators.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# docker build -t wfcommons-dev -f Dockerfile_Airflow .
# docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash

FROM ubuntu:noble

LABEL org.containers.image.authors="sukaryor@hawaii.edu"

# Set timezone non-interactively so tzdata does not prompt during build.
# `apt-get update` shares the layer with the install so a cached (stale)
# package index is never used for the install step.
RUN echo "America/Los_Angeles" > /etc/timezone && \
    export DEBIAN_FRONTEND=noninteractive && \
    apt-get update && \
    apt-get install -y tzdata

# Build tooling and utilities (one layer instead of one per package).
RUN apt-get update && apt-get install -y \
        pkg-config \
        git \
        wget \
        make \
        cmake \
        cmake-data \
        sudo \
        gcc \
    && apt-get install -y vim --fix-missing
#RUN apt-get -y install gcc-multilib

# Python stuff. Ubuntu noble enforces PEP 668, hence --break-system-packages
# for pip installs into the system interpreter.
RUN apt-get -y install python3 python3-pip
RUN python3 -m pip install --break-system-packages pathos pandas filelock
RUN python3 -m pip install --break-system-packages networkx scipy matplotlib
RUN python3 -m pip install --break-system-packages pyyaml jsonschema requests
RUN update-alternatives --install /usr/bin/python python /usr/bin/python3 1

# Stress-ng (used by wfbench tasks to generate CPU load)
RUN apt-get -y install stress-ng

# WfCommons
RUN python3 -m pip install --break-system-packages wfcommons

# Install Airflow, pinned together with its matching constraints file so
# transitive dependencies resolve to the versions Airflow was tested with.
RUN python3 -m pip install --break-system-packages apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt"

# Install MySQL server plus the headers/toolchain mysqlclient needs to build
RUN apt-get update && apt-get install -y \
        mysql-server \
        python3-dev \
        build-essential \
        default-libmysqlclient-dev
RUN python3 -m pip install --break-system-packages mysqlclient

# Setup directory
RUN mkdir /home/wfcommons

# Create an entrypoint script that starts mysqld in the background, waits
# until it answers pings, creates the Airflow database and user, rewrites
# airflow.cfg to point at MySQL instead of SQLite, and runs the schema
# migration before dropping into an interactive shell.
# NOTE: `mkdir -p` so re-running a container against an existing
# AIRFLOW_HOME does not abort the entrypoint.
RUN echo '#!/bin/bash' > /entrypoint.sh && \
    echo 'mysqld --explicit-defaults-for-timestamp &' >> /entrypoint.sh && \
    echo 'until mysqladmin ping -h localhost --silent; do' >> /entrypoint.sh && \
    echo '  echo "Waiting for MySQL to be ready..."' >> /entrypoint.sh && \
    echo '  sleep 2' >> /entrypoint.sh && \
    echo 'done' >> /entrypoint.sh && \
    echo 'echo "MySQL is ready!"' >> /entrypoint.sh && \
    echo 'echo "Setting up database for Airflow..."' >> /entrypoint.sh && \
    echo 'mysql -u root -e "CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci";' >> /entrypoint.sh && \
    echo "mysql -u root -e \"CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';\"" >> /entrypoint.sh && \
    echo "mysql -u root -e \"GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';\"" >> /entrypoint.sh && \
    echo 'export AIRFLOW_HOME="$(pwd)"/airflow/' >> /entrypoint.sh && \
    echo "airflow config list 1> /dev/null" >> /entrypoint.sh && \
    echo "sed -i ./airflow/airflow.cfg -e 's/sqlite:.*/mysql+mysqldb:\/\/airflow_user:airflow_pass@localhost:3306\/airflow_db/'" >> /entrypoint.sh && \
    echo 'airflow db migrate' >> /entrypoint.sh && \
    echo 'echo "Airflow database setup!"' >> /entrypoint.sh && \
    echo 'mkdir -p ./airflow/dags' >> /entrypoint.sh && \
    echo 'exec bash' >> /entrypoint.sh && \
    chmod +x /entrypoint.sh

WORKDIR /home/wfcommons

ENTRYPOINT ["/entrypoint.sh"]

tests/translators/airflow/README

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
This README file describes steps to install/run Airflow, and then run a
2+
translated workflow.
3+
4+
There are three sections:
5+
- Installing Airflow on bare-metal
6+
- Installing Airflow via Docker
7+
- Running a translated workflow
8+
9+
10+
Install Airflow on bare-metal
11+
------------------------------
12+
13+
1. Install Airflow
14+
15+
pip install apache-airflow==2.10.2 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.12.txt"
16+
17+
2. Install MySQL and MySQLClient
18+
19+
apt-get -y install pkg-config
20+
apt-get install -y mysql-server
21+
apt-get install -y python3-dev build-essential
22+
apt-get install -y default-libmysqlclient-dev
23+
pip install mysqlclient
24+
25+
3. Setup database for Airflow
26+
27+
mysqld --explicit-defaults-for-timestamp &
28+
In MySQL client, type the following:
29+
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
30+
CREATE USER 'airflow_user'@'%' IDENTIFIED BY 'airflow_pass';
31+
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
32+
33+
4. Set env variable for Airflow's home directory
34+
35+
export AIRFLOW_HOME="$(pwd)"
36+
37+
5. Edit $AIRFLOW_HOME/airflow.cfg (may need to run `airflow dags list` to create this file in the first place)
38+
39+
Update the "sql_alchemy_conn = ..." line to be:
40+
41+
sql_alchemy_conn = mysql+mysqldb://airflow_user:airflow_pass@localhost:3306/airflow_db
42+
43+
6. Finish setting up the database
44+
45+
airflow db migrate
46+
47+
48+
Installing Airflow via Docker
49+
-----------------------------
50+
51+
A much simpler alternative is to use Docker.
52+
53+
1. Build the docker image
54+
55+
docker build -t wfcommons-dev -f Dockerfile_Airflow .
56+
57+
(if building on a Mac, add the `--platform linux/amd64` argument after build above)
58+
59+
2. Run the docker container in the directory that contains the translated
60+
workflow (see last section below)
61+
62+
docker run -it --rm -v .:/home/wfcommons/mount wfcommons-dev /bin/bash
63+
64+
65+
Running a translated workflow with Airflow
66+
-------------------------------------------
67+
68+
Assuming that you have run the airflow translator, for instance, using this Python code:
69+
70+
```
71+
import pathlib
72+
73+
from wfcommons import BlastRecipe
74+
from wfcommons.wfbench import WorkflowBenchmark, AirflowTranslator
75+
76+
# create a workflow benchmark object to generate specifications based on a recipe
77+
benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=45)
78+
79+
# generate a specification based on performance characteristics
80+
benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6)
81+
82+
# generate an Airflow workflow
83+
translator = AirflowTranslator(benchmark.workflow)
84+
translator.translate(output_folder=pathlib.Path("/tmp/translated_workflow/"))
85+
```
86+
87+
The above will create a JSON workflow file in /tmp/blast-benchmark-45.json.
88+
In that file, the workflow name (this is used below) is set to
89+
"Blast-Benchmark".
90+
91+
The above will also create the translated workflow the
92+
/tmp/translated_workflow/ directory. Some directories and files need to be copied/moved as follows:
93+
94+
cp -r /tmp/translated_workflow/ $AIRFLOW_HOME/dags/
95+
mv $AIRFLOW_HOME/dags/translated_workflow/workflow.py $AIRFLOW_HOME/dags/
96+
97+
Finally, run the workflow as:
98+
99+
airflow dags test Blast-Benchmark (note the "Blast-Benchmark" workflow name from above)
100+
101+
102+

wfcommons/wfbench/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@
99
# (at your option) any later version.
1010

1111
from .bench import WorkflowBenchmark
12-
from .translator import DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator, BashTranslator, PyCompssTranslator
12+
from .translator import AirflowTranslator, DaskTranslator, NextflowTranslator, ParslTranslator, PegasusTranslator, SwiftTTranslator, TaskVineTranslator, CWLTranslator, BashTranslator, PyCompssTranslator

wfcommons/wfbench/translator/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11+
from .airflow import AirflowTranslator
1112
from .dask import DaskTranslator
1213
from .nextflow import NextflowTranslator
1314
from .parsl import ParslTranslator
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2021-2024 The WfCommons Team.
5+
#
6+
# This program is free software: you can redistribute it and/or modify
7+
# it under the terms of the GNU General Public License as published by
8+
# the Free Software Foundation, either version 3 of the License, or
9+
# (at your option) any later version.
10+
11+
import pathlib
12+
import re
13+
import ast
14+
import json
15+
16+
from logging import Logger
17+
from typing import Optional, Union
18+
19+
from .abstract_translator import Translator
20+
from ...common import Workflow
21+
22+
class AirflowTranslator(Translator):
    """
    A WfFormat parser for creating Airflow workflow applications.

    :param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
    :type workflow: Union[Workflow, pathlib.Path]
    :param logger: The logger where to log information/warning or errors (optional).
    :type logger: Logger
    """

    def __init__(self,
                 workflow: Union[Workflow, pathlib.Path],
                 logger: Optional[Logger] = None) -> None:
        """Create an object of the translator."""
        super().__init__(workflow, logger)

        # Header of the generated DAG module. Task definitions and
        # dependencies are appended to this string by translate().
        self.script = f"""
from __future__ import annotations

import os
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "{self.workflow.name}",
    description="airflow translation of a wfcommons instance",
    schedule="0 0 * * *",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["wfcommons"],
) as dag:
"""

    def translate(self, output_folder: pathlib.Path) -> None:
        """
        Translate a workflow benchmark description (WfFormat) into an Airflow workflow application.

        :param output_folder: The name of the output folder.
        :type output_folder: pathlib.Path
        """
        # Build the escaped bash command string for every task first.
        self._prep_commands(output_folder)

        # One BashOperator per task; the operator variable name doubles as
        # the Airflow task_id.
        for task in self.tasks.values():
            self.script += f"""
    {task.task_id} = BashOperator(
        task_id="{task.task_id}",
        depends_on_past=False,
        bash_command='{self.task_commands[task.task_id]}',
        env={{"AIRFLOW_HOME": os.environ["AIRFLOW_HOME"]}},
        retries=3,
    )
"""
        # Express the workflow edges with Airflow's >> dependency operator.
        for task in self.tasks.values():
            parents = ", ".join(self.task_parents[task.task_id])
            if parents:
                self.script += f"""
    [{parents}] >> {task.task_id}
"""
        # write benchmark files (mkdir intentionally fails if the folder
        # already exists, so an earlier translation is never overwritten)
        output_folder.mkdir(parents=True)
        with open(output_folder.joinpath("workflow.py"), "w") as fp:
            fp.write(self.script)

        # additional files
        self._copy_binary_files(output_folder)
        self._generate_input_files(output_folder)

    def _prep_commands(self, output_folder: pathlib.Path) -> None:
        """
        Prepares the bash_command strings for the BashOperators.

        :param output_folder: The name of the output folder.
        :type output_folder: pathlib.Path
        """
        # task_id -> fully escaped command string, consumed by translate()
        self.task_commands = {}

        for task in self.tasks.values():
            program = task.program
            args = []
            for a in task.args:
                if "--output-files" in a:
                    # Rewrite every output file path to live under the DAG's
                    # data directory, resolved at run time via $AIRFLOW_HOME.
                    flag, output_files_dict = a.split(" ", 1)
                    output_files_dict = {str(f"${{AIRFLOW_HOME}}/dags/{output_folder.name}/data/{key}"): value for key, value in ast.literal_eval(output_files_dict).items()}
                    a = f"{flag} {json.dumps(output_files_dict)}"
                elif "--input-files" in a:
                    flag, input_files_arr = a.split(" ", 1)
                    input_files_arr = [str(f"${{AIRFLOW_HOME}}/dags/{output_folder.name}/data/{file}") for file in ast.literal_eval(input_files_arr)]
                    a = f"{flag} {json.dumps(input_files_arr)}"
                else:
                    # Normalize quoting so the argument survives the
                    # single-quoted bash_command wrapper.
                    a = a.replace("'", "\"")
                args.append(a)

            command_str = " ".join([str(program)] + args)

            # Escapes all double quotes
            command_str = command_str.replace('"', '\\\\"')

            # Wraps --output-files and --input-files arguments in double quotes
            command_str = re.sub(
                r'(--output-files) (\{.*\}) (--input-files) (\[.*?\])',
                lambda m: f'{m.group(1)} "{m.group(2)}" {m.group(3)} "{m.group(4)}"',
                command_str
            )

            self.task_commands[task.task_id] = command_str
129+

0 commit comments

Comments
 (0)