forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathjobutils.sh
More file actions
459 lines (411 loc) · 18 KB
/
jobutils.sh
File metadata and controls
459 lines (411 loc) · 18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
# All rights not expressly granted are reserved.
#
# This software is distributed under the terms of the GNU General Public
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
#
# In applying this license CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.
#
# author: Sandro Wenzel
# This file contains a couple of utility functions for reuse
# in shell job scripts (such as on the GRID).
# In order to use these functions in scripts, this file needs to be
# simply sourced into the target script. The script needs bash versions > 4
# TODOs:
# -harmonize use of bc/awk for calculations
# -harmonize coding style for variables
# Clean up the shared-memory files of the current DPL session by calling
# "fairmq-shmmonitor -s <session> -c".
# Globals:
#   JOBUTILS_INTERNAL_DPL_SESSION (read) - session id; nothing is done when it is empty/unset
# Outputs:
#   none (stdout and stderr of fairmq-shmmonitor are discarded)
o2_cleanup_shm_files() {
  if [ "${JOBUTILS_INTERNAL_DPL_SESSION}" ]; then
    # echo "cleaning up session ${JOBUTILS_INTERNAL_DPL_SESSION}"
    # quoted: the session id must reach fairmq-shmmonitor as exactly one argument
    fairmq-shmmonitor -s "${JOBUTILS_INTERNAL_DPL_SESSION}" -c &> /dev/null
  fi
}
# Function to find out all the (recursive) child processes starting from a parent PID.
# The output includes the parent
# Arguments:
#   $1 - PID to start from
#   $2 - internal recursion marker; external callers must leave it empty
# Outputs:
#   (top-level call only) one line with all collected PIDs, each preceded by a blank,
#   starting with $1 and followed depth-first by its descendants in pgrep order
childprocs() {
  local parent=$1
  local childpid
  if [ ! "$2" ]; then
    # Only the top-level call owns the accumulator. Nested calls append to it
    # through bash's dynamic scoping, so nothing leaks into the caller's globals.
    local child_pid_list=""
  fi
  if [ "$parent" ]; then
    child_pid_list="$child_pid_list $parent"
    for childpid in $(pgrep -P "${parent}"); do
      childprocs "$childpid" "nottoplevel"
    done
  fi
  # return via a string list (only if toplevel)
  if [ ! "$2" ]; then
    echo "${child_pid_list}"
  fi
}
# Send a signal to a process and all of its (recursive) child processes, wait two
# seconds and then clean up the shared-memory files of the current DPL session.
# Arguments:
#   $1 - PID of the top-level (mother) process
#   $2 - signal to send (default: SIGTERM)
# Globals:
#   JOBUTILS_INTERNAL_DPL_SESSION - used by o2_cleanup_shm_files, unset afterwards
# Outputs:
#   one "killing child <pid>" line per signalled process
taskwrapper_cleanup() {
  # locals keep the helper from clobbering variables (notably the loop variable p) of its caller
  local motherpid=$1
  local signal=${2:-SIGTERM}
  local p
  for p in $(childprocs "${motherpid}"); do
    echo "killing child $p"
    # a process may already be gone; that is not an error here
    kill -s "${signal}" "$p" 2> /dev/null
  done
  sleep 2
  # remove leftover shm files
  o2_cleanup_shm_files
  unset JOBUTILS_INTERNAL_DPL_SESSION
}
# Signal handler installed by taskwrapper: announces the signal, brings down the
# supervised process tree and then terminates the current shell.
# Arguments:
#   $1 - PID of the supervised process
#   $2 - name of the signal that was received (forwarded to the process tree)
taskwrapper_cleanup_handler() {
  PID=$1 SIGNAL=$2
  printf '%s\n' "CLEANUP HANDLER FOR PROCESS ${PID} AND SIGNAL ${SIGNAL}"
  taskwrapper_cleanup ${PID} ${SIGNAL}
  # do not resume the interrupted job: leave the current shell with an error status
  exit 1
}
# Function that wraps some process and asynchronously supervises and controls it.
# Main features provided at the moment are:
# - optional recording of walltime and memory consumption (time evolution)
# - optional recording of CPU utilization
# - Some job control and error detection (in particular for DPL workflows).
# If exceptions are found, all participating processes will be sent a termination signal.
# The rationale behind this function is to be able to determine failing
# conditions early and prevent longtime hanging executables
# (until DPL offers signal handling and automatic shutdown)
# - possibility to provide user hooks for "start" and "failure"
# - possibility to skip (jump over) job altogether
# - possibility to define timeout
# - possibility to control/limit the CPU load
taskwrapper() {
  # Arguments:
  #   $1   - name of the logfile; also the prefix of all helper files
  #          (<logfile>_tmp.sh, <logfile>_done, <logfile>_time, <logfile>_cpuusage,
  #           encountered_exceptions_list_<logfile>)
  #   $2.. - the command to execute; all remaining arguments are joined into one string
  # Environment (all optional):
  #   JOBUTILS_JOB_STARTHOOK / JOBUTILS_JOB_FAILUREHOOK / JOBUTILS_JOB_ENDHOOK
  #                                - commands eval'ed as "<hook> '<command>' <logfile> [<rc>]"
  #   JOBUTILS_JOB_PERIODICCONTROLHOOK, JOBUTILS_JOB_CONTROLITERS (default 10)
  #                                - hook eval'ed every CONTROLITERS-th supervision iteration
  #   JOBUTILS_SKIPDONE            - skip the task if <logfile>_done exists
  #   JOBUTILS_JOB_SKIPCREATEDONE  - do not create <logfile>_done on success
  #   JOBUTILS_KEEPJOBSCRIPT       - keep the generated <logfile>_tmp.sh
  #   JOBUTILS_MONITORMEM          - run through monitor-mem.sh (not on Darwin), record FMQ shm usage
  #   JOBUTILS_MONITORCPU          - record per-process CPU usage in <logfile>_cpuusage
  #   JOBUTILS_LIMITLOAD           - CPU load limit (in percent) enforced via cpulimit
  #   JOBUTILS_JOB_SUPERVISEDFILES - additional files scanned for error patterns
  #   JOBUTILS_JOB_KILLINACTIVE    - kill the task after that many seconds below 5% CPU
  #                                  (only evaluated when CPU monitoring/limiting is active)
  #   JOBUTILS_JOB_TIMEOUT         - kill the task after that many seconds
  #   JOBUTILS_WRAPPER_SLEEP       - seconds between two supervision iterations (default 1)
  #   JOBUTILS_PRINT_ON_ERROR      - print the log on failure
  #   JOBUTILS_NOEXIT_ON_ERROR     - return instead of exiting the (non-interactive) shell on failure
  #   JOBUTILS_DEBUGMODE           - do not kill the process tree when an error pattern is found
  # Returns:
  #   0 if the task was skipped or finished successfully, 1 if it was aborted by the
  #   supervision (error pattern, inactivity, timeout), otherwise the exit code of the task.
  #   Unless JOBUTILS_NOEXIT_ON_ERROR is set, a failure in a non-interactive shell
  #   exits the shell instead of returning.
  unset JOBUTILS_INTERNAL_DPL_SESSION
  # The FMQ shared-memory maximum is a per-task quantity. Start from zero so that the
  # MB-converted value left behind by a previous call is not mistaken for bytes.
  MAX_FMQ_SHM=0

  # nested helper to parse DPL session ID
  _parse_DPL_session ()
  {
    childpids=$(childprocs ${1})
    for p in ${childpids}; do
      command=$(ps -o command ${p} | grep -v "COMMAND" | grep "session")
      if [ "$?" = "0" ]; then
        # echo "parsing from ${command}"
        session=`echo ${command} | sed 's/.*--session//g' | awk '//{print $1}'`
        if [ "${session}" ]; then
          # echo "found ${session}"
          break
        fi
      fi
    done
    echo "${session:-""}"
  }

  local logfile=$1
  shift 1
  local command="$*"
  # One default for every use of the supervision interval (sleep, inactivity
  # accounting, cpulimit duration), so that they stay consistent when the variable is unset.
  local wrapper_sleep=${JOBUTILS_WRAPPER_SLEEP:-1}

  STARTTIME=$SECONDS

  rm -f encountered_exceptions_list_${logfile}

  # launch the actual command in the background
  echo "Launching task: ${command} &> $logfile &"
  # the command might be a complex block: For the timing measurement below
  # it is better to execute this as a script
  SCRIPTNAME="${logfile}_tmp.sh"
  echo "#!/usr/bin/env bash" > ${SCRIPTNAME}
  echo "export LIBC_FATAL_STDERR_=1" >> ${SCRIPTNAME}        # <--- needed ... otherwise the LIBC fatal messages appear on a different tty
  echo "${command};" >> ${SCRIPTNAME}
  echo 'RC=$?; echo "TASK-EXIT-CODE: ${RC}"; exit ${RC}' >> ${SCRIPTNAME}
  chmod +x ${SCRIPTNAME}

  # this gives some possibility to customize the wrapper
  # and do some special task at the start. The hook takes 2 arguments:
  # The original command and the logfile
  if [ "${JOBUTILS_JOB_STARTHOOK}" ]; then
    hook="${JOBUTILS_JOB_STARTHOOK} '$command' $logfile"
    eval "${hook}"
  fi

  # We offer the possibility to jump this stage/task when a "done" file is present.
  # (this is mainly interesting for debugging in order to avoid going through all pipeline stages again)
  # The feature should be used with care! To make this nice, a proper dependency chain and a checksum mechanism
  # needs to be put into place.
  if [ "${JOBUTILS_SKIPDONE}" ]; then
    if [ -f "${logfile}_done" ]; then
      echo "Skipping task since file ${logfile}_done found";
      [ ! "${JOBUTILS_KEEPJOBSCRIPT}" ] && rm ${SCRIPTNAME} 2> /dev/null
      return 0
    fi
  fi
  [ -f "${logfile}_done" ] && rm "${logfile}"_done

  # the time command is non-standard on MacOS
  if [ "$(uname)" == "Darwin" ]; then
    GTIME=$(which gtime)
    # stays empty (no timing) when gtime is not installed
    TIMECOMMAND=${GTIME:+"${GTIME} --output=${logfile}_time"}
  else
    TIMECOMMAND="/usr/bin/time --output=${logfile}_time"
  fi

  # with or without memory monitoring ?
  finalcommand="TIME=\"#walltime %e\" ${TIMECOMMAND} ./${SCRIPTNAME}"
  if [[ "$(uname)" != "Darwin" && "${JOBUTILS_MONITORMEM}" ]]; then
    finalcommand="TIME=\"#walltime %e\" ${O2_ROOT}/share/scripts/monitor-mem.sh ${TIMECOMMAND} './${SCRIPTNAME}'"
  fi
  echo "Running: ${finalcommand}" > ${logfile}
  eval ${finalcommand} >> ${logfile} 2>&1 & #can't disown here since we want to retrieve exit status later on

  # THE NEXT PART IS THE SUPERVISION PART
  # get the PID
  PID=$!
  # register signal handlers
  trap "taskwrapper_cleanup_handler ${PID} SIGINT" SIGINT
  trap "taskwrapper_cleanup_handler ${PID} SIGTERM" SIGTERM

  cpucounter=1
  inactivitycounter=0   # used to detect periods of inactivity
  NLOGICALCPUS=$(getNumberOfLogicalCPUCores)

  reduction_factor=1
  control_iteration=1
  while [ 1 ]; do
    # We don't like to see critical problems in the log file.
    # We need to grep on multitude of things:
    # - all sorts of exceptions (may need to fine-tune)
    # - segmentation violation
    # - there was a crash
    # - bus error (often occurring with shared mem)
    pattern="-e \"\<[Ee]xception\"                         \
             -e \"segmentation violation\"                 \
             -e \"error while setting up workflow\"        \
             -e \"bus error\"                              \
             -e \"Assertion.*failed\"                      \
             -e \"Fatal in\"                               \
             -e \"libc++abi.*terminating\"                 \
             -e \"There was a crash.\"                     \
             -e \"arrow.*Check failed\"                    \
             -e \"terminate called after\"                 \
             -e \"terminate called without an active\"     \
             -e \"\*\*\* Error in\""                       # <--- LIBC fatal error messages

    exclude_pattern="-e \"To change the tolerance or the exception severity\""

    grepcommand="grep -a -H ${pattern} $logfile ${JOBUTILS_JOB_SUPERVISEDFILES} | grep -a -v ${exclude_pattern} >> encountered_exceptions_list_${logfile} 2>/dev/null"
    eval ${grepcommand}

    grepcommand="cat encountered_exceptions_list_${logfile} 2>/dev/null | wc -l"
    # using eval here since otherwise the pattern is translated to a
    # a weirdly quoted stringlist
    RC=$(eval ${grepcommand})

    # if we see an exception we will bring down the DPL workflow
    # after having given it some chance to shut-down itself
    # basically --> send kill to all children
    if [ "$RC" != "" -a "$RC" != "0" ]; then
      echo "Detected critical problem in logfile $logfile"

      if [ "${JOBUTILS_PRINT_ON_ERROR}" ]; then
        grepcommand="grep -a -H -A 2 -B 2 ${pattern} $logfile ${JOBUTILS_JOB_SUPERVISEDFILES}"
        eval ${grepcommand}
      fi

      # this gives some possibility to customize the wrapper
      # and do some special task on failure. The hook takes 2 arguments:
      # The original command and the logfile
      if [ "${JOBUTILS_JOB_FAILUREHOOK}" ]; then
        hook="${JOBUTILS_JOB_FAILUREHOOK} '$command' $logfile"
        eval "${hook}"
      fi

      sleep 2

      [ ! "${JOBUTILS_DEBUGMODE}" ] && taskwrapper_cleanup ${PID} SIGKILL

      RC_ACUM=$((RC_ACUM+1))
      [ ! "${JOBUTILS_KEEPJOBSCRIPT}" ] && rm ${SCRIPTNAME} 2> /dev/null
      [ "${JOBUTILS_PRINT_ON_ERROR}" ] && cat ${logfile}
      [[ ! "${JOBUTILS_NOEXIT_ON_ERROR}" ]] && [[ ! $- == *i* ]] && exit 1
      return 1
    fi

    # check if command returned which may bring us out of the loop
    ps -p $PID > /dev/null
    [ $? == 1 ] && break

    if [ "${JOBUTILS_MONITORMEM}" ]; then
      if [ "${JOBUTILS_INTERNAL_DPL_SESSION}" ]; then
        MAX_FMQ_SHM=${MAX_FMQ_SHM:-0}
        text=$(fairmq-shmmonitor -v -s ${JOBUTILS_INTERNAL_DPL_SESSION})
        line=$(echo ${text} | tr '[' '\n[' | grep "^0" | tail -n1)
        CURRENT_FMQ_SHM=$(echo ${line} | sed 's/.*used://g')
        # echo "current shm ${CURRENT_FMQ_SHM}"
        MAX_FMQ_SHM=$(awk -v "t=${CURRENT_FMQ_SHM}" -v "s=${MAX_FMQ_SHM}" 'BEGIN { if(t>=s) { print t; } else { print s; } }')
      fi
    fi

    if [ "${JOBUTILS_MONITORCPU}" ] || [ "${JOBUTILS_LIMITLOAD}" ]; then
      # NOTE: The following section is "a bit" compute intensive and currently not optimized
      # A careful evaluation of awk vs bc or other tools might be needed -- or a move to a more
      # system oriented language/tool

      for p in $limitPIDs; do
        wait ${p}
      done

      # get some CPU usage statistics per process --> actual usage can be calculated thereafter
      total=`awk 'BEGIN{s=0}/cpu /{for (i=1;i<=NF;i++) s+=$i;} END {print s}' /proc/stat`
      previous_total=${current_total}
      current_total=${total}
      # quickly fetch the data
      childpids=$(childprocs ${PID})

      for p in $childpids; do
        while read -r name utime stime; do
          echo "${cpucounter} ${p} ${total} ${utime} ${stime} ${name}" >> ${logfile}_cpuusage
          previous[$p]=${current[$p]}
          current[$p]=${utime}
          name[$p]=${name}
        done <<<$(awk '//{print $2" "$14" "$15}' /proc/${p}/stat 2>/dev/null)
      done
      # do some calculations based on the data
      totalCPU=0            # actual CPU load measured
      totalCPU_unlimited=0  # extrapolated unlimited CPU load
      line=""
      for p in $childpids; do
        C=${current[$p]}
        P=${previous[$p]}
        CT=${total}
        PT=${previous_total}
        # echo "${p} : current ${C} previous ${P} ${CT} ${PT}"
        thisCPU[$p]=$(awk -v "c=${C}" -v "p=${P}" -v "ct=${CT}" -v "pt=${PT}" -v "ncpu=${NLOGICALCPUS}" 'BEGIN { print 100.*ncpu*(c-p)/(ct-pt); }')
        line="${line} $p:${thisCPU[$p]}"
        totalCPU=$(awk -v "t=${totalCPU}" -v "this=${thisCPU[$p]}" 'BEGIN { print (t + this); }')
        previousfactor=1
        [ "${waslimited[$p]}" ] && previousfactor=${reduction_factor}
        totalCPU_unlimited=$(awk -v "t=${totalCPU_unlimited}" -v "this=${thisCPU[$p]}" -v f="${previousfactor}" 'BEGIN { print (t + this/f); }')
        # echo "CPU last time window ${p} : ${thisCPU[$p]}"
      done

      # echo "${line}"
      # echo "${cpucounter} totalCPU = ${totalCPU} -- without limitation ${totalCPU_unlimited}"
      # We can check if the total load is above a resource limit
      # And take corrective actions if we extend by 10%
      limitPIDs=""
      unset waslimited
      if [ "${JOBUTILS_LIMITLOAD}" ]; then
        if (( $(echo "${totalCPU_unlimited} > 1.1*${JOBUTILS_LIMITLOAD}" | bc -l 2>/dev/null) )); then
          # we reduce each pid proportionally for the time until the next check and record the reduction factor in place
          oldreduction=${reduction_factor}
          reduction_factor=$(awk -v limit="${JOBUTILS_LIMITLOAD}" -v cur="${totalCPU_unlimited}" 'BEGIN{ print limit/cur;}')
          echo "APPLYING REDUCTION = ${reduction_factor}"

          for p in $childpids; do
            cpulim=$(awk -v a="${thisCPU[${p}]}" -v newr="${reduction_factor}" -v oldr="${oldreduction}" 'BEGIN { r=(a/oldr)*newr; print r; if(r > 0.05) {exit 0;} exit 1; }')
            if [ $? = "0" ]; then
              # we only apply to jobs above a certain threshold
              echo "Setting CPU lim for job ${p} / ${name[$p]} to ${cpulim}";

              # the limiter lives for exactly one supervision interval
              timeout ${wrapper_sleep} ${O2_ROOT}/share/scripts/cpulimit -l ${cpulim} -p ${p} > /dev/null 2> /dev/null & disown
              proc=$!
              limitPIDs="${limitPIDs} ${proc}"
              waslimited[$p]=1
            fi
          done
        else
          # echo "RESETTING REDUCTION = 1"
          reduction_factor=1.
        fi
      fi

      cpucounter=$((cpucounter+1))
      # our condition for inactive
      if (( $(echo "${totalCPU} < 5" | bc -l 2> /dev/null) )); then
        # awk instead of integer arithmetic: the interval may be fractional and must
        # use the same default as the sleep below
        inactivitycounter=$(awk -v "i=${inactivitycounter}" -v "s=${wrapper_sleep}" 'BEGIN { print i + s; }')
      else
        inactivitycounter=0
      fi
      if [ "${JOBUTILS_JOB_KILLINACTIVE}" ]; then
        $(awk -v I="${inactivitycounter}" -v T="${JOBUTILS_JOB_KILLINACTIVE}" 'BEGIN {if(I>T){exit 1;} exit 0;}')
        if [ "$?" = "1" ]; then
          echo "task inactivity limit reached .. killing all processes";
          taskwrapper_cleanup $PID SIGKILL
          # call a more specialized hook for this??
          if [ "${JOBUTILS_JOB_FAILUREHOOK}" ]; then
            hook="${JOBUTILS_JOB_FAILUREHOOK} '$command' $logfile"
            eval "${hook}"
          fi
          [ "${JOBUTILS_PRINT_ON_ERROR}" ] && echo ----- Last log: ----- && pwd && cat ${logfile} && echo ----- End of log -----
          [[ ! "${JOBUTILS_NOEXIT_ON_ERROR}" ]] && [[ ! $- == *i* ]] && exit 1
          return 1
        fi
      fi
    fi

    # a good moment to check for jobs timeout (or other resources)
    if [ "$JOBUTILS_JOB_TIMEOUT" ]; then
      $(awk -v S="${SECONDS}" -v T="${JOBUTILS_JOB_TIMEOUT}" -v START="${STARTTIME}" 'BEGIN {if((S-START)>T){exit 1;} exit 0;}')
      if [ "$?" = "1" ]; then
        echo "task timeout reached .. killing all processes";
        taskwrapper_cleanup $PID SIGKILL
        # call a more specialized hook for this??
        if [ "${JOBUTILS_JOB_FAILUREHOOK}" ]; then
          hook="${JOBUTILS_JOB_FAILUREHOOK} '$command' $logfile"
          eval "${hook}"
        fi
        [ "${JOBUTILS_PRINT_ON_ERROR}" ] && echo ----- Last log: ----- && pwd && cat ${logfile} && echo ----- End of log -----
        [[ ! "${JOBUTILS_NOEXIT_ON_ERROR}" ]] && [[ ! $- == *i* ]] && exit 1
        return 1
      fi
    fi

    # Try to find out DPL session ID
    # if [ -z "${JOBUTILS_INTERNAL_DPL_SESSION}" ]; then
    JOBUTILS_INTERNAL_DPL_SESSION=$(_parse_DPL_session ${PID})
    # echo "got session ${JOBUTILS_INTERNAL_DPL_SESSION}"
    # fi

    # sleep for some time (can be customized for power user)
    sleep ${wrapper_sleep}

    # power feature: we allow to call a user hook at each i-th control
    # iteration
    if [ "${JOBUTILS_JOB_PERIODICCONTROLHOOK}" ]; then
      if [ "${control_iteration}" = "${JOBUTILS_JOB_CONTROLITERS:-10}" ]; then
        hook="${JOBUTILS_JOB_PERIODICCONTROLHOOK} '$command' $logfile"
        eval "${hook}"
        control_iteration=0
      fi
    fi

    control_iteration=$((control_iteration+1))
  done

  # wait for PID and remember its status as a fallback for the return code
  # ?? should directly exit here?
  local wait_rc=0
  wait $PID || { wait_rc=$?; QUERY_RC_FROM_LOG="ON"; }

  # query return code from log (seems to be safer as sometimes the wait issues "PID" not a child of this shell);
  # only the last exit-code line is the one written by our own job script
  RC=$(grep -a "TASK-EXIT-CODE:" ${logfile} | awk '//{print $2}' | tail -n 1)
  # There is no exit-code line when the command left the job script via 'exit', was
  # killed, or could not be started at all. Fall back to the status reported by wait
  # so that RC is always a number and such a task is not reported as successful.
  if [[ ! "${RC}" =~ ^[0-9]+$ ]]; then
    RC=${wait_rc}
  fi
  RC_ACUM=$((RC_ACUM+RC))
  if [ "${RC}" -eq "0" ]; then
    if [ ! "${JOBUTILS_JOB_SKIPCREATEDONE}" ]; then
      # if return code 0 we mark this task as done
      echo "Command \"${command}\" successfully finished." > "${logfile}"_done
      echo "The presence of this file can be used to skip this command in future runs" >> "${logfile}"_done
      echo "of the pipeline by setting the JOBUTILS_SKIPDONE environment variable." >> "${logfile}"_done
    fi
  else
    echo "command ${command} had nonzero exit code ${RC}"
    [ "${JOBUTILS_PRINT_ON_ERROR}" ] && echo ----- Last log: ----- && pwd && cat ${logfile} && echo ----- End of log -----
  fi
  [ ! "${JOBUTILS_KEEPJOBSCRIPT}" ] && rm ${SCRIPTNAME} 2> /dev/null

  # deregister signal handlers: "trap -" restores the default disposition, whereas an
  # empty handler would make this shell and all processes started later ignore the signals
  trap - SIGINT
  trap - SIGTERM

  o2_cleanup_shm_files #--> better to register a general trap at EXIT

  # this gives some possibility to customize the wrapper
  # and do some special task at the ordinary exit. The hook takes 3 arguments:
  # - The original command
  # - the logfile
  # - the return code from the execution
  if [ "${JOBUTILS_JOB_ENDHOOK}" ]; then
    hook="${JOBUTILS_JOB_ENDHOOK} '$command' $logfile ${RC}"
    eval "${hook}"
  fi

  if [ ! "${RC}" -eq "0" ]; then
    if [ ! "${JOBUTILS_NOEXIT_ON_ERROR}" ]; then
      # in case of incorrect termination, we usually like to stop the whole outer script (== we are in non-interactive mode)
      [[ ! $- == *i* ]] && exit ${RC}
    fi
  fi
  if [ "${JOBUTILS_MONITORMEM}" ]; then
    # convert bytes in MB
    MAX_FMQ_SHM=${MAX_FMQ_SHM:-0}
    MAX_FMQ_SHM=$(awk -v "s=${MAX_FMQ_SHM}" 'BEGIN { print s/(1024.*1024) }')
    echo "PROCESS MAX FMQ_SHM = ${MAX_FMQ_SHM}" >> ${logfile}
  fi
  unset JOBUTILS_INTERNAL_DPL_SESSION
  return ${RC}
}
# Print the number of physical CPU cores, computed as
# (cores per socket) x (number of sockets).
# The values are taken from system_profiler on Darwin and from lscpu elsewhere.
# Outputs:
#   the core count on stdout; an empty line when the platform tool did not
#   report both values as plain numbers
getNumberOfPhysicalCPUCores() {
  local hwinfo corespersocket sockets
  if [ "$(uname)" == "Darwin" ]; then
    # query the hardware description once and extract both fields from it
    hwinfo=$(system_profiler SPHardwareDataType)
    corespersocket=$(awk '/Total Number of Cores:/ {print $5; exit}' <<< "${hwinfo}")
    if [ "$(uname -m)" == "arm64" ]; then
      sockets=1
    else
      sockets=$(awk '/Number of Processors:/ {print $4; exit}' <<< "${hwinfo}")
    fi
  else
    # Do something under GNU/Linux platform
    hwinfo=$(lscpu)
    corespersocket=$(awk '/Core\(s\) per socket/ {print $4; exit}' <<< "${hwinfo}")
    sockets=$(awk '/Socket\(s\)/ {print $2; exit}' <<< "${hwinfo}")
  fi
  if [[ "${corespersocket}" =~ ^[0-9]+$ && "${sockets}" =~ ^[0-9]+$ ]]; then
    # shell arithmetic: no external calculator needed; 10# guards against octal interpretation
    echo "$(( 10#${corespersocket} * 10#${sockets} ))"
  else
    echo ""
  fi
}
# Print the number of logical CPU cores: hw.logicalcpu from sysctl on Darwin,
# otherwise the number of lines in /proc/cpuinfo that contain "processor".
getNumberOfLogicalCPUCores() {
  local ncores
  case "$(uname)" in
    Darwin)
      ncores=$(sysctl -n hw.logicalcpu)
      ;;
    *)
      # every other platform is treated like GNU/Linux
      ncores=$(grep "processor" /proc/cpuinfo | wc -l)
      ;;
  esac
  echo ${ncores}
}