|
| 1 | +""" |
| 2 | +
|
| 3 | +A class used to define and help execute a task in a dask workflow |
| 4 | +
|
| 5 | +You don't need to modify this unless you know what you're doing |
| 6 | +
|
| 7 | +TODO The JSON should actually be a WFCTask |
| 8 | +""" |
| 9 | +import random |
| 10 | +import time |
| 11 | + |
| 12 | + |
| 13 | +class WorkflowTask: |
| 14 | + def __init__(self, |
| 15 | + dag_id: str = None, |
| 16 | + name: str = None, |
| 17 | + command_arguments: list[str] = None, |
| 18 | + inputs: list[str] = None, |
| 19 | + outputs: list[str] = None, |
| 20 | + simulate: bool = False, |
| 21 | + randomizer: random.Random = random.Random(), |
| 22 | + simulate_minimum_execution_time: float = 0.1, |
| 23 | + simulate_maximum_execution_time: float = 1.1, |
| 24 | + execution_time: float = None, # This is an execution output |
| 25 | + ): |
| 26 | + self.dag_id = dag_id |
| 27 | + self.name = name |
| 28 | + self.command_arguments = command_arguments |
| 29 | + self.inputs = inputs |
| 30 | + self.outputs = outputs |
| 31 | + self.simulate = simulate |
| 32 | + self.randomizer = randomizer |
| 33 | + self.simulate_minimum_execution_time = simulate_minimum_execution_time |
| 34 | + self.simulate_maximum_execution_time = simulate_maximum_execution_time |
| 35 | + self.execution_time = execution_time |
| 36 | + |
| 37 | + def simulate_execution(self): |
| 38 | + time.sleep(self.randomizer.uniform(self.simulate_minimum_execution_time, |
| 39 | + self.simulate_maximum_execution_time)) |
| 40 | + |
| 41 | + def pythonize(self, randomizer_varname: str = "randomizer") -> list[str]: |
| 42 | + codelines = ["WorkflowTask(dag_id = '%s'," % self.dag_id, |
| 43 | + " name = '%s'," % self.name, |
| 44 | + " command_arguments = %s," % self.command_arguments, |
| 45 | + " inputs = %s," % self.inputs, |
| 46 | + " outputs = %s," % self.outputs, |
| 47 | + " simulate = simulate,", |
| 48 | + " randomizer = %s," % randomizer_varname, |
| 49 | + " simulate_minimum_execution_time = %s," % self.simulate_minimum_execution_time, |
| 50 | + " simulate_maximum_execution_time = %s," % self.simulate_maximum_execution_time, |
| 51 | + " )"] |
| 52 | + return codelines |
0 commit comments