Skip to content

Commit edbba86

Browse files
committed
#19: making WfCommons compatible with WfFormat v1.2
1 parent a8bddae commit edbba86

3 files changed

Lines changed: 15 additions & 5 deletions

File tree

wfcommons/common/workflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from ..common.task import Task
1818
from ..version import __version__
1919

20-
from ..wfchef.utils import annotate, create_graph
20+
from ..wfchef.utils import create_graph
2121
import tempfile
2222

2323

@@ -54,7 +54,7 @@ def __init__(self,
5454
"""Create an object of a workflow representation."""
5555
self.description = description if description else 'Instance generated with WfCommons - https://wfcommons.org'
5656
self.created_at = str(datetime.utcnow().isoformat())
57-
self.schema_version = '1.0'
57+
self.schema_version = '1.2'
5858
self.wms_name = 'WfCommons' if not wms_name else wms_name
5959
self.wms_version = str(__version__) if not wms_version else wms_version
6060
self.wms_url = 'https://docs.wfcommons.org/en/v{}/'.format(__version__) if not wms_url else wms_url

wfcommons/wfinstances/logs/makeflow.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,13 @@ def _parse_workflow_file(self):
113113
inputs = line.split(':')[1].split()
114114

115115
for file in itertools.chain(outputs, inputs):
116-
if not file in self.files_map:
116+
if file not in self.files_map:
117117
self.files_map[file] = {'task_name': None, 'children': [], 'file': []}
118118

119119
elif len(line.strip()) > 0:
120120
# task execution command
121121
prefix = line.replace('./', '').replace('perl', '').strip().split()[1 if 'LOCAL' in line else 0]
122122
task_name = "{}_ID{:07d}".format(prefix, task_id_counter)
123-
task_id_counter += 1
124123

125124
# create list of task files
126125
list_files = []
@@ -130,14 +129,18 @@ def _parse_workflow_file(self):
130129
# create task
131130
args = ' '.join(line.replace('LOCAL', '').replace('perl', '').strip().split())
132131
task = Task(name=task_name,
132+
task_id="ID{:07d}".format(task_id_counter),
133+
category=prefix,
133134
task_type=TaskType.COMPUTE,
134135
runtime=0,
136+
program=prefix,
135137
args=args.split(),
136138
cores=1,
137139
files=list_files,
138140
logger=self.logger)
139141
self.workflow.add_node(task_name, task=task)
140142
self.args_map[args] = task
143+
task_id_counter += 1
141144

142145
# adding edges
143146
for file in self.files_map:
@@ -219,7 +222,7 @@ def _parse_resource_monitor_logs(self):
219222
# task
220223
task = self.args_map[data['command'].replace('perl', '').strip()]
221224
task.runtime = float(data['wall_time'][0])
222-
task.cores = int(data['cores'][0])
225+
task.cores = float(data['cores'][0])
223226
task.memory = int(data['memory'][0]) * 1000 # MB to KB
224227
task.bytes_read = int(data['bytes_read'][0] * 1000) # MB to KB
225228
task.bytes_written = int(data['bytes_written'][0] * 1000) # MB to KB

wfcommons/wfinstances/logs/pegasus.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ def _parse_workflow(self):
156156
task_name,
157157
task=Task(
158158
name=task_name,
159+
task_id=j['id'],
160+
category=j['name'],
159161
task_type=TaskType.COMPUTE,
160162
runtime=0,
161163
args=j['arguments'],
@@ -212,6 +214,8 @@ def _parse_dax(self):
212214
task_name,
213215
task=Task(
214216
name=task_name,
217+
task_id=str(j.get('id')),
218+
category=str(j.get('name')),
215219
task_type=TaskType.COMPUTE,
216220
runtime=0,
217221
args=[],
@@ -406,6 +410,8 @@ def _parse_job_output_latest(self, task, output_file):
406410
with open(tmp_file) as f:
407411
data = yaml.load(f, Loader=yaml.FullLoader)[0]
408412

413+
task.program = data['transformation']
414+
409415
if data['transformation'].startswith('pegasus:') or task.name.lower().startswith('chmod_'):
410416
task.type = TaskType.AUXILIARY
411417

@@ -486,6 +492,7 @@ def _parse_job_output_legacy(self, task, output_file):
486492
try:
487493
e = xml.etree.ElementTree.parse(output_file).getroot()
488494
# main job information
495+
task.program = e.get('transformation')
489496
if e.get('transformation').startswith('pegasus:') or task.name.lower().startswith('chmod_'):
490497
task.type = TaskType.AUXILIARY
491498

0 commit comments

Comments
 (0)