Skip to content

Commit e73fbbd

Browse files
committed
#19: making WfCommons compatible with WfFormat v1.2
1 parent 1bb6027 commit e73fbbd

5 files changed

Lines changed: 30 additions & 8 deletions

File tree

wfcommons/common/task.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,15 @@ class Task:
3535
:param runtime: Task runtime in seconds.
3636
:type runtime: float
3737
:param cores: Number of cores required by the task.
38-
:type cores: int
38+
:type cores: float
39+
:param task_id: Job unique ID (e.g., ID0000001).
40+
:type task_id: str
41+
:param category: Job category (can be used, for example, to define jobs that use the same program).
42+
:type category: str
3943
:param machine: Machine on which is the task has been executed.
4044
:type machine: Machine
45+
:param program: Program name.
46+
:type program: str
4147
:param args: List of task arguments.
4248
:type args: List[str]
4349
:param avg_cpu: Average CPU utilization in %.
@@ -64,8 +70,11 @@ def __init__(self,
6470
name: str,
6571
task_type: TaskType,
6672
runtime: float,
67-
cores: int,
73+
cores: float = 1.0,
74+
task_id: Optional[str] = None,
75+
category: Optional[str] = None,
6876
machine: Optional[Machine] = None,
77+
program: Optional[str] = None,
6978
args: List[str] = [],
7079
avg_cpu: Optional[float] = None,
7180
bytes_read: Optional[int] = None,
@@ -82,7 +91,10 @@ def __init__(self,
8291
self.name: str = name
8392
self.type: TaskType = task_type
8493
self.runtime: float = runtime
85-
self.cores: Optional[int] = cores
94+
self.cores: Optional[float] = cores
95+
self.task_id: Optional[str] = task_id
96+
self.category: Optional[str] = category
97+
self.program: Optional[str] = program
8698
self.args: List[str] = args
8799
self.avg_cpu: Optional[float] = avg_cpu
88100
self.bytes_read: Optional[int] = bytes_read
@@ -112,6 +124,7 @@ def as_dict(self) -> Dict:
112124
'name': self.name,
113125
'type': self.type.value,
114126
'runtime': self.runtime,
127+
'command': {},
115128
'parents': [],
116129
'children': [],
117130
'files': task_files,
@@ -132,8 +145,10 @@ def as_dict(self) -> Dict:
132145
task_obj['avgPower'] = self.avg_power
133146
if self.priority:
134147
task_obj['priority'] = self.priority
148+
if self.program:
149+
task_obj['command']['program'] = self.program
135150
if self.args:
136-
task_obj['arguments'] = self.args
151+
task_obj['command']['arguments'] = self.args
137152
if self.machine:
138153
task_obj['machine'] = self.machine.name
139154

wfcommons/common/workflow.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from ..wfchef.utils import annotate, create_graph
2121
import tempfile
2222

23+
2324
class Workflow(nx.DiGraph):
2425
"""
2526
Representation of a workflow. The workflow representation is an extension of the
@@ -137,4 +138,4 @@ def write_dot(self, dot_filename: str = None) -> None:
137138
def to_nx_digraph(self) -> nx.DiGraph:
138139
with tempfile.NamedTemporaryFile() as temp:
139140
self.write_json(temp.name)
140-
return create_graph(temp.name)
141+
return create_graph(temp.name)

wfcommons/wfinstances/instance.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,20 @@ def __init__(self, input_instance: str, schema_file: Optional[str] = None, logge
117117
machine = task.get('machine', None)
118118
machine = None if machine is None else self.machines[machine]
119119

120+
# Fetch the command associated to this task
121+
command = task.get('command', None)
122+
120123
self.workflow.add_node(
121124
task['name'],
122125
task=Task(
123126
name=task['name'],
127+
task_id=task.get('id', None),
128+
category=task.get('category', None),
124129
task_type=TaskType(task['type']),
125130
runtime=task['runtime'],
126131
machine=machine,
127-
args=task.get('arguments', None),
132+
program=command.get('program', None) if command else None,
133+
args=command.get('arguments', None) if command else None,
128134
cores=task.get('cores', None),
129135
avg_cpu=task.get('avgCPU', None),
130136
bytes_read=task.get('bytesRead', None),

wfcommons/wfinstances/logs/makeflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def _parse_workflow_file(self):
119119
elif len(line.strip()) > 0:
120120
# task execution command
121121
prefix = line.replace('./', '').replace('perl', '').strip().split()[1 if 'LOCAL' in line else 0]
122-
task_name = "{}_ID{:06d}".format(prefix, task_id_counter)
122+
task_name = "{}_ID{:07d}".format(prefix, task_id_counter)
123123
task_id_counter += 1
124124

125125
# create list of task files

wfcommons/wfinstances/schema.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def _load_schema(self, schema_file: Optional[str] = None):
7070
return json.loads(open(schema_path).read())
7171

7272
# fetching latest schema file from GitHub repository
73-
url = 'https://raw.githubusercontent.com/wfcommons/workflow-schema/master/wfcommons-schema.json'
73+
url = 'https://raw.githubusercontent.com/wfcommons/wfformat/master/wfcommons-schema.json'
7474
response = requests.get(url)
7575
schema = json.loads(response.content)
7676
with open(schema_path, 'w') as outfile:

0 commit comments

Comments
 (0)