1+ import pathlib
2+ import shutil
3+ import tarfile
4+ import os
5+ import io
6+ import sys
7+ import docker
8+ import networkx
9+ from docker .errors import ImageNotFound
10+
11+ from wfcommons .common import Workflow
12+
13+
14+ def _create_fresh_local_dir (path : str ) -> pathlib .Path :
15+ dirpath = pathlib .Path (path )
16+ if dirpath .exists ():
17+ shutil .rmtree (dirpath )
18+ dirpath .mkdir (parents = True , exist_ok = True )
19+ return dirpath
20+
21+ def _remove_local_dir_if_it_exists (path : str ) -> None :
22+ dirpath = pathlib .Path (path )
23+ if dirpath .exists ():
24+ shutil .rmtree (dirpath )
25+
26+
27+ def _make_tarfile_of_wfcommons ():
28+ source_dir = os .getcwd () # This assumes the testing is run from the root
29+ tar_stream = io .BytesIO ()
30+ with tarfile .open (fileobj = tar_stream , mode = 'w' ) as tar :
31+ tar .add (source_dir , arcname = os .path .basename (source_dir ))
32+ tar_stream .seek (0 )
33+ return tar_stream
34+
35+
36+ def _install_WfCommons_on_container (container ):
37+ # sys.stderr.write("Installing WfCommons on the container...\n")
38+ # Copy the WfCommons code to it (removing stuff that should be removed)
39+ target_path = '/tmp/' # inside container
40+ tar_data = _make_tarfile_of_wfcommons ()
41+ container .put_archive (target_path , tar_data )
42+ # Cleanup files from the host
43+ exit_code , output = container .exec_run ("sudo /bin/rm -rf /tmp/WfCommons/build/" , stdout = True , stderr = True )
44+ exit_code , output = container .exec_run ("sudo /bin/rm -rf /tmp/WfCommons/*.egg-info/" , stdout = True , stderr = True )
45+ exit_code , output = container .exec_run ("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark.o" , stdout = True ,
46+ stderr = True )
47+ exit_code , output = container .exec_run ("sudo /bin/rm -rf /tmp/WfCommons/bin/cpu-benchmark" , stdout = True ,
48+ stderr = True )
49+
50+ # Install WfCommons on the container (to install wfbench and cpu-benchmark really)
51+ exit_code , output = container .exec_run ("sudo python3 -m pip install . --break-system-packages" ,
52+ workdir = "/tmp/WfCommons" , stdout = True , stderr = True )
53+ if exit_code != 0 :
54+ raise RuntimeError ("Failed to install WfCommons on the container" )
55+
56+ def _start_docker_container (backend , mounted_dir , working_dir , bin_dir , command = None ):
57+ if command is None :
58+ command = ["sleep" , "infinity" ]
59+ # Pulling the Docker image
60+ client = docker .from_env ()
61+ image_name = f"wfcommons/wfcommons-testing-{ backend } "
62+
63+ try :
64+ image = client .images .get (image_name )
65+ sys .stderr .write (f"[{ backend } ] Image '{ image_name } ' is available locally\n " )
66+ except ImageNotFound :
67+ sys .stderr .write (f"[{ backend } ] Pulling image '{ image_name } '...\n " )
68+ client .images .pull (image_name )
69+
70+ # Launch the docker container to actually run the translated workflow
71+ sys .stderr .write (f"[{ backend } ] Starting Docker container...\n " )
72+ container = client .containers .run (
73+ image = image_name ,
74+ command = command ,
75+ volumes = {mounted_dir : {'bind' : mounted_dir , 'mode' : 'rw' }},
76+ working_dir = working_dir ,
77+ tty = True ,
78+ detach = True
79+ )
80+
81+ # Installing WfCommons on container
82+ _install_WfCommons_on_container (container )
83+
84+ # Copy over the wfbench and cpu-benchmark executables to where they should go on the container
85+ if bin_dir :
86+ sys .stderr .write (f"[{ backend } ] Copying wfbench and cpu-benchmark...\n " )
87+ exit_code , output = container .exec_run (["sh" , "-c" , "sudo cp -f `which wfbench` " + bin_dir ],
88+ stdout = True , stderr = True )
89+ if exit_code != 0 :
90+ raise RuntimeError ("Failed to copy wfbench script to the bin directory" )
91+ exit_code , output = container .exec_run (["sh" , "-c" , "sudo cp -f `which cpu-benchmark` " + bin_dir ],
92+ stdout = True , stderr = True )
93+ if exit_code != 0 :
94+ raise RuntimeError ("Failed to copy cpu-benchmark executable to the bin directory" )
95+ else :
96+ sys .stderr .write (f"[{ backend } ] Not Copying wfbench and cpu-benchmark...\n " )
97+
98+ container .backend = backend
99+ return container
100+
101+ def _shutdown_docker_container_and_remove_image (container ):
102+ image = container .image
103+ sys .stderr .write (f"[{ container .backend } ] Terminating container if need be...\n " )
104+ try :
105+ container .stop ()
106+ container .remove ()
107+ except Exception as e :
108+ pass
109+
110+ # Remove the images as we go, if running on GitHub
111+ if os .getenv ('CI' ) or os .getenv ('GITHUB_ACTIONS' ):
112+ sys .stderr .write (f"[{ container .backend } ] Removing Docker image...\n " )
113+ try :
114+ image .remove (force = True )
115+ except Exception as e :
116+ sys .stderr .write (f"[{ container .backend } ] Warning: Error while removing image: { e } \n " )
117+
118+ def _get_total_size_of_directory (directory_path : str ):
119+ total_size = 0
120+ for dirpath , dirnames , filenames in os .walk (directory_path ):
121+ for filename in filenames :
122+ filepath = os .path .join (dirpath , filename )
123+ total_size += os .path .getsize (filepath )
124+ return total_size
125+
126+ def _compare_workflows (workflow1 : Workflow , workflow_2 : Workflow ):
127+
128+ # Test the number of tasks
129+ assert (len (workflow1 .tasks ) == len (workflow_2 .tasks ))
130+ # Test the task graph topology
131+ assert (networkx .is_isomorphic (workflow1 , workflow_2 ))
132+ # Test the total file size sum
133+ workflow1_input_bytes , workflow2_input_bytes = 0 , 0
134+ workflow1_output_bytes , workflow2_output_bytes = 0 , 0
135+ for workflow1_task , workflow2_task in zip (workflow1 .tasks .values (), workflow_2 .tasks .values ()):
136+ # sys.stderr.write(f"WORKFLOW1: {workflow1_task.task_id} WORKFLOW2 TASK: {workflow2_task.task_id}\n")
137+ for input_file in workflow1_task .input_files :
138+ # sys.stderr.write(f"WORKFLOW1 INPUT FILE: {input_file.file_id} {input_file.size}\n")
139+ workflow1_input_bytes += input_file .size
140+ for input_file in workflow2_task .input_files :
141+ # sys.stderr.write(f"WORKFLOW2 INPUT FILE: {input_file.file_id} {input_file.size}\n")
142+ workflow2_input_bytes += input_file .size
143+ for output_file in workflow1_task .output_files :
144+ # sys.stderr.write(f"WORKFLOW1 OUTPUT FILE: {output_file.file_id} {output_file.size}\n")
145+ workflow1_output_bytes += output_file .size
146+ for output_file in workflow2_task .output_files :
147+ # sys.stderr.write(f"WORKFLOW2 OUTPUT FILE: {output_file.file_id} {output_file.size}\n")
148+ workflow2_output_bytes += output_file .size
149+ assert (workflow1_input_bytes == workflow2_input_bytes )
150+ assert (workflow1_output_bytes == workflow2_output_bytes )
0 commit comments