@@ -114,6 +114,7 @@ def _additional_setup_swiftt(container):
114114 "taskvine" : _additional_setup_taskvine ,
115115 "makeflow" : noop ,
116116 "cwl" : noop ,
117+ "streamflow" : noop ,
117118 "pegasus" : _additional_setup_pegasus ,
118119 "swiftt" : _additional_setup_swiftt ,
119120}
@@ -189,6 +190,19 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
189190 # and there is a 2* because there is a message for the job and for the step)
190191 assert (output .decode ().count ("completed success" ) == 3 + 2 * num_tasks )
191192
193+ def run_workflow_streamflow (container , num_tasks , str_dirpath ):
194+ # Run the workflow!
195+ # Note that the input file is hardcoded and Blast-specific
196+ sys .stderr .write ("TODO: RUN THE STREAMFLOW WORKFLOW!!!" )
197+ time .sleep (100000 )
198+ # exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ",
199+ # user="wfcommons", stdout=True, stderr=True)
200+ # # Check sanity
201+ # assert (exit_code == 0)
202+ # # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files",
203+ # # and there is a 2* because there is a message for the job and for the step)
204+ # assert (output.decode().count("completed success") == 3 + 2 *num_tasks)
205+
192206def run_workflow_pegasus (container , num_tasks , str_dirpath ):
193207 # Run the workflow!
194208 exit_code , output = container .exec_run (cmd = "bash /home/wfcommons/run_workflow.sh" ,
@@ -217,6 +231,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
217231 "taskvine" : run_workflow_taskvine ,
218232 "makeflow" : run_workflow_makeflow ,
219233 "cwl" : run_workflow_cwl ,
234+ "streamflow" : run_workflow_streamflow ,
220235 "pegasus" : run_workflow_pegasus ,
221236 "swiftt" : run_workflow_swiftt ,
222237}
@@ -231,6 +246,7 @@ def run_workflow_swiftt(container, num_tasks, str_dirpath):
231246 "taskvine" : TaskVineTranslator ,
232247 "makeflow" : MakeflowTranslator ,
233248 "cwl" : CWLTranslator ,
249+ "streamflow" : CWLTranslator ,
234250 "pegasus" : PegasusTranslator ,
235251 "swiftt" : SwiftTTranslator ,
236252}
@@ -241,17 +257,18 @@ class TestTranslators:
241257 @pytest .mark .parametrize (
242258 "backend" ,
243259 [
244- "swiftt" ,
245- "dask" ,
246- "parsl" ,
247- "nextflow" ,
248- "nextflow_subworkflow" ,
249- "airflow" ,
250- "bash" ,
251- "taskvine" ,
252- "makeflow" ,
253- "cwl" ,
254- "pegasus" ,
260+ # "swiftt",
261+ # "dask",
262+ # "parsl",
263+ # "nextflow",
264+ # "nextflow_subworkflow",
265+ # "airflow",
266+ # "bash",
267+ # "taskvine",
268+ # "makeflow",
269+ # "cwl",
270+ "streamflow" ,
271+ # "pegasus",
255272 ])
256273 @pytest .mark .unit
257274 # @pytest.mark.skip(reason="tmp")
@@ -305,6 +322,8 @@ def test_translator(self, backend) -> None:
305322 parser = TaskVineLogsParser (dirpath / "vine-run-info/most-recent/vine-logs" , filenames_to_ignore = ["cpu-benchmark" ,"stress-ng" , "wfbench" ])
306323 elif backend == "makeflow" :
307324 parser = MakeflowLogsParser (execution_dir = dirpath , resource_monitor_logs_dir = dirpath / "monitor_data/" )
325+ elif backend == "streamflow" :
326+ parsed =
308327 else :
309328 parser = None
310329
0 commit comments