Skip to content

Commit 60ca14e

Browse files
Merge pull request #111 from wfcommons/create_data_footprint_bug
Simplified the data footprint specification for benchmark generation
2 parents 4305d1f + 02f7bea commit 60ca14e

2 files changed

Lines changed: 51 additions & 98 deletions

File tree

tests/wfbench/test_wfbench.py

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -142,10 +142,6 @@ def test_create_from_recipe(self) -> None:
142142

143143
# Create the data_specification options
144144
fixed_total_footprint_in_mb = 5
145-
# TODO: This seems really broken right now
146-
# per_type_footprint = {}
147-
# for task_type in ["blastall", "split_fasta", None]:
148-
# per_type_footprint[task_type] = "1" # string???
149145

150146
for data_spec in [fixed_total_footprint_in_mb]:
151147
benchmark.create_benchmark(_create_fresh_local_dir(f"/tmp/benchmark"), cpu_work=1, data=data_spec, percent_cpu=0.6)

wfcommons/wfbench/bench.py

Lines changed: 51 additions & 94 deletions
Original file line number | Diff line number | Diff line change
@@ -253,7 +253,7 @@ def create_benchmark(self,
253253
cpu_work: Union[int, Dict[str, int]] = None,
254254
gpu_work: Union[int, Dict[str, int]] = None,
255255
time: Optional[int] = None,
256-
data: Optional[Union[int, Dict[str, str]]] = None,
256+
data: Optional[int] = 0,
257257
mem: Optional[float] = None,
258258
lock_files_folder: Optional[pathlib.Path] = None,
259259
regenerate: Optional[bool] = True,
@@ -271,7 +271,7 @@ def create_benchmark(self,
271271
:type gpu_work: Union[int, Dict[str, int]]
272272
:param time: Time limit for running each task (in seconds).
273273
:type time: Optional[int]
274-
:param data: Dictionary of input size files per workflow task type or total workflow data footprint (in MB).
274+
:param data: Total workflow data footprint (in MB).
275275
:type data: Optional[Union[int, Dict[str, str]]]
276276
:param mem: Maximum amount of memory consumption per task (in MB).
277277
:type mem: Optional[float]
@@ -317,7 +317,7 @@ def create_benchmark(self,
317317
task.input_files = []
318318
task.output_files = []
319319

320-
self._create_data_footprint(data, save_dir)
320+
self._create_data_footprint(data)
321321

322322
# TODO: add a flag to allow the file names to be changed
323323
workflow_input_files: List[File] = self._rename_files_to_wfbench_format()
@@ -439,48 +439,27 @@ def _generate_task_gpu_params(self, task: Task, gpu_work: Union[int, Dict[str, i
439439

440440
return [f"--gpu-work {_gpu_work}"]
441441

442-
def _create_data_footprint(self, data: Optional[Union[int, Dict[str, str]]], save_dir: pathlib.Path) -> None:
442+
443+
def _create_data_footprint(self, data: int) -> None:
443444
"""
444-
task's data footprint provided as individual data input size (JSON file)
445+
task's data footprint provided as an int
445446
"""
446-
if isinstance(data, dict):
447-
outputs = self._output_files(data)
448-
for task in self.workflow.tasks.values():
449-
outputs_file_size = {}
450-
for child, data_size in outputs[task.task_id].items():
451-
outputs_file_size[f"{task.task_id}_{child}_output.txt"] = data_size
452-
453-
task.args.extend([f"--output-files {outputs_file_size}"])
447+
num_sys_files, num_total_files = self._calculate_input_files()
448+
self.logger.debug(
449+
f"Number of input files to be created by the system: {num_sys_files}")
450+
self.logger.debug(
451+
f"Total number of files used by the workflow: {num_total_files}")
452+
file_size = round(data * 1000000 / num_total_files) # MB to B
453+
self.logger.debug(
454+
f"Every input/output file is of size: {file_size}")
454455

455-
self._add_output_files(outputs)
456-
self._add_input_files(outputs, data)
457-
self.logger.debug("Generating system files.")
458-
# self._generate_data_for_root_nodes(save_dir, data)
456+
for task in self.workflow.tasks.values():
457+
output = {f"{task.task_id}_output.txt": file_size}
458+
task.args.extend([f"--output-files {output}"])
459459

460-
# data footprint provided as an integer
461-
elif isinstance(data, int):
462-
num_sys_files, num_total_files = self._calculate_input_files()
463-
self.logger.debug(
464-
f"Number of input files to be created by the system: {num_sys_files}")
465-
self.logger.debug(
466-
f"Total number of files used by the workflow: {num_total_files}")
467-
file_size = round(data * 1000000 / num_total_files) # MB to B
468-
self.logger.debug(
469-
f"Every input/output file is of size: {file_size}")
460+
self._add_output_files(file_size)
461+
self._add_input_files(file_size)
470462

471-
for task in self.workflow.tasks.values():
472-
output = {f"{task.task_id}_output.txt": file_size}
473-
task.args.extend([f"--output-files {output}"])
474-
outputs = {}
475-
if self.workflow.tasks_children[task.task_id]:
476-
outputs.setdefault(task.task_id, {})
477-
for child in self.workflow.tasks_children[task.task_id]:
478-
outputs[task.task_id][child] = file_size
479-
480-
self._add_output_files(file_size)
481-
self._add_input_files(outputs, file_size)
482-
self.logger.debug("Generating system files.")
483-
# self._generate_data_for_root_nodes(save_dir, file_size)
484463

485464
def _output_files(self, data: Dict[str, str]) -> Dict[str, Dict[str, int]]:
486465
"""
@@ -525,78 +504,56 @@ def _calculate_input_files(self):
525504

526505
return tasks_need_input, total_num_files
527506

528-
def _add_output_files(self, output_files: Union[int, Dict[str, Dict[str, int]]]) -> None:
507+
def _add_output_files(self, output_file_size: int) -> None:
529508
"""
530509
Add output files when input data was offered by the user.
531510
532-
:param output_files:
533-
:type wf: Union[int, Dict[str, Dict[str, int]]]
511+
:param output_file_size: file size in MB
512+
:type output_file_size: int
534513
"""
535514
for task in self.workflow.tasks.values():
536-
if isinstance(output_files, Dict):
537-
for child, file_size in output_files[task.task_id].items():
538-
task.output_files.append(
539-
File(f"{task.task_id}_{child}_output.txt", file_size))
540-
elif isinstance(output_files, int):
541-
task.output_files.append(
542-
File(f"{task.task_id}_output.txt", output_files))
543-
544-
def _add_input_files(self, output_files: Dict[str, Dict[str, str]], data: Union[int, Dict[str, str]]) -> None:
515+
task.output_files.append(
516+
File(f"{task.task_id}_output.txt", output_file_size))
517+
518+
def _add_input_files(self, input_file_size: int) -> None:
545519
"""
546520
Add input files when input data was offered by the user.
547521
548-
:param output_files:
549-
:type wf: Dict[str, Dict[str, str]]
550-
:param data:
551-
:type data: Union[int, Dict[str, str]]
522+
:param input_file_size: a file size in MB
523+
:type input_file_size: int
552524
"""
553-
input_files = {}
554-
for parent, children in output_files.items():
555-
for child, file_size in children.items():
556-
input_files.setdefault(child, {})
557-
input_files[child][parent] = file_size
558-
559525
for task in self.workflow.tasks.values():
560526
inputs = []
561527
if not self.workflow.tasks_parents[task.task_id]:
562528
task.input_files.append(
563-
File(f"{task.task_id}_input.txt",
564-
data[task.category] if isinstance(
565-
data, Dict) else data))
529+
File(f"{task.task_id}_input.txt", input_file_size))
566530
inputs.append(f'{task.task_id}_input.txt')
567531
else:
568-
if isinstance(data, Dict):
569-
for parent, file_size in input_files[task.task_id].items():
570-
task.input_files.append(
571-
File(f"{parent}_{task.task_id}_output.txt", file_size))
572-
inputs.append(f"{parent}_{task.task_id}_output.txt")
573-
574-
elif isinstance(data, int):
575-
for parent in self.workflow.tasks_parents[task.task_id]:
576-
task.input_files.append(
577-
File(f"{parent}_output.txt", data))
578-
inputs.append(f"{parent}_output.txt")
532+
for parent in self.workflow.tasks_parents[task.task_id]:
533+
task.input_files.append(
534+
File(f"{parent}_output.txt", input_file_size))
535+
inputs.append(f"{parent}_output.txt")
579536

580537
task.args.append(f"--input-files {inputs}")
581538

582-
def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None:
583-
"""
584-
Generate workflow's input data for root nodes based on user's input.
585-
586-
:param save_dir:
587-
:type save_dir: pathlib.Path
588-
:param data:
589-
:type data: Dict[str, str]
590-
"""
591-
for task in self.workflow.tasks.values():
592-
if not self.workflow.tasks_parents[task.task_id]:
593-
file_size = data[task.category] if isinstance(
594-
data, Dict) else data
595-
file = save_dir.joinpath(f"{task.task_id}_input.txt")
596-
if not file.is_file():
597-
with open(file, 'wb') as fp:
598-
fp.write(os.urandom(int(file_size)))
599-
self.logger.debug(f"Created file: {str(file)}")
539+
# def _generate_data_for_root_nodes(self, save_dir: pathlib.Path, data: Union[int, Dict[str, str]]) -> None:
540+
# """
541+
# Generate workflow's input data for root nodes based on user's input.
542+
#
543+
# :param save_dir:
544+
# :type save_dir: pathlib.Path
545+
# :param data:
546+
# :type data: Dict[str, str]
547+
# """
548+
# for task in self.workflow.tasks.values():
549+
# if not self.workflow.tasks_parents[task.task_id]:
550+
# file_size = data[task.category] if isinstance(
551+
# data, Dict) else data
552+
# file = save_dir.joinpath(f"{task.task_id}_input.txt")
553+
# if not file.is_file():
554+
# with open(file, 'wb') as fp:
555+
# fp.write(os.urandom(int(file_size)))
556+
# self.logger.debug(f"Created file: {str(file)}")
600557

601558
def generate_input_file(self, path: pathlib.Path) -> None:
602559
"""

0 commit comments

Comments (0)