1+ #!/usr/bin/env python3
2+
# This Python script analyses a file access report produced by
# the "fanotify" monitoring tool and combines it with task
# information from an O2DPG MC workflow.
6+ # The tool produces:
7+ # - a json report
8+ # - optionally a graphviz visualization of file and task dependencies
9+
10+ import argparse
11+ import re
12+ import json
13+
14+ try :
15+ from graphviz import Digraph
16+ havegraphviz = True
17+ except ImportError :
18+ havegraphviz = False
19+
parser = argparse.ArgumentParser(description='Produce O2DPG MC file dependency reports')

# Mandatory inputs. The script opens --actionFile and --monitorFile and builds
# a regular expression from --basedir unconditionally, so a missing value would
# otherwise surface later as an opaque TypeError instead of a usage message.
parser.add_argument('--actionFile', type=str, required=True, help="O2DPG pipeline runner action file")
parser.add_argument('--monitorFile', type=str, required=True, help="monitoring file provided by fanotify tool. See O2DPG/UTILS/FileIOGraph.")
parser.add_argument('--basedir', type=str, required=True, help="O2DPG workflow dir")

# Optional selection of files and of the outputs to produce.
parser.add_argument('--file-filters', nargs='+', default=[r'.*'], help="Filters (regular expressions) to select files (default all = '.*')")
parser.add_argument('--graphviz', type=str, help="Produce a graphviz plot")
parser.add_argument('-o', '--output', type=str, help="Output JSON report")

args = parser.parse_args()
31+
# Step (a): scan the action file for successfully finished tasks and record
# the mapping between the number following "INFO Task" (treated as the PID
# by the rest of the script) and the O2DPG task name.
# ---> fills pid_to_O2DPGtask + O2DPGtask_to_pid
pid_to_O2DPGtask = {}
O2DPGtask_to_pid = {}

# Only lines reporting "finished with status 0" are considered.
pattern = re.compile(r'.*INFO Task (\d+).*:(\w+) finished with status 0')

with open(args.actionFile, 'r') as action_log:
    for entry in action_log:
        hit = pattern.match(entry)
        if hit is None:
            # not a "task finished" line
            continue

        # The two capture groups are kept as strings: (number, task name).
        pid, finished_task = hit.groups()
        pid_to_O2DPGtask[pid] = finished_task
        O2DPGtask_to_pid[finished_task] = pid
54+
55+
# (b) - parse monitor file for mapping from files to processes and operation
# ---> fills the following structures:
#   task_reads / task_writes               : task name -> set of file names
#   file_consumed_task / file_written_task : file name -> set of task names
task_reads = {tname: set() for tname in O2DPGtask_to_pid}
task_writes = {tname: set() for tname in O2DPGtask_to_pid}
file_written_task = {}
file_consumed_task = {}

# Monitor lines have the form "<basedir><file>,<read|write>,<pid>;<pid>;...".
# The base directory is a literal path prefix, so it is escaped: characters
# such as '.', '+' or '(' in the path must not be read as regex syntax (they
# could otherwise change what matches or shift the capture group numbers).
pattern = re.compile(re.escape(args.basedir) + r'([^,]+),((?:read|write)),(.*)')
# neglecting some framework file names
file_exclude_filter = re.compile(r'(.*)\.log(.*)|(ccdb/log)|(.*)dpl-config\.json')

# construct user-filter regular expressions
file_filter_re = [re.compile(l) for l in args.file_filters]

with open(args.monitorFile, 'r') as monitor_file:
    for line in monitor_file:
        match = pattern.match(line)
        if not match:
            continue
        file_name, mode, pid_field = match.groups()

        # drop framework files
        if file_exclude_filter.match(file_name):
            continue

        # keep the file only if it matches one of the user provided filters
        if not any(r.match(file_name) for r in file_filter_re):
            continue

        # Every selected file gets an entry in both maps, even when none of
        # the listed PIDs belongs to a known task.
        consumers = file_consumed_task.setdefault(file_name, set())
        producers = file_written_task.setdefault(file_name, set())

        for p in pid_field.split(";"):
            task = pid_to_O2DPGtask.get(p)
            if task is None:
                # PID does not belong to a (successfully finished) O2DPG task
                continue
            if mode == 'read':
                task_reads[task].add(file_name)
                consumers.add(task)
            elif mode == 'write':
                task_writes[task].add(file_name)
                producers.add(task)
108+
109+
# draws the graph of files and tasks
def draw_graph(graphviz_filename):
    """Render the file/task dependency network with graphviz.

    Files and tasks become nodes; an edge file -> task means the task read
    the file, an edge task -> file means the task wrote it. The data comes
    from the module-level structures ``file_written_task``,
    ``file_consumed_task`` and ``O2DPGtask_to_pid``.

    Args:
        graphviz_filename: output file name handed to ``Digraph.render``;
            the graph is rendered once as 'pdf' and once as 'gv'.

    Returns:
        None. If graphviz could not be imported, a message is printed and
        nothing is drawn.
    """
    if not havegraphviz:
        print('graphviz not installed, cannot draw workflow')
        return

    dot = Digraph(comment='O2DPG file - task network')

    ccdbfilter = re.compile(r'ccdb(.*)/snapshot.root')

    # Files and tasks use separate lookup tables: with a single shared table
    # a task carrying the same name as a file would overwrite the file's node
    # id and the edges would connect the wrong nodes.
    file_index = {}
    task_index = {}
    index = 0

    # Sorted so that node numbering is reproducible between runs.
    allfiles = sorted(set(file_written_task) | set(file_consumed_task))

    # Split into CCDB snapshot files (labelled by the captured path part)
    # and all remaining files, matching each name only once.
    normalfiles = []
    ccdbfiles = []
    for name in allfiles:
        ccdbmatch = ccdbfilter.match(name)
        if ccdbmatch:
            ccdbfiles.append((name, ccdbmatch.group(1)))
        else:
            normalfiles.append(name)

    # NOTE(review): Graphviz appears to draw subgraph attributes such as
    # 'color' only for subgraphs whose name starts with "cluster" - confirm
    # whether these two subgraphs are meant to be visible boxes.
    with dot.subgraph(name='CCDB') as ccdbpartition:
        ccdbpartition.attr(color='blue')
        for name, label in ccdbfiles:
            file_index[name] = index
            ccdbpartition.node(str(index), label, color='blue')
            index += 1

    with dot.subgraph(name='normal') as normalpartition:
        normalpartition.attr(color='black')
        for name in normalfiles:
            file_index[name] = index
            normalpartition.node(str(index), name, color='red')
            index += 1
        for t in O2DPGtask_to_pid:
            task_index[t] = index
            normalpartition.node(str(index), t, shape='box', color='green', style='filled')
            index += 1

    # edges file -> task (the task consumed the file)
    for name, readers in file_consumed_task.items():
        source = str(file_index[name])
        for task in sorted(readers):
            dot.edge(source, str(task_index[task]))

    # edges task -> file (the task wrote the file)
    for name, writers in file_written_task.items():
        target = str(file_index[name])
        for task in sorted(writers):
            dot.edge(str(task_index[task]), target)

    dot.render(graphviz_filename, format='pdf')
    dot.render(graphviz_filename, format='gv')
163+
def write_json_report(json_file_name):
    """Write the file/task dependency report as JSON.

    The report has two top-level entries:
      - "file_report": one record per file with the tasks that wrote it
        ("written_by") and the tasks that read it ("read_by")
      - "task_report": one record per task with the files it wrote
        ("writes") and the files it read ("reads")

    The data is taken from the module-level structures filled while parsing
    the action and monitor files. Files and the per-record lists are sorted
    so that identical inputs always produce an identical report (the
    underlying sets have no defined order).

    Args:
        json_file_name: path of the JSON file to create or overwrite.
    """
    all_filenames = sorted(set(file_written_task) | set(file_consumed_task))
    file_report = [
        {
            "file": name,
            "written_by": sorted(file_written_task.get(name, [])),
            "read_by": sorted(file_consumed_task.get(name, [])),
        }
        for name in all_filenames
    ]

    # Tasks keep the order in which they were registered in O2DPGtask_to_pid.
    task_report = [
        {
            "task": t,
            "writes": sorted(task_writes.get(t, [])),
            "reads": sorted(task_reads.get(t, [])),
        }
        for t in O2DPGtask_to_pid
    ]

    # Write the dictionary to a JSON file
    with open(json_file_name, 'w') as json_file:
        json.dump({"file_report": file_report, "task_report": task_report}, json_file, indent=2)
188+
# Produce each requested output: the graphviz plot first, then the JSON
# report. An output is skipped when its command-line option was not given.
for destination, producer in ((args.graphviz, draw_graph), (args.output, write_json_report)):
    if destination:
        producer(destination)
0 commit comments