Skip to content

Commit e89aa11

Browse files
Merging main
2 parents 393a428 + 8b1fdc9 commit e89aa11

18 files changed

Lines changed: 364 additions & 295 deletions

File tree

bin/wfbench

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,10 @@ def cpu_mem_benchmark(cpu_threads: Optional[int] = 5,
124124
os.sched_setaffinity(cpu_proc.pid, {core})
125125
cpu_procs.append(cpu_proc)
126126

127-
mem_proc = subprocess.Popen(mem_prog)
128-
if core:
129-
os.sched_setaffinity(mem_proc.pid, {core})
127+
if mem_threads > 0:
128+
mem_proc = subprocess.Popen(mem_prog)
129+
if core:
130+
os.sched_setaffinity(mem_proc.pid, {core})
130131

131132
return cpu_procs
132133

@@ -176,6 +177,7 @@ def io_write_benchmark_user_input_data_size(outputs, memory_limit=None):
176177
for file_name, file_size in outputs.items():
177178
print(f"[WfBench] Writing output file '{file_name}'\n")
178179
file_size_todo = file_size
180+
pathlib.Path(file_name).touch()
179181
while file_size_todo > 0:
180182
with open(file_name, "ab") as fp:
181183
chunk_size = min(file_size_todo, memory_limit)
@@ -216,8 +218,9 @@ def main():
216218
if core:
217219
print(f"[WfBench] {args.name} acquired core {core}")
218220

221+
mem_threads=int(10 - 10 * args.percent_cpu)
219222
cpu_procs = cpu_mem_benchmark(cpu_threads=int(10 * args.percent_cpu),
220-
mem_threads=int(10 - 10 * args.percent_cpu),
223+
mem_threads=mem_threads,
221224
cpu_work=sys.maxsize if args.time else int(args.cpu_work),
222225
core=core,
223226
total_mem=args.mem)
@@ -229,9 +232,10 @@ def main():
229232
else:
230233
for proc in cpu_procs:
231234
proc.wait()
232-
233-
mem_kill = subprocess.Popen(["killall", "stress-ng"])
234-
mem_kill.wait()
235+
236+
if mem_threads > 0:
237+
mem_kill = subprocess.Popen(["killall", "stress-ng"])
238+
mem_kill.wait()
235239
print("[WfBench] Completed CPU and Memory Benchmarks!\n")
236240

237241
if args.out:

wfcommons/common/file.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2020-2023 The WfCommons Team.
4+
# Copyright (c) 2020-2024 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -25,8 +25,8 @@ class FileLink(NoValue):
2525
class File:
2626
"""Representation of a file.
2727
28-
:param name: The name of the file.
29-
:type name: str
28+
:param file_id: The id of the file.
29+
:type file_id: str
3030
:param size: File size in bytes.
3131
:type size: int
3232
:param link: Type of file link.
@@ -35,11 +35,11 @@ class File:
3535
:type logger: Optional[Logger]
3636
"""
3737

38-
def __init__(self, name: str, size: int, link: FileLink, logger: Optional[Logger] = None) -> None:
38+
def __init__(self, file_id: str, size: int, link: FileLink, logger: Optional[Logger] = None) -> None:
3939
"""A file used by tasks."""
4040
self.logger: Logger = logger if logger else logging.getLogger(__name__)
4141

42-
self.name: str = name
42+
self.file_id: str = file_id
4343
self.size: int = size
4444
self.link: FileLink = link
4545

@@ -50,7 +50,6 @@ def as_dict(self) -> Dict[str, Union[str, int, FileLink]]:
5050
:rtype: Dict[str, Union[str, int, FileLink]]
5151
"""
5252
return {
53-
'link': self.link.value,
54-
'name': self.name,
53+
'id': self.file_id,
5554
'sizeInBytes': self.size
5655
}

wfcommons/common/machine.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33
#
4-
# Copyright (c) 2020-2023 The WfCommons Team.
4+
# Copyright (c) 2020-2024 The WfCommons Team.
55
#
66
# This program is free software: you can redistribute it and/or modify
77
# it under the terms of the GNU General Public License as published by
@@ -35,8 +35,9 @@ class Machine:
3535
.. code-block:: python
3636
3737
cpu = {
38-
'count': 48,
39-
'speed': 1200
38+
'coreCount': 48,
39+
'speedInMHz': 1200,
40+
'vendor': 'Vendor Name'
4041
}
4142
4243
:type cpu: Dict[str, Union[int, str]]
@@ -73,9 +74,9 @@ def __init__(self,
7374
self.release: str = release
7475
self.hashcode = hashcode
7576

76-
self.cpu_cores: int = cpu['count']
77-
self.cpu_speed: int = cpu['speed'] if 'speed' in cpu else 0
78-
self.cpu_flops: int = cpu['count'] * cpu['speed'] * 10 ^ 6 if 'speed' in cpu else 0
77+
self.cpu_cores: int = cpu['coreCount']
78+
self.cpu_speed: int = cpu['speedInMHz'] if 'speedInMHz' in cpu else 0
79+
self.cpu_flops: int = cpu['coreCount'] * cpu['speedInMHz'] * 10 ^ 6 if 'speedInMHz' in cpu else 0
7980
self.cpu_vendor: str = cpu['vendor'] if 'vendor' in cpu else None
8081

8182
self.logger.debug(f"created machine: {self.name} with {self.cpu_cores} cores and {self.cpu_flops} FLOPS.")
@@ -96,9 +97,9 @@ def as_dict(self) -> Dict[str, Union[int, str]]:
9697
if self.release:
9798
machine['release'] = self.release
9899
if self.cpu_cores:
99-
machine['cpu'] = {'count': self.cpu_cores}
100+
machine['cpu'] = {'coreCount': self.cpu_cores}
100101
if self.cpu_speed:
101-
machine['cpu']['speed'] = self.cpu_speed
102+
machine['cpu']['speedInMHz'] = self.cpu_speed
102103
if self.cpu_vendor:
103104
machine['cpu']['vendor'] = self.cpu_vendor
104105
return machine

wfcommons/common/task.py

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,20 @@ class Task:
3131
3232
:param name: The name of the task.
3333
:type name: str
34-
:param task_type: The type of the task.
35-
:type task_type: TaskType
34+
:param task_id: Task unique ID (e.g., ID0000001).
35+
:type task_id: str
3636
:param runtime: Task runtime in seconds.
3737
:type runtime: float
38+
:param input_files: List of input files used by the task.
39+
:type input_files: Optional[List[File]]
40+
:param output_files: List of output files used by the task.
41+
:type output_files: Optional[List[File]]
3842
:param cores: Number of cores required by the task.
3943
:type cores: float
40-
:param task_id: Task unique ID (e.g., ID0000001).
41-
:type task_id: Optional[str]
4244
:param category: Task category (can be used, for example, to define tasks that use the same program).
4345
:type category: Optional[str]
44-
:param machine: Machine on which is the task has been executed.
45-
:type machine: Optional[Machine]
46+
:param machines: Machines on which is the task has been executed.
47+
:type machines: Optional[List[Machine]]
4648
:param program: Program name.
4749
:type program: Optional[str]
4850
:param args: List of task arguments.
@@ -61,20 +63,21 @@ class Task:
6163
:type avg_power: Optional[float]
6264
:param priority: Task priority.
6365
:type priority: Optional[int]
64-
:param files: List of input/output files used by the task.
65-
:type files: Optional[List[File]]
66+
:param task_type: The type of the task.
67+
:type task_type: TaskType
6668
:param logger: The logger where to log information/warning or errors.
6769
:type logger: Optional[Logger]
6870
"""
6971

7072
def __init__(self,
7173
name: str,
72-
# task_type: TaskType,
74+
task_id: str,
7375
runtime: float,
76+
input_files: Optional[List[File]] = None,
77+
output_files: Optional[List[File]] = None,
7478
cores: float = 1.0,
75-
task_id: Optional[str] = None,
7679
category: Optional[str] = None,
77-
machine: Optional[Machine] = None,
80+
machines: Optional[List[Machine]] = None,
7881
program: Optional[str] = None,
7982
args: Optional[List[str]] = None,
8083
avg_cpu: Optional[float] = None,
@@ -84,19 +87,18 @@ def __init__(self,
8487
energy: Optional[int] = None,
8588
avg_power: Optional[float] = None,
8689
priority: Optional[int] = None,
87-
files: Optional[List[File]] = None,
88-
logger: Optional[Logger] = None,
90+
executedAt: Optional[str] = None,
91+
task_type: Optional[TaskType] = None,
8992
launch_dir: Optional[str] = None,
90-
start_time: Optional[str] = None,
93+
logger: Optional[Logger] = None,
9194
) -> None:
9295
"""A task in a workflow."""
9396
self.logger: Logger = logging.getLogger(
9497
__name__) if logger is None else logger
9598
self.name: str = name
96-
# self.type: TaskType = task_type
99+
self.task_id: str = task_id
97100
self.runtime: float = runtime
98101
self.cores: Optional[float] = cores
99-
self.task_id: Optional[str] = task_id
100102
self.category: Optional[str] = category
101103
self.program: Optional[str] = program
102104
self.args: List[str] = args if args else []
@@ -106,40 +108,45 @@ def __init__(self,
106108
self.memory: Optional[int] = memory
107109
self.energy: Optional[int] = energy
108110
self.avg_power: Optional[float] = avg_power
109-
self.files: List[File] = files if files else []
110-
self.machine: Machine = machine
111+
self.input_files: List[File] = input_files if input_files else []
112+
self.output_files: List[File] = output_files if output_files else []
113+
self.machines: Optional[List[Machine]] = machines
111114
self.priority: Optional[int] = priority
115+
self.type: Optional[TaskType] = task_type
112116
self.launch_dir: Optional[str] = launch_dir
113-
self.start_time: Optional[str] = str(datetime.now().astimezone().isoformat()) if not start_time else start_time
117+
self.start_time: Optional[str] = str(datetime.now().astimezone().isoformat()) if not executedAt else executedAt
114118
self.logger.debug(
115-
f"created {self.task_id} task {self.name}: runtime => {self.runtime} seconds.")
119+
f"created task {self.task_id}: runtime => {self.runtime} seconds.")
116120

117-
def as_dict(self) -> Dict:
118-
"""A JSON representation of the task.
121+
def specification_as_dict(self) -> Dict:
122+
"""A JSON representation of the task specification.
119123
120124
:return: A JSON object representation of the task.
121125
:rtype: Dict
122126
"""
123-
task_files = []
124-
for f in self.files:
125-
task_files.append(f.as_dict())
126-
127127
task_obj = {
128128
'name': self.name,
129-
# 'type': self.type.value,
130-
'command': {},
129+
'id': self.task_id,
131130
'parents': [],
132131
'children': [],
133-
'files': task_files,
132+
'input_files': [f.file_id for f in self.input_files],
133+
'output_files': [f.file_id for f in self.output_files]
134+
}
135+
return task_obj
136+
137+
def execution_as_dict(self) -> Dict:
138+
"""A JSON representation of the task execution.
139+
140+
:return: A JSON object representation of the task.
141+
:rtype: Dict
142+
"""
143+
task_obj = {
144+
'id': self.task_id,
145+
'runtimeInSeconds': self.runtime,
146+
'command': {}
134147
}
135-
if self.runtime is not None:
136-
task_obj['runtimeInSeconds'] = self.runtime
137148
if self.cores is not None:
138-
task_obj['cores'] = self.cores
139-
if self.task_id is not None:
140-
task_obj['id'] = self.task_id
141-
if self.category is not None:
142-
task_obj['category'] = self.category
149+
task_obj['coreCount'] = self.cores
143150
if self.avg_cpu is not None:
144151
task_obj['avgCPU'] = self.avg_cpu
145152
if self.bytes_read is not None:
@@ -149,19 +156,21 @@ def as_dict(self) -> Dict:
149156
if self.memory is not None:
150157
task_obj['memoryInBytes'] = self.memory
151158
if self.energy is not None:
152-
task_obj['energy'] = self.energy
159+
task_obj['energyInKWh'] = self.energy
153160
if self.avg_power is not None:
154-
task_obj['avgPower'] = self.avg_power
161+
task_obj['avgPowerInKWh'] = self.avg_power
155162
if self.priority is not None:
156163
task_obj['priority'] = self.priority
157164
if self.program is not None:
158165
task_obj['command']['program'] = self.program
159166
if self.args is not None:
160167
task_obj['command']['arguments'] = self.args
161-
if self.machine is not None:
162-
task_obj['machine'] = self.machine.name
168+
if self.machines is not None:
169+
task_obj['machines'] = [m.name for m in self.machines]
170+
if self.start_time:
171+
task_obj['executedAt'] = self.start_time
172+
if self.category is not None:
173+
task_obj['category'] = self.category
163174
if self.launch_dir:
164175
task_obj['launchDir'] = self.launch_dir
165-
if self.start_time:
166-
task_obj['startedAt'] = self.start_time
167176
return task_obj

0 commit comments

Comments
 (0)