forked from AliceO2Group/O2DPG
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patho2_dpg_workflow_runner.py
More file actions
executable file
·2004 lines (1690 loc) · 84.7 KB
/
o2_dpg_workflow_runner.py
File metadata and controls
executable file
·2004 lines (1690 loc) · 84.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# started February 2021, sandro.wenzel@cern.ch
import re
import subprocess
import time
import json
import logging
import os
import signal
import socket
import sys
import traceback
import platform
import tarfile
from copy import deepcopy
try:
from graphviz import Digraph
havegraphviz=True
except ImportError:
havegraphviz=False
# common formatter shared by all file loggers created via setup_logger below
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
# the topological-sort helpers below recurse once per graph node, so allow deep recursion
sys.setrecursionlimit(100000)

import argparse
import psutil

# total physical memory of this host; basis for the default --mem-limit value
max_system_mem=psutil.virtual_memory().total

sys.path.append(os.path.join(os.path.dirname(__file__), '.', 'o2dpg_workflow_utils'))
from o2dpg_workflow_utils import read_workflow

# defining command line options
parser = argparse.ArgumentParser(description='Parallel execution of a (O2-DPG) DAG data/job pipeline under resource contraints.',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)

# workflow selection and execution control
parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
parser.add_argument('-jmax','--maxjobs', type=int, help='Number of maximal parallel tasks.', default=100)
parser.add_argument('-k','--keep-going', action='store_true', help='Keep executing the pipeline as far possibe (not stopping on first failure)')
parser.add_argument('--dry-run', action='store_true', help='Show what you would do.')
parser.add_argument('--visualize-workflow', action='store_true', help='Saves a graph visualization of workflow.')
parser.add_argument('--target-labels', nargs='+', help='Runs the pipeline by target labels (example "TPC" or "DIGI").\
This condition is used as logical AND together with --target-tasks.', default=[])
parser.add_argument('-tt','--target-tasks', nargs='+', help='Runs the pipeline by target tasks (example "tpcdigi"). By default everything in the graph is run. Regular expressions supported.', default=["*"])
parser.add_argument('--produce-script', help='Produces a shell script that runs the workflow in serialized manner and quits.')
parser.add_argument('--rerun-from', help='Reruns the workflow starting from given task (or pattern). All dependent jobs will be rerun.')
parser.add_argument('--list-tasks', help='Simply list all tasks by name and quit.', action='store_true')

# Resources
parser.add_argument('--update-resources', dest="update_resources", help='Read resource estimates from a JSON and apply where possible.')
parser.add_argument("--dynamic-resources", dest="dynamic_resources", action="store_true", help="Update reources estimates of task based on finished related tasks") # derive resources dynamically
parser.add_argument('--optimistic-resources', dest="optimistic_resources", action="store_true", help="Try to run workflow even though resource limits might underestimate resource needs of some tasks")
parser.add_argument("--n-backfill", dest="n_backfill", type=int, default=1)
parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint (in MB)', default=0.9*max_system_mem/1024./1024, type=float)
parser.add_argument('--cpu-limit', help='Set CPU limit (core count)', default=8, type=float)
parser.add_argument('--cgroup', help='Execute pipeline under a given cgroup (e.g., 8coregrid) emulating resource constraints. This m\
ust exist and the tasks file must be writable to with the current user.')

# run control, webhooks
parser.add_argument('--stdout-on-failure', action='store_true', help='Print log files of failing tasks to stdout,')
parser.add_argument('--webhook', help=argparse.SUPPRESS) # log some infos to this webhook channel
parser.add_argument('--checkpoint-on-failure', help=argparse.SUPPRESS) # debug option making a debug-tarball and sending to specified address
# argument is alien-path
parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried
parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup
parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)")

# Logging
parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
parser.add_argument('--production-mode', action='store_true', help='Production mode')
# will trigger special features good for non-interactive/production processing (automatic cleanup of files etc).

args = parser.parse_args()
def setup_logger(name, log_file, level=logging.INFO):
    """Create and return a named logger writing (and truncating) the given file."""
    file_handler = logging.FileHandler(log_file, mode='w')
    file_handler.setFormatter(formatter)
    new_logger = logging.getLogger(name)
    new_logger.setLevel(level)
    new_logger.addHandler(file_handler)
    return new_logger
# first file logger: scheduler actions/decisions
actionlogger_file = args.action_logfile if args.action_logfile is not None else 'pipeline_action_' + str(os.getpid()) + '.log'
actionlogger = setup_logger('pipeline_action_logger', actionlogger_file, level=logging.DEBUG)
# second file logger: resource metrics
# BUGFIX: previously --metric-logfile was parsed but ignored and args.action_logfile
# was used here instead (so both loggers wrote to the same file when it was given)
metriclogger_file = args.metric_logfile if args.metric_logfile is not None else 'pipeline_metric_' + str(os.getpid()) + '.log'
metriclogger = setup_logger('pipeline_metric_logger', metriclogger_file)
# Immediately log imposed memory and CPU limit as well as further useful meta info
_ , meta = read_workflow(args.workflowfile)
meta["cpu_limit"] = args.cpu_limit
meta["mem_limit"] = args.mem_limit
meta["workflow_file"] = os.path.abspath(args.workflowfile)
args.target_tasks = [f.strip('"').strip("'") for f in args.target_tasks] # strip quotes from the shell
meta["target_task"] = args.target_tasks
meta["rerun_from"] = args.rerun_from
meta["target_labels"] = args.target_labels
metriclogger.info(meta)
# for debugging without terminal access
# TODO: integrate into standard logger
def send_webhook(hook, t):
    """Post text t as JSON payload to the given webhook URL; no-op when hook is None."""
    if hook is None:
        return
    # NOTE(review): hook and t are interpolated into a shell command; only use with trusted input
    command="curl -X POST -H 'Content-type: application/json' --data '{\"text\":\" " + str(t) + "\"}' " + str(hook) + " &> /dev/null"
    os.system(command)
# A fallback solution to getting all child procs
# in case psutil has problems (PermissionError).
# It returns the same list as psutil.children(recursive=True).
def getChildProcs(basepid):
    """Collect basepid and all of its descendants via a recursive shell/pgrep walk.

    Fallback for psutil.children(recursive=True) when psutil runs into problems
    (e.g. PermissionError); returns the same list of psutil.Process objects.
    """
    cmd='''
childprocs() {
local parent=$1
if [ ! "$2" ]; then
child_pid_list=""
fi
if [ "$parent" ] ; then
child_pid_list="$child_pid_list $parent"
for childpid in $(pgrep -P ${parent}); do
childprocs $childpid "nottoplevel"
done;
fi
# return via a string list (only if toplevel)
if [ ! "$2" ]; then
echo "${child_pid_list}"
fi
}
'''
    cmd = cmd + '\n' + 'childprocs ' + str(basepid)
    output = subprocess.check_output(cmd, shell=True)
    procs = []
    for pid_token in output.strip().split():
        # a PID may have vanished between pgrep and the lookup here
        try:
            procs.append(psutil.Process(int(pid_token)))
        except psutil.NoSuchProcess:
            continue
    return procs
#
# Code section to find all topological orderings
# of a DAG. This is used to know when we can schedule
# things in parallel.
# Taken from https://www.geeksforgeeks.org/all-topological-sorts-of-a-directed-acyclic-graph/
# class to represent a graph object
class Graph:
    """Adjacency-list representation of a directed graph used for topological sorting."""

    def __init__(self, edges, N):
        """Build the graph from (src, dest) edge tuples over N vertices."""
        # one successor list per vertex
        self.adjList = [[] for _ in range(N)]
        # number of incoming edges per vertex
        self.indegree = [0] * N
        for (src, dest) in edges:
            self.adjList[src].append(dest)
            self.indegree[dest] += 1
# Recursive function to find all topological orderings of a given DAG
def findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber=1):
    """Backtracking enumeration of topological orderings of a DAG.

    Appends complete orderings (lists of vertex ids) to allpaths, stopping once
    maxnumber orderings were recorded. path and discovered carry the recursion
    state; graph.indegree is temporarily modified and restored while recursing.
    """
    # stop early once enough orderings have been collected
    if len(allpaths) >= maxnumber:
        return
    # do for every vertex
    for v in range(N):
        # proceed only if in-degree of current node is 0 and
        # current node is not processed yet
        if graph.indegree[v] == 0 and not discovered[v]:
            # for every adjacent vertex u of v, reduce in-degree of u by 1
            for u in graph.adjList[v]:
                graph.indegree[u] = graph.indegree[u] - 1
            # include current node in the path and mark it as discovered
            path.append(v)
            discovered[v] = True
            # recur; BUGFIX: propagate maxnumber (previously the default of 1 was
            # used in recursive calls, capping the enumeration at a single ordering
            # regardless of what the caller requested)
            findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber)
            # backtrack: reset in-degree information for the current node
            for u in graph.adjList[v]:
                graph.indegree[u] = graph.indegree[u] + 1
            # backtrack: remove current node from the path and
            # mark it as undiscovered
            path.pop()
            discovered[v] = False
    # record valid ordering
    if len(path) == N:
        allpaths.append(path.copy())
# get all topological orderings of a given DAG as a list
def printAllTopologicalOrders(graph, maxnumber=1):
    """Return up to maxnumber topological orderings of graph as a list of lists."""
    vertex_count = len(graph.adjList)
    # track which vertices are already placed in the current partial ordering
    discovered = [False] * vertex_count
    collected = []
    findAllTopologicalOrders(graph, [], discovered, vertex_count, collected, maxnumber=maxnumber)
    return collected
# <--- end code section for topological sorts
# find all tasks that depend on a given task (id); when a cache
# dict is given we can fill for the whole graph in one pass...
def find_all_dependent_tasks(possiblenexttask, tid, cache=None):
    """Return the list of task ids reachable from tid (including tid itself).

    possiblenexttask maps a task id to the list of its direct successor ids.
    When a dict is passed as cache, results are memoized per task id so the
    whole graph can be filled in one pass.
    """
    cached = cache.get(tid) if cache else None
    if cached is not None:
        return cached
    dependents = [tid]
    # possibly recurse into the successors
    for successor in possiblenexttask[tid]:
        sub = cache.get(successor) if cache else None
        if sub is None:
            sub = find_all_dependent_tasks(possiblenexttask, successor, cache)
            if cache is not None:
                cache[successor] = sub
        dependents += sub
    # BUGFIX: cache the deduplicated result; previously the raw (possibly
    # duplicated) list was cached while the deduplicated one was returned,
    # so later cache hits returned a different (longer) list.
    result = list(set(dependents))
    if cache is not None:
        cache[tid] = result
    return result
# wrapper taking some edges, constructing the graph,
# obtain all topological orderings and some other helper data structures
def analyseGraph(edges, nodes):
    """Compute topological ordering(s) and a successor map for the DAG.

    Returns (orderings, successor_map); the map additionally holds key -1
    listing all start nodes (nodes without incoming edges).
    """
    # number of nodes must be taken before the start-node list is pruned below,
    # because candidate_successors[-1] aliases the passed-in nodes list
    node_count = len(nodes)
    candidate_successors = {node: [] for node in nodes}
    # pseudo start node -1 initially points to every node; edge targets are removed
    candidate_successors[-1] = nodes
    for source, target in edges:
        candidate_successors[source].append(target)
        if target in candidate_successors[-1]:
            candidate_successors[-1].remove(target)
    # find topological orderings of the graph built from the edges
    orderings = printAllTopologicalOrders(Graph(edges, node_count))
    return (orderings, candidate_successors)
def draw_workflow(workflowspec):
    """Render the workflow DAG to 'workflow.gv' using graphviz (if available)."""
    if not havegraphviz:
        print('graphviz not installed, cannot draw workflow')
        return
    dot = Digraph(comment='MC workflow')
    # assign an integer id to each stage and create the graph nodes
    nametoindex = {}
    for counter, stage in enumerate(workflowspec['stages']):
        nametoindex[stage['name']] = counter
        dot.node(str(counter), stage['name'])
    # one edge per requirement of every stage
    for stage in workflowspec['stages']:
        target = nametoindex[stage['name']]
        for requirement in stage['needs']:
            dot.edge(str(nametoindex[requirement]), str(target))
    dot.render('workflow.gv')
# builds the graph given a "taskuniverse" list
# builds accompagnying structures tasktoid and idtotask
def build_graph(taskuniverse, workflowspec):
    """Turn the task universe into (edges, nodes) using integer task ids.

    taskuniverse is a list whose entries carry the task spec at index 0; node
    ids are positions within that list. workflowspec is unused here (kept for
    interface compatibility).
    """
    tasktoid = { entry[0]['name']: idx for idx, entry in enumerate(taskuniverse) }
    nodes = [ tasktoid[entry[0]['name']] for entry in taskuniverse ]
    edges = [ (tasktoid[requirement], tasktoid[entry[0]['name']])
              for entry in taskuniverse
              for requirement in entry[0]['needs'] ]
    return (edges, nodes)
# loads json into dict, e.g. for workflow specification
def load_json(workflowfile):
    """Load a JSON file (e.g. a workflow specification) and return it as a dict.

    BUGFIX: the file handle is now closed via a context manager; it was
    previously opened and never closed.
    """
    with open(workflowfile) as fp:
        return json.load(fp)
# filters the original workflowspec according to wanted targets or labels
# returns a new workflowspec and the list of "final" workflowtargets
def filter_workflow(workflowspec, targets=[], targetlabels=[]):
    """Filter workflowspec down to the tasks matching targets/targetlabels plus
    all of their (transitive) requirements.

    Returns (filtered_workflowspec, list_of_final_target_names).
    NOTE(review): the passed workflowspec is modified in place (no deep copy);
    the mutable default arguments are read-only here, so they are harmless.
    """
    # trivial cases: no filtering requested at all
    if len(targets)==0:
        return workflowspec, []
    if len(targetlabels)==0 and len(targets)==1 and targets[0]=="*":
        return workflowspec, []

    # NOTE: alias of the input, not a copy - mutated at the very end
    transformedworkflowspec = workflowspec

    # does a task name match any of the requested target patterns?
    def task_matches(t):
        for filt in targets:
            if filt=="*":
                return True
            if re.match(filt, t) != None:
                return True
        return False

    # does task t carry at least one of the requested labels?
    def task_matches_labels(t):
        # when no labels are given at all it's ok
        if len(targetlabels)==0:
            return True
        for l in t['labels']:
            if targetlabels.count(l)!=0:
                return True
        return False

    # The following sequence of operations works and is somewhat structured.
    # However, it builds lookups used elsewhere as well, so some CPU might be saved by reusing
    # some structures across functions or by doing less passes on the data.

    # helper lookup: task name -> index in the stages list
    tasknametoid = { t['name']:i for i, t in enumerate(workflowspec['stages'],0) }

    # check if a task can be run at all
    # or not due to missing requirements
    # (the mutable default cache memoizes results across the recursive calls;
    # okcache below is passed explicitly for the top-level invocations)
    def canBeDone(t,cache={}):
        ok = True
        c = cache.get(t['name'])
        if c != None:
            return c
        for r in t['needs']:
            taskid = tasknametoid.get(r)
            if taskid != None:
                if not canBeDone(workflowspec['stages'][taskid], cache):
                    ok = False
                    break
            else:
                # requirement is not part of the workflow at all
                ok = False
                break
        cache[t['name']] = ok
        if ok == False:
            print (f"Disabling target {t['name']} due to unsatisfied requirements")
        return ok

    okcache = {}
    # build full target list
    full_target_list = [ t for t in workflowspec['stages'] if task_matches(t['name']) and task_matches_labels(t) and canBeDone(t,okcache) ]
    full_target_name_list = [ t['name'] for t in full_target_list ]

    # build full (transitive) dependency list for a task t
    def getallrequirements(t):
        _l=[]
        for r in t['needs']:
            fulltask = workflowspec['stages'][tasknametoid[r]]
            _l.append(fulltask)
            _l=_l+getallrequirements(fulltask)
        return _l

    full_requirements_list = [ getallrequirements(t) for t in full_target_list ]

    # make flat and fetch names only
    full_requirements_name_list = list(set([ item['name'] for sublist in full_requirements_list for item in sublist ]))

    # inner "lambda" helper answering if a task "name" is needed by given targets
    def needed_by_targets(name):
        if full_target_name_list.count(name)!=0:
            return True
        if full_requirements_name_list.count(name)!=0:
            return True
        return False

    # we finaly copy everything matching the targets as well
    # as all their requirements
    transformedworkflowspec['stages']=[ l for l in workflowspec['stages'] if needed_by_targets(l['name']) ]
    return transformedworkflowspec, full_target_name_list
# builds topological orderings (for each timeframe)
def build_dag_properties(workflowspec):
    """Analyse the workflow DAG and derive scheduling helper structures.

    Returns a dict with the successor map ('nexttasks'), per-task scheduling
    weights ('weights') and the topological ordering(s) found.
    """
    globaltaskuniverse = [ (stage, i) for i, stage in enumerate(workflowspec['stages'], 1) ]
    edges, nodes = build_graph(globaltaskuniverse, workflowspec)
    orderings, global_next_tasks = analyseGraph(edges, nodes.copy())
    dependency_cache = {}

    # weight influences scheduling order can be anything user defined ... for the moment we just prefer to stay within a timeframe
    # then take the number of tasks that depend on a task as further weight
    # TODO: bring in resource estimates from runtime, CPU, MEM
    # TODO: make this a policy of the runner to study different strategies
    def getweight(tid):
        return (globaltaskuniverse[tid][0]['timeframe'], len(find_all_dependent_tasks(global_next_tasks, tid, dependency_cache)))

    task_weights = [ getweight(tid) for tid in range(len(globaltaskuniverse)) ]
    for tid, weight in enumerate(task_weights):
        actionlogger.info("Score for " + str(globaltaskuniverse[tid][0]['name']) + " is " + str(weight))
    return { 'nexttasks' : global_next_tasks, 'weights' : task_weights, 'topological_ordering' : orderings }
# update the resource estimates of a workflow based on resources given via JSON
def update_resource_estimates(workflow, resource_json):
    """Overwrite per-task mem/cpu estimates in workflow with measured values.

    resource_json is generated by tool o2dpg_sim_metrics.py json-stat; tasks in
    timeframes >= 1 are matched by their base name (timeframe suffix stripped).
    """
    resource_dict = load_json(resource_json)
    for task in workflow["stages"]:
        # per-timeframe tasks carry a _<tf> suffix which is not part of the key
        if task["timeframe"] >= 1:
            lookup_name = "_".join(task["name"].split("_")[:-1])
        else:
            lookup_name = task["name"]
        new_resources = resource_dict.get(lookup_name)
        if new_resources is None:
            continue
        # memory: take the maximum PSS seen
        newmem = new_resources.get("pss", {}).get("max", None)
        if newmem is not None:
            oldmem = task["resources"]["mem"]
            actionlogger.info("Updating mem estimate for " + task["name"] + " from " + str(oldmem) + " to " + str(newmem))
            task["resources"]["mem"] = newmem
        # cpu: take the mean
        newcpu = new_resources.get("cpu", {}).get("mean", None)
        if newcpu is not None:
            oldcpu = task["resources"]["cpu"]
            rel_cpu = task["resources"]["relative_cpu"]
            # TODO: No longer sure about this since we inject numbers from actually measured workloads
            if rel_cpu is not None:
                # respect the relative CPU settings
                # By default, the CPU value in the workflow is already scaled if relative_cpu is given.
                # The new estimate on the other hand is not yet scaled so it needs to be done here.
                newcpu *= rel_cpu
            actionlogger.info("Updating cpu estimate for " + task["name"] + " from " + str(oldcpu) + " to " + str(newcpu))
            task["resources"]["cpu"] = newcpu
# a function to read a software environment determined by alienv into
# a python dictionary
def get_alienv_software_environment(packagestring):
    """
    packagestring is something like O2::v202298081-1,O2Physics::xxx representing packages
    published on CVMFS ... or ... a file containing directly the software environment to apply

    Returns the environment as a dict mapping variable names to values.
    Raises Exception when the alienv call reports an error.
    """
    # the trivial cases do nothing
    if packagestring is None or packagestring == "" or packagestring == "None":
        return {}

    def load_env_file(env_file):
        """Transform an environment file generated with 'export > env.txt' into a python dictionary."""
        env_vars = {}
        with open(env_file, "r") as f:
            for line in f:
                line = line.strip()
                # Ignore empty lines or comments
                if not line or line.startswith("#"):
                    continue
                # Remove 'declare -x ' if present
                if line.startswith("declare -x "):
                    line = line.replace("declare -x ", "", 1)
                # Handle case: "FOO" without "=" (assign empty string)
                if "=" not in line:
                    key, value = line.strip(), ""
                else:
                    key, value = line.split("=", 1)
                    value = value.strip('"')  # Remove surrounding quotes if present
                env_vars[key.strip()] = value
        return env_vars

    # see if this is a file
    if os.path.exists(packagestring) and os.path.isfile(packagestring):
        actionlogger.info("Taking software environment from file " + packagestring)
        return load_env_file(packagestring)

    # alienv printenv packagestring --> dictionary
    # for the moment this works with CVMFS only
    cmd = "/cvmfs/alice.cern.ch/bin/alienv printenv " + packagestring
    proc = subprocess.Popen([cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    envstring, err = proc.communicate()
    # see if the printenv command was successful
    if len(err.decode()) > 0:
        print (err.decode())
        raise Exception
    # the software environment is now in envstring; entries are separated by semicolons
    envmap = {}
    for token in envstring.decode().split(";"):
        # check if assignment
        if token.count("=") > 0:
            # BUGFIX: split only on the first '=' so that values containing '='
            # (e.g. PATH-like variables) are not truncated
            key, value = token.rstrip().split("=", 1)
            envmap[key] = value
        elif token.count("export") > 0:
            # the case when we export a simple variable;
            # need to consider the case when this has not been previously assigned
            variable = token.split()[1]
            if variable not in envmap:
                envmap[variable] = ""
    return envmap
#
# functions for execution; encapsulated in a WorkflowExecutor class
#
class Semaphore:
    """
    Object that can be used as semaphore

    A plain boolean flag shared between TaskResources objects of the same
    group; lock()/unlock() simply set and clear it (not a counting semaphore).
    """
    def __init__(self):
        # True while the resource guarded by this semaphore is taken
        self.locked = False

    def lock(self):
        self.locked = True

    def unlock(self):
        self.locked = False
class ResourceBoundaries:
    """
    Container holding global resource properties shared by all tasks
    """
    def __init__(self, cpu_limit, mem_limit, dynamic_resources=False, optimistic_resources=False):
        # maximum number of CPU cores the pipeline may use
        self.cpu_limit = cpu_limit
        # maximum memory (in MB, per the --mem-limit option) the pipeline may use
        self.mem_limit = mem_limit
        # whether resource estimates are updated dynamically from finished related tasks
        self.dynamic_resources = dynamic_resources
        # if this is set, tasks that would normally go beyond the resource limits will tried to be run in any case
        self.optimistic_resources = optimistic_resources
class TaskResources:
    """
    Container holding resources of a single task

    Tracks originally assigned and currently assigned CPU/MEM estimates,
    collects monitored samples while the task runs and, once done, can derive
    new estimates for related tasks of the same kind (sample_resources).
    """
    def __init__(self, tid, name, cpu, cpu_relative, mem, resource_boundaries):
        # the task ID belonging to these resources
        self.tid = tid
        self.name = name
        # original CPUs/MEM assigned (persistent)
        self.cpu_assigned_original = cpu
        self.mem_assigned_original = mem
        # relative CPU, to be multiplied with sampled CPU; set by the user, e.g. to allow to backfill tasks
        # only takes effect when sampling resources; persistent
        self.cpu_relative = cpu_relative if cpu_relative else 1
        # CPUs/MEM assigned (transient)
        self.cpu_assigned = cpu
        self.mem_assigned = mem
        # global resource settings (shared ResourceBoundaries object)
        self.resource_boundaries = resource_boundaries
        # sampled resources of this task (filled by sample_resources)
        self.cpu_sampled = None
        self.mem_sampled = None
        # Set these after a task has finished to compute new estimates for related tasks
        self.walltime = None
        self.cpu_taken = None
        self.mem_taken = None
        # collected during monitoring
        self.time_collect = []
        self.cpu_collect = []
        self.mem_collect = []
        # linked to other resources of task that are of the same type as this one
        self.related_tasks = None
        # can assign a semaphore
        self.semaphore = None
        # the task's nice value
        self.nice_value = None
        # whether or not the task's resources are currently booked
        self.booked = False

    @property
    def is_done(self):
        # done == monitoring data was collected and the task is no longer booked
        # (bool() so callers always get a proper boolean, not the list itself)
        return bool(self.time_collect) and not self.booked

    def is_within_limits(self):
        """
        Check if assigned resources respect the global CPU and MEM limits
        """
        cpu_within_limits = True
        mem_within_limits = True
        if self.cpu_assigned > self.resource_boundaries.cpu_limit:
            cpu_within_limits = False
            actionlogger.warning("CPU of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
        # BUGFIX: compare assigned memory (not assigned CPU) against the memory
        # limit; the old code re-used the CPU values here (copy-paste error)
        if self.mem_assigned > self.resource_boundaries.mem_limit:
            mem_within_limits = False
            actionlogger.warning("MEM of task %s exceeds limits %d > %d", self.name, self.mem_assigned, self.resource_boundaries.mem_limit)
        return cpu_within_limits and mem_within_limits

    def limit_resources(self, cpu_limit=None, mem_limit=None):
        """
        Limit resources of this specific task; defaults to the global limits
        """
        if not cpu_limit:
            cpu_limit = self.resource_boundaries.cpu_limit
        if not mem_limit:
            mem_limit = self.resource_boundaries.mem_limit
        self.cpu_assigned = min(self.cpu_assigned, cpu_limit)
        self.mem_assigned = min(self.mem_assigned, mem_limit)

    def add(self, time_passed, cpu, mem):
        """
        Brief interface to add resources that were measured after time_passed
        """
        self.time_collect.append(time_passed)
        self.cpu_collect.append(cpu)
        self.mem_collect.append(mem)

    def sample_resources(self):
        """
        If this task is done, sample CPU and MEM for all related tasks that have not started yet
        """
        if not self.is_done:
            return
        if len(self.time_collect) < 3:
            # Consider at least 3 points to sample from
            self.cpu_sampled = self.cpu_assigned
            self.mem_sampled = self.mem_assigned
            actionlogger.debug("Task %s has not enough points (< 3) to sample resources, setting to previously assigned values.", self.name)
        else:
            # take the time deltas and leave out the very first CPU measurement which is not meaningful,
            # at least when it comes from psutil.Proc.cpu_percent(interval=None)
            time_deltas = [self.time_collect[i+1] - self.time_collect[i] for i in range(len(self.time_collect) - 1)]
            cpu = sum([cpu * time_delta for cpu, time_delta in zip(self.cpu_collect[1:], time_deltas) if cpu >= 0])
            self.cpu_sampled = cpu / sum(time_deltas)
            self.mem_sampled = max(self.mem_collect)
        # aggregate over all related tasks that are already done
        # (this task is in related_tasks and is done, so the lists are non-empty)
        mem_sampled = 0
        cpu_sampled = []
        for res in self.related_tasks:
            if res.is_done:
                mem_sampled = max(mem_sampled, res.mem_sampled)
                cpu_sampled.append(res.cpu_sampled)
        cpu_sampled = sum(cpu_sampled) / len(cpu_sampled)
        # This task ran already with the assigned resources, so let's set it to the limit
        if cpu_sampled > self.resource_boundaries.cpu_limit:
            actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit)
        elif cpu_sampled < 0:
            actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
            cpu_sampled = self.cpu_assigned
        if mem_sampled > self.resource_boundaries.mem_limit:
            actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit)
        elif mem_sampled <= 0:
            actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned)
            mem_sampled = self.mem_assigned
        # propagate the sampled estimates to all related tasks that have not run yet
        for res in self.related_tasks:
            if res.is_done or res.booked:
                continue
            res.cpu_assigned = cpu_sampled * res.cpu_relative
            res.mem_assigned = mem_sampled
            # This task has been run before, stay optimistic and limit the resources in case the sampled ones exceed limits
            res.limit_resources()
class ResourceManager:
"""
Central class to manage resources
- CPU limits
- MEM limits
- Semaphores
Entrypoint to set and to query for resources to be updated.
Can be asked whether a certain task can be run under current resource usage.
Book and unbook resources.
"""
def __init__(self, cpu_limit, mem_limit, procs_parallel_max=100, dynamic_resources=False, optimistic_resources=False):
"""
Initialise members with defaults
"""
# hold TaskResources of all tasks
self.resources = []
# helper dictionaries holding common objects which will be distributed to single TaskResources objects
# to avoid further lookup and at the same time to share the same common objects
self.resources_related_tasks_dict = {}
self.semaphore_dict = {}
# one common object that holds global resource settings such as CPU and MEM limits
self.resource_boundaries = ResourceBoundaries(cpu_limit, mem_limit, dynamic_resources, optimistic_resources)
# register resources that are booked under default nice value
self.cpu_booked = 0
self.mem_booked = 0
# number of tasks currently booked
self.n_procs = 0
# register resources that are booked under high nice value
self.cpu_booked_backfill = 0
self.mem_booked_backfill = 0
# number of tasks currently booked under high nice value
self.n_procs_backfill = 0
# the maximum number of tasks that run at the same time
self.procs_parallel_max = procs_parallel_max
# get the default nice value of this python script
self.nice_default = os.nice(0)
# add 19 to get nice value of low-priority tasks
self.nice_backfill = self.nice_default + 19
    def add_task_resources(self, name, related_tasks_name, cpu, cpu_relative, mem, semaphore_string=None):
        """
        Construct and Add a new TaskResources object

        Exits the whole runner when the estimates exceed the global limits and
        --optimistic-resources was not given; otherwise resources are capped
        to the limits.
        """
        resources = TaskResources(len(self.resources), name, cpu, cpu_relative, mem, self.resource_boundaries)
        if not resources.is_within_limits() and not self.resource_boundaries.optimistic_resources:
            # exit if we don't dare to try
            print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
            print("Pass --optimistic-resources to the runner to attempt the run anyway.")
            exit(1)
        # if we get here, either all is good or the user decided to be optimistic and we limit the resources, by default to the given CPU and mem limits.
        resources.limit_resources()
        self.resources.append(resources)
        # do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup
        if semaphore_string:
            if semaphore_string not in self.semaphore_dict:
                self.semaphore_dict[semaphore_string] = Semaphore()
            resources.semaphore = self.semaphore_dict[semaphore_string]
        # do the following to give each TaskResources a list of the related tasks so we do not need an additional lookup
        if related_tasks_name:
            if related_tasks_name not in self.resources_related_tasks_dict:
                # assigned list is [valid top be used, list of CPU, list of MEM, list of walltimes of each related task, list of processes that ran in parallel on average, list of taken CPUs, list of assigned CPUs, list of tasks finished in the meantime]
                self.resources_related_tasks_dict[related_tasks_name] = []
            self.resources_related_tasks_dict[related_tasks_name].append(resources)
            resources.related_tasks = self.resources_related_tasks_dict[related_tasks_name]
def add_monitored_resources(self, tid, time_delta_since_start, cpu, mem):
self.resources[tid].add(time_delta_since_start, cpu, mem)
def book(self, tid, nice_value):
    """
    Book the resources of task tid with the given nice value.

    The nice value actually used may differ from the requested one:
    - if the task was never checked via ok_to_submit (res.nice_value is
      None), it is conservatively booked as a backfill task;
    - if it was last checked for a different nice value, that mismatch is
      only logged and the requested value is kept.

    Updates the CPU/MEM accounting of either the normal or the backfill
    pool and locks the task's semaphore if it has one.
    """
    res = self.resources[tid]
    # nice value assigned the last time resources were checked (ok_to_submit)
    previous_nice_value = res.nice_value
    if previous_nice_value is None:
        # never checked for resources --> be conservative and treat as backfill
        actionlogger.warning("Task ID %d has never been checked for resources. Treating as backfill", tid)
        nice_value = self.nice_backfill
    elif previous_nice_value != nice_value:
        # fixed garbled message; use the value captured above for consistency
        actionlogger.warning("Task ID %d was last checked for a different nice value (%d) but is now submitted with (%d).", tid, previous_nice_value, nice_value)
    res.nice_value = nice_value
    res.booked = True
    if res.semaphore is not None:
        res.semaphore.lock()
    if nice_value != self.nice_default:
        # account in the backfill pool
        self.n_procs_backfill += 1
        self.cpu_booked_backfill += res.cpu_assigned
        self.mem_booked_backfill += res.mem_assigned
        return
    # account in the normal pool
    self.n_procs += 1
    self.cpu_booked += res.cpu_assigned
    self.mem_booked += res.mem_assigned
def unbook(self, tid):
    """
    Release the booked resources of task tid.

    Unlocks the task's semaphore (if any), optionally re-samples its
    resource estimate when dynamic resources are enabled, and removes its
    CPU/MEM share from the matching accounting pool.  When a pool becomes
    empty, its booked counters are reset to 0 to avoid drift.
    """
    finished = self.resources[tid]
    finished.booked = False
    if self.resource_boundaries.dynamic_resources:
        # refresh the estimate from monitored values for upcoming tasks
        finished.sample_resources()
    if finished.semaphore is not None:
        finished.semaphore.unlock()
    if finished.nice_value != self.nice_default:
        # this task ran in the backfill pool
        self.n_procs_backfill -= 1
        self.cpu_booked_backfill -= finished.cpu_assigned
        self.mem_booked_backfill -= finished.mem_assigned
        if self.n_procs_backfill <= 0:
            self.cpu_booked_backfill = 0
            self.mem_booked_backfill = 0
    else:
        # this task ran in the normal pool
        self.n_procs -= 1
        self.cpu_booked -= finished.cpu_assigned
        self.mem_booked -= finished.mem_assigned
        if self.n_procs <= 0:
            self.cpu_booked = 0
            self.mem_booked = 0
def ok_to_submit(self, tids):
    """
    Generator yielding (tid, nice value) tuples for those task ids in tids
    that currently fit into the resource budget.

    Tasks are first checked against the normal (default-nice) budget; the
    remaining ones are then considered for backfilling at a higher nice
    value.  Booked or semaphore-locked tasks are skipped.
    """
    # work on a copy; both passes below scan the same id list
    tids_copy = tids.copy()
    def ok_to_submit_default(res):
        """
        Return default nice value if conditions are met, None otherwise
        """
        # analyse CPU
        okcpu = (self.cpu_booked + res.cpu_assigned <= self.resource_boundaries.cpu_limit)
        # analyse MEM
        okmem = (self.mem_booked + res.mem_assigned <= self.resource_boundaries.mem_limit)
        actionlogger.debug ('Condition check --normal-- for ' + str(res.tid) + ':' + res.name + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem))
        return self.nice_default if (okcpu and okmem) else None
    def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5):
        """
        Return backfill nice value if conditions are met, None otherwise
        """
        # NOTE(review): reads the module-level CLI namespace 'args' directly
        # (not self.args) -- relies on the runner defining it globally
        if self.n_procs_backfill >= args.n_backfill:
            return None
        # do not backfill tasks that are close to the full CPU budget or
        # very memory-hungry
        # NOTE(review): the second condition divides memory by the CPU limit
        # (a mem-per-core heuristic, ~1900 MB per core?) -- confirm intent
        if res.cpu_assigned > 0.9 * self.resource_boundaries.cpu_limit or res.mem_assigned / self.resource_boundaries.cpu_limit >= 1900:
            return None
        # analyse CPU: the backfill pool alone must fit the CPU limit, and
        # the combined normal+backfill load must stay within the inflated budget
        okcpu = (self.cpu_booked_backfill + res.cpu_assigned <= self.resource_boundaries.cpu_limit)
        okcpu = okcpu and (self.cpu_booked + self.cpu_booked_backfill + res.cpu_assigned <= backfill_cpu_factor * self.resource_boundaries.cpu_limit)
        # analyse MEM against the inflated memory budget
        okmem = (self.mem_booked + self.mem_booked_backfill + res.mem_assigned <= backfill_mem_factor * self.resource_boundaries.mem_limit)
        actionlogger.debug ('Condition check --backfill-- for ' + str(res.tid) + ':' + res.name + ' CPU ' + str(okcpu) + ' MEM ' + str(okmem))
        return self.nice_backfill if (okcpu and okmem) else None
    if self.n_procs + self.n_procs_backfill >= self.procs_parallel_max:
        # in this case, nothing can be done: hard cap on parallel processes
        return
    # first pass: normal submission, stopping at the first task that does not
    # fit; second pass: backfill, scanning past non-fitting tasks
    for ok_to_submit_impl, should_break in ((ok_to_submit_default, True), (ok_to_submit_backfill, False)):
        tid_index = 0
        while tid_index < len(tids_copy):
            tid = tids_copy[tid_index]
            res = self.resources[tid]
            actionlogger.info("Setup resources for task %s, cpu: %f, mem: %f", res.name, res.cpu_assigned, res.mem_assigned)
            tid_index += 1
            if (res.semaphore is not None and res.semaphore.locked) or res.booked:
                # already running, or serialised behind a locked semaphore
                continue
            nice_value = ok_to_submit_impl(res)
            if nice_value is not None:
                # if we get a non-None nice value, it means that this task is good to go
                res.nice_value = nice_value
                # yield the tid and its assigned nice value
                yield tid, nice_value
            elif should_break:
                # break here if resources of the next task do not fit
                break
def filegraph_expand_timeframes(data: dict, timeframes: set, target_namelist) -> dict:
    """
    Utility for the fileaccess logic: duplicate the per-timeframe template
    entries of a file-access report for every requested timeframe.

    Entries whose path matches ./tf<N>/ serve as the template; for each
    timeframe i (except -1) a deep copy is produced with the path and the
    numeric suffixes of the producing/consuming task names rewritten to i.
    Entries written by a task in target_namelist are flagged with keep=True.
    Returns a dict keyed by "timeframe-<i>".
    """
    tf_path = re.compile(r"^\./tf\d+/")
    num_suffix = re.compile(r"_\d+$")
    # only entries living under a timeframe directory act as the template
    template = [
        entry for entry in data.get("file_report", [])
        if tf_path.match(entry["file"])
    ]
    expanded = {}
    for tf in timeframes:
        if tf == -1:
            continue
        # deep copy so the template (and original data) stay untouched
        entries = deepcopy(template)
        for entry in entries:
            # rewrite the timeframe directory in the path
            entry["file"] = tf_path.sub(f"./tf{tf}/", entry["file"])
            # rewrite the numeric suffix of producer task names
            entry["written_by"] = [
                num_suffix.sub(f"_{tf}", producer) for producer in entry["written_by"]
            ]
            # mark files produced by a runner target as to-be-kept
            # TODO: add other mechanisms to request file keeping (regex etc.)
            if any(producer in target_namelist for producer in entry["written_by"]):
                entry["keep"] = True
            # rewrite the numeric suffix of consumer task names
            entry["read_by"] = [
                num_suffix.sub(f"_{tf}", consumer) for consumer in entry["read_by"]
            ]
        expanded[f"timeframe-{tf}"] = entries
    return expanded
class WorkflowExecutor:
# Constructor
def __init__(self, workflowfile, args, jmax=100):
self.args=args
self.is_productionmode = args.production_mode == True # os.getenv("ALIEN_PROC_ID") != None
self.workflowfile = workflowfile
self.workflowspec = load_json(workflowfile)
self.globalinit = self.extract_global_environment(self.workflowspec) # initialize global environment settings
for e in self.globalinit['env']:
if os.environ.get(e, None) == None:
value = self.globalinit['env'][e]
actionlogger.info("Applying global environment from init section " + str(e) + " : " + str(value))
os.environ[e] = str(value)
# only keep those tasks that are necessary to be executed based on user's filters
self.full_target_namelist = []
self.workflowspec, self.full_target_namelist = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)
if not self.workflowspec['stages']:
if args.target_tasks:
print ('Apparently some of the chosen target tasks are not in the workflow')
exit (0)
print ('Workflow is empty. Nothing to do')
exit (0)
# construct the DAG, compute task weights
workflow = build_dag_properties(self.workflowspec)
if args.visualize_workflow:
draw_workflow(self.workflowspec)
self.possiblenexttask = workflow['nexttasks']
self.taskweights = workflow['weights']
self.topological_orderings = workflow['topological_ordering']
self.taskuniverse = [ l['name'] for l in self.workflowspec['stages'] ]
# construct task ID <-> task name lookup
self.idtotask = [ 0 for _ in self.taskuniverse ]
self.tasktoid = {}
self.idtotf = [ l['timeframe'] for l in self.workflowspec['stages'] ]
for i, name in enumerate(self.taskuniverse):
self.tasktoid[name]=i
self.idtotask[i]=name
if args.update_resources:
update_resource_estimates(self.workflowspec, args.update_resources)
# construct the object that is in charge of resource management...
self.resource_manager = ResourceManager(args.cpu_limit, args.mem_limit, args.maxjobs, args.dynamic_resources, args.optimistic_resources)
for task in self.workflowspec['stages']:
# ...and add all initial resource estimates
global_task_name = self.get_global_task_name(task["name"])
try:
cpu_relative = float(task["resources"]["relative_cpu"])
except TypeError:
cpu_relative = 1
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), cpu_relative, float(task["resources"]["mem"]), task.get("semaphore"))
self.procstatus = { tid:'ToDo' for tid in range(len(self.workflowspec['stages'])) }
self.taskneeds= { t:set(self.getallrequirements(t)) for t in self.taskuniverse }