@@ -95,6 +95,8 @@ def translate(self, output_folder: pathlib.Path) -> None:
9595 def _prep_commands (self , output_folder : pathlib .Path ) -> None :
9696 self .task_commands = {}
9797
98+
99+
98100 for task in self .tasks .values ():
99101 # input_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.input_files]
100102 # output_files = [str(output_folder.joinpath(f"data/{f.file_id}")) for f in task.output_files]
@@ -104,11 +106,11 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None:
104106 for a in task .args :
105107 if "--output-files" in a :
106108 flag , output_files_dict = a .split (" " , 1 )
107- output_files_dict = {str ("${AIRFLOW_HOME}/dags/" / output_folder / f" data/{ key } " ): value for key , value in ast .literal_eval (output_files_dict ).items ()}
109+ output_files_dict = {str (f "${{ AIRFLOW_HOME}} /dags/{ output_folder . name } / data/{ key } " ): value for key , value in ast .literal_eval (output_files_dict ).items ()}
108110 a = f"{ flag } { json .dumps (output_files_dict )} "
109111 elif "--input-files" in a :
110112 flag , input_files_arr = a .split (" " , 1 )
111- input_files_arr = [str ("${AIRFLOW_HOME}/dags/" / output_folder / f" data/{ file } " ) for file in ast .literal_eval (input_files_arr )]
113+ input_files_arr = [str (f "${{ AIRFLOW_HOME}} /dags/{ output_folder . name } / data/{ file } " ) for file in ast .literal_eval (input_files_arr )]
112114 a = f"{ flag } { json .dumps (input_files_arr )} "
113115 else :
114116 a = a .replace ("'" , "\" " )
@@ -134,4 +136,4 @@ def _prep_commands(self, output_folder: pathlib.Path) -> None:
134136 # command_str = re.sub(r"([\w\-]+\.txt)",
135137 # lambda m: f"{self.input_file_directory.absolute().as_posix()}/{m.group(1)}",
136138 # command_str)
137- # self.task_commands[task["id"]] = command_str
139+ # self.task_commands[task["id"]] = command_str
0 commit comments