Skip to content

Commit e0d5dea

Browse files
Merge pull request #130 from wfcommons/makeflow_logger
Update of the Makeflow logger
2 parents 321fc26 + dda0ab3 commit e0d5dea

4 files changed

Lines changed: 83 additions & 53 deletions

File tree

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
pip install .
3838
3939
- name: Run tests
40-
run: python3 -m pytest -s -v -m unit --cov=wfcommons tests/
40+
run: python3 -m pytest -s -v -m unit --cov=wfcommons tests
4141

4242
- name: Upload coverage
4343
if: github.ref == 'refs/heads/main'

tests/test_helpers.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,10 @@ def _install_WfCommons_on_container(container):
3939
target_path = '/tmp/' # inside container
4040
tar_data = _make_tarfile_of_wfcommons()
4141
container.put_archive(target_path, tar_data)
42-
# Cleanup files from the host
42+
# Cleanup files that came from the host
4343
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/build/", stdout=True, stderr=True)
4444
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/", stdout=True, stderr=True)
45+
# Clean up and force a rebuild of cpu-benchmark (because it may be compiled for the wrong architecture)
4546
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o", stdout=True,
4647
stderr=True)
4748
exit_code, output = container.exec_run("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark", stdout=True,
@@ -50,6 +51,7 @@ def _install_WfCommons_on_container(container):
5051
# Install WfCommons on the container (to install wfbench and cpu-benchmark really)
5152
exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
5253
workdir="/tmp/WfCommons", stdout=True, stderr=True)
54+
# print(output.decode())
5355
if exit_code != 0:
5456
raise RuntimeError("Failed to install WfCommons on the container")
5557

@@ -88,13 +90,19 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
8890
stdout=True, stderr=True)
8991
if exit_code != 0:
9092
raise RuntimeError("Failed to copy wfbench script to the bin directory")
93+
9194
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which cpu-benchmark` " + bin_dir],
9295
stdout=True, stderr=True)
9396
if exit_code != 0:
9497
raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory")
9598
else:
9699
sys.stderr.write(f"[{backend}] Not Copying wfbench and cpu-benchmark...\n")
97100

101+
# Change file permissions
102+
exit_code, output = container.exec_run(["sh", "-c", "sudo chown -R wfcommons:wfcommons "],
103+
stdout=True, stderr=True)
104+
105+
98106
container.backend = backend
99107
return container
100108

@@ -123,16 +131,16 @@ def _get_total_size_of_directory(directory_path: str):
123131
total_size += os.path.getsize(filepath)
124132
return total_size
125133

126-
def _compare_workflows(workflow1: Workflow, workflow_2: Workflow):
134+
def _compare_workflows(workflow_1: Workflow, workflow_2: Workflow):
127135

128136
# Test the number of tasks
129-
assert (len(workflow1.tasks) == len(workflow_2.tasks))
137+
assert (len(workflow_1.tasks) == len(workflow_2.tasks))
130138
# Test the task graph topology
131-
assert (networkx.is_isomorphic(workflow1, workflow_2))
139+
assert (networkx.is_isomorphic(workflow_1, workflow_2))
132140
# Test the total file size sum
133141
workflow1_input_bytes, workflow2_input_bytes = 0, 0
134142
workflow1_output_bytes, workflow2_output_bytes = 0, 0
135-
for workflow1_task, workflow2_task in zip(workflow1.tasks.values(), workflow_2.tasks.values()):
143+
for workflow1_task, workflow2_task in zip(workflow_1.tasks.values(), workflow_2.tasks.values()):
136144
# sys.stderr.write(f"WORKFLOW1: {workflow1_task.task_id} WORKFLOW2 TASK: {workflow2_task.task_id}\n")
137145
for input_file in workflow1_task.input_files:
138146
# sys.stderr.write(f"WORKFLOW1 INPUT FILE: {input_file.file_id} {input_file.size}\n")

tests/translators_loggers/test_translators_loggers.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import json
1616
import time
1717
import re
18+
import os
1819

1920
from tests.test_helpers import _create_fresh_local_dir
2021
from tests.test_helpers import _remove_local_dir_if_it_exists
@@ -38,6 +39,7 @@
3839

3940
from wfcommons.wfinstances import PegasusLogsParser
4041
from wfcommons.wfinstances.logs import TaskVineLogsParser
42+
from wfcommons.wfinstances.logs import MakeflowLogsParser
4143

4244

4345
def _create_workflow_benchmark() -> (WorkflowBenchmark, int):
@@ -165,7 +167,7 @@ def run_workflow_taskvine(container, num_tasks, str_dirpath):
165167

166168
def run_workflow_makeflow(container, num_tasks, str_dirpath):
167169
# Run the workflow (with full logging)
168-
exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose ./workflow.makeflow"], stdout=True, stderr=True)
170+
exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && makeflow --log-verbose --monitor=./monitor_data/ ./workflow.makeflow"], stdout=True, stderr=True)
169171
# Check sanity
170172
assert (exit_code == 0)
171173
num_completed_jobs = len(re.findall(r'job \d+ completed', output.decode()))
@@ -257,6 +259,12 @@ def test_translator(self, backend) -> None:
257259
translator = translator_classes[backend](benchmark.workflow)
258260
translator.translate(output_folder=dirpath)
259261

262+
# # Make the directory that holds the translation world-writable,
263+
# # so that docker commands won't fail
264+
# TODO: Explore whether this below makes tests runnable on Linux due to
265+
# different Docker permission schemes, etc.
266+
# os.chmod(dirpath, 0o777)
267+
260268
# Start the Docker container
261269
container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/")
262270

@@ -274,6 +282,8 @@ def test_translator(self, backend) -> None:
274282
parser = PegasusLogsParser(dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/")
275283
elif backend == "taskvine":
276284
parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"])
285+
elif backend == "makeflow":
286+
parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/")
277287
else:
278288
parser = None
279289

wfcommons/wfinstances/logs/makeflow.py

Lines changed: 58 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ class MakeflowLogsParser(LogsParser):
2828
"""
2929
Parse Makeflow submit directory to generate workflow instance.
3030
31-
:param execution_dir: Makeflow workflow execution directory (contains .mf and .makeflowlog files).
31+
:param execution_dir: Makeflow workflow execution directory (contains .mf/.makeflow and .makeflowlog files).
3232
:type execution_dir: pathlib.Path
33-
:param resource_monitor_logs_dir: Resource Monitor log files directory.
33+
:param resource_monitor_logs_dir: Resource Monitor log files directory (created with `makeflow --monitor=... ...`)
3434
:type resource_monitor_logs_dir: pathlib.Path
3535
:param description: Workflow instance description.
3636
:type description: Optional[str]
@@ -46,28 +46,38 @@ def __init__(self,
4646
"""Create an object of the makeflow log parser."""
4747
super().__init__('Makeflow', 'http://ccl.cse.nd.edu/software/makeflow/', description, logger)
4848

49-
# Sanity check
49+
# Sanity checks
5050
if not execution_dir.is_dir():
5151
raise OSError(f'The provided path does not exist or is not a folder: {execution_dir}')
52+
if not resource_monitor_logs_dir.is_dir():
53+
raise OSError(f'The provided path does not exist or is not a folder: {resource_monitor_logs_dir}')
5254

55+
# Makeflow file
5356
files: List[pathlib.Path] = list(execution_dir.glob('*.mf'))
57+
if len(files) > 1:
58+
raise OSError(f'Multiple .mf files in: {execution_dir}')
5459
if len(files) == 0:
55-
raise OSError(f'Unable to find .mf file in: {execution_dir}')
60+
files: List[pathlib.Path] = list(execution_dir.glob('*.makeflow'))
61+
if len(files) > 1:
62+
raise OSError(f'Multiple .makeflow files in: {execution_dir}')
63+
if len(files) == 0:
64+
raise OSError(f'Unable to find a .mf or .makeflow file in: {execution_dir}')
5665
self.mf_file: pathlib.Path = files[0]
5766

67+
# Log file
5868
files = list(execution_dir.glob('*.makeflowlog'))
5969
if len(files) == 0:
6070
raise OSError(f'Unable to find .makeflowlog file in: {execution_dir}')
71+
if len(files) > 1:
72+
raise OSError(f'Multiple .makeflowlog files in: {execution_dir}')
6173
self.mf_log_file: pathlib.Path = files[0]
74+
if self.mf_log_file.read_text().count("# NODE") == 0:
75+
raise OSError(f'Not sufficiently verbose log file {self.mf_log_file}. Re-run the workflow with `makeflow --log-verbose ...`')
6276

63-
if not resource_monitor_logs_dir.is_dir():
64-
raise OSError(f'The provided path does not exist or is not a folder: {resource_monitor_logs_dir}')
65-
66-
self.execution_dir: pathlib.Path = execution_dir
67-
68-
self.resource_monitor_logs_dir: pathlib.Path = resource_monitor_logs_dir
69-
self.files_map = {}
70-
self.args_map = {}
77+
self._execution_dir: pathlib.Path = execution_dir
78+
self._resource_monitor_logs_dir: pathlib.Path = resource_monitor_logs_dir
79+
self._files_map = {}
80+
self._args_map = {}
7181

7282
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
7383
"""
@@ -106,46 +116,48 @@ def _parse_workflow_file(self) -> None:
106116
outputs = []
107117
inputs = []
108118
for line in f:
109-
if ':' in line:
119+
# print(f"Processing line: {line}")
120+
if line.lstrip().startswith('#'):
121+
continue
122+
if ':' in line and '\t' not in line:
110123
outputs = line.split(':')[0].split()
111124
inputs = line.split(':')[1].split()
112125

113126
for file in itertools.chain(outputs, inputs):
114-
if file not in self.files_map:
115-
self.files_map[file] = {'task_name': None, 'children': [], 'file': []}
127+
if file not in self._files_map:
128+
self._files_map[file] = {'task_name': None, 'children': [], 'file': []}
116129

117-
elif len(line.strip()) > 0:
118-
# task execution command
119-
prefix = line.replace('./', '').replace('perl', '').strip().split()[1 if 'LOCAL' in line else 0]
120-
task_name = "{}_ID{:07d}".format(prefix, task_id_counter)
130+
elif '\t' in line:
131+
# task execution command (likely old here)
132+
prefix = line.replace('./', '').strip().split()[1 if 'LOCAL' in line else 0]
133+
task_name = "ID{:07d}".format(task_id_counter)
121134

122-
# create list of task files
123-
list_files = []
135+
# create list of input and output files
124136
output_files = self._create_files(outputs, "output", task_name)
125137
input_files = self._create_files(inputs, "input", task_name)
126138

127139
# create task
128-
args = ' '.join(line.replace('LOCAL', '').replace('perl', '').strip().split())
140+
args = ' '.join(line.split())
129141
task = Task(name=task_name,
130-
task_id="ID{:07d}".format(task_id_counter),
142+
task_id=task_name,
131143
category=prefix,
132-
task_type=TaskType.COMPUTE,
133144
runtime=0,
134145
program=prefix,
135146
args=args.split(),
136147
cores=1,
137148
input_files=input_files,
138149
output_files=output_files,
139150
logger=self.logger)
140-
self.workflow.add_node(task_name, task=task)
141-
self.args_map[args] = task
151+
self.workflow.add_task(task)
152+
args = args.replace('\\\\', '\\')
153+
self._args_map[args] = task
142154
task_id_counter += 1
143155

144156
# adding edges
145-
for file in self.files_map:
146-
for child in self.files_map[file]['children']:
147-
if self.files_map[file]['task_name']:
148-
self.workflow.add_edge(self.files_map[file]['task_name'], child)
157+
for file in self._files_map:
158+
for child in self._files_map[file]['children']:
159+
if self._files_map[file]['task_name']:
160+
self.workflow.add_edge(self._files_map[file]['task_name'], child)
149161

150162
def _create_files(self, files_list: List[str], input_or_output: str, task_name: str) -> List[File]:
151163
"""
@@ -163,16 +175,16 @@ def _create_files(self, files_list: List[str], input_or_output: str, task_name:
163175
"""
164176
list_files = []
165177
for file in files_list:
166-
if self.files_map[file]['file']:
178+
if self._files_map[file]['file']:
167179
list_files.append(
168-
self.files_map[file]['file'][0] if input_or_output == "input" else self.files_map[file]['file'][1])
180+
self._files_map[file]['file'][0] if input_or_output == "input" else self._files_map[file]['file'][1])
169181
else:
170182
size = 0
171-
file_path = self.execution_dir.joinpath(file)
183+
file_path = self._execution_dir.joinpath(file)
172184
if file_path.is_dir():
173-
size = sum(math.ceil(f.stat().st_size / 1000) for f in file_path.glob("*") if f.is_file())
185+
size = sum(f.stat().st_size for f in file_path.glob("*") if f.is_file())
174186
elif file_path.is_file():
175-
size = int(math.ceil(file_path.stat().st_size / 1000)) # B to KB
187+
size = int(file_path.stat().st_size)
176188

177189
file_obj_in = File(file_id=file,
178190
size=size,
@@ -181,13 +193,13 @@ def _create_files(self, files_list: List[str], input_or_output: str, task_name:
181193
size=size,
182194
logger=self.logger)
183195
list_files.append(file_obj_in if input_or_output == "input" else file_obj_out)
184-
self.files_map[file]['file'].extend([file_obj_in, file_obj_out])
196+
self._files_map[file]['file'].extend([file_obj_in, file_obj_out])
185197

186198
# files dependencies
187199
if input_or_output == "input":
188-
self.files_map[file]['children'].append(task_name)
200+
self._files_map[file]['children'].append(task_name)
189201
else:
190-
self.files_map[file]['task_name'] = task_name
202+
self._files_map[file]['task_name'] = task_name
191203

192204
return list_files
193205

@@ -208,24 +220,24 @@ def _parse_makeflow_log_file(self):
208220

209221
elif line.startswith('# FILE') and 'condorlog' not in line:
210222
file_name = line.split()[3]
211-
if file_name in self.files_map:
212-
size = int(math.ceil(int(line.split()[5]) / 1000)) # B to KB
213-
for file_obj in self.files_map[file_name]['file']:
223+
if file_name in self._files_map:
224+
size = int(line.split()[5])
225+
for file_obj in self._files_map[file_name]['file']:
214226
file_obj.size = size
215227

216228
def _parse_resource_monitor_logs(self):
217229
"""Parse the log files produced by resource monitor"""
218-
for file in pathlib.Path.glob(f'{self.resource_monitor_logs_dir}/*.summary'):
230+
for file in self._resource_monitor_logs_dir.glob("*.summary"):
219231
with open(file) as f:
220232
data = json.load(f)
221233

222234
# task
223-
task = self.args_map[data['command'].replace('perl', '').strip()]
235+
task = self._args_map[data['command'].replace('perl', '').strip()]
224236
task.runtime = float(data['wall_time'][0])
225237
task.cores = float(data['cores'][0])
226-
task.memory = int(data['memory'][0]) * 1000 # MB to KB
227-
task.bytes_read = int(data['bytes_read'][0] * 1000) # MB to KB
228-
task.bytes_written = int(data['bytes_written'][0] * 1000) # MB to KB
238+
task.memory = int(data['memory'][0])
239+
task.bytes_read = int(data['bytes_read'][0])
240+
task.bytes_written = int(data['bytes_written'][0])
229241
task.avg_cpu = float('%.4f' % (float(data['cpu_time'][0]) / float(data['wall_time'][0]) * 100))
230242
task.machine = Machine(name=data['host'],
231243
cpu={'coreCount': int(data['machine_cpus'][0]), 'speedInMHz': 0, 'vendor': ''},

0 commit comments

Comments
 (0)