Skip to content

Commit 9c30050

Browse files
committed
#22: using pathlib to handle files and directories
1 parent 4abe254 commit 9c30050

6 files changed

Lines changed: 146 additions & 143 deletions

File tree

wfcommons/utils.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ def __repr__(self):
2828

2929

3030
def read_json(instance_filename: str) -> Dict[str, Any]:
31-
"""Read the JSON from the file path.
31+
"""
32+
Read the JSON from the file path.
3233
3334
:param instance_filename: The absolute path of the instance file.
3435
:type instance_filename: str
@@ -41,7 +42,8 @@ def read_json(instance_filename: str) -> Dict[str, Any]:
4142

4243

4344
def best_fit_distribution(data: List[float], logger: Optional[Logger] = None) -> Tuple:
44-
"""Fit a list of values to a distribution.
45+
"""
46+
Fit a list of values to a distribution.
4547
4648
:param data: List of values to be fitted to a distribution.
4749
:type data: List[float]
@@ -95,12 +97,13 @@ def best_fit_distribution(data: List[float], logger: Optional[Logger] = None) ->
9597
except Exception as e:
9698
print(f"WARNING: distribution \"{dist_name}\" failed ({e})")
9799

98-
logger.debug('Best distribution fit: {}'.format(best_distribution))
100+
logger.debug(f'Best distribution fit: {best_distribution}')
99101
return best_distribution, best_params
100102

101103

102104
def generate_rvs(distribution: Dict, min_value: float, max_value: float) -> float:
103-
"""Generate a random variable from a distribution.
105+
"""
106+
Generate a random variable from a distribution.
104107
105108
:param distribution: Distribution dictionary (name and parameters).
106109
:type distribution: Dict
@@ -122,7 +125,8 @@ def generate_rvs(distribution: Dict, min_value: float, max_value: float) -> floa
122125

123126

124127
def ncr(n: int, r: int) -> int:
125-
"""Calculate the number of combinations.
128+
"""
129+
Calculate the number of combinations.
126130
127131
:param n: The number of items.
128132
:type n: int

wfcommons/wfgen/abstract_recipe.py

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
from abc import ABC, abstractmethod
1515
from os import path
1616
from logging import Logger
17-
from typing import Any, Dict, List, Optional, Set
17+
from typing import Any, Dict, List, Optional
1818

1919
from wfcommons.common.file import File, FileLink
2020
from wfcommons.common.task import Task, TaskType
@@ -23,22 +23,23 @@
2323

2424

2525
class WorkflowRecipe(ABC):
26-
"""An abstract class of workflow recipes for creating synthetic workflow instances.
26+
"""
27+
An abstract class of workflow recipes for creating synthetic workflow instances.
2728
2829
:param name: The workflow recipe name.
2930
:type name: str
3031
:param data_footprint: The upper bound for the workflow total data footprint (in bytes).
31-
:type data_footprint: int
32+
:type data_footprint: Optional[int]
3233
:param num_tasks: The upper bound for the total number of tasks in the workflow.
33-
:type num_tasks: int
34+
:type num_tasks: Optional[int]
3435
:param runtime_factor: The factor of which tasks runtime will be increased/decreased.
35-
:type runtime_factor: float
36+
:type runtime_factor: Optional[float]
3637
:param input_file_size_factor: The factor of which tasks input files size will be increased/decreased.
37-
:type input_file_size_factor: float
38+
:type input_file_size_factor: Optional[float]
3839
:param output_file_size_factor: The factor of which tasks output files size will be increased/decreased.
39-
:type output_file_size_factor: float
40+
:type output_file_size_factor: Optional[float]
4041
:param logger: The logger where to log information/warning or errors (optional).
41-
:type logger: Logger
42+
:type logger: Optional[Logger]
4243
"""
4344

4445
def __init__(self, name: str,
@@ -57,23 +58,24 @@ def __init__(self, name: str,
5758
if output_file_size_factor <= 0.0:
5859
raise ValueError("The output file size factor should be a number higher than 0.0.")
5960

60-
self.logger = logging.getLogger(__name__) if logger is None else logger
61+
self.logger: Optional[Logger] = logging.getLogger(__name__) if logger is None else logger
6162
self.name = name
62-
self.data_footprint = data_footprint
63-
self.num_tasks = num_tasks
64-
self.runtime_factor = runtime_factor
65-
self.input_file_size_factor = input_file_size_factor
66-
self.output_file_size_factor = output_file_size_factor
63+
self.data_footprint: Optional[int] = data_footprint
64+
self.num_tasks: Optional[int] = num_tasks
65+
self.runtime_factor: Optional[float] = runtime_factor
66+
self.input_file_size_factor: Optional[float] = input_file_size_factor
67+
self.output_file_size_factor: Optional[float] = output_file_size_factor
6768
self.tasks_files: Dict[str, List[File]] = {}
6869
self.tasks_files_names: Dict[str, List[str]] = {}
69-
self.task_id_counter = 1
70+
self.task_id_counter: int = 1
7071
self.tasks_map = {}
7172
self.tasks_children = {}
7273
self.tasks_parents = {}
7374

7475
@abstractmethod
7576
def _workflow_recipe(self) -> Dict[str, Any]:
76-
"""Recipe for generating synthetic instances for a workflow. Recipes can be
77+
"""
78+
Recipe for generating synthetic instances for a workflow. Recipes can be
7779
generated by using the :class:`~wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer`.
7880
7981
:return: A recipe in the form of a dictionary in which keys are task prefixes.
@@ -96,11 +98,11 @@ def from_num_tasks(cls,
9698
:param num_tasks: The upper bound for the total number of tasks in the workflow.
9799
:type num_tasks: int
98100
:param runtime_factor: The factor of which tasks runtime will be increased/decreased.
99-
:type runtime_factor: float
101+
:type runtime_factor: Optional[float]
100102
:param input_file_size_factor: The factor of which tasks input files size will be increased/decreased.
101-
:type input_file_size_factor: float
103+
:type input_file_size_factor: Optional[float]
102104
:param output_file_size_factor: The factor of which tasks output files size will be increased/decreased.
103-
:type output_file_size_factor: float
105+
:type output_file_size_factor: Optional[float]
104106
105107
:return: A workflow recipe object that will generate synthetic workflows up to
106108
the total number of tasks provided.
@@ -110,18 +112,20 @@ def from_num_tasks(cls,
110112

111113
@abstractmethod
112114
def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
113-
"""Generate a synthetic workflow instance.
115+
"""
116+
Generate a synthetic workflow instance.
114117
115118
:param workflow_name: The workflow name
116-
:type workflow_name: int
119+
:type workflow_name: Optional[str]
117120
118121
:return: A synthetic workflow instance object.
119122
:rtype: Workflow
120123
"""
121124
raise NotImplementedError
122125

123126
def _generate_task(self, task_name: str, task_id: str) -> Task:
124-
"""Generate a synthetic task.
127+
"""
128+
Generate a synthetic task.
125129
126130
:param task_name: task name.
127131
:type task_name: str
@@ -165,7 +169,8 @@ def _generate_task(self, task_name: str, task_id: str) -> Task:
165169
return task
166170

167171
def _generate_task_name(self, prefix: str) -> str:
168-
"""Generate a task name from a prefix appended with an ID.
172+
"""
173+
Generate a task name from a prefix appended with an ID.
169174
170175
:param prefix: task prefix.
171176
:type prefix: str
@@ -178,7 +183,8 @@ def _generate_task_name(self, prefix: str) -> str:
178183
return task_name
179184

180185
def _generate_task_files(self, task: Task) -> List[File]:
181-
"""Generate input and output files for a task.
186+
"""
187+
Generate input and output files for a task.
182188
183189
:param task: task object.
184190
:type task: Task
@@ -211,7 +217,8 @@ def _generate_task_files(self, task: Task) -> List[File]:
211217
return output_files_list
212218

213219
def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink) -> List[File]:
214-
"""Generate files for a specific task ID.
220+
"""
221+
Generate files for a specific task ID.
215222
216223
:param task_id: task ID.
217224
:type task_id: str
@@ -242,7 +249,8 @@ def _generate_files(self, task_id: str, recipe: Dict[str, Any], link: FileLink)
242249
return files_list
243250

244251
def _generate_file(self, extension: str, recipe: Dict[str, Any], link: FileLink) -> File:
245-
"""Generate a file according to a file recipe.
252+
"""
253+
Generate a file according to a file recipe.
246254
247255
:param extension:
248256
:type extension: str
@@ -263,7 +271,8 @@ def _generate_file(self, extension: str, recipe: Dict[str, Any], link: FileLink)
263271
size=size)
264272

265273
def _get_files_by_task_and_link(self, task_id: str, link: FileLink) -> List[File]:
266-
"""Get the list of files for a task ID and link type.
274+
"""
275+
Get the list of files for a task ID and link type.
267276
268277
:param task_id: task ID.
269278
:type task_id: str

wfcommons/wfgen/generator.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class WorkflowGenerator:
2525
:param workflow_recipe: The workflow recipe to be used for this generator.
2626
:type workflow_recipe: WorkflowRecipe
2727
:param logger: The logger where to log information/warning or errors (optional).
28-
:type logger: Logger
28+
:type logger: Optional[Logger]
2929
"""
3030

3131
def __init__(self, workflow_recipe: WorkflowRecipe, logger: Optional[Logger] = None) -> None:
@@ -44,15 +44,14 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
4444
the generator.
4545
4646
:param workflow_name: The workflow name.
47-
:type workflow_name: str
47+
:type workflow_name: Optional[str]
4848
4949
:return: A synthetic workflow instance object.
5050
:rtype: Workflow
5151
"""
5252
workflow: Workflow = self.workflow_recipe.build_workflow(workflow_name)
5353
self.workflows.append(workflow)
54-
self.logger.info(
55-
"Generated a synthetic workflow with {} tasks".format(len(workflow.nodes)))
54+
self.logger.info(f"Generated a synthetic workflow with {len(workflow.nodes)} tasks")
5655
return workflow
5756

5857
def build_workflows(self, num_workflows: int) -> List[Workflow]:

wfcommons/wfinstances/logs/makeflow.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@
88
# the Free Software Foundation, either version 3 of the License, or
99
# (at your option) any later version.
1010

11-
import glob
1211
import json
1312
import itertools
1413
import math
15-
import os
14+
import pathlib
1615

1716
from datetime import datetime
1817
from logging import Logger
@@ -30,44 +29,43 @@ class MakeflowLogsParser(LogsParser):
3029
Parse Makeflow submit directory to generate workflow instance.
3130
3231
:param execution_dir: Makeflow workflow execution directory (contains .mf and .makeflowlog files).
33-
:type execution_dir: str
32+
:type execution_dir: pathlib.Path
3433
:param resource_monitor_logs_dir: Resource Monitor log files directory.
35-
:type resource_monitor_logs_dir: str
34+
:type resource_monitor_logs_dir: pathlib.Path
3635
:param description: Workflow instance description.
37-
:type description: str
36+
:type description: Optional[str]
3837
:param logger: The logger where to log information/warning or errors (optional).
39-
:type logger: Logger
38+
:type logger: Optional[Logger]
4039
"""
4140

4241
def __init__(self,
43-
execution_dir: str,
44-
resource_monitor_logs_dir: str,
42+
execution_dir: pathlib.Path,
43+
resource_monitor_logs_dir: pathlib.Path,
4544
description: Optional[str] = None,
4645
logger: Optional[Logger] = None) -> None:
4746
"""Create an object of the makeflow log parser."""
4847
super().__init__('Makeflow', 'http://ccl.cse.nd.edu/software/makeflow/', description, logger)
4948

5049
# Sanity check
51-
if not os.path.isdir(execution_dir):
52-
raise OSError('The provided path does not exist or is not a folder: {}'.format(execution_dir))
50+
if not execution_dir.is_dir():
51+
raise OSError(f'The provided path does not exist or is not a folder: {execution_dir}')
5352

54-
files = glob.glob('{}/*.mf'.format(execution_dir))
53+
files: List[pathlib.Path] = list(execution_dir.glob('*.mf'))
5554
if len(files) == 0:
56-
raise OSError('Unable to find .mf file in: {}'.format(execution_dir))
57-
self.mf_file = files[0]
55+
raise OSError(f'Unable to find .mf file in: {execution_dir}')
56+
self.mf_file: pathlib.Path = files[0]
5857

59-
files = glob.glob('{}/*.makeflowlog'.format(execution_dir))
58+
files = list(execution_dir.glob('*.makeflowlog'))
6059
if len(files) == 0:
61-
raise OSError('Unable to find .makeflowlog file in: {}'.format(execution_dir))
62-
self.mf_log_file = files[0]
60+
raise OSError(f'Unable to find .makeflowlog file in: {execution_dir}')
61+
self.mf_log_file: pathlib.Path = files[0]
6362

64-
if not os.path.isdir(resource_monitor_logs_dir):
65-
raise OSError('The provided path does not exist or is not a folder: {}'.format(resource_monitor_logs_dir))
63+
if not resource_monitor_logs_dir.is_dir():
64+
raise OSError(f'The provided path does not exist or is not a folder: {resource_monitor_logs_dir}')
6665

67-
self.execution_dir = execution_dir
66+
self.execution_dir: pathlib.Path = execution_dir
6867

69-
# self.mf_log_file = mf_log_file
70-
self.resource_monitor_logs_dir = resource_monitor_logs_dir
68+
self.resource_monitor_logs_dir: pathlib.Path = resource_monitor_logs_dir
7169
self.files_map = {}
7270
self.args_map = {}
7371

@@ -76,7 +74,7 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
7674
Create workflow instance based on the workflow execution logs.
7775
7876
:param workflow_name: The workflow name.
79-
:type workflow_name: str
77+
:type workflow_name: Optional[str]
8078
8179
:return: A workflow instance object.
8280
:rtype: Workflow
@@ -100,7 +98,7 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
10098

10199
return self.workflow
102100

103-
def _parse_workflow_file(self):
101+
def _parse_workflow_file(self) -> None:
104102
"""Parse the makeflow workflow file and build the workflow structure."""
105103
task_id_counter = 1
106104

@@ -148,8 +146,9 @@ def _parse_workflow_file(self):
148146
if self.files_map[file]['task_name']:
149147
self.workflow.add_edge(self.files_map[file]['task_name'], child)
150148

151-
def _create_files(self, files_list: List[str], link: FileLink, task_name: str):
152-
""" Create a list of files objects.
149+
def _create_files(self, files_list: List[str], link: FileLink, task_name: str) -> List[File]:
150+
"""
151+
Create a list of files objects.
153152
154153
:param files_list: list of file names.
155154
:rtype files_list: List[str]
@@ -168,10 +167,11 @@ def _create_files(self, files_list: List[str], link: FileLink, task_name: str):
168167
self.files_map[file]['file'][0] if link == FileLink.INPUT else self.files_map[file]['file'][1])
169168
else:
170169
size = 0
171-
if os.path.isdir('{}/{}'.format(self.execution_dir, file)):
172-
size = sum(os.path.getsize(f) for f in os.listdir('.') if os.path.isfile(f))
173-
elif os.path.isfile('{}/{}'.format(self.execution_dir, file)):
174-
size = int(math.ceil(os.stat('{}/{}'.format(self.execution_dir, file)).st_size / 1000)) # B to KB
170+
file_path = self.execution_dir.joinpath(file)
171+
if file_path.is_dir():
172+
size = sum(math.ceil(f.stat().st_size / 1000) for f in file_path.glob("*") if f.is_file())
173+
elif file_path.is_file():
174+
size = int(math.ceil(file_path.stat().st_size / 1000)) # B to KB
175175

176176
file_obj_in = File(name=file,
177177
size=size,
@@ -206,7 +206,7 @@ def _parse_makeflow_log_file(self):
206206
elif 'COMPLETED' in line:
207207
self.workflow.makespan = float('%.2f' % ((int(line.split()[2]) - start_time) / 1000000))
208208

209-
elif line.startswith('# FILE') and not 'condorlog' in line:
209+
elif line.startswith('# FILE') and 'condorlog' not in line:
210210
file_name = line.split()[3]
211211
if file_name in self.files_map:
212212
size = int(math.ceil(int(line.split()[5]) / 1000)) # B to KB
@@ -215,7 +215,7 @@ def _parse_makeflow_log_file(self):
215215

216216
def _parse_resource_monitor_logs(self):
217217
"""Parse the log files produced by resource monitor"""
218-
for file in glob.glob('{}/*.summary'.format(self.resource_monitor_logs_dir)):
218+
for file in pathlib.Path.glob(f'{self.resource_monitor_logs_dir}/*.summary'):
219219
with open(file) as f:
220220
data = json.load(f)
221221

0 commit comments

Comments (0)