@@ -31,14 +31,10 @@ class AirflowTranslator(Translator):
3131
3232 def __init__ (self ,
3333 workflow : Union [Workflow , pathlib .Path ],
34- logger : Optional [Logger ] = None ,
35- # input_file_directory: pathlib.Path = pathlib.Path("/")
36- ) -> None :
34+ logger : Optional [Logger ] = None ) -> None :
3735 """Create an object of the translator."""
3836 super ().__init__ (workflow , logger )
3937
40- # self.input_file_directory = input_file_directory
41- #
4238 self .script = f"""
4339from __future__ import annotations
4440
@@ -93,14 +89,15 @@ def translate(self, output_folder: pathlib.Path) -> None:
9389 self ._generate_input_files (output_folder )
9490
9591 def _prep_commands (self , output_folder : pathlib .Path ) -> None :
96- self . task_commands = {}
97-
92+ """
93+ Prepares the bash_command strings for the BashOperators.
9894
95+ :param output_folder: The name of the output folder.
96+ :type output_folder: pathlib.Path
97+ """
98+ self .task_commands = {}
9999
100100 for task in self .tasks .values ():
101- # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files]
102- # output_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.output_files]
103- # program = "${AIRFLOW_HOME}/dags/" / output_folder / f"bin/{task.program}"
104101 program = task .program
105102 args = []
106103 for a in task .args :
@@ -130,15 +127,3 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None:
130127
131128 self .task_commands [task .task_id ] = command_str
132129
133-
134- # def _prep_commands(self):
135- # self.task_commands = {}
136- # for task in self.workflow.workflow_json["workflow"]["execution"]["tasks"]:
137- # command_str = " ".join([task["command"]["program"]] + task["command"]["arguments"])
138- # # Prepends { and } with \" (i.e. {hi} -> \"{hi\"}
139- # command_str = re.sub(r"(\{|\})", r"\"\1", command_str)
140- # # Prepends txt filenames with absolute path
141- # command_str = re.sub(r"([\w\-]+\.txt)",
142- # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}",
143- # command_str)
144- # self.task_commands[task["id"]] = command_str
0 commit comments